Redis Sink Connector - Records missing

I am using a self-managed Redis Sink connector along with a locally hosted Kafka Connect worker to copy data from a Kafka topic (hosted in Confluent Cloud) into a local Redis instance (running from a Docker image on my machine).

  1. The value in the Kafka topic is Avro-formatted using Schema Registry (all in Confluent Cloud)
  2. The key is of type long, but I am using an SMT to cast it to "string" in the redis-sink connector before copying it into Redis
  3. The Kafka Connect worker is running locally and connects to the Kafka topic in Confluent Cloud
  4. The redis-sink connector is self-managed and runs on my local machine

When I publish about 100 messages to test the setup, this is what I see.

The Confluent UI shows no consumer lag for the redis connector, and its offsets are fully caught up.

The Redis container log also shows:

redis      | 1:M 02 Apr 2022 23:32:49.047 * 100 changes in 300 seconds. Saving...
redis      | 1:M 02 Apr 2022 23:32:49.048 * Background saving started by pid 51
redis      | 51:C 02 Apr 2022 23:32:49.049 * DB saved on disk
redis      | 51:C 02 Apr 2022 23:32:49.050 * RDB: 0 MB of memory used by copy-on-write
redis      | 1:M 02 Apr 2022 23:32:49.148 * Background saving terminated with success

The redis-sink connector configuration is as follows:

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class"                                     : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "redis.hosts"                                         : "redis:6379",
    "tasks.max"                                           : 1,
    "topics"                                              : "[redacted]",
    "key.converter.schemas.enable"                        : "false",
    "key.converter"                                       : "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url"                   : "https://[redacted].confluent.cloud",
    "key.converter.schema.registry.basic.auth.user.info"  : "[redacted]:[redacted]",
    "key.converter.basic.auth.credentials.source"         : "USER_INFO",
    "value.converter.schemas.enable"                      : "true",
    "value.converter"                                     : "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter.schema.registry.url"                 : "https://[redacted].confluent.cloud",
    "value.converter.schema.registry.basic.auth.user.info": "[redacted]:[redacted]",
    "value.converter.basic.auth.credentials.source"       : "USER_INFO",
    "transforms"                                          : "Cast",
    "transforms.Cast.type"                                : "org.apache.kafka.connect.transforms.Cast$Key",
    "transforms.Cast.spec"                                : "string"
  }
}

Running MONITOR also shows activity in Redis:

docker exec -it redis redis-cli MONITOR
OK
1648943909.726046 [1 172.24.0.3:47842] "MSET" "\x00\x00\x00\x00\x00\x00\x005" "\x00\x00\x01\x88;j:\x92\xcb\x7fHGL@\xea\x044\x116\x10[@\x00\xb6\x9f\xf7\xa4\xbd\x9a\x8a\
........................................
"MSET" "__kafka.offset.driver-locale.0" "{\"topic\":\"driver-locale\",\"partition\":0,\"offset\":321}"

But when I check for keys using redis-cli, none are present. Where would the messages be, then?

127.0.0.1:6379> keys *
(empty list or set)

My question is: where have the messages gone, when all signs indicate that the redis-sink connector has copied them into Redis?

Much appreciated.

This got resolved.

I had to switch databases in redis-cli: the keys were being stored in db 1, while redis-cli connects to db 0 by default.
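The MONITOR output quoted earlier already carries this information: each command line starts with a timestamp followed by a bracketed field, and the first item in the brackets is the database index the command ran against (`[1 172.24.0.3:47842]` means db 1). A minimal sketch of reading that index, assuming lines in that format; the helper name `monitor_db_index` is made up for illustration:

```python
import re

# Prefix of a MONITOR command line: "<unix time> [<db index> <client>] ..."
MONITOR_PREFIX = re.compile(r"^(?P<ts>\d+\.\d+) \[(?P<db>\d+) (?P<client>[^\]]+)\]")


def monitor_db_index(line: str) -> int:
    """Return the database index from one line of `redis-cli MONITOR` output."""
    match = MONITOR_PREFIX.match(line.strip())
    if match is None:
        raise ValueError(f"not a MONITOR command line: {line!r}")
    return int(match.group("db"))


if __name__ == "__main__":
    # Start of the first MONITOR line from the question, truncated after the command name.
    sample = '1648943909.726046 [1 172.24.0.3:47842] "MSET"'
    print(monitor_db_index(sample))
```

If the index printed here differs from the database redis-cli is connected to, `keys *` will come back empty even though the writes succeeded.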

127.0.0.1:6379> SELECT 1

127.0.0.1:6379[1]> MGET 45
------------------
1) "\x00\x00\x01\x88;Z\xfb\\m\xc5\xfe\xb2\x02@i\x00o\x81\x04\x91X@\x00\xb8\xda\x82\xcd\xae\x8c\x8a\xda\x11"
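Another way to spot this kind of mismatch is `redis-cli INFO keyspace`, which lists one `dbN:keys=...` line for every database that holds keys. The sketch below parses that text into a mapping of database index to key count. The function name `parse_keyspace` and the sample text are illustrative assumptions, not output captured from the setup above:

```python
from typing import Dict


def parse_keyspace(info_text: str) -> Dict[int, int]:
    """Map database index -> key count from `INFO keyspace` output."""
    counts: Dict[int, int] = {}
    for line in info_text.splitlines():
        line = line.strip()
        # Skip the "# Keyspace" header and blank lines.
        if not line.startswith("db") or ":" not in line:
            continue
        name, _, stats = line.partition(":")
        fields = dict(item.split("=", 1) for item in stats.split(",") if "=" in item)
        counts[int(name[2:])] = int(fields["keys"])
    return counts


if __name__ == "__main__":
    # Hypothetical sample in the INFO keyspace format; the numbers are made up.
    sample = "# Keyspace\ndb1:keys=101,expires=0,avg_ttl=0\n"
    print(parse_keyspace(sample))
```

A database with no keys does not appear in the output at all, so an empty db 0 next to a populated db 1 shows up as a single `db1` entry.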