S3 Loader does not connect to dynamodb localstack

Hi,

I am using minikube and localstack to set up a Snowplow test pipeline.
I managed to set up the tracker (a Python tracker), the collector and the enrichment module, including the streams. Records are written successfully to the streams.

Now I am stuck with the s3 loader part. The loader log in the console provides generic warning and info messages but there is no error and no records are written into the s3 bucket on localstack.

loader → 2020-05-16T11:54:12.993Z → [main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream bad-stream-loader exists and is active
loader → 2020-05-16T11:54:13.010Z → [main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=eu-west-1, s3Endpoint=localstack:4566, kinesisInputStream=good-stream-enriched, maxRecords=50, connectorDestination=s3, bufferMillisecondsLimit=2000, bufferRecordCountLimit=2, s3Bucket=test-bucket-loader, kinesisEndpoint=localstack:4566, appName=oneapp-loader, bufferByteSizeLimit=10000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
loader → 2020-05-16T11:54:13.277Z → [main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created

I already figured out that the corresponding DynamoDB table is not created in localstack. The DynamoDB table creation works fine in the enrichment module (the table name differs: “oa-enrich”).

I am using the following config:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = kinesis

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = kinesis

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = env
  secretKey = env
}
# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application reading from the same NSQ topic at the same time,
  # all of them must have unique channel name for getting all the data from the same topic
  channelName = "not_used"
    
  # Host name for NSQ tools
  host = "not_used"

  # HTTP port for nsqd
  port = 0000

  # HTTP port for nsqlookupd
  lookupPort = 0000
}

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = TRIM_HORIZON

  # Need to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time need to specified in UTC.
  # initialTimestamp = "{{timestamp}}"

  # Maximum number of records to read per GetRecords call     
  maxRecords = 50

  region = ${LOADER_AWS_REGION}

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = oa-loader

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  customEndpoint = "localstack:4566"
}

streams {
  # Input stream name
  inStreamName =  ${LOADER_STREAMS_GOOD}

  # Stream for events for which the storage process fails
  outStreamName =  ${LOADER_STREAMS_BAD}

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = ${LOADER_STREAMS_BUFFER_BYTE_LIMIT} # Not supported by NSQ; will be ignored
    recordLimit = ${LOADER_STREAMS_BUFFER_RECORD_LIMIT}
    timeLimit = ${LOADER_STREAMS_BUFFER_TIME_LIMIT} # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = ${LOADER_AWS_REGION}
  bucket = test-bucket-loader
  # optional bucket where to store partitioned data
  #partitionedBucket = "{{s3bucket}}/partitioned"

  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  #dateFormat = "{{s3DateFormat}}"

  # optional directory structure to use while storing data on s3 (followed by dateFormat config)
  # eg: outputDirectory = "enriched/good/"
  #outputDirectory = "{{s3OutputDirectory}}"

  # optional filename prefix
  # eg: output
  #filenamePrefix = "oa"

  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = gzip

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 60000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  customEndpoint = "localstack:4566"
}

# Optional section for tracking endpoints
# monitoring {
#   snowplow{
#     collectorUri = "{{collectorUri}}"
#     collectorPort = 80 
#     appId = "{{appName}}"
#     method = "{{method}}"
#   }
# }

After enabling log4j I realised that the loader tries to connect to the wrong dynamodb endpoint:
https://dynamodb.eu-west-1.amazonaws.com

loader → 2020-05-16T16:43:11.100Z → 2020-05-16 16:43:11,100 [main] DEBUG com.amazonaws.request -  Sending Request: POST https://dynamodb.eu-west-1.amazonaws.com /

The endpoint should be https://localstack:4566, as for the S3 and Kinesis resources. This works fine for enrichment but not for the S3 loader. How can I set the endpoint for DynamoDB correctly?

I would really appreciate your help.

Hi @mgloel we ran into this exact issue in our recent hackathon to leverage localstack. The missing piece is indeed the override for DynamoDB - this was recently added to the Elasticsearch Loader and can be used as a guide as to what needs to be changed.

The changes that need to be made look roughly as follows:

https://github.com/snowplow/snowplow-elasticsearch-loader/commit/dfb8da9d2bfea86a058197c40fa9efea9380feb2
https://github.com/snowplow/snowplow-elasticsearch-loader/commit/ca619cd940a58968e3a066b76f1ddeea69ee251e

We have this on our radar to fix but obviously if you have bandwidth we would love a PR adding this ability to the S3 Loader so that all of the AWS streaming components can be used with Localstack!

Hi @josh, thanks a lot for pointing me in the right direction. I have now tweaked the s3-loader module in a similar fashion to the Elasticsearch PR.

However, it does not override the DynamoDB endpoint yet. The endpoint is still derived from the region that I specify in the config:

“The region of Amazon DynamoDB client has been set to eu-west-1”

And the loader is still trying this endpoint:
https://dynamodb.eu-west-1.amazonaws.com

Is there anything else that needs to be added in the S3 loader specifically? Unfortunately, I am new to Scala and not sure where and how the AWS connector is specified.

Hi @mgloel the other place it needs to be set is in here:

https://github.com/gloelmat/snowplow-s3-loader/blob/43736a929a7906ca3a17c1683b96e8512a77d22e/src/main/scala/com.snowplowanalytics.s3/loader/KinesisSourceExecutor.scala#L65

This was the same spot missed initially in the Elasticsearch Loader. You will need to add:

.withDynamoDBEndpoint(kcc.DYNAMODB_ENDPOINT)

to that constructor, and it should then work!
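
For readers following along, here is a minimal sketch of what a Kinesis Client Library configuration with both endpoint overrides might look like. It assumes a KCL 1.x version that exposes `withDynamoDBEndpoint`, and it uses literal values copied from the log output in this thread instead of the loader's real `kcc` fields. The object name `KclLocalstackConfig` and the `build` helper are hypothetical, not part of the S3 Loader.

```scala
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{
  InitialPositionInStream,
  KinesisClientLibConfiguration
}

// Hypothetical helper, not the loader's actual KinesisSourceExecutor code.
object KclLocalstackConfig {
  // Assumption: the same localstack edge endpoint serves Kinesis and DynamoDB.
  val localstackEndpoint = "localstack:4566"

  def build(workerId: String): KinesisClientLibConfiguration =
    new KinesisClientLibConfiguration(
      "oneapp-loader",        // appName, also used as the DynamoDB lease table name
      "good-stream-enriched", // input stream
      new DefaultAWSCredentialsProviderChain(),
      workerId
    ).withRegionName("eu-west-1")
      .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
      .withMaxRecords(50)
      .withKinesisEndpoint(localstackEndpoint)
      // Without this call the lease table requests go to the regional AWS endpoint.
      .withDynamoDBEndpoint(localstackEndpoint)
}
```

Calling `KclLocalstackConfig.build("worker-1").getDynamoDBEndpoint` is a quick way to confirm that the override is present on the configuration object before the worker is started.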

Hey @josh,

I have added that already. I spotted the second commit later.

It’s still not running.
Is there something specific about the AWS connector in the ES Loader that differs from the S3 Loader module?

Hi @mgloel if that has been configured it should work as far as I can tell - there could be other nuances in the localstack configuration that need to be addressed like disabling CBOR with this env var AWS_CBOR_DISABLE=1.

Are there any error logs you can share or is it still simply not doing anything?

What are your thresholds for sinking to S3 set to? byte_limit, record_limit, time_limit etc.
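
To make the threshold question concrete, here is a simplified model of the three flush conditions described in the config comments (byte limit, record limit, time limit). It is a sketch, not the loader's actual buffer code. The names `BufferLimits`, `BufferState` and `FlushCheck` are made up for illustration, and treating a limit as hit once the value reaches it (`>=`) is an assumption.

```scala
// Simplified model of the three flush thresholds; not the loader's actual code.
final case class BufferLimits(byteLimit: Long, recordLimit: Int, timeLimitMs: Long)

final case class BufferState(bytes: Long, records: Int, msSinceLastFlush: Long)

object FlushCheck {
  // Assumption: a threshold counts as hit once the value reaches it (>=).
  // An empty buffer is never flushed in this model.
  def shouldFlush(state: BufferState, limits: BufferLimits): Boolean =
    state.records > 0 && (
      state.bytes >= limits.byteLimit ||
      state.records >= limits.recordLimit ||
      state.msSinceLastFlush >= limits.timeLimitMs
    )

  // Values from the KinesisConnectorConfiguration line in the loader log earlier in the thread.
  val limitsFromLog: BufferLimits =
    BufferLimits(byteLimit = 10000L, recordLimit = 2, timeLimitMs = 2000L)
}
```

With the limits shown in the log (2 records, 2000 ms, 10000 bytes), this model flushes after two records or two seconds, so low thresholds alone would not explain an empty bucket if records were actually being read.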

Hey @josh,

CBOR has been disabled already. I will share the relevant chunk of my logs:

loader → 2020-05-19T08:57:30.702Z → [main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream bad-stream-loader exists and is active
loader → 2020-05-19T08:57:30.727Z → [main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=eu-west-1, s3Endpoint=localstack:4566, kinesisInputStream=good-stream-enriched, maxRecords=50, connectorDestination=s3, bufferMillisecondsLimit=2000, bufferRecordCountLimit=2, s3Bucket=test-bucket-loader, kinesisEndpoint=localstack:4566, appName=oneapp-loader, bufferByteSizeLimit=10000, retryLimit=1, dynamoDBEndpoint=localstack:4566, initialPositionInStream=TRIM_HORIZON}
loader → 2020-05-19T08:57:30.840Z → 2020-05-19 08:57:30,840 [main] WARN  com.amazonaws.http.AmazonHttpClient -  SSL Certificate checking for endpoints has been explicitly disabled.
loader → 2020-05-19T08:57:30.911Z → 2020-05-19 08:57:30,905 [main] WARN  com.amazonaws.http.AmazonHttpClient -  SSL Certificate checking for endpoints has been explicitly disabled.
loader → 2020-05-19T08:57:30.913Z → 2020-05-19 08:57:30,913 [main] WARN  com.amazonaws.http.AmazonHttpClient -  SSL Certificate checking for endpoints has been explicitly disabled.
loader → 2020-05-19T08:57:30.920Z → 2020-05-19 08:57:30,920 [main] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  The region of Amazon CloudWatch client has been set to eu-west-1
loader → 2020-05-19T08:57:30.923Z → 2020-05-19 08:57:30,923 [main] DEBUG com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable -  Constructing CWPublisherRunnable with maxBufferTimeMillis 10000 maxQueueSize 10000 batchSize 200 maxJitter 0
loader → 2020-05-19T08:57:30.924Z → 2020-05-19 08:57:30,924 [cw-metrics-publisher] DEBUG com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable -  Waiting up to 10000ms for 200 more datums to appear.
loader → 2020-05-19T08:57:30.926Z → 2020-05-19 08:57:30,925 [main] DEBUG com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy -  KinesisProxy( good-stream-enriched)
loader → 2020-05-19T08:57:30.933Z → 2020-05-19 08:57:30,933 [main] INFO  com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator -  With failover time 30000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 9975 ms, takeleases every 60050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
loader → 2020-05-19T08:57:30.934Z → 2020-05-19 08:57:30,934 [main] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  The region of Amazon Kinesis client has been set to eu-west-1
loader → 2020-05-19T08:57:30.934Z → 2020-05-19 08:57:30,934 [main] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  The region of Amazon DynamoDB client has been set to eu-west-1
loader → 2020-05-19T08:57:30.934Z → 2020-05-19 08:57:30,934 [main] WARN  com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  Received configuration for both region name as eu-west-1, and Amazon Kinesis endpoint as localstack:4566. Amazon Kinesis endpoint will overwrite region name.
loader → 2020-05-19T08:57:30.934Z → 2020-05-19 08:57:30,934 [main] DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  The region of Amazon Kinesis client has been overwritten to localstack:4566
loader → 2020-05-19T08:57:30.934Z → [main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
loader → 2020-05-19T08:57:30.937Z → 2020-05-19 08:57:30,937 [main] INFO  com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase -  Starting worker in KinesisSourceExecutor
loader → 2020-05-19T08:57:30.937Z → 2020-05-19 08:57:30,937 [main] INFO  com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  Initialization attempt 1
loader → 2020-05-19T08:57:30.937Z → 2020-05-19 08:57:30,937 [main] INFO  com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker -  Initializing LeaseCoordinator
loader → 2020-05-19T08:57:30.939Z → 2020-05-19 08:57:30,938 [main] DEBUG com.amazonaws.request -  Sending Request: POST https://dynamodb.eu-west-1.amazonaws.com / Headers: (User-Agent: oneapp-loader,amazon-kinesis-connector-to-s3,amazon-kinesis-connector-java-1.3.0,amazon-kinesis-client-library-java-1.7.4, aws-sdk-java/1.11.115 Linux/4.15.0-1077-azure OpenJDK_64-Bit_Server_VM/25.252-b09/1.8.0_252 scala/2.11.11, amz-sdk-invocation-id: c52678e6-3c17-f6ee-7783-3aa98445dde9, Content-Length: 29, X-Amz-Target: DynamoDB_20120810.DescribeTable, Content-Type: application/x-amz-json-1.0, ) 
loader → 2020-05-19T08:57:30.939Z → 2020-05-19 08:57:30,939 [main] DEBUG com.amazonaws.auth.AWS4Signer -  AWS4 Canonical Request: '"POST
loader → 2020-05-19T08:57:30.939Z → /

The dynamoDBEndpoint=localstack:4566 property is set, as seen in the second log line, but it is not used for the override.
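
One way to read this log is that a property being present in the connector configuration is not the same as the override being applied to the client. The toy model below illustrates that distinction. It is not KCL or SDK code, and `EndpointModel` and its functions are hypothetical names used only for this sketch.

```scala
// Toy model of endpoint selection; not KCL or AWS SDK code.
object EndpointModel {
  // Default regional endpoint, in the shape seen in the debug log of this thread.
  def regionalDefault(service: String, region: String): String =
    s"https://$service.$region.amazonaws.com"

  // Use the override only when one was actually applied to the client,
  // otherwise fall back to the regional default.
  def effectiveEndpoint(service: String, region: String, appliedOverride: Option[String]): String =
    appliedOverride.getOrElse(regionalDefault(service, region))
}
```

In this model, `effectiveEndpoint("dynamodb", "eu-west-1", None)` yields the AWS URL from the log, which matches the symptom of an override that is configured but never handed to the DynamoDB client.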

Hi @mgloel - the only other thing I can think of off the top of my head would be whether the Kinesis Client Library is not correctly overriding the endpoint.

In the ES loader we use 1.9.1 (https://github.com/snowplow/snowplow-elasticsearch-loader/blob/0.12.2-rc2/project/Dependencies.scala#L22)

In the S3 loader we use 1.7.5 (https://github.com/snowplow/snowplow-s3-loader/blob/master/project/Dependencies.scala#L25)

Aligning these might yield a more positive result and could be worth a try!

Hey @josh ,

Updating the Kinesis client library to 1.9.1 did not change anything, unfortunately.
I will check again whether it is a localstack problem, but I doubt it.

According to the logs above, it seems that the worker does not override the DynamoDB endpoint in the first place. The region for DynamoDB is set, though.

Are there maybe some other dependencies required?

Will need to take a deeper look at the code - not a heap of availability right at the moment however. The best option is likely to debug through all the different parts of the app where it should be being set and checking that it is indeed being overwritten… the fact that it is still reporting the AWS endpoint says that at some point the variable is not being correctly set.

Sorry not to be more help here!

Hey @josh
no problem. Your replies were actually all really helpful. Thanks so much.
I will try to set up my pipeline in the actual AWS environment for now. I would be very interested to follow up on any changes to the S3 loader.

Hey @mgloel,

I also wanted to run the pipeline locally and test the new S3 loader (0.7.0) partitioning recently.

I ran into the same issue and I managed to get the full pipeline running locally.

One of the problems is indeed the DynamoDB endpoint, as pointed out by @josh.

Another thing I had to change was the S3 client configuration, so that it uses path-style access. I changed the configuration to add a flag, and pass it to the S3 client in S3Emitter.scala:

  val client = AmazonS3ClientBuilder
    .standard()
    .withCredentials(provider)
+    .withPathStyleAccessEnabled(config.pathStyleAccessEnabled)
    .withEndpointConfiguration(new EndpointConfiguration(config.endpoint, config.region))
    .build()

Maybe it can help you too?
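
As background on why this flag can matter with localstack, the sketch below contrasts the two S3 addressing styles. Virtual-hosted style puts the bucket name into the hostname, which then has to resolve via DNS; path style keeps the configured host and puts the bucket into the path. The `S3Addressing` object is a hypothetical illustration, and the `http` scheme is an assumption made for readability.

```scala
// Illustration of S3 addressing styles; not code from the S3 Loader.
object S3Addressing {
  // `endpoint` is host:port without a scheme, as in the config in this thread.
  // Assumption: plain http is used here only to keep the example readable.
  def objectUrl(endpoint: String, bucket: String, key: String, pathStyle: Boolean): String =
    if (pathStyle) s"http://$endpoint/$bucket/$key" // bucket in the path
    else s"http://$bucket.$endpoint/$key"           // bucket in the hostname
}
```

For the bucket in this thread, the virtual-hosted form would target the host `test-bucket-loader.localstack`, which a cluster DNS would not normally know about unless it was set up for it, whereas the path-style form keeps `localstack:4566` as the host.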

I should be able to open a pull request soon with these few changes.

Great find @AcidFlow! Thanks so much for getting to the bottom of this and yes please for a PR with these changes!

Hey @josh, @mgloel

Here is the PR : https://github.com/snowplow/snowplow-s3-loader/pull/175

Let me know if you need any help or have any questions.