Friday, 18 December 2015

Apache Kafka – Java Producer Example with Multibroker & Partition

In this post I will demonstrate how to implement a Java producer that connects to multiple brokers, and how to produce messages to different partitions of a topic.

I have also published a couple of other posts about Kafka. If you are new to Kafka and would like to learn it from scratch, I recommend walking through those posts first.

Prerequisite 
I am assuming that you already have Kafka set up in your local environment. If not, you can set up Kafka in a Windows environment by following this link.

Setup Multibroker and Topic with Partition
1. First you need to start the Zookeeper server. To start it, execute the command below. <kafka_dir> needs to be replaced with the location where you have installed Kafka.
<kafka_dir>\bin\windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Go to the <kafka_dir>\config\server.properties file and make a copy of it at the same location, say ‘first-broker-server.properties’.

3. You just need to change a couple of properties in first-broker-server.properties to set up the first broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# The port the socket server listens on, it should be unique for each broker

port=9092

# A comma separated list of directories under which to store log files

log.dirs=<kafka_dir>/kafka-logs/first-broker-server

# Zookeeper connection string. This is the host and port where your zookeeper server is running.

zookeeper.connect=localhost:2181
4. Go to the <kafka_dir>\config\server.properties file and make another copy of it at the same location, say ‘second-broker-server.properties’.

5. Now change the properties in second-broker-server.properties for the second broker.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# The port the socket server listens on, it should be unique for each broker

port=9093

# A comma separated list of directories under which to store log files

log.dirs=<kafka_dir>/kafka-logs/second-broker-server

# Zookeeper connection string. This is the host and port where your zookeeper server is running.

zookeeper.connect=localhost:2181
6. Now you need to start both brokers. To start a broker, execute the commands below:

    Start first broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\first-broker-server.properties
    Start second broker:
<kafka_dir>\bin\windows\kafka-server-start.bat ..\..\config\second-broker-server.properties
7. Now create the topic 'multibrokertopic' with 2 partitions and a replication factor of 2.
<kafka_dir>\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic multibrokertopic
Java Producer Example with Multibroker And Partition
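Now let's write a Java producer that connects to the two brokers. In step 7 we created the topic 'multibrokertopic' with 2 partitions. The code below uses the topic name 'EmployeeLoginEventTopic', so either create that topic in the same way or change the TOPIC constant to match. In this example we will see how to send a message to a specific partition of a topic.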


In this example program, I have tried to simulate sending employee login events to different Kafka brokers. An auto-generated employeeId is used as the key, and each message is sent to a partition in the format 'EmployeeID:<employeeId>, LoginTime: <currentDate&Time>'.


First of all, you need to understand which properties are required to initialize the producer:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092,localhost:9093"); //1
props.put("serializer.class", "kafka.serializer.StringEncoder"); //2
props.put("partitioner.class", "com.nvexample.kafka.partition.PartitionerExample"); //3
props.put("request.required.acks", "1"); //4
  • The first property lists the Kafka brokers that the producer will connect to.
  • The second property names the serializer class for the message. Keys use the same serializer unless 'key.serializer.class' is set. For string messages you can use 'kafka.serializer.StringEncoder'.
  • The third property names your class that implements the 'kafka.producer.Partitioner' interface. In this implementation you write the logic that decides which partition a message is sent to, based on the message key.
  • Set the fourth property to '1' if you want the producer to receive an acknowledgement once the leader broker has received the message successfully.
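
A typo in the broker list is easy to miss, so it can help to check the string before handing it to the producer. The following is only a minimal sketch using the standard library: the class BrokerListCheck and its parse method are hypothetical names for illustration and are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka API.
class BrokerListCheck {

    // Splits a "host:port,host:port" string into trimmed entries and
    // rejects entries that are malformed or listed twice.
    static List<String> parse(String brokerList) {
        List<String> brokers = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String entry : brokerList.split(",")) {
            String broker = entry.trim();
            int colon = broker.lastIndexOf(':');
            if (colon <= 0 || colon == broker.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + broker + "'");
            }
            // NumberFormatException is a subclass of IllegalArgumentException
            int port = Integer.parseInt(broker.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + broker + "'");
            }
            if (!seen.add(broker)) {
                throw new IllegalArgumentException("Duplicate broker '" + broker + "'");
            }
            brokers.add(broker);
        }
        return brokers;
    }

    public static void main(String[] args) {
        // Same value as used for "metadata.broker.list" in this post
        System.out.println(parse("localhost:9092,localhost:9093"));
        // prints [localhost:9092, localhost:9093]
    }
}
```

The check only validates the shape of the string. It does not verify that a broker is actually listening on each port.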
Partitioner Class Implementation: 
package com.nvexample.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerExample implements Partitioner {

       public PartitionerExample(VerifiableProperties props) {
       }
       public int partition(Object employeeIdStr, int numOfPartitions) {
             int partition = 0;
             String stringKey = (String) employeeIdStr;
             Integer intKey = Integer.parseInt(stringKey);
             if (intKey > 0) {
                    partition = intKey % numOfPartitions;               
             }
             System.out.println("Returning partition number [" + partition + "] " +
                           "for key ["+employeeIdStr+"]");
             return partition;  
       }
}

This implementation takes the key 'employeeIdStr' and performs a modulo operation with the number of partitions configured for the topic. This partitioning logic ensures that messages with the same key are always sent to the same partition. In other words, all login events for the same employeeId are served by the same partition.
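
To see the effect of the modulo rule without starting any broker, the same arithmetic can be run on its own. This is a minimal sketch that copies the rule from PartitionerExample into a plain static method. The class name PartitionSketch is made up for illustration, and the partition count of 2 is assumed from the topic setup above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-alone illustration of the partitioning rule; does not use Kafka classes.
class PartitionSketch {

    // Same rule as PartitionerExample: positive numeric keys map to
    // key % numOfPartitions, and every other numeric key maps to partition 0.
    static int partitionFor(String employeeId, int numOfPartitions) {
        int key = Integer.parseInt(employeeId);
        return key > 0 ? key % numOfPartitions : 0;
    }

    public static void main(String[] args) {
        int numOfPartitions = 2; // assumed: the topic was created with 2 partitions
        Map<Integer, List<String>> keysByPartition = new TreeMap<>();
        // The producer draws employee ids from 0 to 9, so cover that range
        for (int id = 0; id < 10; id++) {
            String key = String.valueOf(id);
            keysByPartition
                    .computeIfAbsent(partitionFor(key, numOfPartitions), p -> new ArrayList<>())
                    .add(key);
        }
        System.out.println(keysByPartition);
        // prints {0=[0, 2, 4, 6, 8], 1=[1, 3, 5, 7, 9]}
    }
}
```

With 2 partitions, even employee ids land on partition 0 and odd ids on partition 1. Like the original class, this sketch throws a NumberFormatException if the key is not numeric.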

Multibroker Producer Example:
package com.nvexample.kafka.partition;

import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerWithPartitionExample {
      
       private static Producer<String, String> producer;
       public final static String brokerList = "localhost:9092,localhost:9093";
       public final static String PARTITIONER_IMPLEMENTATION_CLASS
                           = "com.nvexample.kafka.partition.PartitionerExample";
       private static final String TOPIC = "EmployeeLoginEventTopic";
      
       public void initialize() {
             Properties props = new Properties();
             props.put("metadata.broker.list", brokerList);
             props.put("serializer.class", "kafka.serializer.StringEncoder");
             props.put("partitioner.class", PARTITIONER_IMPLEMENTATION_CLASS);
             props.put("request.required.acks", "1");
             ProducerConfig config = new ProducerConfig(props);
             producer = new Producer<String, String>(config);
       }
       public void publish(String key, String message) {
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                           TOPIC, key, message);
             producer.send(data);
       }
       public void closeProducer() {
             producer.close();
       }
       public static void main(String[] args) {
             ProducerWithPartitionExample producerWithPartition
                       = new ProducerWithPartitionExample();
             // Initialize the producer with required properties
             producerWithPartition.initialize();           
             // Publish message to brokers
             Random rnd = new Random();
             for (long employeeLogInEvent = 0; employeeLogInEvent < 10;
                                                             employeeLogInEvent++) {
                    String employeeId = String.valueOf(rnd.nextInt(10));
                    String msg = "EmployeeID:" + employeeId
                                   + ", LoginTime: " + new Date();
                    producerWithPartition.publish(employeeId, msg);
             }           
             // Close the connection between broker and producer
             producerWithPartition.closeProducer();
       }
}

In this example, we are sending each employee login event as the message, with employeeId as the key. If the key is not sent along with the message, Kafka assigns the message to a random partition, even if you have defined a partitioner class.

Java Consumer Example 
This is the consumer program. On starting, the consumer connects to the brokers via Zookeeper and starts consuming messages published on 'EmployeeLoginEventTopic'.

   package com.nvexample.kafka;

   import java.util.*;
   import kafka.consumer.Consumer;
   import kafka.consumer.ConsumerConfig;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
       private ConsumerConnector consumerConnector = null;
       private final String topic = "EmployeeLoginEventTopic";

       public void initialize() {
             Properties props = new Properties();
             props.put("zookeeper.connect", "localhost:2181");
             props.put("group.id", "testgroup");
             props.put("zookeeper.session.timeout.ms", "400");
             props.put("zookeeper.sync.time.ms", "300");
             props.put("auto.commit.interval.ms", "1000");
             ConsumerConfig conConfig = new ConsumerConfig(props);
             consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
       }

       public void consume() {
             //Key = topic name, Value = No. of threads for topic
             Map<String, Integer> topicCount = new HashMap<String, Integer>();       
             topicCount.put(topic, new Integer(1));
            
             //ConsumerConnector creates the message stream for each topic
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                   consumerConnector.createMessageStreams(topicCount);         
            
             // Get Kafka streams for topic 'EmployeeLoginEventTopic'
             List<KafkaStream<byte[], byte[]>> kStreamList =
                                                  consumerStreams.get(topic);
             // Iterate stream using ConsumerIterator
             for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
                   
                    while (consumerIte.hasNext())
                           System.out.println("Message consumed from topic ["
                                         + topic + "] : " +
                                           new String(consumerIte.next().message()));
             }
             //Shutdown the consumer connector
             if (consumerConnector != null)   consumerConnector.shutdown();          
       }

       public static void main(String[] args) throws InterruptedException {
             KafkaConsumer kafkaConsumer = new KafkaConsumer();
             // Configure Kafka consumer
             kafkaConsumer.initialize();
             // Start consumption
             kafkaConsumer.consume();
       }
   }
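
The consumer above prints each message as raw text. If you want the individual fields, the message can be split using the format described earlier, 'EmployeeID:<employeeId>, LoginTime: <currentDate&Time>'. This is only a minimal sketch under that assumption. LoginEventParser is a hypothetical helper and is not part of Kafka or of the programs above.

```java
// Hypothetical helper for illustration; assumes the message format
// "EmployeeID:<employeeId>, LoginTime: <currentDate&Time>" used in this post.
class LoginEventParser {

    static final String ID_PREFIX = "EmployeeID:";
    static final String TIME_MARKER = ", LoginTime: ";

    // Returns a two-element array: [employeeId, loginTime]
    static String[] parse(String message) {
        int timeAt = message.indexOf(TIME_MARKER);
        if (!message.startsWith(ID_PREFIX) || timeAt < 0) {
            throw new IllegalArgumentException("Unexpected message format: " + message);
        }
        String employeeId = message.substring(ID_PREFIX.length(), timeAt);
        String loginTime = message.substring(timeAt + TIME_MARKER.length());
        return new String[] { employeeId, loginTime };
    }

    public static void main(String[] args) {
        // Sample text in the same shape the producer builds; the timestamp is made up
        String[] fields = parse("EmployeeID:7, LoginTime: Fri Dec 18 10:15:30 UTC 2015");
        System.out.println("employeeId=" + fields[0] + ", loginTime=" + fields[1]);
        // prints employeeId=7, loginTime=Fri Dec 18 10:15:30 UTC 2015
    }
}
```

In the consumer loop, the string passed to parse would be the one built from consumerIte.next().message().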



Output of Producer and Consumer Program
Execute the consumer program first and then the producer program. You will get the following output. Notice in the producer program console that the same partition number is returned for the same key. This ensures that messages for the same employee id are sent to the same partition.

ProducerWithPartitionExample.java Program Console Output:



KafkaConsumer.java Program Console Output:


I hope this post helped you learn how to write a Java Kafka producer with multiple brokers and partitions. It would be great if you left your feedback on this post.
