A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
An attempt at furthering Factorio Calculator to work in more general contexts.

factorio-optimizer Lets do Factorio Calculator but make it optimize. Why not use Factorio Calculator? Becuase factorio calculator is not general. The

Jonathan Woollett-Light 1 Jun 03, 2022
Direct Multi-view Multi-person 3D Human Pose Estimation

Implementation of NeurIPS-2021 paper: Direct Multi-view Multi-person 3D Human Pose Estimation [paper] [video-YouTube, video-Bilibili] [slides] This is

Sea AI Lab 253 Jan 05, 2023
For Tok-k passages that have passed through the Bi-Encoder Retrival, ReRank is performed using CrossEncoder.

Cross-Encoder-with-Bi-Encoder For Tok-k passages that have passed through the Bi-Encoder Retrival, ReRank is performed using CrossEncoder. Data Data u

7 Feb 09, 2022
We'll be using HTML, CSS and JavaScript for the frontend

We'll be using HTML, CSS and JavaScript for the frontend. Nothing to install in specific. Open your text-editor and start coding a beautiful front-end.

Mugada sai tilak 1 Dec 15, 2021
A similarity measurer on two programming assignments on Online Judge.

A similarity measurer on two programming assignments on Online Judge. Algorithm implementation details are at here. Install Recommend OS: Ubuntu 20.04

StardustDL 6 May 21, 2022
Doom o’clock is a website/project that features a countdown of “when will the earth end” and a greenhouse gas effect emission prediction that’s predicted

Doom o’clock is a website/project that features a countdown of “when will the earth end” and a greenhouse gas effect emission prediction that’s predicted

shironeko(Hazel) 4 Jan 01, 2022
Curses frontend for Canto daemon

Canto Curses The curses (text) client for canto-daemon. Canto-daemon is required to work and is found at: http://github.com/themoken/canto-next Requir

Jack Miller 86 Dec 28, 2022
0CD - BinaryNinja plugin to introduce some quality of life utilities for obsessive compulsive CTF enthusiasts

0CD Author: b0bb Quality of life utilities for obsessive compulsive CTF enthusia

12 Sep 14, 2022
A way to write regex with objects instead of strings.

Py Idiomatic Regex (AKA iregex) Documentation Available Here An easier way to write regex in Python using OOP instead of strings. Makes the code much

Ryan Peach 18 Nov 15, 2021
UF3: a python library for generating ultra-fast interatomic potentials

Ultra-Fast Force Fields (UF3) S. R. Xie, M. Rupp, and R. G. Hennig, "Ultra-fast interpretable machine-learning potentials", preprint arXiv:2110.00624

Ultra-Fast Force Fields 24 Nov 13, 2022
script to analyze EQ decay using python

pyq_decay script to analyze EQ decay using python PyQ Decay ver 1.0 A pythonic script to analyze EQ aftershock decay using method of Omori (1894), Mog

1 Nov 04, 2021
Hotpile: High Order Turing Machine Language Compiler

Hotpile: High Order Turing Machine Language Compiler Build and Run Requirements: Python 3.6+, bison, flex, and GCC installed. Needs to be run under UN

Jiang Weihao 4 Dec 29, 2021
Estimating the potential photovoltaic production of buildings (in Berlin)

The following people contributed equally to this repository (in alphabetical order): Daniel Bumke JJX Corstiaen Versteegh This repository is forked on

Daniel Bumke 6 Feb 18, 2022
An ongoing curated list of frameworks, libraries, learning tutorials, software and resources in Python Language.

Python Development Welcome to the world of Python. An ongoing curated list of frameworks, libraries, learning tutorials, software and resources in Pyt

Paul Veillard 2 Dec 24, 2021
To effectively detect the faulty wafers

wafer_fault_detection Aim of the project: In electronics, a wafer (also called a slice or substrate) is a thin slice of semiconductor, such as crystal

Arun Singh Babal 1 Nov 06, 2021
Quick script for automatically extracting syscall numbers for an OS

Syscalls-Extractor Quick script for automatically extracting syscall numbers for an OS $ python3 .\syscalls-extractor.py --help usage: syscalls-extrac

m0rv4i 54 Feb 10, 2022
This is a python package to get wards, districts,cities and provinces in Zimbabwe

Zim-Places Features This is a python package that allows you to search for cities, provinces, and districts in Zimbabwe.Zimbabwe is split into eight p

RONALD KANYEPI 2 Mar 01, 2022
An-7 tool for python

***An-7 tool - Anonime-X Team*** An-x Menu : SPAM Android web malware interpreter Spam Tools : scampages letters mailers smtpcrack wpbrute shell Andro

Hamza Anonime 8 Nov 18, 2021
Demo scripts for the Kubernetes Security Webinar

Kubernetes Security Webinar [in Russian] YouTube video (October 13, 2021) Authors: Artem Yushkovsky (LinkedIn, GitHub) Maxim Mosharov @ Whitespots.io

Slurm 34 Dec 06, 2022
Simple Python script I use to manage and build my Reflux themes.

Simple Python script I use to manage and build my Reflux themes. Built for personal use, but anyone can easily fork and tweak to suit thier needs.

Ire 3 Jan 25, 2022