RegexRouter transformation and regular expression in substitution string

Hi Kafkateers!

Anybody extensively using RegexRouter transformation here?

Does anybody know if this is possible to use some regex modifiers for the replacement string from the transofmation?

Let me describe the use-case in detail.

I have a sink JDBC connector, which takes multiple topics and writes the messages from them to multiple tables.

The connector configuration looks something like this:

{
  "name": "sink-sandbox-revision-1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    ...
    "dialect.name": "OracleDatabaseDialect",
    "topics": "source.organizations,source.departments,source.employees",
    "tasks.max": "1",
    "auto.create": "false",
    "auto.evolve": "false",
    "quote.sql.identifiers": "never",
    "transforms": "RouteRecords,HoistKey,ValueToJson",
    "transforms.RouteRecords.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RouteRecords.regex":"^source\\.(.+)$",
    "transforms.RouteRecords.replacement":"external_$1",
    "transforms.HoistKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.HoistKey.field": "RECORD_KEY",
    "transforms.ValueToJson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
    "transforms.ValueToJson.post.processing.to.xml" : "false",
    "transforms.ValueToJson.json.string.field.name" : "JSON_PAYLOAD",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "RECORD_KEY",
    "delete.enabled": "true",
    "errors.tolerance": "none"
  }
}

My target database is Oracle, and my tables are created as:

CREATE TABLE external_organizations (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB
  PRIMARY KEY (RECORD_KEY)
);

CREATE TABLE external_departments (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB
  PRIMARY KEY (RECORD_KEY)
);

CREATE TABLE external_employees (
  record_key VARCHAR2(255) NOT NULL,
  json_payload CLOB
  PRIMARY KEY (RECORD_KEY)
);

But when I deploy my connector, it fails, due to the JDBC Sink connector bug. It seems to be an Oracle-related problem.

I found out that I can workaround this, by also having "external_organizations", "external_departments" and "external_employees" tables in my db (with double quotes) - the error isn’t thrown then, however, the connector still uses the old three tables in such a case (the ones without quotes). All this looks really weird and I cannot rely on such things in production of course.

The other thing is, that when I use table names in UPPERCASE in the connector configuration, the pipeline works, and I thought what if I convert the names to UPPERCASE in RegexRoute transformation? should be possible!

So, I tried to use "transforms.RouteRecords.replacement":"EXTERNAL_\U$1\E" instead of "transforms.RouteRecords.replacement":"external_$1", but unfortunately this didn’t work for me (though it fits to the regex substitution string format, and usually works).

Am I missing something? Or is there probably another way of how to convert target table names to UPPERCASE (please remember that I have multiple target tables and cannot simply use table.name.format config parameter for this)?

I googled a lot before posting my question, and it looks like others also have similar issues.

Check out this one: ChangeTopicCase — Kafka Connect Connectors 1.0 documentation

1 Like

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.