Apache Kafka Series | Session 9 | Java Consumer

Now we’re going to learn how to create a Java consumer for Kafka. As with the producer, we have to define the bootstrap server address and the key and value deserializer classes. The only difference is that the producer used key and value serializer classes, while the consumer uses deserializer classes.

If you haven’t read the Java Producer article yet, please have a look at it first.

Apart from the above-mentioned properties, we have to add two more. The first one is the group ID, which we’ve already explained in one of our previous posts. The group ID is mandatory for a Java consumer; without it, we’ll get an exception like this:

Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

The second one is the auto.offset.reset property, which defines what to do when there is no initial offset in Kafka, or when the current offset no longer exists on the server (e.g. because that data has been deleted).

It accepts the following String values:

  • earliest: automatically reset the offset to the earliest offset (equivalent to --from-beginning)
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group

If we pass any other value, the KafkaConsumer constructor throws an exception.
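We can see this for ourselves without even starting a broker, because the configuration is validated inside the KafkaConsumer constructor. Below is a minimal sketch (the class name and the helper method are ours, not part of the consumer class that follows; the broker address is never contacted). Note that ConfigException, which Kafka throws for an invalid config value, extends KafkaException, so catching KafkaException covers it:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class OffsetResetValidationDemo {

    /** Try to construct a consumer with the given auto.offset.reset value
     *  and report whether Kafka accepts it. No broker connection is made. */
    static boolean accepts(String offsetReset) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-tweet-group");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            return true;   // the constructor validated the value
        } catch (KafkaException e) {
            // ConfigException extends KafkaException; its message lists the valid values
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("earliest accepted? " + accepts("earliest"));
        System.out.println("foo accepted?      " + accepts("foo"));
    }
}
```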

This is our consumer class:

package kafka.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-tweet-group");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("tweet"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + "\n" +
                        "Value: " + record.value() + "\n" +
                        "Partition: " + record.partition() + "\n" +
                        "Offset: " + record.offset()
                );
            }
        }
    }
}
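One thing to note: the while (true) loop above never exits, so the consumer is never closed. A common pattern, sketched below as a variant of our class (the class and method names here are ours, for illustration), is to register a JVM shutdown hook that calls wakeup(); the blocked poll() then throws WakeupException, the loop ends, and close() runs in the finally block, committing offsets and leaving the group cleanly:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulConsumerDemo {

    static KafkaConsumer<String, String> buildConsumer() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-tweet-group");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(properties);
    }

    /** Poll until wakeup() is called from another thread, then close cleanly. */
    static void runUntilWakeup(KafkaConsumer<String, String> consumer) {
        try {
            consumer.subscribe(Collections.singletonList("tweet"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Key: " + record.key() + ", Value: " + record.value()
                            + ", Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown: wakeup() makes the blocked poll() throw, ending the loop
        } finally {
            consumer.close();   // commits offsets and leaves the consumer group
        }
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = buildConsumer();
        Thread mainThread = Thread.currentThread();
        // On Ctrl+C: interrupt poll(), then wait for main to finish closing the consumer
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        runUntilWakeup(consumer);
    }
}
```

Because the wakeup flag is checked as soon as poll() is entered, calling wakeup() even before the first poll makes the loop exit immediately, which is handy for testing this pattern without a broker.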

Run the consumer class first. Whenever we send a message to the topic tweet using the Java producer that we have already discussed here, this consumer consumes that message and prints output like this:

Key: null
Value: {"created_at":"Mon Nov 30 12:26:49 +0000 2020","id":1358987003086311427,"text":"mi ne aaj ke ipl maich mein csk ko chaar viket se haraaya","favorited":false,"retweeted":false,"lang":"hi","timestamp_ms":"1606755809325"}
Partition: 2
Offset: 3