Apache Kafka With Topic Partition Implementation
Posted By : Ankur Bansala | 28-Jul-2018
What is a Topic in Kafka?
We use Kafka to store data, and in Kafka data is stored in topics.
In other words, a stream of messages that belong to the same category is called a topic.
Now let's move on to the topic partition concept.
What are topic partitions?
A Kafka topic can be divided into a number of partitions. Partitions let us keep the same kind of data together in one partition. A consumer can read directly from a specific partition of a topic, and multiple consumers can also read from that same partition. Spreading a topic over multiple partitions is what makes very high message throughput possible.
Partitions come into the picture when you want to create only one topic but still divide the data in it into categories, or when you want to achieve maximum throughput on the topic by running multiple listeners (consumers), each listening to its own partition.
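The idea of separate listeners reading from separate partitions can be sketched in plain Java. This is only an in-memory stand-in, not the Kafka client API: the class PartitionedTopicSketch and its append and read methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a partitioned topic (hypothetical, NOT the Kafka API).
class PartitionedTopicSketch {
    // One list of messages per partition.
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionedTopicSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Append a message to one specific partition.
    void append(int partition, String message) {
        partitions.get(partition).add(message);
    }

    // A listener assigned to one partition sees only that partition's
    // messages, in the order they were appended.
    List<String> read(int partition) {
        return new ArrayList<>(partitions.get(partition));
    }

    public static void main(String[] args) {
        PartitionedTopicSketch topic = new PartitionedTopicSketch(4);
        topic.append(0, "order-created");
        topic.append(1, "payment-received");
        topic.append(0, "order-shipped");
        System.out.println(topic.read(0)); // [order-created, order-shipped]
        System.out.println(topic.read(1)); // [payment-received]
    }
}
```

Because each partition is an independent sequence, two listeners can work through partitions 0 and 1 at the same time without coordinating with each other.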
How to create partitions for a topic?
By default, a topic is created in Kafka with only one partition. You can change this default as shown below:
Go to your Kafka directory --> open the config directory --> open the server.properties file in any text editor.
Find "num.partitions=1" and change it to "num.partitions=4", then restart the broker so the change takes effect.
From now on, every topic that Kafka creates without an explicit partition count (for example, an auto-created topic) gets 4 partitions.
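If you want to double-check the value after editing, a small plain-Java sketch like the one below can read it back. It parses the file content with java.util.Properties. The class name BrokerConfigSketch and the sample file content are assumptions for illustration; in practice you would load your real server.properties file instead of a string.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for checking the edited setting; not part of Kafka.
class BrokerConfigSketch {
    // Returns num.partitions from the given server.properties content,
    // falling back to 1 (Kafka's default) when the entry is absent.
    static int defaultPartitions(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Integer.parseInt(props.getProperty("num.partitions", "1").trim());
    }

    public static void main(String[] args) {
        // Sample content standing in for the edited file.
        String edited = "broker.id=0\nnum.partitions=4\n";
        System.out.println(defaultPartitions(edited));           // 4
        System.out.println(defaultPartitions("broker.id=0\n")); // 1
    }
}
```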
If you want to set the number of partitions at the time of creating a topic, use a command like the one below:
bin/
Now, look at the description of the created topic.
The description should list 4 partitions for the topic.
Now move to the coding part.
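What is the concept of a key in Kafka?
If a key is mapped to a particular partition, data sent with that key goes only to that partition. By default, Kafka picks the partition by hashing the key, so records with the same key land in the same partition as long as the number of partitions does not change. Without a key, the producer distributes records across the partitions, using a round-robin algorithm in older clients.
In this setup a key is just a string value; you can choose any value and map it to a partition.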
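The key-to-partition mapping can be illustrated with a small plain-Java sketch. Note the assumptions: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch uses Arrays.hashCode as a stand-in hash, so the partition numbers it produces will not match a real broker. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: same key -> same partition. NOT Kafka's real partitioner.
class KeyPartitionSketch {
    // Keyed record: hash the key bytes, clear the sign bit, take the remainder.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes); // stand-in for murmur2 (assumption)
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Keyless record: simple round robin over the partitions,
    // driven by a running message counter supplied by the caller.
    static int partitionForKeyless(long messageIndex, int numPartitions) {
        return (int) (messageIndex % numPartitions);
    }

    public static void main(String[] args) {
        // Both lines print the same partition number because the key is the same.
        System.out.println(partitionForKey("customer-42", 4));
        System.out.println(partitionForKey("customer-42", 4));
        // Keyless records cycle through partitions 0, 1, 2, 3, 0, ...
        for (long i = 0; i < 5; i++) {
            System.out.println(partitionForKeyless(i, 4));
        }
    }
}
```

The important property is determinism: as long as the partition count stays the same, a given key always resolves to the same partition, which is what keeps related records together.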
How to implement partitions in Spring?
As we created a topic with the name "
Let's produce some data to the partitions of the topic.
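import java.util.concurrent.ExecutionException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// KafkaProducer<T> here is the application's own interface, not the Kafka client class.
@Service
public class KafkaProducerImpl<T> implements KafkaProducer<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class);

    @Resource(name = "kafkaTemplate")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public int send(String topic, T data) throws InterruptedException, ExecutionException {
        try {
            // send(topic, partition, key, value): the key can be null
            // because the target partition is given explicitly.
            kafkaTemplate.send(topic, 0, null, data); // 0 is the partition number
            kafkaTemplate.send(topic, 1, null, data); // 1 is the partition number
            kafkaTemplate.flush();
        } catch (Exception e) {
            // Passing the exception as the last argument logs its stack trace.
            logger.error("failed to send data to topic {}", topic, e);
        }
        return 0;
    }
}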
Let's consume the data that we produced to a partition of the topic.
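import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerServiceImpl<T> implements KafkaConsumerService<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);

    // Listen only to partition 0 of "testtopic".
    @KafkaListener(topicPartitions = { @TopicPartition(topic = "testtopic", partitions = { "0" }) })
    public void consumerOne(ConsumerRecord<String, T> record) {
        try {
            log.info("record value from partition 0 = {}", record.value());
            // fileUtil is the application's own helper; its declaration is not shown here.
            fileUtil.saveData(record.value());
        } catch (Exception e) {
            log.error("exception while saving data = {}", e.getMessage());
        }
    }
}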
I hope this will help you to explore further.
About Author
Ankur Bansala
Ankur is an experienced Java back-end developer with the ability to build web applications.