Shredding EMR Spark config (IOException: All datanodes ... are bad)

Hi,

After bumping the Shredder and the RDB Loader versions to 1.0.0 in our codebase, we triggered the two apps to shred and load 14 million objects (equalling roughly 15 GB of data) into Redshift. One of the runs is 3.7 GB with nearly 4.3 million objects, which is exceptionally large. We used a single r5.12xlarge instance on EMR with the following configuration for the shredding job:

"configurations": [
  {
    "classification": "spark",
    "configurations": [],
    "properties": {
        "maximizeResourceAllocation": "false"
    }
  },
  {
    "classification": "spark-defaults",
    "configurations": [],
    "properties": {
        "spark.driver.maxResultSize": "0",
        "spark.default.parallelism": "80",
        "spark.driver.cores": "5",
        "spark.driver.memory": "37G",
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.cores": "5",
        "spark.executor.instances": "8",
        "spark.executor.memory": "37G",
        "spark.yarn.driver.memoryOverhead": "5G",
        "spark.yarn.executor.memoryOverhead": "5G"
    }
  }
]
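
(For reference, these values roughly fill an r5.12xlarge, which has 48 vCPUs and 384 GiB of memory: 8 executors × 5 cores plus 5 driver cores = 45 cores, and 9 × (37 GB heap + 5 GB overhead) = 378 GB of memory.)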

Unfortunately, the EMR job failed after 29 hours with the following error:

AM Container for appattempt_1621946407901_0002_000001 exited with exitCode: 15

Failing this attempt.Diagnostics: [2021-05-25 23:07:22.389]Exception from container-launch.

Container id: container_1621946407901_0002_01_000001

Exit code: 15

...
    
ERROR DFSClient: Failed to close file: /var/log/spark/apps/application_1621946407901_0002_1.inprogress with inode: 16493
java.io.IOException: All datanodes [DatanodeInfoWithStorage[11.222.232.28:9866,DS-768926aa-41b9-4e38-acf6-c67a57cf70e1,DISK]] are bad. Aborting... 

Besides, we found another issue in the application log that was a bit concerning:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 45.1 in stage 13147.0 (TID 19308180) can not write to output file: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://****/run=2021-04-30-19-31-38/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/part-00045-f677cd81-8444-43cb-9efe-2cd518cec43d.c000.txt.gz  

Following up on this, we checked the RDB Loader logs and found many errors similar to this one:

ERROR 2021-05-27 07:40:05.878: Folder [s3://****/run=2021-04-11-10-51-48/] is already loaded at 2021-05-26T12:39:00Z. Aborting the operation, acking the command

It seems that, for some reason, the EMR job re-shreds files that were already shredded earlier in the same job. Now, my questions are as follows:

  1. Why does it seem like Spark is redoing part of the job, as if it has no record of having done it before?
  2. Why does it take so long to shred this amount of data? Could it be related to the previous question?
  3. How can we avoid the situation described above?
  4. Is there a place where we can find reasonable Spark configs and EC2 instance choices for different amounts of data to be shredded?

Hi @dadasami,

Sorry to hear you’ve run into this issue. Before proceeding, may I ask whether your pipeline was running EmrEtlRunner + R34 (or earlier) Loader, or R35, before the upgrade?

The reason I’m asking is that the discovery mechanism has changed significantly since EmrEtlRunner. R35 and 1.0.0 use a special shredding_complete.json file to indicate that a folder has been shredded. If you upgraded from R34 and haven’t changed the archive path, all folders you had in the enriched archive will be considered unprocessed.
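
For illustration (the bucket name and file suffix below are placeholders), a run folder in the new shredded archive ends up looking roughly like this, with the marker file at its root:

s3://<shredded-archive>/run=2021-03-19-12-58-05/shredding_complete.json
s3://<shredded-archive>/run=2021-03-19-12-58-05/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/part-00000-<...>.txt.gz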

Sure! And thanks for your reply, @anton. R35 was in use before the upgrade.

One more point that might be important: before the upgrade we deleted all previously shredded data and dropped the Redshift atomic schema. The reason was the change in the structure of the shredder output bucket, and our assumption that the old shredded data could not be identified by the new shredder. In case our assumption was wrong, it might also be helpful to others if you add this point to the documentation.

I agree. We have an explanation of this mechanism in the R35 upgrade guide and a caution in the 1.0.0 one, but it’s easy not to notice them - I’ll make it as visible as possible. Sorry again for the trouble.

On the rest of your questions:

  1. Yes, it’s certainly related.
  2. A few pieces of advice:
    a. Keep your enriched and shredded archives consistent - the discovery mechanism uses both to determine which folders it needs to process.
    b. Keep pre-R35 archives in a different place, e.g. internally we use a v1 prefix for the new R35 archive and will move over to v2 when something changes again.
    c. Don’t forget about the manifest table - it protects you from double-loading (and is the reason for those “already loaded at 2021-05-26T12:39:00Z. Aborting the operation, acking the command” messages).
  3. Internally we still use this 2016 guide: Apache Spark: Config Cheatsheet

No problem! And thanks a lot for taking the time to answer the questions.

Unfortunately, I still do not understand why the EMR job failed. Initially, we started with an empty Redshift and an empty shredded archive. If I am not mistaken, the Shredder should realise that it has to shred the entire enriched archive - and indeed, it tried to. The issue is that the Shredder tries to process the same enriched runs twice (or maybe multiple times) within the same EMR run.

The new RDB Loader creates the manifest table automatically, and that is probably how it caught those errors when it received the same SQS message twice. Yet the question remains: why is the Shredder running over the same files more than once at all?

Is that because of a bad Spark configuration? Could the same batch be assigned to multiple executors for some reason?

@anton, to me it seems the discovery mechanism is not working after the update (and possibly the Shredder processes the entire enriched archive every time). Versioning is enabled for our shredded archive bucket, and on every EMR run we get a new version of shredding_complete.json plus a new version of _SUCCESS.

Hi @dadasami, can you confirm that you never have several Shredder jobs running at a time, and that they run sequentially?

Hi @BenB. I can confirm; the old EMR cluster gets terminated automatically as soon as the new one is launched.
The only chance I see for parallel runs is if EMR requires a cooldown period after termination.

Could you please share the content of stdout for the driver of a Spark job that processed the same run folder several times? You can find it in the Executors tab in the EMR UI:

(screenshot of the Executors tab)

Of course @BenB, please find the content of the file at the end of this comment. I would like to mention a couple of points about it:

  1. The list is much shorter than expected; it should contain runs up until May 25.
  2. I can say for a fact that the Shredder job actually processed more runs than the ones on the list. (Could it be that the list got refreshed at some point?)
  3. We ran the Shredder job again and the new job processed the same runs, even though many of them had already been shredded. In fact, it seems that the Shredder fetches the entire list of runs and ignores shredding_complete.json (this is just my speculation).

stdout:
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-19-12-58-05/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-04-34-22/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-07-59-01/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-10-09-41/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-12-11-25/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-17-01-33/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-19-04-56/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-21-26-31/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-21-23-31-26/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-01-49-36/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-03-50-16/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-06-03-44/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-08-22-25/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-08-40-12/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-09-18-38/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-11-22-43/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-12-35-02/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-14-46-09/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-16-45-35/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-19-11-14/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-22-21-38-56/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-00-06-47/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-02-37-25/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-04-38-00/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-06-45-00/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-09-11-45/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-11-14-04/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-13-38-44/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-16-11-19/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-18-40-15/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-20-45-11/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-23-23-10-05/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-01-37-03/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-03-37-58/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-05-35-47/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-07-37-40/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-09-41-25/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-11-55-36/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-13-19-03/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-15-46-31/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-17-51-44/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-19-58-20/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-24-22-03-25/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-00-27-12/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-02-23-45/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-04-38-30/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-06-48-13/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-08-48-24/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-11-11-03/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-13-10-08/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-15-29-12/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-17-57-41/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-19-38-23/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-21-44-54/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-25-23-56-40/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-02-02-24/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-04-11-11/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-06-21-01/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-08-41-33/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-10-41-14/
RDB Shredder: processing s3://sp-enriched-???/archive/run=2021-03-26-12-33-27/

Hey @dadasami,

The Shredder does not check the existence of shredding_complete.json to infer what needs to get shredded. Instead, it lists all the folders in enriched/ and all the folders in shredded/, and then shreds every folder that is in enriched/ but not in shredded/.

It uses shredding_complete.json only to print in the logs that some past runs were not successful.
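
In other words, discovery boils down to a set difference over run folder names. A minimal Scala sketch of the idea (not the actual Shredder code):

object DiscoverySketch {
  // Folders present in enriched/ but absent from shredded/ are considered unprocessed.
  def unprocessed(enriched: Set[String], shredded: Set[String]): Set[String] =
    enriched.diff(shredded)

  def main(args: Array[String]): Unit = {
    val enriched = Set("run=2021-03-19-12-58-05", "run=2021-03-21-04-34-22")
    val shredded = Set("run=2021-03-19-12-58-05")
    // Prints only run=2021-03-21-04-34-22, the folder not yet present in shredded/.
    unprocessed(enriched, shredded).foreach(println)
  }
}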

I wonder if this could have something to do with the problem, but reading AWS documentation, it doesn’t seem so.

Do you see that because new run folders appear in shredded/ but don’t appear in stdout? The list is append-only: each time a folder is processed, it gets added.

This is very puzzling. Would you be able to share the following information, please?

  1. Make sure no Shredder is running
  2. List the whole content of enriched/
  3. List the whole content of shredded/
  4. Run shredder and wait for it to finish
  5. List the whole content of shredded/
  6. Show the content of stdout for the driver

This will help us narrow down the issue.


Hey @BenB,

Thanks for looking into this. Unfortunately, I cannot provide the information you asked for. The bug was causing downtime on our production pipeline and we had to revert to R35.

Yes, there were new run folders added to shredded/ with shredding_complete.json that didn’t appear in stdout. In fact (and perhaps one last bit of information that I can give), in a number of different EMR runs that I checked, there was only a single run directory written to stdout, but the job was triggering many SQS messages for already shredded run directories.

Apologies that I cannot support a deeper investigation at the moment. Still, I’d be happy to report further observations if necessary.

Hello again, @dadasami,

I said I know what happened, but in fact I just have a hypothesis that can explain some of the observations.

This is the key observation:

there was only a single run directory written to the stdout, but the job was triggering many sqs messages

Basically, no directory can get to stdout without triggering an SQS message first. That’s for sure - each step in ShredJob is blocking. What can happen, though, is the job failing in the middle of processing the archive and attempting to re-process it, while at the end you only see the stdout of the last attempt.
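
To make the ordering concrete, here’s a simplified sketch of the per-folder flow I’m describing (the functions are hypothetical stand-ins, not the actual ShredJob code):

object ShredJobFlowSketch {
  // Hypothetical stand-ins for the real steps; only the ordering matters here.
  def shred(folder: String): Unit = ()          // write shredded output and shredding_complete.json
  def sendSqsMessage(folder: String): Unit = () // notify the Loader via SQS

  def main(args: Array[String]): Unit = {
    val folders = List("run=2021-03-19-12-58-05", "run=2021-03-21-04-34-22")
    folders.foreach { folder =>
      shred(folder)
      sendSqsMessage(folder)
      // A folder only reaches stdout after its SQS message has been sent.
      println(s"RDB Shredder: processing $folder")
    }
  }
}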

This hypothesis still cannot explain several other observations (e.g. at what point it overwrites shredding_complete.json), but I have a few suggestions that could potentially unblock you.

  1. Make sure you’ve set the Spark max-attempts to 1 (see the config sketch after this list): hadoop yarn - How to limit the number of retries on Spark job failure? - Stack Overflow
  2. You initially mentioned you’re using r5.12xlarge to process 15 GB of data, but I didn’t realise these 15 GB consist of hundreds of small folders. To process small folders you don’t need a big cluster, as every folder (let’s say 10 MB each) is processed one by one.
  3. You can try to combine your small folders into fewer, bigger ones (by month or by 10 days) with S3DistCp --groupBy: amazon emr - How to EMR S3DistCp groupBy properly? - Stack Overflow. That would make your EMR job more efficient and make it easier to spot at which exact folder the job fails. If you’re going to do this, make sure you copy your archive rather than move it, and that the data inside each folder has the same structure (no folders inside folders). In this case a beefier cluster would also be more appropriate, if each folder reaches hundreds of megabytes.
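
For reference, here’s a sketch of how point 1 could look in the same EMR classification format as the configuration at the top of this thread (spark.yarn.maxAppAttempts is the Spark-on-YARN property and yarn.resourcemanager.am.max-attempts is its cluster-wide YARN counterpart; adapt it to however you manage your cluster configuration):

  {
    "classification": "spark-defaults",
    "configurations": [],
    "properties": {
        "spark.yarn.maxAppAttempts": "1"
    }
  },
  {
    "classification": "yarn-site",
    "configurations": [],
    "properties": {
        "yarn.resourcemanager.am.max-attempts": "1"
    }
  }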

Hello again @anton,

I’m glad you could take a look into this. I have some comments on your suggestions:

  1. What happens if max-attempts is set to 1 and the job fails? Our observations suggest that the number of failures could potentially be high. Unfortunately, it is not easy to figure out whether the failures were happening on the first attempt or only while overwriting.

  2. We had to use r5.12xlarge because our first run folder is quite large (4 GB). The reason is that we ran the S3DistCp job long after setting up the S3 Loader bucket, so a lot of data accumulated in the first run. We’ll switch back to a smaller instance as soon as this folder is shredded and loaded into Redshift. Any idea on how to partition this folder into smaller ones would save us a lot of effort (as part of a CI/CD workflow, of course). In fact, this large run might be a potential source of the observed issues.

Please let me know if you come up with an idea that explains the shredding_complete.json overwrite.