Setting consumer partition

Hi I have a data source which write on a meteodata Kafka topic with 3 partitions (replica factor 1, I have only 1 broker).
I need to create a consumer group which read data from the topic meteodata and from partition with id 0 and write these data on a database.
How can I set with a python client the partition where consumer reads data? In the other 2 partition data are available?
Any help is apreciated.
g

@GiuseppeR the Confluent provided python client supports the consumer.assign function. Note: When using assign instead of subscribe, you are manually managing the consumer partition assignment and so consumer groups are not relevant. This stack over flow answer describes this well:

Here is a stack overflow answer for the python assign usage:

The API documentation is here:
http://docs.confluent.io/current/clients/confluent-kafka-python/index.html

Good luck

1 Like

Thank you @rick. I try to test 2 consumers of the same groupid:

# Create Consumer instance
consumer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
consumer_conf['group.id'] = 'dbsink_consumer'
consumer_conf['auto.offset.reset'] = 'earliest'
consumer = Consumer(consumer_conf)


# Read arguments and configurations and initialize
args = ccloud_lib.parse_args()
config_file = args.config_file
topic = args.topic
conf = ccloud_lib.read_ccloud_config(config_file)

# Create Producer instance
producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
producer = Producer(producer_conf)
# Subscribe to topic
consumer.subscribe([topic])

When the 2 consumers run at the same time, it seems one consumer haven’t data from the partition (I have a topic with 3 partitions hosted on the same broker). So I have a question: Kafka with a topic with 3 partitions writes the same data on the 3 partitions? If Kafka writes the same data on the 3 partitions It seems using consumer.assign is the solution: manually I decide where a consumer takes data…But I have soome doubt if Kafka write the same data on the 3 topics.
Thanks.

Let me try and focus on the producer side of the question.

Partitioning strategy refers to the method that a producer uses to decide which partition to send records to. The strategy is configurable, however, there is a default strategy. Depending on which version Apache Kafka you are using will determine the default strategy. With Kafka 2.4 and later the default strategy is called the Sticky Partitioner. What’s most important to know about this is that the strategy depends a lot on the Key you assign the record you are producing. If you are assigning a Key to the record, the strategy will calculate a hash from the key and use that to determine which partition to send the record to. This is done to attempt to send records of the same key to the same partition over time. If your records do not have a key, the Sticky Partitioner will send records to random partitions (which balances out over time).

Here is a great blog post with the details:

I hope this helps explain the partitioning strategy and that may help you decide if you need to manage your own consumer partition assignment or you can (preferably) use the Consumer Group feature to allow the Consumer Group Coordinator do that work for you.

Hi @rick thank you for your precious help. I’m using Confluent Platform 6.1.0.
I try to use the consumer groups, so each consumer is part of a consumer group: I have 3 consumers in 3 differents consumer groups, I have always 3 partitions of the topic consumed from consumers.
In this manner the systems seems work…but I’m not sure all messages received from my topic are consumed at the same manner from the 3 consumers.

This is a code part of my producer:

# Read arguments and configurations and initialize
args = ccloud_lib.parse_args()
config_file = args.config_file
topic = args.topic
conf = ccloud_lib.read_ccloud_config(config_file)

def acked(err, msg):
    global delivered_records
    """Delivery report handler called on
    successful or failed delivery of message
    """
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        delivered_records += 1
# Create Producer instance
producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
producer = Producer(producer_conf)

producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
producer.poll(1)

Is it correct using this approach?
Thanks.

Can you try and elaborate on what you see that leads you to believe there is some kind of issue with your applications?

Hi @rick I suppose Kafka writes different data in the 3 partitions. I try to create a new topic with one partition and 3 different consumers in 3 different consumer groups.
With some experiments, I saw with a topic with 3 partitions and 3 consumers in 3 different consumer groups, one consumer seems to not receive some data (I set an alert if a value is over a threshold) but I’m sure this threshold was passed but I haven’t received any alert.
Thanks.
Thanks.

@GiuseppeR you could try further debugging with kcat a very clean and simple command line utility for Kafka. Here is an example I used to look at some data in a topic with multiple partitions.

kcat -F <my-config-file> -C -t six-parts -o beginning -f "%t:%p:%o\n"

Here i’m priting everything from the topic six-parts and for each record i’m printing out the topic, partition, and offset. More details on how to use this on the README in the GitHub repository linked above.