package org.apache.sling.distribution.journal.bookkeeper;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.ImportPostProcessor;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/bookkeeper/BookKeeper.class */
public class BookKeeper implements Closeable {
    public static final String STORE_TYPE_STATUS = "statuses";
    public static final String KEY_OFFSET = "offset";
    public static final int COMMIT_AFTER_NUM_SKIPPED = 10;
    private static final String SUBSERVICE_IMPORTER = "importer";
    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
    private static final int RETRY_SEND_DELAY = 1000;
    private final Logger log;
    private final ResourceResolverFactory resolverFactory;
    private final DistributionMetricsService distributionMetricsService;
    private final PackageHandler packageHandler;
    private final EventAdmin eventAdmin;
    private final Consumer<PackageStatusMessage> sender;
    private final Consumer<LogMessage> logSender;
    private final BookKeeperConfig config;
    private final boolean errorQueueEnabled;
    private final PackageRetries packageRetries;
    private final LocalStore statusStore;
    private final LocalStore processedOffsets;
    private final DistributionMetricsService.GaugeService<Integer> retriesGauge;
    private final ImportPostProcessor importPostProcessor;
    private int skippedCounter;

    /* loaded from: input_file:org/apache/sling/distribution/journal/bookkeeper/BookKeeper$PackageStatus.class */
    public static class PackageStatus {
        public final PackageStatusMessage.Status status;
        final Long offset;
        final String pubAgentName;
        final Boolean sent;

        PackageStatus(PackageStatusMessage.Status status, long j, String str) {
            this.status = status;
            this.offset = Long.valueOf(j);
            this.pubAgentName = str;
            this.sent = false;
        }

        public PackageStatus(ValueMap valueMap) {
            Integer num = (Integer) valueMap.get("statusNumber", Integer.class);
            this.status = num != null ? PackageStatusMessage.Status.fromNumber(num.intValue()) : null;
            this.offset = (Long) valueMap.get(BookKeeper.KEY_OFFSET, Long.class);
            this.pubAgentName = (String) valueMap.get("pubAgentName", String.class);
            this.sent = (Boolean) valueMap.get("sent", true);
        }

        Map<String, Object> asMap() {
            HashMap hashMap = new HashMap();
            hashMap.put("pubAgentName", this.pubAgentName);
            hashMap.put("statusNumber", Integer.valueOf(this.status.getNumber()));
            hashMap.put(BookKeeper.KEY_OFFSET, this.offset);
            hashMap.put("sent", this.sent);
            return hashMap;
        }
    }

    public BookKeeper(ResourceResolverFactory resourceResolverFactory, DistributionMetricsService distributionMetricsService, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> consumer, Consumer<LogMessage> consumer2, BookKeeperConfig bookKeeperConfig) {
        this(resourceResolverFactory, distributionMetricsService, packageHandler, eventAdmin, consumer, consumer2, bookKeeperConfig, new NoOpImportPostProcessor());
    }

    public BookKeeper(ResourceResolverFactory resourceResolverFactory, DistributionMetricsService distributionMetricsService, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> consumer, Consumer<LogMessage> consumer2, BookKeeperConfig bookKeeperConfig, ImportPostProcessor importPostProcessor) {
        this.log = LoggerFactory.getLogger(getClass());
        this.packageRetries = new PackageRetries();
        this.skippedCounter = 0;
        this.packageHandler = packageHandler;
        this.eventAdmin = eventAdmin;
        this.sender = consumer;
        this.logSender = consumer2;
        this.config = bookKeeperConfig;
        String str = "distribution.journal.subscriber.current_retries;sub_name=" + bookKeeperConfig.getSubAgentName();
        PackageRetries packageRetries = this.packageRetries;
        Objects.requireNonNull(packageRetries);
        this.retriesGauge = distributionMetricsService.createGauge(str, "Retries of current package", packageRetries::getSum);
        this.resolverFactory = resourceResolverFactory;
        this.distributionMetricsService = distributionMetricsService;
        this.errorQueueEnabled = bookKeeperConfig.getMaxRetries() >= 0;
        this.statusStore = new LocalStore(resourceResolverFactory, STORE_TYPE_STATUS, bookKeeperConfig.getSubAgentName());
        this.processedOffsets = new LocalStore(resourceResolverFactory, bookKeeperConfig.getPackageNodeName(), bookKeeperConfig.getSubAgentName());
        this.importPostProcessor = importPostProcessor;
        this.log.info("Started bookkeeper {}.", bookKeeperConfig);
    }

    public void importPackage(PackageMessage packageMessage, long j, long j2) throws DistributionException {
        this.log.debug("Importing distribution package {} at offset={}", packageMessage, Long.valueOf(j));
        try {
            Timer.Context time = this.distributionMetricsService.getImportedPackageDuration().time();
            try {
                ResourceResolver serviceResolver = getServiceResolver(SUBSERVICE_IMPORTER);
                try {
                    this.packageHandler.apply(serviceResolver, packageMessage);
                    if (this.config.isEditable()) {
                        storeStatus(serviceResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, j, packageMessage.getPubAgentName()));
                    }
                    storeOffset(serviceResolver, j);
                    serviceResolver.commit();
                    this.distributionMetricsService.getImportedPackageSize().update(packageMessage.getPkgLength());
                    this.distributionMetricsService.getPackageDistributedDuration().update(System.currentTimeMillis() - j2, TimeUnit.MILLISECONDS);
                    postProcess(packageMessage);
                    this.packageRetries.clear(packageMessage.getPubAgentName());
                    this.eventAdmin.postEvent(new ImportedEvent(packageMessage, this.config.getSubAgentName()).toEvent());
                    this.log.info("Imported distribution package {} at offset={}", packageMessage, Long.valueOf(j));
                    if (serviceResolver != null) {
                        serviceResolver.close();
                    }
                    if (time != null) {
                        time.close();
                    }
                } catch (Throwable th) {
                    if (serviceResolver != null) {
                        try {
                            serviceResolver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPostProcessException e) {
            failure(packageMessage, j, e);
        }
    }

    private void postProcess(PackageMessage packageMessage) throws ImportPostProcessException {
        this.log.debug("Executing import post processor for package [{}]", packageMessage);
        HashMap hashMap = new HashMap();
        hashMap.put("distribution.type", packageMessage.getReqType().name());
        hashMap.put("distribution.paths", packageMessage.getPaths());
        hashMap.put("distribution.package.id", packageMessage.getPkgId());
        long currentTimeMillis = System.currentTimeMillis();
        this.distributionMetricsService.getImportPostProcessRequest().increment();
        this.importPostProcessor.process(hashMap);
        this.log.debug("Executed import post processor for package [{}]", packageMessage.getPkgId());
        this.distributionMetricsService.getImportPostProcessDuration().update(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
        this.distributionMetricsService.getImportPostProcessSuccess().increment();
    }

    private void failure(PackageMessage packageMessage, long j, Exception exc) throws DistributionException {
        this.distributionMetricsService.getFailedPackageImports().mark();
        String pubAgentName = packageMessage.getPubAgentName();
        int i = this.packageRetries.get(pubAgentName);
        boolean z = this.errorQueueEnabled && i >= this.config.getMaxRetries();
        String format = String.format("Failed attempt (%s/%s) to import the distribution package %s at offset=%d because of '%s', the importer will %s", Integer.valueOf(i), this.errorQueueEnabled ? Integer.toString(this.config.getMaxRetries()) : "infinite", packageMessage, Long.valueOf(j), exc.getMessage(), z ? "skip the package" : "retry later");
        try {
            this.logSender.accept(getLogMessage(pubAgentName, format, exc));
        } catch (Exception e) {
            this.log.warn("Error sending log message", e);
        }
        if (!z) {
            this.packageRetries.increase(pubAgentName);
            throw new DistributionException(format, exc);
        }
        this.log.warn(format, exc);
        removeFailedPackage(packageMessage, j);
    }

    private LogMessage getLogMessage(String str, String str2, Exception exc) {
        StringWriter stringWriter = new StringWriter();
        exc.printStackTrace(new PrintWriter(stringWriter));
        return LogMessage.builder().pubAgentName(str).subSlingId(this.config.getSubSlingId()).subAgentName(this.config.getSubAgentName()).message(str2).stacktrace(stringWriter.getBuffer().toString()).build();
    }

    public void removePackage(PackageMessage packageMessage, long j) throws LoginException, PersistenceException {
        this.log.info("Removing distribution package {} of type {} at offset {}", new Object[]{packageMessage.getPkgId(), packageMessage.getReqType(), Long.valueOf(j)});
        Timer.Context time = this.distributionMetricsService.getRemovedPackageDuration().time();
        ResourceResolver serviceResolver = getServiceResolver(SUBSERVICE_BOOKKEEPER);
        try {
            if (this.config.isEditable()) {
                storeStatus(serviceResolver, new PackageStatus(PackageStatusMessage.Status.REMOVED, j, packageMessage.getPubAgentName()));
            }
            storeOffset(serviceResolver, j);
            serviceResolver.commit();
            if (serviceResolver != null) {
                serviceResolver.close();
            }
            this.packageRetries.clear(packageMessage.getPubAgentName());
            time.stop();
        } catch (Throwable th) {
            if (serviceResolver != null) {
                try {
                    serviceResolver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void skipPackage(long j) throws LoginException, PersistenceException {
        this.log.info("Skipping package at offset={}", Long.valueOf(j));
        if (shouldCommitSkipped()) {
            ResourceResolver serviceResolver = getServiceResolver(SUBSERVICE_BOOKKEEPER);
            try {
                storeOffset(serviceResolver, j);
                serviceResolver.commit();
                if (serviceResolver != null) {
                    serviceResolver.close();
                }
            } catch (Throwable th) {
                if (serviceResolver != null) {
                    try {
                        serviceResolver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    public synchronized boolean shouldCommitSkipped() {
        this.skippedCounter++;
        if (this.skippedCounter <= 10) {
            return false;
        }
        this.skippedCounter = 1;
        return true;
    }

    public boolean sendStoredStatus(int i) {
        PackageStatus packageStatus = new PackageStatus(this.statusStore.load());
        return packageStatus.sent.booleanValue() || sendStoredStatus(packageStatus, i);
    }

    private boolean sendStoredStatus(PackageStatus packageStatus, int i) {
        try {
            sendStatusMessage(packageStatus);
            markStatusSent();
            return true;
        } catch (Exception e) {
            this.log.warn("Cannot send status (retry {})", Integer.valueOf(i), e);
            retryDelay();
            return false;
        }
    }

    private void sendStatusMessage(PackageStatus packageStatus) {
        PackageStatusMessage build = PackageStatusMessage.builder().subSlingId(this.config.getSubSlingId()).subAgentName(this.config.getSubAgentName()).pubAgentName(packageStatus.pubAgentName).offset(packageStatus.offset.longValue()).status(packageStatus.status).build();
        this.sender.accept(build);
        this.log.info("Sent status message {}", build);
    }

    public void markStatusSent() {
        try {
            ResourceResolver serviceResolver = getServiceResolver(SUBSERVICE_BOOKKEEPER);
            try {
                this.statusStore.store(serviceResolver, "sent", true);
                serviceResolver.commit();
                if (serviceResolver != null) {
                    serviceResolver.close();
                }
            } finally {
            }
        } catch (Exception e) {
            this.log.warn("Failed to mark status as sent", e);
        }
    }

    public long loadOffset() {
        return ((Long) this.processedOffsets.load(KEY_OFFSET, (String) (-1L))).longValue();
    }

    public int getRetries(String str) {
        return this.packageRetries.get(str);
    }

    public PackageRetries getPackageRetries() {
        return this.packageRetries;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeQuietly(this.retriesGauge);
    }

    private void removeFailedPackage(PackageMessage packageMessage, long j) throws DistributionException {
        this.log.info("Removing failed distribution package {} at offset={}", packageMessage, Long.valueOf(j));
        Timer.Context time = this.distributionMetricsService.getRemovedFailedPackageDuration().time();
        try {
            ResourceResolver serviceResolver = getServiceResolver(SUBSERVICE_BOOKKEEPER);
            try {
                storeStatus(serviceResolver, new PackageStatus(PackageStatusMessage.Status.REMOVED_FAILED, j, packageMessage.getPubAgentName()));
                storeOffset(serviceResolver, j);
                serviceResolver.commit();
                if (serviceResolver != null) {
                    serviceResolver.close();
                }
                time.stop();
            } finally {
            }
        } catch (Exception e) {
            throw new DistributionException("Error removing failed package", e);
        }
    }

    private void storeStatus(ResourceResolver resourceResolver, PackageStatus packageStatus) throws PersistenceException {
        Map<String, Object> asMap = packageStatus.asMap();
        this.statusStore.store(resourceResolver, asMap);
        this.log.info("Stored status {}", asMap);
    }

    private void storeOffset(ResourceResolver resourceResolver, long j) throws PersistenceException {
        this.processedOffsets.store(resourceResolver, KEY_OFFSET, Long.valueOf(j));
    }

    private ResourceResolver getServiceResolver(String str) throws LoginException {
        return this.resolverFactory.getServiceResourceResolver(Collections.singletonMap("sling.service.subservice", str));
    }

    static void retryDelay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
