Trouble using ActiveMQ source connector

Hello team,

I’m in the middle of POC to use Kafka in client environment, now having a bit of trouble using ActiveMQ source connector. Installed it with confluent-hub command locally (Windows 10 + Ubuntu WSL2).
Creation of connector with next curl command :

curl -s -X POST -H "Content-Type: application/json" --data '{
  "name": "activemqConnector",
  "config": {
    "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
    "kafka.topic":"poc-ee-kafka",
    "activemq.url":"failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1",
	"activemq.username":"",
	"activemq.password":"",
    "jms.destination.name":"AC.DEV.ADH.REQ",
	"jms.message.selector":"ssq_messagetype = ''forIg''",
    "jms.destination.type":"queue",
	"topic.creation.enable":"true",
	"topic.creation.default.replication.factor":"1",
    "topic.creation.default.partitions":"1",
	"topic.creation.replication.factor":"1",
	"confluent.topic.replication.factor": "1",
    "topic.creation.partitions":"1",
	"key.converter": "org.apache.kafka.connect.storage.StringConverter",
	"key.converter.schemas.enable":"false",
	"value.converter":"org.apache.kafka.connect.json.JsonConverter",
	"value.converter.schemas.enable":"false",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"localhost:9092"
  }
}' http://localhost:8083/connectors

I’m sure that troubles are coming from jms.message.selector property, since if removed all messages are arriving to designated topic.

Output produced by confluent local services connect log command :

[2021-07-30 15:49:34,617] INFO ActiveMQSourceConnectorConfig values:
        activemq.password = [hidden]
        activemq.url = failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1
        activemq.username = 
        batch.size = 1024
        character.encoding = UTF-8
        confluent.license =
        confluent.topic = _confluent-command
        confluent.topic.bootstrap.servers = [localhost:9092]
        confluent.topic.replication.factor = 1
        jms.destination.name = AC.DEV.ADH.REQ
        jms.destination.type = queue
        jms.message.selector = ssq_messagetype = forIg
        jms.subscription.durable = false
        jms.subscription.name =
        kafka.topic = poc-ee-kafka
        max.pending.messages = 4096
        max.poll.duration = 60000
        max.retry.time = 3600000
        use.permissive.schema = false
 (io.confluent.connect.activemq.ActiveMQSourceConnectorConfig:372)
[2021-07-30 15:49:34,620] INFO Creating ActiveMQConnectionFactory with broker URL 'failover://(tcp://server1:61616,tcp://server2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=1' (io.confluent.connect.activemq.ActiveMQSourceTask:30)
[2021-07-30 15:49:34,638] INFO WorkerSourceTask{id=activemqConnector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:226)
[2021-07-30 15:49:34,809] INFO activemqConnector-0 Setting JMS client ID to 'activemqConnector-0204af860' (io.confluent.connect.jms.core.source.JmsClientHelper:138)
[2021-07-30 15:49:34,861] INFO Successfully connected to tcp://server1:61616 (org.apache.activemq.transport.failover.FailoverTransport:1054)
[2021-07-30 15:49:34,947] INFO activemqConnector-0 Starting connection (io.confluent.connect.jms.core.source.JmsClientHelper:98)
[2021-07-30 15:49:34,961] INFO Created session in 13 ms (io.confluent.connect.jms.core.source.JmsClientHelper:103)
[2021-07-30 15:49:34,997] INFO Created ActiveMQMessageConsumer { value=ID:PSFB1038-35173-1627674574705-1:1:1:1, started=true } consumer. (io.confluent.connect.jms.core.source.JmsClientHelper:106)

Message are likes this :

ActiveMQTextMessage {commandId = 71, responseRequired = false, messageId = ID:PSFB1038-59511-1627654146508-3:1:1:3:1, originalDestination = null, originalTransactionId = null, producerId = ID:PSFB1038-59511-1627654146508-3:1:1:3, destination = queue://AC.DEV.ADH.REQ, transactionId = TX:ID:PSFB1038-59511-1627654146508-3:1:11, expiration = 0, timestamp = 1627674642542, arrival = 0, brokerInTime = 1627674642606, brokerOutTime = 1627674644747, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@63c6c5e5, marshalledProperties = org.apache.activemq.util.ByteSequence@6edb2c7b, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {ssq_messagetype=forIg, ssq_correlationid=testestIgal}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {
  noEmployee": "12334556"
}}

Any idea what could get wrong, so messages are not picked up ?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.