package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
import org.apache.flink.runtime.slots.RequirementMatcher;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.class */
public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
    private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements;
    private final Time idleSlotTimeout;
    private final Time rpcTimeout;
    private final JobID jobId;
    private final AllocatedSlotPool slotPool;
    private DeclarativeSlotPool.NewSlotsListener newSlotsListener = DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE;
    private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher();
    private ResourceCounter totalResourceRequirements = ResourceCounter.empty();
    private ResourceCounter fulfilledResourceRequirements = ResourceCounter.empty();
    private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings = new HashMap();

    public DefaultDeclarativeSlotPool(JobID jobID, AllocatedSlotPool allocatedSlotPool, Consumer<? super Collection<ResourceRequirement>> consumer, Time time, Time time2) {
        this.jobId = jobID;
        this.slotPool = allocatedSlotPool;
        this.notifyNewResourceRequirements = consumer;
        this.idleSlotTimeout = time;
        this.rpcTimeout = time2;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void increaseResourceRequirementsBy(ResourceCounter resourceCounter) {
        if (resourceCounter.isEmpty()) {
            return;
        }
        this.totalResourceRequirements = this.totalResourceRequirements.add(resourceCounter);
        declareResourceRequirements();
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void decreaseResourceRequirementsBy(ResourceCounter resourceCounter) {
        if (resourceCounter.isEmpty()) {
            return;
        }
        this.totalResourceRequirements = this.totalResourceRequirements.subtract(resourceCounter);
        declareResourceRequirements();
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void setResourceRequirements(ResourceCounter resourceCounter) {
        this.totalResourceRequirements = resourceCounter;
        declareResourceRequirements();
    }

    private void declareResourceRequirements() {
        Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();
        LOG.debug("Declare new resource requirements for job {}.{}\trequired resources: {}{}\tacquired resources: {}", new Object[]{this.jobId, System.lineSeparator(), resourceRequirements, System.lineSeparator(), this.fulfilledResourceRequirements});
        this.notifyNewResourceRequirements.accept(resourceRequirements);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public Collection<ResourceRequirement> getResourceRequirements() {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<ResourceProfile, Integer> entry : this.totalResourceRequirements.getResourcesWithCount()) {
            arrayList.add(ResourceRequirement.create(entry.getKey(), entry.getValue().intValue()));
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> collection, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long j) {
        LOG.debug("Received {} slot offers from TaskExecutor {}.", collection, taskManagerLocation);
        return internalOfferSlots(collection, taskManagerLocation, taskManagerGateway, j, this::matchWithOutstandingRequirement);
    }

    private Collection<SlotOffer> internalOfferSlots(Collection<? extends SlotOffer> collection, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long j, Function<ResourceProfile, Optional<ResourceProfile>> function) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (SlotOffer slotOffer : collection) {
            if (this.slotPool.containsSlot(slotOffer.getAllocationId())) {
                arrayList.add(slotOffer);
            } else {
                Optional<AllocatedSlot> matchOfferWithOutstandingRequirements = matchOfferWithOutstandingRequirements(slotOffer, taskManagerLocation, taskManagerGateway, function);
                if (matchOfferWithOutstandingRequirements.isPresent()) {
                    arrayList.add(slotOffer);
                    arrayList2.add(matchOfferWithOutstandingRequirements.get());
                } else {
                    LOG.debug("Could not match offer {} to any outstanding requirement.", slotOffer.getAllocationId());
                }
            }
        }
        this.slotPool.addSlots(arrayList2, j);
        if (!arrayList2.isEmpty()) {
            LOG.debug("Acquired new resources; new total acquired resources: {}", this.fulfilledResourceRequirements);
            this.newSlotsListener.notifyNewSlotsAreAvailable(arrayList2);
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void registerSlots(Collection<? extends SlotOffer> collection, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long j) {
        LOG.debug("Register slots {} from TaskManager {}.", collection, taskManagerLocation);
        internalOfferSlots(collection, taskManagerLocation, taskManagerGateway, j, this::matchWithOutstandingRequirementOrWildcard);
    }

    private Optional<ResourceProfile> matchWithOutstandingRequirementOrWildcard(ResourceProfile resourceProfile) {
        Optional<ResourceProfile> matchWithOutstandingRequirement = matchWithOutstandingRequirement(resourceProfile);
        return matchWithOutstandingRequirement.isPresent() ? matchWithOutstandingRequirement : Optional.of(ResourceProfile.ANY);
    }

    private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(SlotOffer slotOffer, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Function<ResourceProfile, Optional<ResourceProfile>> function) {
        Optional<ResourceProfile> apply = function.apply(slotOffer.getResourceProfile());
        if (!apply.isPresent()) {
            return Optional.empty();
        }
        ResourceProfile resourceProfile = apply.get();
        LOG.debug("Matched slot offer {} to requirement {}.", slotOffer.getAllocationId(), resourceProfile);
        increaseAvailableResources(ResourceCounter.withResource(resourceProfile, 1));
        AllocatedSlot createAllocatedSlot = createAllocatedSlot(slotOffer, taskManagerLocation, taskManagerGateway);
        this.slotToRequirementProfileMappings.put(createAllocatedSlot.getAllocationId(), resourceProfile);
        return Optional.of(createAllocatedSlot);
    }

    private Optional<ResourceProfile> matchWithOutstandingRequirement(ResourceProfile resourceProfile) {
        RequirementMatcher requirementMatcher = this.requirementMatcher;
        ResourceCounter resourceCounter = this.totalResourceRequirements;
        ResourceCounter resourceCounter2 = this.fulfilledResourceRequirements;
        resourceCounter2.getClass();
        return requirementMatcher.match(resourceProfile, resourceCounter, resourceCounter2::getResourceCount);
    }

    @VisibleForTesting
    ResourceCounter calculateUnfulfilledResources() {
        return this.totalResourceRequirements.subtract(this.fulfilledResourceRequirements);
    }

    private AllocatedSlot createAllocatedSlot(SlotOffer slotOffer, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway) {
        return new AllocatedSlot(slotOffer.getAllocationId(), taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), taskManagerGateway);
    }

    private void increaseAvailableResources(ResourceCounter resourceCounter) {
        this.fulfilledResourceRequirements = this.fulfilledResourceRequirements.add(resourceCounter);
    }

    @Nonnull
    private ResourceProfile getMatchingResourceProfile(AllocationID allocationID) {
        return (ResourceProfile) Preconditions.checkNotNull(this.slotToRequirementProfileMappings.get(allocationID), "No matching resource profile found for %s", allocationID);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public PhysicalSlot reserveFreeSlot(AllocationID allocationID, ResourceProfile resourceProfile) {
        AllocatedSlot reserveFreeSlot = this.slotPool.reserveFreeSlot(allocationID);
        Preconditions.checkState(reserveFreeSlot.getResourceProfile().isMatching(resourceProfile), "Slot {} cannot fulfill the given requirement. SlotProfile={} Requirement={}", allocationID, reserveFreeSlot.getResourceProfile(), resourceProfile);
        ResourceProfile resourceProfile2 = (ResourceProfile) Preconditions.checkNotNull(this.slotToRequirementProfileMappings.get(allocationID));
        if (!resourceProfile2.equals(resourceProfile)) {
            updateSlotToRequirementProfileMapping(allocationID, resourceProfile);
            if (resourceProfile2 == ResourceProfile.ANY) {
                LOG.debug("Re-matched slot offer {} to requirement {}.", allocationID, resourceProfile);
            } else {
                LOG.debug("Adjusting requirements because a slot was reserved for a different requirement than initially assumed. Slot={} assumedRequirement={} actualRequirement={}", new Object[]{allocationID, resourceProfile2, resourceProfile});
                adjustRequirements(resourceProfile2, resourceProfile);
            }
        }
        return reserveFreeSlot;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public ResourceCounter freeReservedSlot(AllocationID allocationID, @Nullable Throwable th, long j) {
        LOG.debug("Free reserved slot {}.", allocationID);
        Optional<AllocatedSlot> freeReservedSlot = this.slotPool.freeReservedSlot(allocationID, j);
        Optional map = freeReservedSlot.map((v0) -> {
            return Collections.singleton(v0);
        }).map((v1) -> {
            return getFulfilledRequirements(v1);
        });
        freeReservedSlot.ifPresent(allocatedSlot -> {
            releasePayload(Collections.singleton(allocatedSlot), th);
            this.newSlotsListener.notifyNewSlotsAreAvailable(Collections.singletonList(allocatedSlot));
        });
        return (ResourceCounter) map.orElseGet(ResourceCounter::empty);
    }

    private void updateSlotToRequirementProfileMapping(AllocationID allocationID, ResourceProfile resourceProfile) {
        ResourceProfile resourceProfile2 = (ResourceProfile) Preconditions.checkNotNull(this.slotToRequirementProfileMappings.put(allocationID, resourceProfile), "Expected slot profile matching to be non-empty.");
        this.fulfilledResourceRequirements = this.fulfilledResourceRequirements.add(resourceProfile, 1);
        this.fulfilledResourceRequirements = this.fulfilledResourceRequirements.subtract(resourceProfile2, 1);
    }

    private void adjustRequirements(ResourceProfile resourceProfile, ResourceProfile resourceProfile2) {
        decreaseResourceRequirementsBy(ResourceCounter.withResource(resourceProfile2, 1));
        increaseResourceRequirementsBy(ResourceCounter.withResource(resourceProfile, 1));
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        Preconditions.checkState(this.newSlotsListener == DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE, "DefaultDeclarativeSlotPool only supports a single slot listener.");
        this.newSlotsListener = newSlotsListener;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public ResourceCounter releaseSlots(ResourceID resourceID, Exception exc) {
        AllocatedSlotPool.AllocatedSlotsAndReservationStatus removeSlots = this.slotPool.removeSlots(resourceID);
        ArrayList arrayList = new ArrayList();
        for (AllocatedSlot allocatedSlot : removeSlots.getAllocatedSlots()) {
            if (!removeSlots.wasFree(allocatedSlot.getAllocationId())) {
                arrayList.add(allocatedSlot);
            }
        }
        return freeAndReleaseSlots(arrayList, removeSlots.getAllocatedSlots(), exc);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public ResourceCounter releaseSlot(AllocationID allocationID, Exception exc) {
        boolean containsFreeSlot = this.slotPool.containsFreeSlot(allocationID);
        Optional<AllocatedSlot> removeSlot = this.slotPool.removeSlot(allocationID);
        if (!removeSlot.isPresent()) {
            return ResourceCounter.empty();
        }
        Set singleton = Collections.singleton(removeSlot.get());
        return freeAndReleaseSlots(containsFreeSlot ? Collections.emptySet() : singleton, singleton, exc);
    }

    private ResourceCounter freeAndReleaseSlots(Collection<AllocatedSlot> collection, Collection<AllocatedSlot> collection2, Exception exc) {
        ResourceCounter fulfilledRequirements = getFulfilledRequirements(collection);
        releasePayload(collection, exc);
        releaseSlots(collection2, exc);
        return fulfilledRequirements;
    }

    private void releasePayload(Iterable<? extends AllocatedSlot> iterable, Throwable th) {
        Iterator<? extends AllocatedSlot> it = iterable.iterator();
        while (it.hasNext()) {
            it.next().releasePayload(th);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public void releaseIdleSlots(long j) {
        Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = this.slotPool.getFreeSlotsInformation();
        ResourceCounter subtract = this.fulfilledResourceRequirements.subtract(this.totalResourceRequirements);
        Iterator<AllocatedSlotPool.FreeSlotInfo> it = freeSlotsInformation.iterator();
        ArrayList arrayList = new ArrayList();
        while (!subtract.isEmpty() && it.hasNext()) {
            AllocatedSlotPool.FreeSlotInfo next = it.next();
            if (j >= next.getFreeSince() + this.idleSlotTimeout.toMilliseconds()) {
                ResourceProfile matchingResourceProfile = getMatchingResourceProfile(next.getAllocationId());
                if (subtract.containsResource(matchingResourceProfile)) {
                    subtract = subtract.subtract(matchingResourceProfile, 1);
                    arrayList.add(this.slotPool.removeSlot(next.getAllocationId()).orElseThrow(() -> {
                        return new IllegalStateException(String.format("Could not find slot for allocation id %s.", next.getAllocationId()));
                    }));
                }
            }
        }
        releaseSlots(arrayList, new FlinkException("Returning idle slots to their owners."));
        LOG.debug("Idle slots have been returned; new total acquired resources: {}", this.fulfilledResourceRequirements);
    }

    private void releaseSlots(Iterable<AllocatedSlot> iterable, Throwable th) {
        for (AllocatedSlot allocatedSlot : iterable) {
            Preconditions.checkState(!allocatedSlot.isUsed(), "Free slot must not be used.");
            if (LOG.isDebugEnabled()) {
                LOG.info("Releasing slot [{}].", allocatedSlot.getAllocationId(), th);
            } else {
                LOG.info("Releasing slot [{}].", allocatedSlot.getAllocationId());
            }
            this.fulfilledResourceRequirements = this.fulfilledResourceRequirements.subtract(getMatchingResourceProfile(allocatedSlot.getAllocationId()), 1);
            this.slotToRequirementProfileMappings.remove(allocatedSlot.getAllocationId());
            allocatedSlot.getTaskManagerGateway().freeSlot(allocatedSlot.getAllocationId(), th, this.rpcTimeout).whenComplete((acknowledge, th2) -> {
                if (th2 != null) {
                    LOG.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.", new Object[]{allocatedSlot.getAllocationId(), allocatedSlot.getTaskManagerId(), th2});
                }
            });
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public Collection<SlotInfoWithUtilization> getFreeSlotsInformation() {
        return (Collection) this.slotPool.getFreeSlotsInformation().stream().map((v0) -> {
            return v0.asSlotInfo();
        }).collect(Collectors.toList());
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public Collection<? extends SlotInfo> getAllSlotsInformation() {
        return this.slotPool.getAllSlotsInformation();
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public boolean containsFreeSlot(AllocationID allocationID) {
        return this.slotPool.containsFreeSlot(allocationID);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
    public boolean containsSlots(ResourceID resourceID) {
        return this.slotPool.containsSlots(resourceID);
    }

    private ResourceCounter getFulfilledRequirements(Iterable<? extends AllocatedSlot> iterable) {
        ResourceCounter empty = ResourceCounter.empty();
        Iterator<? extends AllocatedSlot> it = iterable.iterator();
        while (it.hasNext()) {
            empty = empty.add(getMatchingResourceProfile(it.next().getAllocationId()), 1);
        }
        return empty;
    }

    @VisibleForTesting
    ResourceCounter getFulfilledResourceRequirements() {
        return this.fulfilledResourceRequirements;
    }
}
