package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.Serializable;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.RedirectingSslHandler;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrapConfig;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint.class */
public abstract class RestServerEndpoint implements RestService {
    /** Configuration this endpoint was created from; handed to every inbound channel handler factory. */
    private final Configuration configuration;
    /** Address used for the SSL redirect handler and as the advertised host when bound to a wildcard address. */
    private final String restAddress;
    /** Address to bind the server socket to; when {@code null} only the port is passed to the bootstrap. */
    private final String restBindAddress;
    /** Port range definition that {@link #start()} parses and tries port by port. */
    private final String restBindPortRange;

    /** Factory for the SSL handler; {@code null} means HTTPS is disabled. */
    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;
    /** Limit passed to the HTTP object aggregator installed in each channel pipeline. */
    private final int maxContentLength;
    /** Directory handed to the file upload handler; created by the constructor if it does not exist. */
    protected final Path uploadDir;
    /** Headers passed to the router, aggregator, inbound handler factories and error handler. */
    protected final Map<String, String> responseHeaders;
    /** Future returned by {@link #closeAsync()}; completed once shut down has finished. */
    private final CompletableFuture<Void> terminationFuture;
    /** Handlers returned by {@link #initializeHandlers}; assigned in {@link #start()}. */
    private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
    /** Netty bootstrap created in {@link #start()} and cleared again during shut down. */
    private ServerBootstrap bootstrap;
    /** Bound server channel; {@code null} before a port was bound and after shut down was initiated. */
    private Channel serverChannel;
    /** Base URL (protocol, host, port) computed in {@link #start()} after binding. */
    private String restBaseUrl;
    /** Port the server channel actually bound to. */
    private int port;

    /** Factories discovered through {@link ServiceLoader}, sorted by descending priority. */
    @VisibleForTesting
    List<InboundChannelHandlerFactory> inboundChannelHandlerFactories;
    /** Logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Monitor held by start, close, shut down and the getters while they read or update endpoint state. */
    private final Object lock = new Object();
    /** Lifecycle state; starts out as {@link State#CREATED}. */
    private State state = State.CREATED;

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint$RestHandlerUrlComparator.class */
    public static final class RestHandlerUrlComparator implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>, Serializable {
        private static final long serialVersionUID = 2388466767835547926L;
        /** Orders endpoint URLs; see {@link CaseInsensitiveOrderComparator}. */
        private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
        /** Tie-breaker applied to the lowest supported API version when two URLs compare equal. */
        private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
        /** Shared instance used by {@code RestServerEndpoint#start()} to sort the handlers. */
        static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

        /**
         * Case-insensitive string comparator in which the character {@code ':'} sorts after every
         * other character it differs from.
         *
         * <p>Characters are compared after folding them to upper case and then to lower case. The
         * first position at which the folded characters differ decides the result; if one string is
         * a (case-insensitive) prefix of the other, the shorter string sorts first.
         */
        public static final class CaseInsensitiveOrderComparator implements Comparator<String>, Serializable {
            private static final long serialVersionUID = 8550835445193437027L;

            /**
             * Compares two strings case-insensitively, ranking {@code ':'} above any other character.
             *
             * @param str first string
             * @param str2 second string
             * @return a negative value, zero or a positive value if {@code str} sorts before, equal
             *     to or after {@code str2}
             */
            @Override
            public int compare(String str, String str2) {
                final int leftLength = str.length();
                final int rightLength = str2.length();
                final int sharedLength = Math.min(leftLength, rightLength);

                for (int pos = 0; pos < sharedLength; pos++) {
                    final char left = Character.toLowerCase(Character.toUpperCase(str.charAt(pos)));
                    final char right = Character.toLowerCase(Character.toUpperCase(str2.charAt(pos)));
                    if (left == right) {
                        continue;
                    }
                    // ':' loses against everything else so that it is ordered last.
                    if (left == ':') {
                        return 1;
                    }
                    if (right == ':') {
                        return -1;
                    }
                    return left - right;
                }

                return leftLength - rightLength;
            }
        }

        /**
         * Orders two handler registrations by their target URL and, when the URLs compare equal,
         * by the lowest API version each of them supports.
         *
         * @param tuple2 first handler registration
         * @param tuple22 second handler registration
         * @return comparison result of the URLs, or of the minimum supported API versions on a tie
         */
        @Override
        public int compare(Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple2, Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple22) {
            final String leftUrl = tuple2.f0.getTargetRestEndpointURL();
            final String rightUrl = tuple22.f0.getTargetRestEndpointURL();
            final int urlOrder = CASE_INSENSITIVE_ORDER.compare(leftUrl, rightUrl);
            if (urlOrder != 0) {
                return urlOrder;
            }
            return API_VERSION_ORDER.compare(Collections.min(tuple2.f0.getSupportedAPIVersions()), Collections.min(tuple22.f0.getSupportedAPIVersions()));
        }
    }

    /**
     * Lifecycle states of the endpoint.
     *
     * <p>NOTE: the decompiler reports that this enum was originally declared {@code private}.
     */
    public enum State {
        /** Initial state; the endpoint has not been started yet. */
        CREATED,
        /** Set by {@code start()} once the server channel is bound and the base URL is known. */
        RUNNING,
        /** Set by {@code closeAsync()} once shut down has been requested. */
        SHUTDOWN
    }

    public RestServerEndpoint(Configuration configuration) throws IOException, ConfigurationException {
        Preconditions.checkNotNull(configuration);
        RestServerEndpointConfiguration fromConfiguration = RestServerEndpointConfiguration.fromConfiguration(configuration);
        Preconditions.checkNotNull(fromConfiguration);
        this.configuration = configuration;
        this.restAddress = fromConfiguration.getRestAddress();
        this.restBindAddress = fromConfiguration.getRestBindAddress();
        this.restBindPortRange = fromConfiguration.getRestBindPortRange();
        this.sslHandlerFactory = fromConfiguration.getSslHandlerFactory();
        this.uploadDir = fromConfiguration.getUploadDir();
        createUploadDir(this.uploadDir, this.log, true);
        this.maxContentLength = fromConfiguration.getMaxContentLength();
        this.responseHeaders = fromConfiguration.getResponseHeaders();
        this.terminationFuture = new CompletableFuture<>();
        this.inboundChannelHandlerFactories = new ArrayList();
        Iterator it = ServiceLoader.load(InboundChannelHandlerFactory.class).iterator();
        while (it.hasNext()) {
            try {
                InboundChannelHandlerFactory inboundChannelHandlerFactory = (InboundChannelHandlerFactory) it.next();
                if (inboundChannelHandlerFactory != null) {
                    this.inboundChannelHandlerFactories.add(inboundChannelHandlerFactory);
                    this.log.info("Loaded channel inbound factory: {}", inboundChannelHandlerFactory);
                }
            } catch (Throwable th) {
                this.log.error("Could not load channel inbound factory.", th);
                throw th;
            }
        }
        this.inboundChannelHandlerFactories.sort(Comparator.comparingInt((v0) -> {
            return v0.priority();
        }).reversed());
    }

    /**
     * Supplies the handlers to register with this endpoint. Called from {@link #start()} before the
     * server is bound.
     *
     * @param completableFuture future that {@link #start()} completes with the REST base URL once
     *     the server channel is bound
     * @return handler specifications paired with their handlers; {@link #start()} sorts this list
     *     in place
     */
    protected abstract List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> completableFuture);

    /**
     * Starts this REST server endpoint.
     *
     * <p>Registers all handlers returned by {@link #initializeHandlers}, sets up the Netty server
     * and binds it to the first free port of the configured port range. Afterwards the base URL is
     * published, the state switches to {@code RUNNING} and {@link #startInternal()} is invoked.
     * Everything happens while holding the endpoint lock.
     *
     * @throws IllegalStateException if the endpoint is not in state {@code CREATED}
     * @throws IllegalArgumentException if the configured port range cannot be parsed
     * @throws BindException if none of the ports in the configured range could be bound
     * @throws Exception if binding fails for any other reason or {@link #startInternal()} fails
     */
    public final void start() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(this.state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
            this.log.info("Starting rest endpoint.");

            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

            this.handlers = initializeHandlers(restAddressFuture);
            // Sorting the handlers by URL before registration determines the registration order.
            Collections.sort(this.handlers, RestHandlerUrlComparator.INSTANCE);
            checkAllEndpointsAndHandlersAreUnique(this.handlers);
            this.handlers.forEach(handler -> registerHandler(router, handler, this.log));

            final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws ConfigurationException {
                    final RouterHandler routerHandler = new RouterHandler(router, RestServerEndpoint.this.responseHeaders);

                    // The SSL handler has to be the first handler in the pipeline.
                    if (isHttpsEnabled()) {
                        socketChannel.pipeline().addLast("ssl", new RedirectingSslHandler(RestServerEndpoint.this.restAddress, restAddressFuture, RestServerEndpoint.this.sslHandlerFactory));
                    }

                    socketChannel.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new FileUploadHandler(RestServerEndpoint.this.uploadDir))
                            .addLast(new FlinkHttpObjectAggregator(RestServerEndpoint.this.maxContentLength, RestServerEndpoint.this.responseHeaders));

                    // Optional handlers contributed by the loaded factories, highest priority first.
                    for (InboundChannelHandlerFactory factory : RestServerEndpoint.this.inboundChannelHandlerFactories) {
                        factory.createHandler(RestServerEndpoint.this.configuration, RestServerEndpoint.this.responseHeaders)
                                .ifPresent(handler -> socketChannel.pipeline().addLast(handler));
                    }

                    socketChannel.pipeline()
                            .addLast(new ChunkedWriteHandler())
                            .addLast(routerHandler.getName(), routerHandler)
                            .addLast(new PipelineErrorHandler(RestServerEndpoint.this.log, RestServerEndpoint.this.responseHeaders));
                }
            };

            final NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

            this.bootstrap = new ServerBootstrap();
            this.bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);

            // Only parsing the port range may be reported as an invalid port range. Bind failures
            // and failures of startInternal() must reach the caller unchanged.
            final Iterator<Integer> candidatePorts;
            try {
                candidatePorts = NetUtils.getPortRangeFromString(this.restBindPortRange);
            } catch (IllegalConfigurationException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalArgumentException("Invalid port range definition: " + this.restBindPortRange, e);
            }

            int chosenPort = 0;
            while (candidatePorts.hasNext()) {
                try {
                    chosenPort = candidatePorts.next();
                    this.serverChannel = (this.restBindAddress == null ? this.bootstrap.bind(chosenPort) : this.bootstrap.bind(this.restBindAddress, chosenPort)).syncUninterruptibly().channel();
                    break;
                } catch (Exception e) {
                    // Try the next port if this one is in use; anything else is fatal.
                    if (!(e instanceof BindException)) {
                        throw e;
                    }
                }
            }

            if (this.serverChannel == null) {
                throw new BindException("Could not start rest endpoint on any port in port range " + this.restBindPortRange);
            }

            this.log.debug("Binding rest endpoint to {}:{}.", this.restBindAddress, chosenPort);

            final InetSocketAddress bindAddress = (InetSocketAddress) this.serverChannel.localAddress();
            // When bound to a wildcard address, advertise the configured REST address instead.
            final String advertisedAddress = bindAddress.getAddress().isAnyLocalAddress() ? this.restAddress : bindAddress.getAddress().getHostAddress();
            this.port = bindAddress.getPort();

            this.log.info("Rest endpoint listening at {}:{}", advertisedAddress, this.port);

            this.restBaseUrl = new URL(determineProtocol(), advertisedAddress, this.port, "").toString();
            restAddressFuture.complete(this.restBaseUrl);

            this.state = State.RUNNING;
            startInternal();
        }
    }

    /**
     * Hook for subclasses, invoked at the end of {@link #start()} after the server is bound and
     * the state was set to {@code RUNNING}. It is called while the endpoint lock is held.
     *
     * @throws Exception if the subclass-specific start-up fails; propagated out of {@link #start()}
     */
    protected abstract void startInternal() throws Exception;

    /**
     * Returns the local address the server channel is bound to.
     *
     * @return the bound address, or {@code null} if there is no server channel or its address
     *     cannot be accessed
     * @throws IllegalStateException if the endpoint has not been started yet
     */
    @Nullable
    public InetSocketAddress getServerAddress() {
        synchronized (this.lock) {
            assertRestServerHasBeenStarted();
            final Channel boundChannel = this.serverChannel;
            if (boundChannel == null) {
                return null;
            }
            try {
                return (InetSocketAddress) boundChannel.localAddress();
            } catch (Exception e) {
                this.log.error("Cannot access local server address", e);
                return null;
            }
        }
    }

    /**
     * Returns the base URL of the REST server endpoint.
     *
     * @return the base URL computed when the endpoint was started
     * @throws IllegalStateException if the endpoint has not been started yet
     */
    public String getRestBaseUrl() {
        synchronized (this.lock) {
            assertRestServerHasBeenStarted();
            return this.restBaseUrl;
        }
    }

    /**
     * Verifies that the endpoint has left the {@code CREATED} state.
     *
     * @throws IllegalStateException if the endpoint has not been started yet
     */
    private void assertRestServerHasBeenStarted() {
        final boolean leftCreatedState = State.CREATED != this.state;
        Preconditions.checkState(leftCreatedState, "The RestServerEndpoint has not been started yet.");
    }

    /**
     * Returns the port the REST server is bound to.
     *
     * @return the bound port
     * @throws IllegalStateException if the endpoint has not been started yet
     */
    @Override
    public int getRestPort() {
        synchronized (this.lock) {
            assertRestServerHasBeenStarted();
            return this.port;
        }
    }

    /**
     * Initiates the shut down of this endpoint.
     *
     * <p>A running endpoint first closes its handlers and then runs {@link #shutDownInternal()};
     * the termination future is completed with the outcome. An endpoint that was never started
     * terminates immediately. If shut down was already requested, nothing further happens.
     *
     * @return the termination future of this endpoint
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        synchronized (this.lock) {
            this.log.info("Shutting down rest endpoint.");

            switch (this.state) {
                case RUNNING:
                    final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(closeHandlersAsync(), this::shutDownInternal);
                    shutDownFuture.whenComplete((ignored, failure) -> {
                        this.log.info("Shut down complete.");
                        if (failure == null) {
                            this.terminationFuture.complete(null);
                        } else {
                            this.terminationFuture.completeExceptionally(failure);
                        }
                    });
                    this.state = State.SHUTDOWN;
                    break;
                case CREATED:
                    this.terminationFuture.complete(null);
                    this.state = State.SHUTDOWN;
                    break;
                default:
                    // Shut down was already requested; hand out the same future.
                    break;
            }

            return this.terminationFuture;
        }
    }

    /**
     * Closes every registered handler that implements {@link AutoCloseableAsync}.
     *
     * @return future that waits for all of the triggered close operations
     */
    private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
        final List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> registration : this.handlers) {
            final ChannelInboundHandler handler = registration.f1;
            if (handler instanceof AutoCloseableAsync) {
                closeFutures.add(((AutoCloseableAsync) handler).closeAsync());
            }
        }
        return FutureUtils.waitForAll(closeFutures);
    }

    /**
     * Stops this REST server endpoint: closes the server channel and afterwards shuts down both
     * event loop groups of the bootstrap.
     *
     * <p>The returned future always completes. If there is no server channel, the event loop
     * groups are shut down right away. If closing the channel fails, the groups are still shut
     * down and the returned future fails with the close failure.
     *
     * @return future which is completed once the shut down has finished
     */
    public CompletableFuture<Void> shutDownInternal() {
        synchronized (this.lock) {
            final CompletableFuture<Void> channelClosed = new CompletableFuture<>();
            if (this.serverChannel != null) {
                this.serverChannel.close().addListener(finished -> {
                    if (finished.isSuccess()) {
                        channelClosed.complete(null);
                    } else {
                        channelClosed.completeExceptionally(finished.cause());
                    }
                });
                this.serverChannel = null;
            } else {
                // Nothing to close; without this the returned future would never complete.
                channelClosed.complete(null);
            }

            final CompletableFuture<Void> terminated = new CompletableFuture<>();

            // whenComplete (not thenRun) so the event loops are released even if the close failed.
            channelClosed.whenComplete((ignored, closeFailure) -> {
                final CompletableFuture<Void> bossStopped;
                final CompletableFuture<Void> workersStopped;
                final Time gracePeriod = Time.seconds(10L);

                if (this.bootstrap != null) {
                    final ServerBootstrapConfig config = this.bootstrap.config();
                    bossStopped = shutDownEventLoopGroup(config.group(), gracePeriod);
                    workersStopped = shutDownEventLoopGroup(config.childGroup(), gracePeriod);
                    this.bootstrap = null;
                } else {
                    bossStopped = CompletableFuture.completedFuture(null);
                    workersStopped = CompletableFuture.completedFuture(null);
                }

                FutureUtils.completeAll(Arrays.asList(bossStopped, workersStopped)).whenComplete((unused, groupFailure) -> {
                    // A failed channel close takes precedence over a failed group shut down.
                    final Throwable failure = closeFailure != null ? closeFailure : groupFailure;
                    if (failure != null) {
                        if (groupFailure != null && groupFailure != failure) {
                            failure.addSuppressed(groupFailure);
                        }
                        terminated.completeExceptionally(failure);
                    } else {
                        terminated.complete(null);
                    }
                });
            });

            return terminated;
        }
    }

    /** Gracefully shuts down the given event loop group (if any) and reports the outcome as a future. */
    private static CompletableFuture<Void> shutDownEventLoopGroup(@Nullable EventLoopGroup group, Time timeout) {
        final CompletableFuture<Void> stopped = new CompletableFuture<>();
        if (group == null) {
            stopped.complete(null);
            return stopped;
        }
        group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
            if (finished.isSuccess()) {
                stopped.complete(null);
            } else {
                stopped.completeExceptionally(finished.cause());
            }
        });
        return stopped;
    }

    /**
     * Tells whether this endpoint serves HTTPS, i.e. whether an SSL handler factory is configured.
     *
     * @return {@code true} if an SSL handler factory is present
     */
    public boolean isHttpsEnabled() {
        final boolean sslConfigured = null != this.sslHandlerFactory;
        return sslConfigured;
    }

    /**
     * Picks the URL protocol for the base URL.
     *
     * @return {@code "https"} if HTTPS is enabled, {@code "http"} otherwise
     */
    private String determineProtocol() {
        if (isHttpsEnabled()) {
            return "https";
        }
        return "http";
    }

    /**
     * Registers a handler under every API version it supports.
     *
     * <p>For each supported version the handler is registered under the version-prefixed URL. For
     * the default version it is additionally registered under the plain, unversioned URL.
     *
     * @param router router to register the handler with
     * @param tuple2 handler specification and the handler itself
     * @param logger logger used to report each registration at debug level
     */
    private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple2, Logger logger) {
        final String unversionedUrl = tuple2.f0.getTargetRestEndpointURL();

        for (RestAPIVersion version : tuple2.f0.getSupportedAPIVersions()) {
            final String versionedUrl = '/' + version.getURLVersionPrefix() + unversionedUrl;
            logger.debug("Register handler {} under {}@{}.", tuple2.f1, tuple2.f0.getHttpMethod(), versionedUrl);
            registerHandler(router, versionedUrl, tuple2.f0.getHttpMethod(), tuple2.f1);

            if (!version.isDefaultVersion()) {
                continue;
            }
            // The default version is also reachable without a version prefix.
            logger.debug("Register handler {} under {}@{}.", tuple2.f1, tuple2.f0.getHttpMethod(), unversionedUrl);
            registerHandler(router, unversionedUrl, tuple2.f0.getHttpMethod(), tuple2.f1);
        }
    }

    /**
     * Adds a single route to the router, choosing the router method that matches the HTTP method.
     *
     * @param router router to add the route to
     * @param str URL under which the handler is registered
     * @param httpMethodWrapper HTTP method of the route
     * @param channelInboundHandler handler serving the route
     * @throws RuntimeException if the HTTP method is none of GET, POST, DELETE or PATCH
     */
    private static void registerHandler(Router router, String str, HttpMethodWrapper httpMethodWrapper, ChannelInboundHandler channelInboundHandler) {
        switch (httpMethodWrapper) {
            case GET:
                router.addGet(str, channelInboundHandler);
                break;
            case POST:
                router.addPost(str, channelInboundHandler);
                break;
            case DELETE:
                router.addDelete(str, channelInboundHandler);
                break;
            case PATCH:
                router.addPatch(str, channelInboundHandler);
                break;
            default:
                throw new RuntimeException("Unsupported http method: " + httpMethodWrapper + '.');
        }
    }

    /**
     * Makes sure the upload directory exists, creating it when it is missing.
     *
     * @param path upload directory
     * @param logger logger used to report why the directory is being created
     * @param z {@code true} if this is the initial creation (logged at info level); {@code false}
     *     if the directory is expected to exist already, in which case a warning is logged
     * @throws IOException if the directory cannot be created or is not writable
     */
    public static void createUploadDir(Path path, Logger logger, boolean z) throws IOException {
        if (!Files.exists(path)) {
            if (z) {
                logger.info("Upload directory {} does not exist. ", path);
            } else {
                logger.warn("Upload directory {} has been deleted externally. Previously uploaded files are no longer available.", path);
            }
            checkAndCreateUploadDir(path, logger);
        }
    }

    /**
     * Uses the upload directory if it already exists and is writable; otherwise creates it.
     * The method is synchronized on the class.
     *
     * @param path upload directory
     * @param logger logger used to report the outcome
     * @throws IOException if the directory cannot be created or is not writable
     */
    private static synchronized void checkAndCreateUploadDir(Path path, Logger logger) throws IOException {
        if (Files.exists(path) && Files.isWritable(path)) {
            logger.info("Using directory {} for file uploads.", path);
            return;
        }

        final Path createdDir = Files.createDirectories(path);
        if (!Files.isWritable(createdDir)) {
            logger.warn("Upload directory {} cannot be created or is not writable.", path);
            throw new IOException(String.format("Upload directory %s cannot be created or is not writable.", path));
        }
        logger.info("Created directory {} for file uploads.", path);
    }

    /**
     * Validates that no handler instance is registered twice and that no two registrations
     * share the same API version, HTTP method and URL.
     *
     * <p>Path parameters are normalised before comparing URLs, so URLs that differ only in the
     * names of their parameters count as the same endpoint.
     *
     * @param list handler registrations to validate
     * @throws FlinkRuntimeException if a handler instance or an endpoint is registered more than once
     */
    private static void checkAllEndpointsAndHandlersAreUnique(List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> list) {
        final Set<String> seenEndpoints = new HashSet<>();
        // Handlers are compared by identity, not by equals().
        final Set<ChannelInboundHandler> seenHandlers = Collections.newSetFromMap(new IdentityHashMap<>());

        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> registration : list) {
            final boolean newHandlerInstance = seenHandlers.add(registration.f1);
            if (!newHandlerInstance) {
                throw new FlinkRuntimeException("Duplicate REST handler instance found. Please ensure each instance is registered only once.");
            }

            final RestHandlerSpecification spec = registration.f0;
            for (RestAPIVersion version : spec.getSupportedAPIVersions()) {
                final String rawKey = version.toString() + spec.getHttpMethod() + spec.getTargetRestEndpointURL();
                final String endpointKey = rawKey.replaceAll(":[\\w-]+", ":param");
                final boolean newEndpoint = seenEndpoints.add(endpointKey);
                if (!newEndpoint) {
                    throw new FlinkRuntimeException(String.format("REST handler registration overlaps with another registration for: version=%s, method=%s, url=%s.", version, spec.getHttpMethod(), spec.getTargetRestEndpointURL()));
                }
            }
        }
    }
}
