A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Howell County, Missouri, COVID-19 data and (unofficial) estimates

COVID-19 in Howell County, Missouri This repository contains the daily data files used to generate my COVID-19 dashboard for Howell County, Missouri,

Jonathan Thornton 0 Jun 18, 2022
A simple hash system.

PBH-Hash-System A simple hash system. Usage You could use it like this: from pbh import pbh print(pbh("Hey", True)) Output: 2feae2471698cfcdcbd6b98ca

Karim 3 Mar 24, 2022
A patch and keygen tools for typora.

A patch and keygen tools for typora.

Mason Shi 1.4k Apr 12, 2022
A faster Python generator that get function results from multi-process workers

multiyield This package implements a Python generator that get function results from multi-process workers. The faster_fifo Queue (instead of the stan

Xin Du 1 Nov 18, 2021
A minimal configuration for a dockerized kafka project.

Docker Kafka Quickstart A minimal configuration for a dockerized kafka project. Usage: Run this command to build kafka and zookeeper containers, and c

Nouamane Tazi 5 Jan 12, 2022
Repository, with small useful and functional applications

Repositorio,com pequenos aplicativos uteis e funcionais A ideia e ir deselvolvendo pequenos aplicativos funcionais e adicionar a este repositorio List

GabrielDuke 6 Dec 06, 2021
Make pack up python files easier.

python-easy-pack make pack up python files easier. 目前只提供了中文环境 如何使用? 将index.py复制到你的项目文件夹,或者把.py文件拷贝到这个文件夹。 打开你的cmd或者powershell 切换到程序所在目录,输入python index

2 Dec 15, 2021
MinimalGearDisplay, Assetto Corsa app

MinimalGearDisplay MinimalGearDisplay, Assetto Corsa app. Just displays the current gear you are on. Download and Install To use this app, clone or do

1 Jan 10, 2022
Graphene Metanode is a locally hosted node for one account and several trading pairs, which uses minimal RAM resources.

Graphene Metanode is a locally hosted node for one account and several trading pairs, which uses minimal RAM resources. It provides the necessary user stream data and order book data for trading in a

litepresence 5 May 08, 2022
A program that makes all 47 textures of Optifine CTM only using 2 textures

A program that makes all 47 textures of Optifine CTM only using 2 textures

1 Jan 22, 2022
An extended, game oriented, turtle

Burtle A Better TURTLE. Makes making games easier. write less do more!! Documentation & guide: https://alannxq.github.io/burtle/ Installation pip inst

5 May 19, 2022
The Official interpreter for the Pix programming language.

The official interpreter for the Pix programming language. Pix Pix is a programming language dedicated to readable syntax and usability Q) Is Pix the

Pix 6 Sep 25, 2022
A Python feed reader library.

reader is a Python feed reader library. It aims to allow writing feed reader applications without any business code, and without enforcing a dependenc

266 Dec 30, 2022
Let’s Play with Python3

Python3-FirstEdition a bunch of python programs and stuff Super Important Notice THIS IS LICENSED UNDER GNU PUBLIC LICENSE V3 also, refer to Contribut

Jym Patel 2 Nov 24, 2022
redun aims to be a more expressive and efficient workflow framework

redun yet another redundant workflow engine redun aims to be a more expressive and efficient workflow framework, built on top of the popular Python pr

insitro 372 Jan 04, 2023
objectfactory is a python package to easily implement the factory design pattern for object creation, serialization, and polymorphism

py-object-factory objectfactory is a python package to easily implement the factory design pattern for object creation, serialization, and polymorphis

Devin A. Conley 6 Dec 14, 2022
Roblox Limited Sniper For Python

Info this is version 2.1 version 3 will support more options (install python: https://www.python.org) the program will buy any limited item with a pri

1 Dec 09, 2021
A Python package that provides physical constants.

PhysConsts A Python package that provides physical constants. The code is being developed by Marc van der Sluys of the department of Astrophysics at t

Marc van der Sluys 1 Jan 05, 2022
One line Brainfuck interpreter in Python

One line Brainfuck interpreter in Python

16 Dec 21, 2022
Яндекс тренировки по алгоритмам. Июнь 2021

Young&&Yandex Тренировки по алгоритмам Если вы хотите попасть на летнюю стажировку в Яндекс, но пока не уверены в своих силах, приходите на наши трени

Podlevskiy Viktor 6 Sep 03, 2021