Archive for May, 2017

Apache Kafka API Code Sample in Java

In my earlier blog post, I explained what Apache Kafka is and how to install and test it with Apache ZooKeeper using Kafka's out-of-the-box commands. If you missed that article, you can find it here: https://javainsider.wordpress.com/2017/05/12/apache-kafka-introduction-installation/

A simple producer/consumer application:

You’ve seen how Kafka works out of the box. Next, let’s develop a custom producer/consumer test client in Java. The test producer will send 50 new messages to the Kafka server from a standalone program. The test consumer will retrieve the messages for a given topic and print them to the console. The producer and consumer here are your own implementations of kafka-console-producer.bat/.sh and kafka-console-consumer.bat/.sh.

Let’s start by creating a TestProducer.java class. This client class contains the logic to create 50 test messages and send them to the Kafka server.

package com.javainsider.kafkaapi;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Host and port of the Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which waits for pending sends to complete
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int key = 0; key < 50; key++) {
                String message = "My test message for key:" + key;
                producer.send(new ProducerRecord<>("javainsider", Integer.toString(key), message));
                System.out.println("Kafka topic Key=" + key + ", Kafka Message:" + message);
            }
        }
    }

}

Be sure to configure the properties to match the Kafka server host and port in your environment. Also make sure you have the right topic name configured.
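If the host and port differ between environments, one option is to build the settings in a small helper instead of hard-coding them. The following is only a sketch: the class name ProducerSettings and the method forBroker are my own illustration and are not part of the Kafka API. It uses nothing but java.util.Properties, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Builds a reduced set of the producer settings shown above for a given broker address. */
final class ProducerSettings {

    // Hypothetical helper, not part of the Kafka API: host and port come from the caller.
    static Properties forBroker(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", host + ":" + port);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forBroker("localhost", 9092);
        System.out.println(props.getProperty("bootstrap.servers")); // prints localhost:9092
    }
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as the hard-coded one.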

Next, create a TestConsumer.java class. This client class contains the logic to read all messages from the Kafka server and print them to the console.

package com.javainsider.kafkaapi;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        String topic = "javainsider";
        Properties props = new Properties();
        // Host and port of the Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Commit the consumed offsets automatically once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer is never closed because the loop below runs until the process is stopped
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            // Wait up to 100 ms for new records (poll(long) is the 0.10.x client signature)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Kafka offset = %d, Kafka topic Key = %s, Kafka Message: = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Make sure the broker address and the topic name match your Kafka setup.

You can download the complete code from GitHub: https://github.com/javainsider/kafkaapi

Happy coding…..

Apache Kafka – Introduction & Installation

Apache Kafka is a distributed streaming platform. What exactly does that mean, and why use Kafka?

Most traditional messaging systems don’t scale up to handle big data in real time. So engineers at LinkedIn built and open-sourced Kafka: a distributed messaging framework that meets the demands of big data by scaling on commodity hardware.

In this post we will start with how to install and run Apache Kafka in your development environment.

What is Kafka?

Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It’s designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn’t support JMS, Java’s message-oriented middleware API.

Kafka’s basic terminology:

  • A producer is a process that can publish messages to a topic.
  • A consumer is a process that can subscribe to one or more topics and consume the messages published to them.
  • A topic is the category or feed name to which messages are published.
  • Each record consists of a key, a value, and a timestamp.
  • A broker is a Kafka server process running on a single machine.
  • A cluster is a group of brokers working together.
  • Kafka is run as a cluster on one or more servers.
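To make these terms concrete, here is a toy in-memory model of a topic with a producer side and a consumer side. It is deliberately NOT the Kafka API: there is no broker, no cluster and no network, and the names ToyTopic, TopicRecord, publish and readFrom are made up for this illustration. It only shows the idea that records are appended to a topic and that a consumer reads them starting from a position of its own.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the terms above. This is NOT the Kafka API. */
final class ToyTopic {

    /** A record carries a key, a value, and a timestamp. */
    static final class TopicRecord {
        final String key;
        final String value;
        final long timestamp;

        TopicRecord(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final String name;
    private final List<TopicRecord> log = new ArrayList<>(); // append-only; list index = offset

    ToyTopic(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    /** Producer side: append a record and return the offset it was stored at. */
    long publish(String key, String value) {
        log.add(new TopicRecord(key, value, System.currentTimeMillis()));
        return log.size() - 1;
    }

    /** Consumer side: read every record at or after the given offset. */
    List<TopicRecord> readFrom(long offset) {
        int start = (int) Math.max(0, Math.min(offset, log.size()));
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic("javainsider");
        topic.publish("0", "first message");
        topic.publish("1", "second message");

        long nextOffset = 0; // each consumer tracks its own position in the topic
        for (TopicRecord r : topic.readFrom(nextOffset)) {
            System.out.println(r.key + " -> " + r.value);
            nextOffset++;
        }
    }
}
```

Because the consumer keeps its own offset, a second consumer could read the same records again from offset 0 without affecting the first one.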

Kafka’s APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

More details about Apache Kafka: https://kafka.apache.org/intro.

Kafka Architecture:

[Figure: Kafka cluster architecture]

Kafka Installation:

Before installing Apache Kafka, you need to install Apache ZooKeeper. ZooKeeper is a service that Kafka requires for maintaining its configuration information and for providing distributed synchronization.

Download Apache ZooKeeper from the Apache ZooKeeper download page.

Extract zookeeper-3.4.10.tar.gz to your local drive, e.g. C:\apache\zookeeper-3.4.10\

  • Once you have extracted ZooKeeper, locate the conf folder, e.g. C:\apache\zookeeper-3.4.10\conf
  • Rename “zoo_sample.cfg” to “zoo.cfg” inside the conf folder.
  • Create a data directory for ZooKeeper on your local drive, e.g. C:\zk_data
  • Update the “zoo.cfg” file to set “dataDir=C:/zk_data”.
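Note the forward slash in the dataDir value. The sketch below shows why it matters, under the assumption that zoo.cfg is read like a standard Java properties file (ZooKeeper is a Java application, and as far as I know this is how it loads the file). In that format a single backslash is an escape character and is silently dropped. The class name ZooCfgCheck is just for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Shows how a dataDir line is read when the text is parsed as a Java properties file. */
final class ZooCfgCheck {

    static String dataDirOf(String cfgText) {
        Properties cfg = new Properties();
        try {
            cfg.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return cfg.getProperty("dataDir");
    }

    public static void main(String[] args) {
        // Forward slashes survive parsing unchanged: prints C:/zk_data
        System.out.println(dataDirOf("clientPort=2181\ndataDir=C:/zk_data\n"));
        // A single backslash is treated as an escape and dropped: prints C:zk_data
        System.out.println(dataDirOf("clientPort=2181\ndataDir=C:\\zk_data\n"));
    }
}
```

If you prefer backslashes in the path, doubling them in the file (C:\\zk_data) is the usual way to keep them in a properties file.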

Download Apache Kafka from the Apache Kafka download page.

Extract “kafka_2.10-0.10.2.1.tgz” to your local drive, e.g. C:\apache\kafka-2.10\

  • After extracting, you will find the server.properties file inside the config folder, in my case C:\apache\kafka-2.10\config\

Let’s now start ZooKeeper, followed by Kafka:

  • Start the Zookeeper server by executing the command:

       C:\apache\zookeeper-3.4.10\bin>zkServer.cmd            

  • Start the Apache Kafka server by executing the command:

  C:\apache\kafka-2.10\bin\windows>kafka-server-start.bat C:/apache/kafka-2.10/config/server.properties 

  • Create a test topic that you can use for testing: “javainsider”

         C:\apache\kafka-2.10\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javainsider

  • Start a simple console consumer that can consume messages published to a given topic, such as “javainsider”:

    C:\apache\kafka-2.10\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic javainsider --from-beginning    

  • Start up a simple producer console that can publish messages to the test topic:

     C:\apache\kafka-2.10\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic javainsider   

  • Try typing one or two messages into the producer console. Your messages should show in the consumer console.

Congratulations! You are done with Apache Kafka installation and testing a Kafka instance with an out-of-the-box producer and consumer.

In my next blog post I will explain how to use Apache Kafka from Java programs. Happy learning 🙂

Most Important Java 8 VM Options for Servers

Let’s talk about Java enterprise server applications. I have always wanted to know what the best, or at least sensible default, JVM settings are for starting a server application in production. I read a lot on the web, tried several things myself, and want to share what I found out:

 

-server

Use “-server”. All 64-bit JVMs use the server VM by default. This setting optimizes the JVM for long-running server applications rather than for startup time. The JVM collects more data about the Java bytecode during program execution and uses it to generate more efficient machine code via the JIT compiler.

-Xms<heap size>[g|m|k] -Xmx<heap size>[g|m|k]

The “-Xmx” and “-Xms” settings specify the maximum and the initial (minimum) size of the JVM heap. The size follows the option directly, without an equals sign, e.g. -Xms2g. For servers, both parameters should have the same value to avoid heap resizing at runtime.

You will have to experiment to find out how much memory suits your application best.
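While experimenting, it helps to check what the running JVM actually picked up. Here is a small sketch using only java.lang.Runtime; the class name HeapReport is my own. Note that the value reported by maxMemory() can be slightly below the configured -Xmx, depending on the garbage collector, so treat it as an approximation.

```java
/** Prints the heap limits the running JVM actually picked up. */
final class HeapReport {

    static long toMegabytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        // Upper bound the heap may grow to (roughly -Xmx)
        System.out.println("max heap          : " + toMegabytes(rt.maxMemory()) + " MB");
        // Heap currently reserved by the JVM (starts near -Xms)
        System.out.println("committed heap    : " + toMegabytes(rt.totalMemory()) + " MB");
        // Unused part of the committed heap
        System.out.println("free in committed : " + toMegabytes(rt.freeMemory()) + " MB");
    }
}
```

When -Xms and -Xmx are set to the same value, the first two numbers should be close to each other right after startup.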

-XX:MaxMetaspaceSize=<metaspace size>[g|m|k]

Java 8 no longer has a “Permanent Generation” (PermGen) but requires additional “Metaspace” memory instead. This memory is used, in addition to the heap memory we specified before, for storing class metadata.

The default size will be unlimited – I tend to limit MaxMetaspaceSize with a somewhat high value. Just in case something goes wrong with the application, the JVM will not hog all the memory of the server.

Note: Let your application run for a couple of days to get a feeling for how much Metaspace Size it uses normally. Upon next restart of the application set the limit to e.g. double the value.
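One way to get that number without attaching a tool is to ask the JVM itself. The sketch below reads the memory pools through the standard java.lang.management API. It assumes a HotSpot JVM, where the pool is named "Metaspace"; on other JVMs the name may differ, in which case the method returns -1. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

/** Reads the current Metaspace usage and applies the "double it" rule of thumb. */
final class MetaspaceUsage {

    /** Returns used Metaspace in bytes, or -1 if no pool with that name is available. */
    static long usedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // Assumption: "Metaspace" is the pool name used by HotSpot since Java 8
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1 : usage.getUsed();
            }
        }
        return -1;
    }

    /** Suggested limit following the note above: twice the observed usage. */
    static long suggestedLimit(long observedBytes) {
        return observedBytes * 2;
    }

    public static void main(String[] args) {
        long used = usedBytes();
        if (used < 0) {
            System.out.println("No Metaspace pool found on this JVM");
        } else {
            long limitMb = suggestedLimit(used) / (1024L * 1024L);
            System.out.println("Metaspace used: " + used / (1024L * 1024L) + " MB");
            System.out.println("Possible setting: -XX:MaxMetaspaceSize=" + Math.max(1, limitMb) + "m");
        }
    }
}
```

A value measured right after startup is too low to be useful; the number only becomes meaningful after the application has run under real load for a while.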

-XX:+CMSClassUnloadingEnabled

Additionally, you might want to allow the JVM to unload classes that are still held in memory but are no longer referenced by any code. If your application generates lots of dynamic classes, this is what you want.

-XX:+UseConcMarkSweepGC

This option makes the JVM use the ConcurrentMarkSweepGC – It can do much work in parallel to program execution but in some circumstances a “full GC” with a “STW pause” might still occur. I’ve read many articles and came to the conclusion that this GC is still the best one for server workloads.

-XX:+CMSParallelRemarkEnabled

The option CMSParallelRemarkEnabled makes the JVM run the CMS remark phase with multiple GC threads in parallel. The remark phase is a stop-the-world pause, so this shortens it if your server has many cores (and most servers do).

 -XX:+UseCMSInitiatingOccupancyOnly
 -XX:CMSInitiatingOccupancyFraction=<percent>

Normally the GC uses heuristics to decide when it’s time to clear memory. With default settings it might kick in too late, causing full GCs.
Some sources say it might be a good idea to disable the heuristics altogether and start a CMS collection cycle based only on the occupancy of the old generation. Setting values around 70% worked fine for all of my applications and use cases.
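To get a feeling for what such a percentage means in absolute terms, here is a tiny worked calculation. The old generation size of 4096 MB is an assumed example value, not a recommendation, and the class name is hypothetical.

```java
/** Converts an occupancy percentage into the old generation fill level that starts a CMS cycle. */
final class CmsThreshold {

    static long thresholdMb(long oldGenMb, int percent) {
        if (percent < 0 || percent > 100) {
            throw new IllegalArgumentException("percent must be between 0 and 100");
        }
        return oldGenMb * percent / 100; // integer division rounds down
    }

    public static void main(String[] args) {
        // Assumed example: 4096 MB old generation, -XX:CMSInitiatingOccupancyFraction=70
        System.out.println(thresholdMb(4096, 70) + " MB"); // prints 2867 MB
    }
}
```

In this example a concurrent cycle would start once roughly 2.8 GB of the old generation are in use, which leaves about 1.2 GB of headroom while the collection runs.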

-XX:+ScavengeBeforeFullGC

This option tells the GC to first free memory by clearing out the “young generation”, i.e. fairly new objects, before doing a full GC.

-XX:+CMSScavengeBeforeRemark

CMSScavengeBeforeRemark attempts a minor collection before the CMS remark phase, which keeps the subsequent remark pause short.

-XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses

The option “-XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses” is especially important if your application uses RMI (remote method invocation). RMI's distributed garbage collection triggers an explicit full GC at a regular interval, once an hour by default. This can be a very bad idea for large heaps because a full GC pause may take several seconds. It is better to run a concurrent GC and try to unload unused classes to free up more memory, which is exactly what this option does.

-XX:+PrintGCDateStamps
-verbose:gc
-XX:+PrintGCDetails
-Xloggc:"<path to log>"

These options shown here will write out all GC related information to a specified log file. You can see how well your GC configuration works by looking into it.

I personally prefer to use the “Visual GC” plug in for the “Visual VM” tool to monitor the general JVM and GC behavior.
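For a quick look without a log file or a GUI, the JVM also exposes basic GC counters through the standard java.lang.management API. The following sketch (class name GcSummary is my own) prints one line per collector; the collector names depend on which GC options the JVM was started with.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Summarizes collection counts and accumulated GC time per collector. */
final class GcSummary {

    static List<String> lines() {
        List<String> out = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Count and time are cumulative since JVM start; -1 means "not available"
            out.add(gc.getName() + ": " + gc.getCollectionCount() + " collections, "
                    + gc.getCollectionTime() + " ms total");
        }
        return out;
    }

    public static void main(String[] args) {
        lines().forEach(System.out::println);
    }
}
```

These are only totals. For the timing of individual pauses you still need the GC log or a monitoring tool.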

-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=<path to dump>'date'.hprof

When your JVM runs out of memory, you will want to know why. Since the OOM error might be hard to reproduce and you want to get your production server up and running again – you should specify a path for a heap dump. When things have settled down, you can analyze the dump afterwards.
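To confirm that the heap dump options really reached the JVM, you can read them back at runtime. This sketch assumes a HotSpot JVM, since it uses the HotSpot-specific com.sun.management.HotSpotDiagnosticMXBean; on other JVMs the bean may not be available. The class name HeapDumpFlagCheck is hypothetical.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

/** Reads HotSpot VM options back from the running JVM. */
final class HeapDumpFlagCheck {

    static String vmOption(String name) {
        // Assumption: running on HotSpot, where this platform MXBean is registered
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        return bean.getVMOption(name).getValue();
    }

    public static void main(String[] args) {
        System.out.println("HeapDumpOnOutOfMemoryError = " + vmOption("HeapDumpOnOutOfMemoryError"));
        System.out.println("HeapDumpPath = " + vmOption("HeapDumpPath"));
    }
}
```

Run it with the same options as your server to see what the JVM reports for them.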

-Djava.rmi.server.hostname=<external IP>
-Dcom.sun.management.jmxremote.port=<port>

These options will help you to specify an IP and port for JMX – you will need those ports open to connect remotely to a JVM running on a server for tools like VisualVM. You can gain deep insights over CPU and memory usage, gc behaviour, class loading, thread count and usage of your application this way.
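The data that remote tools read over JMX comes from MBeans registered inside the JVM. As a rough illustration of what is exposed, the sketch below queries the standard Runtime MBean through the local platform MBean server, so no remote port is needed. The class name LocalJmxQuery is my own; a remote tool would read the same attributes over the configured port.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Reads attributes of the standard Runtime MBean from the local platform MBean server. */
final class LocalJmxQuery {

    static Object runtimeAttribute(String attribute) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            return server.getAttribute(new ObjectName("java.lang:type=Runtime"), attribute);
        } catch (Exception e) {
            throw new IllegalStateException("cannot read " + attribute, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("VM name : " + runtimeAttribute("VmName"));
        System.out.println("uptime  : " + runtimeAttribute("Uptime") + " ms");
    }
}
```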

Lastly, I would like to recommend to you the VisualVM tool which is bundled with the Java 8 JDK. You can use it to gain more insights about your specific application behaviour on the JVM – like cpu and memory usage, thread utilisation and much more.

VisualVM can be extended with a plug-in called “Visual GC”. It shows you VERY detailed information at a glance about the usage of the young and old generation object spaces. You can easily spot problems with garbage collection simply by analyzing these graphs during application runtime.

Categories: J2EE, Java