ETL very very slow in larger batches


#1

We had a period where ETL wasn’t run and are now trying to play catch-up. I’m rerunning about 842 raw event files averaging 5 MB each, so very small data files. I increased the number of core instances to 8 r3.xlarge hoping that would help get it done. It’s been running for about 15 hours now and utilization is very low on the 8 machines. I know the process prefers smaller batches; if that’s the cause, I’ll have to cancel and write a script that gives the ETL only, say, 50 files at a time before kicking it off. Any recommendations or help highly appreciated. Also, any recommendations on which core instance type to pick from Amazon’s list of EC2 instances here? https://aws.amazon.com/ec2/pricing/on-demand/ For example, should I try m4.xlarge instead of r3?
Normally we run this hourly, so the number of files is more like 20-30 at most per run, and I can use just one core instance to get that done.

emr config:

# Adjust your Hadoop cluster below
jobflow:
  job_name: Snowplow ETL # Give your job a name
  master_instance_type: m4.large
  core_instance_count: 8
  core_instance_type: r3.xlarge
  task_instance_count: 0 # Increase to use spot instances
  task_instance_type: m1.medium
  task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
additional_info:        # Optional JSON string for selecting additional features


#2

How long did the enrich step take? We’ve found the enrich and shred steps should take a similar amount of time.

I believe we’ve seen both the enrich and shred steps hang when processing larger batches without having optimised the cluster setup. I created a thread this morning here.

If you look around the Application History and Monitoring tabs, do you see any signs it might have hung? You seem to be able to access most of the application logs through Application History. Unfortunately I don’t have enough Spark experience to say what you might be looking for.

Gareth


java.nio.file.FileAlreadyExistsException: ./ip_geo
#3

Only 11 minutes for the enrich step.

Looking at container pending, it’s definitely moving, just super slow, and the other graphs show it going nowhere near the speed it should be. Thanks, I’ll look at your thread.

IO: [graph image not included]


#4

Mark, it’s probably that after processing it’s trying to move data back from the nodes, but the memory and/or cores allocated to your driver(s) are not enough, and that’s causing the delay. I came across a spreadsheet that helps you figure out the number of cores and the memory based on the cluster size. Ping me and I can send it to you (upload here only allows images).


#5

thanks i will


#6

@bhavin that’s interesting and probably the same problem I saw. This is the discourse post that discusses the Spark optimisation learnings which links to this where the spreadsheet you mention is. We haven’t tried to optimise our current cluster (master: 1xm1.medium, core: 2xm4.4xlarge) so it’s running with ‘out of the box’ AWS settings. Did you get any success with larger batches with an optimised cluster config?


#7

Yes. For our custom recovery process I was processing 20 GB, which would inflate to more than 100 GB, and I did see a significant improvement in processing and in load distribution across the cluster. There are other posts floating around that suggest you should allocate only 1 core and a certain amount of memory based on the calculation here, but that didn’t work so well for me.

This is the spreadsheet I mentioned above (I found it on the net).

I have run on r3.4xlarge with 8 nodes with these settings:

            "Properties":{
              "spark.executor.memory":"36G",
              "spark.driver.memory":"36G",
              "spark.driver.cores":"5",
              "spark.default.parallelism":"115",
              "spark.executor.cores":"5",
              "spark.executor.instances":"23",
              "spark.dynamicAllocation.enabled":"false",
              "spark.yarn.executor.memoryOverhead": "4096",
              "spark.yarn.driver.memoryOverhead": "4096"
            },

And on r3.8xlarge with 6 nodes with these settings:

  spark-defaults:
    spark.executor.instances: '63'
    spark.yarn.executor.memoryOverhead: '5120'
    spark.executor.memory: 40G
    spark.yarn.driver.memoryOverhead: '5120'
    spark.driver.memory: 40G
    spark.executor.cores: '6'
    spark.driver.cores: '6'
    spark.default.parallelism: '708'
    spark.dynamicAllocation.enabled: 'false'
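
The first set of numbers above (8 x r3.4xlarge) can be reproduced with some simple arithmetic. This is a rough sketch, not the spreadsheet itself: the function name, the rule of reserving one vCPU per node and one executor slot for the driver, and the instance specs (16 vCPUs and 122 GiB for r3.4xlarge) are assumptions. YARN will not hand all of a node's memory to containers, so treat the output as a starting point only.

```python
def size_executors(nodes, vcpus_per_node, mem_gib_per_node,
                   cores_per_executor=5, overhead_gib=4):
    """Rough Spark-on-YARN sizing heuristic (an assumption, not the spreadsheet)."""
    # Leave one vCPU per node for the OS and Hadoop daemons.
    executors_per_node = (vcpus_per_node - 1) // cores_per_executor
    # Give up one executor slot across the cluster for the driver.
    executor_instances = nodes * executors_per_node - 1
    # Split node memory evenly, then carve the YARN overhead out of each share.
    container_gib = mem_gib_per_node // executors_per_node
    executor_memory_gib = container_gib - overhead_gib
    return {
        "spark.executor.cores": str(cores_per_executor),
        "spark.executor.instances": str(executor_instances),
        "spark.executor.memory": f"{executor_memory_gib}G",
        "spark.yarn.executor.memoryOverhead": str(overhead_gib * 1024),
        "spark.default.parallelism": str(executor_instances * cores_per_executor),
    }


# 8 nodes of r3.4xlarge (assumed 16 vCPUs, 122 GiB each).
settings = size_executors(nodes=8, vcpus_per_node=16, mem_gib_per_node=122)
```

With these inputs the sketch gives 23 executors of 5 cores and 36G each, 4096 MB overhead and a parallelism of 115, which matches the r3.4xlarge settings. It does not reproduce the r3.8xlarge numbers, so those were presumably tuned differently.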

#8

@bhavin thanks, trying your config now.


#9

So with bhavin’s new config (thanks for that, @bhavin) it still takes 14 hours to complete, most of it in the shredding step; enrich took only 9 minutes. Is it the number of files that’s causing shredding to take so long? Again, our data is so small. I wrote a script that moves only 20-30 files at a time from the in directory to the processing directory. Let’s see how it does. It will just take forever to catch up :frowning: Another thing to note: we do have DynamoDB de-duplication on.
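
The batching part of such a script might look like the sketch below. It only shows how the backlog gets split; actually moving each batch into the processing directory and kicking off the ETL is left out. The file names and the batch size of 25 are made up for illustration.

```python
from itertools import islice


def batches(keys, size=25):
    """Yield successive lists of at most `size` keys, in sorted order."""
    it = iter(sorted(keys))
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Hypothetical file names standing in for the 842 raw files.
raw_files = [f"raw-{i:04d}.gz" for i in range(842)]
plan = list(batches(raw_files, size=25))
```

Each entry in `plan` is one ETL run's worth of files; with 842 files and 25 per batch that is 34 runs, the last one holding the 17 leftover files.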


#10

doing 20 files at a time:

enrich: 1 min
shred: 11 mins
data warehouse load: 38 mins :frowning:


#11

Are you running the global event deduplication which uses a DynamoDB table? If so, have you checked that you’re not exceeding the throughput and being throttled? With our 2xm4.4xlarge cluster we’ve set the write throughput to 10,000 (it seems to peak around 9,000). Throttling could cause the behaviour you’re seeing.


#12

Yes, we have it on. I’ll check, thanks.


#13

@bhavin how long did your 20 GB input job take for the enrich and shred steps? Also, did you add any additional EBS volumes?

Currently I’ve got a job whose enrich step has been running for 2h20m on 2xm4.4xlarge, configured with 3 executors per node according to the spreadsheet. It looks like one of my nodes ran out of disk space.


#14

Does anyone know at what step DynamoDB is used? Is it the enrich step or the shredding step? Shredding, right?

We have read set to 100 and write set to 100. That’s the default, I know, and pretty low, so it might be causing all this.


#15

I can check with Val… stand by.


#16

The DynamoDB lookup is performed in the Shred step. It’s quite easy for that to be throttled if you are doing a big catchup run.


#17

@mjensen look at the metrics tab on your DDB table. You’ll be able to see what happened. You’ll need to adjust the time range to see back in time. It defaults to an hour. We see only write capacity being used not read.
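
The same check can be scripted. This is a minimal sketch, assuming boto3 is used to create the client and that the table name passed in is your own; `WriteThrottleEvents` is the CloudWatch metric DynamoDB publishes for throttled writes. The function takes the CloudWatch client as an argument so it can be tried out without AWS credentials.

```python
from datetime import datetime, timedelta, timezone


def write_throttle_events(cloudwatch, table_name, hours=24):
    """Sum WriteThrottleEvents for a table over the last `hours` hours."""
    end = datetime.now(timezone.utc)
    resp = cloudwatch.get_metric_statistics(
        Namespace="AWS/DynamoDB",
        MetricName="WriteThrottleEvents",
        Dimensions=[{"Name": "TableName", "Value": table_name}],
        StartTime=end - timedelta(hours=hours),
        EndTime=end,
        Period=300,           # 5-minute buckets
        Statistics=["Sum"],
    )
    return sum(point["Sum"] for point in resp["Datapoints"])
```

With real credentials it would be called as `write_throttle_events(boto3.client("cloudwatch"), "my-dedupe-table")`, where the table name is a placeholder. Anything well above zero during a run suggests the shred step was being held back by write capacity.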


#18

Yeah, writes are throttling and peaking at about 2,500, so we’d have to raise the write capacity to speed things up for reruns. Thanks.


#19

In our experience when Dynamo is heavily throttling it could have used a greater throughput had it been available. You may find you need more throughput than the 2,500 it peaked at.

DynamoDB can be an expensive beast when used with EMR because of the high parallelism, but the load typically comes in short bursts (when considered as a fraction of the day). We use it in our own ETL jobs and wrote something to manage the scaling, and we use something similar for the Snowplow ETL. We end up needing throughput priced at $4,000-6,000/month, for 4x20 mins a day, so scaling up and down is important! We also found the DDB auto scaler didn’t respond fast enough.
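
The scale-up-then-down pattern can be sketched as a context manager around the ETL run. This is only an illustration, not the tool described above: the function name and parameters are made up, and the client is expected to be a boto3 DynamoDB client (`update_table` with `ProvisionedThroughput` is the call that changes capacity).

```python
from contextlib import contextmanager


@contextmanager
def boosted_writes(dynamodb, table_name, run_wcu, idle_wcu, rcu=100):
    """Raise write capacity for the duration of a run, then lower it again."""
    def set_capacity(wcu):
        dynamodb.update_table(
            TableName=table_name,
            ProvisionedThroughput={
                "ReadCapacityUnits": rcu,
                "WriteCapacityUnits": wcu,
            },
        )

    set_capacity(run_wcu)
    try:
        yield
    finally:
        # Runs even if the ETL fails, so the table is not left at the high rate.
        set_capacity(idle_wcu)
```

Usage would be along the lines of `with boosted_writes(boto3.client("dynamodb"), "my-dedupe-table", run_wcu=10000, idle_wcu=100): run_etl()`, where the table name and `run_etl` are placeholders. Capacity changes are asynchronous, so a real script should wait for the table to return to ACTIVE before starting the run, and should keep in mind that DynamoDB limits how often provisioned capacity can be decreased per day.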


#20

@gareth yes, we add 12 volumes of 10 GB each with 100 IOPS (io1).

My 20 GB job took about 6 hours, but that was for a custom recovery process that I have built, not for the Snowplow ETL.
I also dedupe the event IDs in the custom ETL, which is why the 20 GB, together with 5-10 GB of event IDs, gets inflated to more than 100 GB.

I am not sure why the Snowplow ETL can’t just query Redshift, download the event IDs and then discard the common ones, like I do. That would save the DynamoDB lookup.
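
The idea can be sketched as follows. This only illustrates the approach described above, not how Snowplow or the custom ETL implements it: the function name is made up, the `event_id` key and the in-memory set are assumptions, and at real volumes the set of known IDs may not fit in memory on one machine.

```python
def drop_known_events(events, known_ids):
    """Keep events whose event_id is neither already loaded nor repeated in the batch."""
    seen = set(known_ids)
    kept = []
    for event in events:
        event_id = event["event_id"]
        if event_id in seen:
            continue          # already in the warehouse, or a duplicate in this batch
        seen.add(event_id)
        kept.append(event)
    return kept


# Hypothetical batch; "c" stands for an ID previously downloaded from Redshift.
batch = [{"event_id": "a"}, {"event_id": "b"}, {"event_id": "a"}, {"event_id": "c"}]
fresh = drop_known_events(batch, known_ids={"c"})
```

Here `fresh` keeps only the first "a" and "b" events: the second "a" is an in-batch duplicate and "c" is treated as already loaded.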