The basic rule is that you can scale up to the number of Kafka partitions. If you set spark.executor.cores greater than the number of partitions, some of the threads will be idle. If it's less than the number of partitions, Spark will have threads read from one partition, then another. So:
- 2 partitions, 1 core: reads from one partition, then the other (I am not sure how Spark decides how much to read from each before switching)
- 2 partitions, 2 cores: parallel execution
- 1 partition, 2 cores: one thread is idle
For case #1, note that having more partitions than cores is OK, since it allows you to scale out later without having to re-partition. The trick is to make sure that the number of partitions is evenly divisible by the number of cores. Spark has to process all the partitions before passing data on to the next step in the pipeline, so 'remainder' partitions can slow down processing. For example, with 5 partitions and 4 threads, processing takes the time of 2 partitions: 4 are processed at once, then one thread runs the 5th partition by itself.
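To see why, count the processing "waves": each wave runs up to one partition per core in parallel, so wall-clock time grows with ceil(partitions / cores), and any remainder adds a whole extra wave. A quick sketch of that arithmetic:

```scala
// Each "wave" processes up to `cores` partitions in parallel, so a
// batch takes roughly ceil(partitions / cores) waves of work.
def waves(partitions: Int, cores: Int): Int =
  (partitions + cores - 1) / cores

println(waves(4, 4)) // 1: all four partitions run at once
println(waves(5, 4)) // 2: four run at once, the fifth runs alone
println(waves(8, 4)) // 2: evenly divisible, no idle cores
```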
Also note that you may see better processing throughput if you keep the number of partitions/RDDs the same throughout the pipeline, by explicitly setting the number of data partitions in functions like reduceByKey().
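For example, a minimal batch-RDD sketch (the partition count and data are placeholders; the same idea applies to DStream transformations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionCountExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partition-count").setMaster("local[4]"))
    val numPartitions = 4 // e.g. match the Kafka topic's partition count

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numPartitions)

    // Passing the partition count explicitly keeps the shuffle output at the
    // same parallelism instead of falling back to a default.
    val counts = pairs.reduceByKey(_ + _, numPartitions)
    counts.collect().foreach(println)
    sc.stop()
  }
}
```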
This post already has answers, but I am adding my view with a few pictures from Kafka: The Definitive Guide.
Before answering the questions, it helps to picture the producer components: a ProducerRecord (topic, optional partition, optional key, value) is serialized, the partitioner picks a partition, and the record is added to a per-partition batch that is sent to the broker.
1. When a producer is producing a message - It will specify the topic it wants to send the message to, is that right? Does it care about partitions?
The producer decides the target partition for every message, depending on (see the sketch after this list):
- The partition id, if it is specified within the message
- hash(key) % number of partitions, if no partition id is given but the message has a key
- Round robin, if neither a partition id nor a key is present (only the value is available)
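A sketch of the three cases using the standard producer API (topic name and broker address are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

// 1. Explicit partition id: goes to partition 0; the key is still stored
//    with the record but is not used to pick the partition.
producer.send(new ProducerRecord[String, String]("my-topic", 0, "key", "value"))

// 2. Key, no partition id: partition = hash(key) % number of partitions.
producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))

// 3. Neither: partitions are picked round-robin (sticky per batch in
//    newer client versions).
producer.send(new ProducerRecord[String, String]("my-topic", "value"))

producer.close()
```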
2. When a subscriber is running - Does it specify its group id so that it can be part of a cluster of consumers of the same topic or several topics that this group of consumers is interested in?
You should always configure group.id unless you are using the simple assignment API and you don't need to store offsets in Kafka. A consumer without a group id will not be part of any group.
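A minimal sketch of a consumer joining a group (all names are placeholders):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("group.id", "my-consumer-group") // consumers sharing this id split the topic's partitions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("my-topic")) // placeholder topic
```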
3. Does each consumer group have a corresponding partition on the broker or does each consumer have one?
In one consumer group, each partition will be processed by one consumer only. These are the possible scenarios:
- Number of consumers less than the number of topic partitions: some consumers in the group are assigned more than one partition
- Number of consumers equal to the number of topic partitions: a one-to-one mapping between partitions and consumers
- Number of consumers greater than the number of topic partitions: the excess consumers sit idle and receive nothing, which is not effective (in the book's figure, Consumer 5 is the idle one)
4. Since the partitions are created by the broker, are they therefore not a concern for the consumers?
The consumer should be aware of the number of partitions, as discussed in question 3.
5. Since this is a queue with an offset for each partition, is it the responsibility of the consumer to specify which messages it wants to read? Does it need to save its state?
Kafka (to be specific, the Group Coordinator) takes care of the offset state by producing a message to an internal __consumer_offsets topic. This behavior can be switched to manual by setting enable.auto.commit to false; in that case consumer.commitSync() and consumer.commitAsync() can be helpful for managing offsets.
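A sketch of manual offset management, building on the consumer sketch above (enable.auto.commit must be set in the Properties before the consumer is created):

```scala
import java.time.Duration

// Added to the Properties from the earlier sketch, before `new KafkaConsumer`:
props.put("enable.auto.commit", "false")

// Poll, process, then commit the offsets of the last poll explicitly.
val records = consumer.poll(Duration.ofMillis(100))
records.forEach { r =>
  println(s"partition ${r.partition()} offset ${r.offset()}: ${r.value()}")
}
consumer.commitSync()    // blocking commit of the last poll's offsets
// consumer.commitAsync() // non-blocking alternative
```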
More about Group Coordinator:
- It is one of the brokers in the cluster, elected on the Kafka server side.
- Consumers interact with the Group Coordinator for offset commits and offset fetch requests.
- Consumer sends periodic heartbeats to Group Coordinator.
6. What happens when a message is deleted from the queue? For example, if the retention was 3 hours and that time has passed, how is the offset handled on both sides?
If a consumer starts after the retention period, messages will be consumed as per the auto.offset.reset configuration, which can be latest or earliest. In effect it is latest (start processing new messages), because all the earlier messages have expired by that time; note that retention is a topic-level configuration.
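Continuing the same consumer config sketch, the property in question (set before the consumer is created):

```scala
// "latest" (the default) starts from new messages only;
// "earliest" starts from the oldest offset still retained.
props.put("auto.offset.reset", "earliest")
```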
Best Answer
I made the following observations, in case it's helpful for someone:
Creating multiple streams helps in two ways:
1. You don't need to apply the filter operation to process different topics differently.
2. You can read multiple streams in parallel (as opposed to one by one in the case of a single stream). To do so, there is an undocumented configuration parameter, spark.streaming.concurrentJobs. So, I decided to create multiple streams.
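A sketch of the multi-stream setup under those assumptions (topic names, batch interval, and the job count are placeholders; spark.streaming.concurrentJobs is undocumented, so treat it as experimental):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf()
  .setAppName("multi-stream")
  .set("spark.streaming.concurrentJobs", "2") // undocumented: lets the two streams' jobs run in parallel
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092", // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-streaming-group"
)

// One stream per topic, so each can be processed differently without filtering.
val streamA = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic-a"), kafkaParams))
val streamB = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic-b"), kafkaParams))

streamA.map(r => r.value()).print()        // topic-a gets one kind of processing
streamB.map(r => r.value().length).print() // topic-b gets another

ssc.start()
ssc.awaitTermination()
```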