A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Demo code for "Logs in distributed systems" webinar

Hexlet Logs Demo Пререквизиты docker-compose python3 Учетка в DataDog Базовое понимание, что такое логи (можно почитать гайд

Anton Markelov 1 Dec 01, 2021
Simple Crud Python vs MySQL

Simple Crud Python vs MySQL The idea came when I was studying MySQ... A desire to create a python program that can give access to a "localhost" databa

Lucas 1 Jan 21, 2022
This collection is to provide an easier way to interact with Juniper

Ansible Collection - cremsburg.apstra Overview The goal of this collection is to provide an easier way to interact with Juniper's Apstra solution. Whi

Calvin Remsburg 1 Jan 18, 2022
A simple API to upload notes or files to KBFS

This API can be used to upload either secure notes or files to a secure KeybaseFS folder.

Dakota Brown 1 Oct 08, 2021
redun aims to be a more expressive and efficient workflow framework

redun yet another redundant workflow engine redun aims to be a more expressive and efficient workflow framework, built on top of the popular Python pr

insitro 372 Jan 04, 2023
Project aims to map out common user behavior on the computer

User-Behavior-Mapping-Tool Project aims to map out common user behavior on the computer. Most of the code is based on the research by kacos2000 found

trustedsec 136 Dec 23, 2022
Simple Python Gemini browser with nice formatting

gg I wasn't satisfied with any of the other available Gemini clients, so I wrote my own. Requires Python 3.9 (maybe older, I haven't checked) and opti

Sarah Taube 2 Nov 21, 2021
i3wm helper tool for workspaces on multiple monitors

i3screens A helper tool for managing i3wm workspaces on multiple monitors. Use-case You have a multi-monitor setup and want to have the "same" workspa

Sebastian Neef 1 Dec 05, 2022
📽 Streamlit application powered by a PyScaffold project setup

streamlit-demo Streamlit application powered by a PyScaffold project setup. Work in progress: The idea of this repo is to demonstrate how to package a

PyScaffold 2 Oct 10, 2022
AIST++ API This repo contains starter code for using the AIST++ dataset.

Explainability for Vision Transformers (in PyTorch) This repository implements methods for explainability in Vision Transformers

Google 260 Dec 30, 2022
Subnet calculator script using python

subnetCalculator Subnet calculator script using python3 Interactive Version Define the subnet variable interactively Use: subnetDict = subnetCalculato

1 Feb 15, 2022
The semi-complete teardown of Cosmo's Cosmic Adventure.

The semi-complete teardown of Cosmo's Cosmic Adventure.

Scott Smitelli 10 Dec 02, 2022
Solutions for the Advent of Code 2021 event.

About 📋 This repository holds all of the solution code for the Advent of Code 2021 event. All solutions are done in Python 3.9.9 and done in non-real

robert yin 0 Mar 21, 2022
Insert a Spotify Playlist, Get a list of YouTube URLs from it.

spotbee This is a module that spits out YouTube URLs from Spotify Playlist URLs Why use this? It is asynchronous which makes it compatible to use with

Nishant Sapkota 10 Apr 06, 2022
An Advent calendar of small programming puzzles for a variety of skill sets and skill levels.

Advent of Code 2021 The Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels that can be sol

Evan Cope 0 Feb 13, 2022
RangDev Notepad App With Python

RangDev Notepad-App-With-Python Take down quick and speedy notes! This is a small project of a notepad app built with Tkinter and SQLite3. Database cr

rangga.alrasya 1 Dec 01, 2021
A NetBox Plugin that gives a UI for generating, comparing and deploying configurations to devices.

netbox_config_plugin - A plugin to generate, compare and deploy configurations This plugin allows you to execute your code to generate a config for a

Jo 11 Dec 21, 2022
A comparison of mesh generators.

This repository creates meshes of the same domains with multiple mesh generators and compares the results.

Nico Schlömer 29 Dec 12, 2022
Create Arrays (Working with For Loops)

DSA with Python Create Arrays (Working with For Loops) CREATING ARRAYS WITH USER INPUT Array is a collection of items stored at contiguous memory loca

1 Feb 08, 2022
The Python Fuzzer that the world deserves 🐍

pip3 install frelatage Current release : 0.0.2 The Python Fuzzer that the world deserves Installation | How it works | Features | Use Frelatage | Conf

Rog3r 219 Dec 21, 2022