Cp-all-in-one Postgres Sink connector

Hi All,

I am using cp-all-in-one.

Can anyone please provide a Redis source and Postgres sink connector example?

Thanks and Regards
naveen

hey @naveenseerangan

did you check the docs:
https://docs.confluent.io/kafka-connectors/jdbc/current/index.html

as well as @rmoff's blog post "Kafka Connect JDBC Sink deep-dive: Working with Primary Keys".

best,
michael

Hi Michael,

I went through those docs but I am still stuck. Please find my source and target connector configs below.

Source Connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "redissource",
  "config": {
    "connector.class": "com.redis.kafka.connect.RedisSourceConnector",
    "tasks.max": "1",
    "topics": "mystream",
    "redis.uri": "redis://virginia:virginia@172.18.1.41:1235",
    "redis.cluster.enabled": "false",
    "redis.stream.name": "mystream",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}'

Target Connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/   -d  '{"name": "redtopgsink","config": { "connector.class"  : "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url":"jdbc:postgresql://navi1085_postgres_1:5432/postgres", "connection.user" :"postgres",  "connection.password" :"postgres_nave1085",  "topics"  :"mystream", "value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false",   "insert.mode" :"upsert", "schema.pattern":"public","auto.create"  :"true" ,"pk.mode"   :"record_key","pk.fields":"sensor_id",   "delete.enabled"  :"true"}}'

Hi @naveenseerangan

any errors in the logs?
what about the status of the connect worker tasks?
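
for example, something like this should dump both the config and the task status of every connector (the expand parameters are part of the Connect REST API; pipe it through jq if you have it installed):

curl -s "localhost:8083/connectors?expand=info&expand=status"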

best,
michael

Hi Michael,

Please find the error logs below.

{
  "redissource": {
    "info": {
      "name": "redissource",
      "config": {
        "connector.class": "com.redis.kafka.connect.RedisSourceConnector",
        "redis.stream.name": "mystream",
        "tasks.max": "1",
        "topics": "mystream",
        "value.converter.schemas.enable": "false",
        "name": "redissource",
        "redis.cluster.enabled": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "redis.uri": "redis://virginia:virginia@172.18.1.41:1235"
      },
      "tasks": [{"connector": "redissource", "task": 0}],
      "type": "source"
    },
    "status": {
      "name": "redissource",
      "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
      "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
      "type": "source"
    }
  },
  "redtopgsink": {
    "info": {
      "name": "redtopgsink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.password": "postgres_nave1085",
        "topics": "mystream",
        "delete.enabled": "true",
        "connection.user": "postgres",
        "schema.pattern": "public",
        "value.converter.schemas.enable": "false",
        "name": "redtopgsink",
        "auto.create": "true",
        "connection.url": "jdbc:postgresql://navi1085_postgres_1:5432/postgres",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "auto.offset.reset": "earliest",
        "pk.fields": "sensor_id"
      },
      "tasks": [{"connector": "redtopgsink", "task": 0}],
      "type": "sink"
    },
    "status": {
      "name": "redtopgsink",
      "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
      "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
\n\tat java.base/java.lang.Thread.run(Thread.java:829)
\nCaused by: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')
\n\tat io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:86)
\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:183)
\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
\n\t... 10 more\n"}],
      "type": "sink"
    }
  }
}

But the Postgres table did get created as mystream.

Thanks and regards
naveen

hey @naveenseerangan

AFAIK the JDBC sink requires a schema for your data,
so you need to include a schema as part of your JSON, or use Avro instead.
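
for example, with "value.converter.schemas.enable": "true" on the sink, each message value needs the JsonConverter schema/payload envelope. a rough sketch, where every field other than sensor_id is just a placeholder:

{
  "schema": {
    "type": "struct",
    "name": "mystream",
    "optional": false,
    "fields": [
      { "field": "sensor_id", "type": "string", "optional": false },
      { "field": "temperature", "type": "double", "optional": true }
    ]
  },
  "payload": { "sensor_id": "sensor-1", "temperature": 21.5 }
}

and since your sink uses pk.mode=record_key with pk.fields=sensor_id, the record key has to be a Struct too, so it needs its own schema (or switch to pk.mode=record_value while testing).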

check also

best,
michael
