What is kafkacat?
kafkacat is a generic non-JVM producer and consumer for Apache Kafka >=0.8. Think of it as a netcat for Kafka.
Download: edenhill/kcat on GitHub (the project has since been renamed kcat)
kafkacat examples
In producer mode, kafkacat reads messages from stdin, separated by a configurable delimiter (-D, newline by default), and produces them to the given Kafka cluster (-b), topic (-t) and, optionally, partition (-p). With -K, each message is split into key and value at the given key delimiter.
$ kafkacat -b localhost:9092 -t new_topic -P
test
$ kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 1
1:foo
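To illustrate the input format that producer mode reads, here is a minimal sketch that builds keyed records on stdout. keyed_records is a hypothetical helper, not part of kafkacat; it only formats text. The commented-out pipeline reuses the flags from the example above and assumes a broker reachable at localhost:9092.

```shell
# keyed_records: hypothetical helper (not part of kafkacat).
# Takes arguments in pairs (key value key value ...) and prints one
# "key:value" line per pair, the shape `kafkacat -P -K:` reads from stdin.
# A trailing unpaired argument is ignored.
keyed_records() {
  while [ "$#" -ge 2 ]; do
    printf '%s:%s\n' "$1" "$2"
    shift 2
  done
}

# Print two records so the format is visible.
keyed_records 1 foo 2 bar

# To produce them, pipe the same output into kafkacat (assumes a reachable broker):
# keyed_records 1 foo 2 bar | kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 1
```

A key that itself contains the delimiter character would be ambiguous, so pick a -K delimiter that cannot occur in your keys.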
In consumer mode, kafkacat reads messages from a topic and partition and prints them to stdout, separated by the configured message delimiter.
$ kafkacat -b localhost:9092 -t mysql_users
% Auto-selecting Consumer mode (use -P or -C to override)
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
[...]
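Because consumer mode writes one message per line to stdout, its output composes with ordinary text tools. The sketch below filters records by the "elite" field; only_elite is a hypothetical helper, and it uses plain fixed-string matching, so it assumes the compact JSON layout shown above rather than parsing JSON properly.

```shell
# only_elite: hypothetical filter (not part of kafkacat).
# Keeps lines whose "elite" field equals the first argument.
# Fixed-string match only; it assumes compact JSON with no spaces around ':'.
only_elite() {
  grep -F "\"elite\":\"$1\""
}

# Two sample records copied from the consumer output stand in for a live topic.
printf '%s\n' \
  '{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}' \
  '{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}' |
  only_elite P

# Against a live topic (assumes a reachable broker):
# kafkacat -b localhost:9092 -t mysql_users -C | only_elite P
```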
kafkacat also features a metadata list mode (-L) that displays the current state of the Kafka cluster and its topics and partitions.
$ kafkacat -L -b mybroker
Metadata for all topics (from broker 1: mybroker:9092/1):
 3 brokers:
  broker 1 at mybroker:9092
  broker 2 at mybrokertoo:9092
  broker 3 at thirdbroker:9092
 16 topics:
  topic "syslog" with 3 partitions:
    partition 0, leader 3, replicas: 1,2,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,3, isrs: 1,2,3
    partition 2, leader 1, replicas: 1,2, isrs: 1,2
  topic "rdkafkatest1_auto_49f744a4327b1b1e" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "rdkafkatest1_auto_e02f58f2c581cba" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  ....
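The metadata listing is plain text, so a quick summary can be pulled out with sed. The sketch below prints each topic name with its partition count; list_topics is a hypothetical helper, and the pattern assumes the `topic "<name>" with <n> partitions:` wording seen in the listing above, with one entry per line as kafkacat prints it.

```shell
# list_topics: hypothetical helper (not part of kafkacat).
# Reads `kafkacat -L` text output on stdin and prints "<topic> <partitions>"
# for every line of the form: topic "<name>" with <n> partitions:
list_topics() {
  sed -n 's/^ *topic "\(.*\)" with \([0-9][0-9]*\) partitions:.*$/\1 \2/p'
}

# A few sample lines from the metadata listing stand in for a live cluster.
printf '%s\n' \
  'Metadata for all topics (from broker 1: mybroker:9092/1):' \
  ' 3 brokers:' \
  '  broker 1 at mybroker:9092' \
  ' 16 topics:' \
  '  topic "syslog" with 3 partitions:' \
  '    partition 0, leader 3, replicas: 1,2,3, isrs: 1,2,3' |
  list_topics

# Against a live cluster (assumes mybroker is reachable):
# kafkacat -L -b mybroker | list_topics
```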
Using kafkacat with Confluent Cloud
Here is an example that runs kafkacat directly from its Docker image to consume from a topic in Confluent Cloud:
$ docker run --rm --interactive edenhill/kafkacat:1.6.0 \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
-b $CCLOUD_BROKER_HOST \
-X sasl.username="$CCLOUD_API_KEY" \
-X sasl.password="$CCLOUD_API_SECRET" \
-t rmoff_test_topic_01 -C -u -e
Hello world!
This is a message on a topic in Confluent Cloud
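Since the connection settings are the same for every call, they can be factored into a small wrapper. The following is only a sketch: ccloud_kafkacat and the KAFKACAT override are hypothetical names, and it assumes kafkacat is installed locally rather than run through Docker. It leaves out ssl.ca.location, because the path in the example appears to refer to the Docker image's filesystem; pass it as an extra -X argument if your setup needs it.

```shell
# ccloud_kafkacat: hypothetical wrapper (not part of kafkacat or Confluent tooling).
# Prepends the Confluent Cloud connection settings used in the Docker example
# and passes all remaining arguments through unchanged.
# KAFKACAT is a hypothetical override for the binary name, handy for dry runs.
ccloud_kafkacat() {
  "${KAFKACAT:-kafkacat}" \
    -b "$CCLOUD_BROKER_HOST" \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X api.version.request=true \
    -X sasl.username="$CCLOUD_API_KEY" \
    -X sasl.password="$CCLOUD_API_SECRET" \
    "$@"
}

# Dry run: print the assembled arguments instead of connecting.
# Note that this prints the API secret to the terminal.
# KAFKACAT=echo ccloud_kafkacat -t rmoff_test_topic_01 -C -u -e

# Real run (assumes the three CCLOUD_* variables are set):
# ccloud_kafkacat -t rmoff_test_topic_01 -C -u -e
```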
You can also output each message as JSON (-J), which includes metadata such as the partition, offset and timestamp alongside the payload:
$ docker run --rm --interactive edenhill/kafkacat:1.6.0 \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
-b $CCLOUD_BROKER_HOST \
-X sasl.username="$CCLOUD_API_KEY" \
-X sasl.password="$CCLOUD_API_SECRET" \
-t rmoff_test_topic_01 -C -u -J -e
{
  "topic": "rmoff_test_topic_01",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1604571163960,
  "broker": 7,
  "key": null,
  "payload": "Hello world!"
}
{
  "topic": "rmoff_test_topic_01",
  "partition": 3,
  "offset": 0,
  "tstype": "create",
  "ts": 1604571168723,
  "broker": 1,
  "key": null,
  "payload": "This is a message on a topic in Confluent Cloud"
}
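For a quick look at just the message bodies, the payload lines can be picked out of this output with sed. This is a rough sketch, not a JSON parser: json_payloads is a hypothetical helper, it assumes the one-field-per-line layout shown above, and it does not decode JSON escapes. For compact single-line output or anything beyond a quick check, a real JSON tool such as jq is the safer choice.

```shell
# json_payloads: hypothetical helper (not part of kafkacat).
# Prints the string value of every line of the form:  "payload": "<text>"
# Text match only: null payloads are skipped and JSON escapes are left as-is.
json_payloads() {
  sed -n 's/^ *"payload": "\(.*\)",\{0,1\} *$/\1/p'
}

# Sample lines in the same layout as the JSON output stand in for a live topic.
printf '%s\n' \
  '{' \
  '  "topic": "rmoff_test_topic_01",' \
  '  "key": null,' \
  '  "payload": "Hello world!"' \
  '}' |
  json_payloads
```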