A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
A collection of existing KGQA datasets in the form of the huggingface datasets library, aiming to provide an easy-to-use access to them.

KGQA Datasets Brief Introduction This repository is a collection of existing KGQA datasets in the form of the huggingface datasets library, aiming to

Semantic Systems research group 21 Jan 06, 2023
清晰易读的7x7像素点阵中文字体和取模工具

FontChinese7x7 上古神器 III : 7x7像素点阵中文字体 想要在低分辨率屏幕上显示中文, 却发现中文字体实在是太大? 找了全网发现字体库最小也只有12x12? 甚至是好不容易找到了一个8x8字体, 结果发现字体收费且明确说明不得以任何形式嵌入到软件当中? 那就让这个项目来解决你的问

Angelic47 72 Dec 12, 2022
Safely pass trusted data to untrusted environments and back.

ItsDangerous ... so better sign this Various helpers to pass data to untrusted environments and to get it back safe and sound. Data is cryptographical

The Pallets Projects 2.6k Jan 01, 2023
A tool to guide you for team selection based on mana and ruleset using your owned cards.

Splinterlands_Teams_Guide A tool to guide you for team selection based on mana and ruleset using your owned cards. Built With This project is built wi

Ruzaini Subri 3 Jul 30, 2022
Lenovo Yoga Ideapad Autocharge

Description This program uses the conservation_mode of Lonovo Ideapad / Yoga not

1 Jan 09, 2022
Easy, clean, reliable Python 2/3 compatibility

Overview: Easy, clean, reliable Python 2/3 compatibility python-future is the missing compatibility layer between Python 2 and Python 3. It allows you

Python Charmers 1.2k Jan 08, 2023
A clock widget for linux ez to use no need for cmd line ;)

A clock widget in LINUX A clock widget for linux ez to use no need for cmd line ;) How to install? oh its ez just go to realese! what are the paltform

1 Feb 15, 2022
Fuzz introspector for python

Fuzz introspector High-level goals: Show fuzzing-relevant data about each function in a given project Show reachability of fuzzer(s) Integrate seamles

14 Mar 25, 2022
A simple wrapper for joy library

Joy CodeGround A simple wrapper for joy library to render joy sketches in browser using vs code, (or in other words, for those who are allergic to Jup

rijfas 9 Sep 08, 2022
Python solution of advent-of-code 2021

Advent of code 2021 Python solutions of Advent of Code 2021 written by Eric Bouteillon Requirements The solutions were developed and tested using Pyth

Eric Bouteillon 3 Oct 25, 2022
DOP-Tuning(Domain-Oriented Prefix-tuning model)

DOP-Tuning DOP-Tuning(Domain-Oriented Prefix-tuning model)代码基于Prefix-Tuning改进. Files ├── seq2seq # Code for encoder-decoder arch

Andrew Zeng 5 Nov 02, 2022
Blender 3.1 Alpha (and later) PLY importer that correctly loads point clouds (and all PLY models as point clouds)

import-ply-as-verts Blender 3.1 Alpha (and later) PLY importer that correctly loads point clouds (and all PLY models as point clouds) Latest News Mand

Michael Prostka 82 Dec 20, 2022
Blender-miHoYo-Shaders - Shaders for Blender attempting to replicate the shading of games developed by miHoYo

Blender-miHoYo-Shaders - Shaders for Blender attempting to replicate the shading of games developed by miHoYo

Matsuri 449 Jan 07, 2023
This is a pretty basic but relatively nice looking Python Pomodoro Timer.

Python Pomodoro-Timer This is a pretty basic but relatively nice looking Pomodoro Timer. Currently its set to a very basic mode, but the funcationalit

EmmHarris 2 Oct 18, 2021
Developer guide for Hivecoin project

Hivecoin-developer Developer guide for Hivecoin project. Install Content are writen in reStructuredText (RST) and rendered with Sphinx. Much of the co

tweetyf 1 Nov 22, 2021
A feed generator. Currently supports generating RSS feeds from Google, Bing, and Yahoo news.

A feed generator. Currently supports generating RSS feeds from Google, Bing, and Yahoo news.

Josh Cardenzana 0 Dec 13, 2021
GUI for the Gammu library.

Wammu GUI for the Gammu library. Homepage https://wammu.eu/ License GNU GPL version 3 or later. First start On first start you will be asked for set

Gammu 60 Dec 14, 2022
IPO Checker for NEPSE

IPO Checker Checks more than one account for an IPO. Usage: ipo_checker.py [-h] --file FILE IPO Checker for a list. optional arguments: -h, --help

Sagar Tamang 4 Sep 20, 2022
This is an online course where you can learn and master the skill of low-level performance analysis and tuning.

Performance Ninja Class This is an online course where you can learn to find and fix low-level performance issues, for example CPU cache misses and br

Denis Bakhvalov 1.2k Dec 30, 2022
Mail Me My Social Media stats (SoMeMailMe)

Mail Me My Social Media follower count (SoMeMailMe) TikTok only show data 60 days back in time. With this repo you can easily scrape your follower cou

Daniel Wigh 1 Jan 07, 2022