FilePulse: avoiding duplicate values on the output topic

Hi,

I have been searching for this issue on the internet, but I cannot find an answer. I am using the FilePulse 2.10 connector on Kafka 3.2 to load data from a file into an output topic. This is my connector config:

{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "topic": "connect-test",
    "tasks.max": 5,

    "filters": "Drop",
    "filters.Drop.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
    "filters.Drop.if": "{{ matches($value.message, '^.*START_VNC.*$') }}", 
    "filters.Drop.invert": "true",

    "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
    "fs.listing.directory.path": "/opt/kafka/file-pulse-logs",
    "fs.listing.recursive.enabled": "false",
    "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileListFilter,io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter", 
    "fs.listing.task.delegation.enabled": "false",
    "file.filter.regex.pattern": ".*\\.log$",
    "fs.listing.interval.ms": 10000,
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",

    "fs.recursive.scan.enable": "false",
    "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",

    "tasks.file.status.storage.bootstrap.servers":"ptld8waardc01:9092",
    "tasks.file.status.storage.topic": "connect-file-pulse-regexp-status",
    "tasks.file.status.storage.topic.partitions": 1,
    "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
    "read.max.wait.ms": 5000,
    "allow.tasks.reconfiguration.after.timeout.ms": 5000,

    "offset.attributes.string": "name+lastModified",
    "offset.storage.partitions": 1,
    "offsets.topic.replication.factor": 1
  },
  "name": "file-pulse-logs-regexp"
}
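
For context, with LocalRowFileInputReader each record carries the raw line in a message field, which is what the Drop filter above tests via $value.message. A record that passes the filter therefore has a value roughly like this (the log line itself is invented for illustration):

{
  "message": "2022-06-14 09:15:02 INFO START_VNC session opened"
}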

As you can see, I filter the rows of the log files for a specific string (the Drop filter with invert set to true keeps only the lines matching the pattern). The connector works with this configuration, but every time a .log file is modified, its whole content is written to the "connect-test" topic again. With the FileStreamSource connector I did not have this issue. Is there any configuration I can add to the connector or to Kafka to prevent this duplication?
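
One thing I noticed while re-reading the config: the offset key is built from name+lastModified, so I suspect that every modification gives the file a new identity and FilePulse reads it from the beginning again. If that is right, keying the offset on the name alone should let the connector resume from the stored offset instead of re-reading the whole file. This is just a sketch I have not verified:

{
  "offset.attributes.string": "name"
}

(From what I can tell from the docs, offset.attributes.string also accepts path, inode, and hash in addition to name and lastModified.)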
