Estuary

Snowflake Triggers: How To Use Streams & Tasks + Examples

In this guide, learn how to master Snowflake triggers using streams and tasks and see examples and tutorials to automate your data workflows.

In a landscape where every moment counts, a single mistake can make or break a company's trajectory. Ignoring data-driven insights might just be the knockout punch that sends a business into a tailspin. Amid this struggle for survival, Snowflake triggers emerge as a gateway to the future of data processing.

Automated data management techniques like Snowflake triggers are not just useful; they're downright essential to keep things running smoothly and efficiently. With a staggering 207% growth in the number of organizations using data across all three major public clouds, triggers emerge as the true MVPs.

If that caught your attention, we’ve got some more insights for you. This guide will break down Snowflake triggers for you and show you exactly how to master Streams and Tasks. No more complexities – just straightforward and effective usage.

Understanding Snowflake Triggers: The Path To Data Excellence

When we refer to "Snowflake Triggers," we're actually talking about a unique approach Snowflake takes toward automating database actions. In most traditional databases, a trigger is a set of instructions that are executed when a specific event occurs, like an update to a data row. 

However, in Snowflake, the concept of triggers as we know it doesn't exist. The Snowflake database service does not provide native support for triggers in the traditional sense, but this doesn't limit its capabilities. Instead, it uses a combination of two features, Streams and Tasks, to replicate the functionality of triggers in a more efficient and scalable way.

What does this mean for you as a developer or data engineer? You get the functionality you would from a trigger but with more flexibility and control. It lets you automate specific actions based on changes in your data, much like traditional triggers. However, you get to do it in a more efficient, scalable way that's built for the demands of big data and cloud computing.

Let's now see what these Streams and Tasks are.

What Are Snowflake Streams?

In Snowflake, a Stream is an object type that tracks and records the changes made to a table over time. Think of it as a tool for monitoring what has changed in your table data, whether rows were inserted, updated, or deleted, between two points in time.

With Streams, developers can query and extract precise information from tables. This Change Data Capture (CDC) capability of Streams makes it a valuable resource for managing data changes efficiently.
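As a rough sketch of what this looks like in practice, the statements below create a stream on a small table and read the changes it has recorded. The `orders` table and `orders_stream` names are hypothetical and used only for illustration; the `METADATA$` columns are the ones Snowflake adds to the output of a stream.

```sql
-- Hypothetical table and stream names, for illustration only.
CREATE OR REPLACE TABLE orders (
    order_id NUMBER,
    status   VARCHAR(20)
);

-- The stream records changes made after it is created.
CREATE OR REPLACE STREAM orders_stream ON TABLE orders;

INSERT INTO orders VALUES (1, 'NEW'), (2, 'NEW');

-- Querying a stream returns the changed rows plus metadata columns.
-- A plain SELECT like this one does not consume the stream.
SELECT order_id,
       status,
       METADATA$ACTION,    -- 'INSERT' or 'DELETE'
       METADATA$ISUPDATE,  -- TRUE when the row is part of an UPDATE
       METADATA$ROW_ID     -- identifier for the underlying table row
FROM orders_stream;
```

The stream's offset advances only when its contents are used in a DML statement (for example, an INSERT that selects from the stream) in a committed transaction. Until then, the same changes remain visible to every query.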

What Are Snowflake Tasks?

On the other hand, a Task in Snowflake is an object type used to set up recurring schedules for executing SQL statements, including calls to stored procedures. Multiple Tasks can run on their own schedules at the same time, which makes them well suited to complex, periodic data processing. They provide an automated way to regularly carry out specific SQL operations to improve efficiency and consistency in data management.

In most data pipelines, which are typically continuous, Tasks work together with Streams. A Task can check whether a Stream holds any changed data for a table. If it does, the Task consumes the changes; if it does not, the Task skips that run. This symbiotic relationship between Tasks and Streams in Snowflake mimics traditional triggers for efficient and scalable data processing.
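To make the pairing concrete, here is a minimal sketch of a Task that runs on a schedule and does work only when a Stream has data. All object names (`events`, `events_stream`, `events_audit`, `copy_event_changes`, and the warehouse `my_wh`) are hypothetical, and the warehouse is assumed to exist already.

```sql
-- Hypothetical objects for illustration; my_wh is assumed to exist.
CREATE OR REPLACE TABLE events (event_id NUMBER, payload VARCHAR);
CREATE OR REPLACE TABLE events_audit (
    event_id    NUMBER,
    payload     VARCHAR,
    change_type VARCHAR,
    logged_at   TIMESTAMP_LTZ
);
CREATE OR REPLACE STREAM events_stream ON TABLE events;

CREATE OR REPLACE TASK copy_event_changes
    WAREHOUSE = my_wh
    SCHEDULE  = '5 MINUTE'
    -- The run is skipped when the stream holds no change data.
    WHEN SYSTEM$STREAM_HAS_DATA('events_stream')
AS
    -- Reading the stream inside DML consumes the changes it returns.
    INSERT INTO events_audit (event_id, payload, change_type, logged_at)
    SELECT event_id, payload, METADATA$ACTION, CURRENT_TIMESTAMP()
    FROM events_stream;

-- Tasks are created suspended; resuming starts the schedule.
ALTER TASK copy_event_changes RESUME;
```

The schedule decides when the Task wakes up, and the WHEN clause decides whether that run does anything. This is the closest Snowflake equivalent to "run this statement when the table changes".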

Leveraging Snowflake's Streams & Tasks: A General Blueprint

Let’s talk about the steps and the general flow of the process. 

Database & Schema Setup

First off, set up your database and schema. This ensures that all your subsequent operations - creating tables, streams, and tasks - are performed in the correct context.

Source Table Creation

Create the source table which holds the data you want to track changes for. This includes an update timestamp field that logs when each record was last updated.

History Table Creation

Establish a history table. This table serves as a record of changes to the source table. It includes fields for start time and end time for tracking the validity period of each record, and a current flag to indicate the current record for each key.

Stream Definition

Create a Stream on the source table. A Stream is a Snowflake object that logs changes (inserts, updates, and deletes) to a table. You can select these changes as if they were rows in a table.

Change Logic View

Set up a View that defines the logic for handling the changes captured by the Stream. This View generates the data to be inserted or updated in the history table and assigns a DML type ('I', 'U', or 'D') to each record based on the type of change.

Merge Changes

Implement a MERGE statement to integrate the changes defined in the View into the history table. This statement updates existing records, deletes records, or inserts new records based on the DML type of each record in the View.

Role Creation For Task Execution

Create a role for task administration and give it the EXECUTE TASK privilege. This role is used to execute tasks in Snowflake.

Warehouse Creation For Task

Establish a warehouse that will provide the computing resources for the task to run.

Task Creation

Construct a Task that schedules the MERGE statement to run periodically. The task uses the warehouse you've set up and, on each scheduled run, executes only if the Stream contains data.

Task Activation

Finally, resume the task so it starts running. By default, tasks are created in a suspended state and must be resumed before they start running.

The best practice is to customize the table names and schemas to fit your requirements and then use this robust blueprint for all your data warehousing operations in Snowflake. 

The real power of Snowflake's Streams and Tasks shines when we use them in real-life situations. Let's move beyond the theoretical understanding and see their application in different situations for a better understanding of their utility and effectiveness.

2 Examples Of Snowflake Streams & Tasks: A Practical Guide

Here are 2 examples that will show how Streams and Tasks can be used in different business contexts.

Example 1: Real-time Employee Data Management With Snowflake Streams & Tasks

Let’s consider the HR department of a firm. Here, we’ll specifically focus on an employees' table. We want to develop a real-time change-tracking mechanism for employee data, a common need across many organizations for monitoring new hires, tracking departures, or documenting departmental changes.

For this, we'll implement a stream on our employees' table to capture any data modifications. We'll also create an employees_history table to store a detailed history of the employee records. A regular Task in Snowflake merges the changes that our stream captured into the history table to keep the history data current.

Let's walk through the SQL scripts that construct this robust real-time data tracking system in Snowflake.

Database & Schema Setup

plaintext
CREATE DATABASE company;
USE DATABASE company;
CREATE SCHEMA hr;
USE SCHEMA hr;

These SQL commands create a new database named ‘company’ and switch to it, then create a schema named ‘hr’ and switch to it for subsequent operations.

Source Table Creation

plaintext
CREATE OR REPLACE TABLE employees (
    emp_id NUMBER,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    dept_id NUMBER,
    hire_date TIMESTAMP_NTZ,
    update_timestamp TIMESTAMP_NTZ
);

It creates the ‘employees’ table which is the source table holding the data we want to track changes for.

History Table Creation

plaintext
CREATE OR REPLACE TABLE employees_history (
    emp_id NUMBER,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    dept_id NUMBER,
    hire_date TIMESTAMP_NTZ,
    start_time TIMESTAMP_NTZ,
    end_time TIMESTAMP_NTZ,
    current_flag INT
);

It generates the ‘employees_history’ table which will hold a history of changes to the ‘employees’ table.

Stream Definition

plaintext
CREATE OR REPLACE STREAM employees_table_changes ON TABLE employees;

This creates a stream named ‘employees_table_changes’ on the ‘employees’ table.
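Before building logic on top of the stream, it can help to see how it represents an update. The sketch below is an optional experiment, not part of the pipeline: the sample row is invented, and `employees_debug_stream` is a hypothetical second stream created only for inspection. Run it in a scratch environment, because the same changes are also recorded by ‘employees_table_changes’.

```sql
-- Invented sample row, for illustration only.
INSERT INTO employees
SELECT 101, 'Ada', 'Lopez', 10,
       '2023-01-15'::TIMESTAMP_NTZ,
       CURRENT_TIMESTAMP()::TIMESTAMP_NTZ;

-- Hypothetical second stream, created after the insert so that it only
-- sees what happens next. Each stream on a table keeps its own offset.
CREATE OR REPLACE STREAM employees_debug_stream ON TABLE employees;

UPDATE employees
SET dept_id = 20,
    update_timestamp = CURRENT_TIMESTAMP()::TIMESTAMP_NTZ
WHERE emp_id = 101;

-- Expect two rows for emp_id 101: a DELETE carrying the old values and an
-- INSERT carrying the new ones, both with METADATA$ISUPDATE = TRUE.
SELECT emp_id, dept_id, METADATA$ACTION, METADATA$ISUPDATE
FROM employees_debug_stream
ORDER BY METADATA$ACTION;

DROP STREAM employees_debug_stream;
```

A standard stream never reports an action named 'UPDATE'. An update shows up as a DELETE and INSERT pair flagged with METADATA$ISUPDATE, which is why change-handling logic filters on both metadata columns.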

Change Logic View

plaintext
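CREATE OR REPLACE VIEW employees_change_data AS
-- Logic for new insertions: stream INSERT rows that are not part of an update
SELECT emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag, 'I' AS dml_type
FROM (
    SELECT emp_id, first_name, last_name, dept_id, hire_date,
           update_timestamp AS start_time,
           LAG(update_timestamp) OVER (PARTITION BY emp_id ORDER BY update_timestamp DESC) AS end_time_raw,
           CASE WHEN end_time_raw IS NULL THEN '9999-12-31'::TIMESTAMP_NTZ ELSE end_time_raw END AS end_time,
           CASE WHEN end_time_raw IS NULL THEN 1 ELSE 0 END AS current_flag
    FROM employees_table_changes
    WHERE metadata$action = 'INSERT' AND metadata$isupdate = FALSE
)
UNION ALL
-- Logic for updates. A stream reports an update as a DELETE + INSERT pair with
-- metadata$isupdate = TRUE (metadata$action is never 'UPDATE'), so each update
-- yields two rows: 'I' for the new version and 'U' to close the current history row.
SELECT emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag, dml_type
FROM (
    SELECT emp_id, first_name, last_name, dept_id, hire_date,
           update_timestamp AS start_time,
           LAG(update_timestamp) OVER (PARTITION BY emp_id ORDER BY update_timestamp DESC) AS end_time_raw,
           CASE WHEN end_time_raw IS NULL THEN '9999-12-31'::TIMESTAMP_NTZ ELSE end_time_raw END AS end_time,
           CASE WHEN end_time_raw IS NULL THEN 1 ELSE 0 END AS current_flag,
           dml_type
    FROM (
        -- New version of the row, to be inserted into the history table
        SELECT emp_id, first_name, last_name, dept_id, hire_date, update_timestamp, 'I' AS dml_type
        FROM employees_table_changes
        WHERE metadata$action = 'INSERT' AND metadata$isupdate = TRUE
        UNION ALL
        -- Current history row for the same employee, to be closed
        SELECT emp_id, NULL, NULL, NULL, NULL, start_time, 'U' AS dml_type
        FROM employees_history
        WHERE emp_id IN (SELECT DISTINCT emp_id FROM employees_table_changes
                         WHERE metadata$action = 'DELETE' AND metadata$isupdate = TRUE)
          AND current_flag = 1
    )
)
UNION ALL
-- Logic for deletions: close the current history row of each deleted employee
SELECT eh.emp_id, NULL, NULL, NULL, NULL, eh.start_time, CURRENT_TIMESTAMP()::TIMESTAMP_NTZ, NULL, 'D'
FROM employees_history eh
WHERE eh.emp_id IN (SELECT DISTINCT emp_id FROM employees_table_changes
                    WHERE metadata$action = 'DELETE' AND metadata$isupdate = FALSE)
  AND eh.current_flag = 1;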

This script will create a view named ‘employees_change_data’ that defines the logic for handling the changes captured by the ‘employees_table_changes’ stream. The logic includes handling new insertions, updates, and deletions. 

The view generates a row for each change, specifying the type of change ('I' for insert, 'U' for update, 'D' for delete) in the ‘dml_type’ column. The ‘current_flag’ is set to 1 for new records and 0 for old records. The ‘start_time’ and ‘end_time’ define the time range during which each version of the record is valid.

Merge Changes Into The History Table

plaintext
MERGE INTO employees_history eh
USING employees_change_data ecd
    ON eh.emp_id = ecd.emp_id
    AND eh.start_time = ecd.start_time
WHEN MATCHED AND ecd.dml_type = 'U' THEN UPDATE
    SET eh.end_time = ecd.end_time,
        eh.current_flag = 0
WHEN MATCHED AND ecd.dml_type = 'D' THEN UPDATE
    SET eh.end_time = ecd.end_time,
        eh.current_flag = 0
WHEN NOT MATCHED AND ecd.dml_type = 'I' THEN INSERT
    (emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag)
    VALUES
    (ecd.emp_id, ecd.first_name, ecd.last_name, ecd.dept_id, ecd.hire_date, ecd.start_time, ecd.end_time, ecd.current_flag);

This merges the changes defined in the ‘employees_change_data’ view into the ‘employees_history’ table. It checks the ‘dml_type’ field from the view to determine whether to update existing records or insert new records into the history table. The ‘current_flag’ is set to 0 for updated or deleted records to indicate that they are no longer the current version of the record.

Role Creation For Task Execution

plaintext
USE ROLE securityadmin;
CREATE ROLE taskadmin;
USE ROLE accountadmin;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE taskadmin;
USE ROLE securityadmin;
GRANT ROLE taskadmin TO ROLE sysadmin;

This script creates a role named ‘taskadmin’ and grants the EXECUTE TASK privilege to it.

Warehouse Creation For Task

plaintext
CREATE WAREHOUSE IF NOT EXISTS task_warehouse WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 120;

This creates a warehouse named ‘task_warehouse’ that will provide the compute resources for the task to run.

Task Creation

plaintext
CREATE OR REPLACE TASK update_employee_history
    WAREHOUSE = task_warehouse
    SCHEDULE = '1 minute'
    WHEN SYSTEM$STREAM_HAS_DATA('employees_table_changes')
AS
    MERGE INTO employees_history eh
    USING employees_change_data ecd
        ON eh.emp_id = ecd.emp_id
        AND eh.start_time = ecd.start_time
    WHEN MATCHED AND ecd.dml_type = 'U' THEN UPDATE
        SET eh.end_time = ecd.end_time,
            eh.current_flag = 0
    WHEN MATCHED AND ecd.dml_type = 'D' THEN UPDATE
        SET eh.end_time = ecd.end_time,
            eh.current_flag = 0
    WHEN NOT MATCHED AND ecd.dml_type = 'I' THEN INSERT
        (emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag)
        VALUES
        (ecd.emp_id, ecd.first_name, ecd.last_name, ecd.dept_id, ecd.hire_date, ecd.start_time, ecd.end_time, ecd.current_flag);

This script creates a task named ‘update_employee_history’ that uses the ‘task_warehouse’ for its computation. The task is scheduled to run every minute when there's new data in the ‘employees_table_changes’ stream. The task's job is to execute the MERGE statement which updates the ‘employees_history’ table based on the changes captured in the ‘employees_change_data’ view.

Task Activation

plaintext
ALTER TASK update_employee_history RESUME;

This will resume the ‘update_employee_history’ task so that it starts running.
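Once the task is resumed, a few queries can confirm that it is actually running. The following is a sketch that assumes the task was created as ‘update_employee_history’ in the current database and schema; the column selection is just one reasonable choice.

```sql
-- Confirm the task exists and check its state (expect 'started').
SHOW TASKS LIKE 'update_employee_history';

-- Recent runs of the task, newest first. STATE holds values such as
-- SCHEDULED, SUCCEEDED, FAILED, or SKIPPED.
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
         TASK_NAME => 'UPDATE_EMPLOYEE_HISTORY'))
ORDER BY scheduled_time DESC
LIMIT 10;

-- Current version of each employee in the history table.
SELECT emp_id, dept_id, start_time, end_time
FROM employees_history
WHERE current_flag = 1
ORDER BY emp_id;

-- Suspend the task when the pipeline is no longer needed.
ALTER TASK update_employee_history SUSPEND;
```

If a run fails, the error_message column in the task history is usually the quickest place to find out why.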

Example 2: Real-Time Tracking Of Product Sales Using Snowflake Streams & Tasks

Here, we will use Snowflake's powerful Streams and Tasks features to track real-time changes in product sales data. 

Setting Up The Environment

plaintext
-- setup the context
USE ROLE sysadmin;
USE DATABASE demo_db;
USE SCHEMA public;
USE WAREHOUSE compute_wh;

This sets up the necessary context for the following operations. It sets the active role to ‘sysadmin’, the active database to ‘demo_db’, the active schema to ‘public’, and the active warehouse to ‘compute_wh’.

Creating The Raw & Final Tables

plaintext
-- create a raw table where change data capture will be triggered
CREATE OR REPLACE TABLE sales_raw_tbl (
    product_id NUMBER,
    sale_date DATE,
    units_sold NUMBER
);
-- create the final table that holds the data after CDC processing
CREATE OR REPLACE TABLE sales_final_tbl (
    product_id NUMBER,
    sale_date DATE,
    units_sold NUMBER
);

We’ll use this script to create 2 tables: ‘sales_raw_tbl’ and ‘sales_final_tbl’. The ‘sales_raw_tbl’ will capture the real-time sales data while ‘sales_final_tbl’ will hold the processed data.

Creating The Stream

plaintext
-- define the stream on top of sales_raw_tbl
CREATE OR REPLACE STREAM sales_stream
    ON TABLE sales_raw_tbl
    APPEND_ONLY = TRUE;

With this script, we have created a stream called ‘sales_stream’ on ‘sales_raw_tbl’. Because the stream is created with APPEND_ONLY = TRUE, it records only rows inserted into ‘sales_raw_tbl’; updates and deletes are not tracked.

Loading Initial Data Into The Final Table

plaintext
-- 1st time data load from sales_raw_tbl to sales_final_tbl
INSERT INTO sales_final_tbl
SELECT * FROM sales_raw_tbl;

This script loads the initial data from ‘sales_raw_tbl’ into ‘sales_final_tbl’. This could represent an initial bulk load of historical sales data.

Creating The Task

plaintext
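-- create a task to load the new CDC data into the final table
CREATE OR REPLACE TASK sales_task
    WAREHOUSE = compute_wh
    SCHEDULE = '5 minute'
    WHEN SYSTEM$STREAM_HAS_DATA('sales_stream')
AS
    -- list the columns explicitly: SELECT * on a stream also returns the
    -- METADATA$ columns, which do not exist in sales_final_tbl
    INSERT INTO sales_final_tbl
    SELECT product_id, sale_date, units_sold
    FROM sales_stream;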

We’ll create a task named ‘sales_task’ that uses the ‘compute_wh’ for its computation. The task is scheduled to run every 5 minutes when there's new data in the ‘sales_stream’. The task's job is to execute the INSERT statement which updates the ‘sales_final_tbl’ based on the changes captured in the ‘sales_stream’.

Activating The Task

plaintext
-- activate the task to start consuming it
USE ROLE accountadmin;
ALTER TASK sales_task RESUME;

This activates ‘sales_task’. Once activated, the task runs according to its schedule, so new rows in ‘sales_raw_tbl’ reach ‘sales_final_tbl’ within roughly five minutes of being inserted.
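A quick way to try the pipeline is to insert a few rows and watch them move. The sketch below uses invented sample values and assumes the tables, stream, and task from this example already exist in the current schema.

```sql
-- Invented sample rows, for illustration only.
INSERT INTO sales_raw_tbl VALUES
    (1, '2023-08-01', 12),
    (2, '2023-08-01', 7);

-- Returns TRUE while the stream holds inserts that have not been consumed.
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream');

-- Preview what the next task run will copy. A plain SELECT does not
-- consume the stream.
SELECT product_id, sale_date, units_sold, METADATA$ACTION
FROM sales_stream;

-- After the next scheduled run, the new rows should appear here and the
-- stream should be empty again.
SELECT product_id, sale_date, units_sold
FROM sales_final_tbl
ORDER BY sale_date, product_id;
```

To stop the pipeline later, `ALTER TASK sales_task SUSPEND;` pauses the schedule without dropping the task or the stream.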

Supercharge Your Data Operations With Estuary Flow and Snowflake Integration

While we explored the practical side of Snowflake triggers, we also suggest checking out Estuary Flow — an advanced data pipeline tool that optimizes data operations in synergy with Snowflake's capabilities. 

This means that, in addition to having Snowflake triggers to automate database actions, using Estuary Flow to set up your data pipeline can simplify your data management tasks while supercharging your ability to handle real-time data processing and integration. By employing real-time change data capture (CDC), automated schema management, and data deduplication, Flow ensures streamlined and effective data integration.

Estuary Flow brings a new level of automation and resilience to your Snowflake data operations so you can capture and react to data changes in real time. It provides a robust, scalable, and fault-tolerant architecture for managing your data pipelines.

Whether it's dealing with evolving data models, ensuring data accuracy, or handling large volumes of data, Flow manages it all with finesse. Its capabilities align perfectly with Snowflake's Streams and Tasks, making it a great tool to enhance your real-time data processing needs.

Real-Time CDC & Data Deduplication

Flow's real-time CDC can capture changes from your data sources instantly so that your Snowflake Streams are always fed with the most recent data. Its automated data deduplication ensures that your Tasks always operate on unique records, reducing redundancy and improving efficiency.

Automated Schema Management & Built-In Testing

With automated schema management, Flow adapts to evolving data models to ensure that your Snowflake Tasks always work with up-to-date schema. The built-in testing capabilities provide confidence in the data accuracy to make your Snowflake operations more reliable.

Fault-Tolerant Architecture & Scalability

Flow's fault-tolerant architecture keeps your Snowflake Streams and Tasks running even in the face of failures. Its scalability with your data volume guarantees that your Snowflake operations can handle growing data loads without a hitch.

Conclusion

Today, staying competitive means being able to act quickly, and that's exactly what Snowflake triggers help you do. They give you the power to spot emerging trends, foresee potential disasters, and make decisions fast.

The value of Snowflake triggers is not restricted to any specific domain. From eCommerce giants optimizing product recommendations on the fly to healthcare systems responding to urgent patient needs, the impact stretches across industries.

Now the question arises: how can you amplify the benefits of these capabilities in your own operations? When you combine the power of Snowflake triggers with Estuary Flow, you’ll get the best of both worlds. It adds an extra layer of brilliance to your data management, elevating your data operations to a whole new level. 

Interested in learning more? Sign up for free and experience the difference firsthand. For any inquiries or additional information, don't hesitate to contact us.

