Kafka to Postgres using JDBC Sink Connector


#1

Hello,
I’m testing the Kafka pipeline, and I’m stuck at moving enriched data from Kafka to Postgres using the kafka-jdbc-sink-connector.

The point I’m stuck at right now is data mapping, i.e. how to configure the connector to read the enriched Snowplow output from the Kafka topic so that it can sink it to Postgres. Some of the enriched data is in JSON and some in TSV, so how do I get the connector to read that data? The JsonConverter that ships with Kafka Connect may be able to read the JSON data, but not the TSV data.

Has anyone successfully set this up, and can share config details? @simplesteph @magaton

One idea is to use the Scala Analytics SDK to convert the enriched data into JSON, then feed the resulting JSON into the Kafka connector. If that’s possible, how exactly could I set it up (I’m not an advanced programmer, but I can work with code examples)?
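For anyone wondering what that transformation step would look like conceptually, here is a minimal Python sketch of mapping one enriched TSV line onto named fields and emitting JSON. This is only an illustration of the idea, not the Analytics SDK itself: the real enriched event has ~130 columns in a fixed order, and the SDK ships the authoritative field list; the five field names below are just a small real subset used for the example.

```python
import json

# Illustrative subset of Snowplow enriched-event fields. The real enriched
# event has ~130 tab-separated columns; the Analytics SDK knows the full list.
FIELDS = ["app_id", "platform", "collector_tstamp", "event", "event_id"]

def tsv_to_json(line: str) -> str:
    """Map one enriched TSV line onto field names and emit a JSON string."""
    values = line.rstrip("\n").split("\t")
    # Empty TSV cells become nulls rather than empty strings.
    record = {name: (value or None) for name, value in zip(FIELDS, values)}
    return json.dumps(record)

print(tsv_to_json("web-shop\tweb\t2017-06-01 12:00:00\tpage_view\tabc-123"))
```

A small consumer/producer pair doing exactly this (read TSV topic, transform, write JSON topic) is what the Scala SDK route would amount to.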

Here are the docs I’ve been referencing:
http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_config_options.html

My connect-json-standalone.properties file:

# Sample configuration for a standalone Kafka Connect worker that uses JSON serialization.
# This sample configuration assumes a local installation of Confluent Platform with all
# services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=my-host:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://my-host:8081
#value.converter.schema.registry.url=http://my-host:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
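One thing worth noting about the worker config above: with `value.converter.schemas.enable=true`, the JsonConverter expects every message value to be an envelope that carries its own schema alongside the payload, which is how the JDBC sink learns the column names and types. A sketch of that envelope (the struct name and fields here are hypothetical, chosen to match the example above):

```python
import json

# The envelope shape JsonConverter expects when schemas.enable=true:
# a "schema" describing the struct, and a "payload" with the values.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "event_id", "type": "string", "optional": False},
            {"field": "app_id", "type": "string", "optional": True},
        ],
        "optional": False,
        "name": "enriched_event",  # hypothetical struct name
    },
    "payload": {"event_id": "abc-123", "app_id": "web-shop"},
}

print(json.dumps(envelope))
```

Messages without this envelope (plain JSON, or TSV) will fail in the JDBC sink with schemas enabled.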

My sink-postgresql.properties file:

name=sink-postgresql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=enriched-events
connection.url=jdbc:postgresql://my-host/snowplow-database
connection.user=$DATABASE_USER
connection.password=$DATABASE_PASSWORD
auto.create=false
table.name.format=atomic
record_key=??

#2

Hi @rob,

It sounds like an interesting setup!
I hope somebody has some pointers for you.


#3

Hi @rob,

I just did the same thing recently.

kafka-connect-jdbc does not support the JsonConverter without schemas enabled (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/docs/sink_connector.rst#data-mapping), so you need to change your connect-json-standalone.properties file:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

and hence you need to write your own Kafka consumer, producer, and Avro schema to transform the enriched-data topic into the connector’s source topic.

Also note that the connect-jdbc data mapping is limited (it does not support a JSON column type), so you might need to do some special transformation on unstructured event data or contexts like contexts_org_w3_performance_timing_1.
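Since the JDBC sink can only map scalar columns, one common workaround is to flatten nested context JSON into prefixed scalar fields before it reaches the connector. A minimal sketch (the context keys below are hypothetical, loosely modeled on the performance-timing context):

```python
# Hypothetical context payload, loosely based on
# contexts_org_w3_performance_timing_1.
context = {"navigationStart": 1496300000000, "domComplete": 1496300001234}

def flatten(prefix: str, obj: dict) -> dict:
    """Flatten nested JSON into scalar columns the JDBC sink can map."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}"
        if isinstance(value, dict):
            flat.update(flatten(name, value))  # recurse into nested objects
        else:
            flat[name] = value
    return flat

print(flatten("perf_timing", context))
```

Each flattened key then becomes an ordinary column in the target table.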

I suggest you have a look at kafka-connect-elasticsearch: it supports JSON data in a Kafka topic, and the data mapping is simpler.