/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTablePassThrough;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

class CogroupedStreamAggregateBuilder<K, VOut> {
    private final InternalStreamsBuilder builder;
    private final Map<KGroupedStreamImpl<K, ?>, GraphNode> parentNodes = new LinkedHashMap();

    CogroupedStreamAggregateBuilder(InternalStreamsBuilder builder) {
        this.builder = builder;
    }

    <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, Initializer<VOut> initializer, NamedInternal named, StoreBuilder<?> storeBuilder, Serde<KR> keySerde, Serde<VOut> valueSerde, String queryableName) {
        this.processRepartitions(groupPatterns, storeBuilder);
        ArrayList<GraphNode> processors = new ArrayList<GraphNode>();
        ArrayList<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<KStreamAggProcessorSupplier>();
        boolean stateCreated = false;
        int counter = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
            KStreamAggregate<? super K, ? super Object, VOut> parentProcessor = new KStreamAggregate<K, Object, VOut>(storeBuilder.name(), initializer, kGroupedStream.getValue());
            parentProcessors.add(parentProcessor);
            StatefulProcessorNode<K, ?> statefulProcessorNode = this.getStatefulProcessorNode(named.suffixWithOrElseGet("-cogroup-agg-" + counter++, this.builder, "COGROUPKSTREAM-AGGREGATE-"), stateCreated, storeBuilder, parentProcessor);
            stateCreated = true;
            processors.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
        }
        return this.createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
    }

    <KR, W extends Window> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, Initializer<VOut> initializer, NamedInternal named, StoreBuilder<?> storeBuilder, Serde<KR> keySerde, Serde<VOut> valueSerde, String queryableName, Windows<W> windows) {
        this.processRepartitions(groupPatterns, storeBuilder);
        ArrayList<GraphNode> processors = new ArrayList<GraphNode>();
        ArrayList<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<KStreamAggProcessorSupplier>();
        boolean stateCreated = false;
        int counter = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
            KStreamWindowAggregate<? super K, ? super Object, VOut, W> parentProcessor = new KStreamWindowAggregate<K, Object, VOut, W>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue());
            parentProcessors.add(parentProcessor);
            StatefulProcessorNode<? super K, ? super Object> statefulProcessorNode = this.getStatefulProcessorNode(named.suffixWithOrElseGet("-cogroup-agg-" + counter++, this.builder, "COGROUPKSTREAM-AGGREGATE-"), stateCreated, storeBuilder, parentProcessor);
            stateCreated = true;
            processors.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
        }
        return this.createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
    }

    <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, Initializer<VOut> initializer, NamedInternal named, StoreBuilder<?> storeBuilder, Serde<KR> keySerde, Serde<VOut> valueSerde, String queryableName, SessionWindows sessionWindows, Merger<? super K, VOut> sessionMerger) {
        this.processRepartitions(groupPatterns, storeBuilder);
        ArrayList<GraphNode> processors = new ArrayList<GraphNode>();
        ArrayList<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<KStreamAggProcessorSupplier>();
        boolean stateCreated = false;
        int counter = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
            KStreamSessionWindowAggregate<? super K, ? super Object, VOut> parentProcessor = new KStreamSessionWindowAggregate<K, Object, VOut>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger);
            parentProcessors.add(parentProcessor);
            StatefulProcessorNode<K, ?> statefulProcessorNode = this.getStatefulProcessorNode(named.suffixWithOrElseGet("-cogroup-agg-" + counter++, this.builder, "COGROUPKSTREAM-AGGREGATE-"), stateCreated, storeBuilder, parentProcessor);
            stateCreated = true;
            processors.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
        }
        return this.createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
    }

    <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, Initializer<VOut> initializer, NamedInternal named, StoreBuilder<?> storeBuilder, Serde<KR> keySerde, Serde<VOut> valueSerde, String queryableName, SlidingWindows slidingWindows) {
        this.processRepartitions(groupPatterns, storeBuilder);
        ArrayList<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<KStreamAggProcessorSupplier>();
        ArrayList<GraphNode> processors = new ArrayList<GraphNode>();
        boolean stateCreated = false;
        int counter = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
            KStreamSlidingWindowAggregate<? super K, ? super Object, VOut> parentProcessor = new KStreamSlidingWindowAggregate<K, Object, VOut>(slidingWindows, storeBuilder.name(), initializer, kGroupedStream.getValue());
            parentProcessors.add(parentProcessor);
            StatefulProcessorNode<K, ?> statefulProcessorNode = this.getStatefulProcessorNode(named.suffixWithOrElseGet("-cogroup-agg-" + counter++, this.builder, "COGROUPKSTREAM-AGGREGATE-"), stateCreated, storeBuilder, parentProcessor);
            stateCreated = true;
            processors.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
        }
        return this.createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
    }

    private void processRepartitions(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, StoreBuilder<?> storeBuilder) {
        for (KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
            if (repartitionReqs.repartitionRequired) {
                OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
                String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();
                this.createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
                if (this.parentNodes.containsKey(repartitionReqs)) continue;
                BaseRepartitionNode repartitionNode = repartitionNodeBuilder.build();
                this.builder.addGraphNode(repartitionReqs.graphNode, (GraphNode)repartitionNode);
                this.parentNodes.put(repartitionReqs, repartitionNode);
                continue;
            }
            this.parentNodes.put(repartitionReqs, repartitionReqs.graphNode);
        }
        ArrayList groupedStreams = new ArrayList(this.parentNodes.keySet());
        AbstractStream kGrouped = (AbstractStream)groupedStreams.iterator().next();
        groupedStreams.remove(kGrouped);
        kGrouped.ensureCopartitionWith(groupedStreams);
    }

    <KR, VIn> KTable<KR, VOut> createTable(Collection<GraphNode> processors, Collection<KStreamAggProcessorSupplier> parentProcessors, NamedInternal named, Serde<KR> keySerde, Serde<VOut> valueSerde, String queryableName, String storeName) {
        String mergeProcessorName = named.suffixWithOrElseGet("-cogroup-merge", this.builder, "COGROUPKSTREAM-MERGE-");
        KTablePassThrough passThrough = new KTablePassThrough(parentProcessors, storeName);
        ProcessorParameters processorParameters = new ProcessorParameters(passThrough, mergeProcessorName);
        ProcessorGraphNode mergeNode = new ProcessorGraphNode(mergeProcessorName, processorParameters);
        this.builder.addGraphNode(processors, mergeNode);
        return new KTableImpl(mergeProcessorName, keySerde, valueSerde, Collections.singleton(mergeNode.nodeName()), queryableName, passThrough, mergeNode, this.builder);
    }

    private StatefulProcessorNode<K, ?> getStatefulProcessorNode(String processorName, boolean stateCreated, StoreBuilder<?> storeBuilder, ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
        StatefulProcessorNode statefulProcessorNode = !stateCreated ? new StatefulProcessorNode(processorName, new ProcessorParameters(kStreamAggregate, processorName), storeBuilder) : new StatefulProcessorNode(processorName, new ProcessorParameters(kStreamAggregate, processorName), new String[]{storeBuilder.name()});
        return statefulProcessorNode;
    }

    private <VIn> void createRepartitionSource(String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, Serde<K> keySerde, Serde<?> valueSerde) {
        KStreamImpl.createRepartitionedSource(this.builder, keySerde, valueSerde, repartitionTopicNamePrefix, null, optimizableRepartitionNodeBuilder);
    }
}

