package org.apache.carbondata.streaming;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.dictionary.server.DictionaryServer;
import org.apache.carbondata.core.dictionary.server.NonSecureDictionaryServer;
import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.OperationContext;
import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.processing.loading.events.LoadEvents;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
import org.apache.carbondata.processing.loading.model.LoadOption;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider;
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer;
import org.apache.carbondata.streaming.segment.StreamSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink;
import org.apache.spark.sql.execution.streaming.Sink;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamSinkFactory.scala */
/* loaded from: input_file:org/apache/carbondata/streaming/StreamSinkFactory$.class */
public final class StreamSinkFactory$ {
    public static final StreamSinkFactory$ MODULE$ = null;
    private final Logger LOGGER;
    private final ConcurrentHashMap<String, ICarbonLock> locks;

    static {
        new StreamSinkFactory$();
    }

    public Logger LOGGER() {
        return this.LOGGER;
    }

    public ConcurrentHashMap<String, ICarbonLock> locks() {
        return this.locks;
    }

    public void lock(CarbonTable carbonTable) {
        ICarbonLock carbonLockObj = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.STREAMING_LOCK);
        if (!carbonLockObj.lockWithRetries()) {
            LOGGER().error(new StringBuilder().append("Not able to acquire the streaming lock for stream table:").append(carbonTable.getDatabaseName()).append(".").append(carbonTable.getTableName()).toString());
            throw new IOException(new StringBuilder().append("Not able to acquire the streaming lock for stream table: ").append(carbonTable.getDatabaseName()).append(".").append(carbonTable.getTableName()).toString());
        }
        locks().put(carbonTable.getTableUniqueName(), carbonLockObj);
        LOGGER().info(new StringBuilder().append("Acquired the streaming lock for stream table: ").append(carbonTable.getDatabaseName()).append(".").append(carbonTable.getTableName()).toString());
    }

    public void unLock(String str) {
        ICarbonLock remove = locks().remove(str);
        if (remove != null) {
            remove.unlock();
        }
    }

    public Sink createStreamTableSink(SparkSession sparkSession, Configuration configuration, CarbonTable carbonTable, Map<String, String> map) {
        lock(carbonTable);
        validateParameters(map);
        CarbonLoadModel buildCarbonLoadModelForStream = buildCarbonLoadModelForStream(sparkSession, configuration, carbonTable, map, "");
        OperationContext operationContext = new OperationContext();
        OperationListenerBus.getInstance().fireEvent(new LoadEvents.LoadTablePreExecutionEvent(carbonTable.getCarbonTableIdentifier(), buildCarbonLoadModelForStream), operationContext);
        String streamSegmentId = getStreamSegmentId(carbonTable);
        buildCarbonLoadModelForStream.setSegmentId(streamSegmentId);
        OperationListenerBus.getInstance().fireEvent(new LoadEvents.LoadMetadataEvent(carbonTable, false, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava()), operationContext);
        Option<DictionaryServer> startDictionaryServer = startDictionaryServer(sparkSession, carbonTable, buildCarbonLoadModelForStream);
        if (startDictionaryServer.isDefined()) {
            buildCarbonLoadModelForStream.setUseOnePass(true);
        } else {
            buildCarbonLoadModelForStream.setUseOnePass(false);
        }
        CarbonAppendableStreamSink carbonAppendableStreamSink = new CarbonAppendableStreamSink(sparkSession, carbonTable, streamSegmentId, map, buildCarbonLoadModelForStream, startDictionaryServer, operationContext);
        OperationListenerBus.getInstance().fireEvent(new LoadEvents.LoadTablePostExecutionEvent(carbonTable.getCarbonTableIdentifier(), buildCarbonLoadModelForStream), operationContext);
        return carbonAppendableStreamSink;
    }

    private void validateParameters(Map<String, String> map) {
        Option option = map.get(CarbonCommonConstants.HANDOFF_SIZE);
        if (option.isDefined()) {
            try {
                if (Long.parseLong((String) option.get()) < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
                    new CarbonStreamException(new StringBuilder().append("carbon.streaming.segment.max.sizeshould be bigger than or equal ").append(BoxesRunTime.boxToLong(CarbonCommonConstants.HANDOFF_SIZE_MIN)).toString());
                }
            } catch (NumberFormatException unused) {
                new CarbonStreamException(new StringBuilder().append(CarbonCommonConstants.HANDOFF_SIZE).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{" ", " is an illegal number"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{option}))).toString());
            }
        }
    }

    private String getStreamSegmentId(CarbonTable carbonTable) {
        String open = StreamSegment.open(carbonTable);
        String segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), open);
        FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
        String metadataPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath());
        if (FileFactory.isFileExist(metadataPath, fileType)) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(FileFactory.mkdirs(metadataPath, fileType));
        }
        if (FileFactory.isFileExist(segmentPath, fileType)) {
            StreamSegment.recoverSegmentIfRequired(segmentPath);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(FileFactory.mkdirs(segmentPath, fileType));
        }
        return open;
    }

    public Option<DictionaryServer> startDictionaryServer(SparkSession sparkSession, CarbonTable carbonTable, CarbonLoadModel carbonLoadModel) {
        Some some;
        boolean exists = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(carbonTable.getAllDimensions()).asScala()).toList().exists(new StreamSinkFactory$$anonfun$1());
        String property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER, "true");
        SparkConf conf = sparkSession.sqlContext().sparkContext().getConf();
        String str = sparkSession.sqlContext().sparkContext().getConf().get("spark.driver.host");
        if (!exists) {
            some = None$.MODULE$;
        } else if (conf.get("spark.authenticate", "false").equalsIgnoreCase("true") && new StringOps(Predef$.MODULE$.augmentString(property)).toBoolean()) {
            final DictionaryServer secureDictionaryServer = SecureDictionaryServer.getInstance(conf, str.toString(), carbonLoadModel.getDictionaryServerPort(), carbonTable);
            carbonLoadModel.setDictionaryServerPort(secureDictionaryServer.getPort());
            carbonLoadModel.setDictionaryServerHost(secureDictionaryServer.getHost());
            carbonLoadModel.setDictionaryServerSecretKey(secureDictionaryServer.getSecretKey());
            carbonLoadModel.setDictionaryEncryptServerSecure(Predef$.MODULE$.boolean2Boolean(secureDictionaryServer.isEncryptSecureServer()));
            carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider());
            sparkSession.sparkContext().addSparkListener(new SparkListener(secureDictionaryServer) { // from class: org.apache.carbondata.streaming.StreamSinkFactory$$anon$1
                private final DictionaryServer dictionaryServer$1;

                public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
                    this.dictionaryServer$1.shutdown();
                }

                {
                    this.dictionaryServer$1 = secureDictionaryServer;
                }
            });
            some = new Some(secureDictionaryServer);
        } else {
            final DictionaryServer nonSecureDictionaryServer = NonSecureDictionaryServer.getInstance(carbonLoadModel.getDictionaryServerPort(), carbonTable);
            carbonLoadModel.setDictionaryServerPort(nonSecureDictionaryServer.getPort());
            carbonLoadModel.setDictionaryServerHost(nonSecureDictionaryServer.getHost());
            carbonLoadModel.setDictionaryEncryptServerSecure(Predef$.MODULE$.boolean2Boolean(false));
            carbonLoadModel.setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(nonSecureDictionaryServer.getPort()));
            sparkSession.sparkContext().addSparkListener(new SparkListener(nonSecureDictionaryServer) { // from class: org.apache.carbondata.streaming.StreamSinkFactory$$anon$2
                private final DictionaryServer dictionaryServer$2;

                public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
                    this.dictionaryServer$2.shutdown();
                }

                {
                    this.dictionaryServer$2 = nonSecureDictionaryServer;
                }
            });
            some = new Some(nonSecureDictionaryServer);
        }
        return some;
    }

    private CarbonLoadModel buildCarbonLoadModelForStream(SparkSession sparkSession, Configuration configuration, CarbonTable carbonTable, Map<String, String> map, String str) {
        CarbonProperties carbonProperties = CarbonProperties.getInstance();
        carbonProperties.addProperty("zookeeper.enable.lock", "false");
        java.util.Map<String, String> fillOptionWithDefaultValue = LoadOption.fillOptionWithDefaultValue((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava());
        fillOptionWithDefaultValue.put(CarbonCommonConstants.SORT_SCOPE, "no_sort");
        if (map.get("fileheader").isEmpty()) {
            fillOptionWithDefaultValue.put("fileheader", ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(carbonTable.getCreateOrderColumn(carbonTable.getTableName())).asScala()).map(new StreamSinkFactory$$anonfun$buildCarbonLoadModelForStream$1(), Buffer$.MODULE$.canBuildFrom())).mkString(","));
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        fillOptionWithDefaultValue.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(), carbonTable));
        CarbonLoadModel carbonLoadModel = new CarbonLoadModel();
        new CarbonLoadModelBuilder(carbonTable).build((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(), fillOptionWithDefaultValue, carbonLoadModel, configuration);
        carbonLoadModel.setSegmentId(str);
        String str2 = (String) map.getOrElse(CarbonCommonConstants.DICTIONARY_SERVER_PORT, new StreamSinkFactory$$anonfun$2(carbonProperties));
        carbonLoadModel.setDictionaryServerHost(sparkSession.sqlContext().sparkContext().getConf().get("spark.driver.host"));
        carbonLoadModel.setDictionaryServerPort(new StringOps(Predef$.MODULE$.augmentString(str2)).toInt());
        carbonLoadModel.setColumnCompressor((String) ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(carbonTable.getTableInfo().getFactTable().getTableProperties()).asScala()).getOrElse(CarbonCommonConstants.COMPRESSOR, new StreamSinkFactory$$anonfun$3()));
        return carbonLoadModel;
    }

    private StreamSinkFactory$() {
        MODULE$ = this;
        this.LOGGER = LogServiceFactory.getLogService(getClass().getCanonicalName());
        this.locks = new ConcurrentHashMap<>();
    }
}
