Help tracking form data via unstructured event & Kafka

Hello,
I am new to Snowplow and am having trouble tracking form data via an unstructured event. I’m using JavaScript Tracker 2.7.0, Scala Stream Collector 0.9.0, and Kafka 0.10.1.0.
I’ve been following the tutorial here: Form tracking with Snowplow [tutorial].

My goal at this point is to get the form submission data logged in the Kafka “collector-payloads” topic. Any ideas would be much appreciated.

Note:
I recently posted a similar question about a different pipeline (JavaScript tracker + clojure-collector + S3), where the problem was simply that I had missed the base64-encoded data in the S3 logs. That is not the case this time: there is no base64 data visible for the unstructured events in the Kafka collector-payloads topic.

The challenge:
The data from the unstructured event (which is intended to track form submissions) isn’t being logged into the Kafka “collector-payloads” topic.

Here’s what I know:
- Page view tracking using the Snowplow-authored trackPageView event is working successfully.
- I know the form data comes in base64-encoded, and no base64 data is being logged in the Kafka collector-payloads topic; only non-encoded page view data is being logged. (A sketch for decoding the base64 payload follows this list.)
- If I use a Snowplow-authored event instead of the unstructured event, it works: the scala-stream-collector logs the form data into the Kafka collector-payloads topic successfully.
- The JavaScript loads on my website via the Qubit tag manager. The unstructured event code loads at the end of the body section, i.e. after the form loads; the rest of the code loads near the top of the page. The unstructured event code also loads after the base code (see below for what I mean by base code).
- My website is static HTML, so the form loads statically, not dynamically.
- The cookie from the scala-stream-collector is being set fine, and I’m seeing the HTTP requests in my browser’s network pane in developer tools (something like http://i.mydomain.com/i?stm=234893).
- The cookie from the JavaScript tracker is being set fine.
- I am hosting an Iglu repo via Firebase at https://my-redacted-company-iglu.firebaseapp.com. I have confirmed the schema is available at https://my-redacted-company-iglu.firebaseapp.com/schemas/com.my-redacted-company/custom_submit_form/jsonschema/1-0-0, and the JSON Paths file is available at https://my-redacted-company-iglu.firebaseapp.com/jsonpaths/com.my-redacted-company/custom_submit_form_1.json.
- My Iglu repo is served over HTTPS, as Firebase forces HTTPS.
- The unstructured event data isn’t being logged into the bad-1 Kafka topic either; I don’t see any relevant data there.
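
For anyone debugging the same thing, here is a minimal sketch (run in the browser console) for decoding the ue_px parameter of a collector request copied from the network pane, so you can see exactly what the tracker sent. The sample value in the usage comment is a placeholder, not real data.

//Decode the ue_px parameter of a collector request to inspect the
//self-describing event JSON. The tracker base64url-encodes the payload,
//so map the URL-safe characters back to standard base64 and pad it
//before calling atob().
function decodeUePx(uePx) {
    var b64 = uePx.replace(/-/g, '+').replace(/_/g, '/');
    while (b64.length % 4 !== 0) { b64 += '='; }
    return JSON.parse(atob(b64));
}

//Example usage with a placeholder value:
//decodeUePx('eyJzY2hlbWEiOiJpZ2x1Oi4uLiJ9');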

My base JavaScript code (loaded by Qubit tag management):

<script type="text/javascript"> 

//Load js file
;(function(p,l,o,w,i,n,g){if(!p[i]){p.GlobalSnowplowNamespace=p.GlobalSnowplowNamespace||[];
p.GlobalSnowplowNamespace.push(i);p[i]=function(){(p[i].q=p[i].q||[]).push(arguments)
};p[i].q=p[i].q||[];n=l.createElement(o);g=l.getElementsByTagName(o)[0];n.async=1;
n.src=w;g.parentNode.insertBefore(n,g)}}(window,document,"script","//myurl.com/assets/js/53477247898.js","sp001"));

//Create new tracker & load general parameters
window.sp001("newTracker", "tracker1", "i.myurl.com", { 
  appId: "mainwebsite",
  platform: "web",
  cookieDomain: ".myurl.com",
  cookieName: "_gs563_", 
  sessionCookieTimeout: 3600, 
  cookieLifetime: 315576
});

//Load pageview tracking
window.sp001('trackPageView');

</script>

My unstructured event JavaScript code, loaded at the end of the body (also via Qubit tag management):

<script type="text/javascript"> 
    $('#form001').submit(function(){
        var form_id = $(this).attr('id');
        var email_address = $('input[name=form[email]]').val();
        window.sp001('trackUnstructEvent', {
            schema: 'iglu:com.my-redacted-company/custom_submit_form/jsonschema/1-0-0',
            data: {
                form: form_id,
                email: email_address
            }
        });
    });
</script>
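
For reference, this call should wrap the payload in a self-describing JSON envelope like the one below before base64url-encoding it into the ue_px parameter. This is a sketch; the field values are illustrative.

//The self-describing envelope the tracker builds from the
//trackUnstructEvent call above (illustrative values):
var selfDescribingEvent = {
    schema: 'iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0',
    data: {
        schema: 'iglu:com.my-redacted-company/custom_submit_form/jsonschema/1-0-0',
        data: { form: 'form001', email: 'user@example.com' }
    }
};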

My form’s HTML:

<div id="form001" class="application-form w-form"> 
	<form action="http://myurl.com/form/submit?formId=3" class="w-clearfix" data-name="Application Form" id="application-form-1" method="post" name="application-form-1" autocomplete="off"> 
		<div class="w-embed"> 
			<input type="hidden" name="form[formId]" value="3" />
			<input type="hidden" name="form[formName]" value="form3" />
		</div>
		<input class="form-1-text-field w-input" data-name="form[first_name]" id="_form_first_name" maxlength="256" name="form[first_name]" placeholder="first name" required="required" type="text">
		<input class="form-1-text-field w-input" data-name="form[last_name]" id="_form_last_name" maxlength="256" name="form[last_name]" placeholder="last name" required="required" type="text">
		<input class="form-1-text-field w-input" data-name="form[company_name]" id="_form_company_name" maxlength="256" name="form[company_name]" placeholder="company name" required="required" type="text"> 
		<input class="form-1-text-field w-input" data-name="form[company_website]" id="_form_company_website" maxlength="256" name="form[company_website]" placeholder="company website" type="text">
		<input class="form-1-text-field w-input" data-name="form[email]" id="_form_email" maxlength="256" name="form[email]" placeholder="email" required="required" type="email"> 
		<input class="form-1-submit w-button" data-wait="please wait" id="form_input_application_submit" type="submit" value="submit"> 
	</form>
	<div class="w-form-done">
		<p>Thank you.
			<br>You'll hear from us soon.</p>
	</div>
	<div class="w-form-fail">
		<p>Error, please email us instead.</p>
	</div>
</div>

My scala-stream-collector config file:

collector {
  # The collector runs as a web service specified on the following
  # interface and port.
  interface = "0.0.0.0"
  port = 80

  # Production mode disables additional services helpful for configuring and
  # initializing the collector, such as a path '/dump' to view all
  # records stored in the current stream.
  production = true

  # Configure the P3P policy header.
  p3p {
    policyref = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = 365 # 1 year
    # Network cookie name
    name = _75234_
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    domain = ".my-redacted-domain.com"
  }

  # The collector has a configurable sink for storing data in
  # different formats for the enrichment process.
  sink {
    # Sinks currently supported are:
    # 'kinesis' for writing Thrift-serialized records to a Kinesis stream
    # 'kafka' for writing Thrift-serialized records to kafka
    # 'stdout' for writing Base64-encoded Thrift-serialized records to stdout
    #    Recommended settings for 'stdout' so each line printed to stdout
    #    is a serialized record are:
    #      1. Setting 'akka.loglevel = OFF' and 'akka.loggers = []'
    #         to disable all logging.
    #      2. Using 'sbt assembly' and 'java -jar ...' to disable
    #         sbt logging.
    enabled = "kafka"

    kinesis {
      thread-pool-size: 10 # Thread pool size for Kinesis API requests

      # The following are used to authenticate for the Amazon Kinesis sink.
      #
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      #
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      #
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        access-key: "iam"
        secret-key: "iam"
      }

      # Data will be stored in the following stream.
      stream {
        region: "us-east-1"
        good: "example"
        bad: "example"
      }

      # Minimum and maximum backoff periods
      backoffPolicy: {
        minBackoff: 0
        maxBackoff: 0
      }
    }

    kafka {
      brokers: "my-redacted-local-dns-routing-host:9092" 

      # Data will be stored in the following topics
      topic {
        good: "collector-payloads"
        bad: "bad-1"
      }
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byte-limit: 4500000 # 4.5mb
      record-limit: 500 # Not supported by Kafka; will be ignored. Put something here anyway or there will be an error.
      time-limit: 60000 # 1 minute
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/2.2.3/general/configuration.html.
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

# spray-can is the server the Stream collector uses and has configurable
# options defined at
# https://github.com/spray/spray/blob/master/spray-can/src/main/resources/reference.conf
spray.can.server {
  # To obtain the hostname in the collector, the 'remote-address' header
  # should be set. By default, this is disabled, and enabling it
  # adds the 'Remote-Address' header to every request automatically.
  remote-address-header = on

  uri-parsing-mode = relaxed
  raw-request-uri-header = on

  # Define the maximum request length (the default is 2048)
  parsing {
    max-uri-length = 32768
  }
}

My jsonschema for the unstructured event:

{
	"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
	"description": "Schema for form submission",
	"self": {
		"vendor": "com.my-redacted-company",
		"name": "custom_submit_form",
		"format": "jsonschema",
		"version": "1-0-0"
	},
	"type": "object",
	"properties": {
		"email": {
			"type": "string"
		},
		"form": {
			"type": "string"
		}
	},
	"minProperties":1,
	"required": ["email"],
	"additionalProperties": false
}
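
Since "email" is required and additionalProperties is false, instances only validate downstream when they carry exactly these fields. A sketch with illustrative values:

//Instances against the schema above (illustrative values):
var validInstance = { email: 'user@example.com', form: 'form001' };  //passes
var missingEmail  = { form: 'form001' };                             //fails: "email" is required
var extraField    = { email: 'user@example.com', source: 'web' };    //fails: additionalProperties is false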

My jsonpaths file for the unstructured event:

{
    "jsonpaths": [
        "$.schema.vendor",
        "$.schema.name",
        "$.schema.format",
        "$.schema.version",
        "$.hierarchy.rootId",
        "$.hierarchy.rootTstamp",
        "$.hierarchy.refRoot",
        "$.hierarchy.refTree",
        "$.hierarchy.refParent",
        "$.data.email",
        "$.data.form"
    ]
}

Hi @rob - I can’t answer your specific question but reading through your setup I can flag something that won’t work: hosting the JSON Paths files on HTTP(S).

This is because under the hood these files are used for Redshift COPY FROM JSON commands, and those commands expect the files to be hosted on S3:

http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-format.html#copy-json

Thanks @alex for the info about hosting the JSON Paths files.

Update: The issue seems to be the form field’s name. The unstructured event is logged into Kafka successfully when the name is “Email”, but not when it is “form[email]”. However, the email software I’m using requires the field name to be “form[email]”. The brackets in the name appear to be the problem: unquoted, they make the jQuery attribute selector invalid, so the selector throws a syntax error inside the submit handler and trackUnstructEvent never runs.

I modified the JavaScript tracking code from:

var email_address = $('input[name=form[email]]').val();

to:

var email_address = $("input[name='form[email]']").val();

It seems to be working with this change. I’m not sure whether this code needs any further updates, such as escape characters (the Qubit tag manager seems to apply escape characters automatically) or something else; if so, please let me know. Otherwise, this is the solution I’ve found, in case others need it.
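
For reference, an equivalent fix is to escape the brackets instead of quoting the attribute value. A sketch, with both selectors targeting the same input:

//Quoting the attribute value (the fix above):
var byQuoting = $("input[name='form[email]']").val();

//Escaping the brackets also works (note the double backslashes,
//since the escapes live inside a JS string literal):
var byEscaping = $('input[name=form\\[email\\]]').val();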

Thanks @rob - feel free to create a bug report for the JS Tracker and we’ll try to reproduce it: