Unable to connect to ELK stack on AWS from Elasticsearch Loader

I am not sure whether the given sample config is valid or not - https://raw.githubusercontent.com/snowplow/snowplow-elasticsearch-loader/master/examples/config.hocon.sample

Elastic loader version: snowplow-elasticsearch-loader-http-0.10.1.jar

I faced a configuration error related to kinesis & nsq and solved it by adding the kinesis and nsq sections.

Following is the Elastic Loader configuration I am using -

########################################################################

source = kinesis

sink {
  good = "elasticsearch"
  bad = "stderr"
}

enabled = "good"

aws {
  accessKey = "XXXXXXXXXXXXXXXXXXX"
  secretKey = "XXXXXXXXXXXXXXXXXXX"
}

kinesis {
  initialPosition = "LATEST"
  initialTimestamp = "{{initialTimestamp}}"
  maxRecords = 10000
  region = "us-east-1"
  appName = "ELK-Loader"
}

nsq {
  channelName = "dummy"
  nsqdHost = "dummy"
  nsqdPort = 0
  nsqlookupdHost = "dummy"
  nsqlookupdPort = 0
}

queue {
  enabled = "kinesis"
  initialPosition = "{{initialPosition}}"
  initialTimestamp = "{{initialTimestamp}}"
  maxRecords = 10000
  region = "us-east-1"
  appName = "ELK-Loader"
  channelName = "dummy"
  nsqdHost = "dummy"
  nsqdPort = 0
  nsqlookupdHost = "dummy"
  nsqlookupdPort = 0
}

streams {
  inStreamName = "rr-dev-snowplow-enriched-good"
  outStreamName = "rr-dev-snowplow-enriched-bad"

  buffer {
    byteLimit = 4500000 # Not supported by NSQ, will be ignored
    recordLimit = 500
    timeLimit = 60000 # Not supported by NSQ, will be ignored
  }
}

elasticsearch {
  client {
    endpoint = "https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com"
    port = "9200"
    username = "XXXXXXXXXX"
    password = "XXXXXXXXXX"
    shardDateFormat = "{{elasticsearchShardDateFormat}}"
    shardDateField = "{{elasticsearchShardDateField}}"
    maxTimeout = "6000"
    maxRetries = 5
    ssl = false
  }

  aws {
    signing = false
    region = "us-east-1"
  }

  cluster {
    name = "135947549772:rr-snowplow-events"
    index = "snowplow"
    clusterType = "enriched"
  }
}

####################################################################

I am also facing the following error when running it -

$ java -jar snowplow-elasticsearch-loader-http-0.10.2.jar --config rr_snowplow_elk_loader.conf

Exception in thread "main" java.lang.RuntimeException: Invalid hosts/ports https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com:9200
at scala.sys.package$.error(package.scala:27)
at com.sksamuel.elastic4s.ElasticsearchClientUri$$anonfun$3.apply(ElasticsearchClientUri.scala:23)
at com.sksamuel.elastic4s.ElasticsearchClientUri$$anonfun$3.apply(ElasticsearchClientUri.scala:21)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.sksamuel.elastic4s.ElasticsearchClientUri$.apply(ElasticsearchClientUri.scala:21)
at com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP.<init>(ElasticsearchSenderHTTP.scala:55)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.elasticsearchSender$lzycompute(ElasticsearchHTTPSinkApp.scala:39)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.elasticsearchSender(ElasticsearchHTTPSinkApp.scala:38)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchSinkApp$class.run(ElasticsearchSinkApp.scala:168)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.run(ElasticsearchHTTPSinkApp.scala:25)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.delayedEndpoint$com$snowplowanalytics$elasticsearch$loader$ElasticsearchHTTPSinkApp$1(ElasticsearchHTTPSinkApp.scala:41)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$delayedInit$body.apply(ElasticsearchHTTPSinkApp.scala:25)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.main(ElasticsearchHTTPSinkApp.scala:25)
at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp.main(ElasticsearchHTTPSinkApp.scala)

At first glance I notice that your client.endpoint starts with https:// yet you have client.ssl = false. Also, are you sure you have the correct Security Group rules allowing traffic from the instance you're running es-loader on to the ELK endpoint?

Hi @brad.inscoe, I tried with ssl = true and the result is the same (Invalid hosts/ports). Then I tried changing the port to 443 and still got the same error. I am not sure which port to use: the documentation mentions 9200 for HTTP, but a GET request to the endpoint from the browser works fine and returns the cluster details.
Regarding the security group, the instance on which the ES Loader is running has the default security group, and I am not sure which security group the ELK stack is using.

Hi @ihor, can you help me out here?

@brad.inscoe My curl command works from the instance where the Elastic Loader is running, but the Elastic Loader itself is not able to connect.

Figured it out: I was using the endpoint as "https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com" instead of just "search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com". The loader does not expect the http/https scheme to be present.
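For anyone hitting the same "Invalid hosts/ports" error, here is a minimal sketch of the corrected client block (only the endpoint changed; the other values are the ones from my config above and may differ for your setup):

elasticsearch {
  client {
    # hostname only - the loader rejects endpoints that include the http:// or https:// scheme
    endpoint = "search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com"
    port = "9200"
    ssl = false
  }
}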

Well, now I am facing the following issue. It happens whenever I push something into my Kinesis good stream (the inStreamName parameter of the ES Loader config) -

java -jar snowplow-elasticsearch-loader-http-0.10.2.jar --config rr_snowplow_elk_loader.conf

[main] INFO com.sksamuel.elastic4s.http.HttpClient$ - Creating HTTP client on http://search-rr-snowplow-events-dev-ysd6kroeyqxvgbwefcnqmrv42a.us-east-1.es.amazonaws.com:9200
[main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 30000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 9975 ms, takeleases every 60050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for both region name as us-east-1, and Amazon Kinesis endpoint as https://kinesis.us-east-1.amazonaws.com. Amazon Kinesis endpoint will overwrite region name.
[main] INFO com.snowplowanalytics.elasticsearch.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[main] INFO com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase - Starting worker in KinesisSourceExecutor
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Syncing Kinesis shard info
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Starting LeaseCoordinator
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 needed 1 leases but none were expired, so it will steal lease shardId-000000000001 from 7f1050f9c3e0e095:-6d3936c4:170a21c225b:-8000
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 successfully took 1 leases: shardId-000000000001
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000001, concurrencyToken=d830b290-b6ae-4a1e-9d4b-e9a7b03c8675, parentShardIds=, checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents of shard shardId-000000000001
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000001 with LATEST
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-000000000001
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping …
[LeaseCoordinator-2] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 saw 2 total leases, 1 available leases, 1 workers. Target is 2 leases, I have 1 leases, I will take 1 leases
[LeaseCoordinator-2] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 successfully took 1 leases: shardId-000000000000
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=792216ca-8358-435b-b596-2059173a604d, parentShardIds=, checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
[RecordProcessor-0001] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents of shard shardId-000000000000
[RecordProcessor-0001] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000000 with LATEST

[pool-2-thread-4] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - ElasticsearchSender threw an unexpected exception
java.net.ConnectException
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:822)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.base/java.lang.Thread.run(Thread.java:834)

[RecordProcessor-0000] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - Shutting down application as unable to connect to Elasticsearch for over 60000 ms
java.net.ConnectException
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:822)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.base/java.lang.Thread.run(Thread.java:834)

Hi @ihor, @brad.inscoe - I am using Elasticsearch version 7.1. Is this version supported?

@Tejas_Behra, to my knowledge, the latest version we use on our managed pipelines is 6.8. Not sure if 7.x is supported.

Hey @ihor, I tried with 6.7 and am still getting the same error - [RecordProcessor-0000] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - Shutting down application as unable to connect to Elasticsearch for over 60000 ms

Is there anything I am missing?

Hi @ihor, sorry, but I have one more question for you - is the Elastic Loader supported with AWS Elasticsearch?

@Tejas_Behra, yes, the ES Loader is meant to work with AWS Elasticsearch. I haven't been involved in setting up this part of the pipeline workflow, so I can't advise on the specific error you face. You might have some sort of permission issue accessing the service.
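If your domain's access policy requires IAM-signed requests, one thing worth trying (a sketch only, assuming the AWS credentials you configured actually have access to the domain) is enabling request signing in the elasticsearch.aws section instead of leaving it disabled:

elasticsearch {
  aws {
    # sign requests with the configured AWS credentials so an IAM-restricted domain accepts them
    signing = true
    region = "us-east-1"
  }
}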