This guide will teach you how to write and publish a SQL derivation in Estuary Flow, which will join two collections together on a common key.
Setting up your development environment
The data sources used in this tutorial are two tables in a PostgreSQL database. Check out the repository here for all the code needed and instructions on how to seed a demo environment for yourself using only Docker.
You can also just read through the rest of the guide!
Artists table sample:
artist_id | name | genre | country | formed_year | monthly_listeners |
1 | The Melodics | Pop | United States | 2010 | 500000 |
2 | Rhythm Riders | Rock | United Kingdom | 2005 | 750000 |
3 | Jazzy Joints | Jazz | France | 1998 | 250000 |
Albums table sample:
album_id | artist_id | title | release_date | total_tracks | album_type | label | total_plays |
1 | 1 | Harmonic Waves | 2022-05-10 | 12 | Studio | Melody Records | 1000000 |
2 | 1 | Acoustic Dreams | 2020-03-15 | 10 | EP | Melody Records | 750000 |
3 | 2 | Electric Fury | 2021-11-20 | 14 | Studio | Rock On Ltd | 2000000 |
4 | 3 | Smooth Sax | 2019-07-05 | 8 | Live | Jazz Co | 500000 |
As you can see, both tables contain a field called artist_id. This is what we're going to use as the key in our join operation. One artist can have multiple albums, but one album can only belong to one artist. There may also be some artists without any albums.
In order to transform join these tables in Flow, the first step is to start capturing them. For this, you can quickly spin up a PostgreSQL capture.
You can take a look into each via the data preview window on the Collections page to verify that the sample data has already landed in Flow.
Writing the derivation
Set up your folder structure so you can organize the resources required for the derivation. Create a working directory to follow along, and inside, create a flow.yaml file.
Inside your flow.yaml file, add the following contents:
plaintextcollections:
dani-demo/demo-derivations1/artist_total_plays:
schema:
description: A document that represents the click count for each ad platform
type: object
properties:
artist_id:
type: integer
artist_name:
type: string
default: ""
reduce: { strategy: maximize }
total_plays:
type: integer
default: 0
reduce: { strategy: sum }
required:
- artist_id
reduce:
strategy: merge
key:
- /artist_id
derive:
using:
sqlite:
migrations: []
transforms:
- name: fromAlbums
source:
name: dani-demo/demo-music/albums
shuffle:
key:
- /artist_id
lambda: |
select $artist_id, $total_plays;
- name: fromArtists
source:
name: dani-demo/demo-music/artists
shuffle:
key:
- /artist_id
lambda: |
select $artist_id, $name as artist_name;
Let's take a look at this in more detail. Essentially, we define one collection which is a derivation that is the result of two transformations.
In the schema definition, we specify what structure we want the documents of the result collection to take on. The outermost reduce strategy is set to merge, which means that documents with the same key (in this case, artist_id) will be combined.
The derivation details are defined in the derive section of the YAML:
- We're using SQLite as the engine for our transformations.
- There are two transformations: fromAlbums and fromArtists.
- Each transformation shuffles data on the artist_id key.
- The lambda sections contain the SQL queries that will be executed for each transformation.
Understanding the SQL Transformations
Let's break down the SQL queries in the derivation:
- fromAlbums transformation: select $artist_id, "" as artist_name, $total_plays;
This query selects the artist_id and total_plays from each album. The $ syntax is used to reference fields from the source document. - fromArtists transformation: select $artist_id, $name as artist_name, 0 as total_plays;
This query selects the artist_id and name (aliased as artist_name) from each artist and adds a total_plays column initialized to 0.
The Flow runtime will then merge these results based on the artist_id, summing up the total_plays for each artist and selecting the “largest” value in each group for the artist_name, which is essentially a comparison between an empty string and an actual value.
Previewing the Derivation results
To preview the derivation results, you can use flowctl.
plaintextflowctl preview --source flow.yaml --name dani-demo/demo-music/artist_total_play
The results will look something like this:
plaintext{"artist_id":1034,"artist_name":"Elizabeth Johnson"}
{"artist_id":1034,"total_plays":69242063}
{"artist_id":1034,"total_plays":88384450}
{"artist_id":1035,"artist_name":"Brittany Fernandez"}
{"artist_id":1035,"total_plays":31324569}
{"artist_id":1035,"total_plays":11181346}
{"artist_id":1036,"artist_name":"Mary Higgins"}
{"artist_id":1036,"total_plays":41909178}
{"artist_id":1036,"total_plays":39340532}
{"artist_id":1037,"artist_name":"Steven Flowers"}
{"artist_id":1037,"total_plays":86347088}
{"artist_id":1038,"artist_name":"Travis Thompson"}
{"artist_id":1038,"total_plays":41095614}
As you can see, this is the not-yet-reduced version of the derived collection. This means the IDs that should be merged are still in separate records.
If you take a look at the first three lines in the above example, you can see that they belong to the same artist_id, but are the results of the two separate transforms we defined in the derivation.
The first line {"artist_id":1034,"artist_name":"Elizabeth Johnson"} results from the artists table using the fromArtists lambda, while the other two come from the albums table using the fromAlbums lambda.
When Flow materializes this collection into a data warehouse, the expectation is to first, combine records with the same artist_id, then in the merged records, further reduce the total_plays fields using the sum strategy, as in adding the values together.
To help visualize this, the results of step 1 would look something like this internally to Flow:
plaintext{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": [69242063, 88384450]}
And then, the final result would look like this:
plaintext{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": 157626513}
Publishing the Derivation
To publish the derivation, use the following command:
plaintextflowctl catalog publish --source flow.yaml
After it's successfully published, head over to the Flow dashboard to see the new collection.
Creating a Materialization
To see the results of your derivation, you'll need to create a materialization. Here's an example for Snowflake:
- Go to the materialization creation page in the Flow dashboard and search for Snowflake.
- Configure the materialization with your Snowflake connection details.
- In the configuration step, select the derivation you created (in the example: dani-demo/demo-music/artist_total_plays) as the source collection.
- Save and publish your materialization.
This is what the results will look like:
As you can see, each artist only appears once, and the total_plays value is indeed the sum of the individual values from the respective albums.
Wrapping up
In this guide, you learned how to write a SQL derivation in Estuary Flow to join two collections and calculate aggregate data. This approach allows you to process data in real-time, ensuring that your analytics are always up-to-date.
Remember to clean up any resources you created for this tutorial if you no longer need them!
About the author
Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.