JDBC Source Connector Memory Usage

Hello,

I’m streaming query-based CDC data from a MySQL table with the JDBC Source Connector. I want to scale to 20 tables, but the memory usage is really concerning right now.

My target Kafka topic’s throughput is pretty normal, averaging 25.39 KB. At the same time, my worker, which runs only a single connector, consumes 6 GB of memory.

Is this expected memory consumption for the JDBC connector? If not, is it caused by the query results or by something in my connector config?

Any suggestions would be awesome.
Thanks in advance,
Batu

Hi @batu

How big is the table?
Could you share the connector config?

best,
michael

It is often hard to identify the root cause of memory consumption, but you can use tools to reduce the complexity. One of them is JDK Flight Recorder. Since Kafka Connect is a JVM-based system, you can start your worker instances with flight recording enabled, which will produce a recording you can analyze to see which Java classes, modules, packages, methods, and even variables are consuming the most memory.

This will help point to what is causing the issue and what can be done about it.
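As a minimal sketch, assuming the Connect image passes the standard KAFKA_OPTS environment variable through to the JVM the same way it handles KAFKA_HEAP_OPTS, you could add something like this to the worker environment (the duration and file path are just examples):

# start a 10-minute flight recording at JVM startup and write it to a file
KAFKA_OPTS: "-XX:StartFlightRecording=duration=600s,filename=/tmp/connect-worker.jfr"

On an already-running worker you can also start a recording on demand with jcmd <pid> JFR.start.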

@riferrei

Hi @mmuehlbeyer

Thanks for the reply.
It’s quite big, with 100M records and lots of transactions, but I’m facing the same issue with each table I try.

Here are my worker and connector configs :point_down:

Worker Config


CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}
CONNECT_REST_PORT: 8081
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-mysql-jdbc-product-00"
CONNECT_GROUP_ID: kafka-connect-mysql-jdbc-prod
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-mysql-jdbc-product-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-mysql-jdbc-product-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-mysql-jdbc-product-status

CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: ${SCHEMA_REGISTRY_URL}
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: ${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: ${SCHEMA_REGISTRY_URL}
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: ${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'

CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
#
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
CONNECT_CONSUMER_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
CONNECT_PRODUCER_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
#
CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
#
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"

#
CONNECT_OFFSET_FLUSH_TIMEOUT_MS: "60000"
CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
KAFKA_PRODUCER_MAX_REQUEST_SIZE: "10485760"
CONNECT_PRODUCER_MAX_REQUEST_SIZE: "10485760"
KAFKA_HEAP_OPTS: "-Xmx10g"

Connector Config

"config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "timestamp.column.name": "Timest",
      "incrementing.column.name": "ProductId",
      "connection.password": "password",
      "validate.non.null": "false",
      "timestamp.initial": "-1",
      "tasks.max": "1",
      "table.types": "TABLE",
      "table.whitelist": "Product",
      "mode": "timestamp+incrementing",
      "topic.prefix": "JDBC.productDB.",
      "task.class": "io.confluent.connect.jdbc.source.JdbcSourceTask",
      "connection.user": "User",
      "db.timezone": "Turkey",
      "name": "jdbc-product-02",
      "numeric.mapping": "best_fit",
      "connection.url": "jdbc:mysql://host:3306/DB?enabledTLSProtocols=TLSv1.2&verifyServerCertificate=false&useSSL=false&requireSSL=false"
    }

Thanks,
Batu

Hello @riferrei

It seems like a good idea to get to the heart of the matter and find the root cause. This will be my first time using Flight Recorder, though, so let’s see what I can get :slight_smile:

Thanks,
Batu

Nah, you will do fine. It is pretty easy to capture the heap dump. The not-so-simple part is analyzing it. :sweat_smile:

The video below, albeit a bit old, provides an example of how to capture and analyze the heap dump:
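If you would rather work from the command line, a rough sketch using jcmd inside the container looks like this (the container name kafka-connect and the output path are placeholders):

# list the JVMs running inside the container to find the worker's PID
docker exec kafka-connect jcmd
# dump the worker's heap to a file you can open in a heap analyzer such as Eclipse MAT
docker exec kafka-connect jcmd <pid> GC.heap_dump /tmp/connect-heap.hprof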

@riferrei

Somewhat of a guess, but perhaps fiddling with the batch.max.rows configuration value to pull fewer records each time will put less load on the worker?
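For what it’s worth, a sketch of that change is just one extra line in the connector config posted above; the value here is only an illustration (the connector’s default is 100), so you would experiment with what your worker can comfortably handle:

      "batch.max.rows": "50",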

One thing worth trying might be to set defaultFetchSize in your connection URL, e.g.

"connection.url": "jdbc:mysql://host:3306/DB?enabledTLSProtocols=TLSv1.2&verifyServerCertificate=false&useSSL=false&requireSSL=false&defaultFetchSize=1000"

@riferrei thanks for the resources. I connected to my Docker container from JDK Mission Control and ran a flight recording for 10 minutes.
And yes, analyzing is really the hard part :slight_smile:
These byte[] allocations look like the root cause, but I’m not sure what they stand for. I can only guess they come from the query results.

If there are other metrics I need to look at, I can take a look.

@rmoff another of your great blog posts came to the rescue again while I was trying to access my container’s JMX metrics.

I will reduce batch.max.rows and set defaultFetchSize=1000 as @mmuehlbeyer suggested, then check the resource consumption again.

This is really good, @batu :heart_eyes:

By looking at the stack trace displayed for the byte[], you can tell that this is pure payload allocation, meaning that data is being allocated on-heap because of what is coming from the source connector. This is good because it means you do not have contention somewhere; the volume is just too high for the allocated worker. The payload results from the records read by the source connector being materialized into the internal format Kafka Connect works with, called SourceRecord, and then serialized into Avro by the converter.

The throughput you see on the Kafka topic cannot be mapped directly to the number of bytes the connector is handling, because the Kafka broker does no serialization or deserialization: everything is just bytes being moved across the wire and written straight to the page cache, which is off-heap. Source connectors are different. They need to materialize the data on-heap to perform data wrangling such as schema management. Therefore, you have to adjust how your Kafka Connect layer keeps up with that volume.

You have three options:

  1. Increase the heap size of that worker: Size your heap to twice the number you see. Yes, you may incur GC pauses, but the collectors in Java are pretty efficient these days.

  2. Divide-and-conquer with more workers: Add more worker nodes to the Kafka Connect cluster. If your target is handling ~6 GB of data, I would put at least two instances with 4 GB of heap each. Don’t forget to increase the number of tasks to more than one to force the connector to spread the tasks amongst the workers (a sketch of options 1 and 2 follows after this list).

  3. Handle less data at a time: This is essentially what @rmoff and @mmuehlbeyer are helping you with. Instead of beefing up your Kafka Connect layer, you configure it to handle less data at a time, leaving the JVM more room to allocate data on-heap and clean up garbage. Sometimes this can be achieved by doing some throttling, which seems to be what the batch.max.rows parameter does.
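A hedged sketch of what options 1 and 2 could look like with the configs posted earlier in the thread; the heap sizes and task count below are illustrative values, not recommendations:

# option 1: a single worker with its heap sized to roughly twice the ~6 GB observed
KAFKA_HEAP_OPTS: "-Xms12g -Xmx12g"

# option 2: two worker instances joining the same CONNECT_GROUP_ID, each with a smaller heap
KAFKA_HEAP_OPTS: "-Xms4g -Xmx4g"

On the connector side, raising tasks.max only pays off once more tables are whitelisted, since with a single whitelisted table the JDBC source runs only one task:

      "tasks.max": "3",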

@riferrei

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.