package org.apache.flink.streaming.api.operators.collect;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.class */
public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(CollectSinkFunction.class);
    private final TypeSerializer<IN> serializer;
    private final long maxBytesPerBatch;
    private final long bufferSizeLimitBytes;
    private final String accumulatorName;
    private transient OperatorEventGateway eventGateway;
    private transient LinkedList<byte[]> buffer;
    private transient long currentBufferBytes;
    private transient ReentrantLock bufferLock;
    private transient Condition bufferCanAddNextResultCondition;
    private transient long invokingRecordBytes;
    private transient String version;
    private transient long offset;
    private transient long lastCheckpointedOffset;
    private transient CollectSinkFunction<IN>.ServerThread serverThread;
    private transient ListState<byte[]> bufferState;
    private transient ListState<Long> offsetState;
    private transient SortedMap<Long, Long> uncompletedCheckpointMap;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunction$ServerThread.class */
    private class ServerThread extends Thread {
        private final TypeSerializer<IN> serializer;
        private final ServerSocket serverSocket;
        private boolean running;
        private Socket connection;
        private DataInputViewStreamWrapper inStream;
        private DataOutputViewStreamWrapper outStream;

        private ServerThread(TypeSerializer<IN> typeSerializer) throws Exception {
            this.serializer = typeSerializer.duplicate2();
            this.serverSocket = new ServerSocket(0, 0, getBindAddress());
            this.running = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    if (this.connection == null) {
                        this.connection = NetUtils.acceptWithoutTimeout(this.serverSocket);
                        this.inStream = new DataInputViewStreamWrapper(this.connection.getInputStream());
                        this.outStream = new DataOutputViewStreamWrapper(this.connection.getOutputStream());
                        CollectSinkFunction.LOG.info("Coordinator connection received");
                    }
                    CollectCoordinationRequest collectCoordinationRequest = new CollectCoordinationRequest(this.inStream);
                    String version = collectCoordinationRequest.getVersion();
                    long offset = collectCoordinationRequest.getOffset();
                    if (CollectSinkFunction.LOG.isDebugEnabled()) {
                        CollectSinkFunction.LOG.debug("Request received, version = " + version + ", offset = " + offset);
                        CollectSinkFunction.LOG.debug("Expecting version = " + CollectSinkFunction.this.version + ", offset = " + CollectSinkFunction.this.offset);
                    }
                    if (CollectSinkFunction.this.version.equals(version) && offset >= CollectSinkFunction.this.offset) {
                        ArrayList arrayList = new ArrayList();
                        CollectSinkFunction.this.bufferLock.lock();
                        try {
                            int i = (int) (offset - CollectSinkFunction.this.offset);
                            for (int i2 = 0; i2 < i && !CollectSinkFunction.this.buffer.isEmpty(); i2++) {
                                CollectSinkFunction.access$802(CollectSinkFunction.this, CollectSinkFunction.this.currentBufferBytes - ((byte[]) CollectSinkFunction.this.buffer.removeFirst()).length);
                                CollectSinkFunction.access$508(CollectSinkFunction.this);
                            }
                            long j = 0;
                            Iterator it = CollectSinkFunction.this.buffer.iterator();
                            while (it.hasNext()) {
                                byte[] bArr = (byte[]) it.next();
                                if (j + bArr.length > CollectSinkFunction.this.maxBytesPerBatch) {
                                    break;
                                }
                                arrayList.add(bArr);
                                j += bArr.length;
                            }
                            if (CollectSinkFunction.this.currentBufferBytes + CollectSinkFunction.this.invokingRecordBytes <= CollectSinkFunction.this.bufferSizeLimitBytes) {
                                CollectSinkFunction.this.bufferCanAddNextResultCondition.signal();
                            }
                            CollectSinkFunction.this.bufferLock.unlock();
                            sendBackResults(arrayList);
                        } catch (Throwable th) {
                            CollectSinkFunction.this.bufferLock.unlock();
                            throw th;
                            break;
                        }
                    } else {
                        CollectSinkFunction.LOG.info("Invalid request. Received version = " + version + ", offset = " + offset + ", while expected version = " + CollectSinkFunction.this.version + ", offset = " + CollectSinkFunction.this.offset);
                        sendBackResults(Collections.emptyList());
                    }
                } catch (Exception e) {
                    if (CollectSinkFunction.LOG.isDebugEnabled()) {
                        CollectSinkFunction.LOG.debug("Collect sink server encounters an exception", e);
                    }
                    closeCurrentConnection();
                }
            }
        }

        public void close() {
            this.running = false;
            closeServerSocket();
            closeCurrentConnection();
        }

        public InetSocketAddress getServerSocketAddress() {
            RuntimeContext runtimeContext = CollectSinkFunction.this.getRuntimeContext();
            Preconditions.checkState(runtimeContext instanceof StreamingRuntimeContext, "CollectSinkFunction can only be used in StreamTask");
            return new InetSocketAddress(((StreamingRuntimeContext) runtimeContext).getTaskManagerRuntimeInfo().getTaskManagerExternalAddress(), this.serverSocket.getLocalPort());
        }

        private InetAddress getBindAddress() {
            RuntimeContext runtimeContext = CollectSinkFunction.this.getRuntimeContext();
            Preconditions.checkState(runtimeContext instanceof StreamingRuntimeContext, "CollectSinkFunction can only be used in StreamTask");
            String taskManagerBindAddress = ((StreamingRuntimeContext) runtimeContext).getTaskManagerRuntimeInfo().getTaskManagerBindAddress();
            if (taskManagerBindAddress == null) {
                return null;
            }
            try {
                return InetAddress.getByName(taskManagerBindAddress);
            } catch (UnknownHostException e) {
                return null;
            }
        }

        private void sendBackResults(List<byte[]> list) throws IOException {
            if (CollectSinkFunction.LOG.isDebugEnabled()) {
                CollectSinkFunction.LOG.debug("Sending back " + list.size() + " results");
            }
            new CollectCoordinationResponse(CollectSinkFunction.this.version, CollectSinkFunction.this.lastCheckpointedOffset, list).serialize(this.outStream);
        }

        private void closeCurrentConnection() {
            try {
                if (this.connection != null) {
                    this.connection.close();
                    this.connection = null;
                }
            } catch (Exception e) {
                CollectSinkFunction.LOG.warn("Error occurs when closing client connections in CollectSinkFunction", e);
            }
        }

        private void closeServerSocket() {
            try {
                this.serverSocket.close();
            } catch (Exception e) {
                CollectSinkFunction.LOG.warn("Error occurs when closing server in CollectSinkFunction", e);
            }
        }

        /* synthetic */ ServerThread(CollectSinkFunction collectSinkFunction, TypeSerializer typeSerializer, AnonymousClass1 anonymousClass1) throws Exception {
            this(typeSerializer);
        }
    }

    public CollectSinkFunction(TypeSerializer<IN> typeSerializer, long j, String str) {
        this.serializer = typeSerializer;
        this.maxBytesPerBatch = j;
        this.bufferSizeLimitBytes = j * 2;
        this.accumulatorName = str;
    }

    private void initBuffer() {
        if (this.buffer != null) {
            return;
        }
        this.buffer = new LinkedList<>();
        this.currentBufferBytes = 0L;
        this.bufferLock = new ReentrantLock();
        this.bufferCanAddNextResultCondition = this.bufferLock.newCondition();
        this.offset = 0L;
        this.lastCheckpointedOffset = this.offset;
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        initBuffer();
        this.bufferState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("bufferState", BytePrimitiveArraySerializer.INSTANCE));
        Iterator it = this.bufferState.get().iterator();
        while (it.hasNext()) {
            this.buffer.add((byte[]) it.next());
            this.currentBufferBytes += r0.length;
        }
        this.offsetState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("offsetState", Long.class));
        Iterator it2 = this.offsetState.get().iterator();
        while (it2.hasNext()) {
            this.offset = ((Long) it2.next()).longValue();
        }
        this.lastCheckpointedOffset = this.offset;
        LOG.info("Initializing collect sink state with offset = " + this.lastCheckpointedOffset + ", buffered results bytes = " + this.currentBufferBytes);
        this.uncompletedCheckpointMap = new TreeMap();
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.bufferLock.lock();
        try {
            this.bufferState.clear();
            this.bufferState.addAll(this.buffer);
            this.offsetState.clear();
            this.offsetState.add(Long.valueOf(this.offset));
            this.uncompletedCheckpointMap.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), Long.valueOf(this.offset));
            if (LOG.isDebugEnabled()) {
                LOG.debug("Checkpoint begin with checkpointId = " + functionSnapshotContext.getCheckpointId() + ", lastCheckpointedOffset = " + this.lastCheckpointedOffset + ", buffered results bytes = " + this.currentBufferBytes);
            }
        } finally {
            this.bufferLock.unlock();
        }
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        Preconditions.checkState(getRuntimeContext().getNumberOfParallelSubtasks() == 1, "The parallelism of CollectSinkFunction must be 1");
        initBuffer();
        this.version = UUID.randomUUID().toString();
        this.serverThread = new ServerThread(this.serializer);
        this.serverThread.start();
        Preconditions.checkNotNull(this.eventGateway, "Operator event gateway hasn't been set");
        InetSocketAddress serverSocketAddress = this.serverThread.getServerSocketAddress();
        LOG.info("Collect sink server established, address = " + serverSocketAddress);
        this.eventGateway.sendEventToCoordinator(new CollectSinkAddressEvent(serverSocketAddress));
    }

    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        this.bufferLock.lock();
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.serializer.serialize(in, new DataOutputViewStreamWrapper(byteArrayOutputStream));
            this.invokingRecordBytes = byteArrayOutputStream.size();
            if (this.invokingRecordBytes > this.maxBytesPerBatch) {
                throw new RuntimeException("Record size is too large for CollectSinkFunction. Record size is " + this.invokingRecordBytes + " bytes, but max bytes per batch is only " + this.maxBytesPerBatch + " bytes. Please consider increasing max bytes per batch value by setting " + CollectSinkOperatorFactory.MAX_BATCH_SIZE.key());
            }
            if (this.currentBufferBytes + this.invokingRecordBytes > this.bufferSizeLimitBytes) {
                this.bufferCanAddNextResultCondition.await();
            }
            this.buffer.add(byteArrayOutputStream.toByteArray());
            this.currentBufferBytes += byteArrayOutputStream.size();
            this.bufferLock.unlock();
        } catch (Throwable th) {
            this.bufferLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        this.serverThread.close();
        this.serverThread.join();
    }

    public void accumulateFinalResults() throws Exception {
        this.bufferLock.lock();
        try {
            SerializedListAccumulator serializedListAccumulator = new SerializedListAccumulator();
            serializedListAccumulator.add(serializeAccumulatorResult(this.offset, this.version, this.lastCheckpointedOffset, this.buffer), BytePrimitiveArraySerializer.INSTANCE);
            getRuntimeContext().addAccumulator(this.accumulatorName, serializedListAccumulator);
        } finally {
            this.bufferLock.unlock();
        }
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
        this.lastCheckpointedOffset = this.uncompletedCheckpointMap.get(Long.valueOf(j)).longValue();
        this.uncompletedCheckpointMap.headMap(Long.valueOf(j + 1)).clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checkpoint complete with checkpointId = " + j + ", lastCheckpointedOffset = " + this.lastCheckpointedOffset);
        }
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointAborted(long j) {
    }

    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        this.eventGateway = operatorEventGateway;
    }

    @VisibleForTesting
    public static byte[] serializeAccumulatorResult(long j, String str, long j2, List<byte[]> list) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream);
        dataOutputViewStreamWrapper.writeLong(j);
        new CollectCoordinationResponse(str, j2, list).serialize(dataOutputViewStreamWrapper);
        return byteArrayOutputStream.toByteArray();
    }

    public static Tuple2<Long, CollectCoordinationResponse> deserializeAccumulatorResult(byte[] bArr) throws IOException {
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(new ByteArrayInputStream(bArr));
        long readLong = dataInputViewStreamWrapper.readLong();
        return Tuple2.of(Long.valueOf(readLong), new CollectCoordinationResponse(dataInputViewStreamWrapper));
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.access$802(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$802(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.currentBufferBytes = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.access$802(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction, long):long");
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.access$508(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$508(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction r8) {
        /*
            r0 = r8
            r1 = r0
            long r1 = r1.offset
            // decode failed: arraycopy: source index -1 out of bounds for object array[8]
            r2 = 1
            long r1 = r1 + r2
            r0.offset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.access$508(org.apache.flink.streaming.api.operators.collect.CollectSinkFunction):long");
    }

    static {
    }
}
