package org.apache.flink.table.store.file.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/file/operation/FileStoreCommitImpl.class */
public class FileStoreCommitImpl implements FileStoreCommit {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
    // Schema id written into every snapshot created by this committer.
    private final long schemaId;
    // User name written into new snapshots; filterCommitted compares it against existing snapshots.
    private final String commitUser;
    // Row type of the partition; used to turn an overwrite partition spec into a predicate.
    private final RowType partitionType;
    // Converts a partition row to an object array so the overwrite predicate can be tested on it.
    private final RowDataToObjectArrayConverter partitionObjectConverter;
    // Used to render partitions and file identifiers in error messages.
    private final FileStorePathFactory pathFactory;
    // Resolves snapshot ids, snapshot paths and the latest-snapshot hint.
    private final SnapshotManager snapshotManager;
    // Writes manifest files for new changes; also handed to ManifestFileMeta.merge.
    private final ManifestFile manifestFile;
    // Reads and writes manifest lists, and deletes temporary ones on cleanup.
    private final ManifestList manifestList;
    // Used to plan the files of an existing snapshot (overwrite and conflict checking).
    private final FileStoreScan scan;
    // Bucket count stamped on every manifest entry built by collectChanges.
    private final int numBucket;
    // Forwarded (in bytes) to ManifestFileMeta.merge.
    private final MemorySize manifestTargetSize;
    // Forwarded to ManifestFileMeta.merge.
    private final int manifestMergeMinCount;

    // Orders file keys for the LSM key-range check; when null that check is skipped.
    @Nullable
    private final Comparator<RowData> keyComparator;

    // Optional lock; when set, the snapshot file is written inside lock.runWithLock.
    @Nullable
    private Lock lock = null;
    // When true, commit() writes an APPEND snapshot even if there are no new files.
    private boolean createEmptyCommit = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/store/file/operation/FileStoreCommitImpl$LevelIdentifier.class */
    /**
     * Map key identifying one level of one bucket of one partition. Two identifiers are equal
     * when their partition, bucket and level are all equal.
     */
    public static class LevelIdentifier {
        private final BinaryRowData partition;
        private final int bucket;
        private final int level;

        /**
         * @param binaryRowData partition of the level
         * @param i bucket of the level
         * @param i2 level number
         */
        private LevelIdentifier(BinaryRowData binaryRowData, int i, int i2) {
            partition = binaryRowData;
            bucket = i;
            level = i2;
        }

        /** Compares partition, bucket and level; anything that is not a LevelIdentifier is unequal. */
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LevelIdentifier) {
                LevelIdentifier other = (LevelIdentifier) obj;
                return Objects.equals(partition, other.partition)
                        && bucket == other.bucket
                        && level == other.level;
            }
            return false;
        }

        /** Hash over the same three components that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(partition, bucket, level);
        }
    }

    /**
     * Creates a committer.
     *
     * @param j schema id written into new snapshots
     * @param str commit user written into new snapshots
     * @param rowType partition row type; also used to build the partition-to-object converter
     * @param fileStorePathFactory path factory used when formatting error messages
     * @param snapshotManager snapshot manager
     * @param factory factory that creates the manifest file reader/writer
     * @param factory2 factory that creates the manifest list reader/writer
     * @param fileStoreScan scan used to plan the files of existing snapshots
     * @param i bucket count stamped on newly created manifest entries
     * @param memorySize size passed to the manifest merge
     * @param i2 minimum count passed to the manifest merge
     * @param comparator key comparator for the LSM key-range check, or null to skip that check
     */
    public FileStoreCommitImpl(
            long j,
            String str,
            RowType rowType,
            FileStorePathFactory fileStorePathFactory,
            SnapshotManager snapshotManager,
            ManifestFile.Factory factory,
            ManifestList.Factory factory2,
            FileStoreScan fileStoreScan,
            int i,
            MemorySize memorySize,
            int i2,
            @Nullable Comparator<RowData> comparator) {
        // Plain value fields.
        schemaId = j;
        commitUser = str;
        numBucket = i;
        manifestTargetSize = memorySize;
        manifestMergeMinCount = i2;
        keyComparator = comparator;

        // Partition handling.
        partitionType = rowType;
        partitionObjectConverter = new RowDataToObjectArrayConverter(rowType);

        // Collaborators; the two factory calls stay in their original relative order.
        pathFactory = fileStorePathFactory;
        this.snapshotManager = snapshotManager;
        manifestFile = factory.create();
        manifestList = factory2.create();
        scan = fileStoreScan;
    }

    /**
     * Sets the lock used when publishing a snapshot. When a lock is set, the snapshot file is
     * written inside {@code lock.runWithLock(...)}; otherwise it is written directly.
     *
     * @param lock the lock to use
     * @return this committer, for chaining
     */
    @Override // org.apache.flink.table.store.file.operation.FileStoreCommit
    public FileStoreCommit withLock(Lock lock) {
        this.lock = lock;
        return this;
    }

    /**
     * Controls whether {@code commit} still writes an APPEND snapshot when the committable
     * contains no new files.
     *
     * @param z true to write the APPEND snapshot even without new files
     * @return this committer, for chaining
     */
    @Override // org.apache.flink.table.store.file.operation.FileStoreCommit
    public FileStoreCommit withCreateEmptyCommit(boolean z) {
        this.createEmptyCommit = z;
        return this;
    }

    /**
     * Drops the committables that this commit user has already committed.
     *
     * <p>Snapshots are walked from the latest one backwards while they exist. Each snapshot
     * written by this commit user removes the committable with the same identifier; the walk
     * stops at the first such snapshot whose identifier is not among the remaining committables.
     *
     * @param list committables to check
     * @return {@code list} itself when it is empty or no snapshot exists yet; otherwise a new
     *     list with the remaining committables in their original order
     */
    @Override
    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> list) {
        if (list.isEmpty()) {
            return list;
        }
        Long latestId = this.snapshotManager.latestSnapshotId();
        if (latestId == null) {
            return list;
        }

        // Insertion-ordered so the surviving committables keep their input order.
        Map<String, ManifestCommittable> pending = new LinkedHashMap<>();
        for (ManifestCommittable committable : list) {
            pending.put(committable.identifier(), committable);
        }

        for (long id = latestId.longValue();
                id >= 1 && this.snapshotManager.snapshotExists(id);
                id--) {
            Snapshot existing = this.snapshotManager.snapshot(id);
            if (!this.commitUser.equals(existing.commitUser())) {
                // Written by somebody else; ignore and keep walking.
                continue;
            }
            if (!pending.containsKey(existing.commitIdentifier())) {
                break;
            }
            pending.remove(existing.commitIdentifier());
        }
        return new ArrayList<>(pending.values());
    }

    /**
     * Commits a committable as up to two snapshots: an APPEND snapshot for its new files
     * (written when there are new files or empty commits are enabled), then a COMPACT snapshot
     * for its compaction changes (written only when there are any, with conflict checking on).
     *
     * @param manifestCommittable the changes to commit
     * @param map extra properties; not read by this method
     */
    @Override
    public void commit(ManifestCommittable manifestCommittable, Map<String, String> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit\n" + manifestCommittable.toString());
        }

        List<ManifestEntry> appendChanges = collectChanges(manifestCommittable.newFiles(), FileKind.ADD);
        boolean writeAppendSnapshot = this.createEmptyCommit || !appendChanges.isEmpty();
        if (writeAppendSnapshot) {
            tryCommit(
                    appendChanges,
                    manifestCommittable.identifier(),
                    manifestCommittable.logOffsets(),
                    Snapshot.CommitKind.APPEND,
                    false);
        }

        // Compaction: files removed by the compaction first, then the files it produced.
        List<ManifestEntry> compactChanges =
                new ArrayList<>(collectChanges(manifestCommittable.compactBefore(), FileKind.DELETE));
        compactChanges.addAll(collectChanges(manifestCommittable.compactAfter(), FileKind.ADD));
        if (!compactChanges.isEmpty()) {
            tryCommit(
                    compactChanges,
                    manifestCommittable.identifier(),
                    manifestCommittable.logOffsets(),
                    Snapshot.CommitKind.COMPACT,
                    true);
        }
    }

    /**
     * Overwrites the given partition with the new files of a committable, then commits the
     * committable's compaction changes (if any) as a separate COMPACT snapshot.
     *
     * @param map partition spec to overwrite; converted to a predicate over the partition type
     * @param manifestCommittable the changes to commit
     * @param map2 extra properties; not read by this method
     * @throws IllegalArgumentException if the partition predicate is non-null and a new file
     *     lies in a partition that does not match it
     */
    @Override
    public void overwrite(Map<String, String> map, ManifestCommittable manifestCommittable, Map<String, String> map2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to overwrite partition " + map.toString() + "\n" + manifestCommittable.toString());
        }

        List<ManifestEntry> appendChanges = collectChanges(manifestCommittable.newFiles(), FileKind.ADD);
        Predicate partitionFilter = PredicateConverter.fromMap(map, this.partitionType);
        if (partitionFilter != null) {
            // Every new file must belong to the partition being overwritten.
            for (ManifestEntry change : appendChanges) {
                boolean inPartition =
                        partitionFilter.test(this.partitionObjectConverter.convert(change.partition()));
                if (inPartition) {
                    continue;
                }
                throw new IllegalArgumentException("Trying to overwrite partition " + map + ", but the changes in " + this.pathFactory.getPartitionString(change.partition()) + " does not belong to this partition");
            }
        }
        tryOverwrite(
                partitionFilter,
                appendChanges,
                manifestCommittable.identifier(),
                manifestCommittable.logOffsets());

        // Compaction: files removed by the compaction first, then the files it produced.
        List<ManifestEntry> compactChanges =
                new ArrayList<>(collectChanges(manifestCommittable.compactBefore(), FileKind.DELETE));
        compactChanges.addAll(collectChanges(manifestCommittable.compactAfter(), FileKind.ADD));
        if (!compactChanges.isEmpty()) {
            tryCommit(
                    compactChanges,
                    manifestCommittable.identifier(),
                    manifestCommittable.logOffsets(),
                    Snapshot.CommitKind.COMPACT,
                    true);
        }
    }

    /**
     * Retries {@code tryCommitOnce} until it reports success, re-reading the latest snapshot id
     * before every attempt.
     *
     * @param list changes to commit
     * @param str commit identifier
     * @param map log offsets to record in the snapshot
     * @param commitKind kind of the snapshot to create
     * @param z whether to check for conflicts against the latest snapshot
     */
    private void tryCommit(List<ManifestEntry> list, String str, Map<Integer, Long> map, Snapshot.CommitKind commitKind, boolean z) {
        boolean committed = false;
        while (!committed) {
            Long latestId = this.snapshotManager.latestSnapshotId();
            committed = tryCommitOnce(list, str, map, commitKind, latestId, z);
        }
    }

    /**
     * Retries an OVERWRITE commit until it succeeds. Every attempt re-reads the latest
     * snapshot, turns each file it plans under the partition filter into a DELETE entry, and
     * appends the new changes after those deletes.
     *
     * @param predicate partition filter selecting the files to delete
     * @param list new changes to add after the deletes
     * @param str commit identifier
     * @param map log offsets to record in the snapshot
     */
    private void tryOverwrite(Predicate predicate, List<ManifestEntry> list, String str, Map<Integer, Long> map) {
        for (;;) {
            Long latestId = this.snapshotManager.latestSnapshotId();
            List<ManifestEntry> changes = new ArrayList<>();
            if (latestId != null) {
                for (ManifestEntry current :
                        this.scan.withSnapshot(latestId.longValue()).withPartitionFilter(predicate).plan().files()) {
                    ManifestEntry deletion =
                            new ManifestEntry(
                                    FileKind.DELETE,
                                    current.partition(),
                                    current.bucket(),
                                    current.totalBuckets(),
                                    current.file());
                    changes.add(deletion);
                }
            }
            changes.addAll(list);
            if (tryCommitOnce(changes, str, map, Snapshot.CommitKind.OVERWRITE, latestId, false)) {
                return;
            }
        }
    }

    /**
     * Flattens a partition -> bucket -> files map into manifest entries of the given kind.
     * Entries follow the iteration order of the maps and lists passed in.
     *
     * @param map data files grouped by partition and then by bucket
     * @param fileKind kind assigned to every created entry
     * @return a new mutable list with one entry per data file
     */
    private List<ManifestEntry> collectChanges(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> map, FileKind fileKind) {
        List<ManifestEntry> changes = new ArrayList<>();
        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> byPartition : map.entrySet()) {
            for (Map.Entry<Integer, List<DataFileMeta>> byBucket : byPartition.getValue().entrySet()) {
                for (DataFileMeta file : byBucket.getValue()) {
                    changes.add(
                            new ManifestEntry(
                                    fileKind,
                                    byPartition.getKey(),
                                    byBucket.getKey(),
                                    this.numBucket,
                                    file));
                }
            }
        }
        return changes;
    }

    /**
     * Makes one attempt to publish the given changes as the snapshot that follows {@code l}.
     *
     * <p>The attempt has two phases. Preparation reads the previous snapshot's manifests, merges
     * them, writes the new manifest files and lists, and builds the snapshot object; any failure
     * here removes the temporary manifests before rethrowing. Publication then writes the
     * snapshot file (under the lock when one is configured); an exception in this phase leaves
     * the files alone because the outcome is unknown.
     *
     * @param list changes to commit
     * @param str commit identifier
     * @param map log offsets; offsets of the previous snapshot are added for absent keys, so this
     *     map is modified
     * @param commitKind kind of the snapshot to create
     * @param l id of the latest snapshot, or null when there is none (the new id is then 1)
     * @param z whether to run the conflict check against the latest snapshot
     * @return true if the snapshot was published; false if the snapshot file already existed or
     *     could not be written, in which case the temporary manifests are removed so the caller
     *     can retry
     * @throws RuntimeException if preparation fails or publication throws
     */
    private boolean tryCommitOnce(List<ManifestEntry> list, String str, Map<Integer, Long> map, Snapshot.CommitKind commitKind, Long l, boolean z) {
        long newSnapshotId = l == null ? 1L : l.longValue() + 1;
        Path newSnapshotPath = this.snapshotManager.snapshotPath(newSnapshotId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit changes to snapshot #" + newSnapshotId);
            for (ManifestEntry entry : list) {
                LOG.debug("  * " + entry.toString());
            }
        }

        Snapshot latestSnapshot = null;
        if (l != null) {
            if (z) {
                noConflictsOrFail(l.longValue(), list);
            }
            latestSnapshot = this.snapshotManager.snapshot(l.longValue());
        }

        String previousChangesListName = null;
        String newChangesListName = null;
        List<ManifestFileMeta> oldMetas = new ArrayList<>();
        List<ManifestFileMeta> newMetas = new ArrayList<>();
        Snapshot newSnapshot;
        try {
            if (latestSnapshot != null) {
                // Carry over the manifests of the previous snapshot ...
                oldMetas.addAll(latestSnapshot.readAllManifests(this.manifestList));
                // ... and its log offsets for every key the caller did not provide.
                latestSnapshot.getLogOffsets().forEach(map::putIfAbsent);
            }
            // Everything that creates temporary files stays inside this try so that a failure
            // at any step triggers the cleanup below.
            newMetas.addAll(ManifestFileMeta.merge(oldMetas, this.manifestFile, this.manifestTargetSize.getBytes(), this.manifestMergeMinCount));
            previousChangesListName = this.manifestList.write(newMetas);

            List<ManifestFileMeta> changeFiles = this.manifestFile.write(list);
            newMetas.addAll(changeFiles);
            newChangesListName = this.manifestList.write(changeFiles);

            newSnapshot = new Snapshot(newSnapshotId, this.schemaId, previousChangesListName, newChangesListName, this.commitUser, str, commitKind, System.currentTimeMillis(), map);
        } catch (Throwable t) {
            cleanUpTmpManifests(previousChangesListName, newChangesListName, oldMetas, newMetas);
            throw new RuntimeException(String.format("Exception occurs when preparing snapshot #%d (path %s) by user %s with hash %s and kind %s. Clean up.", Long.valueOf(newSnapshotId), newSnapshotPath.toString(), this.commitUser, str, commitKind.name()), t);
        }

        boolean committed;
        try {
            FileSystem fs = newSnapshotPath.getFileSystem();
            Callable<Boolean> writeSnapshot = () -> {
                if (fs.exists(newSnapshotPath)) {
                    return false;
                }
                boolean written = AtomicFileWriter.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
                if (written) {
                    this.snapshotManager.commitLatestHint(newSnapshotId);
                }
                return written;
            };
            if (this.lock != null) {
                // Existence is re-checked inside the lock before attempting the write.
                committed = ((Boolean) this.lock.runWithLock(() -> !fs.exists(newSnapshotPath) && writeSnapshot.call())).booleanValue();
            } else {
                committed = writeSnapshot.call().booleanValue();
            }
        } catch (Throwable t) {
            throw new RuntimeException(String.format("Exception occurs when committing snapshot #%d (path %s) by user %s with identifier %s and kind %s. Cannot clean up because we can't determine the success.", Long.valueOf(newSnapshotId), newSnapshotPath, this.commitUser, str, commitKind.name()), t);
        }

        if (committed) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Successfully commit snapshot #%d (path %s) by user %s with identifier %s and kind %s.", Long.valueOf(newSnapshotId), newSnapshotPath, this.commitUser, str, commitKind.name()));
            }
            return true;
        }

        // The commit is known to have failed, so cleaning up is safe. It runs outside the try
        // above so a cleanup error is not reported as an undeterminable commit.
        LOG.warn(String.format("Atomic commit failed for snapshot #%d (path %s) by user %s with identifier %s and kind %s. Clean up and try again.", Long.valueOf(newSnapshotId), newSnapshotPath, this.commitUser, str, commitKind.name()));
        cleanUpTmpManifests(previousChangesListName, newChangesListName, oldMetas, newMetas);
        return false;
    }

    /**
     * Verifies that the given changes can be applied on top of snapshot {@code j}.
     *
     * <p>Three distinct failures are reported, each with its own message:
     *
     * <ul>
     *   <li>the files of the snapshot cannot be planned ("Cannot determine if conflicts exist.");
     *   <li>merging the existing entries with the changes fails ("File deletion conflicts
     *       detected! ...");
     *   <li>within one partition, bucket and level &gt;= 1, two files have overlapping key ranges
     *       ("LSM conflicts detected! ...").
     * </ul>
     *
     * <p>The key-range check is skipped when no key comparator is configured.
     *
     * @param j id of the snapshot to check against
     * @param list changes about to be committed
     * @throws RuntimeException if any of the failures above occurs
     */
    private void noConflictsOrFail(long j, List<ManifestEntry> list) {
        List<ManifestEntry> allEntries;
        try {
            // Only the partitions touched by the changes need to be read.
            List<BinaryRowData> changedPartitions =
                    list.stream().map(ManifestEntry::partition).distinct().collect(Collectors.toList());
            allEntries = new ArrayList<>(this.scan.withSnapshot(j).withPartitionFilter(changedPartitions).plan().files());
        } catch (Throwable t) {
            throw new RuntimeException("Cannot determine if conflicts exist.", t);
        }
        allEntries.addAll(list);

        Collection<ManifestEntry> mergedEntries;
        try {
            mergedEntries = ManifestEntry.mergeManifestEntries(allEntries);
        } catch (Throwable t) {
            throw new RuntimeException("File deletion conflicts detected! Give up committing compact changes.", t);
        }

        if (this.keyComparator == null) {
            return;
        }

        // Group the files of level >= 1 by (partition, bucket, level).
        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
        for (ManifestEntry entry : mergedEntries) {
            int level = entry.file().level();
            if (level >= 1) {
                levels.computeIfAbsent(
                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
                                key -> new ArrayList<>())
                        .add(entry);
            }
        }

        // After sorting by min key, each file's max key must be below the next file's min key.
        for (List<ManifestEntry> entries : levels.values()) {
            entries.sort((a, b) -> this.keyComparator.compare(a.file().minKey(), b.file().minKey()));
            for (int i = 0; i + 1 < entries.size(); i++) {
                ManifestEntry current = entries.get(i);
                ManifestEntry next = entries.get(i + 1);
                if (this.keyComparator.compare(current.file().maxKey(), next.file().minKey()) >= 0) {
                    throw new RuntimeException("LSM conflicts detected! Give up committing compact changes. Conflict files are:\n" + current.identifier().toString(this.pathFactory) + "\n" + next.identifier().toString(this.pathFactory));
                }
            }
        }
    }

    /**
     * Removes the temporary files of a commit attempt: the two manifest lists (when they were
     * written) and every manifest in {@code list2} that is not also in {@code list}.
     *
     * @param str name of the manifest list holding the previous changes, or null if not written
     * @param str2 name of the manifest list holding the new changes, or null if not written
     * @param list manifests that existed before the attempt; these are kept
     * @param list2 manifests referenced by the attempt
     */
    private void cleanUpTmpManifests(String str, String str2, List<ManifestFileMeta> list, List<ManifestFileMeta> list2) {
        if (str != null) {
            this.manifestList.delete(str);
        }
        if (str2 != null) {
            this.manifestList.delete(str2);
        }

        // Hash set for constant-time membership tests.
        HashSet<ManifestFileMeta> preexisting = new HashSet<>(list);
        for (ManifestFileMeta candidate : list2) {
            if (preexisting.contains(candidate)) {
                continue;
            }
            // NOTE(review): these are manifest files, yet they are deleted through manifestList
            // rather than manifestFile - confirm both resolve to the same path.
            this.manifestList.delete(candidate.fileName());
        }
    }
}
