
Apache Kafka API Code Sample in Java


In my earlier blog post, I explained what Apache Kafka is and how to install and test it with Apache ZooKeeper using the out-of-the-box Kafka commands. If you missed that article, please refer here: https://javainsider.wordpress.com/2017/05/12/apache-kafka-introduction-installation/

A simple producer/consumer application:

You’ve seen how Kafka works out of the box. Next, let’s develop a custom producer/consumer test client in Java. The test producer will send 50 new messages to the Kafka server from a standalone program. The test consumer will retrieve the messages for a given topic and print them to the console. The producer and consumer in this case are your own implementations of kafka-console-producer.bat/.sh and kafka-console-consumer.bat/.sh.

Let’s start by creating a TestProducer.java class. This client creates 50 test messages and sends each one to the Kafka server.

package com.javainsider.kafkaapi;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {

        // Producer configuration: adjust bootstrap.servers to match your broker.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");             // wait for acknowledgment from all in-sync replicas
        props.put("retries", 0);              // do not retry failed sends
        props.put("batch.size", 16384);       // batch size in bytes per partition
        props.put("linger.ms", 1);            // small delay to allow batching
        props.put("buffer.memory", 33554432); // total memory available for buffering
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Declaring the generic types avoids the raw-type warnings
        // that previously had to be suppressed.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int key = 0; key < 50; key++) {
            producer.send(new ProducerRecord<>("javainsider", Integer.toString(key),
                    "My test message for key:" + key));
            System.out.println("Kafka topic Key=" + key + ", Kafka Message:" + "My test message for key:" + key);
        }

        producer.close(); // flush pending messages and release resources
    }

}

Be sure to configure the properties to match the Kafka host and port in your environment. You should also make sure the topic name is configured correctly.
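Since these settings change between environments, one option is to collect them in a small helper so the broker address is set in a single place. This is my own sketch, not part of the original program; the class and method names (ProducerConfigSketch, producerProps) are illustrative, and the property values are the same ones used above.

```java
import java.util.Properties;

// Sketch: the producer settings from the post, gathered into one helper so
// the broker address is configured in a single place. Helper names are
// illustrative, not part of the Kafka API.
public class ProducerConfigSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Read the broker address from the command line, falling back to localhost.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        Properties props = producerProps(servers);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

With this in place, both TestProducer and TestConsumer could build their Properties from one spot instead of duplicating the broker address.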

Next, create a TestConsumer.java class. This client reads all messages for the topic from the Kafka server and prints them to the console.

package com.javainsider.kafkaapi;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        String topic = "javainsider";

        // Consumer configuration: adjust bootstrap.servers to match your broker.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");                // consumer group id
        props.put("enable.auto.commit", "true");      // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000"); // commit every second
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the loop ever exits,
        // which also removes the need for @SuppressWarnings("resource").
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);

            while (true) {
                // Poll for new records, waiting up to 100 ms.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Kafka offset = %d, Kafka topic Key = %s, Kafka Message = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}

Again, make sure the broker address and topic name match your Kafka setup.

You can download the complete code from GitHub: https://github.com/javainsider/kafkaapi

Happy coding…..

Apache Kafka – Introduction & Installation

Apache Kafka is a distributed streaming platform. What exactly does that mean, and why Kafka?

Most traditional messaging systems don’t scale to handle big data in real time. So engineers at LinkedIn built and open-sourced Kafka: a distributed messaging framework that meets the demands of big data by scaling on commodity hardware.

In this post we will start with how to install and run Apache Kafka in your development environment.

What is Kafka?

Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It’s designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn’t support JMS, Java’s message-oriented middleware API.

Kafka’s basic terminology:

  • A producer is a process that publishes messages to a topic.
  • A consumer is a process that subscribes to one or more topics and consumes the messages published to them.
  • A topic is a category or feed name to which messages are published.
  • Each record consists of a key, a value, and a timestamp.
  • A broker is a Kafka process running on a single machine.
  • A cluster is a group of brokers working together.
  • Kafka is run as a cluster on one or more servers.
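To make the record terminology concrete, here is a plain-Java illustration of a record as a key, a value, and a timestamp. This is my own sketch for illustration only, not Kafka’s ConsumerRecord or ProducerRecord class.

```java
// Illustration of the record terminology above: every Kafka record carries a
// key, a value, and a timestamp. This is a plain Java sketch, not Kafka's
// own record classes.
public class RecordSketch {

    static final class Record {
        final String key;
        final String value;
        final long timestamp;

        Record(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) {
        Record record = new Record("42", "My test message for key:42", System.currentTimeMillis());
        System.out.println("key=" + record.key + ", value=" + record.value);
    }
}
```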

Kafka’s API’s:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

More details about Apache Kafka: https://kafka.apache.org/intro.

Kafka Architecture:

(Diagram: Kafka cluster architecture)

Kafka Installation:

Before installing Apache Kafka, you need to install Apache ZooKeeper, a service Kafka requires for maintaining its configuration information and for distributed synchronization.

Download Apache Zookeeper from: Download Zookeeper

Extract zookeeper-3.4.10.tar.gz into your local drive, e.g. C:\apache\zookeeper-3.4.10\

  • Once you have extracted ZooKeeper, locate the conf folder, e.g. C:\apache\zookeeper-3.4.10\conf
  • Rename “zoo_sample.cfg” to “zoo.cfg” inside the conf folder.
  • Create a data directory for ZooKeeper in your local drive, e.g. C:\zk_data
  • Update “dataDir=C:/zk_data” in the “zoo.cfg” file.
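After these edits, a minimal standalone zoo.cfg looks roughly like this. The tickTime and clientPort values are the defaults shipped in zoo_sample.cfg; dataDir is the directory created above.

```
# Minimal standalone ZooKeeper config (zoo.cfg)
tickTime=2000
dataDir=C:/zk_data
clientPort=2181
```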

Download Apache Kafka from: Download Kafka

Extract “kafka_2.10-0.10.2.1.tgz” into your local drive, e.g. C:\apache\kafka-2.10\

  • After extraction you will find the server.properties file inside the config folder; in my case: C:\apache\kafka-2.10\config\
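For reference, the entries in server.properties that matter most for this walkthrough are roughly the following. These are the single-node defaults shipped with the download, so for a local setup you can usually leave them as-is; double-check against your own copy of the file.

```
# Key entries in config/server.properties (single-node defaults)
broker.id=0                         # unique id of this broker in the cluster
log.dirs=/tmp/kafka-logs            # where the broker stores its log segments
zookeeper.connect=localhost:2181    # must match the ZooKeeper clientPort above
# The broker listens on port 9092 by default.
```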

Let’s now start ZooKeeper, followed by Kafka:

  • Start the Zookeeper server by executing the command:

       C:\apache\zookeeper-3.4.10\bin>zkServer.cmd            


  • Start the Apache Kafka server by executing the command:

  C:\apache\kafka-2.10\bin\windows>kafka-server-start.bat C:/apache/kafka-2.10/config/server.properties 


  • Create a test topic that you can use for testing: “javainsider”

         C:\apache\kafka-2.10\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javainsider


  • Start a simple console consumer that can consume messages published to a given topic, such as “javainsider”:

    C:\apache\kafka-2.10\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic javainsider --from-beginning    


  • Start up a simple producer console that can publish messages to the test topic:

     C:\apache\kafka-2.10\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic javainsider   


  • Try typing one or two messages into the producer console. Your messages should show in the consumer console.

Congratulations! You are done with Apache Kafka installation and testing a Kafka instance with an out-of-the-box producer and consumer.

In my next blog post I will explain how to use the Apache Kafka with Java programs. Happy learning 🙂
