A reproduction repo for a scheduling bug in Airflow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

There are 3 DAGs:

  1. ~240 tasks, executes every hour, runs for about 45-50 minutes in total
  2. ~600 tasks, executes every 5 days, runs for days
  3. ~10 tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My Airflow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was left as it was initially set. default_pool_task_slot_count was set to 6 because it seemed reasonable: if all 3 of my DAGs are executing at the same time, the maximum number of tasks that can run at once is 3 * 2 = 6.

The problem:

Almost every time one of the DAGs starts executing, no tasks from the other DAGs start until all the tasks from the first one have finished. For example, if the slow DAG_2 with its 600 tasks starts, DAG_1 has to wait for days before even a single one of its tasks starts.

The scheduler logs look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  <TaskInstance: ...>
scheduler_1  |  <TaskInstance: ...>
scheduler_1  |  <TaskInstance: ...>
scheduler_1  |  <TaskInstance: ...>
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
scheduler_1  | 

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis (== parallelism from the config minus the number of actively running tasks) as an argument. In my case that would be 30.
  2. Calculate how many free slots there are across all pools. In my case that would be 4.
  3. Take the minimum and update the max_tis variable: max_tis = min(4, 30) = 4.
  4. Query the DB for tasks in the scheduled state that belong to unpaused DAGs which are running normally, ordered by DAG execution date.
  5. Limit the query to max_tis rows.
  6. Loop over each returned task and check whether it can run. Run it if possible.
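
Steps 1 to 5 can be sketched in plain Python. This is a simplified model for illustration only, not Airflow's actual code: the names ScheduledTask, compute_max_tis and candidate_tasks, the DAG name Hourly_DAG and the execution dates are all assumptions made up for this example.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ScheduledTask:
    # Hypothetical stand-in for a task instance in the "scheduled" state.
    dag_id: str
    task_id: str
    execution_date: datetime


def compute_max_tis(parallelism: int, running: int, pool_open_slots: int) -> int:
    """Steps 1-3: the limit is the smaller of executor capacity and free pool slots."""
    executor_slots = parallelism - running
    return min(executor_slots, pool_open_slots)


def candidate_tasks(scheduled, max_tis):
    """Steps 4-5: order by DAG execution date (oldest first), then keep max_tis rows."""
    ordered = sorted(scheduled, key=lambda t: t.execution_date)
    return ordered[:max_tis]


# Numbers from this README: parallelism = 32, 2 tasks running, 4 open pool slots.
max_tis = compute_max_tis(parallelism=32, running=2, pool_open_slots=4)

# Assumed snapshot: the slow DAG run started earlier than the hourly one.
slow = [ScheduledTask("Slow_DAG", f"slow_{i}", datetime(2022, 2, 9, 8)) for i in range(600)]
hourly = [ScheduledTask("Hourly_DAG", f"hourly_{i}", datetime(2022, 2, 9, 10)) for i in range(240)]

picked = candidate_tasks(slow + hourly, max_tis)
print(max_tis, {t.dag_id for t in picked})  # 4 {'Slow_DAG'}
```

Under these assumptions, every row that survives the limit belongs to the DAG run with the oldest execution date.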

Now, since the tasks are ordered by execution date, the query returns tasks for the DAG that started first, which, let's say, was DAG_2. It has over 600 tasks, and the LIMIT operation returns 4 of them. None of these tasks can be run, since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, the scheduler just waits until one of the running tasks finishes and then starts another task from the same DAG.
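
The starvation described above, and the effect of raising the two limits, can be reproduced with a toy model of a single scheduling pass. This is a rough sketch under stated assumptions, not the scheduler's real implementation: scheduler_pass, the DAG name Hourly_DAG and the task counts in the snapshot are hypothetical, and the pool accounting is reduced to "slots minus running tasks".

```python
from collections import Counter


def scheduler_pass(scheduled, running_per_dag, parallelism, pool_slots, max_active_tasks=2):
    """One simplified scheduling pass; returns the number of tasks queued per DAG.

    `scheduled` is a list of dag_ids, one entry per task in the scheduled state,
    already ordered by DAG execution date (oldest first).
    """
    running = Counter(running_per_dag)
    total_running = sum(running.values())
    # Query limit: min(free executor slots, free pool slots).
    max_tis = min(parallelism - total_running, pool_slots - total_running)
    queued = Counter()
    for dag_id in scheduled[:max(max_tis, 0)]:
        if running[dag_id] + queued[dag_id] >= max_active_tasks:
            continue  # corresponds to the "Not executing ..." log lines
        queued[dag_id] += 1
    return dict(queued)


# Assumed snapshot: Slow_DAG started first and already has 2 running tasks.
scheduled = ["Slow_DAG"] * 598 + ["Hourly_DAG"] * 240
running = {"Slow_DAG": 2}

starved = scheduler_pass(scheduled, running, parallelism=32, pool_slots=6)
fixed = scheduler_pass(scheduled, running, parallelism=1000, pool_slots=999)
print(starved)  # {}
print(fixed)    # {'Hourly_DAG': 2}
```

In this model the small limit fills the whole result with Slow_DAG tasks that are then all rejected, while the large limit lets the result reach tasks of the other DAG, and max_active_tasks still caps each DAG at 2.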

Owner
Ilya Strelnikov