I have a kafka topic I’m pushing to opensearch and I’m running into a case where a single partition has stopped working.
I don’t think this is a case of the plugin not working with opensearch as the other partitions are fine, and other topics I have are also fine.
I’ve tried shifting the offset, as well as deleting the consumer group for this connector and the issue still was there - so I believe that means that it’s not a bad message causing the issue.
The errors I’m seeing are of the form
ERROR Failed to execute bulk request due to 'ElasticsearchStatusException[Elasticsearch exception [type=action_request_validation_exception, reason=Validation Failed: 1: an id must be provided if version type or value are set;]]' after 16 attempt(s) (io.confluent.connect.elasticsearch.RetryUtil)
ElasticsearchStatusException[Elasticsearch exception [type=action_request_validation_exception, reason=Validation Failed: 1: an id must be provided if version type or value are set;]]
I’ve tried changing the number of tasks, getting trace logs to see what it’s doing, etc. and I’ve not been able to get any insights into why for this single partition in a 24 partition topic, all with millions of messages in them it seems to be unable to proceed.
Possibly you have empty string keys in Kafka? This does seem like a (the?) scenario that would cause this, and it would make sense that the one partition receiving empty string keys would cause it. If not empty string, perhaps another problematic key.
I like this idea - to really rule out bad messages, monitor the offset as you go. Any chance you are shifting the offset only to hit another problematic record?
If it is a problematic record, to narrow it down, try configuring the connector go one record at a time – AFAICT setting batch.size to 1 will do this. I.e., I’m thinking you can set this config, let the connector fail, and then check the consumer offset of the problematic partition and the problematic record should be the next one.
Another idea: an experiment that you could do to rule a key issue in or out is to create a new connector with a different name, key.ignore set to true, and a temporary different ES instance set up for the experiment. Might be some work to set this up and I’m not sure if you can debug in such a heavy handed way in your environment, but if the connector configured to ignore keys runs without issue then you can be confident that message key is the problem.
Was able to find a dev cluster where this was occurring on so was able to extract out some more details. (interestingly every cluster this is an issue on it’s only partition 8 (out of 24))
The consumer group reported partition 8 to be at offset 1318401.
Offset 1318401: null body, no headers, key = 197246656777161224#749767753839740321
Offset 1318402: protobuf obj. body, no headers, similar key format “num#num”
Test 1: I moved the offset by 1 back to 1318400.
→ Slightly different key (different numbers) but otherwise identical message. No difference in behaviour, still appeared to be ‘stuck’
Set batch.size=1 for following tests
Test 2: Set behavior.on.null.values = ignore. No difference
Test 3: Set behavior.on.malformed.documents = ignore. No difference
Test 4: Set key.ignore = true. No difference
Test 5: Set drop.invalid.message = true. This time it ‘successfully’ processed until the end of the partition
What was interesting about test 5 was that it did process some documents correctly. I see a number of documents that are in later offsets in partition 5 have ended up in elasticsearch. But it does seem to imply that there isn’t just a single invalid document but many. I tried deleting the consumer group to see if it’d get stuck in the same spot, and it didn’t. This time only up to 1213775 (ie not as far as before). This message as well is not particularly interesting, looking like this (this is the next message after offset 1213775)
Timestamp = 1676126033330
Partition = 8
Offset = 1213793
Key = 196438426805734410#940398639743469252
Headers = null
Body = null
So I suppose the good news is that drop.invalid.message lets it at least process some messages, but it’s not clear why some very innocent looking messages are treated as invalid.
For good measure I tried dropping the index from elasticsearch and letting KafkaConnect rebuild it from scratch but ended up in the same spot - a “random” offset that only partition 8 stops on
One issue is records with non-null keys and null values. There are a couple such records you found, and the default behavior of the connector is to fail on these. So, either you’d want to set behavior.on.null.values to IGNORE, or look upstream into why null values are showing up. Is it expected?
But, since your Test 2 saw no difference, this isn’t the only issue.
Since drop.invalid.messages gets you past the issue, this is the only spot where this config is checked, so I’m wondering if, on a fresh test, what "Can't convert ..." errors you see in logging. Hopefully this will shed some light.
Right now I have behavior.on.null.values set to DELETE. The idea being that if we see a null value that it causes the document in ES to be deleted. I could try using IGNORE instead, to see if it’s struggling on those nulls, but other partitions will contain them too so I wouldn’t think it be the cause.
I’ve tried today to reproduce the drop.invalid.messages and this time it got stuck instead of processing the messages. I can see it logging that it has seen the messages & added them into the bulk processor, but once there I get nothing else log-wise. I’ll did notice while looking at the link you provided I’m running an older version of the connector, so I’ll going to try upgrading the plugin - maybe I’m running into an old bug