Apache Kafka API Code Sample in Java

In an earlier blog post, I explained what Apache Kafka is and how to install and test it with Apache ZooKeeper using Kafka's out-of-the-box commands. If you missed that article, you can find it here: https://javainsider.wordpress.com/2017/05/12/apache-kafka-introduction-installation/

A simple producer/consumer application:

You’ve seen how Kafka works out of the box. Next, let’s develop a custom producer/consumer test client in Java. The test producer is a standalone program that sends 50 new messages to the Kafka server. The test consumer, also a standalone Java application, retrieves the messages for a given topic and prints them to the console. The producer and consumer components in this case are your own implementations of kafka-console-producer.bat/.sh and kafka-console-consumer.bat/.sh.

Let’s start by creating a TestProducer.java class. This client class contains the logic to create 50 test messages and send them to the Kafka server.

package com.javainsider.kafkaapi;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Adjust host and port to match your Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which also sends any buffered records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int key = 0; key < 50; key++) {
                String message = "My test message for key:" + key;
                producer.send(new ProducerRecord<String, String>("javainsider", Integer.toString(key), message));
                System.out.println("Kafka topic Key=" + key + ", Kafka Message:" + message);
            }
        }
    }

}

Be sure to configure the properties to match the Kafka server host and port in your environment, and check that the topic name is the one you intend to use.
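
One way to avoid editing the source for every environment is to read the broker address and topic name from outside the program. The following is only a minimal sketch of that idea: the class ProducerConfigSketch, its methods, and the system property names kafka.bootstrap and kafka.topic are made up for illustration and are not part of the sample project or of Kafka. It uses only the JDK, so it runs without a broker.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original sample project.
final class ProducerConfigSketch {

    // Returns the JVM system property `name`, or `fallback` when it is unset or blank.
    static String setting(String name, String fallback) {
        String value = System.getProperty(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value.trim();
    }

    // Builds a reduced set of the producer settings used in TestProducer
    // for the given broker address (host:port).
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "kafka.bootstrap" and "kafka.topic" are assumed names chosen for this sketch.
        String servers = setting("kafka.bootstrap", "localhost:9092");
        String topic = setting("kafka.topic", "javainsider");

        Properties props = producerProps(servers);
        System.out.println("bootstrap.servers=" + props.getProperty("bootstrap.servers")
                + ", topic=" + topic);
    }
}
```

With something like this in place, the same build could be pointed at another broker by starting the JVM with -Dkafka.bootstrap=host:port, and the resulting Properties object could be passed to the KafkaProducer constructor as in TestProducer.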

Next, create a TestConsumer.java class. This client class contains the logic to read messages from the Kafka server and print them to the console.

package com.javainsider.kafkaapi;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        String topic = "javainsider";

        Properties props = new Properties();
        // Adjust host and port to match your Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Commit consumed offsets automatically once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The consumer is never closed because the loop below runs until the process is stopped
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            // Wait up to 100 ms for new records. On kafka-clients 2.0 and later, prefer
            // poll(java.time.Duration.ofMillis(100)); poll(long) is deprecated there.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Kafka offset = %d, Kafka topic Key = %s, Kafka Message = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

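Make sure the consumer's broker address and topic name match your Kafka setup and the values used by the producer. Note that a consumer group with no committed offsets starts reading from the latest offset by default, so either start the consumer before running the producer or set auto.offset.reset to earliest.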
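
To keep the consumer's settings in one place, they could also be read from properties-style text and checked before the consumer is created. This is a rough sketch under assumptions: the class ConsumerSettingsSketch and the application-level key topic are invented for this example (topic is not a Kafka client setting), and the text is inlined as a string so the sketch runs on its own; a real application would more likely read it from a file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original sample project.
final class ConsumerSettingsSketch {

    // Parses java.util.Properties-style text and fails fast if a required key is missing.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // "topic" is a key made up for this sketch; the other two are Kafka consumer settings.
        for (String required : new String[] {"bootstrap.servers", "group.id", "topic"}) {
            String value = props.getProperty(required);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required setting: " + required);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=localhost:9092\n"
                + "group.id=test\n"
                + "topic=javainsider\n";

        Properties props = parse(text);
        // Pull the topic out so that only client settings remain in props.
        String topic = props.getProperty("topic");
        props.remove("topic");

        System.out.println("Would subscribe to " + topic
                + " via " + props.getProperty("bootstrap.servers"));
    }
}
```

After adding the deserializer entries shown in TestConsumer, the remaining properties could be handed to the KafkaConsumer constructor, with the extracted topic passed to subscribe.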

You can download the complete code from GitHub: https://github.com/javainsider/kafkaapi

Happy coding…..
