Publishing Events to Kafka

When Event Messages are published to an Event Bus (or Event Store), they can be forwarded to a Kafka topic using the KafkaPublisher. To achieve this it will utilize a Kafka Producer, retrieved through Axon’s ProducerFactory. The KafkaPublisher in turn receives the events to publish from a KafkaEventPublisher.

Since the KafkaEventPublisher is an event message handler in Axon terms, we can provide it to any Event Processor to receive the published events. The choice of event processor brings differing characteristics for event publication to Kafka:

  • Subscribing Event Processor - publication of messages to Kafka occurs in the same thread (and Unit of Work) that published the events to the event bus. With this approach, a failure to publish to Kafka also fails the initial event publication on the event bus.

  • Tracking Event Processor - publication of messages to Kafka runs in a different thread (and Unit of Work) than the one that published the events to the event bus. With this approach, the event has been published on the event bus regardless of whether publication to Kafka succeeds.
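The difference between the two processor types can be illustrated with a heavily simplified sketch. It uses no Axon classes: ProcessorThreadingSketch, publishSubscribing and publishTracking are hypothetical names, and a plain Consumer stands in for the handler that forwards an event to Kafka. It only shows how a handler failure reaches the publisher in one case and not in the other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in; none of these names are part of Axon.
class ProcessorThreadingSketch {

    // Subscribing style: the handler runs in the publishing thread,
    // so an exception thrown by the handler reaches the publisher.
    static boolean publishSubscribing(String event, Consumer<String> kafkaHandler) {
        try {
            kafkaHandler.accept(event);
            return true;
        } catch (RuntimeException e) {
            return false; // the handler failure fails the publication
        }
    }

    // Tracking style: the handler runs later in another thread,
    // so the publication has already succeeded if the handler fails.
    static boolean publishTracking(String event,
                                   Consumer<String> kafkaHandler,
                                   ExecutorService executor) {
        executor.submit(() -> kafkaHandler.accept(event));
        return true;
    }

    public static void main(String[] args) {
        Consumer<String> failing = e -> {
            throw new IllegalStateException("Kafka unavailable");
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println(publishSubscribing("OrderPlaced", failing));        // prints false
        System.out.println(publishTracking("OrderPlaced", failing, executor)); // prints true
        executor.shutdown();
    }
}
```

In the tracking-style call the failure is captured by the executor and never reaches the caller, which mirrors the decoupling described above.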

When setting up event publication it is also important to take into account which ConfirmationMode is used. The ConfirmationMode influences the process of actually producing an event message on a Kafka topic, but also what kind of Producer the ProducerFactory will instantiate:

  • TRANSACTIONAL - This requires the Producer to start, commit and (in case of failure) roll back the transaction of publishing an event message. Alongside this, the ProducerFactory creates a pool of Producer instances to avoid continuously creating new ones, which requires the user to provide a "transactional id prefix" to uniquely identify every Producer in the pool.

  • WAIT_FOR_ACK - Setting "WAIT_FOR_ACK" as the ConfirmationMode will require the Producer instance to wait for a default of 1 second (configurable on the KafkaPublisher) until the event message publication has been acknowledged. Alongside this, it will create a single, shareable Producer instance from within the ProducerFactory.

  • NONE - This is the default mode, which only ensures a single, shareable Producer instance from within the ProducerFactory.
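To make the WAIT_FOR_ACK behaviour more concrete, the sketch below shows what waiting for an acknowledgement with a timeout amounts to in plain Java. It is not Axon's implementation: WaitForAckSketch and waitForAck are hypothetical names, and a CompletableFuture stands in for the Future that a Kafka Producer returns from send. How the KafkaPublisher reacts to a timeout is not covered here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in; not part of Axon or the Kafka client.
class WaitForAckSketch {

    // Blocks until the acknowledgement arrives or the timeout elapses.
    // Returns true when acknowledged in time, false otherwise.
    static boolean waitForAck(Future<?> ack, long timeoutMillis) {
        try {
            ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        Future<String> acknowledged = CompletableFuture.completedFuture("ack");
        Future<String> neverAcknowledged = new CompletableFuture<>();
        System.out.println(waitForAck(acknowledged, 1000));    // prints true
        System.out.println(waitForAck(neverAcknowledged, 50)); // prints false
    }
}
```

The second call uses a short timeout only to keep the example fast; the default mentioned above is 1 second.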

Configuring event publication to Kafka

Configuring event publication to Kafka is a multi-step process, which starts with the ProducerFactory. Axon provides the DefaultProducerFactory implementation of the ProducerFactory, which should be instantiated through the provided DefaultProducerFactory.Builder.

The builder has one hard requirement, which is the Producer configuration Map. The Map contains the settings to use for the Kafka Producer client, such as the Kafka instance locations. Please check the Kafka documentation for the possible settings and their values.

public class KafkaEventPublicationConfiguration {
    // ...
    public ProducerFactory<String, byte[]> producerFactory(Duration closeTimeout,
                                                           int producerCacheSize,
                                                           Map<String, Object> producerConfiguration,
                                                           ConfirmationMode confirmationMode,
                                                           String transactionIdPrefix) {
        return DefaultProducerFactory.<String, byte[]>builder()
                .closeTimeout(closeTimeout)                 // Defaults to "30" seconds
                .producerCacheSize(producerCacheSize)       // Defaults to "10"; only used for "TRANSACTIONAL" mode
                .configuration(producerConfiguration)       // Hard requirement
                .confirmationMode(confirmationMode)         // Defaults to a Confirmation Mode of "NONE"
                .transactionalIdPrefix(transactionIdPrefix) // Hard requirement when in "TRANSACTIONAL" mode
                .build();
    }
    // ...
}
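The Producer configuration Map passed to the builder could be assembled along the following lines. This is a minimal sketch: ProducerConfigurationSketch is a hypothetical helper, the address "localhost:9092" is an assumed local broker, and the chosen settings are only examples of standard Kafka producer properties. The serializers are picked to match the String key and byte[] value types used in this section.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the keys are standard Kafka producer property names.
class ProducerConfigurationSketch {

    static Map<String, Object> producerConfiguration(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        // Location of the Kafka brokers to connect to
        config.put("bootstrap.servers", bootstrapServers);
        // Serializers matching a ProducerFactory<String, byte[]>
        config.put("key.serializer",
                   "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer",
                   "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Example setting: wait for all in-sync replicas to acknowledge a record
        config.put("acks", "all");
        return config;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed address for a local broker
        Map<String, Object> config = producerConfiguration("localhost:9092");
        System.out.println(config.get("bootstrap.servers"));
    }
}
```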

The second infrastructure component to introduce is the KafkaPublisher, which has a hard requirement on the ProducerFactory. This is also the place to define the Kafka topics on which Axon event messages will be published. You can set a function from event to Optional<String>, and use it to publish only certain events or to route different events to different topics. It's not uncommon for Kafka topics to contain only one type of message. Note that the KafkaPublisher needs to be shut down properly through shutDown, to ensure all Producer instances are closed.

public class KafkaEventPublicationConfiguration {
    // ...

    public KafkaPublisher<String, byte[]> kafkaPublisher(String topic,
                                                         ProducerFactory<String, byte[]> producerFactory,
                                                         KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
                                                         int publisherAckTimeout) {
        return KafkaPublisher.<String, byte[]>builder()
                .topicResolver(m -> Optional.of(topic))     // Defaults to "Axon.Events" for all events
                .producerFactory(producerFactory)           // Hard requirement
                .messageConverter(kafkaMessageConverter)    // Defaults to a "DefaultKafkaMessageConverter"
                .publisherAckTimeout(publisherAckTimeout)   // Defaults to "1000" milliseconds; only used for "WAIT_FOR_ACK" mode
                .build();
    }
    // ...
}
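A topic-resolving function that routes events by type and skips others might look like the sketch below. It is kept free of Axon types so it can run on its own: the function here takes a plain payload object, whereas the function given to the builder receives the event message. The payload classes and topic names are hypothetical.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in; a plain payload object replaces the event message.
class TopicResolverSketch {

    // Hypothetical event payload types
    static class OrderPlaced {}
    static class PaymentReceived {}
    static class InternalAuditEvent {}

    // Maps each payload to a topic; an empty Optional means "no topic".
    static final Function<Object, Optional<String>> TOPIC_RESOLVER = payload -> {
        if (payload instanceof OrderPlaced) {
            return Optional.of("orders");
        }
        if (payload instanceof PaymentReceived) {
            return Optional.of("payments");
        }
        return Optional.empty();
    };

    public static void main(String[] args) {
        System.out.println(TOPIC_RESOLVER.apply(new OrderPlaced()));        // prints Optional[orders]
        System.out.println(TOPIC_RESOLVER.apply(new InternalAuditEvent())); // prints Optional.empty
    }
}
```

Giving each event type its own topic keeps every topic limited to one type of message, as described above.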

Lastly, we need to provide Axon’s event messages to the KafkaPublisher. To that end a KafkaEventPublisher should be instantiated through the builder pattern. Remember to add the KafkaEventPublisher to an event processor implementation of your choice. It is recommended to use the KafkaEventPublisher#DEFAULT_PROCESSING_GROUP as the processing group name of the event processor to distinguish it from other event processors.

public class KafkaEventPublicationConfiguration {
    // ...
    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
        return KafkaEventPublisher.<String, byte[]>builder()
                .kafkaPublisher(kafkaPublisher)             // Hard requirement
                .build();
    }

    public void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
                                                  KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
        String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
        eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
                                 .assignHandlerTypesMatching(
                                         processingGroup,
                                         clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
                                 )
                                 .registerSubscribingEventProcessor(processingGroup);
        // Replace `registerSubscribingEventProcessor` with `registerTrackingEventProcessor` to use a tracking processor
    }
    // ...
}

Topic partition publication considerations

Kafka guarantees message ordering per topic partition, not across an entire topic. To place a group of events in the same partition, for example all events of one aggregate identifier, use the message converter’s SequencingPolicy.

The topic partition to which events are published also affects event consumption. This extension mitigates ordering concerns for the streamable solution by ensuring a Consumer always receives all events of a topic, so that it can perform a complete ordering. The subscribable event consumption approach does not give this guarantee. The subscribable stream leaves all ordering specifics to Kafka, which means events should be published to a consistent partition to ensure ordering.
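Why a consistent key leads to a consistent partition can be sketched as follows. The sketch assumes that the sequence identifier, such as an aggregate identifier, is used as the record key. PartitionSketch and partitionFor are hypothetical names, and String.hashCode is used as a simple stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the actual partition numbers will differ from Kafka's.

```java
// Hypothetical stand-in for a key-based partitioner; not Kafka's implementation.
class PartitionSketch {

    // Derives a partition from the key: equal keys always yield the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        // Two events of the same (hypothetical) aggregate share a partition,
        // so their relative order is preserved by Kafka.
        int first = partitionFor("order-42", partitions);
        int second = partitionFor("order-42", partitions);
        System.out.println(first == second); // prints true
        // A different aggregate may or may not land in another partition.
        System.out.println(partitionFor("order-43", partitions));
    }
}
```

Events with different keys can still share a partition. The guarantee only runs one way: the same key always maps to the same partition as long as the number of partitions does not change.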