A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
J MBF - Assalamualaikum Mamang...

★ VISITOR ★ ★ INFORMATION ★ Script Ini DiBuat Oleh YayanXD Script Ini Akan DiPerjual Belikan Tanggal 9 Januari 2022 Jika Mau Beli Script Silahkan Hub

Risky [ Zero Tow ] 5 Apr 08, 2022
Blender addon to import images as meshes

ImagesAsMesh Blender addon to import images as meshes. Inspired by: ImagesAsPlanes Installation It's like just about every other Blender addon. Downlo

Niccolo Zuppichini 4 Jan 04, 2022
Desenvolvendo as habilidades básicas de programação visando a construção de aplicativos por meio de bibliotecas apropriadas à Ciência de Dados.

Algoritmos e Introdução à Computação Ementa: Conceitos básicos sobre algoritmos e métodos para sua construção. Tipos de dados e variáveis. Estruturas

Dyanna Cruz 1 Jan 06, 2022
Intelligent Systems Project In Python

Intelligent Systems Project In Python

RLLAB 3 May 16, 2022
A general purpose low level programming language written in Python.

A general purpose low level programming language written in Python. Basal is an easy mid level programming language compiling to C. It has an easy syntax, similar to Python, Rust etc.

Snm Logic 6 Mar 30, 2022
Web service which feeds Navitia with real-time disruptions

Chaos Chaos is the web service which can feed Navitia with real-time disruptions. It can work together with Kirin which can feed Navitia with real-tim

KISIO Digital 7 Jan 07, 2022
Architecture example simulator

SCADA architecture Example of a SCADA-like console application, used to serve as a minimal example of a standard architecture of an IIoT system. Insta

1 Nov 06, 2021
Dicionario-git-github - Dictionary created to help train new users of Git and GitHub applications

Dicionário 📕 Dicionário criado com o objetivo de auxiliar no treinamento de nov

Felippe Rafael 1 Feb 07, 2022
Mmr image postbot - Бот для создания изображений с новыми релизами в сообщество ВК MMR Aggregator

Mmr image postbot - Бот для создания изображений с новыми релизами в сообщество ВК MMR Aggregator

Max 3 Jan 07, 2022
Animation retargeting tool for Autodesk Maya. Retargets mocap to a custom rig with a few clicks.

Animation Retargeting Tool for Maya A tool for transferring animation data between rigs or transfer raw mocap from a skeleton to a custom rig. (The sc

Joaen 62 Dec 19, 2022
Lags valorant servers by rapidly picking up and throwing shorties.

Lags valorant servers by rapidly picking up and throwing shorties.

Eric Still 9 Dec 30, 2021
SDX: Software Defined Internet Exchange

Installation steps: Download and import the Internet2-SDX virtual machine (VM) image, below, in VirtualBox and you are all set :) $ wget http://sites.

Software Defined Internet Exchange Point 15 Nov 21, 2021
Some out-of-the-box hooks for pre-commit

pre-commit-hooks Some out-of-the-box hooks for pre-commit. See also: https://github.com/pre-commit/pre-commit Using pre-commit-hooks with pre-commit A

pre-commit 3.6k Dec 29, 2022
Run CodeServer on Google Colab using Inlets in less than 60 secs using your own domain.

Inlets Colab Run CodeServer on Colab using Inlets in less than 60 secs using your own domain. Features Optimized for Inlets/InletsPro Use your own Cus

2 Dec 30, 2021
Python script for changing the SSH banner content with other content

Banner-changer-py Python script for changing the SSH banner content with other content. The Script will take the content of a specified file range and

2 Nov 23, 2021
A (hopefully) considerably copious collection of classical cipher crackers

ClassicalCipherCracker A (hopefully) considerably copious collection of classical cipher crackers Written in Python3 (and run with PyPy) TODOs Write a

Stanley Zhong 2 Feb 22, 2022
Solcast Integration for Home Assistant

Solcast Solar Home Assistant(https://www.home-assistant.io/) Component This custom component integrates the Solcast API into Home Assistant. Modified

Greg 45 Dec 20, 2022
PyMedPhys is an open-source Medical Physics python library

PyMedPhys is an open-source Medical Physics python library built by an open community that values and prioritises code sharing, review, improvement, and learning from each other. I

PyMedPhys 238 Dec 27, 2022
Example python package with pybind11 cpp extension

Developing C++ extension in Python using pybind11 This is a summary of the commands used in the tutorial.

55 Sep 04, 2022
A check numbers python module

Made with Python3 (C) @FayasNoushad Copyright permission under MIT License License - https://github.com/FayasNoushad/Numbers/blob/main/LICENSE Deplo

Fayas Noushad 3 Nov 28, 2021