POSTGRES TO REDIS Synchronisation through Kafka and Debezium

REDIS

docker run --name redis_nave1085 -d -p 1235:6379 redis redis-server
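
To verify the container is reachable before wiring anything else up (a quick sanity check, not part of the original steps):

docker exec -it redis_nave1085 redis-cli ping

A healthy instance replies with PONG.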

POSTGRES

docker run --name deb_postgres -p 1236:5432 -e POSTGRES_PASSWORD=postgres_nave1085 -d debezium/postgres
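
The debezium/postgres image ships with logical decoding enabled; a quick check (assuming the default postgres superuser):

docker exec -it deb_postgres psql -U postgres -c "SHOW wal_level;"

This should print logical, which the Debezium Postgres connector requires for CDC.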

ZOOKEEPER

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 -e LOG_LEVEL=DEBUG debezium/zookeeper

KAFKA

docker run -it --rm --name kafka -p 9092:9092 -e LOG_LEVEL=DEBUG --link zookeeper:zookeeper --link deb_postgres:deb_postgres --link redis_nave1085:redis_nave1085 debezium/kafka
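
Once the broker is up, the topics created so far can be listed (a sketch; the script path assumes the debezium/kafka image layout under /kafka):

docker exec -it kafka /kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list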

CONNECT

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e STATUS_STORAGE_TOPIC=my_connect_Statuses --link zookeeper:zookeeper --link kafka:kafka --link deb_postgres:deb_postgres --link redis_nave1085:redis_nave1085 debezium/connect

RedisSinkConnector downloaded from the link below; renamed the lib folder to kafka-connect-jdbc and moved it to the /kafka/connect path:

jcustenborder/kafka-connect-redis

docker cp /home/navi1085/jcustenborder-kafka-connect-redis-0.0.2.17/kafka-connect-jdbc connect:/kafka/connect/

docker exec -it connect /bin/bash
docker restart connect

curl -H "Accept:application/json" localhost:8083/connector-plugins

[{"class":"com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector","type":"sink","version":"0.0.0.0"},
{"class":"io.debezium.connector.db2.Db2Connector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.oracle.OracleConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.vitess.VitessConnector","type":"source","version":"1.9.4.Final"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"3.1.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"3.1.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
curl -H "Accept:application/json" localhost:8083
curl -H "Accept:application/json" localhost:8083/connectors

SOURCE CURL
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "student-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "172.18.1.221",
    "database.port": "1236",
    "database.user": "postgres",
    "database.password": "postgres_nave1085",
    "database.dbname": "postgres",
    "database.server.name": "dbs1",
    "schema.whitelist": "public",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.student",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "redis.type": "JSON"
  }
}'
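
After POSTing, the connector state can be checked through the standard Connect REST API (look for RUNNING vs FAILED, plus task-level errors):

curl -s localhost:8083/connectors/student-connector/status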

TARGET CURL

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "RedisSinkConnector223",
  "config": {
    "topics": "redisdatamove1",
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "1",
    "redis.hosts": "172.18.1.221:1235",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "redis.type": "JSON"
  }
}'
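
The same status check applies to the sink:

curl -s localhost:8083/connectors/RedisSinkConnector223/status

If the sink task shows FAILED here, the trace field in the response usually explains why nothing reaches Redis.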

curl -H "Accept:application/json" localhost:8083/connectors

["RedisSinkConnector223","student-connector"]

POSTGRES DB

create table student (
  id int primary key,
  username varchar(100)
);

insert into student (id,username) values (2,'rajan');
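
These statements can be run through psql inside the container (assuming the default postgres database and superuser):

docker exec -it deb_postgres psql -U postgres -d postgres

Each committed insert on student should then show up as a change event on the dbs1.public.student topic.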

WATCHER

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka --link redis_nave1085:redis_nave1085 --link deb_postgres:deb_postgres debezium/kafka watch-topic -a -k dbs1.public.student

WATCHER RESULTS

{"id":2} {"before":null,"after":{"id":2,"username":"rajan"},
"source":{"version":"1.9.4.Final","connector":"postgresql","name":"dbs1","ts_ms":1657178271692,
"snapshot":"false","db":"postgres","sequence":"[\"23719736\",\"23719792\"]","schema":"public",
"table":"student","txId":554,"lsn":23719792,"xmin":null},"op":"c","ts_ms":1657178271990,
"transaction":null}
docker exec -it redis_nave1085 /bin/bash

root@90c79aa80289:/data# redis-cli
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "__kafka.offset.redisdatamove1.0"
127.0.0.1:6379[1]>

I am getting the results below in Redis. How do I resolve this so that the CDC data from Postgres actually lands in Redis?

1) "__kafka.offset.redisdatamove1.0"

127.0.0.1:6379[1]> get "__kafka.offset.redisdatamove1.0"
"{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
127.0.0.1:6379[1]>

Thanks in advance
Naveenkumar.S

The __kafka.offset.* entries are just that: offsets.

You should look for other entries in Redis that correspond to your topic name if you want the rows that came from the database.
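
For example, all keys in the selected database can be listed without blocking the server (the -n 1 matches the select 1 used above):

docker exec -it redis_nave1085 redis-cli -n 1 --scan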

If you have none, then you should consult the logs for the connector…
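
For example (a rough filter; adjust the pattern as needed):

docker logs connect 2>&1 | grep -iE "error|redis"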

If you refer to the documentation, the data needs to be bytes or strings, not full JSON objects. Also, redis.type does not appear to be a valid config option.

Also relevant - New Record State Extraction :: Debezium Documentation
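
For reference, a sketch of what that SMT could look like when added to the source connector config above (the alias "unwrap" is arbitrary; the transform replaces the full change envelope with just the new row state):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"

With the envelope unwrapped, the value written for the insert above would be {"id":2,"username":"rajan"} rather than the whole before/after event.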
