Robin, thank you.
I tried that using this config:
name=Havas-Occupancy-Sftp
kafka.topic=Havas-Occupancy3
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
cleanup.policy=MOVE
behavior.on.error=LOG
input.path=/Occupancy/TestData
error.path=/Occupancy/TestData/Error
finished.path=/Occupancy/TestData/Finished
input.file.pattern=.*\\.csv
sftp.username=<Username>
sftp.password=<Password>
sftp.host=<IP>
sftp.port=<Port>
csv.first.row.as.header=true
schema.generation.enabled=false
key.schema={\"name\" :\"OccupancyNameSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, \"fieldSchemas\" : {\"EventTime\" : {\"type\" : \"STRING\", \"isOptional\" : true}, \"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
value.schema={\"name\" : \"OccupancyValueSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, \"fieldSchemas\" : {\"EventTime\" : {\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"type\":\"INT64\",\"version\":1,\"isOptional\": true}, \"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
parser.timestamp.date.formats=[dd/MM/yyyy' 'HH:mm]
transforms=eventTimeValue
transforms.eventTimeValue.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.eventTimeValue.target.type=Timestamp
transforms.eventTimeValue.format=dd/MM/yyyy' 'HH:mm
transforms.eventTimeValue.field=EventTime
The same schemas, formatted for readability:
key.schema=
{
\"name\" :\"OccupancyNameSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true,
\"fieldSchemas\" :
{
\"EventTime\" : {\"type\" : \"STRING\", \"isOptional\" : true},
\"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true},
\"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true},
\"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}
}
}
value.schema=
{
\"name\" : \"OccupancyValueSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true,
\"fieldSchemas\" :
{
\"EventTime\" : {\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"type\":\"INT64\",\"version\":1,\"isOptional\": true},
\"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true},
\"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true},
\"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}
}
}
The response is now:
{
"schema":
{
"type":"struct","fields":
[
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"EventTime"},
{"type":"string","optional":true,"field":"SourceEntityDescription"},
{"type":"string","optional":true,"field":"EventType"},
{"type":"string","optional":true,"field":"TokenNumber"}
],
"optional":true,"name":"HavasOccupancyValueSchema2"
},
"payload":
{
"EventTime":null,
"SourceEntityDescription":"Bla Bla Bla",
"EventType":"Valid Access",
"TokenNumber":"1234567890"
}
}
That is progress: the field is now defined as a Timestamp. However, EventTime comes through as null in the payload, so the transformation is not doing what I expect.
I checked the pattern against the SimpleDateFormat documentation, and I believe it is correct.
I also tested the date-time format on this website: http://www.sdfonlinetester.info and it looks correct.
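For anyone who wants to repeat the pattern check locally rather than on the website, here is a minimal sketch of it in plain Java. It only exercises SimpleDateFormat with the pattern from the config; it does not reproduce what the connector or the TimestampConverter transform does internally. The class name PatternCheck, the helper parseUtcMillis, the UTC time zone, and the sample value "25/12/2019 14:30" are all assumptions made for illustration, since the actual CSV contents are not shown here.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper class, not part of the connector or of Kafka Connect.
class PatternCheck {
    // Pattern copied from transforms.eventTimeValue.format in the config above.
    static final String PATTERN = "dd/MM/yyyy' 'HH:mm";

    // Parses the text strictly and returns epoch milliseconds.
    // UTC is an assumption; the source data may use another time zone.
    static long parseUtcMillis(String text) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false); // reject out-of-range values such as month 13
        return format.parse(text).getTime();
    }

    public static void main(String[] args) throws ParseException {
        // Hypothetical sample value shaped like the pattern.
        System.out.println(parseUtcMillis("25/12/2019 14:30"));
    }
}
```

If a value copied from the real CSV file throws a ParseException here, the pattern and the data do not match; if it parses, the pattern itself is probably not the cause of the null.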