Airflow Sensors: Quick Introduction

Marc Lamberti
4 min read · Apr 6, 2021

Airflow Sensors are extremely popular in Apache Airflow. Why? Because they wait for a condition to be met before completing. This opens up a ton of possibilities for building more complex and powerful data pipelines. In this article, you are going to get a quick introduction to Sensors. Ready? Let’s go!

Use Case

To grasp why a feature is important, it’s always a good idea to start from a use case. Here is a common use case for Sensors:

DAG example with Airflow Sensors

You’ve built the DAG above, which is in charge of processing data coming from your different partners A, B and C. Each partner has its own folder where its files should land between 9 AM and 10 AM. Since you process all files at once in the task “Process”, you have to wait for the files to be present in their folders. That’s it! Each time you need to wait for something, you should hear a voice in your head saying “pssst… use a sensor… USE A SENSOR”.

People who don’t know about Airflow Sensors tend to use the PythonOperator instead. Indeed, you could write a Python function that checks the folders and, if the files don’t exist yet, waits for them, and so on. However, by doing this you don’t get the benefits of Sensors, and it will be a nightmare to maintain.

Bottom line: whenever you need to wait for a condition to be met, use Sensors!
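To give you an idea of where we’re heading, here is a minimal sketch of what that DAG could look like, using the FileSensor described below. The DAG id, folder paths and processing function are made up for the example:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor

def _process():
    # placeholder for the actual processing of the partners' files
    pass

with DAG('partners_processing', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    # one FileSensor per partner folder, waiting for the file to land
    waiting_for_files = [
        FileSensor(
            task_id=f'waiting_for_file_{partner}',
            filepath=f'partner_{partner}/data.csv'
        )
        for partner in ['a', 'b', 'c']
    ]

    process = PythonOperator(task_id='process', python_callable=_process)

    # 'process' runs only once all three files are present
    waiting_for_files >> process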

What is a Sensor?

A Sensor is a special kind of Operator that evaluates, at a defined time interval, whether a condition is met or not. If yes, it succeeds; if not, it keeps checking the condition until it times out.

What does it mean?

Let’s say your goal is to wait for a specific file to exist in a folder. With a Sensor, every 30 seconds it will check if the file exists. If not, it will keep checking; if yes, it will succeed and the next task will get executed. It is as simple as that.

There are many different Sensors; here is a non-exhaustive list:

  • The FileSensor: Waits for a file or a folder to exist in a filesystem
  • The PythonSensor: Waits for a Python callable to return True (Yes, you could use that one instead of the PythonOperator 😉; see the sketch after this list)
  • The SqlSensor: Runs a SQL statement repeatedly until a condition is met
  • The TimeDeltaSensor: Waits for a timedelta after the task’s execution_date + schedule_interval

And more!

There are a ton of Sensors; go check out this link if you want to discover the others.
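As a quick example of the PythonSensor mentioned in the list above, here is a minimal sketch that waits for a hypothetical partner file to land (the path and the callable are made up):

from pathlib import Path

from airflow.sensors.python import PythonSensor

def _partner_file_exists():
    # return True once the (hypothetical) partner file has landed
    return Path('/data/partner_a/data.csv').exists()

waiting_for_partner_a = PythonSensor(
    task_id='waiting_for_partner_a',
    python_callable=_partner_file_exists
)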

BaseSensorOperator

Regardless of the Sensor used, they all inherit from the BaseSensorOperator class. That means they share the same attributes and methods that the BaseSensorOperator implements. If you truly want to master Sensors, you have to know what this class offers you.

When you add a Sensor to your DAG, the very first step is to define how often your Sensor will check whether its condition is met. By default, it checks every 60 seconds. You can change this interval with the poke_interval argument.

from airflow.sensors.filesystem import FileSensor

waiting_for_file = FileSensor(
    task_id='waiting_for_file',
    filepath='partner_a/data.csv',  # example path; a FileSensor needs a filepath to watch
    poke_interval=30
)

The code above is an example of a FileSensor that checks every 30 seconds whether a file exists. One question I see pretty often is: what is the best time interval? What is the best practice for the poke_interval? Well, there is no single right answer. The shorter the poke_interval, the sooner the condition is detected and the task completes. But be careful: if it is too short, there is no guarantee that Airflow will be able to evaluate every interval in time.

You also have to pay attention to what your condition does. For example, checking if a file exists on a filesystem will be faster than checking if a row exists in a database (network latency). The more expensive your condition is to evaluate, the longer the time interval should be.

That being said, here is another question: what happens if your condition is never met? Before answering it, you have to know what’s going on when a Sensor runs.

Pools and Concurrency

Whenever you add a Sensor to your DAG, you should always define a timeout. Why?

In Airflow, each time an operator (a task) runs, a slot from a pool is occupied until it completes. A pool is used to limit the execution parallelism on an arbitrary set of tasks. You can assign tasks to different pools. By default, all operators share the same pool, called “default_pool”, which has 128 slots.

Airflow default pool

This means that, by default, you can run up to 128 tasks at the same time in your Airflow instance. Obviously, you can change that number, create other pools, etc., but this is another topic.

In addition to pools, there are configuration settings to limit the maximum number of tasks running for a given DAG. By default, it is set to 16 (look at the dag_concurrency setting). In other words, you can run up to 16 tasks in parallel for a given DAG.
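This is also why the timeout mentioned earlier matters: without one, a Sensor whose condition is never met can keep occupying one of those slots for a very long time. Here is a minimal sketch, reusing the hypothetical file path from before:

from airflow.sensors.filesystem import FileSensor

waiting_for_file = FileSensor(
    task_id='waiting_for_file',
    filepath='partner_a/data.csv',  # hypothetical path
    poke_interval=30,
    timeout=60 * 10  # stop poking and fail after 10 minutes of waiting
)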

Now, a Sensor waits for a condition to be met, right? What if that condition is never true? What if you can execute up to 4 tasks in parallel for the same DAG (across multiple DAG Runs) but the Sensor you have as the first task never ends? And what can you do if your Sensor times out?

Well, this is where this quick introduction ends. But I don’t want to leave you without answers to those important questions. I’ve made a complete article about Airflow Sensors right here. You will learn everything you need to fully master Sensors and take your data pipelines to the next level.

I hope you enjoyed that quick introduction, see you for another article!

Have a great day! 😉
