ETL Shred step taking longer and longer


#1

Hi All,

We have been using Snowplow for a while now and have just started adding some more events that have been increasing our volume significantly.

One thing we have noticed is that the shred step is taking longer and longer, when we expected the enrich step to be the time consuming one.

Our cluster config is:

Master: 1 x m1.medium
Core: 3 x i2.2xlarge
Task: 40 x m3.2xlarge

The enrich step took 1h59m and the shred step took 6h48m, is there something about our config that is causing this? or is it normal?

The files they are processing are from the stream collector, approx 40mb compressed per batch, this was around 24 hours or so worth of data as we had an issue on the previous run.

Not sure if I need to change the cluster config to optimize speed or something.

Cheers,
Dean


ETL Shred is consistently failing
#2

Hi @lookaflyingdonkey - my first thought is that Shred is struggling with the sheer number of files it needs to generate:

  • You recently added new event types, so there are more files to process
  • You have a very large cluster - 43 boxes

To validate this hypothesis - can you check in your shredded:good folder in S3 for a run and share with us the total number of files and bytes for that run (s3 ls s3://foo/bar --summarize).


#3

So I would say that is it, I did however up the EMR cluster from m1.medium as the core servers to the i2.2xlarge and went from 5 to 40 task servers.

Would I be better off running less, more powerful task servers?

I am in the middle of finishing a storage load (it is taking hours to do the final archive moves), this was for around 72m events.

Below is the summary for the partially moved (been running for about 3 hours) files.

Total Objects: 42960
Total Size: 27624360922

Cheers,
Dean


#4

Right - when you are approaching six-figures worth of files, that’s causing problems. Opt for fewer, more powerful servers, and more frequent runs.


#5

Great, once this storage load is complete I will try the last 24 hours with less task nodes, I usually try to run every 12 hours but we got backlogged due to a failed EMR job and now it is snowballing!

What would the optimal number of nodes/instance types be? Are the EMR jobs CPU, Memory or disk bound? the C4 and I2 have similar CPU capabilities but I2 double the RAM and a NVMe drive, but also at a much higher cost. If they are disk bound then these should certainly speed up and allow to run less task nodes.

Cheers,
Dean


#6

@lookaflyingdonkey I’d recommend you not to use task instances to avoid losing events, per this issue on GitHub.


#7

Thanks for the heads up @bernardosrulzon , that does look like a strange bug you found there! I will try with just core nodes and beef those up to see if having less files helps.

Will also subscribe to that bug as with the number of events we are tracking we will save alot of $$ with spot instances


#8

Hi @lookaflyingdonkey,

Another advice for you. The i2 instances are quite slow. Try replacing 3 x i2.2xlarge with 3 x c3.8xlarge unless you require lots of storage in which case you could use c4 instances with the amount of (EBS) storage you need, provided you run pipeline version r87.

The cost of EC2/EMR should be comparable but the performance will be greatly increased.


#9

Thanks @ihor, that was the information I needed, I wasn’t sure if the shred was bound by CPU or disk, but by the sounds of it they are CPU based when dealing with less, but larger, files.

Still waiting for this archive step of storage loader to finish (been over 5 hours now!), I was reading about skipping that and doing the archive in parallel with the AWS CLI directly, may have to give that a go.

I will let you all know the results once I get to run the next batch.

Cheers,
Dean


#10

That has greatly improved performance here.

Also, any chance your shredded/good bucket is a different region than your Redshift cluster?


#11

So the loader ended up failing with the archiving :frowning: SO I am just manually moving the last few shredded folders.

Do you mean a different region than the cluster? If so then no they are both in eu-west-1

Cheers,
Dean


#12

So after all the archiving hassle the final summary is

Total Objects: 80874
Total Size: 47490835931

So ~47gb in 80k files…thats alot of text data!

Now to see how the reconfigured EMR goes


#13

Hmm, error in S3DistCp RAW

Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-10-17-13.eu-west-1.compute.internal:8020/tmp/89e087b6-0fb9-4236-b40b-284729b1bef1/files


#14

:rolling_eyes: forgot I had skipped staging on the last run as it error’d out…


#15

So after 6h25m the job failed, 4h20m was in shredding and it was the shredding step that failed

2017-03-28 06:15:20,504 INFO [communication thread] org.apache.hadoop.mapred.Task: Communication exception: java.io.EOFException: End of File Exception between local host is: “ip-10-98-33-101.eu-west-1.compute.internal/10.98.33.101”; destination host is: “ip-10-98-33-101.eu-west-1.compute.internal”:49432; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:250)
at com.sun.proxy.$Proxy8.statusUpdate(Unknown Source)
at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:768)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1084)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)

2017-03-28 06:15:22,224 INFO [EventFetcher for fetching Map Completion Events] org.apache.hadoop.ipc.Client: Retrying connect to server: ip-10-98-33-101.eu-west-1.compute.internal/10.98.33.101:49432. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

This was with 3 x c3.8xlarge, so the enrich step was quite fast but shredding still taking a long long time.

Any ideas?


#16

Could you publish a screenshot of the monitoring charts ( IO tab ) ?
Do you have optimizely contexts enabled ?


#17

Sure, see below


I am not running the optimizely contexts either.


#18

I also just re ran skipping staging and enrichment and with 6 x c3.8xlarge

It successfully completed in 2h25m

But I am still unsure why the original job failed


#19

Did you recently add any new self-described event models?


#20

Not at the same time these jobs started having issues, we added some a week or so ago, I think it is just the volumes it is struggling with.

I just ran 12 hours worth (27k shredded files totaling 14gb) and it completed successfully in 2h11m with 6 x c3.8xlarge, the enrich step took 29m and the shred took 1h19m

I always thought the shred would be faster than enrich but it seems the other way now.

I will try to run another batch in 12 hours or so and see if it continues to be stable, if we ever have a failed job where data starts to back up I may setup a second EMR environment so I can run 2 batches side by side.