
Group 1 - Checklist Level 1

dbt Fundamentals

Start here to practice and apply the dbt fundamental skills:

  • Sources
  • Testing primary keys
  • Staging models
  • Documentation
  • Making use of packages:
    • dbt_utils
    • codegen

Work through the steps in order. Expand a hint only after you've had a genuine attempt - the struggle is where the learning happens!


Step 1 - Explore the raw streaming tables

Before writing any dbt code, understand what you're working with. Query the raw tables and note:

  • How many rows are in each table?
  • What does a typical row look like?
  • Are there any obvious quality issues (nulls, unexpected values, duplicate IDs)?
select * from streaming.watch_events
select * from streaming.subscriptions
select * from streaming.content_catalog
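A quick way to combine the row-count and duplicate-ID checks in one query (swap in each table and its ID column - subscription_id here is an assumption, check the real column names first):

```sql
select
    count(*) as row_count,
    count(distinct subscription_id) as distinct_ids -- if these differ, you have duplicate IDs
from streaming.subscriptions
```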
Hint: What to look for

Pay attention to:

  • monthly_fee_cents in subscriptions - is this in the right unit for reporting?
  • status in subscriptions - what values exist? Are they consistent?
  • device_type in watch_events - any inconsistent casing?

You'll use these observations to write accepted_values tests and decide where to apply your macros.
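For example, once you know which status values exist, an accepted_values test can assert them in YAML (the values listed here are placeholders, not the real ones):

```yaml
columns:
  - name: status
    data_tests:
      - accepted_values:
          values: ['active', 'cancelled'] # replace with the values you actually found
```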


Step 2 - Define sources in YAML

Create the file models/staging/streaming/_streaming__sources.yml. Define a source named streaming with all three tables.

Rename the tables with more human-readable names.

Hint: Source YAML structure
version: 2

sources:
  - name: source_name # you choose this
    database: "{{ env_var('DBT_DATABASE') }}" # can be set dynamically with env_var
    schema: name_of_schema
    tables:
      - name: table_name # you choose this
        identifier: table_name_as_in_warehouse
      - name: table_name_2 # you choose this
        identifier: table_name_2_as_in_warehouse

Then you'll refer to this using {{ source('source_name', 'table_name') }} calls.
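Applied to this project, a minimal version might look like the following (the schema name and friendly table names are assumptions - adapt them to your warehouse):

```yaml
version: 2

sources:
  - name: streaming
    schema: streaming
    tables:
      - name: watch_events
        identifier: watch_events
      - name: subscriptions
        identifier: subscriptions
      - name: content_catalog
        identifier: content_catalog
```

A staging model would then select from one of these with select * from {{ source('streaming', 'watch_events') }}.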


Step 3 - Add source freshness config

Add a loaded_at_field and freshness block to at least one table (start with watch_events).

Add two new keys at the same level as the table's other keys in your source YAML:

  • loaded_at_field
  • freshness

Use autocomplete (Tab) - it will auto-populate the keys you need to configure source freshness correctly.

Hint: Freshness config
tables:
  - name: table_name
    loaded_at_field: column_name
    freshness:
      warn_after:
        count: int # your tolerance, e.g. 4
        period: minute/hour/day
      error_after:
        count: int # your tolerance, e.g. 4
        period: minute/hour/day

Test it with:

dbt source freshness --select source:streaming

dbt compares max(watched_at) - your loaded_at_field - against the current timestamp and raises a warning or error if the data is older than your thresholds.
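Put together, a freshness block for watch_events might look like this (the thresholds are illustrative - pick tolerances that match how often the data actually loads):

```yaml
tables:
  - name: watch_events
    loaded_at_field: watched_at
    freshness:
      warn_after:
        count: 1
        period: day
      error_after:
        count: 3
        period: day
```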


Step 4 - Add generic tests to sources

Add the following tests to the primary key of each table:

  • not_null
  • unique

You will need to add a new key called columns: to configure the name of the primary key column. Then you'll need to add the key data_tests:.

Hint: Tests on source columns
tables:
  - name: table_name
    identifier: table_name
    columns:
      - name: column_name
        data_tests:
          - not_null
          - unique

Run source tests:

dbt test --select source:streaming

Step 5 - Build stg_streaming__content_catalog.sql

Create models/staging/streaming/stg_streaming__content_catalog.sql.

You can do this by clicking the Generate model button that appears above the name of your table in the source yaml you already created.

When the code is generated, click save. Notice where this file is saved - why is that?

Goals: Make the following changes to clean the data

  • Rename columns to be clear and consistent
  • Cast any data types that are incorrect
  • Clean the string columns - notice any inconsistencies with casing or spaces?
Hint: Try running the following - what do you see?

select 
    distinct genre
from {{ source('streaming', 'content_catalog') }} 
Are these values consistent?

select 
    *
from {{ source('streaming', 'content_catalog') }} 
where genre = 'drama'
How many rows did you expect to be returned? What is the issue?

  • lower(trim(...)) can be used to normalise string values.
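Pulling the goals together, a sketch of the model might look like this (column names other than genre are assumptions - use whatever the catalog actually contains):

```sql
with source as (

    select * from {{ source('streaming', 'content_catalog') }}

)

select
    content_id,
    trim(title) as title,
    lower(trim(genre)) as genre,              -- normalise casing and whitespace
    cast(release_year as int) as release_year -- example cast; adjust to your columns
from source
```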

Step 6 - Build stg_streaming__subscriptions.sql

Create models/staging/streaming/stg_streaming__subscriptions.sql.

Goals: Make the following changes to clean the data

  • Convert all cents columns to dollars
  • Normalise the status column and rename to subscription_status
  • Create two new timestamp columns from the existing date and time columns
    • started_at
    • ended_at
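In Snowflake, a date column and a time column can be combined into a timestamp with timestamp_from_parts (the column names started_date and started_time are assumptions):

```sql
timestamp_from_parts(started_date, started_time) as started_at
```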
Hint: Handling cents
monthly_fee_cents / 100.0 as monthly_fee_dollars

You'll replace this inline calculation with your cents_to_dollars macro after you've written it.
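When you get there, the macro could be as small as this sketch (saved as macros/cents_to_dollars.sql - the signature is an assumption):

```sql
{% macro cents_to_dollars(column_name, decimal_places=2) %}
    round({{ column_name }} / 100.0, {{ decimal_places }})
{% endmacro %}
```

You'd then write {{ cents_to_dollars('monthly_fee_cents') }} as monthly_fee_dollars in the model.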


Step 7 - Build stg_streaming__watch_events.sql

Create models/staging/streaming/stg_streaming__watch_events.sql.

This is the highest-volume table - fact-style, one row per viewing event. Values in event_id are reused by the source system as it collects the event stream, which means this column is not unique.

Goals: Create a surrogate key for this table

  • Use Snowflake's MD5 function to create a surrogate key
  • Update your code to use the generate_surrogate_key function from the dbt_utils package

Hint: Using Snowflake's MD5 function

Snowflake's MD5 function creates a hash from a given value. To get a single value you first need to concatenate the columns, then apply MD5 to the result - see below for an example.

SELECT MD5(
    CONCAT(
        COALESCE(column_1, ''), '|',
        COALESCE(column_2, ''), '|',
        COALESCE(column_3, '')
    )
) AS hash_key
FROM your_table

Hint: Surrogate key with dbt_utils

Check that dbt_utils is mentioned in packages.yml. Now you can use the functions from this package. Here's the syntax for using the generate_surrogate_key function:

{{
    dbt_utils.generate_surrogate_key(['column_1', 'column_2', 'column_3'])
}} as name_of_column
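Applied to this model, it might look like the following (the column list is an assumption - choose columns that together identify one event):

```sql
{{ dbt_utils.generate_surrogate_key(
    ['event_id', 'user_id', 'watched_at']
) }} as watch_event_key
```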

Step 8 - Create a staging models YAML

Create models/staging/streaming/_streaming__models.yml.

You will use this to document all three staging models with (at a minimum) the following info and tests:

  • A model-level description
  • not_null + unique tests on the primary key column
  • not_null tests on key foreign keys and timestamp columns
Hint: Use the codegen package to generate the model yaml

dbt-codegen generates model YAML so you don't have to write it by hand.

1. Add the package to packages.yml by adding the following two lines under dbt_utils:

    - package: dbt-labs/codegen
      version: 0.13.1

2. Install it. dbt Cloud should install it automatically; to do it manually, run the following from the command line:

    dbt deps

3. Open a new (or existing) untitled file in dbt Cloud and paste the following, then click </> Compile:

    {{ codegen.generate_model_yaml(
        model_names=["model_name"]
    ) }}

Copy the compiled output into your _streaming__models.yml and fill in descriptions and any additional tests.
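The compiled output has roughly this shape (model and column names here are illustrative):

```yaml
version: 2

models:
  - name: stg_streaming__content_catalog
    description: ""
    columns:
      - name: content_id
        description: ""
      - name: title
        description: ""
```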


Step 9 BONUS - Run the full test suite

dbt test --select staging.streaming

Fix any failures. A test failure is information - read the error message, query the failing rows, understand why.

Hint: Investigating a test failure

To see failing rows for a not_null test on content_id:

select *
from {{ ref('stg_streaming__content_catalog') }}
where content_id is null

For an accepted_values failure, see which unexpected values exist:

select distinct genre
from {{ ref('stg_streaming__content_catalog') }}
order by 1

Decide: fix the source assertion (update the accepted list) or fix the data transformation.


Done?

You've defined sources, added source freshness checks and built-in primary key tests, built staging models, and used some common dbt packages to make your workflow more efficient.

Now head to Level 2 to continue applying new skills!