Unable to transfer data from Kinesis to Elasticsearch

Hello Support,

I am getting an error while running the Snowplow Elasticsearch sink.

I am running the following command:
command: snowplow-elasticsearch-sink-0.8.0-2x --config snowplow.conf

ERROR

./snowplow-elasticsearch-sink-0.8.0-2x --config snowplow.conf
Exception in thread "main" scala.MatchError: 'elasticsearch' (of class java.lang.String)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp$delayedInit$body.apply(ElasticsearchSinkApp.scala:123)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp$.main(ElasticsearchSinkApp.scala:69)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp.main(ElasticsearchSinkApp.scala)

snowplow.conf

sink {

source = 'kinesis'

sink {
"good": elasticsearch
"bad": 'none'
}

stream-type: "good"

aws {
access-key: "key"
secret-key: "key"
}

kinesis {

in {
  stream-name: "enriched" # Kinesis stream name
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # Note: This only affects the first run of this application
  # on a stream.
  initial-position: "TRIM_HORIZON"
  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords: 10000
}
out {
  # Stream for enriched events which are rejected by Elasticsearch
  stream-name: "bad"
  shards: 1
}
region: "us-west-2"
app-name: "snownrich"

}

elasticsearch {

client {
  type: "http"
  endpoint: "xx.xx.xx.xx" #IP address of server
  port: "9200"
  max-timeout: "7200"
  # Section for configuring the HTTP client
  http {
    conn-timeout: "7200"
    read-timeout: "7200"
  }
}
cluster {
  name: "elasticsearch"
  index: "snowplow"
  type: "elasticsearch"
}

}

# Events are accumulated in a buffer before being sent to Elasticsearch.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byte-limit or
# - the number of stored records exceeds record-limit or
# - the time in milliseconds since it was last emptied exceeds time-limit

buffer {
byte-limit: 4500000
record-limit: 500 # Not supported by Kafka; will be ignored
time-limit: 60000
}

# Optional section for tracking endpoints

monitoring {
snowplow {
collector-uri: "xx.xx.xx.xx"
collector-port: 80
app-id: "collector-monitor"
method: "GET"
}
}
}

Please help, TIA

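Hey @geetanshjindal - your config looks fine to me, so this is quite strange. The issue could be the use of single quotes instead of double quotes: the config format (HOCON) does not treat single quotes as string delimiters, so a value like 'elasticsearch' keeps its quote characters, which would explain the MatchError on 'elasticsearch' above. Please try the following config, which has been reformatted without any single quotes: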

sink {
  source = kinesis
  sink {
    "good": elasticsearch
    "bad": none
  }
  stream-type: "good"
  aws {
    access-key: "key"
    secret-key: "key"
  }
  kinesis {
    in {
      stream-name: "enriched"
      initial-position: "TRIM_HORIZON"
      maxRecords: 10000
    }
    out {
      stream-name: "bad"
      shards: 1
    }
    region: "us-west-2"
    app-name: "snownrich"
  }
  elasticsearch {
    client {
      type: "http"
      endpoint: "xx.xx.xx.xx" #IP address of server
      port: "9200"
      max-timeout: "7200"
      http {
        conn-timeout: "7200"
        read-timeout: "7200"
      }
    }
    cluster {
      name: "elasticsearch"
      index: "snowplow"
      type: "elasticsearch"
    }
  }
  buffer {
    byte-limit: 4500000
    record-limit: 500
    time-limit: 60000
  }
  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}
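
To check a config for this problem before starting the sink, something like the following could help. It is only a minimal sketch, not part of Snowplow: the function name `find_single_quoted_values` and the sample text are made up for illustration, and it uses a simple regular expression rather than a real HOCON parser, so it assumes one setting per line and no `#` characters inside quoted strings.

```python
import re

# Matches a value wrapped in straight or curly single quotes after ':' or '='.
SINGLE_QUOTED = re.compile(r"""[:=]\s*(['‘’][^'‘’"\n]*['‘’])""")


def find_single_quoted_values(config_text):
    """Return (line_number, value) pairs for values wrapped in single quotes."""
    hits = []
    for number, line in enumerate(config_text.splitlines(), start=1):
        # Drop trailing '#' comments so quotes inside comments are ignored.
        # Assumption: no '#' appears inside a quoted string.
        code = line.split("#", 1)[0]
        for match in SINGLE_QUOTED.finditer(code):
            hits.append((number, match.group(1)))
    return hits


# Hypothetical sample, shaped like the failing config in this thread.
sample = """sink {
  source = 'kinesis'
  sink {
    "good": 'elasticsearch'
    "bad": "none"
  }
}"""

for number, value in find_single_quoted_values(sample):
    print(f"line {number}: {value} should use double quotes or no quotes")
```

An empty result means no single-quoted values were found by this heuristic; it does not prove the config is otherwise valid.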

Hey, thanks. It seems to be working fine now.

In the sample config file, everything is in single quotes, so I did not change the format.

Hey @geetanshjindal you are right! Will make a ticket to get that fixed.