Apache Kafka API Code Sample in Java


In my earlier blog post, I explained what Apache Kafka is and how to install and test it with Apache ZooKeeper using the out-of-the-box Apache Kafka commands. If you missed that article, please refer to it here: https://javainsider.wordpress.com/2017/05/12/apache-kafka-introduction-installation/

A simple producer/consumer application:

You’ve seen how Kafka works out of the box. Next, let’s develop a custom producer/consumer test client in Java. The test producer will send 50 new messages to the Kafka server from a standalone sample program. The test consumer will retrieve messages for a given topic and print them to the console, also from a standalone Java application. The producer and consumer components in this case are your own implementations of kafka-console-producer.bat / .sh and kafka-console-consumer.bat / .sh.

Let’s start by creating a TestProducer.java class. This client class contains logic to create 50 test messages and send them to the Kafka server.

package com.javainsider.kafkaapi;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which also sends any records still buffered
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int key = 0; key < 50; key++) {
                String message = "My test message for key:" + key;
                producer.send(new ProducerRecord<>("javainsider", Integer.toString(key), message));
                System.out.println("Kafka topic Key=" + key + ", Kafka Message:" + message);
            }
        }
    }
}


Be sure to configure the properties to match the Kafka server host and port in your environment. You should also make sure the topic name is correct.
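One way to keep the host, port and topic out of the source code is to read them from JVM system properties. The following is only a minimal sketch: the class name ProducerSettings and the property names kafka.bootstrap and kafka.topic are made up for this example and are not part of Kafka. It uses only the JDK, so it runs without a broker.

```java
import java.util.Properties;

// Sketch only: "kafka.bootstrap" and "kafka.topic" are hypothetical
// system property names chosen for this example.
class ProducerSettings {

    // Builds the producer configuration, falling back to the values
    // used in TestProducer when no system property is set.
    static Properties fromSystemProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                System.getProperty("kafka.bootstrap", "localhost:9092"));
        props.put("acks", "all");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Topic name, defaulting to the topic used in this post.
    static String topic() {
        return System.getProperty("kafka.topic", "javainsider");
    }

    public static void main(String[] args) {
        System.out.println("bootstrap.servers = "
                + fromSystemProperties().getProperty("bootstrap.servers"));
        System.out.println("topic = " + topic());
    }
}
```

You could then start the program with, for example, -Dkafka.bootstrap=myhost:9092 and pass the resulting Properties object to the KafkaProducer constructor.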

Next, create a TestConsumer.java class. This client class contains logic to read all messages from the Kafka server and print them to the console.

package com.javainsider.kafkaapi;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        String topic = "javainsider";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Without a subscription the consumer has no partitions to read and poll() fails
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        // Runs until the process is stopped
        while (true) {
            // poll(long) matches older client versions; newer ones take a java.time.Duration instead
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Kafka offset = %d, Kafka topic Key = %s, Kafka Message: = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Make sure the Kafka configuration and the topic name match your environment.

You can download the complete code from GIT:  https://github.com/javainsider/kafkaapi

Happy coding…..


Apache Kafka – Introduction & Installation

Apache Kafka is a distributed streaming platform. What exactly does that mean? Why Kafka?

Most traditional messaging systems don’t scale up to handle big data in real time. So engineers at LinkedIn built and open-sourced Kafka: a distributed messaging framework that meets the demands of big data by scaling on commodity hardware.

In this post we will start with how to install and run Apache Kafka in your development environment.

What is Kafka?

Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It’s designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn’t support JMS, Java’s message-oriented middleware API.

Kafka’s basic terminology:

  • A producer is a process that can publish a message to a topic.
  • A consumer is a process that can subscribe to one or more topics and consume messages published to those topics.
  • A topic is a category or feed name to which messages are published.
  • Each record consists of a key, a value, and a timestamp.
  • A broker is a process running on a single machine.
  • A cluster is a group of brokers working together.
  • Kafka is run as a cluster on one or more servers.
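To make the terminology above concrete, here is a toy, in-memory model in plain Java. It is only a sketch of the vocabulary (topic, record, offset, consumer group) and not how Kafka is implemented: the class ToyTopic and its methods are invented for this example, and partitions, timestamps, brokers and networking are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the terminology only; ToyTopic is a made-up class.
class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic("javainsider");
        topic.publish("k1", "first");
        topic.publish("k2", "second");
        System.out.println(topic.poll("group-a")); // [k1=first, k2=second]
        System.out.println(topic.poll("group-a")); // [] (nothing new for this group)
        System.out.println(topic.poll("group-b")); // [k1=first, k2=second]
    }
}

class ToyTopic {
    private final String name;
    // Append-only list of records; the list index plays the role of the offset.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for every consumer group.
    private final Map<String, Integer> offsets = new HashMap<>();

    ToyTopic(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    // Producer side: append a record and return the offset it was stored at.
    int publish(String key, String value) {
        log.add(key + "=" + value);
        return log.size() - 1;
    }

    // Consumer side: return the records this group has not seen yet
    // and move the group's offset to the end of the log.
    List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }
}
```

The point of the model is that publishing never removes anything: each consumer group simply remembers how far it has read, so two groups can read the same records independently.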

Kafka’s APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

More details about Apache Kafka: https://kafka.apache.org/intro.

Kafka Architecture:


Kafka Installation:

Before installing Apache Kafka, you need to install Apache ZooKeeper, a service that Kafka requires for maintaining its configuration information and for providing distributed synchronization.

Download Apache Zookeeper from: Download Zookeeper

Extract zookeeper-3.4.10.tar.gz to your local drive, e.g. C:\apache\zookeeper-3.4.10\

  • Once you have extracted ZooKeeper, locate the conf folder, e.g. C:\apache\zookeeper-3.4.10\conf
  • Rename “zoo_sample.cfg” to “zoo.cfg” inside the conf folder.
  • Create a data directory for ZooKeeper on your local drive, e.g. C:\zk_data
  • Update the entry “dataDir=C:/zk_data” in the “zoo.cfg” file.

Download Apache Kafka from: Download Kafka

Extract the “kafka_2.10-” archive to your local drive, e.g. C:\apache\kafka-2.10\

  • After extracting, you will find the server.properties file inside the config folder; in my case it is C:\apache\kafka-2.10\config\

Let’s now start ZooKeeper, followed by Kafka:

  • Start the Zookeeper server by executing the command:



  • Start apache Kafka server by executing the command:

  C:\apache\kafka-2.10\bin\windows>kafka-server-start.bat C:/apache/kafka-2.10/config/server.properties 


  • Create a test topic that you can use for testing: “javainsider”

         C:\apache\kafka-2.10\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javainsider


  • Start a simple console consumer that can consume messages published to a given topic, such as “javainsider”:

    C:\apache\kafka-2.10\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic javainsider --from-beginning    


  • Start up a simple producer console that can publish messages to the test topic:

     C:\apache\kafka-2.10\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic javainsider   


  • Try typing one or two messages into the producer console. Your messages should show in the consumer console.

Congratulations! You are done with Apache Kafka installation and testing a Kafka instance with an out-of-the-box producer and consumer.

In my next blog post I will explain how to use the Apache Kafka with Java programs. Happy learning 🙂

Most Important Java 8 VM Options for Servers


Let’s talk about Java enterprise server applications. I have always wanted to know what the best default JVM settings are for a server application to start with in production. I read a lot on the web, tried several things myself, and wanted to share what I found out:



Use “-server”: All 64-bit JVMs use the server VM as default. This setting generally optimizes the JVM for long running server applications instead of startup time. The JVM will collect more data about the Java byte code during program execution and generate the most efficient machine code via JIT.

-Xms<heap size>[g|m|k] -Xmx<heap size>[g|m|k]

The “-Xmx/-Xms” settings specify the maximum and minimum values for the JVM heap memory. For servers, both params should have the same value to avoid heap resizing during runtime.

Depending on your application, you will have to try out how much memory will be best suited for your application.
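To check which heap settings a running JVM actually picked up, you can ask the JVM itself. The sketch below uses only standard JDK classes; the class name HeapSettingsCheck is just a name chosen for this example.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Prints the JVM arguments and the heap limits the running JVM is using.
class HeapSettingsCheck {

    // Maximum heap the JVM will attempt to use, in megabytes.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    // Heap currently reserved by the JVM, in megabytes.
    static long committedHeapMb() {
        return Runtime.getRuntime().totalMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        // Options passed on the command line, such as -Xms and -Xmx
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("JVM arguments:       " + jvmArgs);
        System.out.println("Max heap (MB):       " + maxHeapMb());
        System.out.println("Committed heap (MB): " + committedHeapMb());
    }
}
```

If you start it with the same value for -Xms and -Xmx, the two numbers should be close to each other from the start, which is the behaviour described above.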

-XX:MaxMetaspaceSize=<metaspace size>[g|m|k]

Java 8 no longer has a “Permanent Generation” (PermGen); it uses “Metaspace” memory instead. This native memory is used, in addition to the heap memory we specified before, for storing class metadata.

The default size will be unlimited – I tend to limit MaxMetaspaceSize with a somewhat high value. Just in case something goes wrong with the application, the JVM will not hog all the memory of the server.

Note: Let your application run for a couple of days to get a feeling for how much Metaspace Size it uses normally. Upon next restart of the application set the limit to e.g. double the value.
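If you would rather read the Metaspace usage from inside the application than from an external tool, the memory pool MXBeans can be queried. This is a minimal sketch and rests on one assumption: that the JVM exposes a pool named "Metaspace", which HotSpot-based Java 8+ JVMs do but other JVMs may not. The class name MetaspaceUsage is made up for this example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Reads the current Metaspace usage through the standard management API.
class MetaspaceUsage {

    // Returns the used Metaspace in bytes, or -1 if this JVM
    // has no memory pool named "Metaspace" (an assumption, see above).
    static long usedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1 : usage.getUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long used = usedBytes();
        if (used < 0) {
            System.out.println("No Metaspace pool found on this JVM");
        } else {
            System.out.println("Metaspace used (MB): " + used / (1024 * 1024));
        }
    }
}
```

Logging this value from time to time over a few days gives you the baseline from which to choose the MaxMetaspaceSize limit.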


Additionally, you might want to allow the JVM to unload classes that are still held in memory but are no longer referenced by any code. If your application generates lots of dynamic classes, this is what you want.


The option -XX:+UseConcMarkSweepGC makes the JVM use the Concurrent Mark Sweep (CMS) collector. It can do much of its work in parallel with program execution, but in some circumstances a “full GC” with a stop-the-world (STW) pause might still occur. I’ve read many articles and came to the conclusion that this GC is still the best one for server workloads.


The option CMSParallelRemarkEnabled means the remark phase, which still pauses the application, is carried out by several threads in parallel. That is what you want if your server has many cores (and most servers do).


Normally the GC will use heuristics to know when it’s time to clear memory. With default settings, the GC might kick in too late (causing full GCs).
Some sources say it might be a good idea to disable the heuristics altogether and just use generation occupancy to start a CMS collection cycle. Setting values around 70% worked fine for all of my applications and use cases.


The option -XX:+ScavengeBeforeFullGC tells the GC to first free memory by clearing out the “young generation”, i.e. fairly new objects, before doing a full GC.


CMSScavengeBeforeRemark attempts a minor collection before the CMS remark phase, thus keeping the remark pause afterwards short.


The option “-XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses” is especially important if your application uses RMI (remote method invocation). The usage of RMI will cause the JVM to do a full GC every hour! This can be very bad for large heap sizes because the full GC pause might take up to several seconds. It would be better to do a concurrent GC and try to unload unused classes to free up more memory, which is exactly what this option does.

-Xloggc:"<path to log>"

These options shown here will write out all GC related information to a specified log file. You can see how well your GC configuration works by looking into it.
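Besides the log file, the same kind of information (how often each collector ran and how long it took in total) is available inside the JVM through the standard management API. The sketch below only uses JDK classes; the class name GcStats is made up for this example, and the collector names it prints depend on which GC the JVM was started with.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Summarises the garbage collectors of the running JVM.
class GcStats {

    // One line per collector: name, number of collections, accumulated time.
    static String summary() {
        StringBuilder sb = new StringBuilder();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(gc.getName())
              .append(": count=").append(gc.getCollectionCount())
              .append(", timeMs=").append(gc.getCollectionTime())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Create some short-lived garbage so the counters have a chance to move.
        for (int i = 0; i < 200_000; i++) {
            byte[] garbage = new byte[1024];
        }
        System.out.print(summary());
    }
}
```

A collector for the old generation whose count keeps climbing while the application is under normal load is usually a hint that the settings above need another look.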

I personally prefer to use the “Visual GC” plug in for the “Visual VM” tool to monitor the general JVM and GC behavior.

-XX:HeapDumpPath=<path to dump>`date`.hprof

When your JVM runs out of memory, you will want to know why. Since the OOM error might be hard to reproduce and you want to get your production server up and running again, you should specify a path for a heap dump. Note that the JVM only writes the dump when -XX:+HeapDumpOnOutOfMemoryError is also set. When things have settled down, you can analyze the dump afterwards.

-Djava.rmi.server.hostname=<external IP>

These options will help you to specify an IP and port for JMX – you will need those ports open to connect remotely to a JVM running on a server for tools like VisualVM. You can gain deep insights over CPU and memory usage, gc behaviour, class loading, thread count and usage of your application this way.

Lastly, I would like to recommend to you the VisualVM tool which is bundled with the Java 8 JDK. You can use it to gain more insights about your specific application behaviour on the JVM – like cpu and memory usage, thread utilisation and much more.

VisualVM can be extended with a plug in called “Visual GC”. It will briefly show you VERY detailed information about the usage of the young and old generation object spaces. You can easily spot problems with garbage collection simply by analyzing these graphs during application runtime.


Java 8 Default Methods

Interfaces in Java have always contained method declarations, not their definitions (method bodies). There was no way of defining a method body in an interface. This is because historically Java didn’t allow multiple inheritance of classes. It allowed multiple inheritance of interfaces, as interfaces were nothing but method declarations, which avoids the problem of ambiguity in multiple inheritance. Since Java 8 it is possible to add method bodies to interfaces.

Java 8 has a new feature called Default Methods. It is now possible to add method bodies into interfaces!

package com.javainsider.java8.defaultmethods;

public interface DefaultMethod {
    int addNumber(int num1, int num2);

    default int multiplyNumber(int num1, int num2) {
        return num1 * num2;
    }
}

In the DefaultMethod interface above, we added a method multiplyNumber with an actual method body.
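A class that implements this interface only has to provide addNumber and inherits multiplyNumber as it is. The sketch below repeats the interface so that it is self-contained; the class names SimpleCalculator and DefaultMethodDemo are made up for this example.

```java
// Demo entry point: calls one implemented and one inherited method.
class DefaultMethodDemo {
    public static void main(String[] args) {
        DefaultMethod calc = new SimpleCalculator();
        System.out.println(calc.addNumber(2, 3));      // prints 5
        System.out.println(calc.multiplyNumber(2, 3)); // prints 6
    }
}

// Same shape as the DefaultMethod interface shown above.
interface DefaultMethod {
    int addNumber(int num1, int num2);

    default int multiplyNumber(int num1, int num2) {
        return num1 * num2;
    }
}

// Hypothetical implementor: only the abstract method is implemented,
// multiplyNumber comes from the interface.
class SimpleCalculator implements DefaultMethod {
    @Override
    public int addNumber(int num1, int num2) {
        return num1 + num2;
    }
}
```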

Why do we need Default Methods?

Why would one want to add methods to interfaces? Well, it is because interfaces are too tightly coupled with their implementation classes, i.e. it is not possible to add a method to an interface without breaking the implementing classes. Once you add a method to an interface, all classes that implement it must provide a body for this new method.

With Java 8, this became a real problem. Lambda expressions were introduced, which is cool, but they could not be used with existing Java libraries such as the java.util package without adding new methods to existing interfaces. If you add a single method to the List interface, it breaks everything: you need to add its implementation to every class that implements List. Imagine how many custom classes in the real world would have to change.

So for backward compatibility, Java 8 cleverly added Default Methods.

Virtual Extension Methods

Java 8 added a new concept: virtual extension methods, or defender methods as they are often called, can now be added to interfaces to provide a default implementation of the declared behavior. Existing interfaces can therefore be augmented without compromising backward compatibility: the declaration of an extension method carries the default implementation to use when implementors do not provide a method body. A key characteristic of extension methods is that they are virtual methods just like other interface methods, so an implementing class can still override them.

Consider following example:

package com.javainsider.java8.defaultmethods;

public interface Person {

    default void sayHello() {
        // illustrative greeting; any message works here
        System.out.println("Hello from Person");
    }

    public void sayGoodby();
}


package com.javainsider.java8.defaultmethods;

public class JavaPerson implements Person {

    public void sayGoodby() {
        System.out.println("Goodby Java person……");
    }
}

package com.javainsider.java8.defaultmethods;

public class TestJavaPerson {

    public static void main(String[] args) {
        JavaPerson javaPerson = new JavaPerson();

        // calling sayHello calls the default method defined in the interface
        javaPerson.sayHello();

        // calling sayGoodby calls the method implemented in JavaPerson
        javaPerson.sayGoodby();
    }
}




Goodby Java person……

In the code above, we added a defender method sayHello() to the Person interface, so it was fine for the class JavaPerson to omit a body for this method.

What about Multiple Inheritance?

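Adding method definitions to interfaces can introduce ambiguity in multiple inheritance, can’t it? Well, it does. However, Java 8 handles this issue at compile time: if a class inherits the same default method from two interfaces and does not override it, the compiler reports an error. Consider the example below: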

package com.javainsider.java8.multipleinheritance;

public interface Person {

    default void sayHello() {
        System.out.println("Hello"); // illustrative greeting
    }
}

package com.javainsider.java8.multipleinheritance;

public interface Female {
    default void sayHello() {
        System.out.println("Hello Ms….");
    }
}

package com.javainsider.java8.multipleinheritance;

public interface Male {

    default void sayHello() {
        System.out.println("Hello Mr….");
    }
}

package com.javainsider.java8.multipleinheritance;

public class Martin implements Person, Male {

    // overriding resolves the conflict between Person.sayHello and Male.sayHello
    public void sayHello() {
        System.out.println("Hello… I am Martin here….");
    }
}


It is also possible to explicitly call a parent interface’s default method from the implementing class. Suppose that in the example above you want to call the sayHello method from the Male interface when Martin.sayHello is called. You can use the super keyword, in the form Male.super.sayHello(), to call the appropriate method explicitly, or you can write your own implementation.
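Here is a minimal, self-contained sketch of that explicit call. It repeats simplified versions of the interfaces; the message printed by Person is a placeholder and the class SuperCallDemo is made up for this example.

```java
// Demo entry point.
class SuperCallDemo {
    public static void main(String[] args) {
        new Martin().sayHello();
        // prints:
        // Hello Mr....
        // Hello... I am Martin here....
    }
}

interface Person {
    default void sayHello() {
        System.out.println("Hello"); // placeholder greeting
    }
}

interface Male {
    default void sayHello() {
        System.out.println("Hello Mr....");
    }
}

class Martin implements Person, Male {
    // Martin must override sayHello because both interfaces provide a default.
    @Override
    public void sayHello() {
        // InterfaceName.super.method() picks one of the inherited defaults
        Male.super.sayHello();
        System.out.println("Hello... I am Martin here....");
    }
}
```

Removing the override from Martin makes the class fail to compile, which is the compile-time check mentioned earlier.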

You can access the code from GIT repo:  https://github.com/javainsider/java8features


Happy Coding!




Install MongoDB On Mac OS X

Easy steps to install MongoDB on Mac OS X (I have used Mac OS X Mavericks)


Step 1: Download MongoDB:

Download MongoDB for MAC OS from official MongoDB site:


Step 2: Install MongoDB:

$ cd ~/Downloads
$ tar xzf mongodb-osx-x86_64-2.6.1.tgz
$ sudo mv mongodb-osx-x86_64-2.6.1 /usr/local/mongodb


Step 3: Create MongoDB Data Directory:

By default, MongoDB writes and stores data in the /data/db folder. You need to create this folder manually and assign the proper permissions as shown below:

$ sudo mkdir -p /data/db
$ whoami
$ sudo chown garnaik /data/db

Step 4: Set your PATH in the Bash Profile for MongoDB:

Create a ~/.bash_profile file if it does not exist, or update it otherwise, and add /usr/local/mongodb/bin to the $PATH environment variable so that you can run the MongoDB commands easily.

$ cd ~
$ pwd
/Users/garnaik
$ touch .bash_profile
$ vim .bash_profile

export PATH=$PATH:/usr/local/mongodb/bin

##restart terminal
$ mongo -version
MongoDB shell version: 2.6.1

Step 5: Set up auto start for MongoDB:

To auto start your installed MongoDB, create a launchd job on your Mac.

$ sudo vim /Library/LaunchDaemons/mongodb.plist

And add the following content to “/Library/LaunchDaemons/mongodb.plist”

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">

Save the file and load the above job:

$ sudo launchctl load /Library/LaunchDaemons/mongodb.plist

$ ps -ef | grep mongo
    0    12     1   0  4:06PM ??         0:20.29 /usr/local/mongodb/bin/mongod
  501   542   435   0  4:37PM ttys000    0:00.00 grep mongo

Now restart your MAC OS.

Step 6: Start MongoDB:

If you have completed the steps above, MongoDB will start automatically when your Mac starts.


Otherwise you have to start MongoDB by using the below command:

$ mongod

MongoDB is now ready to use:


$ mongo
MongoDB shell version: 2.6.1
> show dbs
local (empty)

Proxy Design Pattern (GOF)

When to use this pattern?
Proxy pattern is used when we need to create a wrapper to cover the main object’s complexity from the client.

  • Provide a surrogate or placeholder for another object to control access to it.
  • Use an extra level of indirection to support distributed, controlled, or intelligent access.
  • Add a wrapper and delegation to protect the real component from undue complexity.

You need to support resource-hungry objects, and you do not want to instantiate such objects unless and until they are actually requested by the client.

What are the usage scenarios?

  1. Virtual Proxy – Imagine a situation where multiple database calls are needed to extract a huge image. Since this is an expensive operation, we can use the proxy pattern to create lightweight proxies that point to the large, memory-consuming object. The real object is created only when a client first requests or accesses it; after that, we can refer to the proxy to reuse the object. This avoids duplicating the object and hence saves memory.
  2. Remote Proxy – A remote proxy can be thought of as the stub in an RPC call. It provides a local representation of an object that lives in a different address space. Another example is providing an interface for remote resources such as a web service or REST resources.
  3. Protective Proxy – The protective proxy acts as an authorisation layer that verifies whether the actual user has access to the appropriate content. An example is the proxy server that provides restricted internet access in an office: only valid websites and content are allowed, and the rest is blocked.
  4. Smart Proxy – A smart proxy provides an additional layer of security by interposing specific actions when the object is accessed. An example is checking whether the real object is locked before it is accessed, to ensure that no other object can change it.
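As an illustration of the protective proxy scenario, here is a minimal sketch in Java. All names in it (Report, RealReport, ProtectedReport and the user names) are made up for this example, and the access check is a simple allow-list rather than a real authorisation mechanism.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Demo entry point for the protective proxy sketch.
class ProtectionProxyDemo {
    public static void main(String[] args) {
        Set<String> allowed = new HashSet<>(Arrays.asList("alice"));
        Report report = new ProtectedReport(new RealReport("Q1 numbers"), allowed);

        System.out.println(report.read("alice")); // prints Q1 numbers
        try {
            report.read("bob");
        } catch (SecurityException e) {
            System.out.println("Denied: " + e.getMessage()); // prints Denied: bob may not read this report
        }
    }
}

// Common interface shared by the real object and the proxy.
interface Report {
    String read(String user);
}

// The real object: returns its content to whoever asks.
class RealReport implements Report {
    private final String content;

    RealReport(String content) {
        this.content = content;
    }

    @Override
    public String read(String user) {
        return content;
    }
}

// The proxy: checks the caller before delegating to the real object.
class ProtectedReport implements Report {
    private final Report target;
    private final Set<String> allowedUsers;

    ProtectedReport(Report target, Set<String> allowedUsers) {
        this.target = target;
        this.allowedUsers = allowedUsers;
    }

    @Override
    public String read(String user) {
        if (!allowedUsers.contains(user)) {
            throw new SecurityException(user + " may not read this report");
        }
        return target.read(user);
    }
}
```

Because the client only sees the Report interface, the check can be added or removed without touching the client code.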


  1. Subject – This object defines the common interface for RealSubject and Proxy so that a Proxy can be used anywhere a RealSubject is expected.
  2. Proxy – It maintains a reference to the RealSubject so that Proxy can access it. It also implements the same interface as the RealSubject so that Proxy can be used in place of RealSubject. Proxy also controls the access to the RealSubject and can create or delete this object.
  3. RealSubject – This refers to the main object which the proxy represents.


Code Example – Virtual Proxy:

As mentioned earlier, a virtual proxy is useful for saving expensive memory resources. Let’s take a scenario where the real image contains a huge amount of data which clients need to access. To save resources and memory, the implementation will be as follows:

– Create an interface which will be accessed by the client. All its methods will be implemented by the ProxyImage class and the RealImage class.
– RealImage runs on a different system and contains the image information, which is accessed from the database.
– The ProxyImage, which runs on a different system, can represent the RealImage in the new system. Using the proxy we can avoid loading the image multiple times.


public interface Image {
    public void showImage();
}



public class RealImage implements Image {

    private String fileName = null;

    public RealImage(String strFileName) {
        this.fileName = strFileName;
    }

    public void showImage() {
        System.out.println("Show Image:" + fileName);
    }
}





public class ProxyImage implements Image {
    private RealImage img = null;
    private String fileName = null;

    public ProxyImage(String strFileName) {
        this.fileName = strFileName;
    }

    /*
     * (non-Javadoc)
     * @see com.proxy.virtualproxy.Image#showImage()
     */
    public void showImage() {
        // create the real image only on first use
        if (img == null) {
            img = new RealImage(fileName);
        }
        img.showImage();
    }
}


public class Client {
    public static void main(String[] args) {
        final Image img1 = new ProxyImage("Image***1");
        final Image img2 = new ProxyImage("Image***2");
        img1.showImage();
        img2.showImage();
    }
}


  •  One advantage of the Proxy pattern is security, since the proxy controls access to the real object.
  •  This pattern avoids duplication of objects which might be huge and memory intensive. This in turn improves the performance of the application.
  •  The remote proxy also helps with security: a local proxy (stub) is installed on the client machine, and the server is then accessed through the remote code.



This pattern introduces another layer of abstraction, which may be an issue if some clients access the RealSubject code directly while others go through the Proxy classes. This might cause inconsistent behaviour.

Rules of thumb:
– There are a few differences between the related patterns. The Adapter pattern gives a different interface to its subject, the Proxy pattern provides the same interface as the original object, and the Decorator pattern provides an enhanced interface. The Decorator pattern also adds behaviour at runtime.

– Proxy used in Java API:  java.rmi.*;



MVP – Minimum Viable Product Strategy

Not all good ideas turn into great products.  Quite a few products fail in the market for a variety of reasons.  But does that stop us from investing in products? How do we find out whether a product is going to be a hit in the market?  And more importantly, how do we find that out while investing the minimum amount possible?  Enter MVP – The Minimum Viable Product.
The concept of MVP is gaining acceptance throughout the area of product development.  It is a concept largely used by start-ups, and it can greatly help new product design and development in larger organizations.
What is an MVP?
Our objective with a Minimum Viable Product is to provide a mechanism for maximum learning about the target audience or the target market with the minimum effort.  Does it mean that we only ship 3 out of the 10 required features in order to hit the market as early as possible?  No.  The concept goes beyond just the product features.  A Minimum Viable Product takes into account the product idea, how it generates interest among the users, which features the customers or the market really want, the demand for the product, etc.  It is a strategy for learning about the customers early in the product life cycle, so that the team can change the product for the better.
Strategies for MVP
  • A survey for the likely features of the product
  • An email campaign to see the interest generated for the product
  • A website which shares videos, articles about the features and the benefits to the customers
  • Continuous Deployment (An Agile Practice, which can add more value to the business incrementally providing more learning)
  • A prototype / demo version of the product featuring the critical features
  • A closed beta for only very few customers
Why do we need MVP?
  • We do not have endless budget to build products
  • We want to cut the risk of a worst case scenario.  What if the product fails after 2 years of hard work?
  • We want to stay in the business yet not stop experimenting for potential opportunities
Benefits of MVP
  • Maximum learnings from minimum efforts
  • Early feedback about the product
  • Ability to realize the true value / demand of a product more quickly
  • More energy into the product development as we see some real demands
Demerits of MVP
  • Difficult to conceptualize the apt minimum.  There are no pre-defined rules for coming up with a minimum; it is pretty much a judgement call.
  • Need to alter the product road map according to the feedback / learning, which might be possible only with teams that are Agile.
The concept of MVP is ideal for start-ups and new R&D products/ideas, where your feedback cycle needs to be really fast.  It is better for a start-up to fail with a product idea in 3 months than in 1 year.  The time and effort saved by an MVP strategy is a big boon for start-ups and even for larger organizations trying out new R&D products/ideas.