Hi Kafkateers!
Is anybody here using the RegexRouter transformation extensively?
Does anybody know whether it is possible to use regex modifiers in the replacement string of the transformation?
Let me describe the use-case in detail.
I have a JDBC sink connector that takes multiple topics and writes their messages to multiple tables.
The connector configuration looks something like this:
{
  "name": "sink-sandbox-revision-1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    ...
    "dialect.name": "OracleDatabaseDialect",
    "topics": "source.organizations,source.departments,source.employees",
    "tasks.max": "1",
    "auto.create": "false",
    "auto.evolve": "false",
    "quote.sql.identifiers": "never",
    "transforms": "RouteRecords,HoistKey,ValueToJson",
    "transforms.RouteRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RouteRecords.regex": "^source\\.(.+)$",
    "transforms.RouteRecords.replacement": "external_$1",
    "transforms.HoistKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.HoistKey.field": "RECORD_KEY",
    "transforms.ValueToJson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
    "transforms.ValueToJson.post.processing.to.xml": "false",
    "transforms.ValueToJson.json.string.field.name": "JSON_PAYLOAD",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "RECORD_KEY",
    "delete.enabled": "true",
    "errors.tolerance": "none"
  }
}
My target database is Oracle, and my tables are created as:
CREATE TABLE external_organizations (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB,
  PRIMARY KEY (record_key)
);
CREATE TABLE external_departments (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB,
  PRIMARY KEY (record_key)
);
CREATE TABLE external_employees (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB,
  PRIMARY KEY (record_key)
);
But when I deploy the connector, it fails because of what looks like a bug in the JDBC sink connector. It seems to be an Oracle-specific problem.
I found out that I can work around this by also having "external_organizations", "external_departments" and "external_employees" tables in my DB (created with double quotes). The error isn't thrown then; however, the connector still writes to the original three tables (the ones created without quotes). All this looks really weird, and of course I cannot rely on such things in production.
The other thing is that when I use table names in UPPERCASE in the connector configuration, the pipeline works. So I thought: what if I convert the names to UPPERCASE in the RegexRouter transformation? That should be possible!
So I tried to use "transforms.RouteRecords.replacement":"EXTERNAL_\U$1\E" instead of "transforms.RouteRecords.replacement":"external_$1", but unfortunately this didn't work for me (even though it follows the regex substitution string syntax that usually works elsewhere).
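To illustrate what I suspect is going on: a minimal sketch, assuming RegexRouter passes the replacement string to Java's java.util.regex (Matcher.replaceFirst), which I have not verified in the connector source. Java's replacement syntax has no \U...\E case-conversion operators; a backslash there only makes the next character literal. The class and method names below are hypothetical and only for this demo.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplacementDemo {
    // Rewrites the topic name with plain java.util.regex, leaving
    // non-matching topics unchanged (assumed to mirror RegexRouter).
    public static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        String regex = "^source\\.(.+)$";
        // The original replacement: prints external_employees
        System.out.println(route("source.employees", regex, "external_$1"));
        // \U and \E are treated as the literal characters U and E:
        // prints EXTERNAL_UemployeesE
        System.out.println(route("source.employees", regex, "EXTERNAL_\\U$1\\E"));
    }
}
```

If that assumption holds, the \U$1\E form cannot uppercase the captured group, no matter how it is escaped in the connector JSON.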
Am I missing something? Or is there perhaps another way to convert the target table names to UPPERCASE? (Please remember that I have multiple target tables and cannot simply use the table.name.format config parameter for this.)
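One direction I can imagine is a small custom transformation that does the uppercasing in code. The sketch below shows only the name mapping such a transformation would need, under the assumption that the target table is EXTERNAL_ plus the uppercased topic suffix. The class and method names are hypothetical, and the wiring into Kafka Connect's Transformation interface is left out.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UpperCaseRoute {
    // Same pattern as transforms.RouteRecords.regex in the config above.
    private static final Pattern SOURCE_TOPIC = Pattern.compile("^source\\.(.+)$");

    // Maps "source.<name>" to "EXTERNAL_<NAME>"; other topics pass through.
    public static String targetTable(String topic) {
        Matcher m = SOURCE_TOPIC.matcher(topic);
        if (!m.matches()) {
            return topic;
        }
        // Locale.ROOT avoids locale-specific case rules (e.g. Turkish i).
        return "EXTERNAL_" + m.group(1).toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // prints EXTERNAL_EMPLOYEES
        System.out.println(targetTable("source.employees"));
    }
}
```

This keeps a single connector for all three topics, which is the part table.name.format cannot cover.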