/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams.topics;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.topics.ChangelogTopics;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.CopartitionedTopicsEnforcer;
import org.apache.kafka.coordinator.group.streams.topics.RepartitionTopics;
import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.slf4j.Logger;

public class InternalTopicManager {
    public static ConfiguredTopology configureTopics(LogContext logContext, long metadataHash, StreamsTopology topology, TopicsImage topicsImage) {
        Logger log = logContext.logger(InternalTopicManager.class);
        Collection<StreamsGroupTopologyValue.Subtopology> subtopologies = topology.subtopologies().values();
        Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology = subtopologies.stream().collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, InternalTopicManager::copartitionGroupsFromPersistedSubtopology));
        try {
            Optional<Object> topicConfigurationException = Optional.empty();
            InternalTopicManager.throwOnMissingSourceTopics(topology, topicsImage);
            Map<String, Integer> decidedPartitionCountsForInternalTopics = InternalTopicManager.decidePartitionCounts(logContext, topology, topicsImage, copartitionGroupsBySubtopology, log);
            SortedMap configuredSubtopologies = subtopologies.stream().collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> InternalTopicManager.fromPersistedSubtopology(x, topicsImage, decidedPartitionCountsForInternalTopics), (v1, v2) -> {
                throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
            }, TreeMap::new));
            Map<String, CreateTopicsRequestData.CreatableTopic> internalTopicsToCreate = InternalTopicManager.missingInternalTopics(configuredSubtopologies, topology, topicsImage);
            if (!internalTopicsToCreate.isEmpty()) {
                topicConfigurationException = Optional.of(TopicConfigurationException.missingInternalTopics("Internal topics are missing: " + String.valueOf(internalTopicsToCreate.keySet())));
                log.info("Valid topic configuration found, but internal topics are missing for topology epoch {}: {}", (Object)topology.topologyEpoch(), (Object)((TopicConfigurationException)topicConfigurationException.get()).toString());
            } else {
                log.info("Valid topic configuration found, topology epoch {} is now initialized.", (Object)topology.topologyEpoch());
            }
            return new ConfiguredTopology(topology.topologyEpoch(), metadataHash, Optional.of(configuredSubtopologies), internalTopicsToCreate, topicConfigurationException);
        }
        catch (TopicConfigurationException e) {
            log.warn("Topic configuration failed for topology epoch {}: {} ", (Object)topology.topologyEpoch(), (Object)e.toString());
            return new ConfiguredTopology(topology.topologyEpoch(), metadataHash, Optional.empty(), Map.of(), Optional.of(e));
        }
    }

    private static void throwOnMissingSourceTopics(StreamsTopology topology, TopicsImage topicsImage) {
        TreeSet<String> sortedMissingTopics = new TreeSet<String>();
        for (StreamsGroupTopologyValue.Subtopology subtopology : topology.subtopologies().values()) {
            for (String sourceTopic : subtopology.sourceTopics()) {
                if (topicsImage.getTopic(sourceTopic) != null) continue;
                sortedMissingTopics.add(sourceTopic);
            }
        }
        if (!sortedMissingTopics.isEmpty()) {
            throw TopicConfigurationException.missingSourceTopics("Source topics " + String.join((CharSequence)", ", sortedMissingTopics) + " are missing.");
        }
    }

    private static Map<String, Integer> decidePartitionCounts(LogContext logContext, StreamsTopology topology, TopicsImage topicsImage, Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology, Logger log) {
        HashMap<String, Integer> decidedPartitionCountsForInternalTopics = new HashMap<String, Integer>();
        Function<String, OptionalInt> topicPartitionCountProvider = topic -> InternalTopicManager.getPartitionCount(topicsImage, topic, decidedPartitionCountsForInternalTopics);
        RepartitionTopics repartitionTopics = new RepartitionTopics(logContext, topology.subtopologies().values(), topicPartitionCountProvider);
        CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = new CopartitionedTopicsEnforcer(logContext, topicPartitionCountProvider);
        ChangelogTopics changelogTopics = new ChangelogTopics(logContext, topology.subtopologies().values(), topicPartitionCountProvider);
        decidedPartitionCountsForInternalTopics.putAll(repartitionTopics.setup());
        InternalTopicManager.enforceCopartitioning(topology, copartitionGroupsBySubtopology, decidedPartitionCountsForInternalTopics, copartitionedTopicsEnforcer);
        decidedPartitionCountsForInternalTopics.putAll(changelogTopics.setup());
        return decidedPartitionCountsForInternalTopics;
    }

    private static void enforceCopartitioning(StreamsTopology topology, Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology, Map<String, Integer> decidedPartitionCountsForInternalTopics, CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) {
        Set<String> fixedRepartitionTopics = topology.subtopologies().values().stream().flatMap(x -> x.repartitionSourceTopics().stream().filter(y -> y.partitions() != 0)).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
        Set<String> flexibleRepartitionTopics = topology.subtopologies().values().stream().flatMap(x -> x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
        for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
            for (Set<String> copartitionGroup : copartitionGroups) {
                decidedPartitionCountsForInternalTopics.putAll(copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
            }
        }
    }

    private static Map<String, CreateTopicsRequestData.CreatableTopic> missingInternalTopics(Map<String, ConfiguredSubtopology> subtopologyMap, StreamsTopology topology, TopicsImage topicsImage) {
        HashMap<String, CreateTopicsRequestData.CreatableTopic> topicsToCreate = new HashMap<String, CreateTopicsRequestData.CreatableTopic>();
        for (ConfiguredSubtopology subtopology : subtopologyMap.values()) {
            subtopology.repartitionSourceTopics().values().forEach(x -> topicsToCreate.put(x.name(), InternalTopicManager.toCreatableTopic(x)));
            subtopology.stateChangelogTopics().values().forEach(x -> topicsToCreate.put(x.name(), InternalTopicManager.toCreatableTopic(x)));
        }
        for (String topic : topology.requiredTopics()) {
            CreateTopicsRequestData.CreatableTopic expectedTopic;
            TopicImage topicImage = topicsImage.getTopic(topic);
            if (topicImage == null || (expectedTopic = (CreateTopicsRequestData.CreatableTopic)topicsToCreate.remove(topic)) == null || topicImage.partitions().size() == expectedTopic.numPartitions()) continue;
            throw TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + topic + " has different number of partitions: expected " + expectedTopic.numPartitions() + ", found " + topicImage.partitions().size());
        }
        return topicsToCreate;
    }

    private static OptionalInt getPartitionCount(TopicsImage topicsImage, String topic, Map<String, Integer> decidedPartitionCountsForInternalTopics) {
        TopicImage topicImage = topicsImage.getTopic(topic);
        if (topicImage == null) {
            if (decidedPartitionCountsForInternalTopics.containsKey(topic)) {
                return OptionalInt.of(decidedPartitionCountsForInternalTopics.get(topic));
            }
            return OptionalInt.empty();
        }
        return OptionalInt.of(topicImage.partitions().size());
    }

    private static CreateTopicsRequestData.CreatableTopic toCreatableTopic(ConfiguredInternalTopic config) {
        CreateTopicsRequestData.CreatableTopic creatableTopic = new CreateTopicsRequestData.CreatableTopic();
        creatableTopic.setName(config.name());
        creatableTopic.setNumPartitions(config.numberOfPartitions());
        if (config.replicationFactor().isPresent() && config.replicationFactor().get() != 0) {
            creatableTopic.setReplicationFactor(config.replicationFactor().get().shortValue());
        } else {
            creatableTopic.setReplicationFactor((short)-1);
        }
        CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
        config.topicConfigs().forEach((k, v) -> {
            CreateTopicsRequestData.CreatableTopicConfig topicConfig = new CreateTopicsRequestData.CreatableTopicConfig();
            topicConfig.setName(k);
            topicConfig.setValue(v);
            topicConfigs.add((ImplicitLinkedHashCollection.Element)topicConfig);
        });
        creatableTopic.setConfigs(topicConfigs);
        return creatableTopic;
    }

    private static ConfiguredSubtopology fromPersistedSubtopology(StreamsGroupTopologyValue.Subtopology subtopology, TopicsImage topicsImage, Map<String, Integer> decidedPartitionCountsForInternalTopics) {
        return new ConfiguredSubtopology(InternalTopicManager.computeNumberOfTasks(subtopology, topicsImage, decidedPartitionCountsForInternalTopics), new HashSet<String>(subtopology.sourceTopics()), subtopology.repartitionSourceTopics().stream().map(x -> InternalTopicManager.fromPersistedTopicInfo(x, decidedPartitionCountsForInternalTopics)).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)), new HashSet<String>(subtopology.repartitionSinkTopics()), subtopology.stateChangelogTopics().stream().map(x -> InternalTopicManager.fromPersistedTopicInfo(x, decidedPartitionCountsForInternalTopics)).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)));
    }

    private static int computeNumberOfTasks(StreamsGroupTopologyValue.Subtopology subtopology, TopicsImage topicsImage, Map<String, Integer> decidedPartitionCountsForInternalTopics) {
        return Stream.concat(subtopology.sourceTopics().stream(), subtopology.repartitionSourceTopics().stream().map(StreamsGroupTopologyValue.TopicInfo::name)).map(topic -> InternalTopicManager.getPartitionCount(topicsImage, topic, decidedPartitionCountsForInternalTopics).orElseThrow(() -> new IllegalStateException("Number of partitions must be set for topic " + topic))).max(Integer::compareTo).orElseThrow(() -> new IllegalStateException("Subtopology does not contain any source topics"));
    }

    private static ConfiguredInternalTopic fromPersistedTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo, Map<String, Integer> decidedPartitionCountsForInternalTopics) {
        if (topicInfo.partitions() == 0 && !decidedPartitionCountsForInternalTopics.containsKey(topicInfo.name())) {
            throw new IllegalStateException("Number of partitions must be set for topic " + topicInfo.name());
        }
        return new ConfiguredInternalTopic(topicInfo.name(), topicInfo.partitions() == 0 ? decidedPartitionCountsForInternalTopics.get(topicInfo.name()).intValue() : topicInfo.partitions(), topicInfo.replicationFactor() == 0 ? Optional.empty() : Optional.of(topicInfo.replicationFactor()), topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream().collect(Collectors.toMap(StreamsGroupTopologyValue.TopicConfig::key, StreamsGroupTopologyValue.TopicConfig::value)) : Map.of());
    }

    private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(StreamsGroupTopologyValue.Subtopology subtopology) {
        return subtopology.copartitionGroups().stream().map(copartitionGroup -> Stream.concat(copartitionGroup.sourceTopics().stream().map(i -> subtopology.sourceTopics().get(i.shortValue())), copartitionGroup.repartitionSourceTopics().stream().map(i -> subtopology.repartitionSourceTopics().get(i.shortValue()).name())).collect(Collectors.toSet())).toList();
    }
}

