Once the prerequisites for loading the data from your Source System(s) to your Snowflake Target System are met, you are ready to load the data.
There are three ways to load the data natively with Snowflake:
- Sequentially, based on Snowflake Tasks
- In near real-time, with Snowflake Dynamic Tables and Streams
- With a stage-driven approach, based on one Snowflake Task per Stage Model Object
The near real-time approach uses Snowflake Dynamic Tables with Streams, a Snowflake public preview feature.
Don't use this approach for a production data solution.
You can also load the data with an external tool such as:
Load the data sequentially
To load the data:
- Open Snowflake
- Open the Worksheet XXXXX_tasks_helper.sql. It contains 5 parts:
- Part 1 (Enable all tasks): load the data
- Part 2 (Which DAG is currently scheduled or executing?): monitor the current execution
- Part 3 (Return the history of a DAG or task): monitor all the executions
- Part 4 (Show the logging events for the loaders): display the logging events
- Part 5 (Suspend the root task): execute it after the test so that no additional costs occur
- Select the 3 lines of Part 1 and execute them: this launches the root task that loads the data
Be careful: The root task will be executed every 5 minutes by default.
You can change this in the Load Control script creation:
CREATE OR REPLACE TASK LOAD_XXXXXX_ROOT_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'                     -- change this value to adjust the frequency
  ALLOW_OVERLAPPING_EXECUTION = FALSE
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
  -- Record the load start time (UTC) as the task return value, readable by the child tasks
  LET start_time VARCHAR := TO_VARCHAR(CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()));
  CALL system$set_return_value(:start_time);
END;
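If you only need to change the schedule of a task that already exists, you can also alter it directly instead of regenerating the script. A minimal sketch reusing the task name above (note that a task must be suspended before its parameters can be changed):

-- Suspend the task, change its schedule, then resume it
ALTER TASK LOAD_XXXXXX_ROOT_TASK SUSPEND;
ALTER TASK LOAD_XXXXXX_ROOT_TASK SET SCHEDULE = '60 minute';
ALTER TASK LOAD_XXXXXX_ROOT_TASK RESUME;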
- You can monitor the current load execution with Part 2. For example, just after launching the load, only the root task is scheduled.
- After the load completes, the root task and all the child tasks show as completed.
- After the load has completed, in testing mode, please suspend the root task to avoid additional Snowflake costs. To do this, execute Part 5. A sketch of what these parts typically contain follows.
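For reference, the statements in these parts are typically built on standard Snowflake task commands and functions. A hypothetical sketch of Parts 1, 2, and 5 (the task name is the one from the script above; your generated worksheet may differ):

-- Part 1 style: enable the root task and all its child tasks
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_XXXXXX_ROOT_TASK');

-- Part 2 style: show the DAG that is currently scheduled or executing
SELECT *
FROM TABLE(INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS())
ORDER BY SCHEDULED_TIME;

-- Part 5 style: suspend the root task after the test
ALTER TASK LOAD_XXXXXX_ROOT_TASK SUSPEND;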
Load the data in near real-time with Stream
To use the Stream approach, all your Model Objects must have the Use Stream property checked.
REMINDER: The near real-time approach uses Snowflake Dynamic Tables with Streams, a Snowflake public preview feature.
Don't use this approach for a production data solution.
PIT tables cannot be loaded with the Stream approach.
To load the data:
- Open Snowflake
- The generated artifacts don't include helpers to load data with Streams. Download the Helper_Snowflake_Stream.sql file and upload it as a Worksheet in Snowflake.
- Open the Worksheet Helper_Snowflake_Stream. It contains 5 parts:
- Part 1: Generate the SQL code to resume the stream tasks
- Part 2: Load the data
- Part 3: Monitor the execution
- Part 4: Generate the SQL code to suspend the stream tasks
- Part 5: Suspend the stream tasks
- Select the 3 lines of Part 1 and execute them: the list of SQL statements to resume all the tasks is generated (a sketch of this generate-and-run pattern follows this list)
- Copy the generated SQL statements into Part 2
- Select the X lines of Part 2 and execute them: this launches the tasks that load the data
Be careful: The tasks will be executed every 5 minutes by default.
You can change this for each task in the Load Control script creation:
CREATE OR REPLACE TASK STG_ST_CREDITCARD_LOADER_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'
WHEN
...........
CREATE OR REPLACE TASK RDV_HUB_CREDITCARD_HUB_LOADER_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'
WHEN
...........
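The elided WHEN clause is what makes these tasks near real-time: each loader runs only when its stream actually contains new data. A hypothetical illustration of the pattern (the task, stream, target, and column names are assumptions, not part of the generated script):

CREATE OR REPLACE TASK DEMO_LOADER_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'
WHEN
  SYSTEM$STREAM_HAS_DATA('DEMO_STREAM')  -- skip the run entirely when the stream is empty
AS
  INSERT INTO DEMO_TARGET (ID, AMOUNT)
  SELECT ID, AMOUNT FROM DEMO_STREAM;    -- consuming the stream in a DML statement advances its offset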
- You can monitor the load execution with Part 3. For example, just after launching the load, all the tasks are scheduled.
- After the load completes, all the tasks show as completed.
Because all the Stage objects must be populated before the load can continue, the first execution loads data only for the Stage objects.
- After the load completes, in testing mode, please suspend all the tasks to avoid additional Snowflake costs. To do this:
- Select the 3 lines of Part 4 and execute them: the list of SQL statements to suspend all the tasks is generated
- Copy the generated SQL statements into Part 5
- Execute Part 5
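Parts 1 and 4 follow a common Snowflake pattern: list the tasks, then build one ALTER statement per task from the result. A sketch of this pattern (the LIKE filter on the task names is an assumption):

SHOW TASKS LIKE '%_LOADER_TASK';
-- SHOW output columns are lowercase, so they must be double-quoted in RESULT_SCAN
SELECT 'ALTER TASK ' || "database_name" || '.' || "schema_name" || '.' || "name" || ' RESUME;' AS stmt
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));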
Load the data with a stage-driven approach
To load the data:
- Open Snowflake
- The generated artifacts don't include helpers to load data with a stage-driven approach. Download the Helper_Snowflake_Stage_Driven.sql file and upload it as a Worksheet in Snowflake.
- Open the Worksheet Helper_Snowflake_Stage_Driven. It contains 5 parts:
- Part 1: Generate the SQL code to resume one or several stage-driven tasks
- Part 2: Load the data
- Part 3: Monitor the execution
- Part 4: Generate the SQL code to suspend the tasks
- Part 5: Suspend the tasks
- Select the 3 lines of Part 1 and execute them: the list of SQL statements to resume all the tasks is generated.
- Copy one or several of the generated SQL statements for stage-driven tasks into Part 2
- Select the X lines of Part 2 and execute them: this launches the tasks that load the data (to trigger a single run on demand instead, see the sketch after this list)
Be careful: The tasks will be executed every 5 minutes by default.
You can change this for each task in the Load Control script creation:
CREATE OR REPLACE TASK STG_ST_CURRENCY_LOADER_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'
...........
CREATE OR REPLACE TASK STG_ST_CURRENCYRATE_LOADER_TASK
  WAREHOUSE = TRAINING_WH_461345104558
  SCHEDULE = '5 minute'
..........
- You can monitor the load execution with Part 3. For example, just after launching the load, all the tasks are scheduled (only 1 stage load plan in this example).
- After the load completes, all the tasks show as completed and the next one is scheduled.
- After the load completes, in testing mode, please suspend all the tasks to avoid additional Snowflake costs. To do this:
- Select the 3 lines of Part 4 and execute them: the list of SQL statements to suspend all the tasks is generated
- Copy the generated SQL statements into Part 5
- Execute Part 5
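To run a single stage load plan once, without waiting for its schedule, you can also trigger a task manually. A sketch reusing a task name from the script above (EXECUTE TASK starts one run immediately, even if the task is suspended):

EXECUTE TASK STG_ST_CURRENCY_LOADER_TASK;  -- one immediate run, independent of the 5-minute schedule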
Check the data
You can now use the following script to check that your data was loaded correctly:
USE DATABASE IDENTIFIER('<Name of your target database>');

SELECT table_schema
     , table_name
     , table_type
     , row_count
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
  AND table_schema IN ('SA', 'RDV', 'BV', 'DM')
ORDER BY row_count;
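To summarize the result per layer, you can aggregate the same view (same assumptions as the script above):

SELECT table_schema
     , COUNT(*)       AS tables
     , SUM(row_count) AS total_rows
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
  AND table_schema IN ('SA', 'RDV', 'BV', 'DM')
GROUP BY table_schema
ORDER BY table_schema;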