Snowflake Loader/Dataflow Runner: using a persistent cluster instead of new ones

Hi there, we’ve recently had a large increase in data volume, which means the EMR cluster in our Snowflake Loader setup is close to running at 100%. Right now, every hour we schedule a Dataflow Runner job that spins up a new EMR cluster to transform and load one hour of data into Snowflake.

But the problem is that roughly 5 minutes are spent just starting each cluster, which is rather wasteful. A better approach would be to use a persistent EMR cluster and keep adding new steps to it.

I had a look at the Snowflake Loader and Dataflow Runner documentation and couldn’t find where I can configure this. It seems like the cluster.json file is designed to spin up new EMR clusters rather than use existing ones.

Does anybody know how this might be achieved? Many thanks in advance.

@pat, there are actually 2 tasks you would need to consider:

  1. Ensure your EMR cluster and Spark job are configured for the volume of data you are processing
  2. Decide whether to run the EMR cluster persistently or in transient mode

Typically, you would choose a persistent cluster when you need so-called “drip-feeding” - loading data into the Snowflake DB, say, every 10-20 minutes. If an hourly update is acceptable, it might be cheaper to run the job in transient mode; you just need to configure the cluster and Spark appropriately.

To run the job on a persistent cluster you would use the run command (in place of run-transient) and pass the EMR jobflow ID with the --emr-cluster option. The rest is the same. This does imply the EMR cluster has to be up and running before you submit your Snowflake job, which can be done with the up command. You can read more about Dataflow Runner here.
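For illustration, the overall flow looks roughly like the sketch below. The file names (cluster.json, playbook.json) and the jobflow ID are placeholders, and the exact flag names can differ between Dataflow Runner versions, so double-check them against the docs:

# start the persistent cluster once; this returns the EMR jobflow ID (e.g. j-XXXXXXXXXXXXX)
dataflow-runner up --emr-config cluster.json

# every hour, add the transform/load steps to the already-running cluster
dataflow-runner run --emr-playbook playbook.json --emr-cluster j-XXXXXXXXXXXXX

# for comparison, transient mode (what you run today) creates and terminates a cluster per run
dataflow-runner run-transient --emr-config cluster.json --emr-playbook playbook.json

# shut the persistent cluster down when it is no longer needed
dataflow-runner down --emr-config cluster.json --emr-cluster j-XXXXXXXXXXXXX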

Thank you, this is what I was looking for - I had some trouble figuring out how to add steps to a persistent cluster.

To share more context, we do hourly updates, and at the beginning this suited us fine: processing one hour of data took only 10 minutes, plus 5 minutes of cluster start time on top, so 15 minutes in total. That is only 1/4 utilisation, so run-transient made sense.

But now our volume has grown, and it takes almost a full hour for an EMR cluster to process one hour of data, so we start 24 clusters every day, each running for close to an hour. In effect we already have an EMR cluster running 24/7. This is where we could leverage reserved instances and cut cost by eliminating the cluster start time: 5 minutes of start time * 24 = 120 minutes = 2 hours of compute time wasted each day.

@pat, I strongly advise you to review your EMR cluster configuration. The right sizing depends on the volume of data you process. Below is just an example for a moderate volume; you would need to adjust it for yours. Again, it’s not just the size of the cluster that matters but also the Spark configuration - when correctly configured, it helps utilise the resources much more efficiently.

{
  "instances": {
    "master": {
      "type": "m4.large"
    },
    "core": {
      "type": "r5.8xlarge",
      "count": 1,
      "ebsConfiguration": {
        "ebsOptimized": true,
        "ebsBlockDeviceConfigs": [
          {
            "volumesPerInstance": 1,
            "volumeSpecification": {
              "iops": 1500,
              "sizeInGB": 60,
              "volumeType": "io1"
            }
          }
        ]
      }
    },
    "task": {
      "type": "m4.large",
      "count": 0,
      "bid": "0.015"
    }
  },
  "configurations": [
    {
      "classification": "yarn-site",
      "properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.nodemanager.resource.memory-mb": "256000",
        "yarn.scheduler.maximum-allocation-mb": "256000"
      }
    },
    {
      "classification": "spark",
      "properties": {
        "maximizeResourceAllocation": "false"
      }
    },
    {
      "classification": "spark-defaults",
      "properties": {
        "spark.dynamicAllocationEnabled": "false",
        "spark.executor.instances": "9",
        "spark.yarn.executor.memoryOverhead": "3072",
        "spark.executor.memory": "22G",
        "spark.executor.cores": "3",
        "spark.yarn.driver.memoryOverhead": "3072",
        "spark.driver.memory": "22G",
        "spark.driver.cores": "3",
        "spark.default.parallelism": "108"
      }
    }
  ]
}
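As a quick sanity check of a config like the one above (assuming 1 r5.8xlarge core node with 32 vCPUs and 256 GiB of RAM, and the driver running on the core node alongside the executors), the containers add up exactly to the memory granted to YARN:

# memory per YARN container: 22G of executor/driver memory plus 3072 MB overhead
echo $(( 22 * 1024 + 3072 ))   # 25600 MB per container
# 9 executors plus 1 driver fill the node
echo $(( 10 * 25600 ))         # 256000 MB = yarn.nodemanager.resource.memory-mb
# cores: 10 containers * 3 cores = 30 of the 32 vCPUs, leaving headroom for the OS and YARN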

Yes, we are using the Spark configuration cheatsheet for our setup. Looking at your example configuration, it looks like you tweaked spark.default.parallelism - can you explain the reasoning behind this?
For example, the recommended spark.default.parallelism for this setup is 54, but you had 108.

I have also attached a copy of our cluster and Spark configuration for reference:

   "schema":"iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
   "data":{
      "name":"dataflow-runner-snowflake-transformer",
      "logUri":"",
      "region":"",
      "credentials":{
      },
      "roles":{
         "jobflow":"EMR_EC2_DefaultRole",
         "service":"EMR_DefaultRole"
      },
      "ec2":{
         "amiVersion":"5.13.0",
         "keyName":"snowplow-prod-snowflake",
         "location":{
         },
         "instances":{
            "master":{
               "type":"m4.large"
            },
            "core":{
               "type":"r5.xlarge",
               "count":1
            },
            "task":{
               "type":"m4.large",
               "count":0,
               "bid":"0.015"
            }
         }
      },
      "tags":[ ],
      "bootstrapActionConfigs":[ ],
      "configurations":[
         {
            "classification":"core-site",
            "properties":{
               "Io.file.buffer.size":"65536"
            }
         },
         {
            "classification":"mapred-site",
            "properties":{
               "Mapreduce.user.classpath.first":"true"
            }
         },
         {
            "classification":"yarn-site",
            "properties":{
               "yarn.resourcemanager.am.max-attempts":"1",
               "yarn.nodemanager.vmem-check-enabled": "false",
               "yarn.nodemanager.resource.memory-mb":"28416",
               "yarn.scheduler.maximum-allocation-mb":"28416"
            }
         },
         {
            "classification":"spark",
            "properties":{
               "maximizeResourceAllocation":"false"
            }
         },
         {
          "classification":"spark-defaults",
          "properties":{
             "spark.dynamicAllocation.enabled":"false",
             "spark.executor.instances": "2",
             "spark.executor.memoryOverhead": "2048",
             "spark.executor.memory": "7G",
             "spark.driver.memoryOverhead": "2048",
             "spark.driver.memory": "7G",
             "spark.executor.cores": "1",
             "spark.driver.cores": "1",
             "spark.default.parallelism": "4"
          }
       }
      ],
      "applications":[ "Hadoop", "Spark" ]
   }
}

@pat, the parallelism was calculated as spark.executor.instances * spark.executor.cores * “Parallelism Per Core”. This comes from the recommendation in this post.
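Plugging in the numbers already shown above (nothing new here, just the arithmetic), the difference between 54 and 108 is simply the assumed parallelism per core:

# cheatsheet default: 9 executors * 3 cores * 2 tasks per core
echo $(( 9 * 3 * 2 ))   # 54
# example config above: 9 executors * 3 cores * 4 tasks per core
echo $(( 9 * 3 * 4 ))   # 108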