A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
It is Keqin Wang first project in CMU, trying to use DRL(PPO) to control a 5-dof manipulator to draw line in space.

5dof-robot-writing this project aim to use PPO control a 5 dof manipulator to draw lines in 3d space. Introduction to the files the pybullet environme

Keqin Wang 4 Aug 22, 2022
Visualization of COVID-19 Omicron wave data in Seoul, Osaka, Tokyo, Hong Kong and Shanghai. 首尔、大阪、东京、香港、上海由新冠病毒 Omicron 变异株引起的本轮疫情数据可视化分析。

COVID-19 in East Asian Megacities This repository holds original Python code for processing and visualization COVID-19 data in East Asian megacities a

STONE 10 May 18, 2022
Sodium is a general purpose programming language which is instruction-oriented

Sodium is a general purpose programming language which is instruction-oriented (a new programming concept that we are developing and devising)

Satin Wuker 22 Jan 11, 2022
A web interface for a soft serve Git server.

Soft Serve monitor Soft Sevre is a very nice git server. It offers a really nice TUI to browse the repositories on the server. Unfortunately, it does

Maxime Bouillot 5 Apr 26, 2022
Mengzhan (John) code for Closed Loop Control system of Sharp Wave Ripples in Hippocampus CA3 region

ClosedLoopControl_Yu Mengzhan (John) code for Closed Loop Control system of Sharp Wave Ripples in Hippocampus CA3 region Creating Python Virtual Envir

Mengzhan (John) Liufu 1 Jan 22, 2022
[CVPR 2020] Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective

Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective [Arxiv] This is PyTorch implementation of th

Abdullah Jamal 22 Nov 19, 2022
Rofi script to minimize / unminimize multiple windows in qtile

Qminimize Rofi script to minimize / unminimize multiple windows in qtile Additional requirements : EWMH module fuzzywuzzy module How to use it : - Clo

9 Sep 18, 2022
Get you an ultimate lexer generator using Fable; port OCaml sedlex to FSharp, Python and more!

NOTE: currently we support interpreted mode and Python source code generation. It's EASY to compile compiled_unit into source code for C#, F# and othe

Taine Zhao 15 Aug 06, 2022
Simple package to make requests throughout Tor with circuit renewal.

AutoTor Table of Contents About the Project Contents Dependencies Getting Started Installation Coding Contributing About the Project Simple package to

Salvador Belenguer 6 Jan 01, 2023
Iss-tracker - ISS tracking script in python using NASA's API

ISS Tracker Tracking International Space Station using NASA's API and plotting i

Partho 9 Nov 29, 2022
Banking management project using Tkinter GUI in python.

Bank-Management Banking management project using Tkinter GUI in python. Packages required Tkinter - Tkinter is the standard GUI library for Python. sq

Anjali Kumawat 7 Jul 03, 2022
Mail Me My Social Media stats (SoMeMailMe)

Mail Me My Social Media follower count (SoMeMailMe) TikTok only show data 60 days back in time. With this repo you can easily scrape your follower cou

Daniel Wigh 1 Jan 07, 2022
Stack BOF Protection Bypass Techniques

Stack Buffer Overflow - Protection Bypass Techniques

ommadawn46 18 Dec 28, 2022
Minimalist BERT implementation assignment for CS11-747

minbert Assignment by Zhengbao Jiang, Shuyan Zhou, and Ritam Dutt This is an exercise in developing a minimalist version of BERT, part of Carnegie Mel

Graham Neubig 51 Jan 03, 2023
Um pequeno painel de consulta grátis.

[PAINEL-DE-CONSULTA 3.8(BETA)] · Confira meu canal do YouTube. Clique aqui! Nota: Próxima Atualização será a última com coisas novas, o resto será par

276 Jan 05, 2023
An easy python calculator for those who want's to know how if statements, loops, and imports works give it a try!

A usefull calculator for any student or anyone who want's to know how to build a simple 2 mode python based calculator.

Antonio Sánchez 1 Jan 06, 2022
VHDL to Discrete Logic on PCB Flow

PCBFlow Highly experimental set of scripts to transform a digital circuit described in a hardware description language (VHDL or Verilog) into a discre

Tim 77 Nov 04, 2022
Up to date simple useragent faker with real world database

fake-useragent info: Up to date simple useragent faker with real world database Features grabs up to date useragent from useragentstring.com randomize

Victor K. 2.9k Jan 04, 2023
Exam assignment for Laboratory of Bioinformatics 2

Exam assignment for Laboratory of Bioinformatics 2 (Alma Mater University of Bologna, Master in Bioinformatics)

2 Oct 22, 2022
Python Interactive Graphical System made during Computer Graphics classes (INE5420-2021.1)

PY-IGS - The PYthon Interactive Graphical System The PY-IGS Installation To install this software you will need these dependencies (with their thevelo

Enzo Coelho Albornoz 4 Dec 03, 2021