S3 Sink Connector: use file hash as object name

Hi,

I am trying to use the S3 Sink Connector to upload files read by the FilePulse connector. I want to use the file's SHA-256 hash as the file name in S3. The format I want, where key is the hash, is:

//${key:0:2}/${key:2:4}/${key}.bin
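For reference, here is a minimal Java sketch of that layout, assuming the ${key:0:2} and ${key:2:4} pieces are meant as shell-style (offset:length) substrings of the hex digest; the class and method names are just illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashKey {
    // Hex-encode a digest (kept Java 11 compatible; HexFormat needs 17+).
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    static String sha256Hex(byte[] data) throws NoSuchAlgorithmException {
        return toHex(MessageDigest.getInstance("SHA-256").digest(data));
    }

    // Build the object key: first 2 hex chars, then the 4 chars starting
    // at offset 2 (reading ${key:2:4} as offset:length), then the full
    // digest with a .bin extension.
    static String objectKey(String hexDigest) {
        return hexDigest.substring(0, 2) + "/"
                + hexDigest.substring(2, 6) + "/"
                + hexDigest + ".bin";
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String key = sha256Hex("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(objectKey(key));
        // 2c/f24d/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824.bin
    }
}
```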

From what I read in the documentation, I cannot use the FieldPartitioner to put the key as the filename in S3, so I would have to create my own class implementing a custom partitioner. Is that doable?

Really appreciate any suggestions.

James

Can you describe your record flow (FilePulse config and overall aim) a bit more? Is your aim that one source file gets ingested as one Kafka record, which sinks to one S3 object? Or does one source file map to multiple Kafka records (using a 1-to-many filter like CSVFilter), and you want to map the records originating in a particular file back as one S3 object? And then you want the hash of the original file name used to name the S3 object?

Either way, you’ll find that you would need to modify the S3 sink connector itself. The pluggable partitioner isn’t sufficient for what you want to do. The <topic>+<kafkaPartition>+<startOffset>.<format> that ends the name of the S3 object can’t be overridden. Relevant object naming source code is here.
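To make that concrete, here's a rough Java illustration of the shape of the committed object name. The 10-digit zero padding matches the connector's default filename.offset.zero.pad.width, but treat the exact format string here as an approximation, not the connector's actual code:

```java
public class DefaultS3Naming {
    // Approximates the committed S3 object name: the pluggable
    // partitioner only chooses the directory prefix; the
    // <topic>+<kafkaPartition>+<startOffset>.<format> suffix is fixed
    // by the connector and can't be overridden via configuration.
    static String objectName(String prefix, String topic,
                             int kafkaPartition, long startOffset,
                             String extension) {
        return String.format("%s/%s+%d+%010d%s",
                prefix, topic, kafkaPartition, startOffset, extension);
    }

    public static void main(String[] args) {
        System.out.println(objectName(
                "topics/my-topic/partition=0", "my-topic", 0, 42, ".bin"));
        // topics/my-topic/partition=0/my-topic+0+0000000042.bin
    }
}
```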

If the record flow is one file to one record to one S3 object, then you might get away with configuring flush.size to one and modifying the connector to get the naming you want. I think you’d be able to maintain these delivery semantics with such a change, though don’t quote me on that – it needs some testing and code analysis.
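For that route, the sink side of the setup might look like the following config sketch. The bucket, topic, and connector names are placeholders, and I'm assuming raw bytes via ByteArrayFormat; the keys themselves are standard S3 sink options:

```json
{
  "name": "s3-sink-one-record-per-object",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "file-ingest",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1"
  }
}
```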

If you want to map multiple Kafka records back to the same unique S3 object name, then your connector changes would be (likely much) more involved, since none of these rotation strategies will work for you. You'd have to implement your own rotation, and doing that properly feels tricky, to say the least.

Thanks for your response. We need one file to one record to one S3 object. If I need to modify the connector, could you give me some guidance? Otherwise I'll have to write my own Kafka connector to upload to S3.

For the connector modification route, I'd recommend a hacky proof of concept as a first step, with the goals being (1) learning how to develop and test the connector, (2) learning enough of the codebase to make the change you're after, and (3) proving that the object naming area of the code that I pointed to will do the trick. I'm not a maintainer of this connector; I've just poked around the codebase to see how this might be done. I may be missing something, so it's better to quickly hack and verify the approach rather than chase something that's not viable.

Some recommendations for the PoC route:

  • The repo is here
  • Use Java 11. You’ll need Maven if you don’t already have it
  • Test against the v12.0.1 tag rather than trying to go against master where you’d need to worry about SNAPSHOT dependencies
  • First goal should be to get mvn package -DskipITs -Daws.region=us-east-1 to succeed. It'll generate the connector zip that you can test
  • If that works, dig into the codebase around the object naming area here; that's where you'll want to change the naming. Perhaps as a first step, make a trivial naming change that isn't exactly what you want
  • Build and smoke test that trivial change, e.g. with a Docker setup, so that you can iterate. I bet unit tests will fail with that change, so just skip them. Don't worry about understanding and modifying tests to get a fully working build; the -DskipITs and -DskipTests Maven arguments will probably do the trick.
  • If the trivial change works as expected, write the actual object name change that you’re after

If the PoC above works well and you have a good handle on how to do this and want to proceed with a proper modification, then I'd recommend contributing to the repo so that you get maintainer collaboration. In the long run you'd also get connector upgrades "for free". There's a contribution guide here. Because a proper contribution would need some thinking about configuration etc., I'd recommend opening a GitHub issue where you can propose the associated config changes and get feedback before writing the config code.