/*
 * Decompiled with CFR 0.152.
 */
package com.avaya.pim.eventsdk.common;

import com.avaya.pim.eventsdk.common.EventSDKConfig;
import com.avaya.pim.eventsdk.consumer.ConsumerRebalanceImpl;
import com.avaya.pim.eventsdk.core.LoginContext;
import com.avaya.pim.eventsdk.exception.IPOMEventErrorCode;
import com.avaya.pim.eventsdk.exception.POMEventException;
import com.avaya.pim.eventsdk.logger.POMEventLoggerHolder;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaUtil {
    public static final String DEFAULT_ORG_NAME = "Default";
    public static final String HEARTBEAT_TOPIC_NAME = "POM.HEARTBEAT";
    public static final String JOB_TOPIC_NAME = "POM_JOB";
    public static final String JOB_STATISTCIS_TOPIC_NAME = "POM_JOB_STATISTICS";
    public static final String AGENT_TOPIC_NAME = "POM_AGENT";
    public static final String AGENT_STATISTCIS_TOPIC_NAME = "POM_AGENT_STATISTICS";
    public static final String ENCRICHED_ATTEMPT_RESULT = "POM_ENRICHED_ATTEMPT_RESULT";
    public static final String ATTEMPT = "POM_ATTEMPT";
    public static final String HEARTBEAT_EVENT_CONSUMER_THREAD = "HeartBeatEventConsumerThread";
    public static final String JOB_EVENT_CONSUMER_THREAD = "JobEventConsumerThread";
    public static final String JOB_STATISTICS_EVENT_CONSUMER_THREAD = "JobStatisticsEventConsumerThread";
    public static final String ATTEMPT_CONSUMER_THREAD = "AttemptConsumerThread";
    public static final String ENRICHED_ATTEMPT_CONSUMER_THREAD = "EnrichedAttemptConsumerThread";
    public static final String AGENT_STATISTICS_EVENT_CONSUMER_THREAD = "AgentStatisticsEventConsumerThread";
    public static final String IMPORT_STATISTICS_CONSUMER_THREAD = "ImportStatisticsConsumerThread";
    public static final String INBOUND_SKILL_CONSUMER_THREAD = "InboundSkillConsumerThread";
    public static final String CCTREND_EVENT_CONSUMER_THREAD = "CCTrendEventConsumerThread";
    private static final String KAFKA_BOOTSTRAP_SERVERS_PROP_NAME = "bootstrap.servers";
    private static final String KAFKA_CONSUMER_GROUP_ID = "group.id";
    private static final String SEPERATOR = "_";
    public static final String IMPORT_TOPIC_NAME = "IMPORTSTATISTICS";
    public static final String INBOUND_SKILL_TOPIC_NAME = "INBOUNDSKILL";
    public static final String CCTREND_TOPIC_NAME = "CCTRENDS";

    private static String getTopicName(String tenantName, String topicName) {
        if (tenantName == null) {
            return "Default_" + topicName;
        }
        return tenantName + SEPERATOR + topicName;
    }

    public static String getJobKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, JOB_TOPIC_NAME);
    }

    public static String getJobStatisticsKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, JOB_STATISTCIS_TOPIC_NAME);
    }

    public static String getAgentKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, AGENT_TOPIC_NAME);
    }

    public static String getAgentStatisticsKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, AGENT_STATISTCIS_TOPIC_NAME);
    }

    public static String getAttemptResultKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, ATTEMPT);
    }

    public static String getEnrichedAttemptResultKafkaTopicName(String tenantName) {
        return KafkaUtil.getTopicName(tenantName, ENCRICHED_ATTEMPT_RESULT);
    }

    public static KafkaConsumer<String, String> getKafkaConsumer(String eventName) throws POMEventException {
        KafkaConsumer kafkaConsumer = null;
        try {
            Properties configProperties = EventSDKConfig.getInstance().getKafkaConsumerProperties();
            configProperties.put(KAFKA_CONSUMER_GROUP_ID, EventSDKConfig.getInstance().getGroupId());
            configProperties.put(KAFKA_BOOTSTRAP_SERVERS_PROP_NAME, EventSDKConfig.getInstance().getKafkaServerString());
            kafkaConsumer = new KafkaConsumer(configProperties);
            Pattern eventSubscriptionPattern = HEARTBEAT_TOPIC_NAME.equals(eventName) ? Pattern.compile(eventName) : KafkaUtil.getSubscriptionPattern(LoginContext.getInstance().getOrgName(), LoginContext.getInstance().getTopicsMap().get(eventName));
            POMEventLoggerHolder.getInstance().getLogger().fine("Consumer started with " + configProperties + " on topic " + eventSubscriptionPattern.toString());
            kafkaConsumer.subscribe(eventSubscriptionPattern, (ConsumerRebalanceListener)new ConsumerRebalanceImpl((Consumer<?, ?>)kafkaConsumer, EventSDKConfig.getInstance().isMessageReceiveAfterStart()));
        }
        catch (Exception e) {
            POMEventLoggerHolder.getInstance().getLogger().error("getKafkaConsumer", e);
            throw new POMEventException(IPOMEventErrorCode.UNABLE_TO_CONNECT_TO_MESSAGE_BUS, "Unable to connect to kafka");
        }
        return kafkaConsumer;
    }

    private static Pattern getSubscriptionPattern(String orgName, String eventName) {
        if (DEFAULT_ORG_NAME.equals(orgName) || eventName.endsWith(INBOUND_SKILL_TOPIC_NAME)) {
            String orgPattern = eventName.replace("POM.Default.", "").replace("Default_", "");
            POMEventLoggerHolder.getInstance().getLogger().fine("Org Pattern is :" + orgPattern);
            return Pattern.compile(".*(" + orgPattern + ")$");
        }
        return Pattern.compile(eventName);
    }

    public static void closeConsumer(KafkaConsumer<String, String> kafkaConsumer) {
        if (kafkaConsumer == null) {
            return;
        }
        kafkaConsumer.close();
    }

    public static void main(String[] args) {
    }
}

