package org.apache.flink.runtime.rest.handler.job;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.class */
public class JobVertexBackPressureHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {
    private final MetricFetcher metricFetcher;

    public JobVertexBackPressureHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time time, Map<String, String> map, MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders, MetricFetcher metricFetcher) {
        super(gatewayRetriever, time, map, messageHeaders);
        this.metricFetcher = metricFetcher;
    }

    @Override // org.apache.flink.runtime.rest.handler.AbstractRestHandler
    protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
        this.metricFetcher.update();
        MetricStore.TaskMetricStore taskMetricStore = this.metricFetcher.getMetricStore().getTaskMetricStore(((JobID) handlerRequest.getPathParameter(JobIDPathParameter.class)).toString(), ((JobVertexID) handlerRequest.getPathParameter(JobVertexIdPathParameter.class)).toString());
        return CompletableFuture.completedFuture(taskMetricStore != null ? createJobVertexBackPressureInfo(taskMetricStore.getAllSubtaskMetricStores()) : JobVertexBackPressureInfo.deprecated());
    }

    private JobVertexBackPressureInfo createJobVertexBackPressureInfo(Map<Integer, MetricStore.ComponentMetricStore> map) {
        List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> createSubtaskBackPressureInfo = createSubtaskBackPressureInfo(map);
        return new JobVertexBackPressureInfo(JobVertexBackPressureInfo.VertexBackPressureStatus.OK, getBackPressureLevel(getMaxBackPressureRatio(createSubtaskBackPressureInfo)), Long.valueOf(this.metricFetcher.getLastUpdateTime()), createSubtaskBackPressureInfo);
    }

    private List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> createSubtaskBackPressureInfo(Map<Integer, MetricStore.ComponentMetricStore> map) {
        ArrayList arrayList = new ArrayList(map.size());
        for (Map.Entry<Integer, MetricStore.ComponentMetricStore> entry : map.entrySet()) {
            int intValue = entry.getKey().intValue();
            MetricStore.ComponentMetricStore value = entry.getValue();
            double backPressureRatio = getBackPressureRatio(value);
            arrayList.add(new JobVertexBackPressureInfo.SubtaskBackPressureInfo(intValue, getBackPressureLevel(backPressureRatio), backPressureRatio, getIdleRatio(value), getBusyRatio(value)));
        }
        arrayList.sort(Comparator.comparingInt((v0) -> {
            return v0.getSubtask();
        }));
        return arrayList;
    }

    private double getMaxBackPressureRatio(List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> list) {
        return list.stream().mapToDouble(subtaskBackPressureInfo -> {
            return subtaskBackPressureInfo.getBackPressuredRatio();
        }).max().getAsDouble();
    }

    private double getBackPressureRatio(MetricStore.ComponentMetricStore componentMetricStore) {
        return getMsPerSecondMetricAsRatio(componentMetricStore, MetricNames.TASK_BACK_PRESSURED_TIME);
    }

    private double getIdleRatio(MetricStore.ComponentMetricStore componentMetricStore) {
        return getMsPerSecondMetricAsRatio(componentMetricStore, MetricNames.TASK_IDLE_TIME);
    }

    private double getBusyRatio(MetricStore.ComponentMetricStore componentMetricStore) {
        return getMsPerSecondMetricAsRatio(componentMetricStore, MetricNames.TASK_BUSY_TIME);
    }

    private double getMsPerSecondMetricAsRatio(MetricStore.ComponentMetricStore componentMetricStore, String str) {
        return Double.valueOf(componentMetricStore.getMetric(str, "0")).doubleValue() / 1000.0d;
    }

    private static JobVertexBackPressureInfo.VertexBackPressureLevel getBackPressureLevel(double d) {
        return d <= 0.1d ? JobVertexBackPressureInfo.VertexBackPressureLevel.OK : d <= 0.5d ? JobVertexBackPressureInfo.VertexBackPressureLevel.LOW : JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
    }
}
