A reproduction repo for a scheduling bug in Airflow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

There are 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

Each DAG runs in the default_pool with max_active_tasks set to 2.

My Airflow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was left at its initial value. default_pool_task_slot_count was set to 6 because it seemed reasonable: if all 3 of my DAGs are executing at the same time, at most 3 * 2 = 6 tasks can run concurrently.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  |
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
scheduler_1  |

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis (parallelism from the config minus the number of currently running tasks) as an argument. In my case that would be 30.
  2. Calculate how many free slots there are across all Pools. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4.
  4. Query the DB for tasks in the Scheduled state that belong to unpaused DAGs whose runs are running normally, ordered by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check whether it can run. Run it if possible.
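The six steps can be sketched in plain Python. This is a simplified model for illustration, not the actual Airflow code: the function name select_tasks_to_queue, the ScheduledTask class, the DAG name Hourly_DAG, and the task counts and dates are all assumptions made up for this example (only Slow_DAG appears in the logs above).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScheduledTask:
    dag_id: str
    task_id: str
    execution_date: str  # ISO timestamp of the DAG run, sorts chronologically


def select_tasks_to_queue(scheduled, running_per_dag, parallelism,
                          pool_slots, max_active_tasks):
    """Hypothetical, simplified model of one scheduler pass."""
    running_total = sum(running_per_dag.values())
    # Steps 1-3: the budget is the smaller of the two remaining capacities.
    max_tis = min(parallelism - running_total, pool_slots - running_total)
    # Steps 4-5: order by execution date, then apply the LIMIT.
    candidates = sorted(scheduled, key=lambda t: t.execution_date)[:max_tis]
    # Step 6: queue each candidate unless its DAG is at max_active_tasks.
    active = dict(running_per_dag)
    queued = []
    for task in candidates:
        if active.get(task.dag_id, 0) >= max_active_tasks:
            continue  # corresponds to the "Not executing ..." log lines
        active[task.dag_id] = active.get(task.dag_id, 0) + 1
        queued.append(task)
    return max_tis, queued


# Slow_DAG started days ago and already has 2 tasks running.
scheduled = (
    [ScheduledTask("Slow_DAG", f"slow_{i}", "2022-02-05T00:00") for i in range(598)]
    + [ScheduledTask("Hourly_DAG", f"hourly_{i}", "2022-02-09T10:00") for i in range(240)]
)
max_tis, queued = select_tasks_to_queue(
    scheduled,
    running_per_dag={"Slow_DAG": 2},
    parallelism=32,
    pool_slots=6,
    max_active_tasks=2,
)
print(max_tis)  # 4
print(queued)   # []
```

In this model all 4 candidates come from Slow_DAG, every one of them is rejected by the per-DAG limit, and the tasks of Hourly_DAG are never even looked at.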

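Now, since the tasks are ordered by execution date, the query returns tasks for the DAG that started first, which let's say was DAG_2. It has over 600 tasks, and the LIMIT operation returns 4 of them. None of these tasks can be run, since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait until one of the running tasks finishes and then start another task from the same DAG. Tasks from the other DAGs never make it into the query result, so they are never considered. With parallelism = 1000 and default_pool_task_slot_count = 999, max_tis becomes large enough that the query can also return scheduled tasks from the other DAGs, so they get a chance to run.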
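To compare the original limits with the raised ones, the selection logic can be run once for each configuration. This is a rough sketch under assumptions: queued_per_dag is a hypothetical helper, Hourly_DAG is a made-up DAG name, and the task counts and dates are illustrative rather than taken from the real DAGs.

```python
from collections import Counter

MAX_ACTIVE_TASKS = 2  # per-DAG limit from the setup above

# (execution_date, dag_id) pairs for tasks in the scheduled state.
# Counts and dates are illustrative assumptions.
SCHEDULED = (
    [("2022-02-05T00:00", "Slow_DAG")] * 598
    + [("2022-02-09T10:00", "Hourly_DAG")] * 238
)
RUNNING = {"Slow_DAG": 2}  # Slow_DAG already fills its per-DAG limit


def queued_per_dag(parallelism, pool_slots):
    """Return how many tasks per DAG one simplified scheduler pass queues."""
    running_total = sum(RUNNING.values())
    max_tis = min(parallelism - running_total, pool_slots - running_total)
    # Oldest execution date first, then cut off at max_tis (the LIMIT).
    candidates = sorted(SCHEDULED)[:max_tis]
    active = Counter(RUNNING)
    queued = Counter()
    for _, dag_id in candidates:
        if active[dag_id] < MAX_ACTIVE_TASKS:
            active[dag_id] += 1
            queued[dag_id] += 1
    return dict(queued)


original = queued_per_dag(parallelism=32, pool_slots=6)
raised = queued_per_dag(parallelism=1000, pool_slots=999)
print(original)  # {}
print(raised)    # {'Hourly_DAG': 2}
```

With the original limits the 4-row window is filled entirely by blocked Slow_DAG tasks. With the raised limits the window covers every scheduled task, so the per-DAG check finally reaches Hourly_DAG and lets 2 of its tasks through.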

Owner
Ilya Strelnikov