CSV field to Timestamp

I am trying to load a CSV datetime field into Kafka as a timestamp using a transform (org.apache.kafka.connect.transforms.TimestampConverter$Value), but with no luck.

The CSV field looks like this:

Fieldname: EventTime
Value: 08/02/2021 10:00

I tried this, but without success:

transforms = eventTimeValue
transforms.eventTimeValue.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.eventTimeValue.target.type=Timestamp
transforms.eventTimeValue.format="dd/MM/yyyy' 'HH:mm"
transforms.eventTimeValue.field=EventTime

Any ideas on why this is not working?

What’s your full connector config? When you say “no luck” do you mean you get an error?

Robin, thank you for the quick response.

The full config is this.

name=Occupancy-Sftp
kafka.topic=Occupancy3
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
cleanup.policy=MOVE
behavior.on.error=LOG
input.path=/Occupancy
error.path=/Occupancy/Error
finished.path=/Occupancy/Finished
input.file.pattern=.*\\.csv
sftp.username=<Username>
sftp.password=<Password>
sftp.host=<IP>
sftp.port=<Port>
csv.first.row.as.header=true
schema.generation.enabled=true
schema.generation.key.fields=EventTime
parser.timestamp.date.formats=dd/MM/yyyy' 'HH:mm 

transforms = eventTimeValue
transforms.eventTimeValue.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.eventTimeValue.target.type=Timestamp
transforms.eventTimeValue.format=dd/MM/yyyy' 'HH:mm
transforms.eventTimeValue.field=EventTime

This is the data as written to the topic:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "EventTime"
      },
      {
        "type": "string",
        "optional": true,
        "field": "SourceEntityDescription"
      },
      {
        "type": "string",
        "optional": true,
        "field": "EventType"
      },
      {
        "type": "string",
        "optional": true,
        "field": "TokenNumber"
      }
    ],
    "optional": false,
    "name": "defaultvalueschemaname"
  },
  "payload": {
    "EventTime": "08/02/2021 11:57",
    "SourceEntityDescription": "Bla Bla Bla",
    "EventType": "Valid Access",
    "TokenNumber": "1234567890"
  }
}

I’m not familiar with this connector, but looking at the docs I think you need to specify

timestamp.mode=FIELD
timestamp.field=EventTime

This should then pick up that field and use it as the Kafka message’s timestamp.

This isn’t the same as setting the datatype of the field itself. For that you need to work with the schema generation. You’ve set schema.generation.key.fields but I’m not sure if you want the timestamp as the message key?

If you’re trying to get the field as an actual timestamp in the value of the message when it’s written to the topic, then I would use an explicit schema that you declare yourself in value.schema.
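
For illustration, the Connect schema shape you’d be aiming for is roughly this, expressed with the Java Connect API (just a sketch of the target structure; the schema name here is a placeholder, and the connector’s value.schema property will want its own JSON representation of this rather than Java, so check the connector docs for the exact syntax):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class TargetValueSchema {
    public static void main(String[] args) {
        // EventTime as a logical Timestamp (int64 underneath), everything else as optional strings
        Schema valueSchema = SchemaBuilder.struct()
                .name("OccupancyValueSchema")   // placeholder name
                .field("EventTime", Timestamp.builder().optional().build())
                .field("SourceEntityDescription", Schema.OPTIONAL_STRING_SCHEMA)
                .field("EventType", Schema.OPTIONAL_STRING_SCHEMA)
                .field("TokenNumber", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        System.out.println(valueSchema.fields());
    }
}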

Robin, thank you.

I tried that using this config:

name=Havas-Occupancy-Sftp
kafka.topic=Havas-Occupancy3
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
cleanup.policy=MOVE
behavior.on.error=LOG
input.path=/Occupancy/TestData
error.path=/Occupancy/TestData/Error
finished.path=/Occupancy/TestData/Finished
input.file.pattern=.*\\.csv
sftp.username=<Username>
sftp.password=<Password>
sftp.host=<IP>
sftp.port=<Port>
csv.first.row.as.header=true
schema.generation.enabled=false
key.schema={\"name\" :\"OccupancyNameSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, \"fieldSchemas\" : {\"EventTime\" : {\"type\" : \"STRING\", \"isOptional\" : true}, \"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}
value.schema={\"name\" : \"OccupancyValueSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, \"fieldSchemas\" : {\"EventTime\" : {\"name\":"\org.apache.kafka.connect.data.Timestamp\",\"type\":\"INT64\",\"version\":1,\"isOptional\": true}, \"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true}, \"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}

parser.timestamp.date.formats=[dd/MM/yyyy' 'HH:mm]

transforms = eventTimeValue
transforms.eventTimeValue.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.eventTimeValue.target.type=Timestamp
transforms.eventTimeValue.format=dd/MM/yyyy' 'HH:mm
transforms.eventTimeValue.field=EventTime

Schema is:

key.schema=
{
	\"name\" :\"OccupancyNameSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, 
	\"fieldSchemas\" : 
	{
		\"EventTime\" : {\"type\" : \"STRING\", \"isOptional\" : true},
		\"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true}, 
		\"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true}, 
		\"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}
	}
}
value.schema=
{
	\"name\" : \"OccupancyValueSchema2\",\"type\" : \"STRUCT\",\"isOptional\" : true, 
	\"fieldSchemas\" : 
	{
		\"EventTime\" : {\"name\":"\org.apache.kafka.connect.data.Timestamp\",\"type\":\"INT64\",\"version\":1,\"isOptional\": true},
		\"SourceEntityDescription\" : {\"type\" : \"STRING\",\"isOptional\" : true},
		\"EventType\" : {\"type\" : \"STRING\",\"isOptional\" : true},
		\"TokenNumber\" : {\"type\" : \"STRING\",\"isOptional\" : true}
	}
}

The response is now:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "EventTime"
      },
      {
        "type": "string",
        "optional": true,
        "field": "SourceEntityDescription"
      },
      {
        "type": "string",
        "optional": true,
        "field": "EventType"
      },
      {
        "type": "string",
        "optional": true,
        "field": "TokenNumber"
      }
    ],
    "optional": true,
    "name": "HavasOccupancyValueSchema2"
  },
  "payload": {
    "EventTime": null,
    "SourceEntityDescription": "Bla Bla Bla",
    "EventType": "Valid Access",
    "TokenNumber": "1234567890"
  }
}

Some progress: the field is now defined as a Timestamp in the schema, but for some reason the transformation does not do what I expect it to do.
I had a look at the SimpleDateFormat documentation, and I believe the format string is correct.

I also tested the date/time format using this website: http://www.sdfonlinetester.info and it looks correct.
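
For reference, the equivalent check in plain Java with SimpleDateFormat would be something like this (a minimal sketch, using the sample EventTime value from the payload above); it parses that value without error, which again suggests the format string itself is not the problem:

import java.text.SimpleDateFormat;

public class EventTimeFormatCheck {
    public static void main(String[] args) throws Exception {
        // Same pattern as in the connector config; the quoted ' ' is just a literal space
        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy' 'HH:mm");
        // Sample EventTime value from the topic payload
        System.out.println(sdf.parse("08/02/2021 11:57"));
    }
}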

Update…

Made some progress: I discovered there is most likely a bug somewhere in the SFTP connector.
If I let the connector generate the schema itself, the data pull returns the EventTime field with no problem. But whatever I try with a custom schema, EventTime is always null.

Then I received updated CSV files with a new datetime format for the EventTime field, yyyy-MM-dd HH:mm:ss, and there was no problem pulling in the EventTime field and performing the transformation on it.

The previous datetime format was dd/MM/yyyy HH:mm, and I think the code has a problem with the / when a manual schema is defined.
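
To rule out SimpleDateFormat itself, the two formats can be compared side by side outside the connector (a minimal sketch; both patterns parse their sample values fine in plain Java, so the / does not bother SimpleDateFormat, which makes me think the problem is in the connector’s custom-schema code path):

import java.text.SimpleDateFormat;

public class FormatComparison {
    public static void main(String[] args) throws Exception {
        // Old format from the original CSV files (contains '/')
        System.out.println(new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("08/02/2021 11:57"));
        // New format from the updated CSV files
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-02-08 11:57:00"));
    }
}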

As this source connector “io.confluent.connect.sftp.SftpCsvSourceConnector” is being developed by Confluent, how do I go about reporting this possible bug?

Robin, thank you for your help.

@nathan.nam any suggestions? :point_up:
