package org.apache.flink.table.store.table.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/**
 * Base {@link TableWrite} implementation that keeps one {@link RecordWriter} per
 * (partition, bucket) pair.
 *
 * <p>Writers are created lazily through the supplied {@link FileStoreWrite} the first time a
 * record for their partition and bucket is written. All writers are handed the same
 * single-threaded executor (thread name {@code "compaction-thread"}), which is shut down in
 * {@link #close()}.
 *
 * @param <T> type of the records accepted by the underlying {@link RecordWriter}s
 */
public abstract class AbstractTableWrite<T> implements TableWrite {
    private final FileStoreWrite<T> write;
    private final SinkRecordConverter recordConverter;
    private boolean overwrite = false;

    /** Live writers, keyed first by partition and then by bucket. */
    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers = new HashMap<>();

    /** Executor passed to every writer on creation; owned and shut down by this instance. */
    private final ExecutorService compactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("compaction-thread"));

    /**
     * Creates a table write backed by the given store write.
     *
     * @param fileStoreWrite factory used to create the per-bucket writers
     * @param sinkRecordConverter converter turning incoming rows into {@link SinkRecord}s
     */
    public AbstractTableWrite(FileStoreWrite<T> fileStoreWrite, SinkRecordConverter sinkRecordConverter) {
        this.write = fileStoreWrite;
        this.recordConverter = sinkRecordConverter;
    }

    /**
     * Sets the overwrite flag. The flag is read when a writer is created: if it is
     * {@code true} the writer comes from {@code createEmptyWriter}, otherwise from
     * {@code createWriter}. Writers that already exist are not affected.
     *
     * @param z the new value of the overwrite flag
     * @return this instance
     */
    @Override
    public TableWrite withOverwrite(boolean z) {
        this.overwrite = z;
        return this;
    }

    /** Returns the converter passed to the constructor. */
    @Override
    public SinkRecordConverter recordConverter() {
        return recordConverter;
    }

    /**
     * Converts the row to a {@link SinkRecord}, looks up (or creates) the writer for the
     * record's partition and bucket, and hands both to {@link #writeSinkRecord}.
     *
     * @param rowData the row to write
     * @return the converted record
     * @throws Exception if conversion, writer creation or the write itself fails
     */
    @Override
    public SinkRecord write(RowData rowData) throws Exception {
        SinkRecord record = recordConverter.convert(rowData);
        RecordWriter<T> writer = getWriter(record.partition(), record.bucket());
        writeSinkRecord(record, writer);
        return record;
    }

    /**
     * Calls {@code prepareCommit} on every live writer and collects one
     * {@link FileCommittable} per writer.
     *
     * <p>A writer whose increment contains no new files is closed and removed from
     * {@link #writers}; a partition entry is removed once it has no writers left.
     *
     * @param z flag forwarded unchanged to each writer's {@code prepareCommit}
     * @return one committable per writer that was live when this method was called
     * @throws Exception if preparing the commit or closing an idle writer fails
     */
    @Override
    public List<FileCommittable> prepareCommit(boolean z) throws Exception {
        List<FileCommittable> committables = new ArrayList<>();

        // Explicit iterators are required because entries are removed while iterating.
        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partitionIter =
                writers.entrySet().iterator();
        while (partitionIter.hasNext()) {
            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partitionEntry = partitionIter.next();
            BinaryRowData partition = partitionEntry.getKey();
            Map<Integer, RecordWriter<T>> buckets = partitionEntry.getValue();

            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter = buckets.entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, RecordWriter<T>> bucketEntry = bucketIter.next();
                int bucket = bucketEntry.getKey();
                RecordWriter<T> writer = bucketEntry.getValue();

                FileCommittable committable =
                        new FileCommittable(partition, bucket, writer.prepareCommit(z));
                committables.add(committable);

                // NOTE(review): writers that produced no new files are dropped, presumably so
                // that writers for partitions no longer being written do not accumulate.
                if (committable.increment().newFiles().isEmpty()) {
                    writer.close();
                    bucketIter.remove();
                }
            }

            if (buckets.isEmpty()) {
                partitionIter.remove();
            }
        }
        return committables;
    }

    /**
     * Closes every live writer, clears {@link #writers} and shuts down the shared executor.
     *
     * <p>Closing is attempted for all writers even if some of them fail, and the executor is
     * always shut down. The first failure is rethrown once cleanup has finished; any further
     * failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first exception thrown while closing a writer
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            for (Map<Integer, RecordWriter<T>> buckets : writers.values()) {
                for (RecordWriter<T> writer : buckets.values()) {
                    try {
                        writer.close();
                    } catch (Exception e) {
                        // Keep going so one broken writer cannot leak the others.
                        if (failure == null) {
                            failure = e;
                        } else {
                            failure.addSuppressed(e);
                        }
                    }
                }
            }
        } finally {
            // Runs even on an Error, so the executor thread is never left behind.
            writers.clear();
            compactExecutor.shutdownNow();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Exposes the live writer map itself (not a copy) for tests. */
    @VisibleForTesting
    public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
        return writers;
    }

    /**
     * Writes the given record using the writer selected for its partition and bucket.
     *
     * @param sinkRecord the converted record
     * @param recordWriter the writer registered for the record's partition and bucket
     * @throws Exception if the write fails
     */
    protected abstract void writeSinkRecord(SinkRecord sinkRecord, RecordWriter<T> recordWriter) throws Exception;

    /**
     * Returns the writer registered for the given partition and bucket, creating and
     * registering it first if none exists.
     */
    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<>();
            // The key is copied before being stored; presumably the caller may reuse the
            // row instance it passed in. The copy is only made when a new entry is added.
            writers.put(partition.copy(), buckets);
        }
        // createWriter makes its own copy of the partition, so none is needed here.
        return buckets.computeIfAbsent(bucket, ignored -> createWriter(partition, bucket));
    }

    /**
     * Creates a writer for the given partition and bucket according to the overwrite flag
     * and reports it through {@link #notifyNewWriter}. The writer receives its own copy of
     * the partition row.
     */
    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
        RecordWriter<T> writer;
        if (overwrite) {
            writer = write.createEmptyWriter(partition.copy(), bucket, compactExecutor);
        } else {
            writer = write.createWriter(partition.copy(), bucket, compactExecutor);
        }
        notifyNewWriter(writer);
        return writer;
    }

    /**
     * Hook invoked once for every newly created writer. The default implementation does
     * nothing.
     *
     * @param recordWriter the writer that was just created
     */
    protected void notifyNewWriter(RecordWriter<T> recordWriter) {
    }
}
