OutOfMemoryError using Source Connector with large table

Hi! Not sure whether this belongs in the ksqlDB or the Kafka Connect forum; I hope this is the right place.

I’m trying to import data from MariaDB using the JDBC Source Connector. While this works fine for one table (with ~4.5M rows), a larger table (~25M rows, 15 fields) results in the following error:

kafka_1            | [2021-05-28 20:40:08,499] INFO [GroupCoordinator 1]: Member connect-1-3034af86-d293-4529-94f0-2dd47dfb137d in group ksql-connect-cluster has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1            | [2021-05-28 20:40:08,499] INFO [GroupCoordinator 1]: Preparing to rebalance group ksql-connect-cluster in state PreparingRebalance with old generation 3 (__consumer_offsets-25) (reason: removing member connect-1-3034af86-d293-4529-94f0-2dd47dfb137d on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka_1            | [2021-05-28 20:40:08,499] INFO [GroupCoordinator 1]: Group ksql-connect-cluster with generation 4 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
ksqldb-server      | Exception in thread "Thread-38" java.lang.OutOfMemoryError: Java heap space
kafka_1            | [2021-05-28 20:41:29,564] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka_1            | [2021-05-28 20:41:29,564] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka_1            | [2021-05-28 20:41:29,566] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 HashMap() (kafka.controller.KafkaController)
kafka_1            | [2021-05-28 20:41:29,567] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
ksqldb-server      | Exception in thread "vertx-blocked-thread-checker" java.lang.OutOfMemoryError: Java heap space
ksqldb-server      |
ksqldb-server      | Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
ksqldb-server      | [2021-05-28 20:42:07,222] ERROR Uncaught exception in thread 'kafka-producer-network-thread | producer-3': (org.apache.kafka.common.utils.KafkaThread:49)
ksqldb-server      | [2021-05-28 20:42:09,631] WARN Unexpected exception in the selector loop. (io.netty.channel.nio.NioEventLoop:563)
ksqldb-server      | java.lang.OutOfMemoryError: Java heap space
ksqldb-server      | [2021-05-28 20:42:09,631] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:317)
ksqldb-server      | [2021-05-28 20:45:22,991] INFO Server shutting down (io.confluent.ksql.rest.server.KsqlServerMain:96)
ksqldb-server      | [2021-05-28 20:45:24,514] INFO ksqlDB shutdown called (io.confluent.ksql.rest.server.KsqlRestApplication:498)

I’m using Docker Compose, based on docker-compose.yml from the confluentinc/ksql GitHub repository (commit cb513066955fe047773566c7806e85da93c8806b). My connector config looks like this (I’ve tried various values for poll.interval.ms and batch.max.rows):

  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:mariadb://<some-other-docker-host>:3306/<database>',
  'connection.user' = '<user>',
  'connection.password' = '<password>',
  'mode' = 'timestamp',
  'timestamp.column.name' = 'updated_at',
  'validate.non.null' = 'false',
  'poll.interval.ms' = '500',
  'topic.prefix' = '<connector>_',
  'db.timezone' = 'Europe/Amsterdam',
  'table.whitelist' = '<small_table>,<large_table>'

Any ideas on how to get the data into Kafka? I have very little experience with both Kafka and ksqlDB. Would running Kafka Connect directly (outside of ksqlDB) help?

Thanks in advance!

I’ve tried implementing the recommendations from https://www.confluent.io/blog/bounding-ksqldb-memory-usage/. This required some workarounds:

  1. The required jar isn’t part of the Docker image, as described in issue 6838. Issue 6644 explains where to download it.
  2. Adding a custom jar to KSQL_CLASSPATH currently doesn’t seem possible (see issue 6831). Of the paths on KSQL_CLASSPATH by default (see ksql-run-class in the confluentinc/ksql GitHub repository, commit cb513066955fe047773566c7806e85da93c8806b), the directory /share/java/ksqldb-examples didn’t seem particularly important, so I mounted a Docker volume containing the jar from step 1 at that path.

This allowed me to use the following configuration:

      KSQL_KSQL_STREAMS_ROCKSDB_CONFIG_SETTER: io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfigSetter

Unfortunately, this didn’t resolve the problem.

I fixed it by limiting the number of rows each connector query returns, using ‘query.suffix’:

  'query.suffix' = ' LIMIT 1000000',
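A toy Python model of why a LIMIT suffix can keep memory bounded. It rests on an assumption not checked against the connector source: that in timestamp mode each poll runs roughly SELECT ... WHERE updated_at > <last stored offset> ORDER BY updated_at, with query.suffix appended at the end (the real query also has an upper time bound, left out here). The table and the functions poll_once and drain are hypothetical, made up for illustration.

```python
from typing import List, Tuple

Row = Tuple[int, int]  # (id, updated_at) for a hypothetical two-column table


def poll_once(table: List[Row], last_ts: int, limit: int) -> List[Row]:
    """Simulate one timestamp-mode query with ' LIMIT n' appended.

    Assumed query shape (not verified against the connector source):
      SELECT * FROM t WHERE updated_at > last_ts ORDER BY updated_at LIMIT n
    """
    newer = sorted((r for r in table if r[1] > last_ts), key=lambda r: r[1])
    return newer[:limit]  # at most `limit` rows are materialised per poll


def drain(table: List[Row], limit: int) -> Tuple[List[Row], int]:
    """Poll until a poll returns nothing; return all rows seen and the peak batch size."""
    last_ts = -1  # hypothetical starting offset, earlier than every timestamp
    seen: List[Row] = []
    peak = 0
    while True:
        batch = poll_once(table, last_ts, limit)
        if not batch:
            return seen, peak
        peak = max(peak, len(batch))
        seen.extend(batch)
        last_ts = batch[-1][1]  # stored offset = newest timestamp in this batch


if __name__ == "__main__":
    # Every row has its own timestamp: all 10 rows arrive, at most 3 per poll.
    unique = [(i, i) for i in range(10)]
    rows, peak = drain(unique, limit=3)
    print(len(rows), peak)  # 10 3

    # Five rows share updated_at = 0, but LIMIT 3 stops after three of them;
    # the next poll asks for updated_at > 0, so rows 3 and 4 are never read.
    tied = [(i, 0) for i in range(5)] + [(5, 1)]
    rows, _ = drain(tied, limit=3)
    print(len(rows))  # 4
```

If the assumption holds, the LIMIT caps each poll at 1,000,000 rows while the stored offset lets later polls pick up the rest. The second case shows a risk worth checking: when more rows share a single updated_at value than the LIMIT allows, the rows past the cut-off may be skipped.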

The property ‘batch.max.rows’ seems to have no effect on the memory problem, at least not with ‘mode’ = ‘timestamp’.
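One possible explanation, which is an assumption and not something confirmed in this thread: batch.max.rows limits how many rows the connector collects into one batch, but if the JDBC driver reads the whole result set into memory when the query executes, the heap still has to hold every row the query returns. The Python sketch below models that; run_query and its streaming flag are hypothetical names for illustration only.

```python
from itertools import islice
from typing import Tuple


def run_query(total_rows: int, batch_max_rows: int, streaming: bool) -> Tuple[int, int]:
    """Toy model of one connector query; returns (batches, peak rows held in memory).

    streaming=False models a driver that loads the full result set up front
    (assumed behaviour); streaming=True models a driver that fetches rows on
    demand (its own small fetch buffer is ignored here).
    """
    rows = iter(range(total_rows))  # stand-in for the rows the query returns
    # Rows held by the driver for the whole query.
    driver_buffer = 0 if streaming else total_rows
    batches = 0
    peak = driver_buffer
    while True:
        # The connector pulls at most batch_max_rows rows into one batch.
        batch = list(islice(rows, batch_max_rows))
        if not batch:
            return batches, peak
        batches += 1
        peak = max(peak, driver_buffer + len(batch))


if __name__ == "__main__":
    print(run_query(25_000, 100, streaming=False))  # (250, 25100)
    print(run_query(25_000, 100, streaming=True))   # (250, 100)
```

Under this model, a smaller batch.max.rows only produces more, smaller batches; the peak drops only when the result set itself shrinks, which would be consistent with the LIMIT suffix working where batch.max.rows did not.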