Spark missing in Dataflow-runner

Hi all,
I’m new to Snowplow and dataflow-runner, so this may be a misunderstanding, but I’ve copied the setup guide for loading data into Snowflake exactly from here:

Yet I keep getting an error when attempting to run the command
./dataflow-runner run-transient --emr-config=cluster.json --emr-playbook=playbook.json

The error is "Cannot run program "spark-submit" (in directory "."): error=2, No such file or directory"

I can verify through CloudTrail that the command is sent to AWS EMR and the cluster starts, but for some reason some of the parameters I set in my playbook.json are not making their way into EMR, so it continuously fails.

My cluster.json is almost an exact copy of the tutorial

{
  "schema":"iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data":{
    "name":"dataflow-runner - snowflake transformer",
    "logUri":"s3://logs/data-snowplow-emr-etl-runner/",
    "region":"us-west-2",
    "credentials":{
      "accessKeyId":"xxxxxxxx",
      "secretAccessKey":"xxxxxx"
    },
    "roles":{
      "jobflow":"x",
      "service":"x"
    },
    "ec2":{
      "amiVersion":"6.1.0",
      "keyName":"test",
      "location":{
        "vpc":{
          "subnetId": "test"
        }
      },
      "instances":{
        "master":{
          "type":"m4.large"
        },
        "core":{
          "type":"r4.xlarge",
          "count":1,
          "ebsConfiguration":{
            "ebs_optimized": false,
            "ebsBlockDeviceConfigs": [
              {
                "volumesPerInstance" : 1

              }
            ]
          }
        },
        "task":{
          "type":"m4.large",
          "count":0,
          "bid":"0.015"
        }
      }
    },
    "tags":[ ],
    "bootstrapActionConfigs":[ ],
    "configurations":[
      {
        "classification":"core-site",
        "properties":{
          "Io.file.buffer.size":"65536"
        }
      },
      {
        "classification":"mapred-site",
        "properties":{
          "Mapreduce.user.classpath.first":"true"
        }
      },
      {
        "classification":"yarn-site",
        "properties":{
          "yarn.resourcemanager.am.max-attempts":"1"
        }
      },
      {
        "classification":"spark",
        "properties":{
          "maximizeResourceAllocation":"true"
        }
      }
    ],
    "applications":[ "Hadoop", "Spark" ]
  }
}

My playbook.json is also an exact copy

{
  "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data":{
    "region":"region",
    "credentials":{
      "accessKeyId":"<%= ENV['AWS_ACCESS_KEY'] %>",
      "secretAccessKey":"<%= ENV['AWS_SECRET_KEY'] %>"
    },
    "steps":[
      {
        "type":"CUSTOM_JAR",
        "name":"Snowflake Transformer",
        "actionOnFailure":"CANCEL_AND_WAIT",
        "jar":"command-runner.jar",
        "arguments":[
          "spark-submit",
          "--conf",
          "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
          "--deploy-mode",
          "cluster",
          "--class",
          "com.snowplowanalytics.snowflake.transformer.Main",

          "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.7.1.jar",

          "--config",
          "{{base64File "./targets/snowflake.json"}}",
          "--resolver",
          "{{base64File "resolver.json"}}",
          "--events-manifest",
          "{{base64File "dynamodb.json"}}"
        ]
      },

      {
        "type":"CUSTOM_JAR",
        "name":"Snowflake Loader",
        "actionOnFailure":"CANCEL_AND_WAIT",
        "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.7.1.jar",
        "arguments":[
          "load",
          "--base64",
          "--config",
          "{{base64File "./targets/snowflake.json"}}",
          "--resolver",
          "{{base64File "./resolver.json"}}"
        ]
      }
    ],
    "tags":[ ]
  }
}
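
For what it’s worth, the {{base64File ...}} placeholders appear to simply base64-encode the referenced file at submit time, so the value each step receives can be reproduced and sanity-checked locally (assuming GNU coreutils base64; the snowflake.json path is the one from my playbook):

# Reproduce the string the Transformer receives via --config
base64 -w0 ./targets/snowflake.json
# Decode it back to confirm the JSON survives the round trip
base64 -w0 ./targets/snowflake.json | base64 -d | head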

In the end, dataflow-runner has all the control over setting up EMR, so I have no visibility into what’s going wrong. The only thing I have is the following request that dataflow-runner sent, captured through CloudTrail, which does not specify the need for Spark anywhere, even though I’ve added it to my cluster.json:

 "requestParameters": {
        "name": "dataflow-runner - snowflake transformer",
        "logUri": "s3://logs/data-snowplow-emr-etl-runner/",
        "releaseLabel": "emr-6.1.0",
        "instances": {
            "instanceGroups": [
                {
                    "instanceRole": "MASTER",
                    "instanceType": "m4.large",
                    "instanceCount": 1
                },
                {
                    "instanceRole": "CORE",
                    "instanceType": "r4.xlarge",
                    "instanceCount": 1,
                    "ebsConfiguration": {
                        "ebsOptimized": false
                    }
                }
            ],
            "ec2KeyName": "test",
            "placement": {
                "availabilityZone": ""
            },
            "keepJobFlowAliveWhenNoSteps": true,
            "terminationProtected": false,
            "ec2SubnetId": "test"
        },
        "visibleToAllUsers": true,
        "jobFlowRole": "x",
        "serviceRole": "x"
    },
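
For comparison, the applications EMR actually installed can be checked directly against the running cluster (cluster id is a placeholder; assumes the AWS CLI is configured with the same credentials):

# Lists the applications EMR actually installed on the running cluster
aws emr describe-cluster --cluster-id j-XXXXXXXXXXXX --query 'Cluster.Applications'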

Did I mess something up? Any help would be really appreciated


Hi @danrodrigues,

The last part is a little bit suspicious. What version of Dataflow Runner are you using? We extensively use 0.4.2 - can you try that out?

Hi @anton
I’m using dataflow-runner 0.5.0. Sure I’ll give that a try

Hey @danrodrigues! Sorry, just checked with our team - we use 0.4.1 everywhere, which would explain things, as we made an applications-related change in 0.4.2.

Hey @anton
I tried with both version 0.4.1 and 0.4.2
Getting the same error that it can’t find spark on the machine

Cannot run program "spark-submit" (in directory "."): error=2, No such file or directory

I’ve not a clue why

That’s very strange, @danrodrigues. You can try to bootstrap the EMR cluster yourself - there’s no special configuration needed apart from including Spark. After bootstrapping it and getting its jobflow id (like j-ZKIY4CKQRX72) you can then submit the playbook with:

dataflow-runner run --emr-playbook playbook.json --emr-cluster $JOBFLOW
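
(If it helps, the jobflow id of an already-running cluster can be looked up with the AWS CLI, assuming it is configured for the same account and region:)

# List active EMR clusters and their j-... ids
aws emr list-clusters --active --query 'Clusters[].{Id:Id,Name:Name,State:Status.State}'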

I’m just curious how your cluster gets bootstrapped without Spark.

Hey @anton
I’m not sure either how it happened. I followed the tutorial to a T.
I tried bootstrapping my own cluster but ran into a different issue after adding the steps.

I don’t know if I’ve selected the right cluster configuration; I was hoping cluster.json would take care of all of that.

After bootstrapping my own cluster using the following settings

I get the following error

[2020-10-30 22:50:29.095]Container exited with a non-zero exit code 13. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
ala:67)
	at com.snowplowanalytics.snowflake.transformer.Main$.main(Main.scala:35)
	at com.snowplowanalytics.snowflake.transformer.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)
20/10/30 22:50:28 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: User class threw exception: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
	at cats.effect.internals.IOAppPlatform$.mainFiber(IOAppPlatform.scala:36)
	at cats.effect.internals.IOAppPlatform$.main(IOAppPlatform.scala:24)
	at cats.effect.IOApp$class.main(IOApp.scala:67)
	at com.snowplowanalytics.snowflake.transformer.Main$.main(Main.scala:35)
	at com.snowplowanalytics.snowflake.transformer.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)

Now I’m completely lost, because this is all code inside the Snowflake transformer itself

I did notice this warning

20/10/30 22:50:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/30 22:50:10 WARN DependencyUtils: Skip remote jar s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.7.1.jar.

Hi @danrodrigues,

That error is about AMI/Transformer incompatibility. Transformer 0.8.0 works on AMI6 and pre-0.8.0 on AMI5 (AMI 5.16, IIRC). At least now it sees Spark for sure.
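
(If you go the 0.8.0 route, it may be worth confirming which Transformer/Loader versions are actually published before changing the jar paths in the playbook - assuming the hosted-assets bucket allows listing with your credentials:)

# List the published Snowflake Transformer/Loader artifacts
aws s3 ls s3://snowplow-hosted-assets/4-storage/snowflake-loader/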

Hey @anton
Really appreciate the help.
We’re getting closer and closer as time goes on. I updated the transformer to 0.8.0 and am using AMI 6.0; it gets past the transformer step but fails at the loader step with an error that makes no sense to me

2020-11-02T21:28:14.041Z: Launching Snowflake Loader. Fetching state from DynamoDB
2020-11-02T21:28:14.918Z: State fetched, acquiring DB connection
2020-11-02T21:28:16.309Z: DB connection acquired. Loading...
2020-11-02T21:28:16.972Z: Existing column [event_id VARCHAR(36) NOT NULL] doesn't match expected definition [event_id CHAR(36) NOT NULL UNIQUE] at position 7
2020-11-02T21:28:16.972Z: Existing column [domain_sessionidx INTEGER] doesn't match expected definition [domain_sessionidx SMALLINT] at position 17
2020-11-02T21:28:16.972Z: Existing column [geo_country VARCHAR(2)] doesn't match expected definition [geo_country CHAR(2)] at position 19
2020-11-02T21:28:16.972Z: Existing column [geo_region VARCHAR(3)] doesn't match expected definition [geo_region CHAR(3)] at position 20
2020-11-02T21:28:16.972Z: Existing column [tr_currency VARCHAR(3)] doesn't match expected definition [tr_currency CHAR(3)] at position 107
2020-11-02T21:28:16.972Z: Existing column [ti_currency VARCHAR(3)] doesn't match expected definition [ti_currency CHAR(3)] at position 111
2020-11-02T21:28:16.972Z: Existing column [base_currency VARCHAR(3)] doesn't match expected definition [base_currency CHAR(3)] at position 113
2020-11-02T21:28:16.972Z: Existing column [domain_sessionid VARCHAR(128)] doesn't match expected definition [domain_sessionid CHAR(128)] at position 121

I tried manually modifying the column type to be CHAR instead of VARCHAR, but to Snowflake they seem to be the same thing. Any idea what it could be?

@danrodrigues, those are not errors (though they look like errors) and can be ignored (as you said, “it seems to be the same thing”). Anything else in the logs?

@ihor
Unfortunately not. It seems to end “successfully” but nothing happened. My “input” folder is still there, and nothing in either “stageUrl” or “badOutputUrl” or in Snowflake tables. Is there a better way to debug this?

Hi @ihor or @anton
Any guidance you can provide as to why nothing happened after running the data loader? It didn’t fail, there’s nothing in the logs, yet no data has been moved or uploaded

Hi @danrodrigues,

Can you check your DynamoDB manifest and show us any rows from there?
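
(For example, with the AWS CLI - the table name below is a placeholder for whatever your targets/snowflake.json points at:)

# Count the items in the Snowflake run manifest (placeholder table name)
aws dynamodb scan --table-name snowplow-snowflake-manifest --select COUNT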

Hey @anton
Sure, yeah - there are 0 rows in the manifest table in DynamoDB

That’s a telling sign that the Transformer couldn’t find any data in the enriched archive (or couldn’t access the manifest, but that’s less likely). Could you look at the S3 path from the input config property and check that there are some folders with enriched data there?
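
(Something along these lines, with the placeholder replaced by your actual input path:)

# Expect to see folders of enriched data under the Transformer's input prefix
aws s3 ls s3://your-bucket/enriched/archive/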

Sorry @anton, it’s been a while since I’ve been able to get back to this.
Responding to your question re: “Could you look at the S3 path from the input config” - what format does it expect the files to be in?

I added some .lzo files from the stream-enrich process.
I also added some .gzip files from the archived enriched files

Neither seems to do anything. It completes successfully, but there’s nothing in DynamoDB and no errors. Is there some kind of debug logging I can turn on?

As always appreciate how helpful you’ve been

Hey @anton
Just following up on this. I’ve tried just about everything to get this working on my own, I’ve:

  1. verified the AMI version and Transformer/Loader versions match according to the tutorial
  2. removed the cross-batch de-duplication step
  3. verified the Snowflake role has full access to DynamoDB and all S3 buckets mentioned in the config
  4. verified the storage integration works on its own
  5. tried running fully through with .lzo-compressed stream-enriched files
  6. tried running fully through with .gz-compressed enriched archive files
  7. verified that Snowflake is accessible, because I can see some commands that were run in the history for that user

I’m basically at my wits’ end here.
It runs fully through both the transformer and loader steps, and yet nothing happens. I don’t see any way of turning on verbose logging, and I don’t know why I’m having so much trouble getting dataflow-runner going when I’m following the exact tutorial online. If you want me to open a new support ticket for this I can, but I’m in serious need of help after fighting with this for days, maybe over a week by now.
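
For completeness, the only other place I know to look is whatever EMR writes under the cluster’s logUri; assuming the standard EMR layout, the step logs for a run look something like this (cluster and step ids are placeholders):

# List the steps that ran on the cluster
aws s3 ls s3://logs/data-snowplow-emr-etl-runner/j-XXXXXXXXXXXX/steps/
# Stream one step's stderr to the terminal
aws s3 cp s3://logs/data-snowplow-emr-etl-runner/j-XXXXXXXXXXXX/steps/s-YYYYYYYYYYYY/stderr.gz - | gunzip | tail -50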

Best,
Dan


Hi @danrodrigues @anton

I’m having the exact same issue with Transformer and Loader version 0.8.0 and dataflow-runner 0.5.0.
All stages complete without errors, yet nothing is loaded into Snowflake.

I do see records in the DynamoDB manifest table. I also notice the LoadedBy column is not there (it was for 0.4.2).

I’d appreciate any help on this.

Best,
Shai

Hi @danrodrigues,

Have you managed to solve this? As I mentioned in my previous post, I think your problem is that there’s no data in input. The Transformer expects a certain folder structure there, created for example by EmrEtlRunner:

+ s3://your-bucket/enriched/archive/
  + run=2020-12-05-12-33-11/
  + run=2020-12-05-13-32-59/
  + run=2020-12-05-14-33-05/
  ...

Those run=... folders are what the Transformer expects to find there. If it cannot find anything matching that format, it will conclude there’s nothing to process.
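
(Purely as a hypothetical illustration of the expected layout - not necessarily the recommended setup - already-enriched files could be copied under such a run= prefix by hand:)

# Hypothetical example: place enriched files under a run= folder the Transformer will pick up
aws s3 cp s3://your-bucket/enriched/ s3://your-bucket/enriched/archive/run=2020-12-05-15-00-00/ --recursive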

@eko, I think your problem is different, I’ll respond in a separate post.

Hi @anton,

Regarding the folder structure you mentioned (run=…),

  1. Is there a way to enforce this structure if my pipeline uses the Scala Stream Collector, Stream Enrich, and the s3-loader (as opposed to using EmrEtlRunner)?
  2. If so, did I miss something in the tutorial? If not, what options might I have?

Note: I got the same error reported by @danrodrigues on Nov 2:
(Spark missing in Dataflow-runner).

Let me know if I can provide more information.

Thank you,
Joseph