Hi there,
I have an HTTP sink connector with the error reporter set up.
When the target server (http.api.url) responds with an HTTP error, e.g. 500, everything works as expected: the error reporter writes the error in the configured format to my reporter.error.topic.name.
But when the HTTP sink connector cannot connect to http.api.url at all (for example, because the server is not running), the error reporter fails on JSON conversion:
Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:569)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:550)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:304)
at io.confluent.connect.formatter.json.JsonFormatter.formatValue(JsonFormatter.java:45)
at io.confluent.connect.reporter.Reporter.reportMessage(Reporter.java:457)
at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:297)
at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:263)
at io.confluent.connect.http.writer.HttpAsyncReporter.reportErrantRecord(HttpAsyncReporter.java:152)
My complete configuration:
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"tasks.max": 1,
"topics" : "test-transaction-events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": false,
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.nullable": true,
"include.schema.changes": false,
"config.action.reload": "restart",
"errors.retry.timeout": "0",
"errors.retry.delay.max.ms" : 60000,
"errors.tolerance": "none",
"errors.log.enable": false,
"errors.log.include.messages": false,
"errors.deadletterqueue.topic.name": "sink-dlt",
"errors.deadletterqueue.topic.replication.factor": 1,
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1,
"http.api.url": "http://host.docker.internal:3333/sink",
"request.method": "POST",
"behavior.on.null.values": "log",
"behavior.on.error": "log",
"report.errors.as" : "http_response",
"headers":"Content-Type:application/json|Accept:application/json",
"header.separator" : "|",
"http.connect.timeout.ms": "3000",
"http.request.timeout.ms" : "10000",
"reporter.result.topic.name" : "${connector}-success",
"reporter.result.topic.replication.factor": 1,
"reporter.result.topic.partitions": 1,
"reporter.error.topic.name": "${connector}-error",
"reporter.error.topic.replication.factor": 1,
"reporter.error.topic.partitions": 1,
"reporter.bootstrap.servers": "http://broker:29092",
"reporter.result.topic.key.format" : "json",
"reporter.result.topic.value.format": "json",
"reporter.error.topic.key.format": "json",
"reporter.error.topic.value.format": "json",
"auth.type": "NONE",
"max.retries": 5,
"retry.backoff.ms": 5000,
"retry.on.status.codes": "400-",
"request.body.format": "json",
"batch.max.size": 1,
"batch.json.as.array" : false
The actual error in JsonConverter occurs because the field's schema is defined (a required String with no default value) but the value being converted is null.
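To make that concrete, here is a simplified, self-contained sketch of the null check as I understand it. It is not the actual Kafka Connect source: FieldSchema, DataException and convert below are hypothetical stand-ins that only model "required vs. optional" and "has a default", and the assumption is that the reporter hands the converter a null for a required field when no HTTP response exists.

```java
public class RequiredFieldSketch {

    // Hypothetical stand-in for a Connect field schema: optionality plus an optional default.
    static final class FieldSchema {
        final boolean optional;
        final Object defaultValue;

        FieldSchema(boolean optional, Object defaultValue) {
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    // Hypothetical stand-in for org.apache.kafka.connect.errors.DataException.
    static final class DataException extends RuntimeException {
        DataException(String message) {
            super(message);
        }
    }

    // Models the check: a null value is accepted only if the schema
    // supplies a default or marks the field as optional.
    static Object convert(FieldSchema schema, Object value) {
        if (value != null) {
            return value;
        }
        if (schema.defaultValue != null) {
            return schema.defaultValue;
        }
        if (schema.optional) {
            return null;
        }
        throw new DataException(
            "Conversion error: null value for field that is required and has no default value");
    }

    public static void main(String[] args) {
        FieldSchema requiredString = new FieldSchema(false, null);
        FieldSchema optionalString = new FieldSchema(true, null);

        // Server answered (e.g. with a 500): the field has a value, conversion succeeds.
        System.out.println(convert(requiredString, "500"));

        // An optional field would tolerate the missing value.
        System.out.println(convert(optionalString, null));

        // No connection at all: nothing to put in the required field, so conversion fails.
        try {
            convert(requiredString, null);
        } catch (DataException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model is right, the failure would only go away if the reporter's schema marked that field as optional or gave it a default, neither of which I can control from the connector configuration.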
I wanted to test this with "reporter.error.topic.value.format": "string", which is mentioned in the documentation, but that value is not allowed. The error is: Invalid value string for configuration reporter.error.topic.value.format: Must be one of [json]
Any idea what might be wrong?
Thanks
