Kafka Implementation With Spring Boot
Posted By : Avnish Pandey | 28-Mar-2018
Kafka :
Kafka is a distributed massaging system. Kafka is a very fast and reliable massaging system. It is the best replacement for more traditional message broker. If we compare the Kafka with another massaging service then we found it has better throughput, replication, and fault-tolerance, which makes Kafka very reliable.
Kafka integration with Spring Boot :
Spring has already given a library for implementing Kafka with Spring Boot. By using this library we can create the producer for producing data and consumer for consuming the data. We just need to add the dependency for spring. It automatically downloads the Kafka library, then we can use the spring library for Kafka.
Here are the dependencies which we have to add in pom.XML :
<dependency> <groupId> org.springframework.kafka </groupId> <artifactId> spring-kafka </artifactId> </dependency>
Now we have to configure the Kafka :
package com.kakfa; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; @EnableKafka @Configuration public class KafkaConfiguration { private static final String broker = "localhost:9092"; private static final String groupId = "kafka-group"; @Bean public KafkaTemplate<String, String> getKafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
Producing the data by using KafkaTemplate class object :
package com.kakfa; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void produce(String data) { kafkaTemplate.send("topic", data); } }
Consuming the data by using KafkaListner annotation :
package com.kakfa; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { public static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = "topic") public void consume(String content) { LOGGER.info("Consumed data :: "+content); } }
Thanks,
Cookies are important to the proper functioning of a site. To improve your experience, we use cookies to remember log-in details and provide secure log-in, collect statistics to optimize site functionality, and deliver content tailored to your interests. Click Agree and Proceed to accept cookies and go directly to the site or click on View Cookie Settings to see detailed descriptions of the types of cookies and choose whether to accept certain cookies while on the site.
About Author
Avnish Pandey
Avnish has a good knowledge in core & advance Java, Spring and Hibernate Framework. He loves to learn new technologies.