Hi there,
I have an HTTP sink connector with an error reporter set up.
When the target server (http.api.url) responds with an HTTP error, e.g. 500, everything works as expected: the error reporter writes the error in the configured format to my reporter.error.topic.name.
But when the HTTP sink connector cannot connect to http.api.url at all (for example, because the server is not running), the error reporter fails on JSON conversion:
Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:569)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:550)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:304)
at io.confluent.connect.formatter.json.JsonFormatter.formatValue(JsonFormatter.java:45)
at io.confluent.connect.reporter.Reporter.reportMessage(Reporter.java:457)
at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:297)
at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:263)
at io.confluent.connect.http.writer.HttpAsyncReporter.reportErrantRecord(HttpAsyncReporter.java:152)
My complete configuration:
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"tasks.max": 1,
"topics" : "test-transaction-events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": false,
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.nullable": true,
"include.schema.changes": false,
"config.action.reload": "restart",
"errors.retry.timeout": "0",
"errors.retry.delay.max.ms" : 60000,
"errors.tolerance": "none",
"errors.log.enable": false,
"errors.log.include.messages": false,
"errors.deadletterqueue.topic.name": "sink-dlt",
"errors.deadletterqueue.topic.replication.factor": 1,
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1,
"http.api.url": "http://host.docker.internal:3333/sink",
"request.method": "POST",
"behavior.on.null.values": "log",
"behavior.on.error": "log",
"report.errors.as" : "http_response",
"headers":"Content-Type:application/json|Accept:application/json",
"header.separator" : "|",
"http.connect.timeout.ms": "3000",
"http.request.timeout.ms" : "10000",
"reporter.result.topic.name" : "${connector}-success",
"reporter.result.topic.replication.factor": 1,
"reporter.result.topic.partitions": 1,
"reporter.error.topic.name": "${connector}-error",
"reporter.error.topic.replication.factor": 1,
"reporter.error.topic.partitions": 1,
"reporter.bootstrap.servers": "http://broker:29092",
"reporter.result.topic.key.format" : "json",
"reporter.result.topic.value.format": "json",
"reporter.error.topic.key.format": "json",
"reporter.error.topic.value.format": "json",
"auth.type": "NONE",
"max.retries": 5,
"retry.backoff.ms": 5000,
"retry.on.status.codes": "400-",
"request.body.format": "json",
"batch.max.size": 1,
"batch.json.as.array" : false
The actual error in JsonConverter occurs because a schema is defined (String) but the value is null.
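To illustrate what I think is happening, here is a minimal sketch of the null check as I understand it. This is a simplified stand-in written with plain Java only, not the actual Kafka Connect code: NullCheckSketch, FieldSchema and convert are hypothetical names, and the real converter throws DataException rather than IllegalStateException.

```java
// Simplified model of how a JSON converter might treat a null value.
// All names here are hypothetical; this is not the Kafka Connect API.
class NullCheckSketch {

    /** Minimal stand-in for a field schema: only optionality and a default. */
    static final class FieldSchema {
        final boolean optional;
        final String defaultValue; // null means "no default value"

        FieldSchema(boolean optional, String defaultValue) {
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    /** Returns JSON text for a string field, or throws if a required field is null. */
    static String convert(FieldSchema schema, String value) {
        if (value == null) {
            if (schema == null) return "null";                 // no schema: null passes through
            if (schema.defaultValue != null) return quote(schema.defaultValue);
            if (schema.optional) return "null";                // optional field: JSON null
            // Required field, no default, null value: the case from the stack trace.
            throw new IllegalStateException(
                "Conversion error: null value for field that is required and has no default value");
        }
        return quote(value);
    }

    /** Wraps a string in double quotes, escaping backslashes and quotes. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        FieldSchema required = new FieldSchema(false, null);
        FieldSchema optional = new FieldSchema(true, null);

        System.out.println(convert(required, "HTTP 500")); // a response exists, value is set
        System.out.println(convert(optional, null));       // null is tolerated when optional
        try {
            convert(required, null);                        // no response at all, value is null
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

If this model is right, the failure would depend only on the reporter's field being declared as required while no HTTP response is available to fill it, which matches the fact that the HTTP 500 case works.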
I wanted to test this with "reporter.error.topic.value.format": "string", which is mentioned in the documentation, but that value is not allowed. The error is: Invalid value string for configuration reporter.error.topic.value.format: Must be one of [json]
Any idea what might be wrong?
Thanks