A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
A tool for generating skill map/tree like diagram

skillmap A tool for generating skill map/tree like diagram. What is a skill map/tree? Skill tree is a term used in video games, and it can be used for

Yue 98 Jan 07, 2023
How to build an Fahrenheit to Celsius Converter in Python

Generally to measure the temperature we make use of one of these two popular units i.e. Fahrenheit & Celsius.

PyLaboratory 0 Feb 07, 2022
Hera is a Python framework for constructing and submitting Argo Workflows.

Hera is an Argo Workflows Python SDK. Hera aims to make workflow construction and submission easy and accessible to everyone! Hera abstracts away workflow setup details while still maintaining a cons

argoproj-labs 241 Jan 02, 2023
PyScaffold is a project generator for bootstrapping high quality Python packages

PyScaffold is a project generator for bootstrapping high quality Python packages, ready to be shared on PyPI and installable via pip. It is easy to use and encourages the adoption of the best tools a

PyScaffold 1.7k Jan 03, 2023
Bots in moderation and a game (for now)

Tutorial: come far funzionare il bot e durarlo per 24/7 (o quasi...) Ci sono 17 passi per seguire: Andare sul sito Replit https://replit.com/ Vedrete

ZacyKing 1 Dec 27, 2021
A frontend to ease the use of pulseaudio's routing capabilities, mimicking voicemeeter's workflow

Pulsemeeter A frontend to ease the use of pulseaudio's routing capabilities, mimicking voicemeeter's workflow Features Create virtual inputs and outpu

Gabriel Carneiro 164 Jan 04, 2023
Deis v1, the CoreOS and Docker PaaS: Your PaaS. Your Rules.

This repository (deis/deis) is no longer developed or maintained. The Deis v1 PaaS based on CoreOS Container Linux and Fleet has been replaced by Deis

Deis 6.1k Jan 04, 2023
Final Fantasy XIV Auto House Clicker

Final Fantasy XIV Auto House Clicker

KanameS 0 Mar 31, 2022
TrainingBike - Code, models and schematics I've used to interface my stationary training bike with PC.

TrainingBike Code, models and schematics I've used to interface my stationary training bike with PC. You can find more information about the project i

1 Jan 01, 2022
Laurence Billingham 1 Feb 16, 2022
A Unified Framework for Hydrology

Unified Framework for Hydrology The Python package unifhy (Unified Framework for Hydrology) is a hydrological modelling framework which combines inter

Unified Framefork for Hydrology - Community Organisation 6 Jan 01, 2023
Now you'll never be late for your Webinars or Meetings on the GoToWebinar Platform

GoToWebinar Launcher : Now you'll never be late for your Webinars or Meetings on the GoToWebinar Platform About Are you popular for always being late

Jay Thorat 6 Jun 07, 2022
⏰ Shutdown Timer is an application that you can shutdown, restart, logoff, and hibernate your computer with a timer.

Shutdown Timer is a an application that you can shutdown, restart, logoff, and hibernate your computer with a timer. After choosing an action from the

Mehmet Güdük 5 Jun 27, 2022
Scraping comments from the political section of popular Nigerian blog (Nairaland), and saving in a CSV file.

Scraping_Nairaland This project scraped comments from the political section of popular Nigerian blog www.nairaland.com using the Python BeautifulSoup

Ansel Orhero 1 Nov 14, 2021
The official FOSSCOMM 2021 CTF by [email protected]

FOSSCOMM 2021 CTF Table of Contents General Info FAQ General Info Purpose: This CTF is a collaboration between the FOSSCOMM conference and the Machina 2 Nov 14, 2021

一个Graia-Saya的插件仓库

一个Graia-Saya的插件仓库 这是一个存储基于 Graia-Saya 的插件的仓库 如果您有这类项目

ZAPHAKIEL 111 Oct 24, 2022
A simple code for processing images to local binary pattern.

This figure is gotten from this link https://link.springer.com/chapter/10.1007/978-3-030-01449-0_24 LBP-Local-Binary-Pattern A simple code for process

Happy N. Monday 3 Feb 15, 2022
SciPy library main repository

SciPy SciPy (pronounced "Sigh Pie") is an open-source software for mathematics, science, and engineering. It includes modules for statistics, optimiza

SciPy 10.7k Jan 09, 2023
Notebooks for computing approximations to the prime counting function using Riemann's formula.

Notebooks for computing approximations to the prime counting function using Riemann's formula.

Tom White 2 Aug 02, 2022
Module to align code with thoughts of users and designers. Also magically handles navigation and permissions.

This readme will introduce you to Carteblanche and walk you through an example app, please refer to carteblanche-django-starter for the full example p

Eric Neuman 42 May 28, 2021