A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Pengenalan para anggota KOMPETEGRAM

Pengenalan Anggota KOMPETEGRAM Apa isi repositori ini ? 💬 Repositori ini berisi pengenalan nama anggota KOMPETEGRAM dari seluruh angkatan atau Batch.

Repositori KOMPETEGRAM 7 Sep 17, 2022
python scripts and other files to generate induction encoder PCBs in Kicad

induction_encoder python scripts and other files to generate induction encoder PCBs in Kicad Targeting the Renesas IPS2200 encoder chips.

Taylor Alexander 8 Feb 16, 2022
Driving lessons made simpler. Custom scheduling API built with Python.

NOTE This is a mirror of a GitLab repository. Dryvo Dryvo is a unique solution for the driving lessons industry. Our aim is to save the teacher’s time

Adam Goldschmidt 595 Dec 05, 2022
Repositorio com arquivos processados da CPI da COVID para facilitar analise

cpi4all Repositorio com arquivos processados da CPI da COVID para facilitar analise Organização No site do senado é possivel encontrar a lista de todo

Breno Rodrigues Guimarães 12 Aug 16, 2021
Python library for generating CycloneDX SBOMs

Python Library for generating CycloneDX This CycloneDX module for Python can generate valid CycloneDX bill-of-material document containing an aggregat

CycloneDX SBOM Standard 31 Dec 16, 2022
Python project that aims to discover CDP neighbors and map their Layer-2 topology within a shareable medium like Visio or Draw.io.

Python project that aims to discover CDP neighbors and map their Layer-2 topology within a shareable medium like Visio or Draw.io.

3 Feb 11, 2022
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows

Apache Airflow Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are define

The Apache Software Foundation 28.6k Dec 28, 2022
The ROS package for Airbotics.

airbotics The ROS package for Airbotics: Developed for ROS 1 and Python 3.8. The package has not been officially released on ROS yet so manual install

Airbotics 19 Dec 25, 2022
0CD - BinaryNinja plugin to introduce some quality of life utilities for obsessive compulsive CTF enthusiasts

0CD Author: b0bb Quality of life utilities for obsessive compulsive CTF enthusia

12 Sep 14, 2022
A minimal configuration for a dockerized kafka project.

Docker Kafka Quickstart A minimal configuration for a dockerized kafka project. Usage: Run this command to build kafka and zookeeper containers, and c

Nouamane Tazi 5 Jan 12, 2022
System Information Utility With Python

System-Information-Utility This is a simple utility, for the terminal, which allows you to find out information about your PC. It's very easy to run t

2 Apr 15, 2022
Little tool in python to watch anime from the terminal (the better way to watch anime)

anipy-cli Little tool in python to watch anime from the terminal (the better way to watch anime) Has a resume playback function when picking from Hist

sdao 97 Dec 29, 2022
India's own RPA Platform Python Powered

Welcome to My-AutoPylot , Made in India with ❤️ What is My-AutoPylot? PyBots is an Indian firm based in Vadodara, Gujarat. My-AutoPylot is a product d

PyBots Pvt Ltd 28 Sep 12, 2022
Python requirements.txt Guesser

Python-Requirements-Guesser ⚠️ This is alpha quality software. Work in progress Attempt to guess requirements.txt modules versions based on Git histor

Jerome 9 May 24, 2022
:fishing_pole_and_fish: List of `pre-commit` hooks to ensure the quality of your `dbt` projects.

pre-commit-dbt List of pre-commit hooks to ensure the quality of your dbt projects. BETA NOTICE: This tool is still BETA and may have some bugs, so pl

Offbi 262 Nov 25, 2022
Voldemort's Python import helper

importmagician Voldemort's Python import helper pip install importmagician Import from uninstalled Python directories Say you have a directory (relat

Zhengyang Feng 4 Mar 09, 2022
My collection of mini-projects in various languages

Mini-Projects My collection of mini-projects in various languages About: This repository consists of a number of small projects. Most of these "mini-p

Siddhant Attavar 1 Jul 11, 2022
Wrapper for the undocumented CodinGame API. Can be used both synchronously and asynchronlously.

codingame API wrapper Pythonic wrapper for the undocumented CodinGame API. Installation Python 3.6 or higher is required. Install codingame with pip:

Takos 19 Jun 20, 2022
Aerial Ace is a helper bot for poketwo which provide various functionalities on top of being a pokedex.

Aerial Ace is a helper bot for poketwo which provide various functionalities on top of being a pokedex.

Devanshu Mishra 1 Dec 01, 2021
Scraping comments from the political section of popular Nigerian blog (Nairaland), and saving in a CSV file.

Scraping_Nairaland This project scraped comments from the political section of popular Nigerian blog www.nairaland.com using the Python BeautifulSoup

Ansel Orhero 1 Nov 14, 2021