Hi. I am stuck setting up my storage: I want to sink my enriched events from NSQ into Elasticsearch. When I pipe into the ES loader, no data reaches ES from the NSQ enriched topic. I even tried sinking to stdout and still had no luck. Is there a misconfiguration in my settings? NSQ and ES are running on the same on-prem server, and I have verified that NSQ enrichment is working.
elasticsearch.conf
# Copyright (c) 2014-2016 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.
# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = nsq
# Where to write good and bad records
sink {
  # Sinks currently supported are:
  # "elasticsearch" for writing good records to Elasticsearch
  # "stdout" for writing good records to stdout
  good = "elasticsearch"

  # Sinks currently supported are:
  # "kinesis" for writing bad records to Kinesis
  # "stderr" for writing bad records to stderr
  # "nsq" for writing bad records to NSQ
  # "none" for ignoring bad records
  bad = "stderr"
}
# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = "good"
# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = iam
  secretKey = iam
}
# config for NSQ
nsq {
  # Channel name for the NSQ source.
  # If more than one application is reading from the same NSQ topic at the same time,
  # each must have a unique channel name in order to receive all the data from that topic.
  channelName = "nsq_to_file"

  # Host name for NSQ tools
  host = "IP"

  # HTTP port for nsqd
  port = 4151

  # HTTP port for nsqlookupd
  lookupPort = 4161
}
kinesis {
  # "LATEST": most recent data.
  # "TRIM_HORIZON": oldest available data.
  # "AT_TIMESTAMP": start from the record at or after the specified timestamp.
  # Note: this only affects the first run of this application on a stream.
  initialPosition = "initialPosition"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format: "yyyy-MM-ddTHH:mm:ssZ", e.g. "2017-05-17T10:00:00Z".
  # Note: the time must be specified in UTC.
  initialTimestamp = "initialTimestamp"

  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = "10"

  # Region where the Kinesis stream is located
  region = ""

  # "appName" is used for a DynamoDB table to maintain stream state.
  # You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
  appName = "none"
}
# Common configuration section for all stream sources
streams {
  # Stream/topic for successfully enriched events
  inStreamName = "good_enriched"

  # Stream/topic for enriched events which are rejected by Elasticsearch
  outStreamName = "bad"

  # Events are accumulated in a buffer before being sent to Elasticsearch.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit, or
  # - the number of stored records exceeds recordLimit, or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 3000    # Not supported by NSQ, will be ignored
    recordLimit = 500
    timeLimit = 60000   # Not supported by NSQ, will be ignored
  }
}
elasticsearch {
  # Events are indexed using an Elasticsearch Client
  # - endpoint: the cluster endpoint
  # - port: the port the cluster can be accessed on
  #   - for http this is usually 9200
  #   - for transport this is usually 9300
  # - maxTimeout: the maximum attempt time before a client restart
  # - ssl: if using the http client, whether to use ssl or not
  client {
    endpoint = "127.0.0.1"
    port = "9200"
    maxTimeout = "500"
    ssl = false
  }

  # When using the AWS ES service:
  # - signing: if using the http client and the AWS ES service, you can sign your requests
  #   (http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html)
  # - region: where the AWS ES service is located
  aws {
    signing = false
    region = ""
  }

  # name: the Elasticsearch cluster name
  # index: the Elasticsearch index name
  # clusterType: the Elasticsearch index type
  cluster {
    name = "elasticsearch"
    index = "snowplow"
    clusterType = "elasticsearch"
  }
}
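For reference, this is roughly how I check whether enriched events are actually landing on the NSQ topic (a quick sketch; it assumes nsqd's HTTP API on port 4151 and the topic name good_enriched from streams.inStreamName above):

# Inspect topic/channel depths and message counts via nsqd's HTTP API
curl "http://127.0.0.1:4151/stats?format=json"

# Tail the topic with the stock NSQ utility (writes messages to files in /tmp)
nsq_to_file --topic=good_enriched --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161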
My NSQ setup uses the latest collector and enrich versions, and I am using Snowplow's Iglu resolver:
java -jar snowplow-stream-collector-0.11.0.jar --config nsq-collector.conf | java -jar snowplow-stream-enrich-0.12.0.jar --config nsq-enrich.conf --resolver file:resolver-enrich.json | java -jar snowplow-elasticsearch-loader-http-0.10.1.jar --config elasticsearch.conf
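One thing I am unsure about: since source = nsq, my understanding is that the loader subscribes to the NSQ topic itself rather than reading stdin, so maybe the pipe isn't even the right shape here and the three stages should run as separate processes, something like:

java -jar snowplow-stream-collector-0.11.0.jar --config nsq-collector.conf &
java -jar snowplow-stream-enrich-0.12.0.jar --config nsq-enrich.conf --resolver file:resolver-enrich.json &
java -jar snowplow-elasticsearch-loader-http-0.10.1.jar --config elasticsearch.conf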
To add: when I use stdin as the source, with older collector and enrich versions and the same ES loader version, everything works fine:
java -jar snowplow-stream-collector-0.10.0.jar --config stdout-collector.conf | java -jar snowplow-stream-enrich-0.11.0.jar --config stdin-enrich.conf --resolver file:resolver-enrich.json | java -jar snowplow-elasticsearch-loader-http-0.10.1.jar --config es-working-stdin.conf
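(In both runs I generate test traffic by hitting the collector's pixel endpoint; port 8080 here is just an assumption, i.e. whatever is set in the collector config:)

curl "http://127.0.0.1:8080/i?e=pv"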
NSQ setup: followed the quickstart.
ES version: 5.5.3
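For completeness, ES itself responds on the endpoint/port from the client section above; this is how I check it and look for the snowplow index:

curl "http://127.0.0.1:9200"
curl "http://127.0.0.1:9200/_cat/indices?v"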
I am new to Snowplow, so apologies if this is a basic question.
Hoping for your suggestions on this.
Thanks!