/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.graph;

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
import org.apache.kafka.streams.kstream.internals.graph.BaseJoinProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.VersionedSemanticsGraphNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

/**
 * Graph node describing a table-table join.
 *
 * <p>The node carries the processor parameters of both join sides and of the merge
 * processor (handed to the superclass), together with the key/value serdes and the
 * state store names associated with each side of the join.
 *
 * @param <K>  key type
 * @param <V1> value type of the "this" side of the join
 * @param <V2> value type of the "other" side of the join
 * @param <VR> value type of the join result
 */
public class KTableKTableJoinNode<K, V1, V2, VR>
extends BaseJoinProcessorNode<K, Change<V1>, Change<V2>, Change<VR>>
implements VersionedSemanticsGraphNode {
    private final Serde<K> keySerde;
    private final Serde<VR> valueSerde;
    private final String[] joinThisStoreNames;
    private final String[] joinOtherStoreNames;

    /**
     * Creates the node; instances are normally obtained through
     * {@link #kTableKTableJoinNodeBuilder()}.
     *
     * <p>The superclass is always given {@code null} as its second constructor argument.
     * The store-name arrays are stored as passed in (no defensive copy).
     */
    KTableKTableJoinNode(String nodeName, ProcessorParameters<K, Change<V1>, ?, ?> joinThisProcessorParameters, ProcessorParameters<K, Change<V2>, ?, ?> joinOtherProcessorParameters, ProcessorParameters<K, Change<VR>, ?, ?> joinMergeProcessorParameters, String thisJoinSide, String otherJoinSide, Serde<K> keySerde, Serde<VR> valueSerde, String[] joinThisStoreNames, String[] joinOtherStoreNames) {
        super(nodeName, null, joinThisProcessorParameters, joinOtherProcessorParameters, joinMergeProcessorParameters, thisJoinSide, otherJoinSide);
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.joinThisStoreNames = joinThisStoreNames;
        this.joinOtherStoreNames = joinOtherStoreNames;
    }

    /**
     * @return the key serde this node was built with (may be {@code null} if none was set)
     */
    public Serde<K> keySerde() {
        return this.keySerde;
    }

    /**
     * @return the result-value serde this node was built with (may be {@code null} if none was set)
     */
    public Serde<VR> valueSerde() {
        return this.valueSerde;
    }

    /**
     * @return the store names of the "this" side; the internal array itself is returned, not a copy
     */
    public String[] joinThisStoreNames() {
        return this.joinThisStoreNames;
    }

    /**
     * @return the store names of the "other" side; the internal array itself is returned, not a copy
     */
    public String[] joinOtherStoreNames() {
        return this.joinOtherStoreNames;
    }

    /**
     * @return the queryable name reported by the {@link #joinMerger() join merger}
     */
    public String queryableStoreName() {
        return this.joinMerger().queryableName();
    }

    /**
     * Returns the processor supplier of the merge processor, viewed as a
     * {@link KTableKTableJoinMerger}.
     *
     * @return the merge processor supplier
     * @throws ClassCastException if the merge processor supplier is not a {@code KTableKTableJoinMerger}
     */
    public KTableKTableJoinMerger<K, VR> joinMerger() {
        final ProcessorSupplier<?, ?, ?, ?> mergeSupplier = this.mergeProcessorParameters().processorSupplier();
        // The type arguments cannot be verified at runtime; only the erased class is checked
        // by the cast. The suppression is confined to this single declaration.
        @SuppressWarnings("unchecked")
        final KTableKTableJoinMerger<K, VR> merger = (KTableKTableJoinMerger<K, VR>) mergeSupplier;
        return merger;
    }

    /**
     * Offers the versioned-semantics flag to both join processors. Each one only accepts it
     * when {@code parentNodeName} matches the parent node name it reports for its own side.
     *
     * @param useVersionedSemantics the flag value to apply
     * @param parentNodeName        name of the parent node the flag refers to; must not be {@code null}
     * @throws IllegalStateException if either join processor supplier is not a {@link KTableKTableAbstractJoin}
     */
    @Override
    public void enableVersionedSemantics(boolean useVersionedSemantics, String parentNodeName) {
        this.enableVersionedSemantics(this.thisProcessorParameters(), useVersionedSemantics, parentNodeName);
        this.enableVersionedSemantics(this.otherProcessorParameters(), useVersionedSemantics, parentNodeName);
    }

    /**
     * Applies the flag to a single join processor, provided {@code parentNodeName} equals the
     * value of its {@code joinThisParentNodeName()}; otherwise the processor is left untouched.
     */
    private void enableVersionedSemantics(ProcessorParameters<K, ?, ?, ?> processorParameters, boolean useVersionedSemantics, String parentNodeName) {
        final ProcessorSupplier<K, ?, ?, ?> processorSupplier = processorParameters.processorSupplier();
        if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
            throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
        }
        // Wildcard parameterisation instead of a raw type: neither method used below
        // depends on the join's type arguments.
        final KTableKTableAbstractJoin<?, ?, ?, ?> tableJoin = (KTableKTableAbstractJoin<?, ?, ?, ?>) processorSupplier;
        if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {
            tableJoin.setUseVersionedSemantics(useVersionedSemantics);
        }
    }

    /**
     * Registers the two join processors and the merge processor with the topology builder
     * and connects the join processors to state stores.
     *
     * @param topologyBuilder the builder to add processors and store connections to
     */
    @Override
    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
        final String thisProcessorName = this.thisProcessorParameters().processorName();
        final String otherProcessorName = this.otherProcessorParameters().processorName();
        this.thisProcessorParameters().addProcessorTo(topologyBuilder, this.thisJoinSideNodeName());
        this.otherProcessorParameters().addProcessorTo(topologyBuilder, this.otherJoinSideNodeName());
        // The merge processor is added with both join processors' names.
        this.mergeProcessorParameters().addProcessorTo(topologyBuilder, thisProcessorName, otherProcessorName);
        // Deliberately crossed: each join processor is connected to the stores of the
        // opposite side (presumably to look up the other table's current value).
        topologyBuilder.connectProcessorAndStateStores(thisProcessorName, this.joinOtherStoreNames);
        topologyBuilder.connectProcessorAndStateStores(otherProcessorName, this.joinThisStoreNames);
    }

    @Override
    public String toString() {
        return "KTableKTableJoinNode{joinThisStoreNames=" + Arrays.toString(this.joinThisStoreNames()) + ", joinOtherStoreNames=" + Arrays.toString(this.joinOtherStoreNames()) + "} " + super.toString();
    }

    /**
     * @return a fresh, empty builder for a {@code KTableKTableJoinNode}
     */
    public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
        return new KTableKTableJoinNodeBuilder<>();
    }

    /**
     * Fluent builder for {@link KTableKTableJoinNode}. Every {@code with...} method stores its
     * argument and returns this builder; values that are never set remain {@code null}.
     * No validation is performed by the builder itself.
     */
    public static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
        private String nodeName;
        private ProcessorParameters<K, Change<V1>, ?, ?> joinThisProcessorParameters;
        private ProcessorParameters<K, Change<V2>, ?, ?> joinOtherProcessorParameters;
        private String thisJoinSide;
        private String otherJoinSide;
        private Serde<K> keySerde;
        private Serde<VR> valueSerde;
        private String[] joinThisStoreNames;
        private String[] joinOtherStoreNames;
        private ProcessorParameters<K, Change<VR>, ?, ?> joinMergeProcessorParameters;

        // Instances are created through KTableKTableJoinNode.kTableKTableJoinNodeBuilder().
        private KTableKTableJoinNodeBuilder() {
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(String nodeName) {
            this.nodeName = nodeName;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(ProcessorParameters<K, Change<V1>, ?, ?> joinThisProcessorParameters) {
            this.joinThisProcessorParameters = joinThisProcessorParameters;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(ProcessorParameters<K, Change<V2>, ?, ?> joinOtherProcessorParameters) {
            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(String thisJoinSide) {
            this.thisJoinSide = thisJoinSide;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(String otherJoinSide) {
            this.otherJoinSide = otherJoinSide;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withKeySerde(Serde<K> keySerde) {
            this.keySerde = keySerde;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueSerde(Serde<VR> valueSerde) {
            this.valueSerde = valueSerde;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(String[] joinThisStoreNames) {
            this.joinThisStoreNames = joinThisStoreNames;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(String[] joinOtherStoreNames) {
            this.joinOtherStoreNames = joinOtherStoreNames;
            return this;
        }

        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withMergeProcessorParameters(ProcessorParameters<K, Change<VR>, ?, ?> joinMergeProcessorParameters) {
            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
            return this;
        }

        /**
         * @return a new node populated with the values set on this builder so far
         */
        public KTableKTableJoinNode<K, V1, V2, VR> build() {
            return new KTableKTableJoinNode<K, V1, V2, VR>(this.nodeName, this.joinThisProcessorParameters, this.joinOtherProcessorParameters, this.joinMergeProcessorParameters, this.thisJoinSide, this.otherJoinSide, this.keySerde, this.valueSerde, this.joinThisStoreNames, this.joinOtherStoreNames);
        }
    }
}

