Friday 18 December 2015

Apache Kafka – Java Producer Example with Multibroker & Partition

In this post I will demonstrate how to implement a Java producer that connects to multiple brokers and produces messages to different partitions of a topic.

I have also published a couple of other posts about Kafka. If you are new to Kafka and would like to learn it from scratch, I recommend walking through the posts below first.

Prerequisite
I assume that you already have Kafka set up in your local environment. If not, you can set up Kafka on Windows by following this link.

Set Up Multibroker and Topic with Partitions
1. First, start the ZooKeeper server by executing the command below. Replace <kafka_dir> with the location where you installed Kafka. The configuration paths in these commands are relative, so run them from the <kafka_dir>\bin\windows directory.
<kafka_dir>\bin\windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Make a copy of <kafka_dir>\config\server.properties in the same directory and name it, for example, ‘first-broker-server.properties’.

3. To set up the first broker, change the following properties in first-broker-server.properties:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# The port the socket server listens on. It must be unique for each broker.
port=9092

# A comma-separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/first-broker-server

# ZooKeeper connection string: the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
4. Make another copy of <kafka_dir>\config\server.properties in the same directory and name it, for example, ‘second-broker-server.properties’.

5. Change the same properties in second-broker-server.properties for the second broker:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# The port the socket server listens on. It must be unique for each broker.
port=9093

# A comma-separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/second-broker-server

# ZooKeeper connection string: the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
6. Now start both brokers. Each broker keeps running in the foreground, so execute each command in its own command prompt:

    Start first broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\first-broker-server.properties
    Start second broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\second-broker-server.properties
7. Now create the topic 'multibrokertopic' with 2 partitions and a replication factor of 2.
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic multibrokertopic
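Steps 2 to 5 differ only in three values per broker: broker.id, port and log.dirs. If you prefer generating the broker files to editing copies by hand, here is a minimal sketch, assuming Java 8 or later. BrokerConfigSketch is a hypothetical helper, not part of Kafka: it copies the settings of the stock server.properties (passed as an optional argument), overrides the per-broker values used above and prints each resulting file, which you would then save under <kafka_dir>\config.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): derives per-broker configs from a base config.
class BrokerConfigSketch {

    // Copy every base setting, then override the values that must differ per broker.
    static Properties brokerConfig(Properties base, int brokerId, int port, String logDir) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("broker.id", String.valueOf(brokerId));
        props.setProperty("port", String.valueOf(port));
        props.setProperty("log.dirs", logDir);
        props.setProperty("zookeeper.connect", "localhost:2181");
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties base = new Properties();
        if (args.length > 0) { // optional: path to the stock server.properties
            try (Reader in = new FileReader(args[0])) {
                base.load(in);
            }
        }
        String[] names = {"first-broker-server", "second-broker-server"};
        for (int i = 0; i < names.length; i++) {
            // broker.id 1 and 2, ports 9092 and 9093, as in steps 3 and 5
            Properties broker = brokerConfig(base, i + 1, 9092 + i,
                    "<kafka_dir>/kafka-logs/" + names[i]);
            // Print the file content; save it as <name>.properties in the config directory
            broker.store(System.out, names[i] + ".properties");
        }
    }
}
```

Note that Properties.store escapes characters such as ':' (for example localhost\:2181); Kafka reads the file back with the same Properties rules, so the values are unchanged.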
Java Producer Example with Multibroker And Partition
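Now let's write a Java producer that connects to both brokers. The code below publishes to a topic named 'EmployeeLoginEventTopic', so create that topic with 2 partitions in the same way as in step 7:
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic EmployeeLoginEventTopic
In this example we will see how to send a message to a specific partition of a topic.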


This example program simulates sending employee login events to different Kafka brokers. A randomly generated employeeId is used as the message key, and each message is sent to a partition in the format 'EmployeeID:<employeeId>, LoginTime: <currentDate&Time>'.


First of all, you need to understand which properties are required to initialize the producer:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092,localhost:9093"); //1
props.put("serializer.class", "kafka.serializer.StringEncoder"); //2
props.put("partitioner.class", "com.nvexample.kafka.partition.PartitionerExample"); //3
props.put("request.required.acks", "1"); //4
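  • The first property (metadata.broker.list) lists the Kafka brokers the producer connects to. The producer uses this list only to fetch topic metadata (partitions and their leaders), so it does not have to contain every broker in the cluster.
  • The second property (serializer.class) sets the serializer for messages. Because key.serializer.class is not set, the same class is also used for keys. 'kafka.serializer.StringEncoder' handles String keys and messages; the default, 'kafka.serializer.DefaultEncoder', expects byte[].
  • The third property (partitioner.class) names your implementation of the 'kafka.producer.Partitioner' interface. In this implementation you write the logic that decides, based on the message key, which partition each message is sent to. If you omit it, the producer uses 'kafka.producer.DefaultPartitioner'.
  • The fourth property (request.required.acks) set to '1' makes the producer wait until the leader replica of the partition acknowledges that it has received the message. '0' means no acknowledgement, and '-1' waits for all in-sync replicas.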
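When partitioner.class is left out, the producer falls back to its default partitioner. To my understanding, the 0.8 default partitioner hashes the key, clears the sign bit and takes the result modulo the number of partitions; treat that exact formula as an assumption. The sketch below (HashPartitionSketch is a hypothetical name, runnable without Kafka) shows that approach. Unlike the modulo-on-employeeId logic used later in this post, it also accepts non-numeric keys such as "emp-42".

```java
// Sketch of hash-based partitioning; assumed to resemble the 0.8 default partitioner.
class HashPartitionSketch {

    static int partitionFor(Object key, int numPartitions) {
        // Clear the sign bit so a negative hashCode() still yields a valid partition
        int positiveHash = key.hashCode() & 0x7fffffff;
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        // Works for numeric and non-numeric keys; repeated keys get the same partition
        for (String key : new String[] {"3", "7", "emp-42", "3"}) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```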
Partitioner Class Implementation: 
package com.nvexample.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerExample implements Partitioner {

       // Kafka creates the partitioner by reflection and expects
       // a constructor that takes VerifiableProperties.
       public PartitionerExample(VerifiableProperties props) {
       }

       // Called for every keyed message; must return a value in [0, numOfPartitions).
       public int partition(Object employeeIdStr, int numOfPartitions) {
             int partition = 0;
             String stringKey = (String) employeeIdStr;
             int intKey = Integer.parseInt(stringKey); // keys are numeric employee ids
             if (intKey > 0) {
                    // Same key -> same remainder -> same partition
                    partition = intKey % numOfPartitions;
             }
             System.out.println("Returning partition number [" + partition + "] " +
                           "for key [" + employeeIdStr + "]");
             return partition;
       }
}

This implementation takes the key 'employeeIdStr', parses it as an integer and computes it modulo the number of partitions of the topic. Because the result depends only on the key, every message with the same key is sent to the same partition; in other words, all login events of the same employeeId land in the same partition.
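To check the key-to-partition mapping without starting any broker, the modulo rule can be pulled out into plain Java. This is a minimal sketch: ModuloPartitionSketch and partitionFor are hypothetical names, and the logic simply mirrors PartitionerExample.partition above for a topic with 2 partitions.

```java
import java.util.Map;
import java.util.TreeMap;

// Standalone copy of the modulo logic from PartitionerExample, without Kafka types.
class ModuloPartitionSketch {

    // Hypothetical helper mirroring PartitionerExample.partition(...)
    static int partitionFor(String employeeId, int numOfPartitions) {
        int intKey = Integer.parseInt(employeeId);
        // Non-positive keys fall back to partition 0, as in the original class
        return intKey > 0 ? intKey % numOfPartitions : 0;
    }

    public static void main(String[] args) {
        int numOfPartitions = 2; // the topic in this post has 2 partitions
        Map<String, Integer> assignment = new TreeMap<>();
        for (int id = 0; id < 10; id++) {
            String key = String.valueOf(id);
            assignment.put(key, partitionFor(key, numOfPartitions));
        }
        // Even ids map to partition 0, odd ids to partition 1
        System.out.println(assignment);
    }
}
```

The `intKey > 0` guard matters because Java's `%` keeps the sign of the dividend: -3 % 2 is -1, which is not a valid partition number. If you wanted negative keys spread across partitions instead of all going to partition 0, Math.floorMod(-3, 2) returns 1.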

Multibroker Producer Example:
package com.nvexample.kafka.partition;

import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerWithPartitionExample {
      
       private static Producer<String, String> producer;
       public final static String brokerList = "localhost:9092,localhost:9093";
       public final static String PARTITIONER_IMPLEMENTATION_CLASS
                           = "com.nvexample.kafka.partition.PartitionerExample";
       private static final String TOPIC = "EmployeeLoginEventTopic";
      
       public void initialize() {
             Properties props = new Properties();
             props.put("metadata.broker.list", brokerList);
             props.put("serializer.class", "kafka.serializer.StringEncoder");
             props.put("partitioner.class", PARTITIONER_IMPLEMENTATION_CLASS);
             props.put("request.required.acks", "1");
             ProducerConfig config = new ProducerConfig(props);
             producer = new Producer<String, String>(config);
       }
       public void publish(String key, String message) {
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                           TOPIC, key, message);
             producer.send(data);
       }
       public void closeProducer() {
             producer.close();
       }
       public static void main(String[] args) {
             ProducerWithPartitionExample producerWithPartition
                       = new ProducerWithPartitionExample();
             // Initialize the producer with required properties
             producerWithPartition.initialize();           
             // Publish message to brokers
             Random rnd = new Random();
             for (long employeeLogInEvent = 0; employeeLogInEvent < 10;
                                                             employeeLogInEvent++) {
                    String employeeId = String.valueOf(rnd.nextInt(10));
                    String msg = "EmployeeID:" + employeeId + ", LoginTime: " + new Date();
                    producerWithPartition.publish(employeeId, msg);
             }           
             // Close the connection between broker and producer
             producerWithPartition.closeProducer();
       }
}

In this example, each employee login event is sent as the message, with the employeeId as its key. If a message is sent without a key, the producer does not call the partitioner class, even when one is configured; it assigns the message to a random partition instead.
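The rule above can be expressed as a simplified model, not Kafka code: a keyed message goes through the partitioner, while a message without a key gets a random partition and the partitioner is skipped. PartitionChoiceSketch and choosePartition are hypothetical names, and the partitioner is passed in as a plain Java function instead of a kafka.producer.Partitioner, so the sketch runs without Kafka on the classpath.

```java
import java.util.Random;
import java.util.function.ToIntBiFunction;

// Simplified model of the producer's partition choice (not Kafka code).
class PartitionChoiceSketch {

    private static final Random RANDOM = new Random();

    static int choosePartition(String key, int numPartitions,
                               ToIntBiFunction<String, Integer> partitioner) {
        if (key == null) {
            // No key: the configured partitioner is not consulted at all
            return RANDOM.nextInt(numPartitions);
        }
        // Keyed message: the partitioner decides
        return partitioner.applyAsInt(key, numPartitions);
    }

    public static void main(String[] args) {
        // Same rule as PartitionerExample for positive numeric keys
        ToIntBiFunction<String, Integer> modulo = (k, n) -> Integer.parseInt(k) % n;
        System.out.println("key 7  -> partition " + choosePartition("7", 2, modulo));
        System.out.println("no key -> partition " + choosePartition(null, 2, modulo));
    }
}
```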

Java Consumer Example 
This is the consumer program. When started, the consumer discovers the brokers through ZooKeeper, connects to them and starts consuming messages published to 'EmployeeLoginEventTopic'. Because it requests a single stream, one thread reads from both partitions of the topic.

   package com.nvexample.kafka;

   import java.util.*;
   import kafka.consumer.Consumer;
   import kafka.consumer.ConsumerConfig;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
       private ConsumerConnector consumerConnector = null;
       private final String topic = "EmployeeLoginEventTopic";

       public void initialize() {
             Properties props = new Properties();
             props.put("zookeeper.connect", "localhost:2181");
             props.put("group.id", "testgroup");
             props.put("zookeeper.session.timeout.ms", "400");
             props.put("zookeeper.sync.time.ms", "300");
             props.put("auto.commit.interval.ms", "1000");
             ConsumerConfig conConfig = new ConsumerConfig(props);
             consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
       }

       public void consume() {
             //Key = topic name, Value = No. of threads (streams) for topic
             Map<String, Integer> topicCount = new HashMap<String, Integer>();
             topicCount.put(topic, new Integer(1));

             //ConsumerConnector creates the message stream for each topic
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                   consumerConnector.createMessageStreams(topicCount);

             // Get Kafka stream for topic 'EmployeeLoginEventTopic'
             List<KafkaStream<byte[], byte[]>> kStreamList =
                                                  consumerStreams.get(topic);
             // Iterate stream using ConsumerIterator
             for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
                    // hasNext() blocks while waiting for new messages
                    while (consumerIte.hasNext())
                           System.out.println("Message consumed from topic ["
                                         + topic + "] : "
                                         + new String(consumerIte.next().message()));
             }
             //Shutdown the consumer connector
             if (consumerConnector != null)   consumerConnector.shutdown();
       }

       public static void main(String[] args) throws InterruptedException {
             KafkaConsumer kafkaConsumer = new KafkaConsumer();
             // Configure Kafka consumer
             kafkaConsumer.initialize();
             // Start consumption
             kafkaConsumer.consume();
       }
   }



Output of Producer and Consumer Program
Run the consumer program first and then the producer program. You will get output like the following. Notice in the producer console that the same partition number is returned for the same key. This ensures that messages for the same employee id are sent to the same partition.

ProducerWithPartitionExample.java Program Console Output:



KafkaConsumer.java Program Console Output:


I hope this post helped you learn how to write a Java Kafka producer for multiple brokers and partitions. It would be great if you left your feedback on this post.

Monday 14 December 2015

Apache Kafka – MultiBroker + Partitioning + Replication

The main goal of this post is to demonstrate the concepts of multiple brokers, partitioning and replication in Apache Kafka. The steps to set up multiple brokers with partitioning and replication are included at the end of this post.

If you are new to Apache Kafka, you can refer to the posts below to understand Kafka quickly.

About Multi Brokers
Running more than one broker in a Kafka cluster is what this post calls a multi-broker setup. The diagram below shows the architecture of a multi-broker Kafka cluster. If you are in a hurry to set up multiple brokers, you can scroll down to the steps below.



About Partition in Kafka
A topic is one of the main abstractions in Kafka, and each topic is divided into one or more partitions. Partitions are managed by Kafka brokers. Producers are responsible for publishing messages to topics/partitions. Each message in a partition is identified by a unique, sequential id called the ‘message offset’. The offset can be understood as an increasing logical timestamp within a partition. Consumers are responsible for requesting messages from a certain offset onward. For each consumer group, messages are guaranteed to be consumed at least once. The diagram below shows the basic architecture of partitions.
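To make offsets concrete, here is a toy model of one partition, not how Kafka actually stores data: an append-only list in which a message's index is its offset. PartitionLogSketch is a hypothetical class; append plays the producer's role and readFrom plays a consumer asking for messages from a given offset onward.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition: an append-only list where the index is the offset.
class PartitionLogSketch {

    private final List<String> messages = new ArrayList<>();

    // Producer side: append and return the offset assigned to the message
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Consumer side: read every message from the given offset onward
    List<String> readFrom(long offset) {
        if (offset < 0 || offset > messages.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return new ArrayList<>(messages.subList((int) offset, messages.size()));
    }

    public static void main(String[] args) {
        PartitionLogSketch partition = new PartitionLogSketch();
        partition.append("login-1");
        partition.append("login-2");
        long last = partition.append("login-3");
        System.out.println(partition.readFrom(1)); // messages at offsets 1 and 2
        System.out.println("last offset = " + last);
    }
}
```

A consumer that remembers the next offset it has not read yet can resume from exactly that point, which is why offsets only need to be unique and increasing within one partition.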


About Replication in Kafka
Suppose you have 2 brokers (broker-1 and broker-2) and your message is published to broker-1. What if broker-1 fails due to some error? Without replication, the message becomes unavailable, and if broker-1's data cannot be recovered it is lost and will never be consumed. Replication solves this problem. Replication means copying the messages of a partition to several brokers, so that a published message is not lost and can still be consumed even if a broker fails due to a program or machine error, as long as at least one replica of that partition is still alive. Replication provides better durability and higher availability. Both producers and consumers are replication-aware in Kafka. The diagram below shows the concept of replication.

There are many other aspects of replication in Kafka that this post does not cover. For more detail, please follow this link.
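The broker-1 failure scenario described above can be illustrated with a toy model. It is only a sketch under simplifying assumptions (ReplicationSketch and its methods are hypothetical, and Kafka's real leader/follower replication with in-sync replicas is more involved): every live replica stores its own copy of the message, so after broker-1 fails the message can still be read from broker-2, while a partition with a single replica becomes unavailable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of replication, not Kafka's protocol: each replica keeps its own copy of a partition.
class ReplicationSketch {

    static class Broker {
        final int id;
        boolean alive = true;
        final List<String> log = new ArrayList<>();

        Broker(int id) {
            this.id = id;
        }
    }

    // Copy the message to every live replica of the partition
    static void publish(List<Broker> replicas, String message) {
        for (Broker broker : replicas) {
            if (broker.alive) {
                broker.log.add(message);
            }
        }
    }

    // Serve reads from the first replica that is still alive
    static List<String> consume(List<Broker> replicas) {
        for (Broker broker : replicas) {
            if (broker.alive) {
                return broker.log;
            }
        }
        throw new IllegalStateException("no live replica: the messages are unavailable");
    }

    public static void main(String[] args) {
        List<Broker> replicas = Arrays.asList(new Broker(1), new Broker(2));
        publish(replicas, "EmployeeID:7");
        replicas.get(0).alive = false; // broker-1 fails
        // broker-2 still holds a copy, so the message can be consumed
        System.out.println(consume(replicas));
    }
}
```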

How to Set Up Multiple Brokers
Well, we have talked enough theory. Now it's time for some practical work. Setting up multiple brokers is straightforward; just follow the steps below.

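1. First, start the ZooKeeper server by executing the command below. Replace <kafka_dir> with the location where you installed Kafka. The configuration paths in these commands are relative, so run them from the <kafka_dir>\bin\windows directory.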
<kafka_dir>\bin\windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Make a copy of <kafka_dir>\config\server.properties in the same directory and name it, for example, ‘first-broker-server.properties’.

3. To set up the first broker, change the following properties in first-broker-server.properties:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# The port the socket server listens on. It must be unique for each broker.
port=9092

# A comma-separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/first-broker-server

# ZooKeeper connection string: the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
4. Make another copy of <kafka_dir>\config\server.properties in the same directory and name it, for example, ‘second-broker-server.properties’.

5. Change the same properties in second-broker-server.properties for the second broker:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# The port the socket server listens on. It must be unique for each broker.
port=9093

# A comma-separated list of directories under which to store log files
log.dirs=<kafka_dir>/kafka-logs/second-broker-server

# ZooKeeper connection string: the host and port where your ZooKeeper server is running.
zookeeper.connect=localhost:2181
6. Now start both brokers. Each broker keeps running in the foreground, so execute each command in its own command prompt:

    Start first broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\first-broker-server.properties
    Start second broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\second-broker-server.properties
7. Now create the topic 'multibrokertopic' with 2 partitions and a replication factor of 2.
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic multibrokertopic
8. In this step you can see how to produce messages to multiple brokers from the command line. To connect any producer to multiple brokers, configure the list of brokers as a comma-separated list of <ip>:<port> entries where the brokers are running.
<kafka_dir>\bin\windows\kafka-console-producer.bat --broker-list localhost:9092,localhost:9093 --topic multibrokertopic
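9. Now start the consumer to see the published messages. By default the console consumer shows only messages published after it starts; add --from-beginning to the command to see earlier messages as well.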
<kafka_dir>\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic multibrokertopic
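The --broker-list value in step 8 uses the same <ip>:<port>,<ip>:<port> format as the metadata.broker.list property of a Java producer. Here is a minimal sketch of splitting that format into addresses, assuming plain host:port entries (IPv6 literals are not considered); BrokerListSketch is a hypothetical name, not a Kafka class.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:port1,host2:port2" into socket addresses.
class BrokerListSketch {

    static List<InetSocketAddress> parse(String brokerList) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected <ip>:<port> but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved skips the DNS lookup; only the format is checked here
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    public static void main(String[] args) {
        for (InetSocketAddress broker : parse("localhost:9092,localhost:9093")) {
            System.out.println(broker.getHostString() + " -> port " + broker.getPort());
        }
    }
}
```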
I hope you had a great time reading this post. In the next post, I will demonstrate how to implement a producer in Java that sends messages to multiple brokers with partitioning.

Saturday 5 December 2015

Apache Kafka - Example of Producer/Consumer in Java

If you are searching for how to write a simple Kafka producer and consumer in Java, I think you have reached the right blog. In this post you will see how to write a standalone program that produces messages and publishes them to a Kafka broker. You will also learn how to write another standalone program that works as a consumer and consumes the messages published by the Kafka producer.

In this post, I have focused on a simple producer/consumer implementation and its examples. To keep it simple, I have not included examples of message partitioning, multiple topics or multiple Kafka nodes. We will use a single topic to produce and consume messages without message partitioning.


Prerequisite
Before proceeding further, please ensure that:
  • The Kafka cluster is already set up. If it is not, don’t worry: follow my other post ‘ApacheKafka - Quick Start on Windows’ to set up Kafka in your local environment.
  • The topic ‘mytopic’ has been created.
  • ZooKeeper is running on localhost:2181.
  • Kafka is running on localhost:9092.

Setting Up Project in Eclipse to Create Producer/Consumer
  1. Create a simple Java project in Eclipse.
  2. Add the Kafka libraries to your project build path. You can find them in ‘<kafka_dir>\libs’, where <kafka_dir> is the directory where you extracted the Kafka distribution during Kafka setup.
   
       About Kafka Producer APIs
       To implement a Kafka producer we need the following Kafka classes. Let’s understand the responsibility of each of them:
  1. kafka.producer.ProducerConfig: This class wraps the properties that are required to establish a connection with the Kafka broker. For more detail on the producer’s properties, see the section ‘Important configuration properties for the producer’ at this link.
  2. kafka.producer.KeyedMessage: The Kafka producer uses this class to send messages/data to the Kafka broker. With this class we define the topic name, the message partition key and the message. The producer sends the data to the topic defined in the KeyedMessage. The message partition key is optional.
  3. kafka.javaapi.producer.Producer: This class sends data to the broker in the form of KeyedMessage objects. Messages can be sent either synchronously or asynchronously, depending on the producer.type property ('sync' or 'async').
     Kafka Producer - Java Example 
 The diagram below shows the different APIs used in the example producer program.


      
   package com.nvexample.kafka;

   import java.io.BufferedReader;
   import java.io.InputStreamReader;
   import java.util.Properties;

   import kafka.javaapi.producer.Producer;
   import kafka.producer.KeyedMessage;
   import kafka.producer.ProducerConfig;

    public class KafkaProducer {
       private static Producer<Integer, String> producer;
       private static final String topic= "mytopic";

       public void initialize() {
             Properties producerProps = new Properties();
             producerProps.put("metadata.broker.list", "localhost:9092");
             producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
             producerProps.put("request.required.acks", "1");
             ProducerConfig producerConfig = new ProducerConfig(producerProps);
             producer = new Producer<Integer, String>(producerConfig);
       }
       public void publishMessage() throws Exception {
             BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
             while (true) {
                    System.out.print("Enter message to send to kafka broker "
                                  + "(Press 'Y' to close producer): ");
                    String msg = reader.readLine(); // Read message from console
                    // Stop on 'Y' or end of input; 'Y' itself is not published
                    if (msg == null || "Y".equals(msg)) {
                           break;
                    }
                    // Define topic name and message (no key is given)
                    KeyedMessage<Integer, String> keyedMsg =
                                  new KeyedMessage<Integer, String>(topic, msg);
                    producer.send(keyedMsg); // This publishes message on given topic
                    System.out.println("--> Message [" + msg + "] sent. "
                                  + "Check message on Consumer's program console");
             }
       }

       public static void main(String[] args) throws Exception {
             KafkaProducer kafkaProducer = new KafkaProducer();
             // Initialize producer
             kafkaProducer.initialize();
             // Publish messages typed on the console
             kafkaProducer.publishMessage();
             //Close the producer
             producer.close();
       }
   }

       
   About Kafka Consumer APIs
    To implement a Kafka consumer we need the following Kafka classes. Let’s understand the responsibility of each of them:
  • kafka.consumer.ConsumerConfig: This class wraps the properties that are required to establish a connection between the consumer and ZooKeeper. For more detail on the consumer’s properties, see the section ‘Important configuration properties for the consumer’ at this link.
  • kafka.javaapi.consumer.ConsumerConnector: This is the Kafka consumer interface. ZookeeperConsumerConnector is the class that implements it; it establishes the connection with ZooKeeper and takes care of all interactions with ZooKeeper.
  • kafka.consumer.KafkaStream: ConsumerConnector returns a list of KafkaStream objects for each topic, wrapped in a map of type Map<String, List<KafkaStream<K, V>>>. The map key is the topic name and the value is the list of KafkaStream objects. In KafkaStream<K, V>, K is the type of the message key and V is the type of the message itself.
  • kafka.consumer.ConsumerIterator: ConsumerIterator is used to iterate KafkaStream.
   Kafka Consumer - Java Example
    The diagram below shows the different APIs used in the example consumer program.


   package com.nvexample.kafka;

   import java.util.*;
   import kafka.consumer.Consumer;
   import kafka.consumer.ConsumerConfig;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
       private ConsumerConnector consumerConnector = null;
       private final String topic = "mytopic";

       public void initialize() {
             Properties props = new Properties();
             props.put("zookeeper.connect", "localhost:2181");
             props.put("group.id", "testgroup");
             props.put("zookeeper.session.timeout.ms", "400");
             props.put("zookeeper.sync.time.ms", "300");
             props.put("auto.commit.interval.ms", "1000");
             ConsumerConfig conConfig = new ConsumerConfig(props);
             consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
       }

       public void consume() {
             //Key = topic name, Value = No. of threads for topic
             Map<String, Integer> topicCount = new HashMap<String, Integer>();       
             topicCount.put(topic, new Integer(1));
            
             //ConsumerConnector creates the message stream for each topic
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                   consumerConnector.createMessageStreams(topicCount);         
            
             // Get Kafka stream for topic 'mytopic'
             List<KafkaStream<byte[], byte[]>> kStreamList =
                                                  consumerStreams.get(topic);
             // Iterate stream using ConsumerIterator
             for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
                   
                    while (consumerIte.hasNext())
                           System.out.println("Message consumed from topic ["
                                         + topic + "] : "
                                         + new String(consumerIte.next().message()));
             }
             //Shutdown the consumer connector
             if (consumerConnector != null)   consumerConnector.shutdown();          
       }

       public static void main(String[] args) throws InterruptedException {
             KafkaConsumer kafkaConsumer = new KafkaConsumer();
             // Configure Kafka consumer
             kafkaConsumer.initialize();
             // Start consumption
             kafkaConsumer.consume();
       }
   }



Running KafkaProducer & KafkaConsumer Standalone Programs
      Now you can run the Kafka consumer and producer programs. Start KafkaConsumer first, then KafkaProducer, which will prompt you to enter a message. When you type a message and press Enter in the KafkaProducer console, the KafkaConsumer console prints the consumed message.

KafkaProducer Program Console:


KafkaConsumer Program Console:



I hope this post helped you learn Kafka producer and consumer programming.

!!! Happy Kafka Learning !!