A common gotcha when working with Airflow is determining when a DAG is going to execute and what its execution date-time will be when it does - especially if a DAG relies on processing data for a given execution date-time only, in order to ensure idempotency. The best way to think about this is not in dates or timestamps but in time-spans. An Airflow DAG run covers a time-span - i.e. it processes data that was generated/received/added etc. within a given time-span. The start and end of the time-span can be accessed within operators with:
timespan_start = context["execution_date"]                          # start of the time-span covered by this run
timespan_end = context["dag"].following_schedule(timespan_start)    # end of the time-span (the next scheduled run)
Looking at this it becomes clear that context["execution_date"] is poorly named. Rather than the date and time when the DAG run executes, it describes the start of the time-span. Indeed, the DAG executes at the context["execution_date"] of the following DAG instance - in other words, a run only starts once the time-span it covers has ended.
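To make this concrete, here is a minimal sketch of a DAG that simply prints the time-span it covers. It is written against the Airflow 2 style, where the context is passed to the callable as keyword arguments; the DAG id, task id, schedule and start date are made up for illustration.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def report_timespan(**context):
    # Start of the time-span covered by this run (a.k.a. execution_date).
    timespan_start = context["execution_date"]
    # End of the time-span: the next scheduled run after the start.
    timespan_end = context["dag"].following_schedule(timespan_start)
    print(f"This run covers data from {timespan_start} to {timespan_end}")


with DAG(
    dag_id="timespan_example",       # hypothetical DAG id
    schedule_interval="0 2 * * *",   # runs daily at 2am
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="report_timespan", python_callable=report_timespan)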
Here is an example where this counter-intuitive behavior can cause trouble: Say one expects a set of files to be made available by 1am every morning. A reasonable approach would be to schedule a DAG for 2am every morning. To only process the newest files one could decide to filter by context["execution_date"]. Another reason to do this could be to ensure idempotency - meaning that re-running a DAG instance at a later date will generate the exact same output.
The problem with this approach is that it introduces an unintended 24 hour delay. New files arrive by 1am and the DAG starts to run at 2am, but the DAG run's execution_date is set to 2am of the previous day - so it will process data from the day before, i.e. data that is already 24 hours old.
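In code, the naive filter might look something like the following sketch; the function name and the exact filtering logic are assumptions for illustration.

def process_files_naive(**context):
    # Naive: execution_date is the *start* of the time-span, i.e. 2am of
    # the previous day for a daily 2am schedule, not the time at which
    # this run actually executes.
    cutoff = context["execution_date"]
    # Selecting "the files for the execution date" therefore picks up
    # yesterday's files, a full 24 hours behind the newest data.
    print(f"Processing files for {cutoff.date()}")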
The solution is not to filter data using the incorrectly named context["execution_date"] but the actual execution date-time, which is context["dag"].following_schedule(context["execution_date"]).
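Applied to the example above, the filter would instead be derived from the following schedule; again a sketch, with the function name and filtering logic as assumptions.

def process_files(**context):
    # The actual wall-clock time at which this run executes: the start of
    # the next schedule interval, i.e. the end of this run's time-span.
    actual_run_time = context["dag"].following_schedule(context["execution_date"])
    # Filtering on this date picks up the files that arrived by 1am today
    # instead of yesterday's files.
    print(f"Processing files for {actual_run_time.date()}")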
Thinking about time-spans instead of execution dates/times is, in my opinion, the way to go in Airflow. Airflow is a batch system that will often process data added during a given time-span. This is different from an event or streaming system. To simplify such operations I added a GCSTimeSpanFileTransformOperator, which fetches the data generated within the DAG run's time-span from Google Cloud Storage, downloads the files, transforms them and re-uploads them to GCS.
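A usage sketch of that operator might look roughly like this; the bucket names, prefixes and transform script path are placeholders, and the parameter names should be checked against the Google provider documentation for the Airflow version in use.

from airflow.providers.google.cloud.operators.gcs import (
    GCSTimeSpanFileTransformOperator,
)

transform = GCSTimeSpanFileTransformOperator(
    task_id="transform_timespan_files",
    source_bucket="my-raw-bucket",              # placeholder bucket
    source_prefix="incoming/",                  # placeholder prefix
    source_gcp_conn_id="google_cloud_default",
    destination_bucket="my-processed-bucket",   # placeholder bucket
    destination_prefix="processed/",            # placeholder prefix
    destination_gcp_conn_id="google_cloud_default",
    # Command run for each file downloaded from the DAG run's time-span
    # (placeholder script path).
    transform_script=["python", "/opt/scripts/transform.py"],
)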