Data Pipeline & Automation

Data Engineering in Snowflake — Semi Structured Data Processing — Part II

Sunny Malik
3 min read · Apr 10, 2021


In the previous article (Data Engineering in Snowflake — Semi Structured Data Processing — Part I) we discussed the importance of the schema check, how to identify the path to each JSON attribute, and how to filter out the latest revisions from the semi-structured data for processing within a micro-batch.

Let’s continue with the next important step in the process.

3. Flattening the Dataset

The events data can be flattened using the view created in Step 1 (which provides the path and column name of each attribute in the payload) and mapped to the target column names, sourcing from the de-duplicated dataset of Step 2 (optional).
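As a minimal sketch, assuming a de-duplicated view DEDUP_EVENTS_VW from Step 2 with a VARIANT column PAYLOAD (both names are placeholders), the flattening view could look like this:

-- Each JSON path identified in Step 1 is projected as a typed target column.
CREATE OR REPLACE VIEW FLATTENED_EVENTS_VW AS
SELECT
    payload:eventId::NUMBER        AS event_id,       -- scalar attribute
    payload:eventType::STRING      AS event_type,
    payload:customer.name::STRING  AS customer_name,  -- nested attribute
    payload:eventTs::TIMESTAMP_NTZ AS event_ts
FROM DEDUP_EVENTS_VW;

-- For array attributes, LATERAL FLATTEN explodes each element into its own row, e.g.
-- SELECT f.value:sku::STRING AS sku
-- FROM DEDUP_EVENTS_VW, LATERAL FLATTEN(input => payload:lineItems) f;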

Once the data is flattened, it’s time to prepare the dataset for the final merge operation into the target table.

4. Merge Into Target Table

The last piece of the puzzle is using the flattened data view to perform the merge into the target table. A Snowflake stored procedure can help you build dynamic statements, for example by pulling the column list of the target table.
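A sketch of that lookup, with PUBLIC and EVENTS_TGT as placeholder schema and table names:

-- Comma-separated column list of the target table, ready to be spliced into a dynamic MERGE.
SELECT LISTAGG(COLUMN_NAME, ', ') WITHIN GROUP (ORDER BY ORDINAL_POSITION) AS column_list
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'PUBLIC'
  AND TABLE_NAME   = 'EVENTS_TGT';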

Similarly, a query that takes the primary key columns (as a delimited string) can generate the corresponding join expression to be used in the MERGE statement, as sketched below.
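The original query is not reproduced here; one possible version, assuming the primary key columns arrive as a comma-delimited string, is:

-- Turn 'EVENT_ID,EVENT_TYPE' into "tgt.EVENT_ID = src.EVENT_ID AND tgt.EVENT_TYPE = src.EVENT_TYPE".
SET pk_cols = 'EVENT_ID,EVENT_TYPE';   -- placeholder primary key columns

SELECT LISTAGG('tgt.' || TRIM(value) || ' = src.' || TRIM(value), ' AND ')
           WITHIN GROUP (ORDER BY seq) AS join_expr
FROM TABLE(SPLIT_TO_TABLE($pk_cols, ','));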

Syntax of Merge Statement

MERGE INTO <target_table> USING <source> ON <join_expr>
WHEN MATCHED [ AND <case_predicate> ] THEN
    { UPDATE SET <col_name> = <expr> [ , <col_name2> = <expr2> ... ] | DELETE } [ ... ]
WHEN NOT MATCHED [ AND <case_predicate> ] THEN
    INSERT [ ( <col_name> [ , ... ] ) ] VALUES ( <expr> [ , ... ] )
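Filled in with the placeholder objects used above (FLATTENED_EVENTS_VW as the source, EVENTS_TGT as the target, EVENT_ID as the primary key), the generated statement would resemble:

MERGE INTO EVENTS_TGT tgt
USING FLATTENED_EVENTS_VW src
   ON tgt.event_id = src.event_id
WHEN MATCHED THEN UPDATE SET
     tgt.event_type    = src.event_type,
     tgt.customer_name = src.customer_name,
     tgt.event_ts      = src.event_ts
WHEN NOT MATCHED THEN INSERT (event_id, event_type, customer_name, event_ts)
     VALUES (src.event_id, src.event_type, src.customer_name, src.event_ts);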

It’s important to note that we derive the mapping (de-dup view -> target table) for the merge operation from the column metadata of the target table. As a result, no error is raised if a newly added column in the events payload is not yet read or mapped to the target table; on the other hand, if an expected column is no longer available in the JSON payload, an error is raised.

Final Thoughts

You can leverage Snowflake stored procedures to automate the execution of all the listed steps, or run them through your data pipeline.
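A minimal skeleton of such a procedure, shown here in Snowflake Scripting (a JavaScript procedure works just as well), reusing the placeholder object names from above:

CREATE OR REPLACE PROCEDURE PROCESS_EVENTS()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
  rows_merged INTEGER DEFAULT 0;
BEGIN
  -- Steps 1-3: refresh the schema-check, de-duplication and flattening views here.
  -- Step 4: run the (dynamically generated) MERGE.
  MERGE INTO EVENTS_TGT tgt
  USING FLATTENED_EVENTS_VW src
     ON tgt.event_id = src.event_id
  WHEN MATCHED THEN UPDATE SET tgt.event_ts = src.event_ts
  WHEN NOT MATCHED THEN INSERT (event_id, event_ts) VALUES (src.event_id, src.event_ts);
  rows_merged := SQLROWCOUNT;
  RETURN 'Merged ' || rows_merged || ' rows';
END;
$$;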

If you plan to process the events data multiple times a day, tasks and streams come in very handy: you can use a stream to collect the incremental records instead of reading the full staging table (your source table for Steps 1 and 2). A Snowflake stream only captures the new records inserted into the staging layer, and once the stream is consumed in a DML statement its offset advances, so those change records are no longer returned. In case of any eventualities, you can still recover the data from the source table using Time Travel; Snowflake extends the table’s retention for the stream up to 14 days by default.
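A minimal sketch, assuming the staging table is called STG_EVENTS (a placeholder name):

-- Capture only newly inserted rows; the stream then replaces the staging table
-- as the source for Steps 1 and 2.
CREATE OR REPLACE STREAM STG_EVENTS_STREAM
  ON TABLE STG_EVENTS
  APPEND_ONLY = TRUE;   -- track inserts only, which matches an append-only staging layer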

Tasks, on the other hand, can schedule the execution of the data pipeline periodically by running your stored procedure and processing the data from the staging layer into the target table. They can be set up with a CRON expression or an interval schedule with a minimum frequency of 1 minute, automating the data pipeline end to end.
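A sketch reusing the placeholder stream and procedure from above (the warehouse name and schedule are also placeholders):

-- Run the pipeline every 5 minutes, but only when the stream has new data.
CREATE OR REPLACE TASK PROCESS_EVENTS_TASK
  WAREHOUSE = ETL_WH
  SCHEDULE  = 'USING CRON */5 * * * * UTC'   -- or an interval, e.g. SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('STG_EVENTS_STREAM')
AS
  CALL PROCESS_EVENTS();

ALTER TASK PROCESS_EVENTS_TASK RESUME;   -- tasks are created in a suspended state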

For more details you may also refer to the common continuous data pipeline flow using Snowflake functionality at the link below.

………………It’s Snowing on a SunnyDay…!!! ………………….
