As the amount of data you have in your data warehouse grows, it becomes increasingly difficult to know what it looks like at all times. This is where data observability comes in - it allows you to monitor the state of your data over time and gives you the ability to take action early on data that looks anomalous.
In this codelab, you will build a basic data observability system, all within Snowflake. This system will track the time all tables in your Snowflake instance are loading and update the information over time to allow you to react to stale tables faster.
snowflake databaseIf you already have an existing data pipeline that writes data into Snowflake, or if you are using the demo environment, then you can skip this step.
If you want to set up a few example pipelines, we will do that below.
This code will create a database, two schemas within the database, and then three tables: customer, order, and order rollup. We then create two Snowflake tasks that insert random data on an hourly cadence into the customer and order tables, and a third task that inserts (or updates) data in the order rollup table based on data in the order table.
-- Schema setup
create database if not exists demo;
create schema if not exists demo.raw;
create schema if not exists demo.analytics;
-- Customers data setup
create table if not exists demo.raw.customers
( id integer,
first_name varchar,
last_name varchar,
created_at timestamp
);
create task insert_raw_customers
warehouse = identifier($warehouse_name)
schedule = '60 minute'
as
insert into demo.raw.customers
with
last_names as (select ['smith', 'johnson', 'williams', 'brown', 'jones', 'miller', 'davis', 'garcia', 'rodriguez', 'wilson', 'martinez', 'anderson', 'taylor', 'thomas', 'hernandez', 'moore', 'martin', 'jackson', 'thompson', 'white', 'lopez', 'lee', 'gonzalez', 'harris', 'clark', 'lewis', 'robinson', 'walker', 'perez', 'hall', 'young', 'allen', 'sanchez', 'wright', 'king', 'scott', 'green', 'baker', 'adams', 'nelson', 'hill', 'ramirez', 'campbell', 'mitchell', 'roberts', 'carter', 'phillips', 'evans', 'turner', 'torres', 'parker', 'collins', 'edwards', 'stewart', 'flores', 'morris', 'nguyen', 'murphy', 'rivera', 'cook', 'rogers', 'morgan', 'peterson', 'cooper', 'reed', 'bailey', 'bell', 'gomez', 'kelly', 'howard', 'ward', 'cox', 'diaz', 'richardson', 'wood', 'watson', 'brooks', 'bennett', 'gray', 'james', 'reyes', 'cruz', 'hughes', 'price', 'myers', 'long', 'foster', 'sanders', 'ross', 'morales', 'powell', 'sullivan', 'russell', 'ortiz', 'jenkins', 'gutierrez', 'perry', 'butler', 'barnes', 'fisher', 'henderson', 'coleman', 'simmons', 'patterson', 'jordan', 'reynolds', 'hamilton', 'graham', 'kim', 'gonzales', 'alexander', 'ramos', 'wallace', 'griffin', 'west', 'cole', 'hayes', 'chavez', 'gibson', 'bryant', 'ellis', 'stevens', 'murray', 'ford', 'marshall', 'owens', 'mcdonald', 'harrison', 'ruiz', 'kennedy', 'wells', 'alvarez', 'woods', 'mendoza', 'castillo', 'olson', 'webb', 'washington', 'tucker', 'freeman', 'burns', 'henry', 'vasquez', 'snyder', 'simpson', 'crawford', 'jimenez', 'porter', 'mason', 'shaw', 'gordon', 'wagner', 'hunter', 'romero', 'hicks', 'dixon', 'hunt', 'palmer', 'robertson', 'black', 'holmes', 'stone', 'meyer', 'boyd', 'mills', 'warren', 'fox', 'rose', 'rice', 'moreno', 'schmidt', 'patel', 'ferguson', 'nichols', 'herrera', 'medina', 'ryan', 'fernandez', 'weaver', 'daniels', 'stephens', 'gardner', 'payne', 'kelley', 'dunn', 'pierce', 'arnold', 'tran', 'spencer', 'peters', 'hawkins', 'grant', 'hansen', 'castro', 'hoffman', 'hart', 'elliott', 'cunningham', 'knight', 'bradley'] as arr),
first_names as (select ['Michael','James','John','Robert','David','William','Mary','Christopher','Joseph','Richard','Daniel','Thomas','Matthew','Jennifer','Charles','Anthony','Patricia','Linda','Mark','Elizabeth','Joshua','Steven','Andrew','Kevin','Brian','Barbara','Jessica','Jason','Susan','Timothy','Paul','Kenneth','Lisa','Ryan','Sarah','Karen','Jeffrey','Donald','Ashley','Eric','Jacob','Nicholas','Jonathan','Ronald','Michelle','Kimberly','Nancy','Justin','Sandra','Amanda','Brandon','Stephanie','Emily','Melissa','Gary','Edward','Stephen','Scott','George','Donna','Jose','Rebecca','Deborah','Laura','Cynthia','Carol','Amy','Margaret','Gregory','Sharon','Larry','Angela','Maria','Alexander','Benjamin','Nicole','Kathleen','Patrick','Samantha','Tyler','Samuel','Betty','Brenda','Pamela','Aaron','Kelly','Heather','Rachel','Adam','Christine','Zachary','Debra','Katherine','Dennis','Nathan','Christina','Julie','Jordan','Kyle','Anna'] as arr)
select abs(random()) % 1000000 as id,
initcap(replace(fn.arr[abs(random()) % 100], '"', '')) as first_name,
initcap(replace(ln.arr[abs(random()) % 100], '"', '')) as last_name,
timestampadd(MILLISECOND, -1 * abs(random()) % 10000000, current_timestamp()) as created_at
from table(generator(rowcount => 10)) t
cross join last_names ln
cross join first_names fn
;
alter task insert_raw_customers resume;
-- Orders data setup
create table if not exists demo.raw.orders
( id integer,
user_id integer,
status varchar,
order_amount_usd decimal(10, 2),
order_timestamp timestamp
);
create task insert_raw_orders
warehouse = identifier($warehouse_name)
schedule = '60 minute'
as
insert into demo.raw.orders
with
statuses as (select ['New', 'Processing', 'Shipped', 'Complete', 'Cancelled'] as arr)
select
abs(random()) % 100000000 as id,
abs(random()) % 1000000 as user_id,
replace(s.arr[abs(random()) % 5], '"', '') as status,
uniform(0::float, 100::float, random())::decimal(38,2) as order_amount_usd,
timestampadd(MILLISECOND, -1 * abs(random()) % 10000000, current_timestamp()) as order_timestamp
from table(generator(rowcount => 50)) t
cross join statuses s
;
alter task insert_raw_orders resume;
-- Orders rollup data setup
create table if not exists demo.analytics.agg_orders
( order_date date,
order_status varchar,
count_orders int,
sum_amount decimal(38,2),
avg_amount decimal(10,2)
);
create task insert_analytics_agg_orders
warehouse = identifier($warehouse_name)
schedule = '60 minute'
as
merge into demo.analytics.agg_orders old_agg using (
select
date(order_timestamp) as order_date,
status as order_status,
count(1) as count_orders,
sum(order_amount_usd) as sum_amount,
avg(order_amount_usd) as avg_amount
from demo.raw.orders
where order_timestamp >= timestampadd(day, -3, current_date())
group by order_date, order_status
) new_agg on old_agg.order_date = new_agg.order_date and old_agg.order_status = new_agg.order_status
when matched then update set old_agg.count_orders = new_agg.count_orders, old_agg.sum_amount = new_agg.sum_amount, old_agg.avg_amount = new_agg.avg_amount
when not matched then insert (order_date, order_status, count_orders, sum_amount, avg_amount) values (new_agg.order_date, new_agg.order_status, new_agg.count_orders, new_agg.sum_amount, new_agg.avg_amount)
;
alter task insert_analytics_agg_orders resume;
Every Snowflake account exposes account-level metadata in views within the snowflakeDatabase. This information allows you to understand what's going on in your Snowflake environment and gives you important insights into what queries are running and what objects they are affecting.
The account usage schema is in the Snowflake database. It contains information about objects in the account as well as queries executed and their impact on the warehouse. For the purposes of this lab, we will be looking at two views in this schema, specifically query_history and access_history.
The query_history view shows all the queries in your account, as well as a metrics about those queries, such as bytes read/written, run time, and rows inserted or changed. You can explore the queries that have been started in the last three days by running the following query:
select *
from snowflake.account_usage.query_history
where start_time >= dateadd('day', -3, current_timestamp)
limit 10;
The access_history view shows which objects were accessed or modified with each query. The objects that are affected are all stored in JSON format. You can explore the queries that have been started in the last three days by running the following query:
select *
from snowflake.account_usage.access_history
where query_start_time >= dateadd('day', -3, current_timestamp)
limit 10;
In order to explore the nested data, you can use lateral flatten join to flatten the information in the JSON object, and then key into the objects themselves. The following example will show later flatten, and then an example of pulling a specific key out of the object (in this case, the fully qualified name of the accessed object).
-- There are a few nested fields here. For the purposes of figuring out what table was loaded into, we can look deeper into objects_modified
select ah.query_id, om_flat.*
from snowflake.account_usage.access_history as ah, lateral flatten(objects_modified) as om_flat
where query_start_time >= dateadd('day', -3, current_timestamp)
limit 10;
-- More nested fields! Grab the fully qualified table name from the "value" column of the flattened array
select ah.query_id, om_flat.value:"objectDomain" as object_type, om_flat.value:"objectName"::string as fqtn
from snowflake.account_usage.access_history as ah, lateral flatten(objects_modified) as om_flat
where query_start_time >= dateadd('day', -3, current_timestamp)
limit 10;
In this exercise, we will consider tables updated when new rows are written into the table. Given this, there are three types of query types that can update a table: insert, copy, and merge.
Given what we learned in the previous section, we can put the two concepts together to map write queries to the tables they wrote into. The query below creates two CTEs: write_queries filters down query_history to only write statements and accessed_objects extracts the fully qualified table name (FQTN) for each table written to by the query. The final query joins the two CTEs and returns which when each query completed and what table it wrote into.
with write_queries as (
select *
from snowflake.account_usage.query_history
where start_time >= dateadd('day', -3, current_timestamp)
and query_type IN ('INSERT', 'COPY', 'MERGE')
),
accessed_objects as (
select ah.query_id, om_flat.value:"objectDomain" as object_type, om_flat.value:"objectName"::string as fqtn
from snowflake.account_usage.access_history as ah, lateral flatten(objects_modified) as om_flat
where query_start_time >= dateadd('day', -3, current_timestamp)
and om_flat.value:"objectDomain"::string = 'Table'
)
select wq.query_id, wq.query_type, wq.end_time, ao.fqtn
from accessed_objects as ao
inner join write_queries as wq on ao.query_id = wq.query_id
limit 10
;
For the purposes of data observability, we want to know how long it's been since the last write occurred to each table to know when our tables are not updating on time.
The above query returns a single record for each write into each table, but what we want to know is when the table was last updated. To do this, let's aggregate the results to only get the latest write time.
with write_queries as (
select *
from snowflake.account_usage.query_history
where start_time >= dateadd('day', -3, current_timestamp)
and query_type IN ('INSERT', 'COPY', 'MERGE')
),
accessed_objects as (
select ah.query_id,
om_flat.value:"objectDomain" as object_type,
om_flat.value:"objectName"::string as fqtn
from snowflake.account_usage.access_history as ah, lateral flatten(objects_modified) as om_flat
where query_start_time >= dateadd('day', -3, current_timestamp)
and om_flat.value:"objectDomain"::string = 'Table'
),
writes_by_object as (
select wq.query_id, wq.query_type, wq.end_time, ao.fqtn, wq.rows_inserted
from accessed_objects as ao
inner join write_queries as wq on ao.query_id = wq.query_id
)
select fqtn, max(end_time) as last_write
from writes_by_object
group by fqtn
limit 10;
It's useful to know the last time every table was written into, but the most important part of data observability is to understand the state of your data at any point in time. Re-running the above query manually all the time is un-scaleable, so we want to create a table to store the results and periodically update the table with the latest write time for each table.
The following query creates a table to store result and creates a task to run every 60 minutes. The task will execute a single merge query that will insert new rows for tables that are not already tracked or update the latest write time for the tables that are already tracked.
create table demo.analytics.table_write_times (
fqtn string,
last_write timestamp
);
create task insert_table_write_times
warehouse = identifier($warehouse_name)
schedule = '60 minute'
as
merge into demo.analytics.table_write_times as target using (
with write_queries as (
select *
from snowflake.account_usage.query_history
where start_time >= dateadd('day', -3, current_timestamp)
and query_type IN ('INSERT', 'COPY', 'MERGE')
),
accessed_objects as (
select ah.query_id,
om_flat.value:"objectDomain" as object_type,
om_flat.value:"objectName"::string as fqtn
from snowflake.account_usage.access_history as ah, lateral flatten(objects_modified) as om_flat
where query_start_time >= dateadd('day', -3, current_timestamp)
and om_flat.value:"objectDomain"::string = 'Table'
),
writes_by_object as (
select wq.query_id, wq.query_type, wq.end_time, ao.fqtn, wq.rows_inserted
from accessed_objects as ao
inner join write_queries as wq on ao.query_id = wq.query_id
)
select fqtn, max(end_time) as last_write
from writes_by_object
group by fqtn
) as new_data on target.fqtn = new_data.fqtn
when matched then update set target.last_write = new_data.last_write
when not matched then insert (fqtn, last_write) values (new_data.fqtn, new_data.last_write)
;
Let's say that we expect every table in our whole Snowflake instance to be updating at least every 24 hours. Now that there's a table that we are constantly updating with the latest time every table was written into, we can check to find the tables that have not been updated in our expected update period.
The following query will filter the table we're updating above to find any table that was last updated more than 24 hours ago, as well as return how many hours that table is outside of our expectation. The timestampdiff clause looks for how many hours have elapsed between the last write to each table the the current time, and we look for every table that has had more than 24 hours elapse.
select fqtn, (timestampdiff('hours', last_write, current_timestamp) - 24) as hours_outside_of_expectation
from demo.analytics.table_write_times
where timestampdiff('hours', last_write, current_timestamp) > 24;
This query can now be used in any BI tool in order to visualize the tables that are the most stale and use that as a list of table that you need to address as problematic.
Going through this exercise, you have seen how to use existing functionality in Snowflake, specifically the snowflakeDatabase and Tasks to build a basic observability implementation that tracks the latency of every table in your warehouse. There were many assumptions built into the exercise, and you should adjust them as you see fit for your environment.
If you are interested in scaling and extending your data observability solution, reach out to our team at Bigeye for a demo of our data observability platform.
There are many ways you can extend this exercise to learn more about what information is available in Snowflake and implement various key data observability metrics. Here are a few ideas to dig into: