Data Engineering in Snowflake — Semi-Structured Data Processing — Part I
Working with Semi-Structured Data
Product Owner: Our analytics team needs to develop an ML program to harness insights from events data. Can you tell me how much time it would take to provide the required dataset to our Data Scientist team…
Scrum Master: We created a user story for our analyst, who is working on identifying the key attributes, and once we have them ready our scrum team will get right on it in the next sprint cycle. But as you might be aware, all events data is in JSON/XML format, which requires a lot of data mapping effort.
Snowflake to the rescue…
Yes, Snowflake natively supports semi-structured data, and that has been talked about for a long time. But how do you process this data with minimal touchpoints, and at scale, while building data pipelines? Well, that too has been discussed multiple times, so what’s new? This article will help you take that understanding to the next level.
You can bring your semi-structured data into Snowflake through multiple channels: the PUT and COPY commands, External Tables, or the Kafka connector (for near-real-time data processing) to write events data into Snowflake.
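As a minimal sketch of the PUT/COPY route, assuming a single VARIANT staging table; the object names (RAW_EVENTS, EVENTS_STAGE, events.json) are placeholders and the file format options should be adjusted to your payload:

-- Staging table with a single VARIANT column to hold the raw JSON event
CREATE TABLE IF NOT EXISTS raw_events (payload VARIANT);

-- Internal stage for the incoming files
CREATE STAGE IF NOT EXISTS events_stage
  FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE);

-- Upload the file from a client session (PUT runs from SnowSQL or a driver, not the worksheet):
-- PUT file:///tmp/events.json @events_stage;

-- Load each staged JSON object as one VARIANT row
COPY INTO raw_events
  FROM @events_stage
  ON_ERROR = 'CONTINUE';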
As data lands in Snowflake (let’s call this the Staging Layer), it needs to be curated, filtered, or standardized to support insight derivation. Assuming you receive the entire record (all attributes) whenever the data is updated upstream, we can divide this whole activity into 4 parts.
- Schema Check (Path Derivation to Attributes)
- Identify and Filter out Duplicates or Old revisions (Optional)
- Flattening your Data
- Merge into Target Table
1. Schema Check (Path Derivation for JSON Attributes)
While semi-structured data is flexible to change due to the lack of a fixed schema, this can also be challenging for consuming applications if those changes are not advertised in advance. Having downstream applications adapt dynamically to the changing structure is the ideal way to avoid failures in the data pipelines, isn’t it!
Let’s dig deeper. We start off by loading our semi-structured dataset (e.g. a JSON payload) into the staging table and identifying the path to each attribute. It’s like having knowledge of each attribute, its parent (hierarchy), and how to access it (the complete path) from the JSON payload whenever you need to query one.
Using the latest payload version from the events data, create the following view to access the path to each attribute (assuming it’s a JSON payload). Subsequently, any changes in the payload, such as the addition of new attributes or the removal of existing ones, will dynamically reflect in the view definition.
You will often want to check the incoming payload for changes to ensure data consistency and integrity, which makes the schema check the primary validation in your data pipeline when working with a semi-structured dataset.
You may optionally create multiple revisions of the view to keep track of the payload changes for auditing or compliance purposes.
Keep in mind that when you access an attribute which doesn’t exist or is no longer available in the JSON object, Snowflake returns NULL instead of raising an error. Also, you may have multiple attributes with the same name at different levels in the event payload; these are legitimate and are handled appropriately in the snippet below, as column names are prefixed with the attribute hierarchy by default.
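As a rough sketch of such a schema-check view, reusing the assumed RAW_EVENTS table and PAYLOAD column from the load example above (the view name EVENT_SCHEMA_CHECK is also a placeholder), a recursive LATERAL FLATTEN can expose every attribute path together with a hierarchy-prefixed column name and its data type:

CREATE OR REPLACE VIEW event_schema_check AS
SELECT DISTINCT
       f.path AS attribute_path,                                               -- full path, e.g. order.items[0].sku
       REPLACE(REGEXP_REPLACE(f.path, '\\[\\d+\\]', ''), '.', '_')
         AS column_name,                                                       -- hierarchy-prefixed column name
       TYPEOF(f.value) AS attribute_type                                       -- VARCHAR, INTEGER, BOOLEAN, ...
FROM raw_events,
     LATERAL FLATTEN(INPUT => payload, RECURSIVE => TRUE) f
WHERE TYPEOF(f.value) NOT IN ('OBJECT', 'ARRAY');                              -- keep only leaf attributes

Querying this view after each load and comparing it against a previously saved revision is one simple way to implement the schema check: a newly added attribute shows up as an extra row, and a removed one disappears (and returns NULL when accessed directly).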
2. Identify and Filter out any Duplicates or Old revisions
Though the Snowflake Kafka connector ensures a message is delivered only once, the staging table may still end up with multiple versions of a record from upstream when the same record is transacted upon within a short time span. In that case you may want to filter out and process only the latest version (net effect) of each record.
For this, we can leverage a small utility to identify the latest revisions from the events data and create a view in the database using the paths to the available/known primary key combination and the incremental column from the dataset.
The utility below takes the Schema Check view (from Step 1) as one of its inputs, which helps derive the paths to the primary and incremental keys, and creates a Snowflake view. This filters out and provides only the latest revisions of the payload records, which are the ones that should be taken to the next level for processing.
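Here is a minimal sketch of the view such a utility might generate, assuming purely for illustration that the primary key sits at payload:event:id and the incremental column at payload:event:updated_at in the RAW_EVENTS table from earlier; the view name LATEST_EVENTS is likewise a placeholder:

CREATE OR REPLACE VIEW latest_events AS
SELECT payload
FROM raw_events
QUALIFY ROW_NUMBER() OVER (
            PARTITION BY payload:event:id                             -- derived primary key path
            ORDER BY payload:event:updated_at::TIMESTAMP_NTZ DESC     -- derived incremental column path
        ) = 1;                                                        -- keep only the latest revision per key

ROW_NUMBER with QUALIFY keeps exactly one row per key, so downstream steps only ever see the net effect of rapid back-to-back updates to the same record.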
Depending upon the use case, the latest payload revisions can now be fed into the data pipeline for flattening the payload and mapping the attributes to the target table columns for merge operations.
Follow along on the topic for data flattening and data merge operations at Data Engineering in Snowflake — Semi-Structured Data Processing — Part II
………………It’s Snowing on a SunnyDay…!!! ……………….