package org.apache.sling.distribution.journal.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.class */
public class KafkaJsonMessageSender<T> implements MessageSender<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonMessageSender.class);
    private final ObjectMapper mapper = new ObjectMapper();
    private final KafkaProducer<String, String> producer;
    private final ExceptionEventSender eventSender;
    private final String topic;

    public KafkaJsonMessageSender(KafkaProducer<String, String> kafkaProducer, String str, ExceptionEventSender exceptionEventSender) {
        this.topic = str;
        this.eventSender = exceptionEventSender;
        this.producer = (KafkaProducer) Objects.requireNonNull(kafkaProducer);
    }

    public void send(T t) throws MessagingException {
        send(t, Collections.emptyMap());
    }

    public void send(T t, Map<String, String> map) throws MessagingException {
        try {
            String writeValueAsString = this.mapper.writerFor(t.getClass()).writeValueAsString(t);
            List list = (List) map.entrySet().stream().map(this::toHeader).collect(Collectors.toList());
            list.add(header(KafkaMessageInfo.KEY_MESSAGE_TYPE, t.getClass().getSimpleName()));
            LOG.info("Sent to topic={}, offset={}", this.topic, Long.valueOf(((RecordMetadata) this.producer.send(new ProducerRecord(this.topic, 0, (Object) null, writeValueAsString, list)).get()).offset()));
        } catch (Exception e) {
            this.eventSender.send(e);
            throw new MessagingException(String.format("Failed to send JSON message on topic %s", this.topic), e);
        }
    }

    private Header toHeader(Map.Entry<String, String> entry) {
        return header(entry.getKey(), entry.getValue());
    }

    private RecordHeader header(String str, String str2) {
        return new RecordHeader(str, str2.getBytes(StandardCharsets.UTF_8));
    }
}
