In this post I will demonstrate how to implement a Java producer that connects to multiple brokers, and how to produce messages to different partitions of a topic.
I have also published a couple of other posts about Kafka. If you are new and would like to learn Kafka from scratch, I recommend walking through the posts below first.
Prerequisite
I am assuming that you already have Kafka set up in your local environment. If not, you can set up Kafka in a Windows environment by following this link.
Setup Multibroker and Topic with Partitions
1. First, you need to start the ZooKeeper server. To start it, execute the command below. <kafka_dir> needs to be replaced with the location where you have installed Kafka.
<kafka_dir>\bin\windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Go to the <kafka_dir>\config\server.properties file and make a copy of it at the same location, say 'first-broker-server.properties'.
3. You just need to change a couple of properties in first-broker-server.properties to set up the first broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on, it should be unique for each broker
port=9092
# A comma separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/first-broker-server
# Zookeeper connection string. This is the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
4. Go to the <kafka_dir>\config\server.properties file and make another copy of it at the same location, say 'second-broker-server.properties'.
5. Now change the properties in second-broker-server.properties for the second broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on, it should be unique for each broker
port=9093
# A comma separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/second-broker-server
# Zookeeper connection string. This is the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
6. Now you need to start both brokers. To start them, execute the commands below:
Start first broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\first-broker-server.properties
Start second broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\second-broker-server.properties
7. Now create the topic 'EmployeeLoginEventTopic' (the topic used by the producer and consumer programs below) with 2 partitions and a replication factor of 2.
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic EmployeeLoginEventTopic
Java Producer Example with Multibroker and Partition
Now let's write a Java producer that connects to both brokers. We already created the topic 'EmployeeLoginEventTopic' with 2 partitions. In this example we will see how to send a message to a specific partition of a topic.
In this example program, I have tried to simulate sending employee login events to different Kafka brokers. An auto-generated employeeId is used as the key, and messages are sent to different partitions in the format 'EmployeeID:<employeeId>, LoginTime: <currentDate&Time>'.
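The event payload described above is just a concatenated string. As a minimal sketch (the helper class and method names here are mine, not part of the original program), it can be built like this:

```java
import java.util.Date;

public class LoginEventFormatter {
    // Builds the payload in the same shape the producer below publishes:
    // "EmployeeID:<employeeId>, LoginTime: <currentDate&Time>"
    static String format(String employeeId, Date loginTime) {
        return "EmployeeID:" + employeeId + ", LoginTime: " + loginTime;
    }

    public static void main(String[] args) {
        System.out.println(format("7", new Date()));
    }
}
```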
First of all, you need to understand which properties are required to initialize the producer:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092,localhost:9093"); //1
props.put("serializer.class", "kafka.serializer.StringEncoder"); //2
props.put("partitioner.class", "com.nvexample.kafka.partition.PartitionerExample"); //3
props.put("request.required.acks", "1"); //4
- The first property lists the Kafka brokers the producer will connect to.
- The second property sets the serializer class for the message key. You can use the default class, i.e. 'kafka.serializer.StringEncoder'.
- For the third property, you need to implement the 'kafka.producer.Partitioner' interface. In this implementation you can write the logic that decides which partition a message should be sent to, based on the message key.
- Set the fourth property to '1' if you want to make sure the producer is acknowledged when a message is received by the brokers successfully.
package com.nvexample.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerExample implements Partitioner {

    public PartitionerExample(VerifiableProperties props) {
    }

    public int partition(Object employeeIdStr, int numOfPartitions) {
        int partition = 0;
        String stringKey = (String) employeeIdStr;
        Integer intKey = Integer.parseInt(stringKey);
        if (intKey > 0) {
            partition = intKey % numOfPartitions;
        }
        System.out.println("Returning partition number [" + partition + "] "
                + "for key [" + employeeIdStr + "]");
        return partition;
    }
}
In this implementation class, we take the key 'employeeIdStr' and perform a modulo operation with the number of partitions configured for the topic 'EmployeeLoginEventTopic'. This partitioning logic ensures that messages with the same key are always sent to the same partition; in other words, all login events for the same employeeId will be served by the same partition.
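The routing rule can be checked in isolation, without a running broker. This is a standalone sketch of the same modulo logic (the class and method names are mine, not part of the original partitioner):

```java
public class PartitionRoutingDemo {
    // Mirrors the partitioner's rule: numeric key modulo the partition count.
    static int partitionFor(String employeeId, int numPartitions) {
        int intKey = Integer.parseInt(employeeId);
        return intKey > 0 ? intKey % numPartitions : 0;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // matches the two-partition topic created above
        // The same key always lands on the same partition...
        System.out.println(partitionFor("5", numPartitions)); // 5 % 2 = 1
        System.out.println(partitionFor("5", numPartitions)); // 1 again
        // ...while a different key may land on a different one.
        System.out.println(partitionFor("4", numPartitions)); // 4 % 2 = 0
    }
}
```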
Multibroker Producer Example:
package com.nvexample.kafka.partition;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerWithPartitionExample {

    private static Producer<String, String> producer;
    public final static String brokerList = "localhost:9092,localhost:9093";
    public final static String PARTITIONER_IMPLEMENTATION_CLASS
            = "com.nvexample.kafka.partition.PartitionerExample";
    private static final String TOPIC = "EmployeeLoginEventTopic";

    public void initialize() {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", PARTITIONER_IMPLEMENTATION_CLASS);
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public void publish(String key, String message) {
        KeyedMessage<String, String> data
                = new KeyedMessage<String, String>(TOPIC, key, message);
        producer.send(data);
    }

    public void closeProducer() {
        producer.close();
    }

    public static void main(String[] args) {
        ProducerWithPartitionExample producerWithPartition
                = new ProducerWithPartitionExample();
        // Initialize the producer with required properties
        producerWithPartition.initialize();
        // Publish messages to brokers
        Random rnd = new Random();
        for (long employeeLogInEvent = 0; employeeLogInEvent < 10; employeeLogInEvent++) {
            String employeeId = String.valueOf(rnd.nextInt(10));
            String msg = "EmployeeID:" + employeeId + ", LoginTime: " + new Date();
            producerWithPartition.publish(employeeId, msg);
        }
        // Close the connection between broker and producer
        producerWithPartition.closeProducer();
    }
}
This is the consumer program. On starting it, the consumer will connect to the brokers via ZooKeeper and start consuming messages published on 'EmployeeLoginEventTopic'.
package com.nvexample.kafka;
import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumer {
private ConsumerConnector consumerConnector = null;
private final String topic = "EmployeeLoginEventTopic";
public void initialize() {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "300");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conConfig = new ConsumerConfig(props);
consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
}
public void consume() {
//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, new Integer(1));
//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
consumerConnector.createMessageStreams(topicCount);
// Get Kafka stream for topic 'mytopic'
List<KafkaStream<byte[], byte[]>> kStreamList =
consumerStreams.get(topic);
// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
while (consumerIte.hasNext())
System.out.println("Message consumed from topic [" + topic + "] : "
        + new String(consumerIte.next().message()));
}
//Shutdown the consumer connector
if (consumerConnector != null) consumerConnector.shutdown();
}
public static void main(String[] args) throws InterruptedException {
KafkaConsumer kafkaConsumer = new KafkaConsumer();
// Configure Kafka consumer
kafkaConsumer.initialize();
// Start consumption
kafkaConsumer.consume();
}
}
Execute the consumer program first and then the producer program. You will get the following output. Notice in the producer program console that the same partition number is returned for the same key. This ensures that messages for the same employee id are sent to the same partition.
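The stable partition numbers in the console can be reproduced offline by replaying the producer's loop against the same modulo rule. In this sketch (the class and method names are mine, and it uses a fixed seed so the run is repeatable, unlike the original program's new Random()), the generated employee IDs are grouped by the partition they would be routed to:

```java
import java.util.*;

public class PartitionGroupingDemo {

    // Re-applies the partitioner's rule offline: numeric key modulo partition count.
    static Map<Integer, List<String>> group(long seed, int events, int numPartitions) {
        Random rnd = new Random(seed);
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (int event = 0; event < events; event++) {
            String employeeId = String.valueOf(rnd.nextInt(10));
            int intKey = Integer.parseInt(employeeId);
            int partition = intKey > 0 ? intKey % numPartitions : 0;
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(employeeId);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // With 2 partitions, every even employeeId lands on partition 0 and every odd one on 1.
        group(42L, 10, 2).forEach((p, ids) ->
                System.out.println("partition " + p + " -> " + ids));
    }
}
```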
ProducerWithPartitionExample.java Program Console Output:
KafkaConsumer.java Program Console Output:
Hope this post helped you learn about the Java Kafka producer with a multibroker and partition flavor. It would be great if you could leave your feedback on this post.