Upgrading shredder from 0.19.0 to 2.0.0: decoding failure in shredding job

Hi,

we are currently upgrading our shredder and rdbloader from 0.19.0 to 2.0.0.

We have been using this playbook.json

{
    "type": "CUSTOM_JAR",
    "name": "RDB Shredder",
    "actionOnFailure": "CANCEL_AND_WAIT",
    "jar": "command-runner.jar",
    "arguments": [
        "spark-submit",
        "--class", "com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-2.0.0.jar",
        "--iglu-config", "{{base64File "resolver.json"}}",
        "--config", "{{base64File "config.hocon"}}"
    ]
}

We have already changed the class name to rdbloader.shredder.batch.Main.

However, the shredding job now fails at decoding the config files:


- Unable to parse the configuration: DecodingFailure at .input: String.

Usage: snowplow-rdb-shredder-2.0.0 --iglu-config <<base64>> [--duplicate-storage-config <<base64>>] --config <config.hocon>

Spark job to shred event and context JSONs from Snowplow enriched events

Options and flags:
    --help
        Display this help text.
    --iglu-config <<base64>>
        Base64-encoded Iglu Client JSON config
    --duplicate-storage-config <<base64>>
        Base64-encoded Events Manifest JSON config
    --config <config.hocon>, -c <config.hocon>
        base64-encoded config HOCON

We are using the new config templates for shredder and rdbloader.

{
  "name": "myapp",
  "id": "4113ba83-2797-4436-8c92-5ced0b8ac5b6",

  "region": "eu-west-1",
  "messageQueue": "SQS_QUEUE",

  "shredder": {
    "input": "SP_ENRICHED_URI",
    "output": "SP_SHREDDED_URI"
    "compression": "GZIP"
  },

  "formats": {
    "default": "JSON",
    "json": [ ],
    "tsv": [ ],
    "skip": [ ]
  },

  "jsonpaths": "s3://snowplow-schemas-STAGE/jsonpaths/",

  "storage": {
    "type": "redshift",
    "host": "REDSHIFT_ENDPOINT",
    "database": "sp_redshift_database",
    "port": 5439,
    "roleArn": "arn:aws:iam::AWS_ACCOUNT_NUMBER:role/REDSHIFTLOADROLE",
    "schema": "atomic",
    "username": "REDSHIFT_USER_NAME",
    "password": "DB_REDSHIFT_PASSWORD",
    "jdbc": {"ssl": true},
    "maxError": 10,
    "compRows": 100000
  },

  "steps": ["analyze"],

  "monitoring": {
    "snowplow": null,
    "sentry": null
  }
}

and a resolver.json




{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "S3-schemas-registry",
        "priority": 0,
        "vendorPrefixes": ["com.,myapp"],
        "connection": {
          "http": {
            "uri": "SP_SCHEMA_URI"
          }
        }
      },
      {
        "name": "Iglu Central",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Iglu Central - Mirror 01",
        "priority": 2,
        "vendorPrefixes": ["com.snowplowanalytics"],
        "connection": {
          "http": {
            "uri": "http://mirror01.iglucentral.com"
          }
        }
      }
    ]
  }
}

  • We are using a static schema repo hosted on S3. Do we need to use an Iglu Server?
  • Is --duplicate-storage-config (the Base64-encoded Events Manifest JSON config) optional?

Hi @mgloel,

The problem you have is with the config HOCON. I think yours is a bit outdated: it's for the 1.x series, whereas 2.x has separate Shredder and Loader configs. You can find minimal and reference (full) examples in our repo, or follow the upgrade guide. You might also be interested in the recent 2.1.0 release, although most of its changes are in the Loader, not the Shredder.
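For orientation, a standalone 2.x Shredder config is roughly this shape - this is just a minimal sketch from memory, so treat the exact field names as assumptions and copy from the config sample in the repo rather than from here (bucket names and the queue are placeholders):

{
  # S3 path with enriched data - the field your DecodingFailure points at
  "input": "s3://sp-enriched-bucket/archive/",

  # In 2.x the shredded output is an object, not a plain string
  "output": {
    "path": "s3://sp-shredded-bucket/archive/",
    "compression": "GZIP"
  },

  # Queue the Shredder uses to tell the Loader a folder is ready
  "queue": {
    "type": "sqs",
    "queueName": "SQS_QUEUE",
    "region": "eu-west-1"
  },

  # Same idea as before: which schemas go to TSV/JSON
  "formats": {
    "default": "JSON",
    "json": [ ],
    "tsv": [ ],
    "skip": [ ]
  }
}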

Another potential problem is with the resolver - it seems you don't have any Iglu Servers, which might be OK if you use only Snowplow-authored schemas. But if you have any custom schemas that need to be migrated, you'd need an Iglu Server hosting them. More info here:

Is --duplicate-storage-config (the Base64-encoded Events Manifest JSON config) optional?

Yes, it is optional.


Ah, OK, that's what we thought. Unfortunately, we do have custom schemas in that S3 repo, but we were planning to set up an Iglu Server soon. Thanks a lot!

On the other hand, if you shred your data with custom schemas into JSON (and have JSONPath files for them), you also don't need an Iglu Server. Again, the link above should have all the info.


We are using the new configs now and realised that the rdbloader config no longer has a format field. When we run the rdbloader, it complains about non-existent TSV files.

Can we just add the format field back with json as its value for rdbloader 2.0.0?

  1. On format - that's correct, it's in the Shredder's config. The reason is that it's the Shredder that decides what format the data should be in; it then sends an SQS message telling the Loader what format it has produced. In other words, the Loader finds out about the format from the Shredder, not from its config (this has always been the case).
  2. On the error you're seeing: I think the problem is that all your data went into the shredded bad output. The Loader knows there was some data with this schema, but cannot find it on S3 because it's in the bad output - have a look there; most likely it's because of the missing Iglu Server.
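To check quickly, something like this should show whether the rows for that run landed in the bad output (a sketch - the bucket and RUN_FOLDER are placeholders, and the output=bad path is my assumption, so adjust it to whatever you actually see in the bucket):

aws s3 ls --recursive s3://YOUR_SHREDDED_BUCKET/RUN_FOLDER/output=bad/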

There is neither a bad nor a good folder in our shredded bucket anymore. The folder content looks like this:

run=2022-01-11-02-26-08/
  _SUCCESS
  shredding_complete.json
  output=good/
    vendor=com.myapp/
      name=my_context/
        format=json/
          model=1/

Not sure I understand. Is there anything inside model=1? In particular, what's in run=2022-01-15-09-48-52/ - the one that caused the original exception?

The folder exists and is filled with data (see below).

.
├── _SUCCESS
├── output=good
│   ├── vendor=com.myapp
│   │   ├── name=charging_event
│   │   │   └── format=json
│   │   │       └── model=1
│   │   │           ├── part-00108-11d68beb-d848-48b2-bd2c-9a856df2e783.c000.txt.gz
│   │   │           ├── part-00273-02887f65-56d8-491b-a910-c5036f9e6e4a.c000.txt.gz
│   │   │           └── part-00486-03e5d079-3547-4823-96b1-502a28970d05.c000.txt.gz
│   │
…
│   └── vendor=com.snowplowanalytics.snowplow
│       └── name=atomic
│           └── format=tsv
│               └── model=1
│                   ├── part-00024-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00031-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00033-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00039-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00063-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00095-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00099-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00106-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00108-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00170-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00202-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00273-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00321-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00345-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00346-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00444-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00450-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00450-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00471-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00477-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00486-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00495-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00501-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00507-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00508-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00519-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00527-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00535-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00565-73aeb99f-be3e-4fb0-8ecf-6d5a542187c2.c000.txt.gz
│                   ├── part-00575-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00585-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00643-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00664-6f998800-860c-45fe-9c46-c6612772cb01.c000.txt.gz
│                   ├── part-00718-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   ├── part-00721-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
│                   └── part-00730-fc381c9f-defe-44c9-bc63-88189060ccd8.c000.txt.gz
└── shredding_complete.json

Everything you posted so far looks completely healthy and expected. Except the error in the Loader, obviously.

Could you:

  1. Confirm that in your shredding_complete.json the base points to exactly that path (including run=2022-01-15-09-48-52)? I cannot check it myself because I don't see the bucket or whether there's any prefix.
  2. Make sure it wasn’t an old SQS message that triggered that load, which could point to a different location (e.g. you should see shredder’s version in the shredding_complete.json)
  3. Try to send the content of your shredding_complete.json into the SQS queue
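If you want to do that by hand, something along these lines should work with the AWS CLI - just a sketch, the queue URL is a placeholder, and if your queue is a FIFO one you'll also need to pass a message group ID:

aws sqs send-message \
  --queue-url https://sqs.eu-west-1.amazonaws.com/ACCOUNT_ID/YOUR_SQS_QUEUE \
  --message-body file://shredding_complete.json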

The base in the shredding_complete.json points to the same path, and the shredder version was 2.0.0.

I will try to send the content to the SQS queue.

{
   "schema":"iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/1-0-1",
   "data":{
      "base":"s3://sp-shredded-alternative-gitc-dev/run=2022-01-15-09-48-52/",
      "types":[
         {
            "schemaKey":"iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0",
            "format":"TSV"
         },
         {
            "schemaKey":"iglu:com.myapp/scan_event/jsonschema/1-0-0",
            "format":"JSON"
         },
         
[ our schemas ...]
         {
            "schemaKey":"iglu:com.myapp/charging_event/jsonschema/1-0-0",
            "format":"JSON"
         }
      ],
      "timestamps":{
         "jobStarted":"2022-01-21T12:26:48.552Z",
         "jobCompleted":"2022-01-21T12:26:51.658Z",
         "min":"2022-01-15T09:48:48.872Z",
         "max":"2022-01-15T09:54:54.035Z"
      },
      "compression":"GZIP",
      "processor":{
         "artifact":"snowplow-rdb-shredder",
         "version":"2.0.0"
      },
      "count":{
         "good":12
      }
   }
}

One thing that strikes us is that the snowplow atomic schema is in TSV whereas our custom schemas are in JSON. We specified JSON as the format in the shredding config.

Our updated rdbloader config.hocon:


{
  # Data Lake (S3) region
  # This field is optional if it can be resolved with AWS region provider chain.
  # It checks places like env variables, system properties, AWS profile file.
  # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
  "region": "eu-west-1",

  # SQS topic name used by Shredder and Loader to communicate
  "messageQueue": "SQS_QUEUE",

  # Optional. S3 path that holds JSONPaths
  "jsonpaths": "s3://snowplow-schemas-STAGE/jsonpaths/",

  # Warehouse connection details
  "storage" : {
    # Redshift hostname
    "host": "REDSHIFT_ENDPOINT",
    # Database name
    "database": "sp_redshift_database",
    # Database port. Optional, default value 5439
    "port": 5439,
    # AWS Role ARN allowing Redshift to load data from S3
    "roleArn": "arn:aws:iam::AWS_ACCOUNT_NUMBER:role/REDSHIFTLOADROLE",
    # DB schema name
    "schema": "atomic",
    # DB user with permissions to load data
    "username": "REDSHIFT_USER_NAME",
    # DB password
    "password": "DB_REDSHIFT_PASSWORD",
    # Custom JDBC configuration. Optional, default value { "ssl": true }
    "jdbc": { "ssl": true },
    # MAXERROR, amount of acceptable loading errors. Optional, default value 10
    "maxError": 10
  },

  "schedules": {
    # Periodic schedules to stop loading, e.g. for Redshift maintenance window
    # Any amount of schedules is supported, but recommended to not overlap them
    "noOperation": [
      {
        # Human-readable name of the no-op window
        "name": "Maintenance window",
        # Cron expression with second granularity
        "when": "0 0 12 * * ?",
        # For how long the loader should be paused
        "duration": "1 hour"
      }
    ]
  }

  # Observability and reporting options
  "monitoring": {
    # Snowplow tracking (optional)
    # "snowplow": {
    #   "appId": "redshift-loader",
    #   "collector": "snplow.acme.com",
    # },

    # Optional, for tracking runtime exceptions
    # "sentry": {
    #   "dsn": "http://sentry.acme.com"
    # },

    # Optional, configure how metrics are reported
    # "metrics": {
    #   # Optional, send metrics to StatsD server
    #   "statsd": {
    #     "hostname": "localhost",
    #     "port": 8125,
    #     # Any key-value pairs to be tagged on every StatsD metric
    #     "tags": {
    #       "app": "rdb-loader"
    #     }
    #     # Optional, override the default metric prefix
    #     # "prefix": "snowplow.rdbloader."
    #   },

    #   # Optional, print metrics on stdout (with slf4j)
    #   "stdout": {
    #     # Optional, override the default metric prefix
    #     # "prefix": "snowplow.rdbloader."
    #   }
    # },

    # Optional, configuration for periodic unloaded/corrupted folders checks
    # "folders": {
    #   # Path where Loader could store auxiliary logs
    #   # Loader should be able to write here, Redshift should be able to load from here
    #   "staging": "s3://acme-snowplow/loader/logs/",
    #   # How often to check
    #   "period": "1 hour"
    #   # Specifies since when folder monitoring will check
    #   # "since": "14 days"
    #   # Specifies until when folder monitoring will check
    #   # "until": "7 days"
    #   # Path to shredded archive
    #   "shredderOutput": "s3://acme-snowplow/loader/shredder-output/"
    # },

    # Periodic DB health-check, raising a warning if DB hasn't responded to `SELECT 1`
    "healthCheck": {
      # How often query a DB
      "frequency": "20 minutes",
      # How long to wait for a response
      "timeout": "15 seconds"
    }
  },

  # Additional backlog of recently failed folders that could be automatically retried
  # Retry Queue saves a failed folder and then re-reads the info from shredding_complete S3 file
  "retryQueue": {
    # How often batch of failed folders should be pulled into a discovery queue
    "period": "30 minutes",
    # How many failures should be kept in memory
    # After the limit is reached new failures are dropped
    "size": 64,
    # How many attempts to make for each folder
    # After the limit is reached new failures are dropped
    "maxAttempts": 3,
    # Artificial pause after each failed folder being added to the queue
    "interval": "5 seconds"
  }
}

If you mean just iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 - it’s completely fine, it’s a special schema.

I’m looking forward to knowing what happens if you send the shredding_complete.json to the SQS queue.

The only scenario in which that error arises is when the Loader receives a message whose base path doesn’t exist on S3. And from all your other messages that doesn’t seem to be the case.


It’s running without errors now. Thanks for your help, @anton!


Hey @anton

unfortunately, we noticed that the shredder 2.0.0 is re-shredding data that already exists in the shredded bucket and not loading anything into Redshift. What could be a reason for that?
We do not get an error from the RDB Loader module, but CloudWatch gives us these strange info messages:

It is trying to load files which should have been loaded already last week.

Some SQS params we have used until now:

  fifo_queue                  = true
  content_based_deduplication = true
  visibility_timeout_seconds  = 600
  message_retention_seconds   = 345600

Do we have to add SNS?

  # SNS example:
  #"queue": {
  #  # Type of the queue. It can be either sqs or sns
  #  "type": "sns",
  #  # ARN of SNS topic
  #  "topicArn": "arn:aws:sns:eu-central-1:123456789:test-sns-topic",
  #  # Region of the SNS topic
  #  "region": "eu-central-1"
  #}

Do we need to specify the SNS ARN in the rdbloader config? The name alone does not seem to work:


 # SQS topic name used by Shredder and Loader to communicate
  "messageQueue": "SNS_TOPIC",

Hi @mgloel,

It can happen only if you remove/rename your shredded data. The logic is that the Shredder compares the listings of the enriched and shredded buckets, and everything that has been enriched but not yet shredded is picked up.
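Conceptually it's just a set difference over the run= folders - this is not the actual Shredder code, only the idea expressed in shell terms with placeholder buckets:

# run folders that exist in enriched but not in shredded get (re)shredded
comm -23 \
  <(aws s3 ls s3://YOUR_ENRICHED_BUCKET/ | awk '{print $2}' | sort) \
  <(aws s3 ls s3://YOUR_SHREDDED_BUCKET/ | awk '{print $2}' | sort)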

Your last error might have been a consequence of the same problem.


Well, we have not touched the shredded data. The EMR job just keeps reshredding the same data and not loading anything into Redshift.
We will try to set up two clean buckets for enriched and shredded and rerun it. Let’s see.

Hi @anton ,

a quick update: we created two new empty buckets for enriched and shredded data, with a 2-0-0 version suffix. The rdbloader is not running into any SQS errors anymore, and the data is being loaded into Redshift.

However, it seems that the shredding_complete.json file is modified on each shredder run, and the files themselves are re-shredded under new filenames (see screenshot below). Furthermore, the shredding always takes about one hour for a rather small amount of data, which does not seem normal.
Do we have to specify a manifest somewhere?

It’s adding new files on subsequent shredding runs, which seems odd.

Another question: if we want to use 2.1.0, do we just bump the versions?

FROM snowplow/snowplow-rdb-loader:2.1.0

and

 "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-2.1.0.jar",

We have not found any documentation for 2.1.0.

Hi @mgloel,

Yes, to upgrade you can just bump the version, no config changes required.

On the actual bug - I was inspecting the code and found one suspicious part. I now think the problem might be that your folders are located at the bucket root (like s3://sp-shredded-2-0-0-gtc-stg/run=2022-01-26-07-43-13), whereas in our own deployments we always use a prefix (like s3://sp-shredded-2-0-0-gtc-stg/archive/run=2022-01-26-07-43-13 and s3://sp-enriched-2-0-0-gtc-stg/archive/run=2022-01-26-07-43-13). I'm going to test this hypothesis tomorrow - if in the meantime you can try adding a prefix on your side, that would be great.
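For example (just to illustrate the idea, and assuming the input/output.path fields of the 2.x Shredder config sketched earlier), that would mean pointing both locations at a prefix instead of the bucket root:

"input": "s3://sp-enriched-2-0-0-gtc-stg/archive/",
"output": {
  "path": "s3://sp-shredded-2-0-0-gtc-stg/archive/",
  "compression": "GZIP"
}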
