In the first part of this two-part series, we explored the basic concepts of cron expressions and DAG run parameters in the context of Airflow’s scheduling mechanism. In part two we’ll discuss when and how a DAG will be triggered.
3. Important rules and concepts
When will a DAG run?
Assume that there's a DAG like this:
from datetime import datetime

from airflow import DAG

# initialize the DAG
# (WORKFLOW_DEFAULT_ARGS is assumed to be defined elsewhere)
dag = DAG(
    dag_id='dag_interval_demo',
    start_date=datetime(2022, 5, 29, 14, 30, 0),
    end_date=datetime(2022, 12, 31, 23, 59, 59),
    schedule_interval='10 * * * *',
    default_args=WORKFLOW_DEFAULT_ARGS,
    catchup=False
)
Here, the first DAG run is triggered only after the start date plus one full schedule interval. The first complete interval after the start date runs from 2022-05-29 15:10:00 to 2022-05-29 16:10:00, so this DAG was first triggered at 2022-05-29 16:10:00, not at 2022-05-29 15:10:00. Don't confuse the start_date with the time at which the first DAG run starts.
The execution date
Now that the DAG is triggered, there’s another important, yet confusing, concept worth explaining: the "execution date". This IS NOT the date at which the DAG gets triggered (as you might expect it to be); it is instead the beginning of the data interval that you want to process. In this example it’s 2022-05-29 15:10:00.
As the screenshot below shows, the value of the item Run is the execution date, and the item run_id also contains it. The start time is one hour later than the execution date.
[Screenshot: execution_date]
The reason we set the schedule_interval to the 10th minute of every hour is that we want to deal with the data within this time range (yyyy-mm-dd hh:10:00 ~ yyyy-mm-dd hh+1:10:00). Every execution date indicates the beginning of such a range. Only after a time range has ended can we start processing the data that falls inside it. (Airflow calls this time range the data interval.)
Let me explain it another way. We run the DAG to process the data that we have already obtained during a certain time range, whose length is the schedule interval. We are not able to run the DAG until the range ends. The start point of the range is called the execution date.
Now we can explain why the first DAG run could only start after the start date + the scheduled interval: the data isn’t gathered completely until this time point.
(Note: Airflow 2.2 replaced the term 'execution date' with 'logical date'.)
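To make the timing concrete, here is a minimal sketch in plain Python that reproduces the arithmetic for the hourly '10 * * * *' schedule used above. It is not Airflow's scheduler code: the helper name first_hourly_run is hypothetical, and it only handles schedules of the form "minute M of every hour".

```python
from datetime import datetime, timedelta


def first_hourly_run(start_date: datetime, minute: int):
    """Return (logical_date, trigger_time) for the first run of an
    'at minute M of every hour' schedule.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # First schedule point at or after start_date: this is the start of the
    # first complete data interval, i.e. the execution (logical) date.
    candidate = start_date.replace(minute=minute, second=0, microsecond=0)
    if candidate < start_date:
        candidate += timedelta(hours=1)
    logical_date = candidate
    # The run can only be triggered once the interval has ended.
    trigger_time = logical_date + timedelta(hours=1)
    return logical_date, trigger_time


logical_date, trigger_time = first_hourly_run(
    datetime(2022, 5, 29, 14, 30, 0), minute=10
)
print(logical_date)  # 2022-05-29 15:10:00
print(trigger_time)  # 2022-05-29 16:10:00
```

The two printed values match the execution date and the trigger time discussed above: the interval starts at 15:10 and the run fires when it closes at 16:10.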
Time zone
Dealing with different time zones around the world always requires a lot of care, especially with time changes such as Daylight Saving Time (DST).
By default, Airflow stores datetime information in UTC (Coordinated Universal Time, the global standard against which time zones are defined). This applies both internally and in the database, which means the date parameters mentioned above are all expressed in UTC. In the UI, datetimes are shown in UTC by default.
Let's assume a DAG as below:
# initialize the DAG
# Assume we're in eastern Australia (local time zone is AET: UTC+10/+11)
# 2022-01-01 00:00:00 ~ 2022-04-03 02:59:59 AEDT (UTC+11)
# 2022-04-03 02:00:00 ~ 2022-10-02 01:59:59 AEST (UTC+10)
# 2022-10-02 03:00:00 ~ 2023-01-01 00:00:00 AEDT (UTC+11)
dag = DAG(
    dag_id='time_zone_demo',
    start_date=datetime(2022, 1, 1, 0, 0, 0),
    end_date=datetime(2022, 12, 31, 23, 59, 59),
    schedule_interval='@daily',
    default_args=WORKFLOW_DEFAULT_ARGS
)
With this configuration, the DAG's local trigger time shifts whenever DST starts or ends. On one day it is triggered at 11 o'clock local time, and on the next day at 10 o'clock:
triggering time in UTC | triggering time in local time zone |
---|---|
2022-04-02 00:00:00 | 2022-04-02 11:00:00 (AEDT) |
2022-04-03 00:00:00 | 2022-04-03 10:00:00 (AEST) |
2022-10-01 00:00:00 | 2022-10-01 10:00:00 (AEST) |
2022-10-02 00:00:00 | 2022-10-02 11:00:00 (AEDT) |
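The local times in the table can be checked with the standard library alone. The sketch below converts the UTC trigger times to local time; using 'Australia/Melbourne' as the zone for "eastern Australia" is an assumption made for illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

# Assumption: Australia/Melbourne stands in for "eastern Australia".
LOCAL_TZ = ZoneInfo("Australia/Melbourne")

# UTC trigger times of the '@daily' DAG around the two DST changes in 2022.
utc_triggers = [
    datetime(2022, 4, 2, tzinfo=timezone.utc),
    datetime(2022, 4, 3, tzinfo=timezone.utc),
    datetime(2022, 10, 1, tzinfo=timezone.utc),
    datetime(2022, 10, 2, tzinfo=timezone.utc),
]

# Convert each UTC instant to local wall-clock time.
local_triggers = [t.astimezone(LOCAL_TZ) for t in utc_triggers]

for utc_time, local_time in zip(utc_triggers, local_triggers):
    print(
        utc_time.strftime("%Y-%m-%d %H:%M:%S"),
        "|",
        local_time.strftime("%Y-%m-%d %H:%M:%S (%Z)"),
    )
```

The same UTC hour lands on 11:00 while AEDT is in effect and on 10:00 while AEST is in effect, which is exactly the drift the table shows.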
So here comes the problem: what if we want the DAG to be triggered at 10:00:00 in local time every day, regardless of DST?
Naive and aware datetime objects
With Python, we have several choices of how to represent date and time. They can be roughly divided into two categories: naive datetimes and aware datetimes.
Naive datetime objects don’t store time zone information. In the example below, the results of now() and utcnow() differ, but given either result on its own, it’s impossible to tell which time zone it belongs to.
import datetime
datetime.datetime.now() # (2022, 5, 29, 8, 49, 22, 22920)
datetime.datetime.utcnow() # (2022, 5, 29, 0, 49, 22, 22920)
Aware datetime objects keep the time zone info, so it’s easy to know which time zone they represent.
from airflow.utils import timezone
timezone.utcnow() # (2023, 4, 4, 3, 1, 1, 916835, tzinfo=Timezone('UTC'))
This shows that we should use aware datetime objects to specify the local time zone; Airflow will then trigger the DAG based on it.
Best practice
You should NEVER use naive datetime objects. Naive datetime objects should NOT be considered UTC by default. Even if you’re working in UTC you MUST specify the UTC timezone in your datetime objects.
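As a small illustration of this rule, the sketch below uses only the standard library to tell the two kinds of datetime apart and to show that mixing them fails. The helper name is_aware is hypothetical and written for this example.

```python
from datetime import datetime, timezone


def is_aware(dt: datetime) -> bool:
    """Return True if dt carries usable time zone information."""
    return dt.tzinfo is not None and dt.utcoffset() is not None


naive = datetime(2022, 5, 29, 0, 49, 22)
aware = datetime(2022, 5, 29, 0, 49, 22, tzinfo=timezone.utc)

print(is_aware(naive))  # False
print(is_aware(aware))  # True

# Ordering comparisons between naive and aware datetimes raise TypeError,
# which is one reason to use aware datetimes everywhere.
try:
    naive < aware
except TypeError as exc:
    print("cannot compare:", exc)
```

Even when everything runs in UTC, attaching tzinfo=timezone.utc explicitly removes the ambiguity about what a value means.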
With aware datetime objects, we can now create time-zone-aware DAGs. Such DAGs are triggered based on the time zone that you set for them.
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id='tz_dag',
    start_date=pendulum.datetime(2016, 1, 1, tz='Australia/Melbourne')
)
op = EmptyOperator(task_id='empty', dag=dag)
print(dag.timezone)
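To see what a time-zone-aware schedule is meant to achieve, the sketch below uses only the standard library (no Airflow) to compute the UTC instants that correspond to 10:00 local time on the days around the 2022 DST changes. It assumes a cron-style schedule such as '0 10 * * *' on the DAG, which is not part of the example above and is mentioned here purely for illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

LOCAL_TZ = ZoneInfo("Australia/Melbourne")

# 10:00 wall-clock time on the days around the 2022 DST changes.
local_runs = [
    datetime(2022, 4, 2, 10, 0, tzinfo=LOCAL_TZ),
    datetime(2022, 4, 3, 10, 0, tzinfo=LOCAL_TZ),
    datetime(2022, 10, 1, 10, 0, tzinfo=LOCAL_TZ),
    datetime(2022, 10, 2, 10, 0, tzinfo=LOCAL_TZ),
]

# The UTC instant moves by one hour so that the local time stays fixed.
utc_runs = [t.astimezone(timezone.utc) for t in local_runs]

for local_time, utc_time in zip(local_runs, utc_runs):
    print(
        local_time.strftime("%Y-%m-%d %H:%M (%Z)"),
        "->",
        utc_time.strftime("%Y-%m-%d %H:%M UTC"),
    )
```

The local time stays at 10:00 throughout, while the UTC time alternates between 23:00 of the previous day (AEDT) and 00:00 (AEST). This is the inverse of the earlier situation, where the UTC time was fixed and the local time drifted.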
Backfill and catchup
Backfilling means running an Airflow DAG for a specified date in the past. It usually happens when we want to re-run the DAG in a specific time range or process the data that became available before the first time we triggered the DAG. To do this, we can use the Airflow command line interface (CLI) or the DAG parameter 'catchup' (as mentioned above).
CLI
The backfill command below runs the DAG 'test_dag' to fill in the data intervals from the date specified by '-s' to the date specified by '-e'. In this case, 3 DAG runs will be triggered, with execution dates 2022-01-01, 2022-01-02, and 2022-01-03.
If the option '--reset-dagruns' is added, the DAG runs in that range are executed again regardless of whether they have already been processed.
airflow dags backfill -s 2022-01-01 -e 2022-01-04 --reset-dagruns test_dag
Parameter 'catchup'
If we define a DAG as below and trigger it today, it will process all the available data intervals from 2022-01-01 to today.
dag = DAG(
    dag_id='test_dag',
    start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
    catchup=True,
    # ... other parameters,
)
This feature could be useful when you have to deal with historical data, but it doesn’t have to be set in a DAG. It can also be achieved in the processing logic, which is more flexible.
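To illustrate which intervals a catch-up would cover, here is a minimal sketch in plain Python. It assumes a daily schedule aligned to the start date and a made-up value for "today"; the helper completed_daily_intervals is hypothetical and is not how Airflow implements catchup.

```python
from datetime import datetime, timedelta, timezone


def completed_daily_intervals(start_date: datetime, now: datetime):
    """List (interval_start, interval_end) pairs that have fully elapsed.

    Hypothetical helper for illustration; assumes a daily schedule aligned
    to start_date and is not Airflow's implementation.
    """
    intervals = []
    interval_start = start_date
    # An interval only counts once its end lies at or before 'now'.
    while interval_start + timedelta(days=1) <= now:
        interval_end = interval_start + timedelta(days=1)
        intervals.append((interval_start, interval_end))
        interval_start = interval_end
    return intervals


start = datetime(2022, 1, 1, tzinfo=timezone.utc)
now = datetime(2022, 1, 4, 12, 0, tzinfo=timezone.utc)  # assumed "today"

for interval_start, interval_end in completed_daily_intervals(start, now):
    print(interval_start.date(), "->", interval_end.date())
```

With these assumed dates, three intervals have ended (starting on 2022-01-01, 2022-01-02, and 2022-01-03); the interval that began on 2022-01-04 is still open, so it is not processed yet.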
Configurations about time
It's hard to understand what a configuration item really does by merely reading the definition. The following pages have more detail:
Summary
Airflow's time scheduling mechanism is easy to set but hard to understand. I hope this article gives you some ideas about how to better use Airflow. Here are the most important rules of this article:
A DAG run processes the data obtained during the latest completed data interval.
Use an aware-datetime object instead of a naive one.
Disclaimer: The statements and opinions expressed in this article are those of the author(s) and do not necessarily reflect the positions of Thoughtworks.