Upgraded to Snowflake Loader 0.8.0 but data is not loaded

Hi,

I upgraded the Transformer and Loader to version 0.8.0 and Dataflow Runner to 0.5.0.
All stages completed without errors, yet nothing was loaded into Snowflake.
This is what I see in the EMR logs:

2020-11-23T05:14:24.944Z: Launching Snowflake Loader. Fetching state from DynamoDB
2020-11-23T05:14:26.576Z: State fetched, acquiring DB connection
2020-11-23T05:14:29.361Z: DB connection acquired. Loading...
2020-11-23T05:14:30.734Z: Existing column [event_id VARCHAR(36) NOT NULL] doesn't match expected definition [event_id CHAR(36) NOT NULL UNIQUE] at position 7
2020-11-23T05:14:30.734Z: Existing column [domain_sessionidx INTEGER] doesn't match expected definition [domain_sessionidx SMALLINT] at position 17
2020-11-23T05:14:30.735Z: Existing column [geo_country VARCHAR(2)] doesn't match expected definition [geo_country CHAR(2)] at position 19
2020-11-23T05:14:30.735Z: Existing column [geo_region VARCHAR(3)] doesn't match expected definition [geo_region CHAR(3)] at position 20
2020-11-23T05:14:30.735Z: Existing column [tr_currency VARCHAR(3)] doesn't match expected definition [tr_currency CHAR(3)] at position 107
2020-11-23T05:14:30.735Z: Existing column [ti_currency VARCHAR(3)] doesn't match expected definition [ti_currency CHAR(3)] at position 111
2020-11-23T05:14:30.735Z: Existing column [base_currency VARCHAR(3)] doesn't match expected definition [base_currency CHAR(3)] at position 113
2020-11-23T05:14:30.736Z: Existing column [domain_sessionid VARCHAR(128)] doesn't match expected definition [domain_sessionid CHAR(128)] at position 121
2020-11-23T05:14:32.730Z: Warehouse snowplow_wh resumed
2020-11-23T05:14:32.747Z: Success. Exiting...
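
The existing column definitions those warnings refer to can be cross-checked against atomic.events with something along these lines (a rough sketch only; snowflake-connector-python is just the tool I'm assuming here, and account, user, database and schema are the same values as in the loader config further down):

import snowflake.connector

# Rough sketch: list the current atomic.events column definitions so they can be
# compared with the "expected definition" warnings in the loader log.
conn = snowflake.connector.connect(
    account="xxxx",              # "account" from the loader config
    user="xxxx",                 # "username" from the loader config
    password="...",              # resolved from EC2 Parameter Store in the real setup
    database="ekocom_snowplow",
    schema="atomic",
)

cur = conn.cursor()
cur.execute("DESCRIBE TABLE atomic.events")
for row in cur:
    print(row[0], row[1])        # column name, declared type (e.g. VARCHAR(36))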

This is the playbook config:

{
    "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
    "data":{
        "region":"us-east-1",
        "credentials":{
            "accessKeyId":"env",
            "secretAccessKey":"env"
        },
        "steps": [
            {
                "type": "CUSTOM_JAR",
                "name": "S3 Copy",
                "actionOnFailure": "CANCEL_AND_WAIT",
                "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
                "arguments": [
                    "--src", "s3://snowplowstream/",
                    "--dest", "s3://enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
                    "--s3Endpoint", "s3.amazonaws.com",
                    "--srcPattern", ".*\\.gz",
                    "--deleteOnSuccess",
                    "--s3ServerSideEncryption"
                ]
            },
            {
                "type":"CUSTOM_JAR",
                "name":"Snowflake Transformer",
                "actionOnFailure":"CANCEL_AND_WAIT",
                "jar":"command-runner.jar",
                "arguments":[
                    "spark-submit",
                        "--conf", "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
                        "--deploy-mode", "cluster",
                        "--class", "com.snowplowanalytics.snowflake.transformer.Main",
                        "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.8.0.jar",
                        "--config", "{{base64File "./conf/snowflakeloader.json"}}",
                        "--resolver", "{{base64File "./conf/iglu_resolver.json"}}",
                        "--events-manifest", "{{base64File "./conf/dedupe_manifest.json"}}"
                ]
            },
            {
                "type":"CUSTOM_JAR",
                "name":"Snowflake Loader",
                "actionOnFailure":"CANCEL_AND_WAIT",
                "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.8.0.jar",
                "arguments": [
                    "load",
                        "--base64",
                        "--config", "{{base64File "./conf/snowflakeloader.json"}}",
                        "--resolver", "{{base64File "./conf/iglu_resolver.json"}}"
                ]
            }
        ],
        "tags":[ ]
    }
}

Cluster config:

{
   "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
   "data":{
      "name": "Snowflake Pipeline",
      "logUri": "s3://dataflowrunnerlogs",
      "region": "us-east-1",
      "credentials":{
         "accessKeyId": "env",
         "secretAccessKey": "env"
      },
      "roles":{
         "jobflow":"xxxxx",
         "service":"xxxxx"
      },
      "ec2":{
         "amiVersion": "6.1.0",
         "keyName": "xxxx",
         "location":{
            "vpc":{
               "subnetId": "xxxx"
            }
         },
         "instances":{
            "master":{
               "type":"m4.large"
            },
            "core": {
               "type": "r4.8xlarge",
               "count": 3,
               "ebsConfiguration": {
                 "ebsOptimized": true,
                 "ebsBlockDeviceConfigs": [
                   {
                     "volumesPerInstance": 1,
                     "volumeSpecification": {
                       "iops": 1500,
                       "sizeInGB": 320,
                       "volumeType": "io1"
                     }
                   }
                 ]
               }
             },
            "task":{
               "type":"m1.medium",
               "count": 0,
               "bid":"0.015"
            }
         }
      },
      "tags":[ ],
      "bootstrapActionConfigs":[ ],
      "configurations": [
         {
           "classification": "yarn-site",
           "properties": {
             "yarn.nodemanager.vmem-check-enabled": "false",
             "yarn.nodemanager.resource.memory-mb": "57344",
             "yarn.scheduler.maximum-allocation-mb": "57344"
           }
         },
         {
           "classification": "spark",
           "properties": {
             "maximizeResourceAllocation": "false"
           }
         },
         {
           "classification": "spark-defaults",
           "properties": {
             "spark.dynamicAllocationEnabled": "false",
             "spark.executor.instances": "6",
             "spark.yarn.executor.memoryOverhead": "1024",
             "spark.executor.memory": "7G",
             "spark.executor.cores": "1",
             "spark.yarn.driver.memoryOverhead": "1024",
             "spark.driver.memory": "7G",
             "spark.driver.cores": "1",
             "spark.default.parallelism": "24"
           }
         }
       ],
      "applications":[ "Hadoop", "Spark" ]
   }
}

Snowflake Loader config:

{
    "schema": "iglu:com.snowplowanalytics.snowplow.storage/snowflake_config/jsonschema/1-0-3",
    "data": {
        "name": "Snowflake Storage Target",
        "auth": {
            "roleArn": "SnowflakeRole",
            "sessionDuration": 900
        },
        "awsRegion": "us-east-1",
        "manifest": "snowflake-run-manifest",
        "snowflakeRegion": "east-us-2.azure",
        "database": "ekocom_snowplow",
        "input": "s3://enriched/archive/",
        "stage": "snowplow_stage",
        "stageUrl": "s3://snowflake/stage/",
        "warehouse": "snowplow_wh",
        "schema": "atomic",
        "account": "xxxx",
        "username": "xxxx",
        "password": {
            "ec2ParameterStore": {
                "parameterName": "xxxx"
            }
        },
        "maxError": 1,
        "jdbcHost": "xxxx.snowflakecomputing.com",
        "purpose": "ENRICHED_EVENTS"
    }
}

Note that I do see records in the DynamoDB manifest table, but the LoadedBy column is missing (it was there with 0.4.2), so it seems the load did not actually complete.
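
A quick scan of the run-manifest table shows which attributes each record carries, roughly like this (boto3 assumed; the table name is the one from the loader config):

import boto3

# Rough sketch: print the attribute names on each record in the run manifest,
# to see whether a LoadedBy-style field was ever written.
# (First page of results only; fine for a small manifest.)
table = boto3.resource("dynamodb", region_name="us-east-1").Table("snowflake-run-manifest")
for item in table.scan()["Items"]:
    print(sorted(item.keys()))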

I also tried recreating the stage in the Snowflake DB, but the results were the same.
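
For context, the recreate was essentially re-pointing the stage at the stageUrl from the config, along these lines (sketch only: the storage integration and file format names are placeholders, not the exact objects, and snowflake-connector-python is just the tool assumed for the sketch):

import snowflake.connector

# Sketch of the stage recreate. Stage name, URL, database and schema are the ones
# from the loader config; STORAGE_INTEGRATION and FILE_FORMAT are placeholders.
conn = snowflake.connector.connect(
    account="xxxx", user="xxxx", password="...",
    database="ekocom_snowplow", schema="atomic",
)
conn.cursor().execute("""
    CREATE OR REPLACE STAGE snowplow_stage
      URL = 's3://snowflake/stage/'
      STORAGE_INTEGRATION = my_s3_integration  -- placeholder
      FILE_FORMAT = (TYPE = JSON)              -- placeholder; the loader's setup step defines its own format
""")
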
I'd appreciate any help on this.

Best,
Shai


@eko,

  1. How does the enriched data get into s3://snowplowstream/? Are you using the S3 Loader?
  2. Do you get the transformed data in s3://snowflake/stage/ (in JSON format)?

I’m a bit surprised the Transformer doesn’t fail with

"--conf", "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat"

It was removed in 0.8.0.
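
With that pair dropped, the Transformer step's arguments in your playbook would presumably just reduce to this (same step as above, nothing else changed):

"arguments":[
    "spark-submit",
        "--deploy-mode", "cluster",
        "--class", "com.snowplowanalytics.snowflake.transformer.Main",
        "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.8.0.jar",
        "--config", "{{base64File "./conf/snowflakeloader.json"}}",
        "--resolver", "{{base64File "./conf/iglu_resolver.json"}}",
        "--events-manifest", "{{base64File "./conf/dedupe_manifest.json"}}"
]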

But generally I agree with Ihor: you first need to check that the data is really in s3://snowflake/stage/ (the run folders have the expected format and are not empty).
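
Something along these lines would confirm it (boto3 assumed; the bucket and prefix are taken from stageUrl in your loader config):

import boto3

# Rough sketch: list what the transformer wrote under the stageUrl, so you can
# see whether the run folders exist and the objects are non-empty.
s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="snowflake", Prefix="stage/", MaxKeys=100)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])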