Unable to connect to ELK stack on AWS from Elasticsearch Loader

I am not sure whether the given sample config is valid or not - https://raw.githubusercontent.com/snowplow/snowplow-elasticsearch-loader/master/examples/config.hocon.sample

Elastic loader version: snowplow-elasticsearch-loader-http-0.10.1.jar

I got a configuration error related to Kinesis and NSQ, which I solved by adding kinesis and nsq sections.

Following is the Elasticsearch Loader configuration I am using:

########################################################################

source = kinesis

sink {
  good = "elasticsearch"
  bad = "stderr"
}

enabled = "good"

aws {
  accessKey = XXXXXXXXXXXXXXXXXXX
  secretKey = "XXXXXXXXXXXXXXXXXXX"
}

kinesis {
  initialPosition = "LATEST"
  initialTimestamp = "{{initialTimestamp}}"
  maxRecords = 10000
  region = "us-east-1"
  appName = "ELK-Loader"
}

nsq {
  channelName = "dummy"
  nsqdHost = "dummy"
  nsqdPort = 0
  nsqlookupdHost = "dummy"
  nsqlookupdPort = 0
}

queue {
  enabled = "kinesis"
  initialPosition = "{{initialPosition}}"
  initialTimestamp = "{{initialTimestamp}}"
  maxRecords = 10000
  region = "us-east-1"
  appName = "ELK-Loader"

  channelName = "dummy"
  nsqdHost = "dummy"
  nsqdPort = 0
  nsqlookupdHost = "dummy"
  nsqlookupdPort = 0
}

streams {
  inStreamName = "rr-dev-snowplow-enriched-good"
  outStreamName = "rr-dev-snowplow-enriched-bad"

  buffer {
    byteLimit = 4500000 # Not supported by NSQ, will be ignored
    recordLimit = 500
    timeLimit = 60000 # Not supported by NSQ, will be ignored
  }
}

elasticsearch {
  client {
    endpoint = "https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com"
    port = "9200"
    username = "XXXXXXXXXX"
    password = "XXXXXXXXXX"
    shardDateFormat = "{{elasticsearchShardDateFormat}}"
    shardDateField = "{{elasticsearchShardDateField}}"
    maxTimeout = "6000"
    maxRetries = 5
    ssl = false
  }

  aws {
    signing = false
    region = "us-east-1"
  }

  cluster {
    name = "135947549772:rr-snowplow-events"
    index = "snowplow"
    clusterType = "enriched"
  }
}

####################################################################

I am also facing the following error when running it:

$ java -jar snowplow-elasticsearch-loader-http-0.10.2.jar --config rr_snowplow_elk_loader.conf

Exception in thread "main" java.lang.RuntimeException: Invalid hosts/ports https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com:9200
        at scala.sys.package$.error(package.scala:27)
        at com.sksamuel.elastic4s.ElasticsearchClientUri$$anonfun$3.apply(ElasticsearchClientUri.scala:23)
        at com.sksamuel.elastic4s.ElasticsearchClientUri$$anonfun$3.apply(ElasticsearchClientUri.scala:21)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at com.sksamuel.elastic4s.ElasticsearchClientUri$.apply(ElasticsearchClientUri.scala:21)
        at com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP.<init>(ElasticsearchSenderHTTP.scala:55)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.elasticsearchSender$lzycompute(ElasticsearchHTTPSinkApp.scala:39)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.elasticsearchSender(ElasticsearchHTTPSinkApp.scala:38)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchSinkApp$class.run(ElasticsearchSinkApp.scala:168)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.run(ElasticsearchHTTPSinkApp.scala:25)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.delayedEndpoint$com$snowplowanalytics$elasticsearch$loader$ElasticsearchHTTPSinkApp$1(ElasticsearchHTTPSinkApp.scala:41)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$delayedInit$body.apply(ElasticsearchHTTPSinkApp.scala:25)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp$.main(ElasticsearchHTTPSinkApp.scala:25)
        at com.snowplowanalytics.elasticsearch.loader.ElasticsearchHTTPSinkApp.main(ElasticsearchHTTPSinkApp.scala)

At first glance I notice that your client.endpoint starts with https:// yet you have client.ssl = false. Also, are you sure you have the correct Security Group rules allowing traffic from the instance you’re running es-loader on to the ELK endpoint?

Hi @brad.inscoe, I tried with ssl = true and the result is the same (Invalid host/port). Then I tried changing the port to 443 and still got the same Invalid host/port error. I am not sure which port to use: the documentation mentions 9200 for HTTP, but a GET request to the endpoint from the browser works fine and returns the cluster details.
Regarding the security group, the instance on which the ES Loader is running has the default security group, and I am not sure which security group the ELK stack is using.

Hi @ihor, can you help me out here?

@brad.inscoe My curl command works from the instance where the Elasticsearch Loader is running, but the loader itself is not able to connect.

Figured it out: I was using the endpoint “https://search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com” instead of just “search-rr-snowplow-events-3ks5yicuiihikty5mhvl3tdwuq.us-east-1.es.amazonaws.com”. The loader does not expect an http/https prefix.
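The loader's client block keeps host, port and scheme apart: endpoint is a bare hostname, while port and ssl decide how it is reached. Judging from the "Invalid hosts/ports" message, the loader appears to join endpoint and port into a single host:port string, which a scheme prefix breaks. If you start from a full URL copied from the AWS console, a small helper like the one below can split it into the three values. It is a hypothetical sketch, not part of the loader, and defaulting a missing scheme to HTTPS (and the port to 443 or 80) is our assumption.

```python
from urllib.parse import urlsplit


def split_endpoint(url: str) -> dict:
    """Split a URL such as 'https://host' into the separate endpoint / port / ssl
    values of the loader's elasticsearch.client block (hypothetical helper)."""
    # urlsplit only recognises the host when a scheme is present,
    # so assume HTTPS for bare hostnames.
    if "://" not in url:
        url = "https://" + url
    parts = urlsplit(url)
    if parts.hostname is None:
        raise ValueError(f"no host in {url!r}")
    ssl = parts.scheme == "https"
    # Use an explicit port if the URL has one, else the scheme's default.
    port = parts.port if parts.port is not None else (443 if ssl else 80)
    return {"endpoint": parts.hostname, "port": str(port), "ssl": ssl}
```

For example, `split_endpoint("https://search-example.us-east-1.es.amazonaws.com")` gives the bare host as endpoint, port "443" and ssl True, which is the shape of the client settings used in the working configuration further down this thread.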

Now I am facing the following issue whenever I push something into my Kinesis good stream (the inStreamName parameter of the ES Loader config):

java -jar snowplow-elasticsearch-loader-http-0.10.2.jar --config rr_snowplow_elk_loader.conf

[main] INFO com.sksamuel.elastic4s.http.HttpClient$ - Creating HTTP client on http://search-rr-snowplow-events-dev-ysd6kroeyqxvgbwefcnqmrv42a.us-east-1.es.amazonaws.com:9200
[main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 30000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 9975 ms, takeleases every 60050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for both region name as us-east-1, and Amazon Kinesis endpoint as https://kinesis.us-east-1.amazonaws.com. Amazon Kinesis endpoint will overwrite region name.
[main] INFO com.snowplowanalytics.elasticsearch.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[main] INFO com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase - Starting worker in KinesisSourceExecutor
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Syncing Kinesis shard info
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Starting LeaseCoordinator
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 needed 1 leases but none were expired, so it will steal lease shardId-000000000001 from 7f1050f9c3e0e095:-6d3936c4:170a21c225b:-8000
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[LeaseCoordinator-1] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 successfully took 1 leases: shardId-000000000001
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000001, concurrencyToken=d830b290-b6ae-4a1e-9d4b-e9a7b03c8675, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents [] of shard shardId-000000000001
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000001 with LATEST
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-000000000001
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[LeaseCoordinator-2] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 saw 2 total leases, 1 available leases, 1 workers. Target is 2 leases, I have 1 leases, I will take 1 leases
[LeaseCoordinator-2] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 77d9bd70d61746a9:-46e68ee4:170a2208f86:-8000 successfully took 1 leases: shardId-000000000000
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=792216ca-8358-435b-b596-2059173a604d, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
[RecordProcessor-0001] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents [] of shard shardId-000000000000
[RecordProcessor-0001] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000000 with LATEST

[pool-2-thread-4] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - ElasticsearchSender threw an unexpected exception
java.net.ConnectException
        at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
        at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
        at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:822)
        at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
        at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
        at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
        at java.base/java.lang.Thread.run(Thread.java:834)

[RecordProcessor-0000] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - Shutting down application as unable to connect to Elasticsearch for over 60000 ms
java.net.ConnectException
        at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
        at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
        at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:822)
        at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
        at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
        at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
        at java.base/java.lang.Thread.run(Thread.java:834)

Hi @ihor, @brad.inscoe - I am using Elasticsearch version 7.1. Is this version supported?

@Tejas_Behra, to my knowledge, the latest version we use on our managed pipelines is 6.8. Not sure if 7.x is supported.

Hey @ihor, I tried with 6.7 and am still getting the same error: [RecordProcessor-0000] ERROR com.snowplowanalytics.elasticsearch.loader.clients.ElasticsearchSenderHTTP - Shutting down application as unable to connect to Elasticsearch for over 60000 ms

Is there anything I am missing?

Hi @ihor, sorry, but I have one more question for you: is the Elasticsearch Loader supported with AWS Elasticsearch?

@Tejas_Behra, yes, ES Loader is meant to work with AWS Elasticsearch. I haven’t been involved in setting up this part of the pipeline, so I can’t advise on the error you’re facing. You might have some sort of permission issue accessing the service.

Hi @ihor, Is there someone who can help me in solving this issue?

Hey @Tejas_Behra - I’ll walk through how we generally set up ES Loaders, and hopefully there is something in there that can help!

I would also recommend upgrading to the latest 1.0.0 release of the loader.

First up, the different components!

Loader HOCON example:

source = "kinesis"

sink {
    good = "elasticsearch"
    bad = "kinesis"
}

enabled = "good"

aws {
    accessKey = iam
    secretKey = iam
}

queue {
  enabled = kinesis

  initialPosition = "LATEST"
  initialTimestamp = ""
  maxRecords = 10000
  region = "<region>" # e.g. eu-west-1
  appName = "<unique app name>" # e.g. com-acme-es-loader-enriched

  channelName = ""
  host = ""
  port = -1
  lookupPort = -1

  disableCloudWatch = true # Disables custom metrics being published
}

streams {
    inStreamName = "<enriched kinesis stream name>"
    outStreamName = "<bad kinesis stream name>"

    buffer {
        byteLimit = 1000000
        recordLimit = 500
        timeLimit = 250
    }
}

elasticsearch {
    client {
        endpoint = "vpc-<internal endpoint>.es.amazonaws.com"
        port = "443"
        maxTimeout = "10000"
        maxRetries = 6
        ssl = true
    }

    aws {
        signing = true
        region = "eu-west-1"
    }

    cluster {
        name = "<simple name of cluster>"
        index = "<good index name>"
        documentType = "good"
    }
}

Note: The documentType corresponds to the mapping type in the Elasticsearch index mapping; we generally use “good” or “bad” as our documentTypes internally.
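To show where index and documentType end up: in the Elasticsearch 6.x bulk API, every document is preceded by an action line naming its _index and _type, and that _type has to match a type defined in the index mapping (here "good"). The helper below is a hypothetical sketch of such a payload; we have not checked exactly how the loader builds its requests.

```python
import json
from typing import Iterable, Mapping


def bulk_body(index: str, doc_type: str, docs: Iterable[Mapping]) -> str:
    """Build an Elasticsearch bulk-API payload (newline-delimited JSON).

    index    -> elasticsearch.cluster.index in the HOCON
    doc_type -> elasticsearch.cluster.documentType (mapping type, ES 6.x)
    """
    lines = []
    for doc in docs:
        # Action/metadata line first, then the document source on its own line.
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```

If documentType does not match a type in the mapping, Elasticsearch 6.x will typically reject the write, which is why the config value and the mapping key are kept in sync.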

Example IAM policy for Loader:

{
  "Version" : "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:ListTables",
        "dynamodb:BatchWriteItem",
        "dynamodb:PutItem",
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:DeleteItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:Query",
        "dynamodb:UpdateItem",
        "kinesis:*",
        "s3:GetObject",
        "cloudwatch:ListMetrics",
        "cloudwatch:PutMetricAlarm",
        "cloudwatch:PutMetricData",
        "ec2:DescribeTags",
        "es:DescribeElasticsearchDomains",
        "es:ListDomainNames",
        "es:DescribeElasticsearchDomain",
        "es:ESHttpGet",
        "es:ESHttpHead",
        "es:ESHttpDelete",
        "es:ESHttpPost",
        "es:ESHttpPut",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams"
      ],
      "Resource": ["*"]
    }
  ]
}
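With wildcards such as kinesis:* in a policy, it is not always obvious whether a given action is covered. Below is a simplified, hypothetical checker: it matches action names case-insensitively with * and ? wildcards, as IAM does for actions, but it ignores Deny statements, Resource and Condition, so treat it as a sanity check rather than a policy evaluator. The list of required actions you feed it is your own choice; we have not confirmed the exact set of calls the loader makes.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def is_allowed(action: str, allowed_patterns: Iterable[str]) -> bool:
    """True if `action` matches any Allow pattern (case-insensitive, * and ?)."""
    action = action.lower()
    return any(fnmatchcase(action, p.lower()) for p in allowed_patterns)


def missing_actions(required: Iterable[str], allowed_patterns: Iterable[str]) -> List[str]:
    """Return the required actions that no Allow pattern covers."""
    patterns = list(allowed_patterns)  # materialise: iterated once per action
    return [a for a in required if not is_allowed(a, patterns)]
```

For instance, with a few entries from the policy above (`["kinesis:*", "es:ESHttpPost", "es:ESHttpPut"]`), `missing_actions(["kinesis:GetRecords", "es:ESHttpGet"], ...)` flags only `es:ESHttpGet`.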

A wide-open access policy for the VPC-based AWS Elasticsearch cluster. You can tighten this up, but this will let you send signed requests to the AWS Elasticsearch cluster:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "es:*",
            "Principal": "*",
            "Effect": "Allow",
            "Resource": "arn:aws:es:${var.aws_region}:${var.account_id}:domain/${var.domain_name}/*"
        }
    ]
}
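With elasticsearch.aws.signing = true, requests are signed using AWS Signature Version 4, which is what AWS Elasticsearch expects, and the access policy above is what authorizes those signed calls. For illustration only, here is the standard SigV4 signing-key derivation for the es service. Building the canonical request and the string to sign is omitted, and in practice an AWS SDK should do this for you rather than hand-rolled code.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def sigv4_signing_key(secret_key: str, date_stamp: str, region: str,
                      service: str = "es") -> bytes:
    """Derive the AWS Signature Version 4 signing key.

    date_stamp is the request date in UTC as YYYYMMDD.
    """
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(signing_key: bytes, string_to_sign: str) -> str:
    # The request signature is the hex-encoded HMAC-SHA256 of the string to sign.
    return hmac.new(signing_key, string_to_sign.encode("utf-8"),
                    hashlib.sha256).hexdigest()
```

Because region and service are folded into the key, a request signed for the wrong region (for example eu-west-1 vs us-east-1) is rejected, so elasticsearch.aws.region has to match the domain's region.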

Creating the index in Elasticsearch

This is the default mapping file we use when setting up for loading “enriched” data to Elasticsearch:

{
    "settings": {
        "analysis": {
            "analyzer": {
                "default": {
                    "type": "keyword"
                }
            }
        },
        "index" : {
            "number_of_replicas" : "1",
            "number_of_shards" : "${shards}"
        }
    },
    "mappings": {
        "good": {
            "properties": {
                "app_id": {
                    "type": "keyword"
                },
                "br_colordepth": {
                    "type": "keyword"
                },
                "br_cookies": {
                    "type": "boolean"
                },
                "br_family": {
                    "type": "keyword"
                },
                "br_features_director": {
                    "type": "boolean"
                },
                "br_features_flash": {
                    "type": "boolean"
                },
                "br_features_gears": {
                    "type": "boolean"
                },
                "br_features_java": {
                    "type": "boolean"
                },
                "br_features_pdf": {
                    "type": "boolean"
                },
                "br_features_quicktime": {
                    "type": "boolean"
                },
                "br_features_realplayer": {
                    "type": "boolean"
                },
                "br_features_silverlight": {
                    "type": "boolean"
                },
                "br_features_windowsmedia": {
                    "type": "boolean"
                },
                "br_lang": {
                    "type": "keyword"
                },
                "br_name": {
                    "type": "keyword"
                },
                "br_renderengine": {
                    "type": "keyword"
                },
                "br_type": {
                    "type": "keyword"
                },
                "br_version": {
                    "type": "keyword"
                },
                "br_viewheight": {
                    "type": "long"
                },
                "br_viewwidth": {
                    "type": "long"
                },
                "collector_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "doc_charset": {
                    "type": "keyword"
                },
                "doc_height": {
                    "type": "long"
                },
                "doc_width": {
                    "type": "long"
                },
                "domain_sessionid": {
                    "type": "keyword"
                },
                "domain_sessionidx": {
                    "type": "long"
                },
                "domain_userid": {
                    "type": "keyword"
                },
                "dvce_ismobile": {
                    "type": "boolean"
                },
                "dvce_screenheight": {
                    "type": "long"
                },
                "dvce_screenwidth": {
                    "type": "long"
                },
                "dvce_sent_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "dvce_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "dvce_type": {
                    "type": "keyword"
                },
                "etl_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "event": {
                    "type": "keyword"
                },
                "event_id": {
                    "type": "keyword"
                },
                "geo_location": {
                    "type": "geo_point"
                },
                "mkt_campaign": {
                    "type": "keyword"
                },
                "mkt_content": {
                    "type": "keyword"
                },
                "mkt_medium": {
                    "type": "keyword"
                },
                "mkt_source": {
                    "type": "keyword"
                },
                "mkt_term": {
                    "type": "keyword"
                },
                "name_tracker": {
                    "type": "keyword"
                },
                "network_userid": {
                    "type": "keyword"
                },
                "os_family": {
                    "type": "keyword"
                },
                "os_manufacturer": {
                    "type": "keyword"
                },
                "os_name": {
                    "type": "keyword"
                },
                "os_timezone": {
                    "type": "keyword"
                },
                "page_referrer": {
                    "type": "keyword"
                },
                "page_title": {
                    "type": "keyword"
                },
                "page_url": {
                    "type": "keyword"
                },
                "page_urlfragment": {
                    "type": "keyword"
                },
                "page_urlhost": {
                    "type": "keyword"
                },
                "page_urlpath": {
                    "type": "keyword"
                },
                "page_urlport": {
                    "type": "long"
                },
                "page_urlquery": {
                    "type": "keyword"
                },
                "page_urlscheme": {
                    "type": "keyword"
                },
                "platform": {
                    "type": "keyword"
                },
                "pp_xoffset_max": {
                    "type": "long"
                },
                "pp_xoffset_min": {
                    "type": "long"
                },
                "pp_yoffset_max": {
                    "type": "long"
                },
                "pp_yoffset_min": {
                    "type": "long"
                },
                "refr_medium": {
                    "type": "keyword"
                },
                "refr_source": {
                    "type": "keyword"
                },
                "refr_term": {
                    "type": "keyword"
                },
                "refr_urlfragment": {
                    "type": "keyword"
                },
                "refr_urlhost": {
                    "type": "keyword"
                },
                "refr_urlpath": {
                    "type": "keyword"
                },
                "refr_urlport": {
                    "type": "long"
                },
                "refr_urlquery": {
                    "type": "keyword"
                },
                "refr_urlscheme": {
                    "type": "keyword"
                },
                "se_action": {
                    "type": "keyword"
                },
                "se_category": {
                    "type": "keyword"
                },
                "se_label": {
                    "type": "keyword"
                },
                "user_fingerprint": {
                    "type": "keyword"
                },
                "user_id": {
                    "type": "keyword"
                },
                "user_ipaddress": {
                    "type": "keyword"
                },
                "useragent": {
                    "type": "keyword"
                },
                "v_collector": {
                    "type": "keyword"
                },
                "v_etl": {
                    "type": "keyword"
                },
                "v_tracker": {
                    "type": "keyword"
                },
                "dvce_created_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "txn_id": {
                    "type": "long"
                },
                "geo_country": {
                    "type": "keyword"
                },
                "geo_region": {
                    "type": "keyword"
                },
                "geo_city": {
                    "type": "keyword"
                },
                "geo_zipcode": {
                    "type": "keyword"
                },
                "geo_latitude": {
                    "type": "double"
                },
                "geo_longitude": {
                    "type": "double"
                },
                "geo_region_name": {
                    "type": "keyword"
                },
                "ip_isp": {
                    "type": "keyword"
                },
                "ip_organization": {
                    "type": "keyword"
                },
                "ip_domain": {
                    "type": "keyword"
                },
                "ip_netspeed": {
                    "type": "keyword"
                },
                "se_property": {
                    "type": "keyword"
                },
                "se_value": {
                    "type": "keyword"
                },
                "tr_orderid": {
                    "type": "keyword"
                },
                "tr_affiliation": {
                    "type": "keyword"
                },
                "tr_total": {
                    "type": "double"
                },
                "tr_tax": {
                    "type": "double"
                },
                "tr_shipping": {
                    "type": "double"
                },
                "tr_city": {
                    "type": "keyword"
                },
                "tr_state": {
                    "type": "keyword"
                },
                "tr_country": {
                    "type": "keyword"
                },
                "tr_currency": {
                    "type": "keyword"
                },
                "tr_total_base": {
                    "type": "double"
                },
                "tr_tax_base": {
                    "type": "double"
                },
                "tr_shipping_base": {
                    "type": "double"
                },
                "ti_orderid": {
                    "type": "keyword"
                },
                "ti_sku": {
                    "type": "keyword"
                },
                "ti_name": {
                    "type": "keyword"
                },
                "ti_category": {
                    "type": "keyword"
                },
                "ti_price": {
                    "type": "double"
                },
                "ti_quantity": {
                    "type": "long"
                },
                "ti_currency": {
                    "type": "keyword"
                },
                "ti_price_base": {
                    "type": "double"
                },
                "base_currency": {
                    "type": "keyword"
                },
                "geo_timezone": {
                    "type": "keyword"
                },
                "mkt_clickid": {
                    "type": "keyword"
                },
                "mkt_network": {
                    "type": "keyword"
                },
                "etl_tags": {
                    "type": "keyword"
                },
                "refr_domain_userid": {
                    "type": "keyword"
                },
                "refr_device_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "derived_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                "event_vendor": {
                    "type": "keyword"
                },
                "event_name": {
                    "type": "keyword"
                },
                "event_format": {
                    "type": "keyword"
                },
                "event_version": {
                    "type": "keyword"
                },
                "event_fingerprint": {
                    "type": "keyword"
                },
                "true_tstamp": {
                    "type": "date",
                    "format": "dateOptionalTime"
                }
            }
        }
    }
}

Inserted into the cluster like so:

curl -XPUT --fail 'https://${var.cluster_endpoint}/${var.index_name_good}?pretty' -H 'Content-Type: application/json' -d '${local.mapping_good}'

Note: index_name_good should be the same index name as the one you have put into your config HOCON (elasticsearch.cluster.index).
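If you would rather create the index from a script than with curl, the same PUT request can be built with Python's standard library. This is a hypothetical sketch: create_index_request and its arguments are ours, and the request is unsigned just like the curl call, which works with the wide-open access policy above. The include_type_name flag only matters on Elasticsearch 7.x, where, as far as we know, a mapping keyed by a type name such as "good" is rejected unless that parameter is set.

```python
import json
import urllib.request


def create_index_request(endpoint: str, index: str, mapping: dict,
                         include_type_name: bool = False) -> urllib.request.Request:
    """Build (but do not send) the PUT request that creates the index.

    endpoint is the bare host, without http:// or https://.
    """
    url = f"https://{endpoint}/{index}?pretty"
    if include_type_name:
        # Elasticsearch 7.x: typed mappings like {"good": {...}} need this flag.
        url += "&include_type_name=true"
    return urllib.request.Request(
        url,
        data=json.dumps(mapping).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Sending it is `urllib.request.urlopen(create_index_request(...))`; a 4xx response from Elasticsearch (for example an index that already exists) raises `urllib.error.HTTPError`.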


At this point you should have the index created in Elasticsearch with the correct mapping and you should be ready to load to the cluster!

In terms of network security, you want to ensure that you have allowed inbound TCP traffic on port 443 to the AWS Elasticsearch cluster and outbound TCP traffic on port 443 from the node running the Elasticsearch Loader. If you can already query the endpoint from the server when SSHed in, this should be fine.
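The java.net.ConnectException in the loader logs earlier in this thread is raised from a connection-pool timeout, meaning the TCP connection to Elasticsearch never completed. That suggests a network problem (security groups, wrong port) rather than an access-policy one, since a permissions problem would normally come back as an HTTP error response. Assuming Python 3 is available on the loader host, a quick check along these lines (can_connect is a hypothetical helper) tells you whether the host and port the loader uses are reachable. Note that the loader log showed it connecting on port 9200 over plain HTTP, while the configuration above uses port 443 with ssl = true.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections and DNS resolution failures.
        return False
```

Run it from the loader instance, e.g. `can_connect("vpc-<internal endpoint>.es.amazonaws.com", 443)`; `False` there while `True` for the same host from elsewhere points at security groups or routing.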

Hope this helps!


@josh Thanks a lot. Can you also post the mapping for enriched bad records?

Sure, this bad mapping is forwards and backwards compatible with both the new and the old bad row formats.

Note: If you start using the new bad rows format, the timestamp to order on is data.failure.timestamp, not failure_tstamp.
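If you process rows from the bad index client-side while both formats are present, a small hypothetical helper can pick whichever ordering timestamp a row carries: data.failure.timestamp for new-format rows, falling back to failure_tstamp for old-format ones. Only those two fields come from the note above; the other fields of each format are left out of this sketch.

```python
from typing import Any, Mapping, Optional


def bad_row_timestamp(row: Mapping[str, Any]) -> Optional[str]:
    """Return the timestamp to order a bad row on, for either format.

    New format: data.failure.timestamp
    Old format: failure_tstamp
    """
    data = row.get("data")
    if isinstance(data, Mapping):
        failure = data.get("failure")
        if isinstance(failure, Mapping) and "timestamp" in failure:
            return failure["timestamp"]
    # Old-format rows (or rows with neither field) fall through here.
    return row.get("failure_tstamp")
```

Sorting with `sorted(rows, key=bad_row_timestamp)` works as long as every row has one of the two fields and both use the same ISO-8601 string style.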