A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Pampy: The Pattern Matching for Python you always dreamed of.

Pampy: Pattern Matching for Python Pampy is pretty small (150 lines), reasonably fast, and often makes your code more readable and hence easier to rea

Claudio Santini 3.5k Dec 30, 2022
Checkers Project Built Using Python

Checkers Project Built Using Python

Meekness Anyaeche 1 Nov 08, 2021
Fetch data from an excel file and create HTML file

excel-to-html Problem Statement! - Fetch data from excel file and create html file Excel.xlsx file contain the information.in multiple rows that is ne

Vivek Kashyap 1 Oct 25, 2021
Flames Calculater App used to calculate flames status between two names created using python's Flask web framework.

Flames Finder Web App Flames Calculater App used to calculate flames status between two names created using python's Flask web framework. First, App g

Siva Prakash 4 Jan 02, 2022
The docker-based Open edX distribution designed for peace of mind

Tutor: the docker-based Open edX distribution designed for peace of mind Tutor is a docker-based Open edX distribution, both for production and local

Overhang.IO 696 Dec 31, 2022
Python Programmma DarkMap.py

DarkMap Python Programmma DarkMap.py O'rganish va rasmlarni ko'riosh https://drive.google.com/drive/folders/1l1zybs_0Zy9z_trZYz5R72WrwsE6mFOh?usp=shar

Og'abek 0 May 06, 2022
Telegram bot to search quotes from brainyquote.com

Brainy Quote Bot @BrainQuoteBot A star ⭐ from you means a lot to us! Telegram bot to search quotes from brainyquote.com Usage Deploy to Heroku Tap on

21 Nov 24, 2022
Used the pyautogui library to automate some processes on the computer

Pyautogui Utilizei a biblioteca pyautogui para automatizar alguns processos no c

Dheovani Xavier 1 Dec 30, 2021
πŸ“¦ A Human's Ultimate Guide to setup.py.

πŸ“¦ setup.py (for humans) This repo exists to provide an example setup.py file, that can be used to bootstrap your next Python project. It includes som

Navdeep Gill 5k Jan 04, 2023
Chemical equation balancer

Chemical equation balancer Balance your chemical equations with ease! Installation $ git clone

Marijan Smetko 4 Nov 26, 2022
Launcher program to select which version of the Q-Sys software to launch.

QSC-QSYS Launcher Launcher program to select which version of the Q-Sys software to launch. Instructions To use the application simply save the "Q-Sys

Zach Lisko 2 Sep 28, 2022
This is a program for Carbon Emission calculator.

Summary This is a program for Carbon Emission calculator. Usage This will calculate the carbon emission by each person on various factors. Contributor

Ankit Rane 2 Feb 18, 2022
Free Data Engineering course!

Data Engineering Zoomcamp Register in DataTalks.Club's Slack Join the #course-data-engineering channel The videos are published to DataTalks.Club's Yo

DataTalksClub 7.3k Dec 30, 2022
Demo scripts for the Kubernetes Security Webinar

Kubernetes Security Webinar [in Russian] YouTube video (October 13, 2021) Authors: Artem Yushkovsky (LinkedIn, GitHub) Maxim Mosharov @ Whitespots.io

Slurm 34 Dec 06, 2022
Reference python implementation of Chia pool operations for pool operators

This repository provides a sample server written in python, which is meant to server as a basis for a Chia Pool. While this is a fully functional implementation, it requires some work in scalability

Chia Network 451 Dec 13, 2022
a simple proof system I made to learn math without any mistakes

math_up a simple proof system I made to learn math without any mistakes 0. Short Introduction test yourself, enjoy your math! math_up is an NBG-based,

μ–‘ν˜„μš° 5 Jun 04, 2021
Hacktoberfest2021 πŸ₯³- Contribute Any Pattern In Any Language😎 Every PR will be accepted Pls contribute

✨ Hacktober Fest 2021 ✨ πŸ™‚ All Contributors are requested to star this repo and follow me for a successful merge of pull request. πŸ™‚ πŸ‘‰ Add any patter

Md. Almas Ali 103 Jan 07, 2023
Import Apex legends mprt files exported from Legion

Apex-mprt-importer-for-Blender Import Apex legends mprt files exported from Legion. REQUIRES CAST IMPORTER Usage: Use a VPK extracter to extract the m

15 Dec 18, 2022
little proyect to organize myself, but maybe can help someone else

TaskXT 0.1 Little proyect to organize myself, but maybe can help someone else Idea The main idea is to ogranize you work and stuff to do, but with onl

Gabriel Carmona 4 Oct 03, 2021
Wrappers around the most common maya.cmds and maya.api use cases

Maya FunctionSet (maya_fn) A package that decompose core maya.cmds and maya.api features to a set of simple functions. Tests The recommended approach

Ryan Porter 9 Mar 12, 2022