/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies processors that turn a primary-table change ({@code Change<V>} keyed by {@code K})
 * into subscription records keyed by the foreign key ({@code KO}) that is extracted from the
 * changed value.
 *
 * <p>Each forwarded record carries a {@link SubscriptionWrapper} holding the Murmur3 128-bit hash
 * of the serialized new value (or {@code null} when there is no new value), an
 * {@link SubscriptionWrapper.Instruction}, and the primary key of the input record.
 *
 * @param <K>  primary key type of the input records
 * @param <KO> foreign key type; used as the key of the forwarded records
 * @param <V>  value type of the input records
 */
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V>
    implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);

    private final Function<V, KO> foreignKeyExtractor;
    private final Supplier<String> foreignKeySerdeTopicSupplier;
    private final Supplier<String> valueSerdeTopicSupplier;
    private final boolean leftJoin;
    // Not final: when null after construction they are filled in from the processor context
    // by UnbindChangeProcessor#init and stored back on this supplier instance.
    private Serializer<KO> foreignKeySerializer;
    private Serializer<V> valueSerializer;

    /**
     * @param foreignKeyExtractor          extracts the foreign key from a value
     * @param foreignKeySerdeTopicSupplier supplies the topic name passed to the foreign key serializer
     * @param valueSerdeTopicSupplier      supplies the topic name passed to the value serializer
     * @param foreignKeySerde              serde whose serializer is used for foreign keys; may be
     *                                     {@code null}, in which case the context's key serde is
     *                                     used once a processor is initialized
     * @param valueSerializer              serializer used to hash new values; may be {@code null},
     *                                     in which case the context's value serde is used once a
     *                                     processor is initialized
     * @param leftJoin                     selects the instruction sent when a record has a new
     *                                     value but no old value
     */
    public ForeignJoinSubscriptionSendProcessorSupplier(Function<V, KO> foreignKeyExtractor, Supplier<String> foreignKeySerdeTopicSupplier, Supplier<String> valueSerdeTopicSupplier, Serde<KO> foreignKeySerde, Serializer<V> valueSerializer, boolean leftJoin) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
        this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
        this.valueSerializer = valueSerializer;
        this.leftJoin = leftJoin;
        this.foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
    }

    /**
     * @return a new processor instance bound to this supplier's configuration
     */
    @Override
    public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
        return new UnbindChangeProcessor();
    }

    /**
     * Processor that inspects the old and new value of each change, extracts their foreign keys
     * and forwards the matching subscription instruction(s). Records for which an extracted
     * foreign key is {@code null} are logged, counted on the dropped-records sensor and skipped.
     */
    private class UnbindChangeProcessor
        extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
        private Sensor droppedRecordsSensor;
        private String foreignKeySerdeTopic;
        private String valueSerdeTopic;

        /**
         * Resolves the serde topic names, falls back to the context's serdes for any serializer
         * that was not supplied at construction, and obtains the dropped-records sensor.
         *
         * @param context processor context this processor is initialized with
         */
        // The context exposes its serdes with wildcard types, so the casts to the
        // supplier's type parameters cannot be checked by the compiler.
        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext<KO, SubscriptionWrapper<K>> context) {
            super.init(context);
            foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
            valueSerdeTopic = valueSerdeTopicSupplier.get();
            if (foreignKeySerializer == null) {
                foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
            }
            if (valueSerializer == null) {
                valueSerializer = (Serializer<V>) context.valueSerde().serializer();
            }
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                context.taskId().toString(),
                (StreamsMetricsImpl) context.metrics());
        }

        /**
         * Forwards subscription records derived from one change:
         * <ul>
         *   <li>old and new value present: {@code DELETE_KEY_NO_PROPAGATE} to the old foreign key
         *       if the serialized foreign keys differ, then
         *       {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} to the new foreign key;</li>
         *   <li>only old value present: {@code DELETE_KEY_AND_PROPAGATE} to the old foreign key;</li>
         *   <li>only new value present: {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} when
         *       {@code leftJoin} is set, otherwise {@code PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE},
         *       to the new foreign key;</li>
         *   <li>neither present: nothing is forwarded.</li>
         * </ul>
         *
         * @param record the change record to process
         */
        @Override
        public void process(Record<K, Change<V>> record) {
            final V oldValue = record.value().oldValue;
            final V newValue = record.value().newValue;
            // The hash is computed up front, before any foreign key is extracted.
            final long[] currentHash = newValue == null
                ? null
                : Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, newValue));

            if (oldValue != null) {
                final KO oldForeignKey = foreignKeyExtractor.apply(oldValue);
                if (oldForeignKey == null) {
                    dropRecordWithNullForeignKey();
                    return;
                }
                if (newValue == null) {
                    forwardSubscription(record, oldForeignKey, currentHash, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE);
                    return;
                }
                final KO newForeignKey = foreignKeyExtractor.apply(newValue);
                if (newForeignKey == null) {
                    dropRecordWithNullForeignKey();
                    return;
                }
                // Foreign keys are compared by their serialized bytes rather than equals().
                final byte[] serialOldForeignKey = foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
                final byte[] serialNewForeignKey = foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
                if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
                    forwardSubscription(record, oldForeignKey, currentHash, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE);
                }
                forwardSubscription(record, newForeignKey, currentHash, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
            } else if (newValue != null) {
                final SubscriptionWrapper.Instruction instruction = leftJoin
                    ? SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE
                    : SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
                final KO newForeignKey = foreignKeyExtractor.apply(newValue);
                if (newForeignKey == null) {
                    dropRecordWithNullForeignKey();
                } else {
                    forwardSubscription(record, newForeignKey, currentHash, instruction);
                }
            }
        }

        /** Forwards the record re-keyed by {@code foreignKey} with a subscription wrapper as value. */
        private void forwardSubscription(Record<K, Change<V>> record, KO foreignKey, long[] hash, SubscriptionWrapper.Instruction instruction) {
            context().forward(record.withKey(foreignKey).withValue(new SubscriptionWrapper<>(hash, instruction, record.key())));
        }

        /** Logs a warning about a null foreign key and counts the record as dropped. */
        private void dropRecordWithNullForeignKey() {
            if (context().recordMetadata().isPresent()) {
                final RecordMetadata recordMetadata = context().recordMetadata().get();
                LOG.warn("Skipping record due to null foreign key. topic=[{}] partition=[{}] offset=[{}]", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            } else {
                LOG.warn("Skipping record due to null foreign key. Topic, partition, and offset not known.");
            }
            droppedRecordsSensor.record();
        }
    }
}

