A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Contains a Jupyter Notebook for calculating remaining plants required based on field/lathhouse data.

Davis-Sunflowers-Su21 Project goals: Plants influence their reproduction and mating system in many ways. Various factors such as time of flowering, ab

1 Feb 10, 2022
Application launcher and environment management

Application launcher and environment management for 21st century games and digital post-production, built with bleeding-rez and Qt.py News Date Releas

10 Nov 03, 2022
Static bytecode simulator

SEA Static bytecode simulator for creating dependency/dependant based experimental bytecode format for CPython. Example a = random() if a = 5.0:

Batuhan Taskaya 23 Jun 10, 2022
The third home of the bare Programming Language (1st there's my heart, the forest came second and then there's Github :)

The third home of the bare Programming Language (1st there's my heart, the forest came second and then there's Github :)

Garren Souza 7 Dec 24, 2022
An open source server for Super Mario Bros. 35

SMB35 A custom server for Super Mario Bros. 35 This server is highly experimental. Do not expect it to work without flaws.

Yannik Marchand 162 Dec 07, 2022
A site that went kinda viral that lets you put Bernie Sanders in places

Bernie In Places An app that accidentally went viral! Read the story in WIRED here Install First, create a python virtual environment, and install all

310 Aug 22, 2022
Backend/API for the Mumble.dev, an open source social media application.

Welcome to the Mumble Api Repository Getting Started If you are trying to use this project for the first time, you can get up and running by following

Dennis Ivy 189 Dec 27, 2022
An Advanced Wordlist Library Written In Python For Acm114

RBAPG -RBAPG is the abbreviation of "Rule Based Attack Password Generator". -This module is a wordlist generator module. -You can generate randomly

Aziz Kaplan 11 Aug 28, 2022
A parser of Windows Defender's DetectionHistory forensic artifact, containing substantial info about quarantined files and executables.

A parser of Windows Defender's DetectionHistory forensic artifact, containing substantial info about quarantined files and executables.

Jordan Klepser 101 Oct 30, 2022
Data on Free Food at MIT

MIT Free Food Timing Procrastinating research by plotting data on how long it takes emails on the free-food at mit edu mailing list to go through. Dat

Peter Sharpe 2 Nov 01, 2021
This repo presents you the official code of "VISTA: Boosting 3D Object Detection via Dual Cross-VIew SpaTial Attention"

VISTA VISTA: Boosting 3D Object Detection via Dual Cross-VIew SpaTial Attention Shengheng Deng, Zhihao Liang, Lin Sun and Kui Jia* (*) Corresponding a

104 Dec 29, 2022
Enjoyable scripting experience with Python

Enjoyable scripting experience with Python

8 Jun 08, 2022
BridgeWalk is a partially-observed reinforcement learning environment with dynamics of varying stochasticity.

BridgeWalk is a partially-observed reinforcement learning environment with dynamics of varying stochasticity. The player needs to walk along a bridge to reach a goal location. When the player walks o

Danijar Hafner 6 Jun 13, 2022
Creates infinite amount of guilded accounts in seconds.

Guilded Cookie Creator [fuck guilded i quit working on this, they patch like every fucking method after 2/3 days i release shit] Optimizations Asynchr

scripted 7 Feb 28, 2022
Multi-Process / Censorship Detection

Multi-Process / Censorship Detection

Baris Dincer 2 Dec 22, 2021
An open-source hyper-heuristic framework for multi-objective optimization

MOEA-HH An open-source hyper-heuristic framework for multi-objective optimization. Introduction The multi-objective optimization technique is widely u

Hengzhe Zhang 1 Feb 10, 2022
Async-first dependency injection library based on python type hints

Dependency Depression Async-first dependency injection library based on python type hints Quickstart First let's create a class we would be injecting:

Doctor 8 Oct 10, 2022
Web app for keeping track of buildings in danger of collapsing in the event of an earthquake

Bulina Roșie 🇷🇴 Un cutremur în București nu este o situație ipotetică. Este o certitudine că acest lucru se va întâmpla. În acest context, la mai bi

Code for Romania 27 Nov 29, 2022
A modern Python build backend

trampolim A modern Python build backend. Features Task system, allowing to run arbitrary Python code during the build process (Planned) Easy to use CL

Filipe Laíns 39 Nov 08, 2022
京东自动入会获取京豆

京东入会领京豆 要求 有一定的电脑知识 or 有耐心爱折腾 需要Chrome(推荐)、Edge(Chromium)、Firefox 操作系统需是Mac(本人没在m1上测试)、Linux(在deepin上测试过)、Windows 安装方法 脚本采用Selenium遍历京东入会有礼界面,由于遍历了200

Vanke Anton 500 Dec 22, 2022