Submit message when csv has been ingested

Customers send us csv files. We then send text or csv files to vendors. Vendors send responses back to us abs we then marry the data up. I want to ingest the csv into a db and when complete have a message that contains metadata about the file to a topic that consumers subscribe to. When I ingest the data from csv to db I will need to add the file name to the data so that this can be used by consumers. These consumers would lookup the data they need and drop the file on the vendor sftp site.

I know how to ingest the data into a db but how do I add the file name and possibly file date (last modified) to the data?

Then, is there some kind of hook whereby I can submit a message to another topic (I.e. customer x just sent for.csv) once the file has been ingested?

Hi @gvdonovan, just to be clear, are you talking about ingesting CSV files into Kafka, to then load to a database?

Assuming that you are then it will come down to the particular connector that you use for ingesting the CSV file. There are three to consider:

Looking at kafka-connect-file-pulse docs briefly I saw there is a Kafka topic with metadata that might be useful for your use case.

1 Like

Hi @rmoff and thank you for your response. We are in the evaluating Kafka for many of our processes.

Yes, I want to ingest a CSV from a customer into Kafka and then a db much like you do in hour video. Loading the data is just the first step in our process. Upon receiving customer data we then send a subset of this data in various forms to vendors who then send the data back - also in different forms.

Once we receive everything from our vendors we merge the data and then send back to our customer - and yes, in different forms based on what they’ve requested (i.e. JSON or CSV).

I’m currently investigating how to trigger consumer processes to send the data to the vendors. Using your solution, I may rename “processed” to something like “written to db” and then possibly have another connector which monitors this folder to grab the filename and last modified date and write to a topic that my “vendor” consumers suscribe to. Does this make sense?

Also, when ingesting the data into kafka and then the db - is there a way to add the name of the file and the last modified date so that this data is written to each row? The consumers would then pick up the message with the filename and date and query the db with this information to create the vendor outputs.

Thanks so much for your work and guidance. I just started with Kafka yesterday and after troubleshooting “not an absolute path” errors due to not having created the volume corresponding to the $PWD, I’m up and running.

I’m currently using the spooldir plugin - I will look into the second connector (file-pulse) and how I might be able to add the filename and the file’s last modified date to the payload for each row that will be written to the db. Surelly someone has done this before :slight_smile:

Following Streaming data into Kafka S01/E01 - Loading CSV file - DEV Community I was able to get the file metadata to the payload via the following:

curl \
-i -X PUT -H "Accept:application/json" \
-H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-ee/config \
-d '{
    "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms":"10000",
    "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern":".*\\.csv$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
    "offset.strategy":"name",
    "skip.headers": "1",
    "topic":"musics-filepulse-csv-ee",
    "internal.kafka.reporter.bootstrap.servers": "broker:29092",
    "internal.kafka.reporter.topic":"connect-file-pulse-status",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "tasks.max": 1,
    "filters":"ParseLine, AppendHeaderFileName",                
    "filters.ParseLine.extractColumnName": "headers",
    "filters.ParseLine.trimColumn": "true",
    "filters.ParseLine.separator": ";",
    "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
    "filters.AppendHeaderFileName.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
    "filters.AppendHeaderFileName.field": "$value.file_name",
    "filters.AppendHeaderFileName.value":"{{ $metadata.name }}"
}'

Hi @gvdonovan , glad you got it working. One thing to mention is that using CSV files for data exchange is often a suboptimal solution (although if you’re dealing with third parties, perhaps one over which you don’t have a choice). Given that, you’ll probably find fewer resources, and more friction, trying to do what you’re wanting to.

Indeed. I’d rather not use CSV but we’re dealing with healthcare providers and vendors and many of them have old proprietary formats that make csv’s look like the shiny new toy. Some send us JSON and for those cases we’ll just use the JSON class. Looking at potentially combining Airflow or Zeebe to compliment Kafka in our solution. Thanks!

1 Like