A reproduction repo for a scheduling bug in Airflow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

There are 3 DAGs:

  1. ~240 tasks, executes every hour, runs for about 45-50 minutes in total
  2. ~600 tasks, executes every 5 days, runs for days
  3. ~10 tasks, executes on trigger, runs for 10-50 minutes

All DAGs run in the default_pool, each with max_active_tasks set to 2.

My Airflow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was left at its initial value. default_pool_task_slot_count was set to 6 because it seemed reasonable: if all 3 of my DAGs are executing at the same time, at most 3*2=6 tasks can run at once.

The problem:

Almost every time any one of the DAGs is executing, no tasks from the other DAGs will start until all the tasks from the first one finish. That is, if the slow DAG_2 with its 600 tasks starts, DAG_1 has to wait for days to start even a single task.

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  | 	<...>
scheduler_1  | 	<...>
scheduler_1  | 	<...>
scheduler_1  | 	<...>
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
scheduler_1  | 

I fixed this by setting:

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis (= parallelism from the config minus the number of actively running tasks) as an argument. In my case that would be 30.
  2. Calculate how many free slots we have across all pools. In my case that would be 4.
  3. Take the minimum and update the max_tis variable: max_tis = min(4, 30) = 4.
  4. Query the DB for scheduled tasks in unpaused DAGs that are running normally, and order them by DAG execution date.
  5. Limit the query to max_tis rows.
  6. Loop over each returned task and check whether we can run it. Run it if possible.

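Now, since the tasks are ordered by execution date, the query returns tasks for the DAG that started first, which let's say was DAG_2. It has over 600 tasks, and the LIMIT returns only 4 of them. None of these tasks can be run, since 2 tasks are already running and the max_active_tasks of the DAG is 2. Thus, we just wait until one of the running tasks finishes and then start another task from the same DAG. Tasks from the other DAGs never make it into the query result, so they are never even considered.

With parallelism = 1000 and default_pool_task_slot_count = 999, max_tis becomes larger than the number of scheduled tasks in DAG_2, so the query result also contains tasks from the other DAGs, and those can be queued.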
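The selection logic described in the steps above can be sketched as a small standalone simulation. This is a simplified model, not Airflow's actual code: ScheduledTask, pick_tasks_to_queue, the DAG name Hourly_DAG and the task ids are hypothetical names made up for illustration, and details such as priority weights and per-pool accounting are left out on purpose.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScheduledTask:
    """Hypothetical stand-in for a task instance in the scheduled state."""
    dag_id: str
    task_id: str
    execution_order: int  # lower value = DAG run with the earlier execution date


def pick_tasks_to_queue(scheduled, running_per_dag, parallelism, pool_slots, max_active_tasks):
    """Simplified model of the six steps; an assumption, not the real scheduler."""
    running_total = sum(running_per_dag.values())
    # Steps 1-3: the query limit is the smaller of free parallelism and free pool slots.
    max_tis = min(parallelism - running_total, pool_slots - running_total)
    if max_tis <= 0:
        return []
    # Steps 4-5: order by execution date, then apply the limit BEFORE any per-DAG check.
    candidates = sorted(scheduled, key=lambda t: t.execution_order)[:max_tis]
    # Step 6: enforce max_active_tasks per DAG on the already limited list.
    active = dict(running_per_dag)
    queued = []
    for task in candidates:
        if active.get(task.dag_id, 0) >= max_active_tasks:
            continue  # corresponds to the "Not executing ..." log lines
        active[task.dag_id] = active.get(task.dag_id, 0) + 1
        queued.append(task)
    return queued


# Slow_DAG started first and already has 2 running tasks; Hourly_DAG started later.
scheduled = [ScheduledTask("Slow_DAG", f"slow_{i}", 0) for i in range(598)]
scheduled += [ScheduledTask("Hourly_DAG", f"hourly_{i}", 1) for i in range(240)]
running = {"Slow_DAG": 2}

# Original config: the limit is min(30, 4) = 4, so only Slow_DAG tasks are examined.
starved = pick_tasks_to_queue(scheduled, running, parallelism=32, pool_slots=6, max_active_tasks=2)
# Workaround config: the limit exceeds the number of scheduled tasks.
fixed = pick_tasks_to_queue(scheduled, running, parallelism=1000, pool_slots=999, max_active_tasks=2)

print(len(starved))
print([t.task_id for t in fixed])
```

In this model the original settings queue nothing, while the larger limits let two Hourly_DAG tasks through. The point it illustrates is that the LIMIT is applied before the max_active_tasks check, so a small limit can be filled entirely by tasks that are then rejected.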

Owner
Ilya Strelnikov