Databricks - Load data - Apache Airflow Load Control - 1.6

Before loading the data from your source system(s) to your Databricks target system, please complete the prerequisite steps.

You are now ready to load the data with Apache Airflow.

Create the Databricks Jobs

Airflow will use Databricks Jobs to load the data.

biGENIUS-X artifacts contain configuration files to create these Jobs.
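For reference, the sketch below illustrates what creating a single Databricks Job from one of these configuration files could look like through the Databricks Jobs API 2.1. It is only an illustration, not the actual logic of the helper script introduced in the next step; the host, token, and file name are placeholders, and it assumes one configuration file maps to one Job definition.

# Illustrative sketch only: create one Databricks Job from a generated
# configuration file via the Jobs API 2.1.
# Placeholders: <yourhost>, <yourtoken>, and the configuration file name.
import json
import requests

host = "https://<yourhost>"          # Databricks workspace URL
token = "<yourtoken>"                # personal access token (PAT)

with open("LoadControl/<job_definition>.json") as f:
    job_settings = json.load(f)

response = requests.post(
    f"{host}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_settings,
)
response.raise_for_status()
print("Created job id:", response.json()["job_id"])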

Download the following helper: deploy_jobs.ps1.

Copy it next to the LoadControl folder of your generated artifacts.

Then:

  • Open PowerShell
  • Navigate to your generated artifacts folder:
# Example
cd C:\XXXXXXXXX\20240219144515
  • Run the helper script:
# Replace <yourhost> with your Databricks host
# Replace <yourtoken> with your personal access token (PAT)
.\deploy_jobs.ps1 -databricksAccountName <yourhost> -jobsFolderName LoadControl -databricksToken <yourtoken>

You should see the following result for each Job created:
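If you want an additional check outside the Databricks UI, a minimal sketch like the following can list the Jobs through the Databricks Jobs API 2.1; host and token are placeholders.

# Optional check: list the Jobs now present in the workspace.
import requests

host = "https://<yourhost>"          # Databricks workspace URL
token = "<yourtoken>"                # personal access token (PAT)

response = requests.get(
    f"{host}/api/2.1/jobs/list",
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
for job in response.json().get("jobs", []):
    print(job["job_id"], job["settings"]["name"])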

Load the data with Apache Airflow

To load the data with Apache Airflow:

    • Navigate to the Airflow homepage at http://localhost:8080
    • Enter your credentials
    • You should see the following result:
    • Configure a connection to your target solution environment by following these steps (a scripted alternative is sketched after this procedure):
      • Open the menu Admin > Connections
      • Click on the plus icon
      • Enter the connection details:

      • Check the connection by clicking on the Test button:
      • You should see the following message:
      • Save the connection by clicking on the Save button:
    • Go back to the DAGs list by clicking on the DAGs menu:
    • To launch a data load, click on the Trigger DAG button for the relevant DAG:
    • The data load starts:
    • You can follow the data load by clicking on the DAG, then on the current execution (green bar), and finally on the Graph tab:
    • If you want to check the logs of the execution, click on the Audit Log tab:
    • You can now check that your data was loaded correctly by creating a Notebook in Databricks and running the following code:
    # Adapt the database and table names
    # Add as many UNION clauses as there are target tables
    df = spark.sql("""
        select '`rawvault`.`rdv_sat_creditcard_delta_satellite_result`' as table_name
        , count(*) as row_count
        , max(bg_loadtimestamp) as max_bg_loadtimestamp
        from `rawvault`.`rdv_sat_creditcard_delta_satellite_result`
        union
        select '`rawvault`.`rdv_hub_salesorderheader_hub_result`' as table_name
        , count(*) as row_count
        , max(bg_loadtimestamp) as max_bg_loadtimestamp
        from `rawvault`.`rdv_hub_salesorderheader_hub_result`
    """)
    df.show(n=1000, truncate=False)
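As an alternative to the UI steps above, the Databricks connection can also be registered programmatically. The sketch below is only an assumption-based example: the connection id databricks_default may differ from the one the generated DAGs expect, and the script must be run inside the Airflow environment.

# Hedged sketch: register the Databricks connection without the Airflow UI.
# The conn_id "databricks_default" is an assumption; use the id the generated
# DAGs actually reference. Run this inside the Airflow environment.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="databricks_default",      # assumed connection id
    conn_type="databricks",
    host="https://<yourhost>",         # Databricks workspace URL
    password="<yourtoken>",            # personal access token (PAT)
)

session = settings.Session()
# Only add the connection if it does not exist yet
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()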