HTTP sink error reporter fails with NPE when endpoint is not available

Hi there,
I have an HTTP sink connector with the error reporter set up.
When the target server (http.api.url) responds with an HTTP error, e.g. 500, everything works as expected: the error reporter writes the error in the configured format to my reporter.error.topic.name.

But when the HTTP sink connector cannot connect to http.api.url at all (for example, because the server is not running), the error reporter fails on JSON conversion:

Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
  at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:569)
  at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:550)
  at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:304)
  at io.confluent.connect.formatter.json.JsonFormatter.formatValue(JsonFormatter.java:45)
  at io.confluent.connect.reporter.Reporter.reportMessage(Reporter.java:457)
  at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:297)
  at io.confluent.connect.reporter.Reporter.reportError(Reporter.java:263)
  at io.confluent.connect.http.writer.HttpAsyncReporter.reportErrantRecord(HttpAsyncReporter.java:152)

My complete configuration:

  "connector.class": "io.confluent.connect.http.HttpSinkConnector",
  "tasks.max": 1,
  "topics" : "test-transaction-events",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schemas.enable": false,
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.nullable": true,
  "include.schema.changes": false,
  "config.action.reload": "restart",
  "errors.retry.timeout": "0",
  "errors.retry.delay.max.ms" : 60000,
  "errors.tolerance": "none",
  "errors.log.enable": false,
  "errors.log.include.messages": false,
  "errors.deadletterqueue.topic.name": "sink-dlt",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "confluent.topic.bootstrap.servers": "broker:29092",
  "confluent.topic.replication.factor": 1,
  "http.api.url": "http://host.docker.internal:3333/sink",
  "request.method": "POST",
  "behavior.on.null.values": "log",
  "behavior.on.error": "log",
  "report.errors.as" : "http_response",
  "headers":"Content-Type:application/json|Accept:application/json",
  "header.separator" : "|",
  "http.connect.timeout.ms": "3000",
  "http.request.timeout.ms" : "10000",
  "reporter.result.topic.name" : "${connector}-success",
  "reporter.result.topic.replication.factor": 1,
  "reporter.result.topic.partitions": 1,
  "reporter.error.topic.name": "${connector}-error",
  "reporter.error.topic.replication.factor": 1,
  "reporter.error.topic.partitions": 1,
  "reporter.bootstrap.servers": "http://broker:29092",
  "reporter.result.topic.key.format" : "json",
  "reporter.result.topic.value.format": "json",
  "reporter.error.topic.key.format": "json",
  "reporter.error.topic.value.format": "json",
  "auth.type": "NONE",
  "max.retries": 5,
  "retry.backoff.ms": 5000,
  "retry.on.status.codes": "400-",
  "request.body.format": "json",
  "batch.max.size": 1,
  "batch.json.as.array" : false

The actual error in JsonConverter occurs because the schema is defined (String) and required, but the value is null.
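To illustrate what I think is happening, here is a minimal sketch of the null handling. It is a simplified model, not the Kafka Connect API: the class RequiredFieldSketch, the FieldSchema type, and the field name "error" are all hypothetical, and I am assuming the converter checks for a default value first, then for an optional schema, and fails otherwise.

```java
// Simplified, hypothetical model of a Connect-style field schema.
// This is NOT the real Kafka Connect API; it only mimics the null check.
public class RequiredFieldSketch {

    /** A field is either optional or required, and may carry a default value. */
    static final class FieldSchema {
        final String name;
        final boolean optional;
        final String defaultValue;

        FieldSchema(String name, boolean optional, String defaultValue) {
            this.name = name;
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    /** Assumed order of checks for a null value: default, then optional, else fail. */
    static String convert(FieldSchema schema, String value) {
        if (value == null) {
            if (schema.defaultValue != null) {
                return quote(schema.defaultValue);
            }
            if (schema.optional) {
                return "null";
            }
            throw new IllegalStateException(
                "Conversion error: null value for field that is required and has no default value");
        }
        return quote(value);
    }

    /** Naive JSON string quoting; no escaping, which is enough for this sketch. */
    static String quote(String s) {
        return "\"" + s + "\"";
    }

    public static void main(String[] args) {
        // "error" is a made-up field name; I do not know which reporter field is actually null.
        FieldSchema required = new FieldSchema("error", false, null);
        FieldSchema optional = new FieldSchema("error", true, null);

        System.out.println(convert(required, "HTTP 500")); // server answered, value present
        System.out.println(convert(optional, null));       // an optional field tolerates null
        try {
            convert(required, null);                       // no response at all, value is null
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

If this model is right, the reporter would only get past the conversion if the affected field were declared optional or had a default, which is not something I can set from the connector configuration.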

I wanted to test this with "reporter.error.topic.value.format": "string", which is mentioned in the documentation, but that value is not allowed. The error is: Invalid value string for configuration reporter.error.topic.value.format: Must be one of [json]

Any idea what might be wrong?
Thanks
