A Django app that integrates with Dramatiq.

Overview

django_dramatiq

django_dramatiq is a Django app that integrates with Dramatiq.

Requirements

• Django 1.11+
• Dramatiq 0.18+

Example

You can find an example application built with django_dramatiq here.

Installation

pip install django-dramatiq

Add django_dramatiq to installed apps before any of your custom apps:

import os

INSTALLED_APPS = [
    "django_dramatiq",

    "myprojectapp1",
    "myprojectapp2",
    # etc...
]

Configure your broker in settings.py:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
    "OPTIONS": {
        "url": "amqp://localhost:5672",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.Prometheus",
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "django_dramatiq.middleware.AdminMiddleware",
    ]
}

# Defines which database should be used to persist Task objects when the
# AdminMiddleware is enabled.  The default value is "default".
DRAMATIQ_TASKS_DATABASE = "default"

You may also configure a result backend:

DRAMATIQ_RESULT_BACKEND = {
    "BACKEND": "dramatiq.results.backends.redis.RedisBackend",
    "BACKEND_OPTIONS": {
        "url": "redis://localhost:6379",
    },
    "MIDDLEWARE_OPTIONS": {
        "result_ttl": 60000
    }
}

Usage

Declaring tasks

django_dramatiq will auto-discover tasks defined in tasks modules in each of your installed apps. For example, if you have an app named customers, your tasks for that app should live in a module called customers.tasks:

import dramatiq

from django.core.mail import send_mail

from .models import Customer

@dramatiq.actor
def email_customer(customer_id, subject, message):
    customer = Customer.objects.get(pk=customer_id)
    send_mail(subject, message, "admin@example.com", [customer.email])

Running workers

django_dramatiq comes with a management command you can use to auto-discover task modules and run workers:

python manage.py rundramatiq

If your project for some reason has apps with modules named tasks that are not intended for use with Dramatiq, you can ignore them:

DRAMATIQ_IGNORED_MODULES = (
    'app1.tasks',
    'app2.tasks',
    'app3.tasks.utils',
    'app3.tasks.utils.*',
    ...
)

The wildcard detection will ignore all sub modules from that point on. You will need to ignore the module itself if you don't want the __init__.py to be processed.
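
The matching rule described above can be illustrated with a small standard-library sketch. This is not django_dramatiq's own implementation: the helper name is_ignored and the use of fnmatch are assumptions chosen to mirror the documented behavior, where a trailing ".*" pattern covers every submodule below that point but not the package itself.

```python
from fnmatch import fnmatchcase

# Same shape as the DRAMATIQ_IGNORED_MODULES setting shown above.
IGNORED_MODULES = (
    "app1.tasks",
    "app3.tasks.utils",
    "app3.tasks.utils.*",
)


def is_ignored(module_name, patterns=IGNORED_MODULES):
    """Return True if module_name matches any ignore pattern (hypothetical helper)."""
    return any(fnmatchcase(module_name, pattern) for pattern in patterns)
```

With only the wildcard entry present, is_ignored("app3.tasks.utils", ("app3.tasks.utils.*",)) is False, which is why the package itself needs its own entry.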

Testing

You should have a separate settings file for tests. In that file, override the broker to use Dramatiq's StubBroker:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "django_dramatiq.middleware.AdminMiddleware",
    ]
}

Using pytest-django

In your conftest module set up fixtures for your broker and a worker:

import dramatiq
import pytest

@pytest.fixture
def broker():
    broker = dramatiq.get_broker()
    broker.flush_all()
    return broker

@pytest.fixture
def worker(broker):
    worker = dramatiq.Worker(broker, worker_timeout=100)
    worker.start()
    yield worker
    worker.stop()

In your tests, use those fixtures whenever you want background tasks to be executed:

def test_customers_can_be_emailed(transactional_db, broker, worker, mailoutbox):
    customer = Customer(email="jim@example.com")
    # Assuming "send_welcome_email" enqueues an "email_customer" task
    customer.send_welcome_email()

    # Wait for all the tasks to be processed
    broker.join("default")
    worker.join()

    assert len(mailoutbox) == 1
    assert mailoutbox[0].subject == "Welcome Jim!"

Using unittest

A simple test case is provided that automatically sets up the broker and worker for each test; both are accessible as attributes on the test case. Note that DramatiqTestCase inherits from django.test.TransactionTestCase.

from django.core import mail
from django.test import override_settings
from django_dramatiq.test import DramatiqTestCase


class CustomerTestCase(DramatiqTestCase):

    @override_settings(EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend')
    def test_customers_can_be_emailed(self):
        customer = Customer(email="jim@example.com")
        # Assuming "send_welcome_email" enqueues an "email_customer" task
        customer.send_welcome_email()

        # Wait for all the tasks to be processed
        self.broker.join(customer.queue_name)
        self.worker.join()

        self.assertEqual(len(mail.outbox), 1)
        self.assertEqual(mail.outbox[0].subject, "Welcome Jim!")

Cleaning up old tasks

The AdminMiddleware stores task metadata in a relational DB so it's a good idea to garbage collect that data every once in a while. You can use the delete_old_tasks actor to achieve this on a cron:

from django_dramatiq.tasks import delete_old_tasks
delete_old_tasks.send(max_task_age=86400)
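
The value 86400 in the call above corresponds to one day if max_task_age is measured in seconds. That unit is an assumption here and is worth checking against the version you have installed. Under that assumption, a small helper (hypothetical, named age_in_seconds) makes other retention windows easier to read:

```python
from datetime import timedelta


def age_in_seconds(**duration):
    """Convert a timedelta-style duration to whole seconds (hypothetical helper)."""
    return int(timedelta(**duration).total_seconds())


ONE_DAY = age_in_seconds(days=1)
ONE_WEEK = age_in_seconds(weeks=1)

# Usage, assuming max_task_age is expressed in seconds:
#   delete_old_tasks.send(max_task_age=ONE_WEEK)
```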

Middleware

django_dramatiq.middleware.DbConnectionsMiddleware
This middleware is vital in taking care of closing expired connections after each message is processed.
django_dramatiq.middleware.AdminMiddleware
This middleware stores metadata about tasks in flight to a database and exposes them via the Django admin.

Custom keyword arguments to Middleware

Some middleware classes require dynamic arguments. An example of this would be the backend argument to dramatiq.middleware.GroupCallbacks.

To do this, you might add the middleware to your settings.py:

DRAMATIQ_BROKER = {
    ...
    "MIDDLEWARE": [
        ...
        "dramatiq.middleware.GroupCallbacks",
        ...
    ]
    ...
}

Next, you need to extend DjangoDramatiqConfig to provide the arguments for this middleware:

from django_dramatiq.apps import DjangoDramatiqConfig


class CustomDjangoDramatiqConfig(DjangoDramatiqConfig):
    @classmethod
    def middleware_groupcallbacks_kwargs(cls):
        return {"rate_limiter_backend": cls.get_rate_limiter_backend()}


CustomDjangoDramatiqConfig.initialize()

Notice the naming convention: to provide arguments to dramatiq.middleware.GroupCallbacks, you need to add a @classmethod named middleware_<middleware_name>_kwargs, where <middleware_name> is the lowercase name of the middleware.
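
As a quick illustration of that convention, the sketch below derives the expected method name from a dotted middleware path. The helper kwargs_hook_name is hypothetical and only restates the rule described above; it is not part of django_dramatiq.

```python
def kwargs_hook_name(middleware_path):
    """Derive the hook name for a dotted middleware path (hypothetical helper)."""
    # Keep only the class name, e.g. "GroupCallbacks".
    class_name = middleware_path.rsplit(".", 1)[-1]
    return f"middleware_{class_name.lower()}_kwargs"


hook = kwargs_hook_name("dramatiq.middleware.GroupCallbacks")
```

Here hook matches the method name used in the custom app config above.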

Finally, add the custom app config to your settings.py, replacing the existing django_dramatiq app config:

INSTALLED_APPS = [
    ...
    "yourapp.apps.CustomDjangoDramatiqConfig",
    ...
]

Usage with django-configurations

To use django_dramatiq together with django-configurations you need to define your own rundramatiq command as a subclass of the one in this package.

In YOURPACKAGE/management/commands/rundramatiq.py:

from django_dramatiq.management.commands.rundramatiq import Command as RunDramatiqCommand


class Command(RunDramatiqCommand):
    def discover_tasks_modules(self):
        tasks_modules = super().discover_tasks_modules()
        tasks_modules[0] = "YOURPACKAGE.dramatiq_setup"
        return tasks_modules

And in YOURPACKAGE/dramatiq_setup.py:

import django

from configurations.importer import install

install(check_options=True)
django.setup()

Running project tests locally

Install the dev dependencies with pip install -e '.[dev]' and then run tox.

License

django_dramatiq is licensed under Apache 2.0. Please see LICENSE for licensing details.

Comments
  • DbConnectionsMiddleware and "Lost connection to MySQL server during query"

    Hi,

    I am using DbConnectionsMiddleware and I am still having problems if two tasks are processed and MySQL closes the connection between these two executions.

    I wrote a test to illustrate this example:

    models.py

    class SomeModel(models.Model):
        pass
    

    tests.py

    import dramatiq
    import pytest
    import time
    
    from dramatiq import Worker
    from dramatiq.brokers.stub import StubBroker
    from django_dramatiq.middleware import DbConnectionsMiddleware
    
    from django.db import OperationalError
    
    
    @pytest.fixture(scope='function')
    def broker_stub():
        broker = StubBroker(middleware=[DbConnectionsMiddleware()])
        broker.emit_after('process_boot')
        dramatiq.set_broker(broker)
        yield broker
        broker.flush_all()
        broker.close()
    
    
    @pytest.fixture()
    def worker_stub(broker_stub):
        worker = Worker(broker_stub, worker_timeout=100, worker_threads=1)
        worker.start()
        yield worker
        worker.stop()
    
    
    @pytest.fixture(scope='function')
    def db_wait_timeout():
        from django.db import connection
        value = 10
        # set wait_timeout to force MySQL errors
        with connection.cursor() as cursor:
            cursor.execute(f'SET @@GLOBAL.wait_timeout={value};')
        connection.close()
        yield value
        # rollback wait_timeout to avoid django_db fixture cleanup errors
        with connection.cursor() as cursor:
            cursor.execute('SET @@GLOBAL.wait_timeout=28800;')
        connection.close()
    
    
    @pytest.mark.django_db(transaction=True)
    def test_must_not_raise_operational_error(db_wait_timeout, broker_stub, worker_stub):
        results = []
    
        @dramatiq.actor()
        def do_work():
            from .models import SomeModel
            try:
                model = SomeModel()
                model.save()
                results.append('success')
            except Exception as error:
                results.append(error)
    
        # initialize connections with database
        do_work.send()
        broker_stub.join(do_work.queue_name)
        worker_stub.join()
    
        # wait until all connections are terminated by MySQL
        time.sleep(db_wait_timeout + 5)
    
        # try to use available connections
        do_work.send()
    
        # wait until all connections are terminated by MySQL
        time.sleep(db_wait_timeout + 5)
    
        assert results == ['success', 'success']
    

    I think the problem occurs because DbConnectionsMiddleware closes django db connections only:

    • before_consumer_thread_shutdown
    • before_worker_thread_shutdown
    • before_worker_shutdown

    I think it also needs to close old connections:

    • before_process_message
    • after_process_message

    class DbConnectionsMiddleware(Middleware):
        ...
        def _close_old_connections(self, *args, **kwargs):
            db.close_old_connections()
            
        def before_process_message(self, broker, message):
            self._close_old_connections()
    
        def after_process_message(self, broker, message, *, result=None, exception=None):
            self._close_old_connections()
    

    Am I doing something wrong, or does what I've proposed make sense?

    bug 
    opened by brunabxs 11
  • Supervisord

    I'm trying to deploy using supervisord, but it always gives me an error:

    Nov 1 07:54:49 PM  [2020-11-01 19:54:49,875] [PID 236] [MainThread] [dramatiq.ForkProcess(0)] [INFO] Fork process 'dramatiq.middleware.prometheus:_run_exposition_server' is ready for action.
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 12] [MainThread] [dramatiq.MainProcess] [CRITICAL] Worker with PID 53 exited unexpectedly (code -9). Shutting down...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 52] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 54] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Stopping worker process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 55] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Stopping worker process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 56] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Stopping worker process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 59] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Stopping worker process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,877] [PID 236] [MainThread] [dramatiq.ForkProcess(0)] [INFO] Stopping fork process...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,885] [PID 56] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,887] [PID 55] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
    Nov 1 07:54:50 PM  [2020-11-01 19:54:50,901] [PID 52] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
    Nov 1 07:54:51 PM  [2020-11-01 19:54:51,497] [PID 54] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
    Nov 1 07:54:51 PM  [2020-11-01 19:54:51,688] [PID 59] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
    

    Any suggestion here? I'm triggering the command by running rundramatiq that was provided but it doesn't work

    opened by tarsil 10
  • Default amount of threads is different from (non-django) default implementation

    When launching Dramatiq without Django it launches by default with:

    • processes equal to CPU cores
    • 8 threads

    https://dramatiq.io/guide.html#workers
    https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/worker.py#L72
    https://github.com/Bogdanp/dramatiq/blob/e8bc14d4c3bd4a5105d379addb2cacebff4aba4b/dramatiq/cli.py#L158

    However in case of django_dramatiq it launches by default with:

    • processes equal to CPU cores
    • threads equal to CPU cores

    https://github.com/Bogdanp/django_dramatiq/blob/master/django_dramatiq/management/commands/rundramatiq.py#L41

    I think it would be good to align those defaults. However there may be a good reason to have different defaults.

    What would be a good way forward?

    opened by Ecno92 9
  • dramatiq.set_broker(broker) in Phase 3

    I suspect I found a case where django_dramatiq sets the global broker configured in settings too late, so the actor decorator gets the default RabbitMQ broker from get_broker().

    Populating the Django app registry consists of three phases:

    • Phase 1: initialize app configs and import app modules.
    • Phase 2: import models modules.
    • Phase 3: run ready() methods of app configs. This is where the correct broker is set.

    In my case, models.py imports a task from tasks.py. That happens in Phase 2, before the ready() method of DjangoDramatiqConfig is called.

    I understand that broker initialization must happen in ready() because some middleware may require apps to be initialized first. However, resolving this issue would improve quality of life a lot.

    For now, I guess local imports of tasks will do.

    Kind regards, Serj

    bug help wanted 
    opened by nanopony 9
  • Anomalous AdminMiddleware serialization behavior

    Hi,

    first of all, thanks for your work on dramatiq and django-dramatiq.

    I'd like to report a strange behaviour of the AdminMiddleware, in which it raises an exception upon task serialization.

    Full reproducer at: https://github.com/sanjioh/django-dramatiq-repro

    The core of the problem seems to be in the following files:

    views.py

    from uuid import uuid4
    
    from django.http import response
    
    from .tasks import a_task
    
    
    def repro(request):
        a_task.send(
            {
                'value': str(uuid4()),
            },
        )
        return response.HttpResponse()
    
    

    tasks.py

    from uuid import UUID
    
    import dramatiq
    
    
    class Klass:
    
        def __init__(self, value):
            self.value = value
    
        @classmethod
        def fromdict(cls, attrs):
            attrs['value'] = UUID(attrs['value'])
            return cls(**attrs)
    
    
    @dramatiq.actor
    def a_task(attrs):
        Klass.fromdict(attrs)
    

    Stacktrace:

    [2020-04-24 10:17:13,490] [PID 15544] [Thread-4] [dramatiq.broker.RedisBroker] [CRITICAL] Unexpected failure in after_process_message.
    Traceback (most recent call last):
      File "/Users/fabio/.virtualenvs/django-dramatiq-repro/lib/python3.8/site-packages/dramatiq/broker.py", line 98, in emit_after
        getattr(middleware, "after_" + signal)(self, *args, **kwargs)
      File "/Users/fabio/.virtualenvs/django-dramatiq-repro/lib/python3.8/site-packages/django_dramatiq/middleware.py", line 53, in after_process_message
        Task.tasks.create_or_update_from_message(
      File "/Users/fabio/.virtualenvs/django-dramatiq-repro/lib/python3.8/site-packages/django_dramatiq/models.py", line 19, in create_or_update_from_message
        "message_data": message.encode(),
      File "/Users/fabio/.virtualenvs/django-dramatiq-repro/lib/python3.8/site-packages/dramatiq/message.py", line 101, in encode
        return global_encoder.encode(self._asdict())
      File "/Users/fabio/.virtualenvs/django-dramatiq-repro/lib/python3.8/site-packages/dramatiq/encoder.py", line 49, in encode
        return json.dumps(data, separators=(",", ":")).encode("utf-8")
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 234, in dumps
        return cls(
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type UUID is not JSON serializable
    

    Please let me know if I can provide further details.

    Thanks, Fabio

    opened by sanjioh 7
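
One possible reading of the traceback above, offered as an assumption rather than a confirmed diagnosis: fromdict overwrites a key in the dict it receives, so the message arguments hold a UUID object by the time the middleware re-encodes the message in after_process_message. The standard-library sketch below reproduces that effect without Dramatiq; run_task_mutating and run_task_copying are hypothetical stand-ins for the actor body.

```python
import json
from uuid import UUID, uuid4


def run_task_mutating(attrs):
    # Mirrors Klass.fromdict: overwrites the caller's dict in place.
    attrs["value"] = UUID(attrs["value"])
    return attrs["value"]


def run_task_copying(attrs):
    # Works on a shallow copy, so the caller's dict keeps its string value.
    local = dict(attrs)
    local["value"] = UUID(local["value"])
    return local["value"]


def is_json_serializable(obj):
    """Return True if json.dumps accepts obj."""
    try:
        json.dumps(obj)
        return True
    except TypeError:
        return False


message_args = {"value": str(uuid4())}

run_task_copying(message_args)
still_ok = is_json_serializable(message_args)    # arguments untouched

run_task_mutating(message_args)
broken = not is_json_serializable(message_args)  # arguments now hold a UUID
```

If this reading is right, copying the dict before converting its values (as in run_task_copying) would keep the original arguments serializable.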
  • rundramatiq should search for `dramatiq` in the `Scripts` directory on Windows

    settings.py:

    """
    Django settings for permissions_test project.
    
    Generated by 'django-admin startproject' using Django 2.2.4.
    
    For more information on this file, see
    https://docs.djangoproject.com/en/2.2/topics/settings/
    
    For the full list of settings and their values, see
    https://docs.djangoproject.com/en/2.2/ref/settings/
    """
    import redis
    import os
    
    # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    
    
    # Quick-start development settings - unsuitable for production
    # See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/
    
    # SECURITY WARNING: keep the secret key used in production secret!
    SECRET_KEY = '$t&u7oces91v6-4v0bi4ntp$z+m#ih+$tr+avjs3p#l3ru(5d4'
    
    # SECURITY WARNING: don't run with debug turned on in production!
    DEBUG = True
    
    ALLOWED_HOSTS = []
    
    
    # Application definition
    
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'django_dramatiq',
        'users',
        'profiles',
    ]
    
    MIDDLEWARE = [
        'django.middleware.security.SecurityMiddleware',
        'django.contrib.sessions.middleware.SessionMiddleware',
        'django.middleware.common.CommonMiddleware',
        'django.middleware.csrf.CsrfViewMiddleware',
        'django.contrib.auth.middleware.AuthenticationMiddleware',
        'django.contrib.messages.middleware.MessageMiddleware',
        'django.middleware.clickjacking.XFrameOptionsMiddleware',
    ]
    
    ROOT_URLCONF = 'permissions_test.urls'
    
    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, "templates")],
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]
    
    WSGI_APPLICATION = 'permissions_test.wsgi.application'
    
    
    
    # Database
    # https://docs.djangoproject.com/en/2.2/ref/settings/#databases
    
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        }
    }
    
    
    # Password validation
    # https://docs.djangoproject.com/en/2.2/ref/settings/#auth-password-validators
    
    DRAMATIQ_REDIS_URL = os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0")
    DRAMATIQ_BROKER = {
        "BROKER": "dramatiq.brokers.redis.RedisBroker",
        "OPTIONS": {
            "connection_pool": redis.ConnectionPool.from_url(DRAMATIQ_REDIS_URL),
        },
        "MIDDLEWARE": [
            "dramatiq.middleware.AgeLimit",
            "dramatiq.middleware.TimeLimit",
            "dramatiq.middleware.Retries",
            "django_dramatiq.middleware.AdminMiddleware",
            "django_dramatiq.middleware.DbConnectionsMiddleware",
        ]
    }
    
    
    DRAMATIQ_RESULT_BACKEND = {
        "BACKEND": "dramatiq.results.backends.redis.RedisBackend",
        "BACKEND_OPTIONS": {
            "url": "redis://localhost:6379",
        },
        "MIDDLEWARE_OPTIONS": {
            "result_ttl": 60000
        }
    }
    
    AUTH_PASSWORD_VALIDATORS = [
        {
            'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
        },
    ]
    
    # Cache
    CACHES = {
        'default': {
            'BACKEND': 'django_redis.cache.RedisCache',
            'LOCATION': 'redis://127.0.0.1:6379/',
            'OPTIONS': {
                'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            }
        }
    }
            
    # Store the session in the cache
    SESSION_ENGINE = "django.contrib.sessions.backends.cache"
    SESSION_CACHE_ALIAS = "default"
    
    # Internationalization
    # https://docs.djangoproject.com/en/2.2/topics/i18n/
    
    LANGUAGE_CODE = 'en-us'
    
    TIME_ZONE = 'UTC'
    
    USE_I18N = True
                                    
    USE_L10N = True
    
    USE_TZ = True
    
    
    # Static files (CSS, JavaScript, Images)
    # https://docs.djangoproject.com/en/2.2/howto/static-files/
    
    STATIC_URL = '/static/'
    AUTH_USER_MODEL = 'users.User'
    
    

    I've installed django_dramatiq, dramatiq, watch, and Redis. All the tasks that I created appear in /admin.

    help wanted good first issue 
    opened by AgentDaun 7
  • Making the auto discover module name configurable

    Hi there, I would like to find out what your view is on making the python module name in which tasks reside configurable.

    I see from the code that it will always need to lookup tasks in "tasks.py" so that django_dramatiq's own internal tasks are discovered. Given this I can think of a couple ways to extend the task list discovered by the "discover_tasks_modules" function in the "django_dramatiq.management.commands.rundramatiq" module.

    1. Extend "django_dramatiq" to support a user providing a sequence of additional sub module names. The user could for example provide a "DRAMATIQ_AUTODISCOVER_MODULES" setting which could be used in addition to the current default "tasks" to generate the discovered tasks list. So for example if a user specifies DRAMATIQ_AUTODISCOVER_MODULES = ("services", ) "django_dramatiq" would lookup all the "tasks" and "services" sub modules.
    2. Just extend the rundramatiq command in my own project and append tasks module list generated by "discover_tasks_modules". This option will require no change to "django_dramatiq" package

    I am happy to override the "rundramatiq" command in my own project but would like to hear your thoughts option 1 above.

    opened by BradleyKirton 6
  • Error related to periodiq middleware with version 0.11.0

    I tried upgrading django_dramatiq from version 0.10.0 to 0.11.0 in a working Django + dramatiq + periodiq set-up.

    Python 3.10.4 is used, with the following requirements.txt:

    Django==4.1.3
    git+https://github.com/Sovetnikov/django_periodiq
    dramatiq[rabbitmq, watch]==1.13.0
    django-dramatiq==0.11.0
    

    Django config:

    DRAMATIQ_BROKER = {
        "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
        "OPTIONS": {
            "url": DRAMATIQ_BROKER_URL,
        },
        "MIDDLEWARE": [
            "dramatiq.middleware.AgeLimit",
            "dramatiq.middleware.TimeLimit",
            "dramatiq.middleware.Callbacks",
            "dramatiq.middleware.Retries",
            "django_dramatiq.middleware.DbConnectionsMiddleware",
            "django_dramatiq.middleware.AdminMiddleware",
            "periodiq.PeriodiqMiddleware",
        ]
    }
    DRAMATIQ_TASKS_DATABASE = "default"
    

    I get the following error ValueError: The following actor options are undefined: periodic. Did you forget to add a middleware to your Broker?. Full stack trace available here: https://gist.github.com/kcleong/aded0ca16dfd56304d3774babf4b8471

    If I downgrade django_dramatiq to version 0.10.0, the error disappears.

    bug 
    opened by kcleong 5
  • Fix #123 -- init broker configuration before loading other app models

    The bug from #123 was introduced in https://github.com/Bogdanp/django_dramatiq/pull/103, where django_dramatiq started using AppConfig.ready(). The research in https://github.com/Bogdanp/django_dramatiq/issues/100 explains the underlying issue.

    My PR uses the idea from @dnmellen to call DjangoDramatiqConfig.ready() during its models import, so earlier than any other custom models code would be imported. I tried to come up with a different way, as this one feels a tiny bit "hacky", but could not find anything better.

    opened by amureki 5
  • StreamLostError while sending tasks

    Hi! I have a django app (served with gunicorn with sync worker, not gevent) that sends tasks to dramatiq. The workers use gevent, since my background tasks are basically API calls to third-party apps.

    Sometimes I see this kind of error when I try to perform a .send() from the backend (not the gevent worker):

    Unexpected connection close detected: "StreamLostError: (\"Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')\",)"
    

    I searched for this error and a possible solution was to increase the RabbitMQ heartbeat (I set it to 600) or close the connection after every .send(). (https://github.com/Bogdanp/dramatiq/issues/217)

    The only option that worked for me is to close the connection after every .send(), but I wonder if this solution penalizes performance when I have to send a lot of messages to dramatiq and if so, how can I send multiple messages to dramatiq in an efficient way.

    Thanks in advance!

    opened by dnmellen 5
  • Adds support for additional kwargs in middlewares (Solves issue #82)

    I'm trying to make the middleware initialization more flexible by adding a hooks system to be able to override the initial kwargs for some middleware during the django_dramatiq app initialization.

    The way you would provide this hook would be by overriding django_dramatiq django conf:

    If you want to add dramatiq.middleware.GroupCallbacks middleware, the name of the hook would be middleware_groupcallbacks_kwargs. It takes the middleware name and converts it to lowercase (middleware_<middlewarename>_kwargs).

    The hook is a classmethod that returns a dict containing the kwargs for that middleware.

    from django_dramatiq.apps import DjangoDramatiqConfig, RATE_LIMITER_BACKEND
    
    class MyDjangoDramatiqConfig(DjangoDramatiqConfig):
    
        @classmethod
        def middleware_groupcallbacks_kwargs(cls):
            return {"rate_limiter_backend": cls.get_rate_limiter_backend()}
    

    I was completely unable to create tests for this feature without changing the current test settings, but these changes are not changing any functionality unless you override the DjangoDramatiqConfig.

    opened by dnmellen 5
  • Fix for #135 -- when using the PickleEncoder

    When using the PickleEncoder, make sure the task view still works by showing a representation of the kwargs that is JSON serializable. This fixes an issue where I wasn't able to see the task details. Each kwarg is simply replaced by a string representation if an encoder other than the JSONEncoder is used.

    opened by huubbouma 0
  • JSON serialization error in admin view when using PickleEncoder

    If you use the PickleEncoder for the serialization of the arguments to actors:

    DRAMATIQ_ENCODER = "dramatiq.PickleEncoder"

    Then an error will appear if you go to the task details in the admin interface: TypeError: Object of type Application is not JSON serializable

    opened by huubbouma 2
  • Add silent option for startup

    Hi,

    Currently the rundramatiq management command prints each tasks module it finds on startup:

    * Discovered tasks module foo.tasks
    * Discovered tasks module bar.tasks
    

    This output can be quite long if you have many tasks modules.

    It would be nice to be able to silence this by passing a -v0 option to the command.

    Relevant code: https://github.com/Bogdanp/django_dramatiq/blob/master/django_dramatiq/management/commands/rundramatiq.py#L176

    opened by strokirk 0
  • TaskAdmin +readonly_fields "created_at", "updated_at"; fix setup name

    1. TaskAdmin +readonly_fields "created_at", "updated_at"
    2. Ignore pycharm files
    3. Let github.com show the "Used by" block on the main repo page (setup.name, in my own experience): https://github.com/ikvk/imap_tools/commit/f1b26144bfad8c73c0ab32bd2e1a5a24878ead2e#diff-60f61ab7a8d1910d86d9fda2261620314edcae5894d5aaa236b821c7256badd7R16
    opened by ikvk 0
  • Executable name for Dramatiq on Windows is "dramatiq.exe"

    The _resolve_executable method in django_dramatiq/management/commands/rundramatiq.py will find venv\Scripts\dramatiq, but on Windows it needs to find venv\Scripts\dramatiq.exe

    Can we update the code in: https://github.com/Bogdanp/django_dramatiq/commit/4c70775bbdd7b4a6844afeec48ddb58d3bb7275d

    opened by pkimber 3
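
The request above amounts to appending ".exe" to the script name on Windows. A minimal sketch of that idea follows; the helpers executable_name and candidate_path are hypothetical and do not reflect how _resolve_executable is actually written.

```python
import os
import sys


def executable_name(base="dramatiq", platform=sys.platform):
    """Return the file name to look for on the given platform (hypothetical helper)."""
    # sys.platform is "win32" on Windows; other platforms use the bare name.
    return base + ".exe" if platform.startswith("win") else base


def candidate_path(bin_dir, base="dramatiq", platform=sys.platform):
    """Join a scripts directory with the platform-specific file name."""
    return os.path.join(bin_dir, executable_name(base, platform))
```

An alternative worth considering is shutil.which, which consults PATHEXT on Windows and so can find dramatiq.exe when the scripts directory is on the search path.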
Releases(v0.11.2)
  • v0.11.2(Nov 18, 2022)

    What's Changed

    • Replace AppConfig.ready workaround with __init__ by @amureki in https://github.com/Bogdanp/django_dramatiq/pull/137
    • Add GitHub action to release package on PyPI (Fix #132) by @amureki in https://github.com/Bogdanp/django_dramatiq/pull/138

    Full Changelog: https://github.com/Bogdanp/django_dramatiq/compare/v0.11.1...v0.11.2
