Data Engineering Project for Beginners | Airflow, API, GCP, BigQuery, Coder

🎥 Watch the YouTube tutorial

💻 Use the GitHub repo

A real-case project to give you hands-on experience in creating your own Airflow pipeline and grasping what Idempotency, Partitioning, and Backfilling are.

🧭 Plan:

Data Pipeline using Airflow for Beginners

Pull OpenWeather API data → Data in data lake as Parquet files on GCP platform → Staging to Production tables in Data Warehouse (BigQuery)

🏆 Run the pipeline with Airflow using Coder - an open-source cloud development environment that you download and host in any cloud. It deploys in seconds and provisions the infrastructure, IDE, language, and tools you want. It's used as a best practice at Palantir, Dropbox, Discord, and many more.

Absolutely FREE, a few clicks to launch, and super user-friendly.

Let's set things up:

PART 1

First, make sure Docker is running: https://docs.docker.com/desktop/install/mac-install/

Then open your terminal and run the command to install Coder:

curl -L https://coder.com/install.sh | sh

Next, start Coder with the command:

coder server

Open a browser and navigate to http://localhost:3000 → Create your user

💣 Boom, the platform is up and running!

Now click Templates → Starter Templates → pick Docker containers

After it's provisioned, let's edit it a little: Dockerfile → Edit files → add these lines to the package list in the RUN apt-get install block:

    python3 \
    python3-pip \

main.tf → Edit files → add this module block after the terraform block:
(or copy it from https://registry.coder.com/modules/apache-airflow)

module "airflow" {
  source   = "registry.coder.com/modules/apache-airflow/coder"
  version  = "1.0.13"
  agent_id = coder_agent.main.id
}

Click Build and Publish

Now let’s create a workspace from the template:
Click Workspaces → Create → choose your newly built template → click the Airflow button → create a user → Tada 🎉

Now your Airflow instance is ready & steady 🏎️

Coder platform Airflow

PART 2

Set up a connection to Google Cloud Platform - we'll need a GCP service account (essentially credentials to access the Google Cloud platform programmatically):

  1. Create a GCP account (it has free credits for newbies, so don't worry about the cost: https://cloud.google.com/free/docs/free-cloud-features);
  2. Console Access: Go to the GCP Console, navigate to the IAM & Admin section, and select Service Accounts.
  3. Create Service Account: Click on "Create Service Account", provide a name and description, and click "Create".
  4. Grant Access: Assign the Editor role (just for simplification).
  5. Create Key: Click on "Create Key", select JSON, and then "Create". This downloads a JSON key file. Keep this file secure, as it provides API access to your GCP resources.
  6. In the Airflow Connections tab, find "google_cloud_default" → under Keyfile JSON → paste the WHOLE JSON file contents → Save. To sanity-check the connection, see the snippet below.
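A quick, optional sanity check that the connection works (a minimal sketch; it assumes you run it inside the workspace where Airflow and the Google provider package are installed):

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# Lists the buckets visible to the service account; any response (even an empty list)
# means the 'google_cloud_default' connection and its key are working.
client = GCSHook(gcp_conn_id="google_cloud_default").get_conn()
print([bucket.name for bucket in client.list_buckets()])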

Set up variables

  1. In GCP, create a new project → get the project ID
  2. Create an account for the OpenWeather API https://openweathermap.org/ → get an API key
  3. In the Airflow Variables tab, create these variables (or set them from code, as shown below):
weather-api-key = 'API_KEY'
bq_data_warehouse_project = 'your project ID'
gcs-bucket = 'weather-tutorial'
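If you prefer code over clicking through the UI, the same variables can be set from a one-off script run inside the workspace (a sketch; the values below are placeholders for your own key, project ID, and bucket name):

from airflow.models import Variable

# Same keys as above; replace the values with your own
Variable.set("weather-api-key", "API_KEY")
Variable.set("bq_data_warehouse_project", "your-project-id")
Variable.set("gcs-bucket", "weather-tutorial")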

PART 3

Create a /dags folder and our first DAG, called data_ingestion.py

I like to start by writing the generic outline of the DAG first, like:

# Imports used throughout this DAG file
import os
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator, BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'weather_data_ingestion',
    default_args=default_args,
    description='Fetch weather data and store in BigQuery',
    schedule_interval='@daily',
)

Next let’s outline the steps you need your dag to perform:

fetch_weather_data_task >> gcs_to_bq_staging_task >> create_table_with_schema >> stg_to_prod_task

Next, let's define the global variables we would like to use here, pulling them safely from Airflow Variables:

# naming global variables with CAPITAL letters is one of the best practices
API_KEY = Variable.get("weather-api-key")
GCS_BUCKET = Variable.get("gcs-bucket")
PROJECT_ID = Variable.get("bq_data_warehouse_project")

BQ_DATASET = "weather"
BQ_STAGING_DATASET = f"stg_{BQ_DATASET}"
TABLE_NAME = 'daily_data'
SQL_PATH = f"{os.path.abspath(os.path.dirname(__file__))}/sql/"
LAT = 40.7128  # Example: New York City latitude
LON = -74.0060  # Example: New York City longitude

Okay, let's start with the first task, fetch_weather_data_task:

# It's a PythonOperator, as we are going to write a Python function that pulls the data
fetch_weather_data_task = PythonOperator(
    task_id='fetch_weather_data',
    python_callable=fetch_weather_data,
    dag=dag,
)

Let's define our function fetch_weather_data.

We are going to save the data in Parquet file format (you can use CSV, though, to make things easier), as it's one of the best practices:

Parquet stores data in a columnar format: each column is stored together. It compresses better, allows query engines to skip reading unnecessary data while processing queries, and is optimized for analytics workloads.
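As a tiny standalone illustration of the columnar benefit (not part of the DAG; it needs pandas with pyarrow installed, and the file name is just an example):

import pandas as pd

# Write a small frame to Parquet, then read back only one column;
# engines like BigQuery use the same trick to skip data they don't need.
df = pd.DataFrame({"temp": [281.2, 283.4], "humidity": [87, 74]})
df.to_parquet("weather_sample.parquet", index=False)
print(pd.read_parquet("weather_sample.parquet", columns=["temp"]))

Now, the function itself: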

def fetch_weather_data(**context):
    unix_timestamp, date = date_to_unix_timestamp()
    url = f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={LAT}&lon={LON}&dt={unix_timestamp}&appid={API_KEY}"

    # Make the request
    response = requests.get(url)
    data = response.json()["data"]
    df = pd.DataFrame(data)

    # Create an extra column, datetime non-unix timestamp format
    df['datetime'] = date

    # Save DataFrame to Parquet
    filename = f"weather_data_{date}.parquet"
    """
    Push the filename into Xcom - XCom (short for cross-communication) is a 
    mechanism that allows tasks to exchange messages or small amounts of data.
    Variables have function scope, but we need this value in the next task
    """
    context['ti'].xcom_push(key='filename', value=filename)

    # Upload the file
    gcs_hook = GCSHook() # it's using the default GCP connection 'google_cloud_default'
    gcs_hook.upload(bucket_name=GCS_BUCKET, object_name=filename, data=df.to_parquet(index=False))
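If you want to double-check a run outside the Airflow logs, GCSHook can also tell you whether the object landed in the bucket (a quick sketch; the bucket and file name follow the patterns above, and the date is an example):

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# True if the Parquet file for that date made it into the bucket
print(GCSHook().exists(bucket_name="weather-tutorial", object_name="weather_data_2024-03-02.parquet"))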

We also need the date_to_unix_timestamp() function, since the API requires a Unix timestamp; we can separate it into a distinct function:

def date_to_unix_timestamp():

    # Get the current date
    date = datetime.now().date()
    
    # Convert to a datetime object with time set to midnight
    date_converted = datetime.combine(date, datetime.min.time())
    
    # Convert to Unix timestamp (UTC time zone)
    unix_timestamp = int(date_converted.replace(tzinfo=timezone.utc).timestamp())
    
    return unix_timestamp, date
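As a quick sanity check of the conversion (a standalone snippet, not part of the DAG):

from datetime import datetime, timezone

# Midnight UTC on 2024-03-02 corresponds to Unix timestamp 1709337600
print(int(datetime(2024, 3, 2, tzinfo=timezone.utc).timestamp()))  # 1709337600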

Now, assuming we've pulled the data into Google Cloud Storage, let's go to the next task: gcs_to_bq_staging_task

Here we push our data from the data lake into the data warehouse. We are going to do it in 2 steps:

First, we load it into the staging area, and then we'll write a SQL script which upserts the data into the production data warehouse table.

By upserting I mean the practice of inserting rows that are not yet present in the target table and updating the rows that already exist with new values.

This time we don't need a PythonOperator, as we can use pre-built operators from the apache-airflow-providers-google package; it's easier and more convenient:

gcs_to_bq_staging_task = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery",
    bucket=GCS_BUCKET,
    source_objects=["{{ti.xcom_pull(key='filename')}}"], # pull filename from Xcom from the previous task
    destination_project_dataset_table=f'{PROJECT_ID}.{BQ_STAGING_DATASET}.stg_{TABLE_NAME}',
    create_disposition='CREATE_IF_NEEDED', # automatically creates table for us
    write_disposition='WRITE_TRUNCATE', # automatically drops previously stored data in the table
    time_partitioning={'type': 'DAY', 'field': 'datetime'}, # remember partitioning in the beginning? here it comes!
    gcp_conn_id="google_cloud_default",
    source_format='PARQUET',
    dag=dag,
)
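A quick aside on why time_partitioning matters: once the table is partitioned by datetime, queries that filter on that column only scan the matching partitions. A hypothetical example query (illustrative only; swap in your own project ID):

# BigQuery would only scan the 2024-03-02 partition for a filter like this
PRUNED_QUERY = """
SELECT temp, humidity
FROM `your-project.stg_weather.stg_daily_data`
WHERE datetime = DATE '2024-03-02'
"""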

Next, we are going to create the target table with create-if-not-exists logic, explicitly stating the schema:

create_table_with_schema = BigQueryCreateEmptyTableOperator(
    task_id='create_table_with_schema',
    project_id=PROJECT_ID,
    dataset_id=BQ_DATASET,
    table_id=TABLE_NAME,
    time_partitioning={'type': 'DAY', 'field': 'datetime'},
    schema_fields=[
        {"name": "dt", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "sunrise", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "sunset", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "temp", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "feels_like", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "pressure", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "humidity", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "dew_point", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "clouds", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "visibility", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "wind_speed", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "wind_deg", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "weather", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "element", "type": "RECORD", "mode": "NULLABLE", "fields": [
                    {"name": "description", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "icon", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
                    {"name": "main", "type": "STRING", "mode": "NULLABLE"}
                ]}
            ]}
        ]},
        {"name": "datetime", "type": "DATE", "mode": "NULLABLE"}
    ],
    dag=dag,
)
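The nested weather field mirrors how the API returns weather conditions (a list of objects) and how Parquet encodes list columns (as list/element). For reference, the raw field typically looks something like this (values are made up for illustration):

# Illustrative shape of the 'weather' field in the OpenWeather response
weather_example = [
    {"id": 804, "main": "Clouds", "description": "overcast clouds", "icon": "04n"}
]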

And lastly, we create stg_to_prod_task, which pulls data from staging and upserts it with BigQueryInsertJobOperator:

stg_to_prod_task = BigQueryInsertJobOperator(
    task_id="upsert_staging_to_prod_task",
    project_id=PROJECT_ID,
    configuration={
        "query": {
            # Read the SQL template and substitute the placeholders
            "query": open(f"{SQL_PATH}upsert_table.sql", 'r').read()
            .replace('{project_id}', PROJECT_ID)
            .replace('{bq_dataset}', BQ_DATASET)
            .replace('{table_name}', TABLE_NAME),
            # .replace('{partition_date}', date.today().isoformat()),
            "useLegacySql": False,
            # No destinationTable / createDisposition here: an upsert is a MERGE (DML) statement
            # that names its target table itself, and create_table_with_schema already ensures
            # the table exists.
        },
    },
    dag=dag,
)
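The actual upsert_table.sql lives in the repo; as a rough sketch of what such a MERGE could look like with the placeholders above (the column list and the dt merge key here are assumptions, not the real script):

# Hypothetical contents of sql/upsert_table.sql, shown as a Python string for reference
UPSERT_SQL_SKETCH = """
MERGE `{project_id}.{bq_dataset}.{table_name}` AS prod
USING `{project_id}.stg_{bq_dataset}.stg_{table_name}` AS stg
ON prod.dt = stg.dt
WHEN MATCHED THEN
  UPDATE SET temp = stg.temp, humidity = stg.humidity, datetime = stg.datetime
WHEN NOT MATCHED THEN
  INSERT (dt, temp, humidity, datetime)
  VALUES (stg.dt, stg.temp, stg.humidity, stg.datetime)
"""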

Let's run our DAG now! It should all be good; let's double-check that all the resources are in place by looking at our data lake and data warehouse.

In order to give this pipeline the option of backfilling, meaning populating data for previous periods, let's just add these 2 tweaks:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'backfill_date': datetime.strptime('2024-03-02', '%Y-%m-%d').date()
}
def date_to_unix_timestamp(date=None):

    if date is None:
        # Get the current date
        date = datetime.now().date()

    # Convert to a datetime object with time set to midnight
    date_converted = datetime.combine(date, datetime.min.time())
    
    # Convert to Unix timestamp (UTC time zone)
    unix_timestamp = int(date_converted.replace(tzinfo=timezone.utc).timestamp())
    
    return unix_timestamp, date
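One possible way to wire the backfill date into the fetch task (a sketch; it assumes the 'backfill_date' key from default_args above and leaves the rest of the function unchanged):

def fetch_weather_data(**context):
    # None on a regular run, so date_to_unix_timestamp falls back to today
    backfill_date = context["dag"].default_args.get("backfill_date")
    unix_timestamp, date = date_to_unix_timestamp(backfill_date)
    # ... the rest (API request, Parquet upload to GCS) stays the same as before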

JFYI, idempotence is a funky word that often trips people up, but it simply means that if we run the pipeline repeatedly, it will produce the same result. In our case, the deterministic file name, the WRITE_TRUNCATE staging load, and the upsert are what keep reruns safe: running the same day twice doesn't duplicate any data.

To stop your project, you can just click 'Stop' in the Coder UI and clean up the Docker containers and images afterward.

In case you've shut down Docker, just relaunch it and run

coder login https://[YOUR_URL].try.coder.app/

Here you have it, dears: a simple yet helpful pipeline at your fingertips and a whole easy-to-launch platform to play around with Airflow DAGs. Please tell me which topics you want me to cover next, and leave your comments below. Until then, stay curious!

⚡️ My Data Engineering Roadmap