Problem with writing Kafka Connect `log4j` logs to a Kafka Topic

Hi people!

I need to write Kafka Connect log4j logs to a Kafka Topic, to be able to access them more easily.

There is this good blog post on log4j configuration for to write Kafka Connect logs to both stdout and a file: https://forum.confluent.io/t/kafka-connect-change-log-level-and-write-log-to-file/

Also, there is another log4j appender type, which is called KafkaAppender (Log4j – Log4j 2 Appenders).

But after I added its configuration to my connect-log4j.properties:

log4j.rootLogger=INFO, stdout, connectAppender, connectKafkaAppender
#log4j.rootLogger=INFO, stdout, connectAppender

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
#
#connect.log.pattern=[%d] %p %m (%c:%L)%n
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}

# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# Send the logs to a kafka topic
# there will be no key for the kafka records by default
log4j.appender.connectKafkaAppender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.connectKafkaAppender.brokerList=localhost:9092
log4j.appender.connectKafkaAppender.topic=_connect_log
log4j.appender.connectKafkaAppender.compressionType=none
log4j.appender.connectKafkaAppender.ignoreExceptions=true
log4j.appender.connectKafkaAppender.syncSend=false
log4j.appender.connectKafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectKafkaAppender.layout.ConversionPattern=${connect.log.pattern}

My Kafka Connect Worker won’t start anymore. The topic _connect_log exists in my Kafka Cluster, and it is empty.

The only Worker’s output, which I see in my console is as following:

[2021-05-04 10:25:00,641] INFO ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 127000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:0)

Usually, there is much more output on Worker’s startup. And if I remove the connectKafkaAppender related configuration from my connect-log4j.properties, the Worker starts normally and serves requests.

Is there anything I am missing or doing wrong?

May the 4th be with you! :dizzy:

Hi,

did you try with
log4j.rootLogger=INFO, stdout, connectAppender

edit:
sorry of course you’ve tried :wink: just recognized your last sentence.

Regards,
Michael

next try :wink:
did you try with settings provided here:

perhaps also the “underscore topic” leads to an error?

hi @mmuehlbeyer!

in fact, I already tried different topic names including connect.log and connectlog, and this didn’t change much for me.

Do I understand it right that the main difference in the config provided by you is that log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout is used there?

Or should I take the config 1:1 and give it a try to see if the worker starts with it?

hi @whatsupbros

ok I see.

yes you’re right, I would go for both 1:1 and only changing the log4j.appender.kafka_appender.layout

Okay, I tried it to change the appender layout, and now have this error after I start the Worker:

log4j:ERROR Could not instantiate class [io.confluent.common.logging.log4j.StructuredJsonLayout].                                                                                                               [15:47:59]
java.lang.ClassNotFoundException: io.confluent.common.logging.log4j.StructuredJsonLayout
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:340)
        at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
        at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
        at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
        at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:797)
        at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
        at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
        at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
        at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
        at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
        at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
        at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
        at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
        at org.apache.kafka.connect.cli.ConnectDistributed.<clinit>(ConnectDistributed.java:57)

And the worker output is the same - only the producer config is there.

Using the configuration 1:1 is unfortunately not an option for me, because it seems that this is the log4j config for ksqlDB and I simply don’t have some variables used in the file set in my environment.

However, I do not see anything in the configuration, which could fix my issue, since it is pretty much the same in the Kafka Connect part. Only log4j.appender.kafka_appender.SyncSend is set to true, but this is the default, which I also tried.

hmm

class is in classpath?

perhaps one thing worth to try
https://kafka-tutorials.confluent.io/handling-deserialization-errors/ksql.html

seems the configuration is similar as the one you’ve tried
perhaps you can get correct classpath and so on from the tutorial.

nevertheless I’ll check the settings later in my local lab setup

I am not a Java Developer, and to be honest, I cannot tell if the class is in classpath or not.

But what I can say is that in my case it is not ksqlDB, it is Kafka Connect Worker, which I am trying to start with such configuration that it writes logs of self not only to a file, but also a Kafka Topic.

This configuration, which I provided in the starter message, is actually result of my own research on log4j, because I never did it before.

So probably it can be done even easier that I am doing it there. The ultimate goal is to have the logs inside a topic.

me neither :wink:

understand, thought it might helpful

are you running vanilla Kafka or Confluent Kafka?
might the cause for the class not found

I see should be possible I guess but also need to test it by myself

I am on Confluent Platform Community in fact. But the class is still not found when I try using it.

But I don’t think that the reason is in the log message layout, to be honest. Because even when the class is not found, the behaviour of the worker is the same (it hangs with the only message with the provider properties in the output).

I guess there is something about the connection to the Kafka Brokers. But I cannot understand what exactly is wrong with it (and it is local installation, so there is no even network traversing and stuff).

hmm I see

I’ve played also around with my local setup but wasn’t able to get logs into a topic.

one workaround
not the best but maybe worth to think about:

why not ingesting the logfiles with a file source connector with kafka connect ;)?
I know if connect goes down the logs will not be loaded to kafka but perhaps it’s a proper workaround for your usecase

I also have this option in mind, but I’d rather use it as a last resort. Because you know, this is not a good idea to write a service log using functionality of the service itself, it looks fishy.

That is why I was trying to use built in log4j capabilities for that, which seems to be a better option.

But thank you anyway for trying to be helpful!

was able to get it up and running :slight_smile:
config below

important:
you need the following jars in your $CONFLUENT_INSTALL_DIR/share/java/confluent-common

  • common-logging-6.1.1.jar
  • confluent-log4j-extensions-6.1.1.jar

get it from:
https://mvnrepository.com/artifact/io.confluent/common-logging
https://mvnrepository.com/artifact/io.confluent/confluent-log4j-extensions

config starts here

log4j.rootLogger=DEBUG, stdout, connectAppender

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

og4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
log4j.appender.kafka_appender.BrokerList=localhost:9092
log4j.appender.kafka_appender.Topic=k-connect-log
log4j.logger.processing=INFO, kafka_appender


log4j.logger.org.apache.kafka.connect=INFO, kafka_appender
log4j.additivity.org.apache.kafka.connect=false

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR
1 Like

Wow, nice job! This sounds really useful.

Thx! Will be able to check it tomorrow, so will let you know as soon as I have any result here.

1 Like

Hi @mmuehlbeyer!

So, I finally had a chance to check your proposal, and with some help of it, I found out the actual reason of the issue.

First of all, about these libraries you are mentioning - for the reason unknown to me, I don’t have the mentioned libraries in my $CONFLUENT_BASE/share/java/confluent-common for my installation.

I also checked it for the most current version of Confluent Platform Community and Enterprise (because the current is 6.1.1 and I am now still on 6.1.0) - and there no such libraries there as well under confluent-common directory.

However, in both my installation and the most current one, I have these libraries in $CONFLUENT_BASE/share/java/ksqldb directory.

I don’t know if it was intended to be like this, but this is the reality we have here. I don’t know though how all these libraries are loaded from $CONFLUENT_BASE/share/java by the Confluent Platform processes, probably @rmoff could clarify on this a little more.

Nevertheless, I was quite sure that unless I use the io.confluent.common.logging.log4j.StructuredJsonLayout as the log message layout, I shouldn’t need these libraries, and that is why I looked for other differences in yours and mine configs.

So, I noticed, that you are using your kafka_appender only for particular loggers, and you didn’t add it as a root logger.

And when I adjusted my config the same way, so that only Kafka Connect logger logs are written to my Kafka Topic, it all started to work!

Everything else about the config stayed the same - the topic name starting with _ symbol and having dash - symbol in the name.

The bottomline is, that you probably cannot use KafkaLog4jAppender for the root logger.

My final and working connect-log4j.properties file looks like this now:

log4j.rootLogger=INFO, stdout, connectAppender

log4j.logger.org.apache.kafka.connect=INFO, connectKafkaAppender

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
#
#connect.log.pattern=[%d] %p %m (%c:%L)%n
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}

# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# Send the logs to a kafka topic
# there will be no key for the kafka records by default
log4j.appender.connectKafkaAppender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.connectKafkaAppender.brokerList=localhost:9092
log4j.appender.connectKafkaAppender.topic=_connect-log
log4j.appender.connectKafkaAppender.compressionType=none
log4j.appender.connectKafkaAppender.ignoreExceptions=true
log4j.appender.connectKafkaAppender.syncSend=false
log4j.appender.connectKafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectKafkaAppender.layout.ConversionPattern=${connect.log.pattern}
1 Like

Regarding usage of io.confluent.common.logging.log4j.StructuredJsonLayout for the message layout - I tried this too.

And you were 100% correct, that the two mentioned by you libraries are required for it:

  • common-logging-X.X.X.jar
  • confluent-log4j-extensions-X.X.X.jar

Where X.X.X is the version of your Confluent Platform installation. And yes, these libraries are missing byt default in $CONFLUENT_BASE/share/java/confluent-common directory, however they exist in $CONFLUENT_BASE/share/java/ksqldb.

I just copied them from there to confluent-common directory, restarted my Kafka Connect Workers, and they started to see the needed libraries, and they were loaded without issues.

So it all started to work, and as a result, the log messages were formatted like this in my topic:

$ ./bin/kafka-console-consumer \
> --bootstrap-server localhost:9092 \
> --topic _connect-log \
> --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --partition 0 --offset 250 --max-messages 5

{"level":"INFO","logger":"org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader","time":1620392108294,"message":Added alias 'TopicNameMatches' to plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches'}
{"level":"INFO","logger":"org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader","time":1620392108296,"message":Added alias 'BasicAuthSecurityRestExtension' to plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension'}
{"level":"INFO","logger":"org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader","time":1620392108296,"message":Added aliases 'AllConnectorClientConfigOverridePolicy' and 'All' to plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy'}
{"level":"INFO","logger":"org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader","time":1620392108298,"message":Added aliases 'NoneConnectorClientConfigOverridePolicy' and 'None' to plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy'}
{"level":"INFO","logger":"org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader","time":1620392108299,"message":Added aliases 'PrincipalConnectorClientConfigOverridePolicy' and 'Principal' to plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy'}

Processed a total of 5 messages

However, there is another problem with it - because the topic message values are sadly not valid JSONs. The reason for this, is that the log message value itself (message attribute of the resulting JSON) is not properly quoted, and any special characters within the message value are not escaped.

@rmoff could this be a bug with the log message layout formatter?

As the current workaround of the mentioned above JSON serialization problem with io.confluent.common.logging.log4j.StructuredJsonLayout, I used these layout properties instead:

log4j.appender.connectKafkaAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.connectKafkaAppender.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectKafkaAppender.layout.ConversionPattern={"level":"%p","timestamp":"%d{ISO8601}","logger":"%c","message":"%X{connector.context}%m"}

Such configuration gives us these log records in the _connect-log topic:

$ ./bin/kafka-console-consumer \
> --bootstrap-server localhost:9092 \
> --topic _connect-log \
> --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --partition 0 --offset 195 --max-messages 5

{"level":"INFO","timestamp":"2021-05-07 14:58:01,267","logger":"org.apache.kafka.connect.runtime.distributed.WorkerCoordinator","message":"[Worker clientId=connect-1, groupId=connect-cluster] Rebalance started"}
{"level":"INFO","timestamp":"2021-05-07 14:58:01,313","logger":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-5e12c053-635c-4489-93ef-f6689b91cefb', leaderUrl='http://127.0.1.1:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0"}
{"level":"INFO","timestamp":"2021-05-07 14:58:01,314","logger":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1"}
{"level":"INFO","timestamp":"2021-05-07 14:58:01,315","logger":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks"}
{"level":"INFO","timestamp":"2021-05-07 14:58:01,368","logger":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Session key updated"}

Processed a total of 5 messages

This is also not ideal, because the log message is not encoded. But unfortunately I didn’t find a way to do this easily with log4j version 1, which seems to be under the hood of Kafka logging capabilities currently.

In log4j2 though, it seems to be possible to use such a pattern, which should do the job:

log4j.appender.connectKafkaAppender.layout.ConversionPattern={"level":"%level","timestamp":"%date{ISO8601_OFFSET_DATE_TIME_HHCMM}","logger":"%logger","message":"%encode{%X{connector.context}%message}{JSON}"}

I am not sure about this, but it looks like log4j is being replaced with log4j2 in this KIP and the related Jira.

Does anybody know for which version it is planned?

Okay, I found an alternative way to write JSON formatted log messages with log4j version 1, but these additional libraries must be put to $CONFLUENT_BASE/share/java/confluent-common directory for it to work:

Then, the layout configuration of the appender should look as following:

log4j.appender.connectKafkaAppender.layout=net.logstash.log4j.JSONEventLayoutV1

As a result, the log messages are formatted like this:

$ ./bin/kafka-console-consumer \
> --bootstrap-server localhost:9092 \
> --topic _connect-log \
> --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
> --partition 0 --offset 195 --max-messages 5

{"@timestamp":"2021-05-11T09:27:50.404Z","source_host":"DWPNB7S3T273","file":"WorkerCoordinator.java","method":"onJoinPrepare","level":"INFO","line_number":"225","thread_name":"DistributedHerder-connect-1-1","@version":1,"logger_name":"org.apache.kafka.connect.runtime.distributed.WorkerCoordinator","message":"[Worker clientId=connect-1, groupId=connect-cluster] Rebalance started","class":"org.apache.kafka.connect.runtime.distributed.WorkerCoordinator","mdc":{}}

{"@timestamp":"2021-05-11T09:27:50.450Z","source_host":"DWPNB7S3T273","file":"DistributedHerder.java","method":"onAssigned","level":"INFO","line_number":"1689","thread_name":"DistributedHerder-connect-1-1","@version":1,"logger_name":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-dc8be253-e282-4c95-838c-fecfe893556e', leaderUrl='http:\/\/127.0.1.1:8083\/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0","class":"org.apache.kafka.connect.runtime.distributed.DistributedHerder$RebalanceListener","mdc":{}}

{"@timestamp":"2021-05-11T09:27:50.452Z","source_host":"DWPNB7S3T273","file":"DistributedHerder.java","method":"startWork","level":"INFO","line_number":"1216","thread_name":"DistributedHerder-connect-1-1","@version":1,"logger_name":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1","class":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","mdc":{}}

{"@timestamp":"2021-05-11T09:27:50.453Z","source_host":"DWPNB7S3T273","file":"DistributedHerder.java","method":"startWork","level":"INFO","line_number":"1244","thread_name":"DistributedHerder-connect-1-1","@version":1,"logger_name":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks","class":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","mdc":{}}

{"@timestamp":"2021-05-11T09:27:50.517Z","source_host":"DWPNB7S3T273","file":"DistributedHerder.java","method":"onSessionKeyUpdate","level":"INFO","line_number":"1578","thread_name":"KafkaBasedLog Work Thread - connect-configs","@version":1,"logger_name":"org.apache.kafka.connect.runtime.distributed.DistributedHerder","message":"[Worker clientId=connect-1, groupId=connect-cluster] Session key updated","class":"org.apache.kafka.connect.runtime.distributed.DistributedHerder$ConfigUpdateListener","mdc":{}}

Processed a total of 5 messages

This option generates proper JSONs, which should be possible to parse without issues.

Just FYI @mmuehlbeyer! And thank you again for your assistance!

Nevertheless, hopefully, we’ll have log4j2 integrated with Kafka soon, which will give us much more opportunities of configuring the loggers. I.e. JsonTemplateLayout and other fancy things, which would be possible to configure even to have proper Kafka Connect records with schema within the messages.

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.