If you are searching for how to write a simple Kafka producer and consumer in Java, you have reached the right blog. In this post you will see how to write a standalone program that produces messages and publishes them to a Kafka broker, and another standalone program that works as a consumer to consume the messages published by the Kafka producer.
Prerequisites
Before proceeding further, please ensure that:
- A Kafka cluster is already set up. If it is not, don't worry; follow my other post 'Apache Kafka - Quick Start on Windows' to set up Kafka in your local environment.
- The topic 'mytopic' is created (one programmatic way to create it is sketched just after this list).
- ZooKeeper is running on localhost:2181.
- Kafka is running on localhost:9092.
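If you would rather create the topic from Java instead of the command-line scripts, the following is a minimal sketch. It assumes a Kafka 0.8.x release where kafka.admin.AdminUtils is available and that the kafka, scala-library and zkclient jars from '<kafka_dir>\libs' are on the build path; the class name CreateTopicExample is only illustrative.
package com.nvexample.kafka;

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;

public class CreateTopicExample {
    public static void main(String[] args) {
        // Connect to the ZooKeeper instance used by the Kafka broker
        ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        // Create 'mytopic' with 1 partition and replication factor 1
        AdminUtils.createTopic(zkClient, "mytopic", 1, 1, new Properties());
        zkClient.close();
    }
}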
Setting Up Project in Eclipse to Create Producer/Consumer
- Create a simple Java project in Eclipse.
- Add the Kafka libraries to your project build path. You can locate them at '<kafka_dir>\libs', the directory where you extracted the Kafka distribution during setup.
About Kafka Producer APIs
To implement a Kafka producer we need to use the following Kafka classes. Let's understand the responsibility of each of these classes:
- kafka.producer.ProducerConfig: This class wraps the different properties that are required to establish a connection with the Kafka broker. For more detail on the producer's properties, you can follow the section 'Important configuration properties for the producer' from this link.
- kafka.producer.KeyedMessage: This class is used by the Kafka producer to send a message to the Kafka broker. With this class we can define the topic name, an optional message partition key and the message itself; the producer sends data to the topic defined in the KeyedMessage.
- kafka.javaapi.producer.Producer: This class sends data to the broker in the form of KeyedMessage objects. Messages can be sent either synchronously or asynchronously. A short sketch of sending with an explicit partition key follows this list.
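Before the full program, here is a minimal sketch (not part of the original example) of sending a message with an explicit partition key, as mentioned for KeyedMessage above. It reuses the same broker and serializer properties as the producer program below; the key 'user-42' and the class name KeyedProducerExample are only illustrative, and StringEncoder is assumed to serialize both the key and the message.
package com.nvexample.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        // StringEncoder is used for both the key and the message value here
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Messages carrying the same key should land on the same partition
        // of 'mytopic' under the default partitioner
        KeyedMessage<String, String> keyedMsg =
                new KeyedMessage<String, String>("mytopic", "user-42", "Hello with a partition key");
        producer.send(keyedMsg);
        producer.close();
    }
}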
Kafka Producer - Java Example
In the diagram below you can see the different APIs implemented in the example producer program.
package com.nvexample.kafka;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    private static Producer<Integer, String> producer;
    private static final String topic = "mytopic";

    public void initialize() {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", "localhost:9092");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        producerProps.put("request.required.acks", "1");
        ProducerConfig producerConfig = new ProducerConfig(producerProps);
        producer = new Producer<Integer, String>(producerConfig);
    }

    public void publishMesssage() throws Exception {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
            String msg = reader.readLine(); // Read message from console
            // Define topic name and message
            KeyedMessage<Integer, String> keyedMsg =
                    new KeyedMessage<Integer, String>(topic, msg);
            producer.send(keyedMsg); // This publishes message on given topic
            if ("Y".equals(msg)) { break; }
            System.out.println("--> Message [" + msg + "] sent. Check message on Consumer's program console");
        }
    }

    public static void main(String[] args) throws Exception {
        KafkaProducer kafkaProducer = new KafkaProducer();
        // Initialize producer
        kafkaProducer.initialize();
        // Publish message
        kafkaProducer.publishMesssage();
        // Close the producer
        producer.close();
    }
}
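The old producer API can also batch messages: send() accepts a whole list of KeyedMessage objects, and setting 'producer.type' to 'async' makes the producer buffer and send them in the background. The sketch below illustrates this; it is not part of the original example, and the batch size and message contents are only illustrative.
package com.nvexample.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Switch the producer to asynchronous mode so sends are batched in the background
        props.put("producer.type", "async");
        props.put("batch.num.messages", "100");

        Producer<Integer, String> producer =
                new Producer<Integer, String>(new ProducerConfig(props));

        // Build a small batch of messages for 'mytopic'
        List<KeyedMessage<Integer, String>> batch = new ArrayList<KeyedMessage<Integer, String>>();
        for (int i = 1; i <= 5; i++) {
            batch.add(new KeyedMessage<Integer, String>("mytopic", "Batched message " + i));
        }
        // A single call publishes the whole list
        producer.send(batch);
        producer.close();
    }
}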
About Kafka Consumer APIs
To implement a Kafka consumer we need to use the following Kafka classes. Let's understand the responsibility of each of these classes:
- kafka.consumer.ConsumerConfig: This class wraps the different properties that are required to establish a connection between the consumer and ZooKeeper. For more detail on the consumer's properties, you can follow the section 'Important configuration properties for the consumer' from this link.
- kafka.javaapi.consumer.ConsumerConnector: This is a Kafka interface. ZookeeperConsumerConnector is the class that implements this interface and is used to establish a connection with ZooKeeper; all interactions with ZooKeeper are taken care of by this implementing class.
- kafka.consumer.KafkaStream: ConsumerConnector returns a list of KafkaStream objects for each topic, wrapped in a map of the form Map<String, List<KafkaStream<K, V>>>, where the map key is the topic name, the value is the list of KafkaStream objects, K is the partition key type and V is the actual message type. A short multi-threaded sketch follows this list.
- kafka.consumer.ConsumerIterator: ConsumerIterator is used to iterate over a KafkaStream.
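Because createMessageStreams returns a list of streams per topic, you can request more than one stream and process each on its own thread. The sketch below, which is not part of the original example, shows that pattern; the group id 'testgroup-threads' and the thread count of two are only illustrative, and it assumes 'mytopic' has at least two partitions so both streams receive data.
package com.nvexample.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ThreadedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup-threads");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for two KafkaStream objects for 'mytopic'
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("mytopic", 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCount);

        // Hand each stream to its own thread
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (final KafkaStream<byte[], byte[]> stream : streams.get("mytopic")) {
            executor.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName()
                                + " consumed: " + new String(it.next().message()));
                    }
                }
            });
        }
        // connector.shutdown() and executor.shutdown() would be called
        // when the application is ready to stop consuming
    }
}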
Kafka Consumer - Java Example
In the diagram below you can see the different APIs implemented in the example consumer program.
package com.nvexample.kafka;

import java.util.*;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {

    private ConsumerConnector consumerConnector = null;
    private final String topic = "mytopic";

    public void initialize() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }

    public void consume() {
        // Key = topic name, Value = No. of threads for topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, new Integer(1));

        // ConsumerConnector creates the message stream for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);

        // Get Kafka stream for topic 'mytopic'
        List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

        // Iterate stream using ConsumerIterator
        for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
            while (consumerIte.hasNext()) {
                System.out.println("Message consumed from topic [" + topic + "] : "
                        + new String(consumerIte.next().message()));
            }
        }

        // Shutdown the consumer connector
        if (consumerConnector != null) consumerConnector.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer kafkaConsumer = new KafkaConsumer();
        // Configure Kafka consumer
        kafkaConsumer.initialize();
        // Start consumption
        kafkaConsumer.consume();
    }
}
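One thing to note about the consumer above: hasNext() blocks until another message arrives, so the shutdown call after the loop is only reached when the connector is shut down from elsewhere. If you want the consumer to stop by itself after a period of inactivity, one option is the 'consumer.timeout.ms' property, which makes hasNext() throw a ConsumerTimeoutException. The sketch below is only an illustration of that option, not part of the original program.
package com.nvexample.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        // Give up waiting for new messages after 5 seconds of inactivity
        props.put("consumer.timeout.ms", "5000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("mytopic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
        try {
            while (it.hasNext()) {
                System.out.println("Consumed: " + new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // No message arrived within consumer.timeout.ms; stop consuming
            System.out.println("No more messages, shutting down consumer");
        }
        connector.shutdown();
    }
}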
Running KafkaProducer & KafkaConsumer Standalone Programs
Now you can run the KafkaConsumer and KafkaProducer programs. After starting, the producer will prompt you to enter a message. As you type a message and press Enter in the KafkaProducer program console, watch the KafkaConsumer program console: it will print the consumed message.
KafkaProducer Program Console:
KafkaConsumer Program Console:
I hope this post helped you learn Kafka producer and consumer programming.
!!! Happy Kafka Learning !!!