View internal Kafka Connect records?

Is there a way to see the values in Kafka Connect that are read from a source before they are written to Kafka broker?

Yes, so long as your connector is configured with at least one Single Message Transform (SMT), you can use the Kafka Connect logs to see each record as it passes through the transformation chain. Using this Docker Compose, let me show you how:

  1. Create a connector - in this case I’m using Debezium pulling data from MySQL.

    curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/source-debezium-orders-00/config \
        -d '{
                "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                "database.hostname": "mysql",
                "database.port": "3306",
                "database.user": "debezium",
                "database.password": "dbz",
                "database.server.id": "42",
                "database.server.name": "asgard",
                "table.whitelist": "demo.orders",
                "database.history.kafka.bootstrap.servers": "kafka:29092",
                "database.history.kafka.topic": "dbhistory.demo",
                "decimal.handling.mode": "double",
                "include.schema.changes": "true",
                "transforms": "unwrap,addTopicPrefix",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
                "transforms.addTopicPrefix.regex":"(.*)",
                "transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
        }'
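
    The RegexRouter transform at the end of this config renames each topic by matching the topic name against the regex and substituting it into the replacement pattern. A rough sketch of that behaviour in Python (`route_topic` is my own illustrative helper, not part of Kafka Connect):

```python
import re

# Hypothetical sketch of what RegexRouter does: if the topic name
# matches the regex in full, substitute it into the replacement
# pattern; otherwise leave the topic name unchanged.
def route_topic(topic, regex, replacement):
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic
    # Java-style $1 back-references become \1 in Python
    return pattern.sub(replacement.replace('$1', r'\1'), topic, count=1)

print(route_topic('asgard.demo.ORDERS', '(.*)', 'mysql-debezium-$1'))
# mysql-debezium-asgard.demo.ORDERS
```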
    

    Note the two Single Message Transforms that are defined (unwrap and addTopicPrefix).

  2. Set the TransformationChain logger to TRACE

    curl -s -X PUT \
         -H "Content-Type:application/json" -d '{"level": "TRACE"}' \
         http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain 
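
    TRACE on this logger is verbose, so remember to set it back to INFO with the same endpoint once you're done. If you'd rather drive the admin endpoint from code than curl, here's a rough sketch using only the Python standard library (`CONNECT_URL` and `set_logger_level` are my own names for this example, not part of any Kafka Connect client API):

```python
import json
import urllib.request

CONNECT_URL = 'http://localhost:8083'  # adjust for your worker

def set_logger_level(logger, level, base_url=CONNECT_URL):
    """Build a PUT request for the Connect admin/loggers endpoint.
    Returning the Request lets you inspect it before sending."""
    return urllib.request.Request(
        url=f'{base_url}/admin/loggers/{logger}',
        data=json.dumps({'level': level}).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='PUT',
    )

# To actually apply it against a running worker:
# urllib.request.urlopen(set_logger_level(
#     'org.apache.kafka.connect.runtime.TransformationChain', 'TRACE'))
```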
    
  3. Open the Kafka Connect worker log file - since I’m using Docker I run docker logs -f kafka-connect (the name of my container is kafka-connect).

  4. Insert a row in the source database so that Kafka Connect picks it up and logs it.

    insert into ORDERS (order_id, customer_id, order_total_usd, make, model, delivery_city, delivery_company, delivery_address) 
                values (1, 886, 103095.45, 'Suzuki', 'XL-7', 'York', 'Reichert, Jacobi and Corwin', '87528 Arkansas Avenue');
    
  5. In the log file, observe the two TRACE entries, which show the record as it is passed to each Single Message Transform:

    [2020-11-19 17:26:15,088] TRACE [source-debezium-orders-00|task-0] Applying transformation io.debezium.transforms.ExtractNewRecordState to SourceRecord{sourcePartition={server=asgard}, sourceOffset={ts_sec=1605806774, file=binlog.000002, pos=235, row=1, server_id=1, event=2}} ConnectRecord{topic='asgard.demo.ORDERS', kafkaPartition=null, key=Struct{id=501}, keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT}, value=Struct{after=Struct{id=501,order_id=1,customer_id=886,order_total_usd=103095.45,make=Suzuki,model=XL-7,delivery_city=York,delivery_company=Reichert, Jacobi and Corwin,delivery_address=87528 Arkansas Avenue,CREATE_TS=2020-11-19T17:26:14Z,UPDATE_TS=2020-11-19T17:26:14Z},source=Struct{version=1.2.2.Final,connector=mysql,name=asgard,ts_ms=1605806774000,db=demo,table=ORDERS,server_id=1,file=binlog.000002,pos=392,row=0,thread=10},op=c,ts_ms=1605806774929}, valueSchema=Schema{asgard.demo.ORDERS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
    [2020-11-19 17:26:15,089] TRACE [source-debezium-orders-00|task-0] Applying transformation org.apache.kafka.connect.transforms.RegexRouter to SourceRecord{sourcePartition={server=asgard}, sourceOffset={ts_sec=1605806774, file=binlog.000002, pos=235, row=1, server_id=1, event=2}} ConnectRecord{topic='asgard.demo.ORDERS', kafkaPartition=null, key=Struct{id=501}, keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT}, value=Struct{id=501,order_id=1,customer_id=886,order_total_usd=103095.45,make=Suzuki,model=XL-7,delivery_city=York,delivery_company=Reichert, Jacobi and Corwin,delivery_address=87528 Arkansas Avenue,CREATE_TS=2020-11-19T17:26:14Z,UPDATE_TS=2020-11-19T17:26:14Z}, valueSchema=Schema{asgard.demo.ORDERS.Value:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
    

    With a bit of reformatting, the first entry looks like this, showing the data as read from the source connector:

    [2020-11-19 17:26:15,088] TRACE [source-debezium-orders-00|task-0] Applying transformation io.debezium.transforms.ExtractNewRecordState to 
    SourceRecord{sourcePartition={server=asgard}, 
                sourceOffset={ts_sec=1605806774, 
                            file=binlog.000002, 
                            pos=235, 
                            row=1, 
                            server_id=1, 
                            event=2}} 
    ConnectRecord{topic='asgard.demo.ORDERS', 
                kafkaPartition=null, 
                key=Struct{id=501}, 
                keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT}, 
                value=Struct{after=Struct{id=501,order_id=1,customer_id=886,order_total_usd=103095.45,make=Suzuki,model=XL-7,delivery_city=York,delivery_company=Reichert, Jacobi and Corwin,delivery_address=87528 Arkansas Avenue,CREATE_TS=2020-11-19T17:26:14Z,UPDATE_TS=2020-11-19T17:26:14Z},
                            source=Struct{version=1.2.2.Final,connector=mysql,name=asgard,ts_ms=1605806774000,db=demo,table=ORDERS,server_id=1,file=binlog.000002,pos=392,row=0,thread=10},
                            op=c,
                            ts_ms=1605806774929}, 
                valueSchema=Schema{asgard.demo.ORDERS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} 
    (org.apache.kafka.connect.runtime.TransformationChain:47)
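
    The reformatting above can be automated. Here's a naive Python sketch that breaks the line after commas near the top of the Struct nesting (it assumes braces only appear as Struct/record delimiters, so commas inside field values such as delivery_company stay on one line because they sit deeper in the nesting):

```python
def pretty(trace_line, max_depth=2, indent='    '):
    """Naive reformatter for Connect TRACE output: inserts a line
    break after any comma at or above max_depth levels of {} nesting.
    Assumes braces only occur as Struct/record delimiters."""
    out, depth = [], 0
    for ch in trace_line:
        out.append(ch)
        if ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
        elif ch == ',' and depth <= max_depth:
            out.append('\n' + indent * depth)
    return ''.join(out)

sample = "ConnectRecord{topic='asgard.demo.ORDERS', kafkaPartition=null, key=Struct{id=501}}"
print(pretty(sample))
```

    Pipe the worker log through it (e.g. `docker logs kafka-connect | grep TransformationChain`) to make the records easier to scan.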