Dear all,
I have just started using the Confluent Platform (community version). I want to use the JDBC source connector to synchronize a MySQL table. After the connector created the topic, I inspected it from ksql: the output below appears, and the table data shows up as garbled characters.
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
console-consumer-51507 | 1 | 1
default_ksql_processing_log | 1 | 1
mysql-01-t1 | 1 | 1
mysql-0t1 | 1 | 1
yyt_test | 1 | 1
---------------------------------------------------------------
ksql> print 'mysql-0t1' from beginning;
*Key format: ¯\_(ツ)_/¯ - no data processed*
*Value format: does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.*
rowtime: 2021/09/10 04:06:20.750 Z, key: <null>, value: \x00\x00\x00\x00\x02\x02\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x04\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x06\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x08\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x0A\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x0C\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x0E\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x10\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x12\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x14\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x16\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x18\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x1A\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x1C\x02\x14, partition: 0
rowtime: 2021/09/10 04:06:20.752 Z, key: <null>, value: \x00\x00\x00\x00\x02\x1E\x02\x14, partition: 0
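The bytes in the value column do not look random. They appear consistent with the Confluent Schema Registry wire format (a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary body), which is what the AvroConverter in the worker configuration further down would produce. Here is a minimal Python sketch of that reading. It assumes the registered schema is a record with c1 as an int and c2 as a ["null", "int"] union, which has not been verified against the registry. The helper names read_zigzag_int and decode_row are made up for illustration.

```python
import struct


def read_zigzag_int(buf, pos):
    """Decode one Avro int (zigzag varint) at pos; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear means this was the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def decode_row(message):
    """Decode one message value, ASSUMING the schema (c1 int, c2 [null, int])."""
    # 1 magic byte + 4-byte big-endian schema ID (Confluent wire format)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("value does not start with the expected magic byte")
    pos = 5
    c1, pos = read_zigzag_int(message, pos)
    branch, pos = read_zigzag_int(message, pos)  # union branch index
    c2 = None
    if branch == 1:  # assumed: branch 0 is null, branch 1 is int
        c2, pos = read_zigzag_int(message, pos)
    return {"schema_id": schema_id, "c1": c1, "c2": c2}


# First value printed above: \x00\x00\x00\x00\x02\x02\x02\x14
first = bytes.fromhex("0000000002020214")
print(decode_row(first))  # {'schema_id': 2, 'c1': 1, 'c2': 10}
```

If that assumption holds, each row is a valid Avro record (c1 counting up from 1, c2 always 10), and the output only looks garbled because it is printed as raw bytes instead of being deserialized with the schema.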
JDBC source connector:
curl -X POST http://10.192.11.71:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_source_yyt_02",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://10.192.123.169:3306/test",
    "connection.user": "root",
    "connection.password": "Mysql@123$",
    "topic.prefix": "mysql-0",
    "mode": "bulk",
    "poll.interval.ms": 100000,
    "catalog.pattern": "test",
    "table.whitelist": "t1"
  }
}'
MySQL table definition:
mysql> show create table t1;
+-------+---------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+---------------------------------------------------------------------------------------------------------------------------------------+
| t1 | CREATE TABLE `t1` (
`c1` int(11) NOT NULL,
`c2` int(11) DEFAULT NULL,
PRIMARY KEY (`c1`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
+-------+---------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
mysql>
Connect worker configuration file:
# pwd
/home/Confluent/confluent-6.2.0/etc/schema-registry
# cat connect-avro-distributed.properties
bootstrap.servers=10.192.11.71:9092,10.192.11.72:9092,10.192.11.74:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.192.11.72:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.192.11.72:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/share/java,/home/Confluent/confluent-6.2.0/share/java/kafka,
How can I solve this problem? Thank you very much!