package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.InputStreamFSInputWrapper;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.IteratorResultIterator;
import org.apache.flink.connector.file.src.util.Utils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapter.class */
public final class StreamFormatAdapter<T> implements BulkFormat<T, FileSourceSplit> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG;
    private final StreamFormat<T> streamFormat;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapter$Reader.class */
    public static final class Reader<T> implements BulkFormat.Reader<T> {
        private final StreamFormat.Reader<T> reader;
        private final TrackingFsDataInputStream stream;
        private long lastOffset;
        private long lastRecordsAfterOffset;

        Reader(StreamFormat.Reader<T> reader, TrackingFsDataInputStream trackingFsDataInputStream, long j, long j2) {
            this.reader = (StreamFormat.Reader) Preconditions.checkNotNull(reader);
            this.stream = (TrackingFsDataInputStream) Preconditions.checkNotNull(trackingFsDataInputStream);
            this.lastOffset = j;
            this.lastRecordsAfterOffset = j2;
        }

        @Override // org.apache.flink.connector.file.src.reader.BulkFormat.Reader
        @Nullable
        public BulkFormat.RecordIterator<T> readBatch() throws IOException {
            T read;
            updateCheckpointedPosition();
            this.stream.newBatch();
            ArrayList arrayList = new ArrayList();
            while (this.stream.hasRemainingInBatch() && (read = this.reader.read()) != null) {
                arrayList.add(read);
            }
            if (arrayList.isEmpty()) {
                return null;
            }
            IteratorResultIterator iteratorResultIterator = new IteratorResultIterator(arrayList.iterator(), this.lastOffset, this.lastRecordsAfterOffset);
            this.lastRecordsAfterOffset += arrayList.size();
            return iteratorResultIterator;
        }

        @Override // org.apache.flink.connector.file.src.reader.BulkFormat.Reader, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            try {
                this.reader.close();
            } finally {
                IOUtils.closeQuietly(this.stream);
            }
        }

        private void updateCheckpointedPosition() {
            CheckpointedPosition checkpointedPosition = this.reader.getCheckpointedPosition();
            if (checkpointedPosition != null) {
                this.lastOffset = checkpointedPosition.getOffset();
                this.lastRecordsAfterOffset = checkpointedPosition.getRecordsAfterOffset();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/impl/StreamFormatAdapter$TrackingFsDataInputStream.class */
    public static final class TrackingFsDataInputStream extends FSDataInputStream {
        private final FSDataInputStream stream;
        private final long fileLength;
        private final int batchSize;
        private int remainingInBatch;

        TrackingFsDataInputStream(FSDataInputStream fSDataInputStream, long j, int i) {
            Preconditions.checkArgument(j > 0);
            Preconditions.checkArgument(i > 0);
            this.stream = fSDataInputStream;
            this.fileLength = j;
            this.batchSize = i;
        }

        @Override // org.apache.flink.core.fs.FSDataInputStream
        public void seek(long j) throws IOException {
            this.stream.seek(j);
            this.remainingInBatch = 0;
        }

        @Override // org.apache.flink.core.fs.FSDataInputStream
        public long getPos() throws IOException {
            return this.stream.getPos();
        }

        @Override // java.io.InputStream
        public int read() throws IOException {
            this.remainingInBatch--;
            return this.stream.read();
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            this.remainingInBatch -= i2;
            return this.stream.read(bArr, i, i2);
        }

        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.stream.close();
        }

        boolean hasRemainingInBatch() {
            return this.remainingInBatch > 0;
        }

        void newBatch() {
            this.remainingInBatch = this.batchSize;
        }

        long getFileLength() {
            return this.fileLength;
        }
    }

    public StreamFormatAdapter(StreamFormat<T> streamFormat) {
        this.streamFormat = (StreamFormat) Preconditions.checkNotNull(streamFormat);
    }

    @Override // org.apache.flink.connector.file.src.reader.BulkFormat
    /* renamed from: createReader */
    public BulkFormat.Reader<T> createReader2(Configuration configuration, FileSourceSplit fileSourceSplit) throws IOException {
        TrackingFsDataInputStream openStream = openStream(fileSourceSplit.path(), configuration, fileSourceSplit.offset());
        long offset = fileSourceSplit.offset() + fileSourceSplit.length();
        return (BulkFormat.Reader) Utils.doWithCleanupOnException(openStream, () -> {
            return new Reader(this.streamFormat.createReader(configuration, openStream, openStream.getFileLength(), offset), openStream, -1L, 0L);
        });
    }

    @Override // org.apache.flink.connector.file.src.reader.BulkFormat
    /* renamed from: restoreReader */
    public BulkFormat.Reader<T> restoreReader2(Configuration configuration, FileSourceSplit fileSourceSplit) throws IOException {
        if (!$assertionsDisabled && !fileSourceSplit.getReaderPosition().isPresent()) {
            throw new AssertionError();
        }
        CheckpointedPosition checkpointedPosition = fileSourceSplit.getReaderPosition().get();
        TrackingFsDataInputStream openStream = openStream(fileSourceSplit.path(), configuration, fileSourceSplit.offset());
        long offset = fileSourceSplit.offset() + fileSourceSplit.length();
        return (BulkFormat.Reader) Utils.doWithCleanupOnException(openStream, () -> {
            StreamFormat.Reader<T> createReader = checkpointedPosition.getOffset() == -1 ? this.streamFormat.createReader(configuration, openStream, openStream.getFileLength(), offset) : this.streamFormat.restoreReader(configuration, openStream, checkpointedPosition.getOffset(), openStream.getFileLength(), offset);
            Utils.doWithCleanupOnException(createReader, (ThrowingRunnable<IOException>) () -> {
                long recordsAfterOffset = checkpointedPosition.getRecordsAfterOffset();
                while (true) {
                    long j = recordsAfterOffset;
                    if (j <= 0 || createReader.read() == null) {
                        break;
                    } else {
                        recordsAfterOffset = j - 1;
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} records have been skipped.", Long.valueOf(checkpointedPosition.getRecordsAfterOffset()));
                }
            });
            return new Reader(createReader, openStream, checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
        });
    }

    @Override // org.apache.flink.connector.file.src.reader.BulkFormat
    public boolean isSplittable() {
        return this.streamFormat.isSplittable();
    }

    @Override // org.apache.flink.connector.file.src.reader.BulkFormat, org.apache.flink.api.java.typeutils.ResultTypeQueryable
    /* renamed from: getProducedType */
    public TypeInformation<T> getProducedType2() {
        return this.streamFormat.getProducedType2();
    }

    private static TrackingFsDataInputStream openStream(Path path, Configuration configuration, long j) throws IOException {
        FileSystem fileSystem = path.getFileSystem();
        long len = fileSystem.getFileStatus(path).getLen();
        int checkedDownCast = MathUtils.checkedDownCast(((MemorySize) configuration.get(StreamFormat.FETCH_IO_SIZE)).getBytes());
        if (checkedDownCast <= 0) {
            throw new IllegalConfigurationException(String.format("The fetch size (%s) must be > 0, but is %d", StreamFormat.FETCH_IO_SIZE.key(), Integer.valueOf(checkedDownCast)));
        }
        InflaterInputStreamFactory<?> decompressorForFileName = StandardDeCompressors.getDecompressorForFileName(path.getPath());
        FSDataInputStream open = fileSystem.open(path);
        return (TrackingFsDataInputStream) Utils.doWithCleanupOnException(open, () -> {
            FSDataInputStream inputStreamFSInputWrapper = decompressorForFileName == null ? open : new InputStreamFSInputWrapper(decompressorForFileName.create(open));
            inputStreamFSInputWrapper.seek(j);
            return new TrackingFsDataInputStream(inputStreamFSInputWrapper, len, checkedDownCast);
        });
    }

    static {
        $assertionsDisabled = !StreamFormatAdapter.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(StreamFormatAdapter.class);
    }
}
