Rdkafka producer performance limits

Hi,
I’m quite new to Kafka.
I’m using very strong devices for both producer and kafka server and trying to get to the maximum performance for producing records.
My setup:

  • Producer sits on 2 sockets device with 2 CPUs, 48 cores total of Intel(R) Xeon(R) Gold 6248R CPU @ 3.00GHz, 264 RAM.
  • Kafka server sits on 2 sockets device with 2 CPUs, 128 cores total of Intel(R) Xeon(R) Gold 6338N CPU @ 2.20GHz, 264 RAM.
  • I have a cable of 100Gbps connected directly from the producer to the kafka server.

I’m using both rdkafka_performance application of rdkafka and my own code that I wrote using rdkafka c library.

I read couple of articles regarding improving producer performance, so my settings are, the bigger batch size and longest linger time, as suggested in those articles:

  • batch.num.messages: 1000000
  • linger.ms: 1000
  • queue.buffering.max.kbytes: 2147483647
  • message.max.bytes: 1000000000
  • compression.codec: snappy

More information:

  • Actual 1 message size is around 90 bytes
  • Specifically in my application I use rd_kafka_produce_batch() (as opposed to rdkafka_performance which doesn’t use that).
  • I also have cpu affinity for utilizing a core privately.

The kafka server is actually a docker-compose which starts 1 kafka container and 1 zookeeper container.
I added these to the docker-compose env variables in order to make it utilize more threads:

      - KAFKA_CFG_BACKGROUND_THREADS=30
      - KAFKA_CFG_NUM_NETWORK_THREADS=30
      - KAFKA_CFG_NUM_IO_THREADS=30

I then created a topic mytopic with 10 partitions:

kafka-topics.sh --create --topic mytopic --zookeeper zookeeper:2181 --partitions 10 --replication-factor 1 --config max.message.bytes=52428800 --config retention.bytes=107374182400 --config delete.retention.ms=20000 ```

When I try to run both my application or rdkafka_performance, 1 thread gets about to 1200Mbps.
Then in my application I create 5 threads, each writes to a different partition, I get to ~4000-5000Mbps.
When increasing to 10 threads, each writes to a different partition, I get to ~7000-8000Mbps which is not stable.
BTW, using wireshark I can see the MTU may reach to ~65000 because there’s no limitation in this setup due to a direct cable.

My questions are:

  • Do I do anything wrong? Is 1200Mbps the maximum for 1 thread on this setup? Does it seems right?
  • Can I somehow increase bandwidth?
    – Different settings for the producer
    – Different settings for the kafka server

Any help will be grateful.
Please let me know if you need any further information.

Thanks

As with all performance testing, I would need to see the client stats to get an idea of what the actual bottleneck is. And as importantly you don’t state what your goals are. perf testing just to perf test is a rabbit hole that doesn’t tend to be beneficial. A better testing statement would be “I need to produce X number of messages or bytes, at Y latency. And my cluster size is N nodes”. Latency and throughput are almost always at odds when doing perf testing. Those metrics tend to be solid things, the rest of the metrics can be variables you adjust while testing.

I’m going to assume you meant a 10GB nic and not 100GB(if you do have 100GB , NICE TOY!). If you’re getting 800MB/sec and bursting to 1200Mbps, you’re PROBABLY hitting the limitations of the NIC or the ability of the NIC to get data from the OS any faster.

I can’t really suggest anything to tune as I don’t know your goals, or what bottleneck you’re hitting. it’s worth saying that batch.size and linger.ms are your two big levers to toy with. And will often inversely effect latency and throughput.

Hi @mitchell-h , thanks for the reply!

  • Client stats from rdkafka_performance, do you mean this?
# ./rdkafka_performance -P -D -t mymetadata -b 192.168.201.154 -m "This is my record. And I'm about to check rdkafka performance within this application." -z snappy 
% Sending messages of size 86 bytes
%5|1635274283.928|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
% 1276714 messages produced (109797404 bytes), 1070690 delivered (offset 501516, 0 failed) in 1000ms: 1070690 msgs/s and 92.08 MB/s, 0 produce failures, 206024 in queue, snappy compression
...
^C% 10322365 messages produced (892315446 bytes), 10322367 delivered (offset 40550745, 0 failed) in 8329ms: 1239214 msgs/s and 106.57 MB/s, 0 produce failures, 0 in queue, snappy compression

  • Currently my goal is to reach 10Gbps (data, not records). Thus latency is less matter in this case.
    Because that’s just a POC, I don’t really know what my cluster is - it will be whatever it should be in order to support 10Gbps.
    BTW how does it matter? Bigger cluster is clumsier because each record need to be replicated (for high availability) thus it takes more time for committing a record?
  • Actually I meant 100Gbps :open_mouth:
    When using 1 thread I reach to quite solid 1200Mbps (bit, not Byte). I don’t think that’s the NIC / OS limitation because the proof is that 10 threads do ~8-7Gbps.
  • Another issue I didn’t say before. I use perf to check the client and see what’s the bottleneck (if it’s in the rdkafka client). I can see that most of the stuff goes on mutex:

Thank you very much.

If you’re stuck in mutex this matters a lot. Not only does the client have the push the data to the cluster. The cluster has to accept the data, and replicate it. If the kafka cluster gets busy it can’t do the accepting and your threads will stay locked(doing no work) until the messages can be pushed to the cluster.

with a replication factor of 1 you won’t have to replicate the data, but a broker has to receive and store the data.

Also noticed you only have 10 partitions. Partitions can be thought of as a unit of parallelism in kafka. More partitions means more places to send data(as long as there’s cluster capacity & client capacity).

So you’re saying, increase partitions and run more threads? thread per partition?

You meant the perf picture I attached? the mutex from there?
That a mutex means we’re waiting for an ack from the kafka server?
I also tried to ran with acks == 0 without any performance improvement.

that would be a good start. But the problem is if your brokers run out of capacity it won’t matter. You can’t REALLY perf test a client in isolation. clients send, broker receive. If either side slows down the other will too, it’s a benefit and a curse of kafka.

without having full TRACE logs from the clients, that’s my assumption. There can only be so many messages in flight before the client sending thread simply locks its self and refuses to start any more requests or is too busy serving other requests that it can’t hand anymore send requests.

if you’re not getting a performance improvement with acks=0. Something is at its limits.

  • OK, I’ll try increasing partitions and producer threads
    – Looks like 20 partitions, 20 threads writing to them, gives ~9Gbps, not stable.
  • That’s why I introduced the kafka server device, which has lots of cores and RAM. The only thing I’m thinking about is, disk writing speed that may be slow? May it slow down the kafka server? Is there a way to check it out? I tried using JMX and jconsole but didn’t see a metric of that.
    – Or, maybe there are kafka server configurations I can apply that might speed up the server?
  • Any assumption what? Or how it can be tested?

Do you mean running the client with -v -d all ?
That’s some of the output, I can’t upload a file :confused:

%7|1635317819.565|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#producer-1 initialized (builtin.features snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX INSTALL GNULD LIBDL PLUGINS SSL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%5|1635317819.565|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
%7|1635317819.565|WAKEUPFD|rdkafka#producer-1| [thrd:app]: 1.1.1.2:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1635317819.565|BROKER|rdkafka#producer-1| [thrd:app]: 1.1.1.2:9092/bootstrap: Added new broker with NodeId -1
%7|1635317819.565|CONNECT|rdkafka#producer-1| [thrd:app]: 1.1.1.2:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1635317819.565|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: mymetadata
%7|1635317819.565|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW mymetadata [-1] 0x55b6b5931b40 (at rd_kafka_topic_new0:441)
%7|1635317819.565|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1635317819.565|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1635317819.565|BRKMAIN|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enter main broker thread
%7|1635317819.565|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1635317819.565|CONNECT|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Received CONNECT op
%7|1635317819.565|STATE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1635317819.565|BROADCAST|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Broadcasting state change
%7|1635317819.565|CONNECT|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1635317819.565|STATE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1635317819.565|BROADCAST|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Broadcasting state change
%7|1635317819.565|CONNECT|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Connecting to ipv4#1.1.1.2:9092 (plaintext) with socket 7
%7|1635317819.566|CONNECT|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Connected to ipv4#1.1.1.2:9092
%7|1635317819.566|CONNECTED|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Connected (#1)
%7|1635317819.566|FEATURE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1635317819.566|STATE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1635317819.566|BROADCAST|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Broadcasting state change
%7|1635317819.566|SEND|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Sent ApiVersionRequest (v3, 40 bytes @ 0, CorrId 1)
%7|1635317819.571|RECV|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Received ApiVersionResponse (v3, 410 bytes, CorrId 1, rtt 5.66ms)
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Broker API support:
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Produce (0) Versions 0..9
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Fetch (1) Versions 0..12
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Offset (2) Versions 0..6
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Metadata (3) Versions 0..11
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey LeaderAndIsr (4) Versions 0..5
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey StopReplica (5) Versions 0..3
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey UpdateMetadata (6) Versions 0..7
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey ControlledShutdown (7) Versions 0..3
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey OffsetCommit (8) Versions 0..8
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey OffsetFetch (9) Versions 0..7
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey FindCoordinator (10) Versions 0..3
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey JoinGroup (11) Versions 0..7
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Heartbeat (12) Versions 0..4
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey LeaveGroup (13) Versions 0..4
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey SyncGroup (14) Versions 0..5
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DescribeGroups (15) Versions 0..5
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey ListGroups (16) Versions 0..4
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey SaslHandshake (17) Versions 0..1
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey ApiVersion (18) Versions 0..3
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey CreateTopics (19) Versions 0..7
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DeleteTopics (20) Versions 0..6
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DeleteRecords (21) Versions 0..2
%7|1635317819.571|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey InitProducerId (22) Versions 0..4
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey OffsetForLeaderEpoch (23) Versions 0..4
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey AddPartitionsToTxn (24) Versions 0..3
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey AddOffsetsToTxn (25) Versions 0..3
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey EndTxn (26) Versions 0..3
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey WriteTxnMarkers (27) Versions 0..1
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey TxnOffsetCommit (28) Versions 0..3
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DescribeAcls (29) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey CreateAcls (30) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DeleteAcls (31) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DescribeConfigs (32) Versions 0..4
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey AlterConfigs (33) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey AlterReplicaLogDirs (34) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DescribeLogDirs (35) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey SaslAuthenticate (36) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey CreatePartitions (37) Versions 0..3
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey CreateDelegationToken (38) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey RenewDelegationToken (39) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey ExpireDelegationToken (40) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DescribeDelegationToken (41) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey DeleteGroups (42) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-43? (43) Versions 0..2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-44? (44) Versions 0..1
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-45? (45) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-46? (46) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-47? (47) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-48? (48) Versions 0..1
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-49? (49) Versions 0..1
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-50? (50) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-51? (51) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-56? (56) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-57? (57) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-60? (60) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:   ApiKey Unknown-61? (61) Versions 0..0
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature MsgVer1: Produce (2..2) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature MsgVer1: Fetch (2..2) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature MsgVer1
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature MsgVer2: Produce (3..3) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature MsgVer2: Fetch (4..4) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature MsgVer2
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature ApiVersion
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerGroupCoordinator: FindCoordinator (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature BrokerGroupCoordinator
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: FindCoordinator (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature BrokerBalancedConsumer
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature ThrottleTime: Produce (1..2) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature ThrottleTime
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature Sasl: JoinGroup (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature Sasl
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature SaslHandshake
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature LZ4: FindCoordinator (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature LZ4
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature OffsetTime: Offset (1..1) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature OffsetTime
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature IdempotentProducer: InitProducerId (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature IdempotentProducer
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature ZSTD: Produce (7..7) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature ZSTD: Fetch (10..10) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature ZSTD
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature SaslAuthReq: SaslHandshake (1..1) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap:  Feature SaslAuthReq: SaslAuthenticate (0..0) supported by broker
%7|1635317819.572|APIVERSION|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Enabling feature SaslAuthReq
%7|1635317819.572|FEATURE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1635317819.572|STATE|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1635317819.572|BROADCAST|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Broadcasting state change
%7|1635317819.572|METADATA|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Hinted cache of 1/1 topic(s) being queried
%7|1635317819.572|METADATA|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: Requesting metadata for 1/1 topics: connected
%7|1635317819.572|METADATA|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Request metadata for 1 topic(s): connected
%7|1635317819.572|SEND|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Sent MetadataRequest (v4, 38 bytes @ 0, CorrId 2)
%7|1635317819.574|RECV|rdkafka#producer-1| [thrd:1.1.1.2:9092/bootstrap]: 1.1.1.2:9092/bootstrap: Received MetadataResponse (v4, 336 bytes, CorrId 2, rtt 1.89ms)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]: 1.1.1.2:9092/bootstrap: ===== Received metadata (for 1 requested topics): connected =====
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]: 1.1.1.2:9092/bootstrap: ClusterId: TS-jpakESFe1OxHJyuW2WA, ControllerId: 1001
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]: 1.1.1.2:9092/bootstrap: 1 brokers, 1 topics
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]: 1.1.1.2:9092/bootstrap:   Broker #0/1: kafka:9092 NodeId 1001
%7|1635317819.574|WAKEUPFD|rdkafka#producer-1| [thrd:main]: kafka:9092/1001: Enabled low-latency ops queue wake-ups
%7|1635317819.574|BROKER|rdkafka#producer-1| [thrd:main]: kafka:9092/1001: Added new broker with NodeId 1001
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]: 1.1.1.2:9092/bootstrap:   Topic #0/1: mymetadata with 10 partitions
%7|1635317819.574|STATE|rdkafka#producer-1| [thrd:main]: Topic mymetadata changed state unknown -> exists
%7|1635317819.574|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic mymetadata partition count changed from 0 to 10
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [0] 0x7f2e8400f1f0 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [1] 0x7f2e8400f750 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [2] 0x7f2e8400fcb0 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [3] 0x7f2e84010210 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [4] 0x7f2e84010770 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [5] 0x7f2e84010cd0 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [6] 0x7f2e84011230 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [7] 0x7f2e84011790 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|BRKMAIN|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Enter main broker thread
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [8] 0x7f2e84011cf0 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mymetadata [9] 0x7f2e84012250 (at rd_kafka_topic_partition_cnt_update:750)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 0 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [0]: delegate to broker kafka:9092/1001 (rktp 0x7f2e8400f1f0, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [0]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [0] 0x7f2e8400f1f0 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 1 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [1]: delegate to broker kafka:9092/1001 (rktp 0x7f2e8400f750, term 0, ref 2)
%7|1635317819.574|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [0]: joining broker (rktp 0x7f2e8400f1f0, 0 message(s) queued)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [1]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [1] 0x7f2e8400f750 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 2 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [2]: delegate to broker kafka:9092/1001 (rktp 0x7f2e8400fcb0, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [2]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [2] 0x7f2e8400fcb0 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 3 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [3]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84010210, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [3]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [3] 0x7f2e84010210 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 4 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [4]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84010770, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [4]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [4] 0x7f2e84010770 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 5 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [5]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84010cd0, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [5]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [5] 0x7f2e84010cd0 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 6 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [6]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84011230, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [6]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [6] 0x7f2e84011230 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 7 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [7]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84011790, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [7]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [7] 0x7f2e84011790 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 8 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [8]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84011cf0, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [8]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [8] 0x7f2e84011cf0 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mymetadata partition 9 Leader 1001
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [9]: delegate to broker kafka:9092/1001 (rktp 0x7f2e84012250, term 0, ref 2)
%7|1635317819.574|BRKDELGT|rdkafka#producer-1| [thrd:main]: mymetadata [9]: delegating to broker kafka:9092/1001 for partition with 0 messages (0 bytes) queued
%7|1635317819.574|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mymetadata [9] 0x7f2e84012250 from (none) to kafka:9092/1001 (sending PARTITION_JOIN to kafka:9092/1001)
%7|1635317819.574|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 27815 unassigned messages in topic mymetadata to 10 partitions
%7|1635317819.574|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [0] to active list (1 entries, opv 0, 0 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [1]: joining broker (rktp 0x7f2e8400f750, 0 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [1] to active list (2 entries, opv 0, 0 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [2]: joining broker (rktp 0x7f2e8400fcb0, 0 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [2] to active list (3 entries, opv 0, 0 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [3]: joining broker (rktp 0x7f2e84010210, 1 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [3] to active list (4 entries, opv 0, 1 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [4]: joining broker (rktp 0x7f2e84010770, 0 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [4] to active list (5 entries, opv 0, 0 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [5]: joining broker (rktp 0x7f2e84010cd0, 2 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [5] to active list (6 entries, opv 0, 2 messages queued): joining
%7|1635317819.575|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/1001]: Broadcasting state change
%7|1635317819.575|TOPBRK|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Topic mymetadata [6]: joining broker (rktp 0x7f2e84011230, 3 message(s) queued)
%7|1635317819.575|FETCHADD|rdkafka#producer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Added mymetadata [6] to active list (7 entries, opv 0, 3 messages queued): joining
...

Thank you very much

I misspoke. Would not need TRACE logs, you would need librdkafka/STATISTICS.md at f092c290995ca81b3afb4015fcc3350ba02caa96 · edenhill/librdkafka · GitHub those. That will tell you where the bottleneck is.

Can you please take a look?

{
  "name": "me_0#producer-1",
  "client_id": "me_0",
  "type": "producer",
  "ts": 1822097535201,
  "time": 1635953387,
  "age": 8001773,
  "replyq": 0,
  "msg_cnt": 99184,
  "msg_size": 8578810,
  "msg_max": 100000,
  "msg_size_max": 2199023254528,
  "simple_cnt": 0,
  "metadata_cache_cnt": 1,
  "brokers": {
    "kafka:9092/1001": {
      "name": "kafka:9092/1001",
      "nodeid": 1001,
      "nodename": "kafka:9092",
      "source": "learned",
      "state": "UP",
      "stateage": 6976559,
      "outbuf_cnt": 1,
      "outbuf_msg_cnt": 10342,
      "waitresp_cnt": 6,
      "waitresp_msg_cnt": 62038,
      "tx": 1188,
      "txbytes": 101688955,
      "txerrs": 0,
      "txretries": 0,
      "txidle": 5589,
      "req_timeouts": 0,
      "rx": 1182,
      "rxbytes": 73640,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "rxidle": 1425,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 1871,
      "connects": 1,
      "disconnects": 0,
      "int_latency": {
        "min": 1827,
        "max": 61776,
        "avg": 15479,
        "sum": 63060937788,
        "stddev": 10992,
        "p50": 11455,
        "p75": 21887,
        "p90": 34303,
        "p95": 37631,
        "p99": 43775,
        "p99_99": 61439,
        "outofrange": 0,
        "hdrsize": 15472,
        "cnt": 4073922
      },
      "outbuf_latency": {
        "min": 12,
        "max": 37621,
        "avg": 2935,
        "sum": 1156493,
        "stddev": 5613,
        "p50": 999,
        "p75": 2655,
        "p90": 7839,
        "p95": 12095,
        "p99": 33279,
        "p99_99": 37631,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 394
      },
      "rtt": {
        "min": 4819,
        "max": 43619,
        "avg": 25113,
        "sum": 9819549,
        "stddev": 8307,
        "p50": 27391,
        "p75": 31103,
        "p90": 33279,
        "p95": 36095,
        "p99": 40959,
        "p99_99": 43775,
        "outofrange": 0,
        "hdrsize": 13424,
        "cnt": 391
      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 0
      },
      "req": {
        "Produce": 1187,
        "ListOffsets": 0,
        "Metadata": 0,
        "FindCoordinator": 0,
        "SaslHandshake": 0,
        "ApiVersion": 1,
        "InitProducerId": 0,
        "AddPartitionsToTxn": 0,
        "AddOffsetsToTxn": 0,
        "EndTxn": 0,
        "TxnOffsetCommit": 0,
        "SaslAuthenticate": 0,
        "OffsetDeleteRequest": 0,
        "DescribeClientQuotasRequest": 0,
        "AlterClientQuotasRequest": 0,
        "DescribeUserScramCredentialsRequest": 0
      },
      "toppars": {
        "mymetadata-0": {
          "topic": "mymetadata",
          "partition": 0
        }
      }
    },
    "1.1.1.2:9092/bootstrap": {
      "name": "1.1.1.2:9092/bootstrap",
      "nodeid": -1,
      "nodename": "1.1.1.2:9092",
      "source": "configured",
      "state": "UP",
      "stateage": 7985254,
      "outbuf_cnt": 0,
      "outbuf_msg_cnt": 0,
      "waitresp_cnt": 0,
      "waitresp_msg_cnt": 0,
      "tx": 3,
      "txbytes": 110,
      "txerrs": 0,
      "txretries": 0,
      "txidle": 7001408,
      "req_timeouts": 0,
      "rx": 3,
      "rxbytes": 612,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "rxidle": 6995431,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 9,
      "connects": 1,
      "disconnects": 0,
      "int_latency": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 0
      },
      "outbuf_latency": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 0
      },
      "rtt": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 13424,
        "cnt": 0
      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 0
      },
      "req": {
        "Produce": 0,
        "ListOffsets": 0,
        "Metadata": 2,
        "FindCoordinator": 0,
        "SaslHandshake": 0,
        "ApiVersion": 1,
        "InitProducerId": 0,
        "AddPartitionsToTxn": 0,
        "AddOffsetsToTxn": 0,
        "EndTxn": 0,
        "TxnOffsetCommit": 0,
        "SaslAuthenticate": 0,
        "OffsetDeleteRequest": 0,
        "DescribeClientQuotasRequest": 0,
        "AlterClientQuotasRequest": 0,
        "DescribeUserScramCredentialsRequest": 0
      },
      "toppars": {}
    }
  },
  "topics": {
    "mymetadata": {
      "topic": "mymetadata",
      "age": 8001,
      "metadata_age": 6995,
      "batchsize": {
        "min": 85445,
        "max": 85813,
        "avg": 85616,
        "sum": 33732797,
        "stddev": 127,
        "p50": 86015,
        "p75": 86015,
        "p90": 86015,
        "p95": 86015,
        "p99": 86015,
        "p99_99": 86015,
        "outofrange": 0,
        "hdrsize": 24688,
        "cnt": 394
      },
      "batchcnt": {
        "min": 10338,
        "max": 10342,
        "avg": 10339,
        "sum": 4073922,
        "stddev": 0,
        "p50": 10367,
        "p75": 10367,
        "p90": 10367,
        "p95": 10367,
        "p99": 10367,
        "p99_99": 10367,
        "outofrange": 0,
        "hdrsize": 14448,
        "cnt": 394
      },
      "partitions": {
        "0": {
          "partition": 0,
          "broker": 1001,
          "leader": 1001,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 11008,
          "msgq_bytes": 952106,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": -1001,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "ls_offset": -1001,
          "consumer_lag": -1,
          "consumer_lag_stored": -1,
          "txmsgs": 12283759,
          "txbytes": 1062448968,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 99072,
          "rx_ver_drops": 0,
          "msgs_inflight": 72380,
          "next_ack_seq": 0,
          "next_err_seq": 0,
          "acked_msgid": 0
        },
        "-1": {
          "partition": -1,
          "broker": -1,
          "leader": -1,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 0,
          "msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": -1001,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "ls_offset": -1001,
          "consumer_lag": -1,
          "consumer_lag_stored": -1,
          "txmsgs": 0,
          "txbytes": 0,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 0,
          "rx_ver_drops": 0,
          "msgs_inflight": 0,
          "next_ack_seq": 0,
          "next_err_seq": 0,
          "acked_msgid": 0
        }
      }
    }
  },
  "tx": 1191,
  "tx_bytes": 101689065,
  "rx": 1185,
  "rx_bytes": 74252,
  "txmsgs": 12283759,
  "txmsg_bytes": 1062448968,
  "rxmsgs": 0,
  "rxmsg_bytes": 0
}

@mitchell-h ? :open_mouth:
Or, can you tell me what should I look at ?

Thanks