A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

You might also like...
Driving lessons made simpler. Custom scheduling API built with Python.
Driving lessons made simpler. Custom scheduling API built with Python.

NOTE This is a mirror of a GitLab repository. Dryvo Dryvo is a unique solution for the driving lessons industry. Our aim is to save the teacher’s time

Tracking development of the Class Schedule Siri Shortcut, an iOS program that checks the type of school day and tells you class scheduling.

Class Schedule Shortcut Tracking development of the Class Schedule Siri Shortcut, an iOS program that checks the type of school day and tells you clas

Repo Home WPDrawBot - (Repo, Home, WP) A powerful programmatic 2D drawing application for MacOS X which generates graphics from Python scripts. (graphics, dev, mac)

DrawBot DrawBot is a powerful, free application for macOS that invites you to write Python scripts to generate two-dimensional graphics. The built-in

AIST++ API This repo contains starter code for using the AIST++ dataset.
AIST++ API This repo contains starter code for using the AIST++ dataset.

AIST++ API This repo contains starter code for using the AIST++ dataset. To download the dataset or explore details of this dataset, please go to our

A toy repo illustrating a minimal installable Python package

MyToy: a minimal Python package This repository contains a minimal, toy Python package with a few files as illustration for students of how to lay out

This is the repo for Uncertainty Quantification 360 Toolkit.

UQ360 The Uncertainty Quantification 360 (UQ360) toolkit is an open-source Python package that provides a diverse set of algorithms to quantify uncert

This repo is related to Google Coding Challenge, given to Bright Network Internship Experience 2021.
This repo is related to Google Coding Challenge, given to Bright Network Internship Experience 2021.

BrightNetworkUK-GCC-2021 This repo is related to Google Coding Challenge, given to Bright Network Internship Experience 2021. Language used here is py

Educational Repo. Used whilst learning Flask.

flask_python Educational Repo. Used whilst learning Flask. The below instructions will be required whilst establishing as new project. Install Flask (

Repo created for the purpose of adding any kind of programs and projects

Programs and Project Repository A repository for adding programs and projects of any kind starting from beginners level to expert ones Contributing to

Owner
Ilya Strelnikov
Ilya Strelnikov
This is a practice on Airflow, which is building virtual env, installing Airflow and constructing data pipeline (DAGs)

airflow-test This is a practice on Airflow, which is Builing virtualbox env and setting Airflow on that env Installing Airflow using python virtual en

Jaeyoung 1 Nov 1, 2021
Yet another Airflow plugin using CLI command as RESTful api, supports Airflow v2.X.

中文版文档 Airflow Extended API Plugin Airflow Extended API, which export airflow CLI command as REST-ful API to extend the ability of airflow official API

Eric Cao 106 Nov 9, 2022
A totally unrealistic cell growth/reproduction simulation.

A totally unrealistic cell growth/reproduction simulation.

Andrien Wiandyano 1 Oct 24, 2021
Pipenv-local-deps-repro - Reproduction of a local transitive dependency on pipenv

Reproduction of the pipenv bug with transitive local dependencies. Clone this re

Lucas Duailibe 2 Jan 11, 2022
Airflow Operator for running Soda SQL scans

Airflow Operator for running Soda SQL scans

Todd de Quincey 7 Oct 18, 2022
Project repository of Apache Airflow, deployed on Docker in Amazon EC2 via GitLab.

Airflow on Docker in EC2 + GitLab's CI/CD Personal project for simple data pipeline using Airflow. Airflow will be installed inside Docker container,

Ammar Chalifah 13 Nov 29, 2022
A tutorial presents several practical examples of how to build DAGs in Apache Airflow

Apache Airflow - Python Brasil 2021 Este tutorial apresenta vários exemplos práticos de como construir DAGs no Apache Airflow. Background Apache Airfl

Jusbrasil 14 Jun 3, 2022
An Airflow operator to call the main function from the dbt-core Python package

airflow-dbt-python An Airflow operator to call the main function from the dbt-core Python package Motivation Airflow running in a managed environment

Tomás Farías Santana 93 Jan 8, 2023
Repositório para estudo do airflow

airflow-101 Repositório para estudo do airflow Docker criado baseado no tutorial Exemplo de API da pokeapi Para executar clone o repo execute as confi

Gabriel (Gabu) Bellon 1 Nov 23, 2021
A data engineering project with Kafka, Spark Streaming, dbt, Docker, Airflow, Terraform, GCP and much more!

Streamify A data pipeline with Kafka, Spark Streaming, dbt, Docker, Airflow, Terraform, GCP and much more! Description Objective The project will stre

Ankur Chavda 206 Dec 30, 2022