Enriched NSQ Events to Elasticsearch is Not Working

Hi. I am stuck setting up my storage: I want to sink my enriched events from NSQ into Elasticsearch. When I pipe in the ES loader, no data gets sent to ES from the NSQ enriched topic. I even tried sinking to stdout and still had no luck. Is there any misconfiguration in my settings? NSQ and ES are running on the same server (on-prem), and I have verified that NSQ enrichment is working.

elasticsearch.conf

# Copyright (c) 2014-2016 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.

# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = nsq

# Where to write good and bad records
sink {
  # Sinks currently supported are:
  # "elasticsearch" for writing good records to Elasticsearch
  # "stdout" for writing good records to stdout
  good = "elasticsearch"

  # Sinks currently supported are:
  # "kinesis" for writing bad records to Kinesis
  # "stderr" for writing bad records to stderr
  # "nsq" for writing bad records to NSQ    
  # "none" for ignoring bad records
  bad = "stderr"
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = "good"

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = iam
  secretKey = iam
}

# config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # each of them must have a unique channel name in order to get all the data from the topic
  channelName = "nsq_to_file"
     
  # Host name for NSQ tools
  host = "IP"

  # HTTP port for nsqd
  port = 4151

  # HTTP port for nsqlookupd
  lookupPort = 4161
}

kinesis {
  # "LATEST": most recent data.
  # "TRIM_HORIZON": oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "initialPosition"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: the time needs to be specified in UTC.
  initialTimestamp = "initialTimestamp"

  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = "10"

  # Region where the Kinesis stream is located
  region = ""

  # "appName" is used for a DynamoDB table to maintain stream state.
  # You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
  appName = "none"
}

# Common configuration section for all stream sources
streams {
  inStreamName = "good_enriched"
  
  # Stream for enriched events which are rejected by Elasticsearch
  outStreamName = "bad"

  # Events are accumulated in a buffer before being sent to Elasticsearch.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 3000 # Not supported by NSQ, will be ignored
    recordLimit = 500
    timeLimit = 60000 # Not supported by NSQ, will be ignored
  }
}

elasticsearch {

  # Events are indexed using an Elasticsearch Client
  # - endpoint: the cluster endpoint
  # - port: the port the cluster can be accessed on
  #   - for http this is usually 9200
  #   - for transport this is usually 9300
  # - max-timeout: the maximum attempt time before a client restart
  # - ssl: if using the http client, whether to use ssl or not
  client {
    endpoint = "127.0.0.1"
    port = "9200"
    maxTimeout = "500"
    ssl = false
  }

  # When using the AWS ES service
  # - signing: if using the http client and the AWS ES service you can sign your requests
  #    http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
  # - region where the AWS ES service is located
  aws {
    signing = false
    region = ""
  }

  # index: the Elasticsearch index name
  # type: the Elasticsearch index type
  cluster {
    name = "elasticsearch"
    index = "snowplow"
    clusterType = "elasticsearch"
  }
}

My NSQ pipeline is using the latest collector and enricher versions. I am also using the Iglu resolver from Snowplow.

java -jar snowplow-stream-collector-0.11.0.jar --config nsq-collector.conf | java -jar snowplow-stream-enrich-0.12.0.jar --config nsq-enrich.conf --resolver file:resolver-enrich.json | java -jar snowplow-elasticsearch-loader-http-0.10.1.jar --config elasticsearch.conf
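
For reference, one way to check whether anything is actually being indexed (assuming the 127.0.0.1:9200 endpoint and the snowplow index from elasticsearch.conf above) would be something like:

    # count the documents in the index the loader writes to
    curl 'http://127.0.0.1:9200/snowplow/_count?pretty'

    # or peek at a few documents
    curl 'http://127.0.0.1:9200/snowplow/_search?size=3&pretty'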

Just to add: I tried using stdin as the source, with older collector and enricher versions and the latest ES loader version, and everything works fine.

java -jar snowplow-stream-collector-0.10.0.jar --config stdout-collector.conf | java -jar snowplow-stream-enrich-0.11.0.jar --config stdin-enrich.conf --resolver file:resolver-enrich.json | java -jar snowplow-elasticsearch-loader-http-0.10.1.jar --config es-working-stdin.conf

NSQ setup: followed quickstart
ES version: 5.5.3

I am new to Snowplow, so this is probably a really dumb question. Hahaha.
Hoping for your suggestions on this.

Thanks!

Hello, at a glance nothing seems wrong with your configuration, except maybe nsq.host, which should be 127.0.0.1 instead of "IP".

If that’s not it, could you share your collector and stream enrich configuration?

Hi Ben,

I also tried changing it to 127.0.0.1, but the result is still the same. Here are my collector and enrich configs.

nsq-collector.conf

# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following
  # interface and port.
  interface = "IP"
  port = 8084

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days" # e.g. "365 days"
    # Network cookie name
    name = sp
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    domain = "test.com"
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = "good"

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = "bad"

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, kafka, nsq, or stdout
      # To use stdout comment everything
      enabled = nsq
      # Region where the streams are located
      region = ""

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        secretKey = iam
      }

      # Minimum and maximum backoff periods
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0

      # Or NSQ
      ## Host name for NSQ tools
      host = "127.0.0.1"
      ## TCP port for nsqd, 4150 by default
      port = 4150
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 400000
      recordLimit = 500 # Not supported by Kafka; will be ignored
      timeLimit = 600000
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}

nsq-enrich.conf

# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'kafka' for reading Thrift-serialized records from a Kafka topic
  # 'nsq' for reading Thrift-serialized records from a Nsq topic
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = nsq

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
  # 'nsq' for writing enriched events to one Nsq topic and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
  sink = nsq

  # AWS credentials
  # If both are set to 'default', use the default AWS credentials provider chain.
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    accessKey = iam
    secretKey = iam
  }

  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "good_enriched"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "bad_enriched"

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = ""
    }

    kinesis {
      # Region where the streams are located
      region = ""

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = TRIM_HORIZON

      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: the time needs to be specified in UTC.
      initialTimestamp = ""

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    # Kafka configuration
    kafka {
      brokers = ""

      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # config for nsq
    nsq {
      # Channel name for nsq source
      # If more than one application is reading from the same NSQ topic at the same time,
      # all of them must have the same channel name
      rawChannel = "nsq_to_file"

      # Host name for nsqd
      host = "127.0.0.1"

      # TCP port for nsqd, 4150 by default
      port = 4150

      # Host name for lookupd
      lookupHost = "127.0.0.1"

      # HTTP port for nsqlookupd, 4161 by default
      lookupPort = 4161
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = "400000"
      recordLimit = "500" # Not supported by Kafka; will be ignored
      timeLimit = "6000000"
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
    appName = ""
  
  }
}

When running the loader there were no errors either, so I find it hard to pinpoint what's wrong.

I also noticed in NSQ that when I run the loader, it is able to locate the topic, since the JavaNSQClient user-agent appears. However, nothing is output to my shell.

You seem to be using the same channelName for the loader and Stream Enrich, which I don't think is the way to go.
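
For example, something along these lines (the channel names themselves are just examples):

    # nsq-enrich.conf (Stream Enrich)
    nsq {
      # give Stream Enrich its own channel on the raw topic
      rawChannel = "stream-enrich"
      # host / port / lookupd settings unchanged
    }

    # elasticsearch.conf (ES loader)
    nsq {
      # and give the loader a different channel on the enriched topic
      channelName = "es-loader"
      # host / port / lookupd settings unchanged
    }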

If the issue doesn't come from that, I would try running each component in isolation.

e.g.:

  • run the collector and check what's in the NSQ topic (a couple of commands for this are sketched below)
  • once what the collector writes looks valid, run Stream Enrich and check what ends up in the good and bad topics
  • if everything is fine, run the loader
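
A minimal sketch of how I would inspect the topics at each step, assuming everything is on localhost with the ports from your configs (4151 for nsqd HTTP, 4161 for nsqlookupd) and that the nsq_tail utility shipped with NSQ is on your path:

    # tail the raw topic the collector writes to
    nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=good

    # tail the enriched topic Stream Enrich writes to
    nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=good_enriched

    # check topic/channel depths and which clients are connected
    curl http://127.0.0.1:4151/stats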

Hi Ben,

I followed your suggestions and changed the channelName for both. The collector and enrich steps are working fine. Running the loader, it seems to be able to get the messages from the enriched topic, as checked in the NSQ UI; these are the channels showing on the good_enriched topic:

  1. the channel assigned for good_enriched (nsq_to_file/1.0.0-compat go-nsq/1.0.6 user-agent)
  2. the channel used by the ES loader (JavaNSQClient user-agent)
  3. an nsq_to_file channel that just showed up (nsq_to_file/1.0.0-compat go-nsq/1.0.6 user-agent)

I set the loader output to stdout and tested again. The messages are being received on the channel used by the ES loader; however, there is still no output in my terminal.

I can't say that I have encountered this issue in the past.

Also, have you tried cleaning your NSQ data and starting from scratch? Maybe the channel names are conflicting or interfering with each other.
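
If you do want to clean up without wiping NSQ's data directory, the nsqd HTTP API can drop leftover channels or topics. A sketch, assuming nsqd's HTTP port 4151 and the topic/channel names from your post (adjust them to whatever the NSQ UI shows):

    # delete the stray nsq_to_file channel on the enriched topic
    curl -X POST 'http://127.0.0.1:4151/channel/delete?topic=good_enriched&channel=nsq_to_file'

    # or drop the topic entirely and let the pipeline recreate it
    curl -X POST 'http://127.0.0.1:4151/topic/delete?topic=good_enriched'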

This might not be related, but I see an issue with your collector's sink configuration.

If you want NSQ enabled, it should be:

    sink {
      enabled = nsq

      host = "127.0.0.1"
      port = 4150
    }

with the Kinesis stuff removed.
