JDBC Source connector loses data in "incrementing" and "timestamp+incrementing" modes

Hello Kafkateers,

I noticed an issue with the JDBC Source connector and long-running transactions. It affects all operating modes, including “incrementing” and “timestamp+incrementing”, which are documented as being reliable.

There is a tracked outbox table Table1, defined as follows:

CREATE TABLE Table1 (
  i INTEGER NOT NULL,
  t TIMESTAMP NOT NULL,
  v VARCHAR2(2000)
);
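
For reference, a connector tracking this table in “timestamp+incrementing” mode would be configured roughly like this (the connection URL and topic prefix below are placeholders, not my real values):

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//db-host:1521/ORCLPDB1
table.whitelist=TABLE1
mode=timestamp+incrementing
incrementing.column.name=I
timestamp.column.name=T
topic.prefix=outbox-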

Let’s imagine there are two sessions.

Session 1 inserts a row into the table but does not commit the transaction yet:

INSERT INTO Table1 VALUES (1, SYSTIMESTAMP, 'row1');

Session 2 inserts another row and commits it immediately:

INSERT INTO Table1 VALUES (2, SYSTIMESTAMP, 'row2');
COMMIT;

The connector sees “row2” and syncs it to our Kafka topic.

Now Session 1 commits its transaction:

COMMIT;

But “row1” is never seen by the connector and never synced to Kafka: although it becomes visible in the table only now, its incrementing/timestamp value is already behind the connector’s stored offset.

The issue may seem artificial, but it is very real. In a concurrent environment where transactions can run long, it happens regularly with columns populated from a sequence: a transaction that started earlier (and therefore got the lower sequence value) can commit after a transaction that started later.

Are there any “good” workarounds for the issue?
I have found only one, using materialized views, and I don’t like it. Any other ideas? :slight_smile:


Hi there. As a workaround, you can introduce a processing delay using timestamp.delay.interval.ms, as documented here: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html#database

It only works in the timestamp.* modes and does not guarantee that you will not lose data if a transaction lasts longer than the delay, but it will catch a fair number of late arrivals.
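
For illustration, the relevant settings might look like this (the 60-second delay is only an example; pick a value that covers your typical transaction duration):

mode=timestamp+incrementing
timestamp.column.name=T
incrementing.column.name=I
# wait this long past a row's timestamp before publishing it,
# so transactions that straddle a poll have time to commit
timestamp.delay.interval.ms=60000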


Hi @ksilin!

Hmm, thank you for the idea, it’s something; I will look into it.

In Oracle (which is my data source) there is the ORA_ROWSCN pseudocolumn, which is populated only after commit and is monotonically increasing. It can also be configured to be tracked at the row level (by default it is block-level), so it is a very good candidate for tracking changed rows in the source table.
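
If I recreate the table with row-level SCN tracking and expose the pseudocolumn through a view, it looks roughly like this (a simplified sketch; ROWDEPENDENCIES can only be set at table creation time):

-- ROWDEPENDENCIES makes Oracle keep the commit SCN per row instead of per block
CREATE TABLE Table1 (
  i INTEGER NOT NULL,
  t TIMESTAMP NOT NULL,
  v VARCHAR2(2000)
) ROWDEPENDENCIES;

-- expose the pseudocolumn as a regular column for the connector
CREATE VIEW V_OUTBOX_TEST AS
SELECT i, t, v, ORA_ROWSCN AS ora_rowscn
FROM Table1;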

But the connector doesn’t see it in the table, and if it is exposed through a view, the connector refuses to use it as the incrementing column because the column is reported as nullable:

org.apache.kafka.connect.errors.ConnectException: Cannot make incremental queries using incrementing column ORA_ROWSCN on KAFKA_SANDBOX.V_OUTBOX_TEST because this column is nullable.

It also doesn’t seem possible to expose it in a materialized view with FAST REFRESH ON COMMIT.

Is there perhaps a way to make the connector use an incrementing column even though it is nullable?

Ah, yeah, that’s the default setting: using nullable columns is prohibited by default. You can try setting validate.non.null=false, see https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html#mode. However, I am not sure how the connector actually behaves if some rows do have NULLs in that column.
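
The relevant part of the source config would then look roughly like this (the view name is taken from your error message, the rest is illustrative):

table.whitelist=V_OUTBOX_TEST
mode=incrementing
incrementing.column.name=ORA_ROWSCN
# skip the NOT NULL validation that rejects the view-exposed pseudocolumn
validate.non.null=false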

Oh gosh, I completely overlooked this config; I will definitely give it a try.

The thing is that the column will in fact never be NULL: the ORA_ROWSCN value can be empty only for uncommitted transactions. On commit the value is populated with the current system change number (SCN). If this field is empty for committed data, then it is a bug, an Oracle Database bug :slight_smile:

This means that for all data visible to the connector the column will always be populated, so there should be no problems at all.


Sounds good. Please LMK once you have it running, just to confirm that it works as expected.
