Apache Kafka Series | Session 10 | Consumer re-balancing

We’ve already created a topic named tweet, with 3 partitions and a replication factor of 1. Let’s run a Java consumer that belongs to the consumer group kafka-tweet-group and check what Kafka writes to the console. We’re using the same console that we used in our last example.
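
A consumer's group membership comes from its configuration. Every instance started with the same `group.id` joins the same group. Below is a minimal sketch of such a configuration using only `java.util.Properties`. The broker address `localhost:9092`, the String deserializers and the class name `TweetConsumerConfig` are assumptions for illustration; they are not taken from the previous session. With the `kafka-clients` library on the classpath, these properties would be passed to `new KafkaConsumer<>(props)`, followed by `consumer.subscribe(List.of("tweet"))`.

```java
import java.util.Properties;

public class TweetConsumerConfig {

    // Builds the properties for a consumer in kafka-tweet-group.
    // Assumption: a local broker on localhost:9092 and String keys/values.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Every instance sharing this group.id joins the same consumer group,
        // so the topic's partitions are divided among those instances.
        props.setProperty("group.id", "kafka-tweet-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        // With kafka-clients available, the consumer would be created as:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(List.of("tweet"));
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```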

1. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 30: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0, tweet-1, tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 30
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0, tweet-1, tweet-2])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0, tweet-2, tweet-1

The log above shows this consumer joining (or rejoining) the consumer group kafka-tweet-group. It also shows that the consumer is assigned all three partitions [tweet-0, tweet-1, tweet-2]. It will therefore consume every message sent to the topic.

Cool, now let’s run one more consumer and see what happens in the console. Keep the existing consumer running.

If you have trouble running the same class in parallel in IntelliJ, follow these steps:

1. Select the class that you want to run in parallel from the run configuration drop-down at the top right of the IDE.
2. Open the Run menu.
3. Select the Edit Configurations option.
4. In the pop-up window, select the "Allow parallel run" option. In newer IntelliJ versions, this is called "Allow multiple instances" and appears under "Modify options".
5. Click OK to save the configuration.

We received the following logs in the console for consumer-1 and consumer-2:

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 31: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0, tweet-1]), consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 31
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0, tweet-1])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0, tweet-1
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 31
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2

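So whenever a new consumer joins the same consumer group, every member re-joins and the group moves to a new generation (from 30 to 31 here). This is called consumer re-balancing. Only consumer-1 prints the "Finished assignment" line, because it is the group leader, the member that computes the assignment for the whole group. After this rebalance, consumer-1 will consume all messages sent to the partitions [tweet-0, tweet-1], and consumer-2 will consume the messages sent to [tweet-2]. Both consoles print clientId=consumer-kafka-tweet-group-1 because each instance runs in its own JVM and therefore generates the same default client ID. The group tells the two members apart by their unique member IDs, which are shown in the leader's assignment log.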
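
Both consoles print clientId=consumer-kafka-tweet-group-1, even though they are different consumers. The sketch below roughly mirrors how the Java client and the group coordinator build these IDs:

- When no `client.id` is configured, the client derives one from the group ID plus a counter that starts at 1.
- The coordinator then appends a random UUID to form the member ID.

The `ConsumerProcess` class is a hypothetical stand-in for one JVM. The real client keeps its counter in a static field, so each new process starts counting again at 1.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

public class MemberIdSketch {

    // Hypothetical stand-in for one JVM process. In kafka-clients the counter
    // is static, so each separately started process begins again at 1.
    static class ConsumerProcess {
        private final AtomicInteger clientIdSequence = new AtomicInteger(1);

        // Default client id used when a group.id is set but no client.id is configured.
        String defaultClientId(String groupId) {
            return "consumer-" + groupId + "-" + clientIdSequence.getAndIncrement();
        }
    }

    // The coordinator appends a random UUID to the client id, which makes
    // each member unique inside the group.
    static String memberId(String clientId) {
        return clientId + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        ConsumerProcess first = new ConsumerProcess();
        ConsumerProcess second = new ConsumerProcess();
        String c1 = first.defaultClientId("kafka-tweet-group");
        String c2 = second.defaultClientId("kafka-tweet-group");
        // Both processes produce the same client id...
        System.out.println(c1 + " / " + c2);
        // ...but their member ids differ because of the random UUID suffix.
        System.out.println(memberId(c1) + "\n" + memberId(c2));
    }
}
```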

Let’s run one more instance of the consumer while keeping the existing instances running. This is what we received for each consumer instance:

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 32: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0]), consumer-kafka-tweet-group-1-21a062ff-aeaf-46d3-ad06-fc3334d4c44e=Assignment(partitions=[tweet-1]), consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2
consumer-3
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-1])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-1

Again, a rebalance happens between the consumers. All the consumers re-join the same group (generation 32), and now each consumer owns exactly one partition: consumer-1 consumes from [tweet-0], consumer-2 from [tweet-2] and consumer-3 from [tweet-1].
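
Whatever the number of consumers, a valid assignment within one group gives every partition to exactly one member. A member may own several partitions, one, or none. The hypothetical helper below checks that property for an assignment map like the ones in the leader's logs. The member IDs are shortened to their UUID prefixes for readability.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentCheck {

    // Returns true when every partition appears exactly once across all members:
    // no partition is consumed twice and none is left without an owner.
    static boolean isValid(Map<String, List<String>> assignment, List<String> partitions) {
        Map<String, Integer> owners = new HashMap<>();
        for (List<String> owned : assignment.values()) {
            for (String p : owned) {
                owners.merge(p, 1, Integer::sum);
            }
        }
        // Rejects assignments that mention partitions outside the expected set.
        if (owners.size() != partitions.size()) {
            return false;
        }
        for (String p : partitions) {
            if (owners.getOrDefault(p, 0) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("tweet-0", "tweet-1", "tweet-2");
        // Generation 32 as reported by the group leader (member ids shortened).
        Map<String, List<String>> gen32 = Map.of(
                "0049dca3", List.of("tweet-0"),
                "21a062ff", List.of("tweet-1"),
                "9b8b10aa", List.of("tweet-2"));
        System.out.println(isValid(gen32, partitions)); // true
        // Hypothetical broken assignment: tweet-2 owned twice, tweet-1 unowned.
        Map<String, List<String>> broken = Map.of(
                "a", List.of("tweet-0", "tweet-2"),
                "b", List.of("tweet-2"));
        System.out.println(isValid(broken, partitions)); // false
    }
}
```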

Now let’s try one more scenario. We have 3 partitions for the topic, and all three partitions are evenly distributed among the 3 consumers. Let’s find out what happens if we run one more consumer.

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 33: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0]), consumer-kafka-tweet-group-1-21a062ff-aeaf-46d3-ad06-fc3334d4c44e=Assignment(partitions=[tweet-2]),  consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[]), consumer-kafka-tweet-group-1-00563a69-8d00-4318-af7c-8128f93b3ea9=Assignment(partitions=[tweet-1])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2
consumer-3
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: 
consumer-4
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-1])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-1

This is quite interesting. The joining / re-joining process and the rebalance happened again, and 3 partitions were distributed among 4 consumers. Consumer-1 will consume from [tweet-0], consumer-2 from [tweet-2] and consumer-4 from [tweet-1]. Consumer-3, however, will not consume any messages, because no partition is assigned to it. If we start even more consumers, the same thing will happen. Within one group, a partition is never shared by two consumers, so at most 3 consumers (one per partition) receive work and any extra consumers stay idle.
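
The leader's "Finished assignment" lines above look consistent with a range-style strategy, as implemented by Kafka's RangeAssignor. This is an inference from the logs; the session does not state which assignor is configured. The strategy works as follows:

- Members are sorted by member ID.
- Each member gets `partitions / members` partitions.
- The first `partitions % members` members get one extra.
- Members beyond the partition count get nothing.

The sketch below is a simplified, single-topic version of that idea. The real assignor also handles multiple topics and static membership. Because every member ID in the logs shares the prefix consumer-kafka-tweet-group-1-, sorting by the UUID part gives the same order. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class RangeAssignmentSketch {

    // Simplified range assignment for one topic: sort members, hand out
    // numPartitions / numMembers partitions each, and give the first
    // numPartitions % numMembers members one extra. Surplus members get [].
    static Map<String, List<String>> assign(String topic, int numPartitions, List<String> memberIds) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(memberIds));
        Map<String, List<String>> result = new LinkedHashMap<>();
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(topic + "-" + next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Member id suffixes taken from the leader's logs, added one per generation.
        List<String> members = new ArrayList<>();
        members.add("0049dca3"); // generation 30
        System.out.println(assign("tweet", 3, members));
        // {0049dca3=[tweet-0, tweet-1, tweet-2]}
        members.add("9b8b10aa"); // generation 31
        System.out.println(assign("tweet", 3, members));
        // {0049dca3=[tweet-0, tweet-1], 9b8b10aa=[tweet-2]}
        members.add("21a062ff"); // generation 32
        System.out.println(assign("tweet", 3, members));
        // {0049dca3=[tweet-0], 21a062ff=[tweet-1], 9b8b10aa=[tweet-2]}
        members.add("00563a69"); // generation 33
        System.out.println(assign("tweet", 3, members));
        // {0049dca3=[tweet-0], 00563a69=[tweet-1], 21a062ff=[tweet-2], 9b8b10aa=[]}
    }
}
```

Each printed map matches the corresponding "Finished assignment" line from the group leader, including the member left with an empty assignment in generation 33.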