package org.apache.flink.runtime.rest.handler;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava30.com.google.common.base.Ascii;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/AbstractHandler.class */
/**
 * Base class for REST handlers that parse an incoming HTTP request into a {@link HandlerRequest},
 * delegate the actual work to {@link #respondToRequest}, and convert any failure into an error
 * response.
 *
 * <p>Every request is registered with an {@link InFlightRequestTracker} before processing and
 * deregistered once processing (including error reporting) has finished; {@link #closeAsync()}
 * chains {@code InFlightRequestTracker#awaitAsync} after {@link #closeHandlerAsync()}.
 *
 * @param <T> type of the gateway handed to {@link #respondToRequest}
 * @param <R> type of the request body
 * @param <M> type of the message parameters
 */
public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends LeaderRetrievalHandler<T> implements AutoCloseableAsync {
    /** Logger bound to the concrete subclass (see constructor). */
    protected final Logger log;

    /** Shared mapper used to deserialize request bodies. */
    protected static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();

    /** Number of bytes subtracted from the aggregator's max content length when truncating error text. */
    private static final int OTHER_RESP_PAYLOAD_OVERHEAD = 1024;

    private final UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders;

    private final InFlightRequestTracker inFlightRequestTracker;

    /** Set on the first call to {@link #closeAsync()}; read and written only while holding {@link #lock}. */
    private CompletableFuture<Void> terminationFuture;

    private final Object lock;

    /**
     * Creates the handler.
     *
     * @param gatewayRetriever forwarded to {@link LeaderRetrievalHandler}
     * @param time forwarded to {@link LeaderRetrievalHandler}
     * @param map forwarded to {@link LeaderRetrievalHandler} (presumably the response headers — verify against the superclass)
     * @param untypedResponseMessageHeaders message headers describing the endpoint; must not be null
     */
    public AbstractHandler(@Nonnull GatewayRetriever<? extends T> gatewayRetriever, @Nonnull Time time, @Nonnull Map<String, String> map, @Nonnull UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders) {
        super(gatewayRetriever, time, map);
        this.log = LoggerFactory.getLogger(getClass());
        this.lock = new Object();
        this.untypedResponseMessageHeaders = Preconditions.checkNotNull(untypedResponseMessageHeaders);
        this.inFlightRequestTracker = new InFlightRequestTracker();
    }

    /**
     * Parses the routed request, builds the {@link HandlerRequest} and hands it to
     * {@link #respondToRequest}. Any failure, synchronous or from the returned future, is reported
     * through {@link #handleException}; afterwards the request is deregistered and uploaded files
     * are closed exactly once.
     *
     * @param channelHandlerContext channel context of the request
     * @param routedRequest the routed request carrying the HTTP request and the route result
     * @param t the gateway passed on to {@link #respondToRequest}
     */
    @Override // org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
    protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T t) {
        // Held as HttpRequest so that the FullHttpRequest check below is meaningful.
        final HttpRequest httpRequest = routedRequest.getRequest();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Received request " + httpRequest.uri() + '.');
        }

        FileUploads uploadedFiles = null;
        try {
            if (!this.inFlightRequestTracker.registerRequest()) {
                this.log.debug("The handler instance for {} had already been closed.", this.untypedResponseMessageHeaders.getTargetRestEndpointURL());
                channelHandlerContext.channel().close();
                // Registration was refused, so there is nothing to deregister or clean up.
                return;
            }

            if (!(httpRequest instanceof FullHttpRequest)) {
                this.log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
                throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST);
            }
            final ByteBuf content = ((FullHttpRequest) httpRequest).content();

            uploadedFiles = FileUploadHandler.getMultipartFileUploads(channelHandlerContext);
            if (!this.untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
                throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST);
            }

            final R requestBody = parseRequestBody(content);

            // Parameter resolution failures are client errors regardless of whether a body was sent.
            final HandlerRequest<R> handlerRequest;
            try {
                handlerRequest = HandlerRequest.resolveParametersAndCreate(requestBody, this.untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routedRequest.getRouteResult().pathParams(), routedRequest.getRouteResult().queryParams(), uploadedFiles.getUploadedFiles());
            } catch (HandlerRequestException e) {
                this.log.error("Could not create the handler request.", e);
                throw new RestHandlerException(String.format("Bad request, could not parse parameters: %s", e.getMessage()), HttpResponseStatus.BAD_REQUEST, e);
            }

            this.log.trace("Starting request processing.");
            final CompletableFuture<Void> requestProcessingFuture = respondToRequest(channelHandlerContext, httpRequest, handlerRequest, t);

            // Lambdas may only capture effectively final locals.
            final FileUploads finalUploadedFiles = uploadedFiles;
            requestProcessingFuture
                    .handle((Void ignored, Throwable throwable) -> throwable != null
                            ? handleException(ExceptionUtils.stripCompletionException(throwable), channelHandlerContext, httpRequest)
                            : CompletableFuture.<Void>completedFuture(null))
                    // Flatten the nested future so cleanup waits for the error response as well.
                    .thenCompose(Function.identity())
                    .whenComplete((Void ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            this.log.warn("An exception occurred while handling another exception.", throwable);
                        }
                        finalizeRequestProcessing(finalUploadedFiles);
                    });
        } catch (Throwable throwable) {
            final FileUploads finalUploadedFiles = uploadedFiles;
            handleException(throwable, channelHandlerContext, httpRequest)
                    .whenComplete((Void ignored, Throwable ignoredFailure) -> finalizeRequestProcessing(finalUploadedFiles));
        }
    }

    /**
     * Deserializes the request body into the endpoint's request class. A buffer with capacity
     * zero is treated as the empty JSON object {@code {}}.
     *
     * @param content raw body of the HTTP request
     * @return the deserialized request body
     * @throws RestHandlerException with status 400 if the payload cannot be parsed or mapped
     * @throws IOException for any other I/O failure reported by the mapper
     */
    private R parseRequestBody(ByteBuf content) throws RestHandlerException, IOException {
        if (content.capacity() == 0) {
            try {
                return MAPPER.readValue("{}", this.untypedResponseMessageHeaders.getRequestClass());
            } catch (JsonParseException | JsonMappingException e) {
                throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, e);
            }
        }
        try {
            return MAPPER.readValue(new ByteBufInputStream(content), this.untypedResponseMessageHeaders.getRequestClass());
        } catch (JsonParseException | JsonMappingException e) {
            throw new RestHandlerException(String.format("Request did not match expected format %s.", this.untypedResponseMessageHeaders.getRequestClass().getSimpleName()), HttpResponseStatus.BAD_REQUEST, e);
        }
    }

    /** Deregisters the request from the in-flight tracker and closes any uploaded files. */
    private void finalizeRequestProcessing(FileUploads fileUploads) {
        this.inFlightRequestTracker.deregisterRequest();
        cleanupFileUploads(fileUploads);
    }

    /**
     * Logs the failure and sends an error response to the client.
     *
     * <p>A {@link RestHandlerException} is answered with its own HTTP status; anything else is
     * answered with 500. The stringified exception is truncated to the aggregator's max content
     * length minus {@link #OTHER_RESP_PAYLOAD_OVERHEAD}.
     *
     * @param th the failure to report
     * @param channelHandlerContext channel context used to send the response
     * @param httpRequest the request being answered
     * @return future of the send operation; already completed if no {@link FlinkHttpObjectAggregator}
     *     is present in the pipeline
     */
    private CompletableFuture<Void> handleException(Throwable th, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(th);
        FlinkHttpObjectAggregator flinkHttpObjectAggregator = channelHandlerContext.pipeline().get(FlinkHttpObjectAggregator.class);
        if (flinkHttpObjectAggregator == null) {
            this.log.warn("The connection was unexpectedly closed by the client.");
            return CompletableFuture.completedFuture(null);
        }

        final int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;

        if (th instanceof RestHandlerException) {
            RestHandlerException restHandlerException = (RestHandlerException) th;
            String stackTrace = Ascii.truncate(ExceptionUtils.stringifyException(restHandlerException), maxLength, "...");
            // With debug logging enabled the full stack trace is always logged; otherwise only
            // the message, and only if the exception asks to be logged.
            if (this.log.isDebugEnabled()) {
                this.log.error("Exception occurred in REST handler.", restHandlerException);
            } else if (restHandlerException.logException()) {
                this.log.error("Exception occurred in REST handler: {}", restHandlerException.getMessage());
            }
            return HandlerUtils.sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody(stackTrace), restHandlerException.getHttpResponseStatus(), this.responseHeaders);
        }

        this.log.error("Unhandled exception.", th);
        String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>", ExceptionUtils.stringifyException(th));
        return HandlerUtils.sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody(Arrays.asList("Internal server error.", Ascii.truncate(stackTrace, maxLength, "..."))), HttpResponseStatus.INTERNAL_SERVER_ERROR, this.responseHeaders);
    }

    /**
     * Closes the handler. The first call runs {@link #closeHandlerAsync()} followed by
     * {@code InFlightRequestTracker#awaitAsync}; later calls log a warning and return the same
     * future.
     *
     * @return the termination future created by the first call
     */
    @Override // org.apache.flink.util.AutoCloseableAsync
    public final CompletableFuture<Void> closeAsync() {
        synchronized (this.lock) {
            if (this.terminationFuture == null) {
                this.terminationFuture = FutureUtils.composeAfterwards(closeHandlerAsync(), this.inFlightRequestTracker::awaitAsync);
            } else {
                this.log.warn("The handler instance for {} had already been closed, but another attempt at closing it was made.", this.untypedResponseMessageHeaders.getTargetRestEndpointURL());
            }
            return this.terminationFuture;
        }
    }

    /**
     * Hook for subclasses to release their own resources on close. The default returns an
     * already completed future.
     *
     * @return future completed once the subclass has finished closing
     */
    protected CompletableFuture<Void> closeHandlerAsync() {
        return CompletableFuture.completedFuture(null);
    }

    /** Closes the given uploads if present; an {@link IOException} is logged rather than propagated. */
    private void cleanupFileUploads(@Nullable FileUploads fileUploads) {
        if (fileUploads != null) {
            try {
                fileUploads.close();
            } catch (IOException e) {
                this.log.warn("Could not cleanup uploaded files.", e);
            }
        }
    }

    /**
     * Handles the parsed request. Implementations are called from {@link #respondAsLeader}; a
     * thrown exception or an exceptionally completed future is reported via an error response.
     *
     * @param channelHandlerContext channel context of the request
     * @param httpRequest the original HTTP request
     * @param handlerRequest the typed request with resolved parameters
     * @param t the gateway
     * @return future that completes once the request has been processed
     * @throws RestHandlerException if the request cannot be processed
     */
    protected abstract CompletableFuture<Void> respondToRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HandlerRequest<R> handlerRequest, T t) throws RestHandlerException;
}
