JDBC sink connector error for array type

Hello,

I am trying to replicate data from one PostgreSQL database to another. I am using kafka-connect-jdbc version 10.2.2.

I get the error below when my Postgres source table has a column that is an array:
“org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: ARRAY”

I tried using the Flatten SMT in my sink connector configuration, but then got this error:
“Flatten transformation does not support ARRAY”

Any suggestions for solving this error for an array column in the source table? Thanks.

Sarah

Hi there! Can you please confirm that your ARRAY contains primitive types only? At this moment, the connector only supports primitives.

Yes. It is an array of integers.

@danicafine How can I solve this error?

The “Unsupported source data type” error message likely comes from the GenericDatabaseDialect class, meaning that the connector isn’t recognizing the database as Postgres (based on the database connection string). Could you please try setting dialect.name=PostgreSqlDatabaseDialect in your connector config and see if you get the same error?

Here’s the supporting documentation if you want to take a look.

@danicafine I added “dialect.name=PostgreSqlDatabaseDialect” as you suggested, but I still get the error below:

org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: ARRAY
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1569)
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:149)
	at

It seems the Postgres dialect is not being used, even though I specified it in the sink connector’s configuration.

The Kafka Connect JDBC jar is located at:
debezium-connect-1/connect/kafka-connect-jdbc/kafka-connect-jdbc-10.2.2.jar

And below is my sink connector’s configuration:
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "array_test",
"connection.url": "jdbc:postgresql://myserver:5432/mydb?user=usr&password=psd&stringtype=unspecified",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"dialect.name": "PostgreSqlDatabaseDialect",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"

Thanks for looking at my issue.
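
For anyone following along, here is a rough Python sketch of how a configuration like the one above could be submitted to a Connect worker through the Kafka Connect REST API (PUT /connectors/<name>/config). The worker URL http://localhost:8083 and the connector name array-test-sink are assumptions for illustration and do not come from this thread; the settings themselves are copied from the post. The script only builds the request and leaves the actual network call commented out.

```python
import json
import urllib.request

# Hypothetical values, not from the thread: adjust to your own environment.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "array-test-sink"


def build_sink_config():
    """Return the sink connector settings from the post as a plain dict."""
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "array_test",
        "connection.url": (
            "jdbc:postgresql://myserver:5432/mydb"
            "?user=usr&password=psd&stringtype=unspecified"
        ),
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        # Explicitly pin the dialect, as suggested earlier in the thread.
        "dialect.name": "PostgreSqlDatabaseDialect",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "id",
        "pk.mode": "record_key",
    }


def build_put_request(base_url, name, config):
    """Build (but do not send) a PUT request that creates or updates a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    req = build_put_request(CONNECT_URL, CONNECTOR_NAME, build_sink_config())
    print(req.get_method(), req.full_url)
    # To actually submit the config, uncomment the next line:
    # urllib.request.urlopen(req)
```

Keeping the config in a dict makes it easy to diff what was submitted against what the worker reports back, which can help when a setting such as dialect.name appears to be ignored.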

I restarted all the Docker services, and now it works.
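
After a restart like this, one way to confirm that the sink tasks are healthy is the status endpoint of the Kafka Connect REST API (GET /connectors/<name>/status). The Python sketch below is only an illustration: the worker URL, the connector name, and the sample payload are made up, with the error text borrowed from this thread. It lists every task that is not in the RUNNING state together with the first line of its trace.

```python
import json
import urllib.request


def fetch_status(base_url, name):
    """Fetch the connector status from a Connect worker (needs network access)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.loads(resp.read().decode("utf-8"))


def failed_tasks(status):
    """Return (task id, first trace line) for each task that is not RUNNING."""
    problems = []
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            problems.append((task.get("id"), first_line))
    return problems


# Illustrative payload shaped like a status response; the values are made up.
sample_status = {
    "name": "array-test-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "connect:8083",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "Unsupported source data type: ARRAY\n"
                "\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect"
                ".bindField(GenericDatabaseDialect.java:1569)"
            ),
        }
    ],
}

if __name__ == "__main__":
    for task_id, reason in failed_tasks(sample_status):
        print(f"task {task_id}: {reason}")
```

Against a live worker, failed_tasks(fetch_status("http://localhost:8083", "array-test-sink")) would return an empty list once all tasks are running again.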

That’s great news! Glad to hear it worked out for you!
