Kafka Connect Elasticsearch sink stops sending records

Hey guys, another quick question that you can hopefully help me out with.
Everything seems to be running fine for the most part. If I put 1 million records into a Kafka producer, the ES connector starts picking up all the records and putting them into ES.

For some reason the workers just stop sending data after 200,000-300,000 records. I don't see any errors in the logs. If I restart the workers they pick up again. Any idea what could cause this?

What’s the status of the connector and tasks within it? You can check using the Kafka Connect REST API:

:movie_camera: Exploring the Kafka Connect REST API - YouTube
:writing_hand: Connect REST Interface | Confluent Documentation
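
For example, using the standard status endpoints (replace <connector-name> with your connector's name):

curl -s "http://localhost:8083/connectors" | jq
curl -s "http://localhost:8083/connectors/<connector-name>/status" | jq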

So I am using the curl commands. When I use curl 'localhost:8083/connectors/elastic-search-topic12/tasks' | jq I get the following:

[
  {
    "id": {
      "connector": "elastic-search-topic12",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "type.name": "_doc",
      "topics": "topic1",
      "tasks.max": "2",
      "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms": "routeTS",
      "transforms.routeTS.topic.format": "${topic}-${timestamp}",
      "key.ignore": "true",
      "schema.ignore": "true",
      "transforms.routeTS.timestamp.format": "YYYY-MM-dd",
      "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask",
      "value.converter.schemas.enable": "false",
      "name": "elastic-search-topic12",
      "connection.url": "https://",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  },
  {
    "id": {
      "connector": "elastic-search-topic12",
      "task": 1
    },
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "type.name": "_doc",
      "topics": "topic1",
      "tasks.max": "2",
      "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms": "routeTS",
      "transforms.routeTS.topic.format": "${topic}-${timestamp}",
      "key.ignore": "true",
      "schema.ignore": "true",
      "transforms.routeTS.timestamp.format": "YYYY-MM-dd",
      "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask",
      "value.converter.schemas.enable": "false",
      "name": "elastic-search-topic12",
      "connection.url": "https://",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }
]

When I use curl 'localhost:8083/connectors/elastic-search-topic12/status' | jq
I get the following:

 {
   "error_code": 404,
   "message": "No status found for connector elastic-search-topic12"
 }

Any idea why I don't see a status? Data does go through when I send it, though. Am I missing something?

How many distributed workers do you have running?

So currently I have 2 running. Each one is running on its own Linux machine. I have this added in both worker configs.

rest.advertised.host.name=172.31..
rest.advertised.port=8083

Each one broadcasts its own IP. It seems to work fine from what I can see when I send messages through Kafka, but something could be wrong with my setup.

Thanks again for helping out.

Can you share the two connect worker properties configuration files?
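
One quick sanity check in the meantime: the workers forward some REST requests to each other using rest.advertised.host.name and rest.advertised.port, so it's worth confirming each worker can actually reach the other one's advertised endpoint, e.g. from one machine (replace <other-worker-advertised-ip> with the IP the other worker advertises):

curl -s "http://<other-worker-advertised-ip>:8083/" | jq

If the endpoint is reachable, that should return the Connect worker's version info.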

Sure here is the first config file.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
#bootstrap.servers=b-1.test-cluster.uxyzje.c2.kafka.us-east-1.amazonaws.com:9092,b-2.test-cluster.uxyzje.c2.kafka.us-east-1.amazonaws.com:9092
bootstrap.servers=b-1.****.kafka.us-east-1.amazonaws.com:9092,b-2.****.kafka.us-east-1.amazonaws.com:9092,b-3.****.kafka.us-east-1.amazonaws.com:9092,b-4.****.c8.kafka.us-east-1.amazonaws.com:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster2

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true	
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=4
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=4

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=4
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs 
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=
rest.port=8083

consumer.max.poll.interval.ms=3000000

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
rest.advertised.host.name=172.31.133.111
rest.advertised.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/home/ubuntu/confluent-6.0.0/share/confluent-hub-components

Here is the second config file.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
#bootstrap.servers=b-1.test-cluster.uxyzje.c2.kafka.us-east-1.amazonaws.com:9092,b-2.test-cluster.uxyzje.c2.kafka.us-east-1.amazonaws.com:9092
bootstrap.servers=b-1.****.kafka.us-east-1.amazonaws.com:9092,b-2.****.kafka.us-east-1.amazonaws.com:9092,b-3.****.kafka.us-east-1.amazonaws.com:9092,b-4.****.c8.kafka.us-east-1.amazonaws.com:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster2

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true	
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=4
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=4

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=4
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs 
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=
rest.port=8083

consumer.max.poll.interval.ms=3000000

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
rest.advertised.host.name=172.31.137.67
rest.advertised.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/home/ubuntu/confluent-6.0.0/share/confluent-hub-components

What’s the output of this?

curl -s http://localhost:8083/connectors?expand=info&expand=status

Here are the results:

{
    "elastic-search-topic12": {
        "info": {
            "name": "elastic-search-topic12",
            "config": {
                "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
                "type.name": "_doc",
                "topics": "topic1",
                "tasks.max": "2",
                "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
                "transforms": "routeTS",
                "transforms.routeTS.topic.format": "${topic}-${timestamp}",
                "key.ignore": "true",
                "schema.ignore": "true",
                "transforms.routeTS.timestamp.format": "YYYY-MM-dd",
                "value.converter.schemas.enable": "false",
                "name": "elastic-search-topic12",
                "connection.url": "https://****",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter"
            },
            "tasks": [{
                    "connector": "elastic-search-topic12",
                    "task": 0
                }, {
                    "connector": "elastic-search-topic12",
                    "task": 1
                }
            ],
            "type": "sink"
        }
    }
}

[1]+ Done curl -s http://localhost:8083/connectors?expand=info

I’m a bit puzzled by the output, because this is the kind of thing I get for http://localhost:8083/connectors?expand=info&expand=status (note the status object):

{
  "source-datagen-orders-us": {
    "info": {
      "name": "source-datagen-orders-us",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "topic.creation.default.partitions": "6",
        "tasks.max": "1",
        "topic.creation.default.replication.factor": "1",
        "name": "source-datagen-orders-us",
        "kafka.topic": "orders",
        "schema.keyfield": "orderid",
        "schema.filename": "/data/orders_us.avsc"
      },
      "tasks": [
        {
          "connector": "source-datagen-orders-us",
          "task": 0
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "source-datagen-orders-us",
      "connector": {
        "state": "RUNNING",
        "worker_id": "ksqldb:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "ksqldb:8083"
        }
      ],
      "type": "source"
    }
  }
}

From the looks of it you only ran curl -s http://localhost:8083/connectors?expand=info. Without quotes around the URL, the shell treats the & as a background operator, so the expand=status part never gets passed to curl.
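
That would also explain the [1]+ Done line in your output; roughly, the shell splits the unquoted command like this:

curl -s http://localhost:8083/connectors?expand=info &   # run in the background, hence the "[1]+ Done" line
expand=status                                            # treated as a plain shell variable assignment, so no error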

So when I run that, the output does not get formatted correctly; in the original post I formatted it myself. Here is a picture of the command and the results.

When I hit enter, that last line shows up. Not sure why I cannot see the status of the connector.
I have also added another connector since the last time I posted.

Try quotation marks:

curl -s "http://localhost:8083/connectors?expand=info&expand=status"

So I just see an empty set, like there is nothing running. It just shows { }.
Does this mean they are not running? I'm confused though, because data is being sent through and I'm not seeing any issues right now.

In my worker configs I have both of these set. Is this correct? Just curious.

rest.host.name=
rest.port=8083

consumer.max.poll.interval.ms=3000000

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
rest.advertised.host.name=172.31.137.67
rest.advertised.port=8083

Something is really odd. If I create a test topic and send around 150 messages, they go in just fine, but for some reason it's sending so much data. The messages are in ES, but my network output is huge and does not stop. I have deleted all the other connectors and am just using this test one.

See the output below.

Any idea what is causing this? When I kill the process it goes back down. If I restart, it's lower, but nothing is being sent.

I can maybe try starting from scratch to see if that helps.