Is there a way to see the values in Kafka Connect that are read from a source before they are written to the Kafka broker?
You can do this using the Kafka Connect worker log, so long as the connector is using at least one Single Message Transform. Using this Docker Compose, let me show you how:
-   Create a connector - in this case I'm using Debezium pulling data from MySQL:

    ```bash
    curl -i -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/source-debezium-orders-00/config \
        -d '{
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "42",
            "database.server.name": "asgard",
            "table.whitelist": "demo.orders",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "dbhistory.demo",
            "decimal.handling.mode": "double",
            "include.schema.changes": "true",
            "transforms": "unwrap,addTopicPrefix",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex": "(.*)",
            "transforms.addTopicPrefix.replacement": "mysql-debezium-$1"
        }'
    ```

    Note the two Single Message Transforms that are defined (`unwrap` and `addTopicPrefix`).
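    As a quick sanity check before going further, you can confirm that the connector is up and its task is running using the standard Kafka Connect REST API status endpoint:

    ```bash
    curl -s http://localhost:8083/connectors/source-debezium-orders-00/status
    ```

    The response should show `"state": "RUNNING"` for both the connector and its task; if not, fix that first, since nothing will be logged otherwise.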
-   Set the `TransformationChain` logger to `TRACE`:

    ```bash
    curl -s -X PUT \
        -H "Content-Type:application/json" -d '{"level": "TRACE"}' \
        http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain
    ```
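    You can verify that the change took effect with a GET against the same endpoint, which returns the logger's current level:

    ```bash
    curl -s http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain
    ```

    Note that logger levels set this way are not persisted - if the worker restarts you'll need to set it again.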
-   Open the Kafka Connect worker log - since I'm using Docker I run `docker logs -f kafka-connect` (the name of my container is `kafka-connect`).
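    The `TRACE` entries can be buried amongst everything else the worker logs, so it can help to filter on the logger name - for example (using the container name above):

    ```bash
    docker logs -f kafka-connect | grep TransformationChain
    ```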
-   Insert a row in the source database so that Kafka Connect will pick it up and log it:

    ```sql
    INSERT INTO ORDERS (order_id, customer_id, order_total_usd, make, model,
                        delivery_city, delivery_company, delivery_address)
    VALUES (1, 886, 103095.45, 'Suzuki', 'XL-7', 'York',
            'Reichert, Jacobi and Corwin', '87528 Arkansas Avenue');
    ```
-   In the log file, observe the two `TRACE` entries - one for each transformation in the chain, each showing the record as it is passed to that transformation:

    ```
    [2020-11-19 17:26:15,088] TRACE [source-debezium-orders-00|task-0] Applying transformation io.debezium.transforms.ExtractNewRecordState to SourceRecord{sourcePartition={server=asgard}, sourceOffset={ts_sec=1605806774, file=binlog.000002, pos=235, row=1, server_id=1, event=2}} ConnectRecord{topic='asgard.demo.ORDERS', kafkaPartition=null, key=Struct{id=501}, keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT}, value=Struct{after=Struct{id=501,order_id=1,customer_id=886,order_total_usd=103095.45,make=Suzuki,model=XL-7,delivery_city=York,delivery_company=Reichert, Jacobi and Corwin,delivery_address=87528 Arkansas Avenue,CREATE_TS=2020-11-19T17:26:14Z,UPDATE_TS=2020-11-19T17:26:14Z},source=Struct{version=1.2.2.Final,connector=mysql,name=asgard,ts_ms=1605806774000,db=demo,table=ORDERS,server_id=1,file=binlog.000002,pos=392,row=0,thread=10},op=c,ts_ms=1605806774929}, valueSchema=Schema{asgard.demo.ORDERS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
    [2020-11-19 17:26:15,089] TRACE [source-debezium-orders-00|task-0] Applying transformation org.apache.kafka.connect.transforms.RegexRouter to SourceRecord{sourcePartition={server=asgard}, sourceOffset={ts_sec=1605806774, file=binlog.000002, pos=235, row=1, server_id=1, event=2}} ConnectRecord{topic='asgard.demo.ORDERS', kafkaPartition=null, key=Struct{id=501}, keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT}, value=Struct{id=501,order_id=1,customer_id=886,order_total_usd=103095.45,make=Suzuki,model=XL-7,delivery_city=York,delivery_company=Reichert, Jacobi and Corwin,delivery_address=87528 Arkansas Avenue,CREATE_TS=2020-11-19T17:26:14Z,UPDATE_TS=2020-11-19T17:26:14Z}, valueSchema=Schema{asgard.demo.ORDERS.Value:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
    ```
With a bit of reformatting, the first entry looks like this, showing the data as read from the source connector:

```
[2020-11-19 17:26:15,088] TRACE [source-debezium-orders-00|task-0] Applying transformation io.debezium.transforms.ExtractNewRecordState to
SourceRecord{
    sourcePartition={server=asgard},
    sourceOffset={ts_sec=1605806774, file=binlog.000002, pos=235, row=1, server_id=1, event=2}
}
ConnectRecord{
    topic='asgard.demo.ORDERS',
    kafkaPartition=null,
    key=Struct{id=501},
    keySchema=Schema{asgard.demo.ORDERS.Key:STRUCT},
    value=Struct{
        after=Struct{id=501, order_id=1, customer_id=886, order_total_usd=103095.45,
                     make=Suzuki, model=XL-7, delivery_city=York,
                     delivery_company=Reichert, Jacobi and Corwin,
                     delivery_address=87528 Arkansas Avenue,
                     CREATE_TS=2020-11-19T17:26:14Z, UPDATE_TS=2020-11-19T17:26:14Z},
        source=Struct{version=1.2.2.Final, connector=mysql, name=asgard,
                      ts_ms=1605806774000, db=demo, table=ORDERS, server_id=1,
                      file=binlog.000002, pos=392, row=0, thread=10},
        op=c,
        ts_ms=1605806774929
    },
    valueSchema=Schema{asgard.demo.ORDERS.Envelope:STRUCT},
    timestamp=null,
    headers=ConnectHeaders(headers=)
} (org.apache.kafka.connect.runtime.TransformationChain:47)
```
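One caveat: at `TRACE` level this logger writes every single record - payload included - to the worker log, which gets noisy fast and may put sensitive data in your logs. Once you've finished debugging, set the logger back down (`INFO` is the usual default) using the same endpoint as before:

```bash
curl -s -X PUT \
    -H "Content-Type:application/json" -d '{"level": "INFO"}' \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain
```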