Kafkacat ("netcat for Kafka")

What is kafkacat?

kafkacat is a generic non-JVM producer and consumer for Apache Kafka >=0.8, think of it as a netcat for Kafka.

Download: https://github.com/edenhill/kafkacat/

kafkacat examples

  • In producer mode kafkacat reads messages from stdin, delimited with a configurable delimiter (-D, defaults to newline), and produces them to the provided Kafka cluster (-b), topic (-t) and optionally, partition (-p).

    $ kafkacat -b localhost:9092 -t new_topic -P
    test
    
    $ kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 1
    1:foo
    
  • In consumer mode kafkacat reads messages from a topic and partition and prints them to stdout using the configured message delimiter.

    $ kafkacat -b localhost:9092 -t mysql_users
    % Auto-selecting Consumer mode (use -P or -C to override)
    {"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
    {"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
    [...]
    
  • kafkacat also features a Metadata list (-L) mode to display the current state of the Kafka cluster and its topics and partitions.

    $ kafkacat -L -b mybroker
    Metadata for all topics (from broker 1: mybroker:9092/1):
    3 brokers:
    broker 1 at mybroker:9092
    broker 2 at mybrokertoo:9092
    broker 3 at thirdbroker:9092
    16 topics:
    topic "syslog" with 3 partitions:
        partition 0, leader 3, replicas: 1,2,3, isrs: 1,2,3
        partition 1, leader 1, replicas: 1,2,3, isrs: 1,2,3
        partition 2, leader 1, replicas: 1,2, isrs: 1,2
    topic "rdkafkatest1_auto_49f744a4327b1b1e" with 2 partitions:
        partition 0, leader 3, replicas: 3, isrs: 3
        partition 1, leader 1, replicas: 1, isrs: 1
    topic "rdkafkatest1_auto_e02f58f2c581cba" with 2 partitions:
        partition 0, leader 3, replicas: 3, isrs: 3
        partition 1, leader 1, replicas: 1, isrs: 1
    ....
    

Using kafkacat with Confluent Cloud

Here’s an example with kafkacat, taking advantage of running it directly from Docker too:

$ docker run --rm --interactive edenhill/kafkacat:1.6.0 \
            -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
            -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
            -b $CCLOUD_BROKER_HOST \
            -X sasl.username="$CCLOUD_API_KEY" \
            -X sasl.password="$CCLOUD_API_SECRET" \
            -t rmoff_test_topic_01 -C -u -e
Hello world!
This is a message on a topic in Confluent Cloud

You can also output the message in JSON which can be useful:

$ docker run --rm --interactive edenhill/kafkacat:1.6.0 \
            -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
            -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
            -b $CCLOUD_BROKER_HOST \
            -X sasl.username="$CCLOUD_API_KEY" \
            -X sasl.password="$CCLOUD_API_SECRET" \
            -t rmoff_test_topic_01 -C -u -J -e
{
  "topic": "rmoff_test_topic_01",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1604571163960,
  "broker": 7,
  "key": null,
  "payload": "Hello world!"
}
{
  "topic": "rmoff_test_topic_01",
  "partition": 3,
  "offset": 0,
  "tstype": "create",
  "ts": 1604571168723,
  "broker": 1,
  "key": null,
  "payload": "This is a message on a topic in Confluent Cloud"
}

Articles

1 Like