Customizing Event Message Format

In the previous sections, the KafkaMessageConverter<K, V> has been shown as a requirement for event production and consumption. The K is the format of the message's key, while the V stands for the format of the message's value. The extension provides a DefaultKafkaMessageConverter, which converts an Axon EventMessage into a Kafka ProducerRecord, and a ConsumerRecord back into an EventMessage. This DefaultKafkaMessageConverter uses a String key and a byte[] value to de-/serialize the message.

Although it is the default, this implementation allows for some customization, such as how the EventMessage's MetaData is mapped to Kafka headers. This is achieved by adjusting the "header value mapper" in the DefaultKafkaMessageConverter's builder.
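As an illustration, a custom header value mapper could write every MetaData value as a UTF-8 string instead of relying on the default byte mapping. The sketch below is framework-free: the nested Header record stands in for Kafka's RecordHeader so it compiles without the Kafka client, and the utf8Mapper name and key prefix are assumptions for illustration, not part of the extension.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

public class Utf8HeaderMapperSketch {

    // Simplified stand-in for Kafka's RecordHeader(String key, byte[] value),
    // so this sketch compiles without the Kafka client on the classpath.
    record Header(String key, byte[] value) {
    }

    // Hypothetical mapper: writes every MetaData value as a UTF-8 string,
    // prefixing the header key the way a custom mapper might.
    static BiFunction<String, Object, Header> utf8Mapper() {
        return (key, value) -> new Header(
                "axon-metadata-" + key,
                value == null ? new byte[0] : value.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Header header = utf8Mapper().apply("traceId", 42);
        System.out.println(header.key() + " -> " + new String(header.value(), StandardCharsets.UTF_8));
        // prints: axon-metadata-traceId -> 42
    }
}
```

A real mapper would construct a Kafka RecordHeader instead of the stand-in record, but the shape of the BiFunction stays the same.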

The SequencingPolicy can be adjusted to change which record key is used. The default sequencing policy is the SequentialPerAggregatePolicy, which makes the aggregate identifier of an event the key of the ProducerRecord and ConsumerRecord.
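To make that default concrete, the following framework-free sketch mimics how a converter could derive the record key from a sequencing policy: the policy yields the aggregate identifier as the sequence identifier, which then becomes the record key. The Event record and recordKey helper are simplified assumptions, not the extension's actual classes.

```java
import java.util.Optional;
import java.util.function.Function;

public class RecordKeySketch {

    // Simplified event: a real Axon EventMessage carries much more.
    record Event(String aggregateId, String payload) {
    }

    // Stand-in for SequentialPerAggregatePolicy: sequence by aggregate identifier.
    static final Function<Event, Optional<Object>> PER_AGGREGATE =
            event -> Optional.ofNullable(event.aggregateId());

    // The record key is the policy's sequence identifier, or null when absent,
    // in which case Kafka falls back to its own partitioning strategy.
    static String recordKey(Event event, Function<Event, Optional<Object>> policy) {
        return policy.apply(event).map(Object::toString).orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(recordKey(new Event("order-42", "OrderShipped"), PER_AGGREGATE));
        // prints: order-42
        System.out.println(recordKey(new Event(null, "ApplicationStarted"), PER_AGGREGATE));
        // prints: null
    }
}
```

Keying records by aggregate identifier sends all events of one aggregate to the same partition, which is what preserves their ordering on the consuming end.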

The format of an event message defines an API between the producer and the consumer of the message. This API may change over time, leading to incompatibility between the event class's structure on the receiving side and the event structure of a message containing the old format. Axon addresses the topic of Event Versioning by introducing Event Upcasters. The DefaultKafkaMessageConverter supports this by provisioning an EventUpcasterChain and running the upcasting process on the MetaData and Payload of individual messages converted from a ConsumerRecord, before those are passed to the Serializer and converted into Event instances.

Note that the KafkaMessageConverter feeds the upcasters with messages one-by-one, limiting it to one-to-one or one-to-many upcasting only. Upcasters performing a many-to-one or many-to-many operation thus won’t be able to operate inside the extension (yet).
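As an illustration of a one-to-one upcast that fits this constraint, the sketch below renames a payload field while bumping the revision. It operates on a plain map rather than Axon's intermediate event representation, so the Versioned record and upcast method are simplified assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class RenameFieldUpcasterSketch {

    // Simplified stand-in for an intermediate event representation:
    // a revision plus the payload's fields.
    record Versioned(String revision, Map<String, Object> payload) {
    }

    // One-to-one upcast: revision "1.0" renames field "address" to
    // "shippingAddress" and becomes revision "2.0"; other revisions pass through.
    static Versioned upcast(Versioned event) {
        if (!"1.0".equals(event.revision())) {
            return event;
        }
        Map<String, Object> upcasted = new HashMap<>(event.payload());
        upcasted.put("shippingAddress", upcasted.remove("address"));
        return new Versioned("2.0", upcasted);
    }

    public static void main(String[] args) {
        Versioned old = new Versioned("1.0", Map.of("address", "Main St 1"));
        Versioned upcasted = upcast(old);
        System.out.println(upcasted.revision() + " " + upcasted.payload());
        // prints: 2.0 {shippingAddress=Main St 1}
    }
}
```

Because the function maps exactly one input event to one output event, it would work within the one-by-one feeding behaviour described above, unlike a many-to-one merge, which needs several records at once.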

Lastly, the Serializer used by the converter can be adjusted. See the Serializer section for more details on this.

public class KafkaMessageConversionConfiguration {
    // ...
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(Serializer serializer,
                                                                       SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
                                                                       BiFunction<String, Object, RecordHeader> headerValueMapper,
                                                                       EventUpcasterChain upcasterChain) {
        return DefaultKafkaMessageConverter.builder()
                                           .serializer(serializer)                  // Hard requirement
                                           .sequencingPolicy(sequencingPolicy)      // Defaults to a "SequentialPerAggregatePolicy"
                                           .upcasterChain(upcasterChain)            // Defaults to empty upcaster chain
                                           .headerValueMapper(headerValueMapper)    // Defaults to "HeaderUtils#byteMapper()"
                                           .build();
    }
    // ...
}

Make sure to use an identical KafkaMessageConverter on both the producing and consuming end; otherwise, deserialization exceptions should be expected. A CloudEventKafkaMessageConverter, based on the CloudEvents specification, is also available.