Hello team,
I’m in the middle of a POC to use Kafka in a client environment and am having some trouble with the ActiveMQ source connector. I installed it locally with the confluent-hub command (Windows 10 + Ubuntu WSL2).
I create the connector with the following curl command:
curl -s -X POST -H "Content-Type: application/json" --data '{
"name": "activemqConnector",
"config": {
"connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
"kafka.topic":"poc-ee-kafka",
"activemq.url":"failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1",
"activemq.username":"",
"activemq.password":"",
"jms.destination.name":"AC.DEV.ADH.REQ",
"jms.message.selector":"ssq_messagetype = ''forIg''",
"jms.destination.type":"queue",
"topic.creation.enable":"true",
"topic.creation.default.replication.factor":"1",
"topic.creation.default.partitions":"1",
"topic.creation.replication.factor":"1",
"confluent.topic.replication.factor": "1",
"topic.creation.partitions":"1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}' http://localhost:8083/connectors
I’m sure the trouble comes from the jms.message.selector property: if I remove it, all messages arrive in the designated topic.
Output of the confluent local services connect log command:
[2021-07-30 15:49:34,617] INFO ActiveMQSourceConnectorConfig values:
activemq.password = [hidden]
activemq.url = failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1
activemq.username =
batch.size = 1024
character.encoding = UTF-8
confluent.license =
confluent.topic = _confluent-command
confluent.topic.bootstrap.servers = [localhost:9092]
confluent.topic.replication.factor = 1
jms.destination.name = AC.DEV.ADH.REQ
jms.destination.type = queue
jms.message.selector = ssq_messagetype = forIg
jms.subscription.durable = false
jms.subscription.name =
kafka.topic = poc-ee-kafka
max.pending.messages = 4096
max.poll.duration = 60000
max.retry.time = 3600000
use.permissive.schema = false
(io.confluent.connect.activemq.ActiveMQSourceConnectorConfig:372)
[2021-07-30 15:49:34,620] INFO Creating ActiveMQConnectionFactory with broker URL 'failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1' (io.confluent.connect.activemq.ActiveMQSourceTask:30)
[2021-07-30 15:49:34,638] INFO WorkerSourceTask{id=activemqConnector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:226)
[2021-07-30 15:49:34,809] INFO activemqConnector-0 Setting JMS client ID to 'activemqConnector-0204af860' (io.confluent.connect.jms.core.source.JmsClientHelper:138)
[2021-07-30 15:49:34,861] INFO Successfully connected to tcp://server1:61616 (org.apache.activemq.transport.failover.FailoverTransport:1054)
[2021-07-30 15:49:34,947] INFO activemqConnector-0 Starting connection (io.confluent.connect.jms.core.source.JmsClientHelper:98)
[2021-07-30 15:49:34,961] INFO Created session in 13 ms (io.confluent.connect.jms.core.source.JmsClientHelper:103)
[2021-07-30 15:49:34,997] INFO Created ActiveMQMessageConsumer { value=ID:PSFB1038-35173-1627674574705-1:1:1:1, started=true } consumer. (io.confluent.connect.jms.core.source.JmsClientHelper:106)
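One thing that stands out in the log above: jms.message.selector is printed as ssq_messagetype = forIg, without the quotes around forIg that the curl payload has. Below is a minimal sketch of how a POSIX shell treats '' inside a single-quoted string, which I assume (not confirmed) is what happens to the --data payload. The variable names are only for illustration.

```shell
# Inside a single-quoted string, '' closes the string and immediately
# reopens it, so no quote character survives in the result.
collapsed='ssq_messagetype = ''forIg'''
printf '%s\n' "$collapsed"

# '\'' closes the string, adds one escaped literal quote, and reopens it,
# so the single quotes around forIg are kept.
quoted='ssq_messagetype = '\''forIg'\'''
printf '%s\n' "$quoted"
```

If that is the cause, the selector reaches the connector with forIg as an identifier rather than a string literal (JMS selectors write string literals in single quotes), so it would not match the ssq_messagetype property value.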
Messages look like this:
ActiveMQTextMessage {commandId = 71, responseRequired = false, messageId = ID:PSFB1038-59511-1627654146508-3:1:1:3:1, originalDestination = null, originalTransactionId = null, producerId = ID:PSFB1038-59511-1627654146508-3:1:1:3, destination = queue://AC.DEV.ADH.REQ, transactionId = TX:ID:PSFB1038-59511-1627654146508-3:1:11, expiration = 0, timestamp = 1627674642542, arrival = 0, brokerInTime = 1627674642606, brokerOutTime = 1627674644747, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@63c6c5e5, marshalledProperties = org.apache.activemq.util.ByteSequence@6edb2c7b, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {ssq_messagetype=forIg, ssq_correlationid=testestIgal}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {
"noEmployee": "12334556"
}}
Any idea what could be going wrong, so that the messages are not picked up?