Hey people!
I am trying to attach a Debezium source connector to my Oracle database, and I end up with this exception when I try to deploy my connector configuration to Kafka Connect:
Unable to connect: Failed to resolve Oracle database version
The exception looks really strange to me, because other connectors work perfectly fine with the same database (though they are all sink connectors, not source connectors, which I don't think should make any difference).
I am providing my connector config below, under a spoiler. I composed it very carefully from the official docs, so I don't really understand what could be wrong with it.
Connector config
{
  "name": "source-local-debezium-revision-1",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "true",
    "tasks.max": "1",
    "topics": "my-dest-topic",
    "database.hostname": "localhost",
    "database.port": "1521",
    "database.user": "c##debezium_user",
    "database.password": "top_secret",
    "database.dbname": "XE",
    "database.pdb.name": "XEPDB1",
    "database.server.name": "local_sandbox_oracle",
    "database.connection.adapter": "logminer",
    "database.history.kafka.topic": "schema_changes",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.tablename.case.insensitive": "true",
    "snapshot.mode": "initial",
    "table.include.list": "users.my_source_table",
    "tombstones.on.delete": "true",
    "message.key.columns": "users.my_source_table:id",
    "sanitize.field.names": "true"
  }
}
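Before deploying, I also sanity-check the payload locally with a small script of my own (just a hypothetical helper; the list of required options is my assumption, not an official Debezium requirement):

```python
import json

# Connection options I assume must be present and non-empty before the
# connector can even attempt to connect (my own list, not official).
REQUIRED = [
    "database.hostname", "database.port", "database.user",
    "database.password", "database.dbname", "database.server.name",
]

def missing_options(payload_json: str) -> list:
    """Return required connection options that are missing or empty
    in a wrapped {"name": ..., "config": {...}} connector payload."""
    config = json.loads(payload_json)["config"]
    return [key for key in REQUIRED if not config.get(key)]

# A deliberately incomplete payload for illustration:
partial = '{"name": "demo", "config": {"database.hostname": "localhost"}}'
print(missing_options(partial))
```

This obviously cannot catch the problem above (all the options are present in my config), but it rules out trivial typos in the key names.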
Furthermore, I tried to validate the config through the Kafka Connect REST API.
Validation query
curl --location --request PUT 'http://localhost:8083/connector-plugins/io.debezium.connector.oracle.OracleConnector/config/validate' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "source-local-debezium-revision-1",
  "connector.class": "io.debezium.connector.oracle.OracleConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "key.converter.schemas.enable": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.schemas.enable": "true",
  "tasks.max": "1",
  "topics": "my-dest-topic",
  "database.hostname": "localhost",
  "database.port": "1521",
  "database.user": "c##debezium_user",
  "database.password": "top_secret",
  "database.dbname": "XE",
  "database.pdb.name": "XEPDB1",
  "database.server.name": "local_sandbox_oracle",
  "database.connection.adapter": "logminer",
  "database.history.kafka.topic": "schema_changes",
  "database.history.kafka.bootstrap.servers": "localhost:9092",
  "database.tablename.case.insensitive": "true",
  "snapshot.mode": "initial",
  "table.include.list": "users.my_source_table",
  "tombstones.on.delete": "true",
  "message.key.columns": "users.my_source_table:id",
  "sanitize.field.names": "true"
}'
The validation returned this very strange result.
Validation result
{
  "name": "io.debezium.connector.oracle.OracleConnector",
  "error_count": 1,
  "groups": [
    "Common",
    "Transforms",
    "Predicates",
    "Error Handling",
    "Topic Creation",
    "Oracle",
    "Connector",
    "History Storage",
    "Events"
  ],
  "configs": [
    {
      "definition": {
        "name": "name",
        "type": "STRING",
        "required": true,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "Globally unique name to use for this connector.",
        "group": "Common",
        "width": "MEDIUM",
        "display_name": "Connector name",
        "dependents": [],
        "order": 1
      },
      "value": {
        "name": "name",
        "value": "source-local-debezium-revision-1",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connector.class",
        "type": "STRING",
        "required": true,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter",
        "group": "Common",
        "width": "LONG",
        "display_name": "Connector class",
        "dependents": [],
        "order": 2
      },
      "value": {
        "name": "connector.class",
        "value": "io.debezium.connector.oracle.OracleConnector",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "tasks.max",
        "type": "INT",
        "required": false,
        "default_value": "1",
        "importance": "HIGH",
        "documentation": "Maximum number of tasks to use for this connector.",
        "group": "Common",
        "width": "SHORT",
        "display_name": "Tasks max",
        "dependents": [],
        "order": 3
      },
      "value": {
        "name": "tasks.max",
        "value": "1",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "key.converter",
        "type": "CLASS",
        "required": false,
        "default_value": null,
        "importance": "LOW",
        "documentation": "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.",
        "group": "Common",
        "width": "SHORT",
        "display_name": "Key converter class",
        "dependents": [],
        "order": 4
      },
      "value": {
        "name": "key.converter",
        "value": "io.confluent.connect.avro.AvroConverter",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "value.converter",
        "type": "CLASS",
        "required": false,
        "default_value": null,
        "importance": "LOW",
        "documentation": "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.",
        "group": "Common",
        "width": "SHORT",
        "display_name": "Value converter class",
        "dependents": [],
        "order": 5
      },
      "value": {
        "name": "value.converter",
        "value": "io.confluent.connect.avro.AvroConverter",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "header.converter",
        "type": "CLASS",
        "required": false,
        "default_value": null,
        "importance": "LOW",
        "documentation": "HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the header values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize header values to strings and deserialize them by inferring the schemas.",
        "group": "Common",
        "width": "SHORT",
        "display_name": "Header converter class",
        "dependents": [],
        "order": 6
      },
      "value": {
        "name": "header.converter",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "transforms",
        "type": "LIST",
        "required": false,
        "default_value": "",
        "importance": "LOW",
        "documentation": "Aliases for the transformations to be applied to records.",
        "group": "Transforms",
        "width": "LONG",
        "display_name": "Transforms",
        "dependents": [],
        "order": 7
      },
      "value": {
        "name": "transforms",
        "value": "",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    ... some deleted rows due to max message size ...
    {
      "definition": {
        "name": "database.hostname",
        "type": "STRING",
        "required": false,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "Resolvable hostname or IP address of the database server.",
        "group": "Oracle",
        "width": "MEDIUM",
        "display_name": "Hostname",
        "dependents": [],
        "order": 1
      },
      "value": {
        "name": "database.hostname",
        "value": null,
        "recommended_values": [],
        "errors": [
          "Unable to connect: Failed to resolve Oracle database version"
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "database.port",
        "type": "INT",
        "required": false,
        "default_value": "1528",
        "importance": "HIGH",
        "documentation": "Port of the database server.",
        "group": "Oracle",
        "width": "SHORT",
        "display_name": "Port",
        "dependents": [],
        "order": 2
      },
      "value": {
        "name": "database.port",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "database.user",
        "type": "STRING",
        "required": false,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "Name of the database user to be used when connecting to the database.",
        "group": "Oracle",
        "width": "SHORT",
        "display_name": "User",
        "dependents": [],
        "order": 3
      },
      "value": {
        "name": "database.user",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    ... some deleted rows due to max message size ...
    {
      "definition": {
        "name": "table.whitelist",
        "type": "LIST",
        "required": false,
        "default_value": null,
        "importance": "LOW",
        "documentation": "The tables for which changes are to be captured (deprecated, use \"table.include.list\" instead)",
        "group": "Events",
        "width": "LONG",
        "display_name": "Deprecated: Include Tables",
        "dependents": [],
        "order": 12
      },
      "value": {
        "name": "table.whitelist",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": false
      }
    },
    {
      "definition": {
        "name": "table.include.list",
        "type": "LIST",
        "required": false,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "The tables for which changes are to be captured",
        "group": "Events",
        "width": "LONG",
        "display_name": "Include Tables",
        "dependents": [],
        "order": 13
      },
      "value": {
        "name": "table.include.list",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "table.blacklist",
        "type": "LIST",
        "required": false,
        "default_value": null,
        "importance": "LOW",
        "documentation": "A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use \"table.exclude.list\" instead)",
        "group": "Events",
        "width": "LONG",
        "display_name": "Deprecated: Exclude Tables",
        "dependents": [],
        "order": 14
      },
      "value": {
        "name": "table.blacklist",
        "value": null,
        "recommended_values": [],
        "errors": [],
        "visible": false
      }
    },
    ... some deleted rows due to max message size ...
  ]
}
The most interesting part is the one that shows what causes the issue:
{
  "definition": {
    "name": "database.hostname",
    "type": "STRING",
    "required": false,
    "default_value": null,
    "importance": "HIGH",
    "documentation": "Resolvable hostname or IP address of the database server.",
    "group": "Oracle",
    "width": "MEDIUM",
    "display_name": "Hostname",
    "dependents": [],
    "order": 1
  },
  "value": {
    "name": "database.hostname",
    "value": null,
    "recommended_values": [],
    "errors": [
      "Unable to connect: Failed to resolve Oracle database version"
    ],
    "visible": true
  }
}
As you can see, the value of the database.hostname configuration option is apparently null, even though it was provided perfectly fine in the config: "database.hostname": "localhost".
If you look deeper, you will also see that some other configuration options did not receive their values either (e.g. database.port, table.include.list, …), whereas others were filled in correctly (e.g. connector.class or key.converter).
I have already tried different orderings of the configuration options (putting the ones that are currently not filled at the beginning of the config file), and I have tested this with Debezium 1.5.0.Final, 1.5.1.Final, and 1.6.0.Beta. The issue persists in all of these versions.
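For what it's worth, I also ruled out plain network problems by probing the listener port from the Connect host (a bare TCP check, not an Oracle handshake, so it only proves the port is open):

```python
import socket

def port_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# In my setup I check the Oracle listener, e.g.:
# port_reachable("localhost", 1521)
```

The port is reachable from the Connect host in my case, so basic connectivity does not seem to be the problem.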
Does anybody know what could be the root cause of this issue? Or am I missing something?
Or should I address my question to the Debezium team instead?
Thank you!