Kafka Confluent Cloud Authentication

Hi,

I'm a new Snowplow user, but I have managed to get an end-to-end local implementation working in Docker using the Web Tracker, Scala Stream Kafka Collector, Stream Enrich, Druid and Metabase.

Before building a staging environment in Kubernetes using Helm, I wanted to try connecting the Scala Stream Kafka Collector to our Kafka Confluent Cloud account. However, I am running into issues with SASL authentication. The Snowplow documentation is sparse on this subject and just points to the Kafka documentation. Here is the sink section of my config.hocon:

sink {
  enabled = kafka
  brokers = "our-domain.confluent.cloud:9092"
  retries = 0

  # The kafka producer has a variety of possible configuration options defined at
  # https://kafka.apache.org/documentation/#producerconfigs
  # Some values are set to other values from this config by default:
  # "bootstrap.servers" = brokers
  # "buffer.memory"     = buffer.byteLimit
  # "linger.ms"         = buffer.timeLimit

  producerConf {
    "sasl.jaas.config" = "org.apache.kafka.common.security.plain.PlainLoginModule required username='1234567890' password='our-confluent-api-secret';"
    "security.protocol" = "SASL_SSL"
    "sasl.mechanisms" = "PLAIN"
  }
}

However, when the container starts, the producer configuration it logs doesn't match what I set:

sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SASL_SSL

Additionally, I get the following error in the console:

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:299)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.createProducer(KafkaSink.scala:58)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.<init>(KafkaSink.scala:34)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$.main(KafkaCollector.scala:29)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector.main(KafkaCollector.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:439)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:420)
... 5 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:104)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 10 more

From searching the usual channels, this error is apparently caused by the JAAS config not being visible to the producer, so I am fairly confident this is a configuration issue on my side. Unless I am missing something, I would expect SASL authentication to be available to the producer, since the comments in the sample configuration indicate that the standard Kafka producer options can be set:

# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs

Does anyone have any experience with this issue?

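This was caused by a simple typo in my configuration. The Kafka Java producer expects the property name in the singular, so the misspelled key was not recognized and the producer fell back to its default mechanism, GSSAPI (Kerberos). That is why the logged configuration showed sasl.mechanism = GSSAPI and the error complained about a missing serviceName. The offending line was: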

"sasl.mechanisms" = "PLAIN"

Should have been

"sasl.mechanism" = "PLAIN"

This solved the issue.
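
For anyone hitting the same error, here is a sketch of the corrected producerConf block in full. The username and password values are placeholders (assumptions, not real credentials); for Confluent Cloud they would be the API key and API secret. The rest of the sink section is unchanged from the question.

```
producerConf {
  # Confluent Cloud uses SASL over TLS
  "security.protocol" = "SASL_SSL"
  # Singular "mechanism": this is the key the Java producer recognizes
  "sasl.mechanism"    = "PLAIN"
  # <api-key> and <api-secret> are placeholders for your own credentials
  "sasl.jaas.config"  = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<api-key>' password='<api-secret>';"
}
```

With this in place, the configuration logged at startup should show sasl.mechanism = PLAIN instead of GSSAPI, which is a quick way to confirm the setting was picked up.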