package org.apache.flink.table.runtime.operators.wmassigners;

import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.class */
/**
 * Stream operator that derives watermarks from incoming rows using a generated
 * {@link WatermarkGenerator} and forwards every row unchanged.
 *
 * <p>Watermarks are emitted in two ways: periodically from a processing-time timer (when the
 * configured auto-watermark interval is positive), and eagerly from {@link #processElement}
 * once the candidate watermark has advanced past the last emitted one by more than the interval.
 *
 * <p>When {@code idleTimeout > 0}, the operator additionally tracks the processing time of the
 * most recent record and emits {@link WatermarkStatus#IDLE} from the timer callback if no record
 * arrived for longer than the timeout; the next record switches the status back to
 * {@link WatermarkStatus#ACTIVE}.
 */
public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, ProcessingTimeService.ProcessingTimeCallback {
    private static final long serialVersionUID = 1;

    /** Position of the rowtime field inside each incoming row; must not be null in any row. */
    private final int rowtimeFieldIndex;

    /** Idle timeout in processing-time units; idleness detection is disabled when {@code <= 0}. */
    private final long idleTimeout;

    /** Generated function that computes the watermark candidate for a row. */
    private final WatermarkGenerator watermarkGenerator;

    /** Timestamp of the last watermark emitted by {@link #advanceWatermark()}. */
    private transient long lastWatermark;

    /** Auto-watermark interval read from the execution config in {@link #open()}. */
    private transient long watermarkInterval;

    /** Highest watermark candidate seen so far (not necessarily emitted yet). */
    private transient long currentWatermark;

    /** Processing time at which the most recent record was seen (or the operator was opened). */
    private transient long lastRecordTime;

    /** Last watermark status emitted or forwarded downstream. */
    private transient WatermarkStatus currentStatus = WatermarkStatus.ACTIVE;

    /**
     * Creates the operator.
     *
     * @param rowtimeFieldIndex index of the rowtime field in the input rows
     * @param watermarkGenerator generator used to compute a watermark candidate per row
     * @param idleTimeout idle timeout; values {@code <= 0} disable idleness detection
     * @param processingTimeService processing time service used for timers; must not be null
     */
    public WatermarkAssignerOperator(int rowtimeFieldIndex, WatermarkGenerator watermarkGenerator, long idleTimeout, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
        this.watermarkGenerator = watermarkGenerator;
        this.idleTimeout = idleTimeout;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.processingTimeService = (org.apache.flink.streaming.runtime.tasks.ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
    }

    /**
     * Initializes runtime state, registers the first periodic timer when the auto-watermark
     * interval is positive, and opens the watermark generator.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.currentWatermark = 0L;
        this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
        if (this.watermarkInterval > 0) {
            getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + this.watermarkInterval, this);
        }
        FunctionUtils.setFunctionRuntimeContext(this.watermarkGenerator, getRuntimeContext());
        FunctionUtils.openFunction(this.watermarkGenerator, new Configuration());
    }

    /**
     * Forwards the record, updates the watermark candidate, and eagerly emits a watermark when
     * the candidate is ahead of the last emitted watermark by more than the interval.
     *
     * @param streamRecord the incoming record
     * @throws RuntimeException if the rowtime field of the row is null
     */
    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        if (this.idleTimeout > 0) {
            if (this.currentStatus.equals(WatermarkStatus.IDLE)) {
                // A record arrived on an idle channel: mark it active again.
                emitWatermarkStatus(WatermarkStatus.ACTIVE);
            }
            // Refresh on EVERY record, not only on the IDLE -> ACTIVE transition; otherwise the
            // idle check in onProcessingTime would flag a continuously active stream as idle.
            this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
        }
        RowData row = streamRecord.getValue();
        if (row.isNullAt(this.rowtimeFieldIndex)) {
            throw new RuntimeException("RowTime field should not be null, please convert it to a non-null long value.");
        }
        Long candidate = this.watermarkGenerator.currentWatermark(row);
        if (candidate != null) {
            // The candidate never moves backwards.
            this.currentWatermark = Math.max(this.currentWatermark, candidate.longValue());
        }
        this.output.collect(streamRecord);
        // Emit eagerly instead of waiting for the periodic timer once the gap exceeds the interval.
        if (this.currentWatermark - this.lastWatermark > this.watermarkInterval) {
            advanceWatermark();
        }
    }

    /** Emits the current watermark candidate if it is ahead of the last emitted watermark. */
    private void advanceWatermark() {
        if (this.currentWatermark > this.lastWatermark) {
            this.lastWatermark = this.currentWatermark;
            this.output.emitWatermark(new Watermark(this.currentWatermark));
        }
    }

    /**
     * Periodic timer callback: emits a pending watermark, switches to idle when no record has
     * been seen for longer than the idle timeout, and registers the next timer.
     *
     * @param timestamp the timestamp the timer was registered for (unused)
     */
    @Override // org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback
    public void onProcessingTime(long timestamp) throws Exception {
        advanceWatermark();
        if (this.idleTimeout > 0 && this.currentStatus.equals(WatermarkStatus.ACTIVE)) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            if (now - this.lastRecordTime > this.idleTimeout) {
                emitWatermarkStatus(WatermarkStatus.IDLE);
            }
        }
        // Re-arm the timer for the next period.
        getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + this.watermarkInterval, this);
    }

    /**
     * Drops upstream watermarks, except for a {@code Long.MAX_VALUE} watermark, which is
     * forwarded once. {@link #finish()} sends {@link Watermark#MAX_WATERMARK} through this path.
     *
     * @param watermark the upstream watermark
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        boolean isFinalWatermark = watermark.getTimestamp() == Long.MAX_VALUE;
        boolean alreadyForwarded = this.currentWatermark == Long.MAX_VALUE;
        if (isFinalWatermark && !alreadyForwarded) {
            if (this.idleTimeout > 0 && this.currentStatus.equals(WatermarkStatus.IDLE)) {
                // Switch back to active before emitting the final watermark.
                emitWatermarkStatus(WatermarkStatus.ACTIVE);
            }
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(watermark);
        }
    }

    /**
     * Records and forwards an upstream watermark status unchanged.
     *
     * @param watermarkStatus the upstream status
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        emitWatermarkStatus(watermarkStatus);
    }

    /** Remembers the given status as the current one and emits it downstream. */
    private void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        this.currentStatus = watermarkStatus;
        this.output.emitWatermarkStatus(watermarkStatus);
    }

    /** Emits the final {@link Watermark#MAX_WATERMARK} via {@link #processWatermark}. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void finish() throws Exception {
        processWatermark(Watermark.MAX_WATERMARK);
    }

    /** Closes the watermark generator, then the operator itself. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        FunctionUtils.closeFunction(this.watermarkGenerator);
        super.close();
    }
}
