POSTGRES TO REDIS synchronisation through Kafka and Debezium
REDIS
docker run --name redis_nave1085 -d -p 1235:6379 redis redis-server
POSTGRES
docker run --name deb_postgres -p 1236:5432 -e POSTGRES_PASSWORD=postgres_nave1085 -d debezium/postgres
ZOOKEEPER
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 -e LOG_LEVEL=DEBUG debezium/zookeeper
KAFKA
docker run -it --rm --name kafka -p 9092:9092 -e LOG_LEVEL=DEBUG --link zookeeper:zookeeper --link deb_postgres:deb_postgres --link redis_nave1085:redis_nave1085 debezium/kafka
CONNECT
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e STATUS_STORAGE_TOPIC=my_connect_Statuses --link zookeeper:zookeeper --link kafka:kafka --link deb_postgres:deb_postgres --link redis_nave1085:redis_nave1085 debezium/connect
Downloaded the RedisSinkConnector from the link below, renamed its lib folder to kafka-connect-jdbc, and moved it to the kafka/connect path:
jcustenborder/kafka-connect-redis
docker cp /home/navi1085/jcustenborder-kafka-connect-redis-0.0.2.17/kafka-connect-jdbc connect:/kafka/connect/
docker exec -it connect /bin/bash
docker restart connect
8083/connector-plugins
[{"class":"com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector","type":"sink","version":"0.0.0.0"},
{"class":"io.debezium.connector.db2.Db2Connector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.oracle.OracleConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.vitess.VitessConnector","type":"source","version":"1.9.4.Final"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"3.1.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"3.1.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
curl -H "Accept:application/json" localhost:8083
curl -H "Accept:application/json" localhost:8083/connectors
SOURCE CURL
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "student-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "172.18.1.221", "database.port": "1236", "database.user": "postgres", "database.password": "postgres_nave1085", "database.dbname": "postgres", "database.server.name": "dbs1", "schema.whitelist": "public", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.student", "key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "false","value.converter.schemas.enable": "false" ,"redis.type":"JSON"} }'
TARGET CURL
curl -i -X POST -H "Accept:application/json" -H "Content-Length: 10000" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "RedisSinkConnector223", "config": { "topics" : "redisdatamove1", "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "tasks.max": "1", "redis.hosts": "172.18.1.221:1235","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","redis.type":"JSON"}}'
8083/connectors
["RedisSinkConnector223","student-connector"]
POSTGRES DB
create table student ( id int primary key ,
username varchar(100) )
insert into student (id,username) values (2,'rajan');
WATCHER
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka --link redis_nave1085:redis_nave1085 --link deb_postgres:deb_postgres debezium/kafka watch-topic -a -k dbs1.public.student
WATCHER RESULTS
{"id":2} {"before":null,"after":{"id":2,"username":"rajan"},
"source":{"version":"1.9.4.Final","connector":"postgresql","name":"dbs1","ts_ms":1657178271692,
"snapshot":"false","db":"postgres","sequence":"[\"23719736\",\"23719792\"]","schema":"public",
"table":"student","txId":554,"lsn":23719792,"xmin":null},"op":"c","ts_ms":1657178271990,
"transaction":null}
docker exec -it redis_nave1085 /bin/bash
root@90c79aa80289:/data# redis-cli
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "__kafka.offset.redisdatamove1.0"
127.0.0.1:6379[1]>
I am getting only the results below in Redis. How can I resolve this so that the CDC data reaches Redis?
1) "__kafka.offset.redisdatamove1.0"
127.0.0.1:6379[1]> get "__kafka.offset.redisdatamove1.0"
"{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
127.0.0.1:6379[1]>
Thanks in advance
Naveenkumar.S