EmrEtlRunner fails at Hadoop Shred step

Hello all!

We’re quite new to Snowplow and are trying to set up a pipeline in AWS, but the third Hadoop step fails with a Not a file exception. We’re using Stream Enrich on the raw stream and Kinesis Firehose to deliver the enriched stream to an S3 bucket in GZIP.

When we run EmrEtlRunner, the Hadoop cluster spins up and the first two steps complete: [staging_stream_enrich] s3-dist-cp and [shred] s3-dist-cp.

However, the third step, [shred] spark, fails with the exception pasted at the bottom (taken from the Hadoop stderr logs on one of the EC2 instances).

This is our current configuration:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: eu-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://REDACTED-snowplow-log-bucket
      encrypted: false
      enriched:
        good: s3://REDACTED-snowplow-out-bucket/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        archive: s3://REDACTED-snowplow-archive-bucket/enriched    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://REDACTED-snowplow-bucket/enriched/good     # S3 Loader's OR Firehose Kinesis output folder with enriched data. If present raw buckets will be discarded
      shredded:
        good: s3://REDACTED-snowplow-out-bucket/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://REDACTED-snowplow-out-bucket/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://REDACTED-snowplow-archive-bucket/shredded    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: eu-west-2        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-REDACTED # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow-runner-key
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 2
      core_instance_type: m4.large
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.16.0-rc2
    rdb_shredder: 0.15.0-rc3        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow-runner-app # e.g. snowplow
    collector: SnowplowCollector-env.REDACTED.eu-west-2.elasticbeanstalk.com # e.g. d3rkrsqld9gmqf.cloudfront.net
    protocol: http
    port: 80

LOGS:

19/07/31 21:31:42 INFO RMProxy: Connecting to ResourceManager at ip-172-31-25-175.eu-west-2.compute.internal/172.31.25.175:8030
19/07/31 21:31:42 INFO YarnRMClient: Registering the ApplicationMaster
19/07/31 21:31:42 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
19/07/31 21:31:42 INFO YarnAllocator: Will request 2 executor container(s), each with 4 core(s) and 5248 MB memory (including 477 MB of overhead)
19/07/31 21:31:42 INFO YarnAllocator: Submitted 2 unlocalized container requests.
19/07/31 21:31:42 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
19/07/31 21:31:43 INFO AMRMClientImpl: Received new token for : ip-172-31-24-222.eu-west-2.compute.internal:8041
19/07/31 21:31:43 INFO YarnAllocator: Launching container container_1564608487795_0003_01_000002 on host ip-172-31-24-222.eu-west-2.compute.internal for executor with ID 1
19/07/31 21:31:43 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
19/07/31 21:31:43 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
19/07/31 21:31:43 INFO ContainerManagementProtocolProxy: Opening proxy : ip-172-31-24-222.eu-west-2.compute.internal:8041
19/07/31 21:31:48 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.31.24.222:48674) with ID 1
19/07/31 21:31:48 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
19/07/31 21:31:48 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-24-222.eu-west-2.compute.internal:40069 with 2.6 GB RAM, BlockManagerId(1, ip-172-31-24-222.eu-west-2.compute.internal, 40069, None)
19/07/31 21:32:11 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
19/07/31 21:32:11 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
19/07/31 21:32:11 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('hdfs:///user/spark/warehouse').
19/07/31 21:32:11 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
19/07/31 21:32:11 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/07/31 21:32:13 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 302.6 KB, free 3.1 GB)
19/07/31 21:32:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.4 KB, free 3.1 GB)
19/07/31 21:32:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.19.254:46581 (size: 26.4 KB, free: 3.1 GB)
19/07/31 21:32:14 INFO SparkContext: Created broadcast 0 from textFile at ShredJob.scala:278
19/07/31 21:32:16 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
19/07/31 21:32:16 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/07/31 21:32:16 INFO SparkContext: Starting job: text at ShredJob.scala:356
19/07/31 21:32:16 INFO GPLNativeCodeLoader: Loaded native gpl library
19/07/31 21:32:16 INFO LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f779390180acb8f1d86999c0a0294917976289f]
19/07/31 21:32:16 INFO FileInputFormat: Total input paths to process : 1
19/07/31 21:32:16 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
	at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
	at scala.collection.SetLike$class.map(SetLike.scala:92)
	at scala.collection.mutable.AbstractSet.map(Set.scala:46)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
19/07/31 21:32:16 INFO DAGScheduler: Job 0 failed: text at ShredJob.scala:356, took 0.085470 s
19/07/31 21:32:16 ERROR FileFormatWriter: Aborting job null.
java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
	at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
	at scala.collection.SetLike$class.map(SetLike.scala:92)
	at scala.collection.mutable.AbstractSet.map(Set.scala:46)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:555)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:356)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
	at scala.util.Try$.apply(Try.scala:192)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:155)
	at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
19/07/31 21:32:16 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted.
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:555)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:356)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
	at scala.util.Try$.apply(Try.scala:192)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:162)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:155)
	at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
	at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
	at scala.collection.SetLike$class.map(SetLike.scala:92)
	at scala.collection.mutable.AbstractSet.map(Set.scala:46)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
	... 49 more
19/07/31 21:32:16 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted.)
19/07/31 21:32:16 INFO SparkContext: Invoking stop() from shutdown hook
19/07/31 21:32:16 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-19-254.eu-west-2.compute.internal:42899
19/07/31 21:32:16 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
19/07/31 21:32:16 INFO YarnClusterSchedulerBackend: Shutting down all executors
19/07/31 21:32:16 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/07/31 21:32:16 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/07/31 21:32:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/31 21:32:16 INFO MemoryStore: MemoryStore cleared
19/07/31 21:32:16 INFO BlockManager: BlockManager stopped
19/07/31 21:32:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/07/31 21:32:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/07/31 21:32:16 INFO SparkContext: Successfully stopped SparkContext
19/07/31 21:32:16 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted.)
19/07/31 21:32:16 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
19/07/31 21:32:16 INFO ApplicationMaster: Deleting staging directory hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1564608487795_0003
19/07/31 21:32:16 INFO ShutdownHookManager: Shutdown hook called
19/07/31 21:32:16 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1564608487795_0003/spark-3b7990ca-028a-492b-a4be-217a2898bf62

Could you please help us see what we’re missing?

Okay, after swapping Firehose out for the S3 Loader we managed to get to the sixth step. Now we’re getting an error there: DecodingFailure at .id: Attempt to decode value on failed cursor.
Doing some research, we suspect that our config isn’t quite right just yet…

@Attila_Molnar, the error to look for is

Caused by: java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07

It looks like the enriched data you sank to S3 is not laid out correctly. I wouldn’t expect to see the folder /2019/07; the expected layout follows the pattern run=2019-07-xx-xx-xx-xx/actual_file_here.
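
For illustration only (every name below is made up): Firehose’s default S3 prefixing produces nested date folders, which is what the shred step trips over, e.g.

.../enriched/good/2019/07/31/21/firehose-delivered-object.gz

whereas the job expects one folder per run:

.../enriched/good/run=2019-07-31-21-30-00/actual_file_here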

@ihor Thank you! As mentioned above, the Not a file error on step 3 was solved by using the S3 Loader instead of Kinesis Firehose. Now we’re stuck on step 6, where the RDB Loader complains about DecodingFailure at .id: Attempt to decode value on failed cursor. We managed to run the RDB Loader jar on our local machines and it fails with the same error; we’re not sure how to debug further.

Hi :wave: I’m working with @Attila_Molnar on this issue.

We’ve managed to resolve this. There was a missing id field on the Redshift target configuration.
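
For anyone else who hits this: the fix was adding an id field (any UUID you generate) to the storage target JSON that the RDB Loader reads; the DecodingFailure at .id is the loader failing to decode that missing field. A rough sketch from memory is below; the exact field list and schema version depend on the redshift_config schema your RDB Loader release expects, so treat every value as a placeholder rather than a copy-paste template.

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/3-0-0",
  "data": {
    "id": "GENERATE-YOUR-OWN-UUID",
    "name": "Our Redshift target",
    "host": "REDACTED.redshift.amazonaws.com",
    "port": 5439,
    "database": "snowplow",
    "schema": "atomic",
    "username": "REDACTED",
    "password": "REDACTED",
    "sslMode": "DISABLE",
    "maxError": 1,
    "compRows": 20000,
    "purpose": "ENRICHED_EVENTS"
  }
}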


@egmcdonald I’m getting the same error, “DecodingFailure at .id: Attempt to decode value on failed cursor”.
How did you resolve this?