Apache Kafka Series | Session 6 | Java Producer

Now it’s time to write a Kafka producer in Java. We’ve already seen how to create a Kafka producer using the CLI; in this post we’ll do the same programmatically.

1. CREATE A MAVEN PROJECT AND ADD DEPENDENCIES

These are the dependencies I added in pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kafka-diy</artifactId>
        <groupId>com.inspired.java</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>kafka-basics</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.2</version>
        </dependency>
    </dependencies>

</project>

kafka-clients lets us create Kafka producers and consumers programmatically, slf4j-simple provides logging, and jackson-databind converts POJOs to JSON and vice versa.
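To illustrate the jackson-databind part, here is a minimal sketch of a POJO-to-JSON round trip. The Tweet class below is a hypothetical example for this post, not part of the producer code; Jackson serializes its public fields by default.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonDemo {
    // Hypothetical POJO mirroring a few tweet fields.
    // Jackson needs a no-arg constructor; public fields are picked up automatically.
    public static class Tweet {
        public long id;
        public String text;
        public String lang;
        public Tweet() {}
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        Tweet tweet = new Tweet();
        tweet.id = 1L;
        tweet.text = "hello kafka";
        tweet.lang = "en";

        // POJO -> JSON string
        String json = mapper.writeValueAsString(tweet);
        System.out.println(json);

        // JSON string -> POJO
        Tweet back = mapper.readValue(json, Tweet.class);
        System.out.println(back.text);
    }
}
```

In a real pipeline you would use writeValueAsString() to build the record value before sending it to Kafka, and readValue() on the consumer side.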

2. CREATE A JAVA PRODUCER

package kafka.basics.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String value = "{\"created_at\":\"Mon Nov 30 12:26:49 +0000 2020\",\"id\":1358987003086311427,\"text\":\"mi ne aaj ke ipl maich mein csk ko chaar viket se haraaya\",\"favorited\":false,\"retweeted\":false,\"lang\":\"hi\",\"timestamp_ms\":\"1606755809325\"}";

        ProducerRecord<String, String> record = new ProducerRecord<>("tweet", value);

        producer.send(record);
        producer.flush();
        producer.close();
    }
}

Here, we’ve set three properties: the first defines the address of the bootstrap server, the second sets the serializer for the record key, and the third sets the serializer for the record value. Both key and value are of type String here.

We’ve then created a KafkaProducer object and a ProducerRecord object, and sent the record using the producer. Because producer.send() is asynchronous, we’ve added a couple of lines (flush() and close()) to block until the record is actually delivered; otherwise the program would terminate as soon as producer.send() returned, and nothing would be sent. (close() also flushes any pending records, so the explicit flush() is mainly for clarity.)

Here “tweet” is the topic name, not the key — since we used the two-argument ProducerRecord constructor, the key is null — and the value is a sample tweet in JSON format.
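If you do want a key (records with the same key always land on the same partition) and confirmation of delivery, producer.send() also accepts a key and a callback. Here is a minimal sketch; the key "user_42" and the message text are illustrative, not from the original program:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithKeyDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Three-argument constructor: topic, key, value.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("tweet", "user_42", "a keyed tweet");

            // The callback runs once the broker acknowledges the record
            // (or once sending fails).
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });
        } // try-with-resources calls producer.close(), which flushes pending records
    }
}
```

The try-with-resources block replaces the explicit flush()/close() calls from the earlier example: closing the producer waits for in-flight records to be delivered.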

3. CREATE THE TOPIC

Now it’s time to create a topic named “tweet”. Before that, both ZooKeeper and Kafka need to be started.

zookeeper-server-start.bat config\zookeeper.properties

kafka-server-start.bat config\server.properties

kafka-topics.bat --zookeeper localhost:2181 --topic tweet --create --partitions 3 --replication-factor 1

(From Kafka 2.2 onward, you can also pass --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181.)

We’ve already discussed ZooKeeper, Kafka, and topics in one of our previous posts. Please have a look if you’re still confused.

4. CREATE A CONSUMER CLI

As we’ve already discussed, starting a consumer on the CLI is as simple as this:

kafka-console-consumer --bootstrap-server localhost:9092 --topic tweet

Now you can see that as soon as the producer sends the record, it’s consumed by the consumer CLI. This is the output.