Connecting Spring Boot to Kafka

Creating topics

Let's explore how to create topics using Spring Boot.

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        // configure bootstrap servers using the bootstrap address
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        // NewTopic takes (String name, int numPartitions, short replicationFactor)
        return new NewTopic("tweets", 1, (short) 1);
    }
}

In the code above, we define a Kafka configuration class, KafkaTopicConfig, to manage the creation and configuration of Kafka topics within our application.

First, we create a KafkaAdmin bean. This bean is responsible for creating new topics in our broker. To add a topic to the broker, we declare a NewTopic bean. Here, we are creating a NewTopic called tweets, with 1 partition and a replication factor of 1.

  • Partitions: The topic is configured with 1 partition. Partitions allow for parallel message processing and better scalability.
  • Replication Factor: The replication factor is set to 1, meaning the topic's data is not replicated. This is set to 1 for simplicity's sake, but it is generally not advised in production since it offers no fault tolerance.
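
If you prefer a fluent style, Spring Kafka also ships a TopicBuilder helper that can be used in place of the NewTopic constructor. A minimal sketch of the same tweets topic (the tweetsTopic bean name is just illustrative) could be added to the KafkaTopicConfig class above:

// import org.springframework.kafka.config.TopicBuilder;
@Bean
public NewTopic tweetsTopic() {
    // equivalent to new NewTopic("tweets", 1, (short) 1)
    return TopicBuilder.name("tweets")
            .partitions(1)
            .replicas(1)
            .build();
}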

Configuring Producers

To create producers in Kafka, we need to define a ProducerFactory and a KafkaTemplate. The ProducerFactory sets the strategy for creating Kafka producer instances. In this configuration, we specify the bootstrap.servers address and define serializers for both the keys and values.

The key determines the partition to which a message is routed. Kafka uses the key to generate a hash, which maps the message to a specific partition. The value, on the other hand, is the actual data payload being sent. By setting the appropriate serializers, we ensure that both keys and values are correctly formatted for transmission.
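
To make the hashing idea concrete, the snippet below is a deliberately simplified illustration of key-based partition selection. The real Kafka client uses murmur2 hashing and a sticky partitioner for records without a key, so treat this (and the PartitioningSketch class name) as a sketch of the concept only:

public class PartitioningSketch {
    // simplified stand-in for Kafka's partitioner: the same key always maps to the same partition
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // both calls print the same partition number, so per-key ordering is preserved
        System.out.println(partitionFor("user-42", 3));
        System.out.println(partitionFor("user-42", 3));
    }
}

With that in mind, the producer configuration itself looks like this: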

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    // ProducerFactory sets the strategy for creating Kafka producer instances
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

KafkaTemplate is a class that Spring Kafka provides for producing messages to Kafka topics. It is a thin wrapper around a Kafka producer and offers a number of convenience methods for sending records.
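
As a quick illustration of those convenience methods, the sketch below sends to the tweets topic with and without a record key (the TweetSender class name is purely illustrative):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class TweetSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TweetSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // send(topic, value): no key, so the producer's partitioner picks the partition
        kafkaTemplate.send("tweets", message);
    }

    public void sendKeyed(String key, String message) {
        // send(topic, key, value): records sharing a key always hash to the same partition
        kafkaTemplate.send("tweets", key, message);
    }
}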

Producing our first message

@RestController
@RequestMapping("/api/publish")
@Slf4j
public class MessageController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaTemplate<String, Greeting> greetingKafkaTemplate;

    private static final String TOPIC = "tweets";

    @GetMapping
    public String publishMessage(@RequestParam String message) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Message successfully published");
            } else {
                log.error(ex.getMessage());
            }
        });
        return "Message published";
    }

    @PostMapping
    public String publishGreeting(@RequestBody Greeting greeting) {
        CompletableFuture<SendResult<String, Greeting>> future = greetingKafkaTemplate.send(TOPIC, greeting);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Message successfully published");
            } else {
                log.error(ex.getMessage());
            }
        });
        return "Message published";
    }
}

To produce a message, we can make use of Spring Boot's dependency injection to inject the KafkaTemplate beans that were previously created in our producer configuration class. With the KafkaTemplate, we can then publish messages to a particular topic. In the example above, I have demonstrated how we can publish both a plain String message and a Greeting object to our tweets topic.
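
The producer snippets above (and the consumer snippets below) assume a Greeting payload class that is not shown. Any JSON-friendly POJO will do; a minimal sketch, with msg and name as placeholder fields, might look like this:

public class Greeting {

    private String msg;
    private String name;

    public Greeting() {
        // no-args constructor needed for JSON deserialization
    }

    public Greeting(String msg, String name) {
        this.msg = msg;
        this.name = name;
    }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "Greeting{msg='" + msg + "', name='" + name + "'}";
    }
}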

Configuring Consumers

@EnableKafka // enables detection of @KafkaListener annotations on any Spring-managed bean
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.groupId:group1}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
//        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // this groupId would act as the default if there is no groupId (or id) on the @KafkaListener
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // default groupId used if there is no groupId (or id) on the @KafkaListener
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // the deserializer instances passed here take precedence over the class configs above
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingConcurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingConsumerFactory());
        return factory;
    }

    // The KafkaListenerContainerFactory is used to create containers for annotated methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // this listener container factory discards records whose value contains "World"
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordFilterStrategy(record -> record.value().contains("World"));
        return factory;
    }
}

In the code above, we configure the consumer side of our Kafka setup. The @EnableKafka annotation enables detection of the @KafkaListener annotation on any Spring-managed bean in the container. A ConsumerFactory provides the strategy for creating consumer instances. Additionally, a ConcurrentKafkaListenerContainerFactory is used to create containers for methods annotated with @KafkaListener. We can even use setRecordFilterStrategy to filter records, in this case discarding any record whose value contains "World".

Setting up listeners

Here, we configure listeners to consume messages from the Kafka topic tweets. We have configured two listeners: one to listen to plain String messages and another to listen to custom Greeting objects. We annotate a listener with @KafkaListener and provide the topic, groupId, as well as a containerFactory.

@Slf4j
@Service
public class Listener {
    // Listening requires the @EnableKafka annotation on a @Configuration class and a listener container factory.
    // By default, a bean named kafkaListenerContainerFactory is expected.
    // The listener container factory configures the underlying ConcurrentMessageListenerContainer: a
    // Spring-managed container that allows concurrent processing of messages from Kafka topics by managing
    // multiple Kafka consumer threads.

    // @KafkaListener designates a bean method as a listener for a listener container
    @KafkaListener(topics = "tweets", groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String data) {
        log.info(data + " received");
    }

    @KafkaListener(topics = "tweets", groupId = "group1", containerFactory = "greetingConcurrentKafkaListenerContainerFactory")
    public void greetingListener(Greeting greeting) {
        log.info("greeting " + greeting);
    }
}

In the example above:

  • The first listener processes String messages from the topic tweets using the kafkaListenerContainerFactory.
  • The second listener handles Greeting objects from the same topic, using a different container factory named greetingConcurrentKafkaListenerContainerFactory.

To enable these listeners, ensure that your configuration class includes the @EnableKafka annotation. This allows Spring to detect and manage listener methods efficiently.
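
Note that the filterKafkaListenerContainerFactory defined earlier is not wired to any listener in the snippets above. To exercise it, a listener method in the same Listener class would simply reference it by name; in this sketch (the filteredListen method name is illustrative), records whose value contains "World" are dropped before the method is ever invoked:

@KafkaListener(topics = "tweets", groupId = "group1", containerFactory = "filterKafkaListenerContainerFactory")
public void filteredListen(String data) {
    // only records that do NOT contain "World" reach this point
    log.info("filtered listener received: " + data);
}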