Snowflake - Load data - Native Load Control - 1.8
Before loading the data from your Source System(s) to your Snowflake Target System, please:
You are now ready to load the data.
There are three ways to load the data natively with Snowflake:
- Sequentially, based on Snowflake Tasks (with or without a schedule)
- In near real-time, with Snowflake Dynamic Tables and Stream
- With a stage-driven approach, based on one Snowflake Task per Stage Model Object
The near real-time approach uses Snowflake Dynamic Tables with Stream, a Snowflake feature in public preview.
Don't use this approach for a data solution in production.
With these approaches, you can load linked Project data in an overall load.
You can also load the data with an external tool, such as:
Load the data sequentially
With a schedule
To load the data:
- Open Snowflake
- Open the Worksheet XXXXX_tasks_helper.sql. It contains 5 parts:
- Enable all tasks: Load the data with a schedule
- Which is the DAG that is currently scheduled or is executing?: Monitor the current execution
- Returns the history of a DAG or task: Monitor all the executions
- Show logging events for the loaders: Display the logging events
- After the test, execute the following script to ensure that no additional costs occur: Suspend the root Task
- To launch the data load with a schedule every X minutes:
- Select the seventh line of Part 1 and execute it: the root task and all its dependent tasks are resumed, and the data load starts on the next scheduled run:
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_XXXXXX_ROOT_TASK');
Be careful: The root task will be executed every 5 minutes by default.
You can change this in the Load Control script creation:
CREATE OR REPLACE TASK LOAD_XXXXXX_ROOT_TASK
WAREHOUSE = TRAINING_WH_461345104558
SCHEDULE = '5 minute'
ALLOW_OVERLAPPING_EXECUTION = FALSE
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
LET start_time VARCHAR := TO_VARCHAR(CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()));
CALL system$set_return_value(:start_time);
END;
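If you prefer not to regenerate the script, you can also change the schedule directly on the existing task. A minimal sketch (LOAD_XXXXXX_ROOT_TASK stands for your root task name; a task must be suspended before its properties can be altered):
--Suspend the root task before altering it
ALTER TASK LOAD_XXXXXX_ROOT_TASK SUSPEND;
--Set a new schedule, for example every 60 minutes
ALTER TASK LOAD_XXXXXX_ROOT_TASK SET SCHEDULE = '60 minute';
--Resume the root task and all its dependents
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_XXXXXX_ROOT_TASK');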
- You can monitor the current load execution with Part 2 (see also the monitoring sketch after this list). For example, just after launching the data load, only the root task is scheduled.
- After the load finishes, the root task and all the child tasks are completed. You can check this with Part 3.
- After the load has completed, if you are using a schedule every X minutes, suspend the root task to avoid additional Snowflake execution costs. To do this, execute Part 5.
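The exact queries behind Parts 2 and 3 depend on the generated helper. As a sketch, Snowflake's built-in task graph functions answer the same questions (LOAD_XXXXXX_ROOT_TASK stands for your root task name):
--Which DAG is currently scheduled or executing? (Part 2 equivalent)
SELECT *
FROM TABLE(INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS(ROOT_TASK_NAME => 'LOAD_XXXXXX_ROOT_TASK'));
--History of completed DAG runs (Part 3 equivalent)
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COMPLETE_TASK_GRAPHS(ROOT_TASK_NAME => 'LOAD_XXXXXX_ROOT_TASK'));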
Without a schedule
- Open Snowflake
- Execute the following code:
--Identify the root task name: LOAD_XXXXXX_ROOT_TASK
SHOW TASKS;
--Replace LOAD_XXXXXX_ROOT_TASK with the root task name in the following code
--Replace <warehouse> with the warehouse name
--Unset the default schedule
ALTER TASK LOAD_XXXXXX_ROOT_TASK UNSET SCHEDULE;
--A standalone task without a schedule cannot be resumed, so use the AFTER clause
--to make it a child task temporarily:
CREATE OR REPLACE TASK TEMP_PARENT_TASK WAREHOUSE = <warehouse> AS SELECT 1;
ALTER TASK LOAD_XXXXXX_ROOT_TASK ADD AFTER TEMP_PARENT_TASK;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_XXXXXX_ROOT_TASK');
ALTER TASK LOAD_XXXXXX_ROOT_TASK REMOVE AFTER TEMP_PARENT_TASK;
DROP TASK TEMP_PARENT_TASK;
--Launch the data load
EXECUTE TASK LOAD_XXXXXX_ROOT_TASK;
- You can also monitor the current load execution with Part 2 of the Worksheet XXXXX_tasks_helper.sql. See the previous chapter "With a schedule".
- After the load finishes, the root task and all the child tasks are completed. You can also check this with Part 3 of the Worksheet XXXXX_tasks_helper.sql. See the previous chapter "With a schedule".
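For a quick check of a manual run, you can also query the task history directly. A sketch (LOAD_XXXXXX_ROOT_TASK stands for your root task name):
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'LOAD_XXXXXX_ROOT_TASK'))
ORDER BY scheduled_time DESC;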
Load the data in near real-time with Stream
To use the Stream approach, all your Model Objects must have the Property Use Stream checked.
REMINDER: The near real-time approach uses Snowflake Dynamic Tables with Stream, a Snowflake public preview.
Don't use this approach for a data solution in production.
Loading data for PIT tables with the Stream approach is not possible.
To load the data:
- Open Snowflake
- The generated artifacts don't include helpers to load data with Stream. Please download the Helper_Snowflake_Stream.sql and upload it as a Worksheet in Snowflake.
- Open the Worksheet Helper_Snowflake_Stream. It contains 5 parts:
- Generate the SQL code to resume the stream tasks
- Load the data
- Monitor the execution
- Generate the SQL code to suspend the stream tasks
- Suspend the tasks
- Select the 3 lines of Part 1 and execute them: the list of SQL statements to resume all the tasks is generated.
- Copy the generated SQL statements into Part 2
- Select the X lines of Part 2 and execute them: the tasks that load the data are started
Be careful: The tasks will be executed every 5 minutes by default.
You can change this for each task in the Load Control script creation:
CREATE OR REPLACE TASK STG_ST_CREDITCARD_LOADER_TASK
WAREHOUSE = TRAINING_WH_461345104558
SCHEDULE = '5 minute'
WHEN
...........
CREATE OR REPLACE TASK RDV_HUB_CREDITCARD_HUB_LOADER_TASK
WAREHOUSE = TRAINING_WH_461345104558
SCHEDULE = '5 minute'
WHEN
...........
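For reference, the WHEN clause elided above typically gates each task on new stream data with SYSTEM$STREAM_HAS_DATA. A minimal sketch of the general pattern (the task, warehouse, stream, and table names are hypothetical, not the generated ones):
CREATE OR REPLACE TASK MY_LOADER_TASK
WAREHOUSE = MY_WH
SCHEDULE = '5 minute'
WHEN
--Run only if the stream contains new rows (hypothetical stream name)
SYSTEM$STREAM_HAS_DATA('MY_STREAM')
AS
INSERT INTO MY_TARGET SELECT * FROM MY_STREAM;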
- You can monitor the load execution with Part 3. For example, just after launching the data load, all the tasks are scheduled.
- After the load finishes, all the tasks are completed.
As all the Stage objects must be filled before the load can continue, the first execution loads the data only for the Stage objects.
- After the load completes, in testing mode, suspend all the tasks to avoid additional Snowflake execution costs. To do this:
- Select the 3 lines of Part 4 and execute them: the list of SQL statements to suspend all the tasks is generated
- Copy the generated SQL statements into Part 5
- Execute Part 5
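The generation parts of the helper follow a common Snowflake pattern: run SHOW TASKS, then build ALTER TASK statements from the result with RESULT_SCAN. A sketch of that pattern (the LIKE filter is an assumption; adjust it to your task naming):
SHOW TASKS LIKE '%LOADER_TASK';
--Build one RESUME statement per task from the SHOW output
SELECT 'ALTER TASK ' || "name" || ' RESUME;'
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
--Use SUSPEND instead of RESUME to generate the statements for Part 4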
Load the data with a stage-driven approach
To load the data:
- Open Snowflake
- The generated artifacts don't include helpers to load data with a stage-driven approach. Please download the Helper_Snowflake_Stage_Driven.sql and upload it as a Worksheet in Snowflake.
- Open the Worksheet Helper_Snowflake_Stage_Driven. It contains 5 parts:
- Generate the SQL code to resume one or several stage-driven tasks
- Load the data
- Monitor the execution
- Generate the SQL code to suspend the tasks
- Suspend the tasks
- Select the 3 lines of Part 1 and execute them: the list of SQL statements to resume all the tasks is generated.
- Copy one or several of the generated SQL statements for stage-driven tasks into Part 2
- Select the X lines of Part 2 and execute them: the tasks that load the data are started
Be careful: The tasks will be executed every 5 minutes by default.
You can change this for each task in the Load Control script creation:
CREATE OR REPLACE TASK STG_ST_CURRENCY_LOADER_TASK
WAREHOUSE = TRAINING_WH_461345104558
SCHEDULE = '5 minute'
...........
CREATE OR REPLACE TASK STG_ST_CURRENCYRATE_LOADER_TASK
WAREHOUSE = TRAINING_WH_461345104558
SCHEDULE = '5 minute'
..........
- You can monitor the load execution with Part 3. For example, just after launching the data load, all the tasks are scheduled (only one stage load plan in this example).
- After the load finishes, all the tasks are completed and the next run is scheduled.
- After the load completes, in testing mode, suspend all the tasks to avoid additional Snowflake execution costs. To do this:
- Select the 3 lines of Part 4 and execute them: the list of SQL statements to suspend all the tasks is generated
- Copy the generated SQL statements into Part 5
- Execute Part 5
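To verify that everything is suspended after the test, you can list the task states with the same RESULT_SCAN pattern as above. A sketch (the LIKE filter is an assumption; adjust it to your task naming):
SHOW TASKS LIKE '%LOADER_TASK';
--"state" is 'started' or 'suspended'; list anything still running
SELECT "name", "state", "schedule"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "state" <> 'suspended';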
Check the data
You can now check that your data was loaded correctly with the following script:
USE DATABASE IDENTIFIER('<Name of your target database>');
SELECT table_schema
, table_name
, table_type
, row_count
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema IN ('SA','RDV','BV','DM')
ORDER BY row_count;
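For a quick per-layer summary, a variant of the same query (a sketch, not part of the generated artifacts):
SELECT table_schema
, COUNT(*) AS tables
, SUM(row_count) AS total_rows
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema IN ('SA','RDV','BV','DM')
GROUP BY table_schema
ORDER BY table_schema;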
Load data for several linked Projects
As described in the previous chapters, to load data for a Snowflake Project, you need to execute the corresponding root task.
To load several linked Projects, we will create a new root task that executes the root tasks of all the Projects.
In the following example, Project B (ex.: Dimensional Project) uses Project A (ex.: Stage File Project) as a Linked Project.
--Identify the 2 root task names: LOAD_XXXXXX_ROOT_TASK
SHOW TASKS;
-- Replace LOAD_XXXXXX_ROOT_TASK_1 with the root task name for Project A
-- Replace LOAD_XXXXXX_ROOT_TASK_2 with the root task name for Project B
-- Replace <warehouse> with the warehouse name
--Create the task to load all the projects
CREATE OR REPLACE TASK LOAD_C4_X_SF_OVERALL_ROOT_TASK
WAREHOUSE = <warehouse>
AS
BEGIN
EXECUTE TASK LOAD_XXXXXX_ROOT_TASK_1;
-- Wait 30 seconds to make sure the first root task has finished
-- Increase if necessary
CALL SYSTEM$WAIT(30);
EXECUTE TASK LOAD_XXXXXX_ROOT_TASK_2;
END;
--Execute the overall load
EXECUTE TASK LOAD_C4_X_SF_OVERALL_ROOT_TASK;
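The fixed SYSTEM$WAIT is a simple heuristic: it does not guarantee that the first Project's task graph has finished. As an alternative sketch (not generated by the tool), you can poll INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS from an anonymous block until the first graph is no longer scheduled or executing:
EXECUTE IMMEDIATE $$
DECLARE
running INTEGER DEFAULT 1;
BEGIN
EXECUTE TASK LOAD_XXXXXX_ROOT_TASK_1;
--Give the graph a moment to appear in CURRENT_TASK_GRAPHS
CALL SYSTEM$WAIT(10);
WHILE (running > 0) DO
SELECT COUNT(*) INTO :running
FROM TABLE(INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS(ROOT_TASK_NAME => 'LOAD_XXXXXX_ROOT_TASK_1'));
CALL SYSTEM$WAIT(10);
END WHILE;
EXECUTE TASK LOAD_XXXXXX_ROOT_TASK_2;
END;
$$;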