EmrEtlRunner fails to start

Hi,

I am getting the following error when trying to run EmrEtlRunner (version 1.0.4):

    Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
        With Contract: Maybe, String, Bool => Maybe
        At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
                          EmrEtlRunner at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:33
                      failure_callback at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154
                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
                       redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
                       process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:199
      get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:173
                               send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
                       redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
                                <main> at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
                                  load at org/jruby/RubyKernel.java:1009
                                <main> at uri:classloader:/META-INF/main.rb:1
                               require at org/jruby/RubyKernel.java:974
                               require at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:54
                                <main> at <script>:3

This is what I have in the config file:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SECRET_KEY'] %>
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/logs/
      encrypted: false
      raw:
        in: 
          - company-snowplow-raw-production
        processing: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/raw/processing/
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/raw/archive/
      enriched:
        good: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/good/       # e.g. s3://my-out-bucket/enriched/good
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/archive/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        bad: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/bad/     # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
        errors: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/errors/
        stream: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/stream/
      shredded:
        good: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/good/       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/rdb_loader/        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/errors/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/archive/    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-690b1543 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: amplify-keypair
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap:           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for on-demand core instances
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags:
    app: ETL # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
#  snowplow:
#    method: get
#    app_id: redshift_loader # e.g. snowplow
#    collector: # e.g. d3rkrsqld9gmqf.cloudfront.net
#    protocol: http
#    port: 80

Can anyone help with this, please?

@Farzin_Zaker, your configuration file is fine and I had no problem parsing it. I think the problem is that your access_key_id and secret_access_key are not being substituted with the corresponding environment variables’ values.

As a proof of concept, can you substitute the variables with the actual values and see if you get further?
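Something along these lines, with placeholder values standing in for your real keys (don’t paste actual credentials anywhere public):

    aws:
      # hardcoded temporarily for testing only - revert to the ERB tags afterwards
      access_key_id: AKIAXXXXXXXXXXXXXXXX                          # placeholder
      secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx  # placeholder

Alternatively, double-check that AWS_ACCESS_KEY and AWS_SECRET_KEY are actually exported in the shell session that launches EmrEtlRunner.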

Hi @ihor, thanks for your reply. You were right: after fixing this and a few other issues I was able to get a few steps further, but now I am getting a ClassNotFoundException at the “[shred] spark: Shred Enriched Events” step:

    20/10/14 22:59:22 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
    java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<init>(ShredJob.scala:146)
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<clinit>(ShredJob.scala)
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 8 more
    20/10/14 22:59:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary)
    20/10/14 22:59:22 ERROR ApplicationMaster: Uncaught exception: 
    org.apache.spark.SparkException: Exception thrown in awaitResult: 
    	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
    	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:764)
    	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
    	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:762)
    	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
    Caused by: java.util.concurrent.ExecutionException: Boxed Error
    	at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
    	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
    	at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:653)
    Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<init>(ShredJob.scala:146)
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<clinit>(ShredJob.scala)
    	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 8 more

I faced a similar issue at the “[enrich] spark: Enrich Raw Events” step, and changing the ami_version from 5.29.0 to 5.9.0 fixed it.
My EMR cluster is now stuck at the shredding step and I have no idea how to fix it. Would you please help with this?

Here is my latest config file:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SECRET_KEY'] %>
  s3:
    region: us-east-1
    buckets:
      assets: s3://company-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://company-shared-production/redshift/var/log/company_tracker/logs/
      encrypted: false
      raw:
        in:
          - s3://company-raw-production/
        processing: s3://company-shared-production/redshift/var/log/company_tracker/raw/processing/
        archive: s3://company-shared-production/redshift/var/log/company_tracker/raw/archive/
      enriched:
        good: s3://company-shared-production/redshift/var/log/company_tracker/enriched/good/       # e.g. s3://my-out-bucket/enriched/good
        archive: s3://company-shared-production/redshift/var/log/company_tracker/enriched/archive/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        bad: s3://company-shared-production/redshift/var/log/company_tracker/enriched/bad/     # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
        errors: s3://company-shared-production/redshift/var/log/company_tracker/enriched/errors/
#        stream: s3://company-shared-production/var/log/company_tracker/enriched/stream/
      shredded:
        good: s3://company-shared-production/redshift/var/log/company_tracker/shredded/good/       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://company-shared-production/redshift/var/log/company_tracker/shredded/rdb_loader/        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://company-shared-production/redshift/var/log/company_tracker/shredded/errors/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://company-shared-production/redshift/var/log/company_tracker/shredded/archive/    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-690b1543 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: amplify-keypair
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: #"1.3.1" #"1.4.13"              # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: #"1.1"             # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: company to Redshift ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 12
      core_instance_type: r3.2xlarge
      core_instance_bid: #0.015 # In USD. Adjust bid, or leave blank for on-demand core instances
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: #0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark-defaults:
        maximizeResourceAllocation: "true"
#    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.17.0
    rdb_shredder: 0.16.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags:
    app: company ETL # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
#  company:
#    method: get
#    app_id: redshift_loader # e.g. company
#    collector: # e.g. d3rkrsqld9gmqf.cloudfront.net
#    protocol: http
#    port: 80

The previous issue was resolved after downgrading the shredder and loader versions to 0.14.0.
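In other words, the storage section of the config is now back to something like this (both versions set to 0.14.0, as described above):

    storage:
      versions:
        rdb_loader: 0.14.0          # downgraded from 0.17.0
        rdb_shredder: 0.14.0        # downgraded from 0.16.0
        hadoop_elasticsearch: 0.1.0 # unchanged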

Now the RDB Loader is failing with the following error:

    Data discovery error with following issues:
    JSONPath file [com.snowplowanalytics.snowplow/parent_event_1.json] was not found

@ihor, do you have any idea how I can fix this?
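For context, jsonpath_assets is currently blank in my config. If I’m reading the comments in the sample config correctly, a custom JSONPath file would need to live in a bucket referenced there, keyed by vendor and file name; a hypothetical sketch of that layout:

    aws:
      s3:
        buckets:
          # bucket name below is just an illustration, not a real bucket
          jsonpath_assets: s3://company-custom-jsonpaths
          # ...which would be expected to contain an object at the key:
          #   com.snowplowanalytics.snowplow/parent_event_1.json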

I’ve responded to that issue in the other thread, but I wanted to ask: is there any reason you’re setting up the older, batch-based Spark version of the pipeline?

We stopped maintaining the batch pipeline a while ago, and it’s no longer supported. I suspect that many of the issues you’ve hit are because of this (dependency clashes, for example). The stream pipeline is our main focus, and a lot of work has been done to automate things; for example, in the latest version the JSONPaths are handled for you.

I’m quite confident spark-enrich will still work, but it doesn’t surprise me that you’re hitting bumps in the road, and obviously over time it will become more and more out of date. I thought it worth mentioning in case you just weren’t aware of this. (I know our docs can be confusing, so perhaps they misled you somewhere along the line.)

Hi @Colm,

Thanks for your reply. We have a very old Snowplow pipeline in place that stores events in Snowflake. For some reason, we have to migrate from Snowflake to Redshift. Here is the current pipeline:

Raw S3 bucket -> Enrich -> Shred -> Parquet -> Transform + Load (Snowflake)

To change the storage engine I was planning to modify the pipeline like this:

Raw S3 bucket -> Enrich -> Shred -> Load (Redshift)

I don’t want to change anything outside of the ETL pipeline, and I want to end up with a similar schema in the storage engine to minimize the migration effort; there are a lot of recommendation services and BI tools that would need to be modified accordingly.

  • Unfortunately, I could not find out from the documentation which version of EmrEtlRunner is the latest.
  • It is also not clear to me which versions of the enricher, shredder and loader are compatible with each version of EmrEtlRunner.
  • I also don’t know how I can switch from the raw “in” buckets to the enriched stream. Can you point me to the right documents explaining this?
  • I am also wondering whether the latest version supports the same raw data format that is currently being gathered in our S3 buckets.

You are right, I am lost between the old and new documents and don’t know which ones I should use :frowning:

That is quite a complicated situation alright! There’s quite a bit to cover.

The team has reached out via direct message to chat further. :slight_smile: