A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Site de gestion de cave à vin utilisant une BDD manipulée avec SQLite3 via Python

cave-vin Site de gestion de cave à vin utilisant une bdd manipulée avec MySQL ACCEDER AU SITE : Pour accéder à votre cave vous aurez besoin de lancer

Elouann Lucas 0 Jul 05, 2022
Insights in greek football league 2020-2021 and bookmaker's accuracy

Greek_Football_League_Analysis_2020_2021 Aim of Project: This project aims in deriving useful insights from greek football league 2020-2021 by mean st

2 Jan 16, 2022
JPMC Virtual Experience

This repository contains the submitted patch files along with raw files of the various tasks assigned by JPMorgan Chase & Co. through its Software Engineering Virtual Experience Program on Forage (fo

Vardhini K 1 Dec 05, 2021
Performance data for WASM SIMD instructions.

WASM SIMD Data This repository contains code and data which can be used to generate a JSON file containing information about the WASM SIMD proposal. F

Evan Nemerson 5 Jul 24, 2022
Load dependent libraries dynamically.

dypend dypend Load dependent libraries dynamically. A few days ago, I encountered many users feedback in an open source project. The Problem is they c

Louis 5 Mar 02, 2022
Sigma coding youtube - This is a collection of all the code that can be found on my YouTube channel Sigma Coding.

Sigma Coding Tutorials & Resources YouTube • Facebook Support Sigma Coding Patreon • GitHub Sponsor • Shop Amazon Table of Contents Overview Topics Re

Alex Reed 927 Jan 08, 2023
A community based economy bot with python works only with python 3.7.8 as web3 requires cytoolz

A community based economy bot with python works only with python 3.7.8 as web3 requires cytoolz has some issues building with python 3.10

4 Jan 01, 2022
Aevsploit İçin Destekde Bulun Papara: 1427113016

Aevsploit İçin Destekde Bulun Papara: 1427113016 Toolu Geliştirmek İçin Fikirlerinizi Bekliyorum Telegram

9 Jun 07, 2022
The Official Jaseci Code Repository

Jaseci Release Notes Version 1.2.2 Updates Added new built-ins for nodes and edges (context, info, and details) Fixed dot output Added reset command t

136 Dec 20, 2022
PyPIContents is an application that generates a Module Index from the Python Package Index (PyPI) and also from various versions of the Python Standard Library.

PyPIContents is an application that generates a Module Index from the Python Package Index (PyPI) and also from various versions of the Python Standar

Collage Labs 10 Nov 19, 2022
SymbLang are my programming language! Insired by the brainf**k.

SymbLang . - output as Unicode. , - input. ; - clear data. & - character that the main line start with. @value: 0 - 9 - character that the function

1 Apr 04, 2022
CircuitPython Driver for Adafruit 24LC32 I2C EEPROM Breakout 32Kbit / 4 KB

Introduction CircuitPython driver for Adafruit 24LC32 I2C EEPROM Breakout Dependencies This driver depends on: Adafruit CircuitPython Bus Device Regis

Adafruit Industries 4 Oct 03, 2022
An OpenSource crowd-sourced cooking recipes website

An OpenSource crowd-sourced cooking recipes website

21 Jul 31, 2022
Demo of patching a python context manager

patch-demo-20211203 demo of patching a python context manager poetry install poetry run python -m my_great_app to run the code poetry run pytest to te

Brad Smith 1 Feb 09, 2022
Convert Beat Saber maps to Tesla light shows!

Tesla x Beat Saber - Light Show Converter Convert Beat Saber maps to Tesla light shows! This project requires FFMPEG and all packages from requirement

HLVM 20 Dec 21, 2022
A collection of repositories used to realise various end-to-end high-level synthesis (HLS) flows centering around the CIRCT project.

circt-hls What is this?: A collection of repositories used to realise various end-to-end high-level synthesis (HLS) flows centering around the CIRCT p

29 Dec 14, 2022
Controller state monitor plugin for EVA ICS

eva-plugin-cmon Controller status monitor plugin for EVA ICS Monitors connected controllers status in SFA and pushes measurements into an external Inf

Altertech 1 Nov 06, 2021
Python Project For Beginner

Basic-Vitrual-AI-Assistant Python Project For Beginner Hey There, I had manipulated Selenium WebDriver to make this assistant. I hope, It will be help

Maruf Billah 13 Dec 12, 2022
Writeup and scripts for the 2021 malwarebytes crackme

Malwarebytes Crackme 2021 Tools and environment setup We will be doing this analysis in a Windows 10 VM with the flare-vm tools installed. Most of the

Jerome Leow 9 Dec 02, 2022
A python library what works with numbers.

pynum A python library what works with numbers. Prime Prime class have everithing you want about prime numbers. check_prime The check_prime method is

Mohammad Mahdi Paydar Puya 1 Jan 07, 2022