Python Stream Processing

Overview

Version: 1.10.4
Web: http://faust.readthedocs.io/
Download: http://pypi.org/project/faust
Source: http://github.com/robinhood/faust
Keywords: distributed, stream, async, processing, data, queue, state management
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faust is a stream processing library, porting the ideas from Kafka Streams to Python.

It is used at Robinhood to build high performance distributed systems and real-time data pipelines that process billions of events every day.

Faust provides both stream processing and event processing, sharing similarity with tools such as Kafka Streams, Apache Spark/Storm/Samza/Flink.

It does not use a DSL; it's just Python! This means you can use all your favorite Python libraries when stream processing: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, and more.

Faust requires Python 3.6 or later for the new async/await syntax and variable type annotations.

Here's an example processing a stream of incoming orders:

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", "amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

The Agent decorator defines a "stream processor" that essentially consumes from a Kafka topic and does something for every event it receives.

The agent is an async def function, so it can also perform other operations asynchronously, such as web requests.
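To make the idea of an agent concrete, here is a toy, in-memory sketch of what such a decorator could do: register a coroutine function under a topic name, then feed it that topic's events through `async for`. This is not how Faust is implemented; `agent`, `AGENTS`, `as_stream`, and `run_agents` are hypothetical names, and a plain list of dicts stands in for a Kafka topic.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, Iterable, List

# Hypothetical registry: topic name -> the coroutine function that consumes it.
AgentFn = Callable[[AsyncIterator[dict]], Awaitable[None]]
AGENTS: Dict[str, AgentFn] = {}


def agent(topic: str) -> Callable[[AgentFn], AgentFn]:
    """Toy decorator: register the decorated function as the consumer of `topic`."""
    def register(fn: AgentFn) -> AgentFn:
        AGENTS[topic] = fn
        return fn
    return register


async def as_stream(events: Iterable[dict]) -> AsyncIterator[dict]:
    """Stand-in for a Kafka consumer: yields already-buffered events one by one."""
    for event in events:
        yield event


async def run_agents(buffered: Dict[str, List[dict]]) -> None:
    """Start every registered agent concurrently, each fed its own topic's events."""
    await asyncio.gather(
        *(AGENTS[topic](as_stream(events)) for topic, events in buffered.items())
    )


processed: List[str] = []


@agent('orders')
async def order(orders: AsyncIterator[dict]) -> None:
    async for event in orders:
        # "Do something" for every event: here, just format it.
        processed.append(f"Order for {event['account_id']}: {event['amount']}")


asyncio.run(run_agents({'orders': [{'account_id': 'a1', 'amount': 3},
                                   {'account_id': 'b2', 'amount': 5}]}))
```

Unlike a real Faust stream, the stand-in stream here is finite, so `asyncio.run` returns once every buffered event has been processed.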

This system can persist state, acting like a database. Tables are named distributed key/value stores you can use as regular Python dictionaries.

Tables are stored locally on each machine using a super fast embedded database written in C++, called RocksDB.

Tables can also store aggregate counts that are optionally "windowed", so you can keep track of, for example, "number of clicks from the last day" or "number of clicks in the last hour". Like Kafka Streams, we support tumbling, hopping, and sliding windows of time, and old windows can be expired to stop data from filling up.
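The window arithmetic can be sketched in plain Python. This is an illustration under stated assumptions, not Faust's windowing API: timestamps are in seconds, a window includes its start and excludes its end, and `tumbling_window`, `hopping_windows`, `count_event`, and `expire_windows` are hypothetical names. Sliding windows are left out. A tumbling window is a hopping window whose step equals its size, so each timestamp falls into exactly one window; with a smaller step, windows overlap and one event counts toward several.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# A window is (start, end) in seconds: start inclusive, end exclusive.
Window = Tuple[float, float]


def tumbling_window(ts: float, size: float) -> Window:
    """The one fixed-size, non-overlapping window that contains timestamp `ts`."""
    start = ts - (ts % size)
    return (start, start + size)


def hopping_windows(ts: float, size: float, step: float) -> List[Window]:
    """Every window of length `size` (a new one starts each `step` seconds) containing `ts`."""
    windows = []
    start = ts - (ts % step)  # latest window start at or before ts
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= step
    return sorted(windows)


def count_event(counts: DefaultDict[Window, int], ts: float, size: float) -> None:
    """Increment the count for the tumbling window that `ts` falls into."""
    counts[tumbling_window(ts, size)] += 1


def expire_windows(counts: DefaultDict[Window, int], now: float, expires: float) -> None:
    """Drop windows that closed more than `expires` seconds before `now`."""
    for window in [w for w in counts if w[1] <= now - expires]:
        del counts[window]


hourly: DefaultDict[Window, int] = defaultdict(int)
for ts in (10, 20, 3700):  # two clicks in the first hour, one in the second
    count_event(hourly, ts, 3600)
expire_windows(hourly, now=7300, expires=3600)  # the first hour closed long enough ago
```

With `hopping_windows(125, 60, 30)`, the event at t=125 belongs to both the window starting at 90 and the one starting at 120.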

For reliability we use a Kafka topic as a "write-ahead log". Whenever a key is changed, we publish the change to this changelog. Standby nodes consume from the changelog to keep an exact replica of the data, which enables instant recovery should any of the nodes fail.

To the user a table is just a dictionary, but data is persisted between restarts and replicated across nodes so on failover other nodes can take over automatically.
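A toy, single-process model of that write-ahead-log idea, assuming a Python list as a stand-in for the changelog topic: the primary table appends every write to the log, and a standby rebuilds an identical copy by replaying the log from the last offset it has applied. `ChangelogTable` and `Standby` are hypothetical names; in Faust the changelog is a Kafka topic and standbys run in other worker instances.

```python
from typing import Any, Dict, List, Tuple

Change = Tuple[str, Any]  # (key, new value)


class ChangelogTable(dict):
    """A dict that also appends every item assignment to a changelog.

    The shared list stands in for the Kafka changelog topic. Only
    `table[key] = value` (including `+=`) is tracked; update(), deletes, etc. are not.
    """

    def __init__(self, changelog: List[Change]) -> None:
        super().__init__()
        self.changelog = changelog

    def __setitem__(self, key: str, value: Any) -> None:
        super().__setitem__(key, value)
        self.changelog.append((key, value))  # publish the change


class Standby:
    """A replica that rebuilds the table by replaying the changelog from its offset."""

    def __init__(self, changelog: List[Change]) -> None:
        self.changelog = changelog
        self.data: Dict[str, Any] = {}
        self.offset = 0  # index of the next change to apply

    def catch_up(self) -> None:
        for key, value in self.changelog[self.offset:]:
            self.data[key] = value
        self.offset = len(self.changelog)


log: List[Change] = []
counts = ChangelogTable(log)
counts['http://example.com'] = 1
counts['http://example.com'] += 1  # reads 1, writes 2: one more changelog entry

standby = Standby(log)
standby.catch_up()  # the replica now matches the primary
```

Because the standby tracks an offset, calling `catch_up()` again later only applies changes it has not seen yet.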

You can count page views by URL:

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

The data sent to the Kafka topic is partitioned, which means the clicks will be sharded by URL in such a way that every count for the same URL will be delivered to the same Faust worker instance.
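Key-based sharding can be sketched as "hash the key, then take it modulo the number of partitions". The sketch below assumes `zlib.crc32` as the hash purely for illustration; Kafka's default partitioner in the Java client uses murmur2 instead, so real partition numbers differ. What matters is that the hash is deterministic, which is why Python's built-in `hash()`, randomized per process for strings, is avoided. `partition_for` is a hypothetical name.

```python
import zlib
from collections import defaultdict
from typing import DefaultDict, List


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a hash that is stable across processes."""
    # crc32 is a stand-in; Kafka's Java client hashes keys with murmur2.
    return zlib.crc32(key.encode('utf-8')) % num_partitions


clicks = [('http://example.com', 1), ('http://example.org', 1), ('http://example.com', 1)]
by_partition: DefaultDict[int, List[str]] = defaultdict(list)
for url, count in clicks:
    # Every event for the same URL lands in the same partition (and thus worker).
    by_partition[partition_for(url, 4)].append(url)
```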

Faust supports any type of stream data: bytes, Unicode and serialized structures, but also comes with "Models" that use modern Python syntax to describe how keys and values in streams are serialized:

# Order is a json serialized dictionary,
# having these fields:

class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

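orders_topic = app.topic('orders', key_type=str, value_type=Order)

async def send_order_received_email(account_id: str, order: Order) -> None:
    # Hypothetical helper, not part of Faust: replace with your own
    # notification code (e.g. a call to an email service).
    ...

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)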

Faust is statically typed, using the mypy type checker, so you can take advantage of static types when writing applications.

The Faust source code is small, well organized, and serves as a good resource for learning the implementation of Kafka Streams.

  • Learn more about Faust on the introduction page: system requirements, installation instructions, community resources, and more.
  • Or go directly to the quickstart tutorial to see Faust in action by programming a streaming application.
  • Then explore the User Guide for in-depth information organized by topic.

Faust is...

Simple

Faust is extremely easy to use. Getting started with other stream processing solutions often means complicated hello-world projects and infrastructure requirements. Faust only requires Kafka; the rest is just Python, so if you know Python you can already use Faust to do stream processing, and it can integrate with just about anything.

Here's one of the easier applications you can make:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

You're probably a bit intimidated by the async and await keywords, but you don't have to know how asyncio works to use Faust: just mimic the examples, and you'll be fine.

The example application starts two tasks: one processes a stream, and the other is a background task sending events to that stream. In a real-life application, your system will publish events to Kafka topics that your processors can consume from, and the background task is only needed to feed data into our example.
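The same two-task shape can be sketched with plain asyncio, assuming an `asyncio.Queue` as a stand-in for the Kafka topic. The names mirror the example above, but this is not Faust's implementation, and the `None` sentinel exists only so the sketch terminates.

```python
import asyncio
from typing import List, Optional, Tuple

Greeting = Tuple[str, str]  # (from_name, to_name)


async def example_sender(queue: asyncio.Queue, interval: float, count: int) -> None:
    """Like the timer: put one greeting on the queue every `interval` seconds."""
    for _ in range(count):
        await queue.put(('Faust', 'you'))
        await asyncio.sleep(interval)
    await queue.put(None)  # sentinel so this sketch ends; a real stream never does


async def hello(queue: asyncio.Queue, out: List[str]) -> None:
    """Like the agent: consume greetings until the sentinel arrives."""
    while True:
        greeting: Optional[Greeting] = await queue.get()
        if greeting is None:
            break
        from_name, to_name = greeting
        out.append(f'Hello from {from_name} to {to_name}')


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()  # stands in for the Kafka topic
    out: List[str] = []
    # Both coroutines run as tasks on the same event loop, in a single thread.
    await asyncio.gather(example_sender(queue, 0.01, 3), hello(queue, out))
    return out


greetings = asyncio.run(main())
```

Both coroutines share one thread: each `await` is a point where the event loop can switch to the other task.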

Highly Available
Faust is highly available and can survive network problems and server crashes. In the case of node failure, it can automatically recover, and tables have standby nodes that will take over.
Distributed
Start more instances of your application as needed.
Fast
A single-core Faust worker instance can already process tens of thousands of events every second, and we are reasonably confident that throughput will increase once we can support a more optimized Kafka client.
Flexible
Faust is just Python, and a stream is an infinite asynchronous iterator. If you know how to use Python, you already know how to use Faust, and it works with your favorite Python libraries like Django, Flask, SQLAlchemy, NLTK, NumPy, SciPy, TensorFlow, etc.
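A minimal sketch of "a stream is an infinite asynchronous iterator", using an async generator as a stand-in for a Faust stream (`numbers` and `first_even_squares` are hypothetical names). The consumer is ordinary Python inside `async for`, and because the source never ends, the consumer decides when to stop.

```python
import asyncio
import itertools
from typing import AsyncIterator, List


async def numbers() -> AsyncIterator[int]:
    """An infinite asynchronous iterator, standing in for a stream."""
    for n in itertools.count():
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield n


async def first_even_squares(limit: int) -> List[int]:
    results: List[int] = []
    async for n in numbers():
        if n % 2 == 0:
            results.append(n * n)  # ordinary Python code inside the loop
        if len(results) == limit:
            break  # the stream never ends on its own, so the consumer stops it
    return results


squares = asyncio.run(first_even_squares(3))
```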

Installation

You can install Faust either via the Python Package Index (PyPI) or from source.

To install using pip:

$ pip install -U faust

Bundles

Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Separate multiple bundles using the comma:

$ pip install "faust[rocksdb]"

$ pip install "faust[rocksdb,uvloop,fast,redis]"

The following bundles are available:

Stores
faust[rocksdb]: for using RocksDB for storing Faust table state. Recommended in production.

Caching
faust[redis]: for using Redis as a simple caching backend (Memcached-style).
Codecs
faust[yaml]: for using YAML and the PyYAML library in streams.
Optimization
faust[fast]: for installing all the available C speedup extensions to Faust core.
Sensors
faust[datadog]: for using the Datadog Faust monitor.
faust[statsd]: for using the Statsd Faust monitor.
Event Loops
faust[uvloop]: for using Faust with uvloop.
faust[eventlet]: for using Faust with eventlet.
Debugging
faust[debug]: for using aiomonitor to connect and debug a running Faust worker.
faust[setproctitle]: when the setproctitle module is installed the Faust worker will use it to set a nicer process name in ps/top listings. Also installed with the fast and debug bundles.

Downloading and installing from source

Download the latest version of Faust from http://pypi.org/project/faust

You can install it by doing:

$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version

With pip

You can install the latest snapshot of Faust using the following pip command:

$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust

FAQ

Can I use Faust with Django/Flask/etc.?

Yes! Use eventlet as a bridge to integrate with asyncio.

Using eventlet

This approach works with any blocking Python library that can work with eventlet.

Using eventlet requires you to install the aioeventlet module, and you can install this as a bundle along with Faust:

$ pip install -U "faust[eventlet]"

Then, to actually use eventlet as the event loop, you have to either use the -L (--loop) argument to the faust program:

$ faust -L eventlet -A myproj worker -l info

or add import mode.loop.eventlet at the top of your entry point script:

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

Warning

It's very important this is at the very top of the module, and that it executes before you import libraries.

Can I use Faust with Tornado?

Yes! Use the tornado.platform.asyncio bridge: http://www.tornadoweb.org/en/stable/asyncio.html

Can I use Faust with Twisted?

Yes! Use the asyncio reactor implementation: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html

Will you support Python 2.7 or Python 3.5?

No. Faust requires Python 3.6 or later, since it heavily uses features introduced in Python 3.6, such as asynchronous generators (building on the async/await syntax) and variable type annotations.

I get a "maximum number of open files exceeded" error from RocksDB when running a Faust app locally. How can I fix this?

You may need to increase the limit for the maximum number of open files. The following post explains how to do so on OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
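In addition to raising the limit in your shell (for example with `ulimit -n`), a process can raise its own soft limit at startup. This sketch uses the standard-library `resource` module, which is Unix-only; the target of 4096 and the helper name `raise_open_file_limit` are assumptions, and the hard limit still caps what an unprivileged process may request.

```python
import resource
from typing import Tuple


def raise_open_file_limit(target: int = 4096) -> Tuple[int, int]:
    """Raise this process's soft RLIMIT_NOFILE toward `target` (Unix only).

    An unprivileged process may raise its soft limit up to, but not past,
    its hard limit. Returns the (soft, hard) limits in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft, hard  # already high enough: leave it unchanged
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


# Hypothetical usage: call at the top of the worker's entry point, before tables open.
limits = raise_open_file_limit()
```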

Which Kafka versions does Faust support?

Faust supports Kafka version 0.10 and later.

Getting Help

Slack

For discussions about the usage, development, and future of Faust, please join the fauststream Slack.

Resources

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/robinhood/faust/issues/

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

Contributing

Development of Faust happens at GitHub: https://github.com/robinhood/faust

You're highly encouraged to participate in the development of Faust.

Be sure to also read the Contributing to Faust section in the documentation.

Code of Conduct

Everyone interacting in the project's code bases, issue trackers, chat rooms, and mailing lists is expected to follow the Faust Code of Conduct.

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing others' private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.

This Code of Conduct is adapted from the Contributor Covenant, version 1.2.0 available at http://contributor-covenant.org/version/1/2/0/.

Comments
  • Memory leak

    Checklist

    • [x] I have included information about relevant versions
    • [x] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    Dockerfile with

    FROM python:3-slim-stretch
    
    WORKDIR /faust_bug
    
    RUN apt update -yqq \
     && apt install -y git
    
    #RUN pip install faust
    RUN pip install kafka robinhood-aiokafka
    RUN git clone https://github.com/robinhood/faust.git \
     && cd faust \
     && python setup.py build \
     && python setup.py install
    
    COPY /faust_bug /faust_bug
    
    ENTRYPOINT ["faust", "-A", "faust_app", "worker", "-l", "DEBUG"]
    

    A directory faust_bug containing only the file faust_app.py with:

    import faust
    import time
    
    app = faust.App('locator', broker='kafka://ip:port', value_serializer='json')
    
    interfaces_topic = app.topic('scan_interfaces', key_type=str)
    
    
    @app.agent(interfaces_topic)
    async def handle_interfaces(results):
        async for switch, result in results.items():
            time.sleep(1)
            print("Single iteration finished")
    

    The container is then deployed as four containers (for the four Kafka partitions) inside a Kubernetes cluster via boilerplate garden.io configurations.

    Expected behavior

    The container consumes from the topic without leaking memory.

    Actual behavior

    The container leaks memory and crashes after 2-3 minutes, when it hits the 1 GB limit.

    Full traceback

    No error but here are debug logs for a container at ~600mb: https://gist.github.com/surculus12/a8386cfeac54596ce3622a5a898f6e96

    Versions

    • Python version: 3.7.4
    • Faust version: 1.8.0
    • Operating system: python:3-slim-stretch (docker)
    • Kafka version: 2.2
    • RocksDB version (if applicable)
    +ƒaµS† v1.8.0-+------------------------------------------+
    | id          | locator                                  |
    | transport   | [URL('kafka://<ip:port>')]               |
    | store       | memory:                                  |
    | web         | http://localhost:6066/                   |
    | log         | -stderr- (debug)                         |
    | pid         | 1                                        |
    | hostname    | locator-v-1c4e381572-7664d865f7-rr4r4    |
    | platform    | CPython 3.7.3 (Linux x86_64)             |
    | drivers     |                                          |
    |   transport | aiokafka=1.0.4                           |
    |   web       | aiohttp=3.6.1                            |
    | datadir     | /faust_bug/locator-data                  |
    | appdir      | /faust_bug/locator-data/v1               |
    +-------------+------------------------------------------+
    
    opened by surculus12 20
  • Defining agents programmatically

    Hello people.

    I'm working with faust 1.4.5 and I'm trying to define agents programmatically. I almost did it but there's an inconsistent behavior.

    Here's how I do it

    for transformation in get_transformations():
    
        async def _func(messages):
            service=Service(transformation,config)
            async for message in messages:
               # do stuff
                yield result
    
        _func.__name__=transformation.name
        locals()[transformation.name]=_func
        app.agent(app.topic(transformation.input_topic,
                            value_serializer="raw"))((transformation.input_topic,locals()[transformation.name],))
    

    (ignore the part with the locals, it's not really necessary)

    The code works flawlessly as long as I have only one agent. When I have more than one, a message on the topic associated with one agent is delivered to another agent. If I list the agents with the associated command, the topic-agent association looks correct. Obviously, on my side the input<->function association is correct (I'm migrating existing services to Faust).

    My assumption is that defining the agents this way breaks some reflection mechanism, but I cannot understand which. One important piece of information that further supports this assumption: the order in which I define the agents changes the behavior. So probably something gets overwritten at every step in the loop, and the last agent gets associated with all the topics, or vice versa.

    Is there a way to debug this? Some internal variable to check? Because with the debug messages and agents command, everything seems correct.
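One possible explanation, offered as an assumption rather than a confirmed diagnosis, is Python's late binding of closure variables: a nested function looks up a loop variable when it runs, not when it is defined. `_func` only reads `transformation` once the agents start, after the loop has finished, so every agent may end up using the last transformation. The pure-Python sketch below (the `Transformation` tuple and `make_func` are hypothetical) shows the effect and a common fix: bind the current value through a factory function.

```python
from typing import Callable, List, NamedTuple


class Transformation(NamedTuple):  # hypothetical stand-in for the objects above
    name: str
    input_topic: str


transformations = [Transformation('upper', 'topic-a'), Transformation('lower', 'topic-b')]

# Late binding: each closure reads the global `transformation` when it is called,
# which happens after the loop ends, so both see the last item.
late_bound: List[Callable[[], str]] = []
for transformation in transformations:
    def _func() -> str:
        return transformation.name
    late_bound.append(_func)


def make_func(t: Transformation) -> Callable[[], str]:
    """Factory: each call creates a closure over its own `t`."""
    def _func() -> str:
        return t.name
    _func.__name__ = t.name
    return _func


early_bound = [make_func(t) for t in transformations]

print([f() for f in late_bound])   # ['lower', 'lower']
print([f() for f in early_bound])  # ['upper', 'lower']
```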

    opened by chobeat 18
  • Heartbeat failed and hang

    Checklist

    • [x] I have included information about relevant versions
    • [ ] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    It works for a while and then suddenly hangs.

    Expected behavior

    It should not hang.

    Actual behavior

    The worker uses threads to call pytesseract: one worker with 20 threads (which actually spawn 20 subprocesses). Maybe this is the cause of the hanging.

    Full traceback

    risk-pytesseract_1          | [2019-01-02 18:11:35,433: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:11:35,433: INFO]: Revoking previously assigned partitions frozenset({TopicPartition(topic='internal_text_recognize-__assignor-__leader', partition=0), TopicPartition(topic='internal_text_recognize', partition=0)}) for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 18:11:38,432: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:11:41,466: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:11:44,431: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:11:47,430: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:11:50,431: WARNING]: Heartbeat failed for group internal_text_recognize because it is rebalancing
    risk-pytesseract_1          | [2019-01-02 18:12:32,507: WARNING]: [^-App]: Warning: Task timed out!
    risk-pytesseract_1          | [2019-01-02 18:12:32,507: WARNING]: [^-App]: Please make sure it is hanging before restarting.
    risk-pytesseract_1          | [2019-01-02 18:12:32,507: INFO]: [^-App]: [Flight Recorder-17] (started at Wed Jan  2 18:11:35 2019) Replaying logs...
    risk-pytesseract_1          | [2019-01-02 18:12:32,507: INFO]: [^-App]: [Flight Recorder-17] (Wed Jan  2 18:11:35 2019) flow_control.suspend()
    risk-pytesseract_1          | [2019-01-02 18:12:32,508: INFO]: [^-App]: [Flight Recorder-17] (Wed Jan  2 18:11:35 2019) consumer.pause_partitions
    risk-pytesseract_1          | [2019-01-02 18:12:32,508: INFO]: [^-App]: [Flight Recorder-17] (Wed Jan  2 18:11:35 2019) flow_control.clear()
    risk-pytesseract_1          | [2019-01-02 18:12:32,508: INFO]: [^-App]: [Flight Recorder-17] (Wed Jan  2 18:11:35 2019) consumer.wait_empty()
    risk-pytesseract_1          | [2019-01-02 18:12:32,508: INFO]: [^-App]: [Flight Recorder-17] -End of log-
    risk-pytesseract_1          | [2019-01-02 18:13:33,138: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 18:13:34,638: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 18:13:34,639: WARNING]: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
    risk-pytesseract_1          | [2019-01-02 18:13:34,639: ERROR]: Heartbeat session expired - marking coordinator dead
    risk-pytesseract_1          | [2019-01-02 18:13:34,639: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 18:13:36,140: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 18:13:36,141: WARNING]: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
    

    Earlier, I also found this:

    risk-pytesseract_1          | [2019-01-02 14:03:07,317: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 14:03:08,698: WARNING]: [^--Consumer]: Possible livelock: COMMIT OFFSET NOT ADVANCING
    risk-pytesseract_1          | [2019-01-02 14:03:08,817: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 14:03:08,818: WARNING]: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
    risk-pytesseract_1          | [2019-01-02 14:03:08,818: ERROR]: Heartbeat session expired - marking coordinator dead
    risk-pytesseract_1          | [2019-01-02 14:03:08,818: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 14:03:10,318: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 14:03:10,319: WARNING]: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
    risk-pytesseract_1          | [2019-01-02 14:03:10,319: ERROR]: Heartbeat session expired - marking coordinator dead
    risk-pytesseract_1          | [2019-01-02 14:03:10,319: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 14:03:11,820: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    

    Versions

    • Python version: 3.6.7
    • Faust version: 1.4.2
    • Operating system: Debian stretch
    • Kafka version: 2.1.1
    • RocksDB version (if applicable)
    opened by DeoLeung 14
  • cannot launch example tableofset

    Checklist

    • [x] I have included information about relevant versions
    • [x] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    Tell us what you did to cause something to happen.

    Expected behavior

    Tell us what you expected to happen.

    Actual behavior

    Tell us what happened instead.

    Full traceback

    Paste the full traceback (if there is any)
    

    faust -A tableofset worker | more
    +ƒaµS† v1.7.4-+------------------------------------------------------------------------------------+
    | id          | table-of-sets-windowed-v2                                                          |
    | transport   | [URL('kafka://localhost:9092')]                                                    |
    | store       | memory:                                                                            |
    | web         | http://localhost:6066/                                                             |
    | log         | -stderr- (warn)                                                                    |
    | pid         | 22478                                                                              |
    | hostname    | pascal-asus                                                                        |
    | platform    | CPython 3.7.3 (Linux x86_64)                                                       |
    | drivers     |                                                                                    |
    |   transport | aiokafka=1.0.4                                                                     |
    |   web       | aiohttp=3.5.4                                                                      |
    | datadir     | /home/hadoop/kafka_2.11-2.3.0/faust-master/examples/table-of-sets-windowed-data    |
    | appdir      | /home/hadoop/kafka_2.11-2.3.0/faust-master/examples/table-of-sets-windowed-data/v2 |
    +-------------+------------------------------------------------------------------------------------+
    [2019-10-13 17:29:19,486: ERROR]: [^Worker]: Error: TypeError("Channel must be channel, topic, or str; not <class 'coroutine'>")
    Traceback (most recent call last):
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/worker.py", line 264, in execute_from_commandline
        self.loop.run_until_complete(self._starting_fut)
      File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 719, in start
        await self._default_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start
        await self._actually_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 750, in _actually_start
        await child.maybe_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 778, in maybe_start
        await self.start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 719, in start
        await self._default_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start
        await self._actually_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 750, in _actually_start
        await child.maybe_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 778, in maybe_start
        await self.start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 719, in start
        await self._default_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 726, in _default_start
        await self._actually_start()
      File "/home/hadoop/.local/lib/python3.7/site-packages/mode/services.py", line 743, in _actually_start
        await self.on_start()
      File "/usr/local/lib/python3.7/dist-packages/faust/agents/manager.py", line 31, in on_start
        self.update_topic_index()
      File "/usr/local/lib/python3.7/dist-packages/faust/agents/manager.py", line 64, in update_topic_index
        for topic in agent.get_topic_names():
      File "/usr/local/lib/python3.7/dist-packages/faust/agents/agent.py", line 965, in get_topic_names
        channel = self.channel
      File "/usr/local/lib/python3.7/dist-packages/faust/agents/agent.py", line 978, in channel
        **self._channel_kwargs,
      File "/usr/local/lib/python3.7/dist-packages/faust/agents/agent.py", line 488, in _prepare_channel
        f'Channel must be channel, topic, or str; not {type(channel)}')
    TypeError: Channel must be channel, topic, or str; not <class 'coroutine'>

    Versions

    • Python version 2.7.16 3.7.3
    • Faust version 1.7.4
    • Operating system ubuntu 19.04
    • Kafka version 2.11-2.3.0
    • RocksDB version (if applicable)
    opened by PGDataHome 13
  • Can't change the partition number of changelog topic.

    I am trying to change the partition number of the changelog topic, but I always get 8 partitions.

    Here is my config:

    
    app = App(__name__,
              version=2,
              topic_partitions=1,
              key_serializer='json',
              value_serializer='json',
              # store='rocksdb://',
              broker='kafka://kafka:29092')
    worker_commands = app.topic('worker_commands',
                                value_type=Command,
                                partitions=1)
    worker_events = app.topic('worker_events',
                              value_type=Event,
                              partitions=1)
    accounts = app.Table('accounts',
                         default=Account,
                         partitions=1)
    
    

    Expected behavior

    I expect the changelog topic to have the same number of partitions as the source topics.

    Actual behavior

    The source topics have 1 partition, but the changelog topic always has 8 partitions.

    Traceback

    faust        | [2019-03-30 20:37:11,268: ERROR]: [^---Agent*: agent.reduce_command_model]: Crashed reason=PartitionsMismatch("The source topic 'worker_events' for table 'accounts'\nhas 1 partitions, but the changelog\ntopic 'agent-v2-accounts-changelog' has 8 partitions.\n\nPlease make sure the topics have the same number of partitions\nby configuring Kafka correctly.\n")
    faust        | Traceback (most recent call last):
    faust        |   File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 601, in _execute_task
    faust        |     await coro
    faust        |   File "/srv/app/agent.py", line 83, in reduce_command_model
    faust        |     accounts[event.aggregate_id] = account
    faust        |   File "/usr/local/lib/python3.7/site-packages/mode/utils/collections.py", line 505, in __setitem__
    faust        |     self.on_key_set(key, value)
    faust        |   File "/usr/local/lib/python3.7/site-packages/faust/tables/table.py", line 72, in on_key_set
    faust        |     self._send_changelog(event, key, value)
    faust        |   File "/usr/local/lib/python3.7/site-packages/faust/tables/base.py", line 228, in _send_changelog
    faust        |     self._verify_source_topic_partitions(event)
    faust        |   File "/usr/local/lib/python3.7/site-packages/faust/tables/base.py", line 257, in _verify_source_topic_partitions
    faust        |     change_n=change_n,
    faust        | faust.exceptions.PartitionsMismatch: The source topic 'worker_events' for table 'accounts'
    faust        | has 1 partitions, but the changelog
    faust        | topic 'agent-v2-accounts-changelog' has 8 partitions.
    faust        |
    faust        | Please make sure the topics have the same number of partitions
    faust        | by configuring Kafka correctly.
    

    Versions

    • Python version 3.7
    • Faust version 1.5.2
    • Operating system Ubuntu 18.04
    • Kafka version 2.1.0
    opened by zhorzh 13
  • faust v1.9.0 not working with sasl authentication

    Steps to reproduce

    Configure Faust to use SASL:

    app = faust.App( , broker=broker, key_serializer='json', value_serializer='json', broker_credentials=faust.SASLCredentials( username=, password=, ) )

    Expected behavior

    That it works. (It works as expected with v1.8.0)

    Actual behavior

    The program dies with an AssertionError during worker startup.

    Full traceback

    [2019-11-06 14:27:33,119] [5519] [ERROR] [^Worker]: Error: AssertionError("yield from wasn't used with future",)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/mode/worker.py", line 267, in execute_from_commandline
        self.loop.run_until_complete(self._starting_fut)
      File "/usr/lib64/python3.6/asyncio/base_events.py", line 484, in run_until_complete
        return future.result()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 727, in start
        await self._default_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 734, in _default_start
        await self._actually_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 758, in _actually_start
        await child.maybe_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 786, in maybe_start
        await self.start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 727, in start
        await self._default_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 734, in _default_start
        await self._actually_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 758, in _actually_start
        await child.maybe_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 786, in maybe_start
        await self.start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 727, in start
        await self._default_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 734, in _default_start
        await self._actually_start()
      File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 751, in _actually_start
        await self.on_start()
      File "/usr/local/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 742, in on_start
        await producer.start()
      File "/usr/local/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 171, in start
        await self.client.bootstrap()
      File "/usr/local/lib/python3.6/site-packages/aiokafka/client.py", line 203, in bootstrap
        version_hint=version_hint)
      File "/usr/local/lib/python3.6/site-packages/aiokafka/conn.py", line 90, in create_conn
        await conn.connect()
      File "/usr/local/lib/python3.6/site-packages/aiokafka/conn.py", line 214, in connect
        await self._do_sasl_handshake()
      File "/usr/local/lib/python3.6/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
        payload, expect_response = res
    AssertionError: yield from wasn't used with future

    Versions

    CPython 3.6.8 (Linux x86_64) faust=1.9.0 aiokafka=1.1.3 aiohttp=3.6.2
    linux=3.10.0-957.27.2.el7.x86_64

    opened by jabonte 9
  • How to define the web port in faust application?

    I am unable to run two different Faust applications. Could you please help me define the web port in a Faust application?

          from faust import Worker

          worker = Worker(app, loglevel="INFO")
          worker.execute_from_commandline()
    
    opened by Mahamutha 9
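    A likely cause is that both workers try to bind Faust's web server to the same default port (6066). Faust exposes this as the web_port app setting (and the --web-port worker option), so each app can be given its own port. The sketch below is a minimal standard-library helper, assuming you want the operating system to pick a free port; find_free_port is a hypothetical name, not part of Faust.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0.

    Note: the socket is closed before returning, so another process could
    take the port in the meantime; a fixed port per app is more predictable.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the kernel choose a free port
        return sock.getsockname()[1]


if __name__ == "__main__":
    print(find_free_port())
```

    The result (or simply a fixed number) could then be passed as faust.App('app-a', broker='kafka://localhost', web_port=6067); a fixed port per app is usually easier to monitor than a random one.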
  • Using multiple workers, only one is working

    Checklist

    • [x] I have included information about relevant versions
    • [x] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    Use docker-compose to start the worker with scale: 10; each worker has a concurrency of 1.

    Expected behavior

    All 10 workers process messages together.

    Actual behavior

    Only one worker processes messages.

    Full traceback

    The other workers just report that they are ready and stay idle:

    risk-pytesseract_1          | [2019-01-02 23:14:53,562: INFO]: (Re-)joining group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:15:33,564: ERROR]: Error sending JoinGroupRequest_v0 to node 1001 [[Error 7] RequestTimedOutError] -- marking coordinator dead
    risk-pytesseract_1          | [2019-01-02 23:15:33,567: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 23:15:33,569: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:15:33,569: INFO]: (Re-)joining group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:15:44,086: INFO]: Joined group 'internal_text_recognize' (generation 19) with member_id faust-1.4.2-94ff910d-603f-406d-b993-e2ba6b7929a8
    risk-pytesseract_1          | [2019-01-02 23:16:24,087: ERROR]: Error sending SyncGroupRequest_v0 to node 1001 [[Error 7] RequestTimedOutError] -- marking coordinator dead
    risk-pytesseract_1          | [2019-01-02 23:16:24,088: WARNING]: Marking the coordinator dead (node 1001)for group internal_text_recognize.
    risk-pytesseract_1          | [2019-01-02 23:16:24,090: INFO]: Discovered coordinator 1001 for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:16:24,090: INFO]: (Re-)joining group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:16:24,091: INFO]: Joined group 'internal_text_recognize' (generation 19) with member_id faust-1.4.2-94ff910d-603f-406d-b993-e2ba6b7929a8
    risk-pytesseract_1          | [2019-01-02 23:16:44,086: INFO]: (Re-)joining group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:16:44,089: INFO]: Joined group 'internal_text_recognize' (generation 20) with member_id faust-1.4.2-94ff910d-603f-406d-b993-e2ba6b7929a8
    risk-pytesseract_1          | [2019-01-02 23:16:44,094: INFO]: Successfully synced group internal_text_recognize with generation 20
    risk-pytesseract_1          | [2019-01-02 23:16:44,095: INFO]: Setting newly assigned partitions set() for group internal_text_recognize
    risk-pytesseract_1          | [2019-01-02 23:16:47,212: INFO]: [^---Recovery]: Resuming flow...
    risk-pytesseract_1          | [2019-01-02 23:16:47,213: INFO]: [^---Recovery]: Resuming streams with empty assignment
    risk-pytesseract_1          | [2019-01-02 23:16:47,213: INFO]: [^--Fetcher]: Starting...
    risk-pytesseract_1          | [2019-01-02 23:16:47,213: INFO]: [^---Recovery]: Worker ready
    risk-pytesseract_1          | [2019-01-02 23:16:47,214: INFO]: [^Worker]: Ready
    

    Versions

    • Python version: 3.6.7
    • Faust version: 1.4.2
    • Operating system: Debian Stretch
    • Kafka version: 2.1.1
    • RocksDB version (if applicable)
    opened by DeoLeung 9
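    The log line "Setting newly assigned partitions set()" shows that this worker received no partitions. In a Kafka consumer group each partition is assigned to at most one member, so one likely explanation is that the topic has fewer partitions than there are workers, leaving the extra workers idle. The simplified round-robin model below illustrates that effect; it is an assumption-laden sketch, not Faust's actual partition assignor.

```python
from typing import Dict, List


def assign_round_robin(num_partitions: int, workers: List[str]) -> Dict[str, List[int]]:
    """Distribute partitions 0..num_partitions-1 over workers, round-robin.

    Simplified model: each partition goes to exactly one worker, so at most
    num_partitions workers can receive any work.
    """
    assignment: Dict[str, List[int]] = {w: [] for w in workers}
    for partition in range(num_partitions):
        worker = workers[partition % len(workers)]
        assignment[worker].append(partition)
    return assignment


if __name__ == "__main__":
    workers = [f"worker-{i}" for i in range(10)]
    result = assign_round_robin(1, workers)
    busy = [w for w, parts in result.items() if parts]
    print(busy)  # ['worker-0']: only one worker gets a partition
```

    Under this model, scaling to 10 workers only helps if the topic has at least 10 partitions. For topics that Faust creates itself, the partition count comes from the topic_partitions setting or the partitions argument to app.topic.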
  • How to commit offsets manually

    I would like to process messages from a Kafka broker without missing or re-reading any, so I want to commit offsets manually once processing succeeds. I tried setting acks=False in faust.App.topic, but with that setting Faust does not commit offsets, even when I call app.commit(topics=None). What should I do in this situation? Thanks!

    opened by hys20151008 9
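    "Commit only after success" boils down to this: a consumer can safely commit only up to the first offset that has not been processed yet, because committing past a gap would skip that message after a restart. By Kafka convention the committed offset is the offset of the next message to read. The sketch below computes that commit position from a set of successfully processed offsets; it is a simplified model under those assumptions, not Faust's internal implementation, and next_commit_offset is a hypothetical name.

```python
from typing import Iterable, Optional


def next_commit_offset(committed: int, acked: Iterable[int]) -> Optional[int]:
    """Return the offset to commit, or None if nothing new can be committed.

    committed: the currently committed offset (the next offset to be read).
    acked: offsets whose processing finished successfully.
    Only a contiguous run starting at `committed` can be committed; a gap
    (an unfinished message) blocks everything after it.
    """
    done = set(acked)
    position = committed
    while position in done:
        position += 1
    return position if position > committed else None


if __name__ == "__main__":
    print(next_commit_offset(100, [100, 101, 102, 104]))  # 103: offset 103 is still pending
    print(next_commit_offset(100, [101, 102]))            # None: offset 100 is still pending
```

    This also shows why a single stuck message can stop commits for its whole partition, even while later messages are being processed.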
  • More workers than partitions cause rebalance to hang

    Hey guys, first of all, this is an awesome library! It solves a problem many developers have in data-heavy applications. :)

    Steps to reproduce

    Create a Kafka topic with one partition and start three agents for that topic. The agents hang when they try to rebalance.

    Expected behavior

    They should rebalance, and only one should consume messages, since there are more workers (3) than partitions (1).

    Actual behavior

    All agents freeze during the rebalance.

    Versions

    Faust version 1.0.27 Python 3.7.0 without any extension

    • Python version 3.7.0
    • Faust version 1.0.27
    • Operating system alpine3.8
    • Kafka version 1.1.1
    opened by amitripshtos 9
  • Faust app does not consume Kafka messages after livelock: COMMIT OFFSET NOT ADVANCING

    Faust version: 1.7.1; Kafka version: 2.1.0 (Scala 2.12); Python version: 3.7.3

    Sometimes I see these warnings:

        [2019-10-19 18:12:34,104: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='text', partition=14) offset=60157>)] tasks
        [2019-10-19 18:12:34,649: WARNING]: [^--Consumer]: Possible livelock: COMMIT OFFSET NOT ADVANCING

    When this happens, the Kafka lag for that partition stops changing, and the consumer stays in the consumer group for that partition. The only way to get the offsets committed again is to restart the app. Is there a way to handle this programmatically in a Faust app, or are there other suggestions?

    opened by amitkg29 8
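    The wait_empty warning appears to show the consumer still waiting on the message at offset 60157 of partition 14, and while that message is unfinished the committed offset cannot move past it. One common mitigation, offered here as an assumption rather than a Faust-provided fix, is to bound how long a single event may take, so a hung call (for example a network request without a timeout) raises instead of blocking forever. The sketch below uses asyncio.wait_for from the standard library; process_with_timeout, handle_event and the timeout values are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def process_with_timeout(
    handler: Callable[[Any], Awaitable[Any]],
    event: Any,
    timeout: float,
) -> bool:
    """Run handler(event), giving up after `timeout` seconds.

    Returns True on success and False on timeout, so the caller can log or
    dead-letter the event and move on instead of hanging indefinitely.
    """
    try:
        await asyncio.wait_for(handler(event), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def handle_event(event: float) -> None:
    # Hypothetical handler: `event` is how many seconds the work takes.
    await asyncio.sleep(event)


async def main() -> None:
    print(await process_with_timeout(handle_event, 0.01, timeout=1.0))  # True
    print(await process_with_timeout(handle_event, 5.0, timeout=0.05))  # False


if __name__ == "__main__":
    asyncio.run(main())
```

    Inside an agent, this would wrap the per-event work in the async for loop over the stream; whether skipping a timed-out event is acceptable depends on the application.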
  • asyncio.exceptions.CancelledError and consumer stops

    Checklist

    • [x] I have included information about relevant versions
    • [ ] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    I have not been able to reproduce this.

    Expected behavior

    The consumers should not stop processing.

    Actual behavior

    We have a consumer deployed on 4 hosts, consuming from a topic with 16 partitions (4 per host). We have seen the consumer hit the exception below and stop consuming messages, with the following in the logs:

    Relevant consumer settings: broker_heartbeat_interval: 6, broker_request_timeout: 180, broker_session_timeout: 120, broker_max_poll_records: 300

    Full traceback

    2022-11-29T15:20:36.894Z mss_sa_refresh_stream_processor INFO [request_id={173e3da6-6ee3-4fa9-ba16-460a14961ba8} transaction_id={5443682e-7eb8-4012-8f8b-8efe6cca45cc} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6220141. Duration: 71 ms
    2022-11-29T15:20:36.895Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6191034
    2022-11-29T15:20:36.968Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6191034. Duration: 73 ms
    2022-11-29T15:20:36.969Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6190948
    2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6190948. Duration: 74 ms
    2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6221813
    2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Exception in thread
    2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Thread-1
    2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
    2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
    2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/utils/locks.py", line 76, in wait
    2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] await fut
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] asyncio.exceptions
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] .
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] CancelledError
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
    2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run()
    2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
    2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
    2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run_forever()
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 570, in run_forever
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self._run_once()
    2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 1859, in _run_once
    2022-11-29T15:20:37.145Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6221813. Duration: 102 ms
    2022-11-29T15:20:37.146Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6434463
    2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] handle._run()
    2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] SystemError
    2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
    2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] PyEval_EvalFrameEx returned a result with an error set
    2022-11-29T15:20:37.222Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6434463. Duration: 77 ms
    2022-11-29T15:25:37.943Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
    2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
    2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (started at Tue Nov 29 15:20:37 2022) Replaying logs...
    2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (Tue Nov 29 15:20:37 2022) +consumer.commit()
    2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] -End of log-
    2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] Task traceback
    2022-11-29T15:25:37.946Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='<coroutine object Consumer._commit_handler at 0x7f88c9e183c0>' coro=<Service._execute_task() running at /opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py:779> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88bacfd340>()]> cb=[Service._on_future_done()]> (most recent call last):
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
        await task
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 783, in _commit_handler
        await self.commit()
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
        return await self.force_commit(
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
        return await fun(self, *args, **kwargs)
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
        did_commit = await self._commit_tps(
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
        return await self._commit_offsets(
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
        did_commit = await self._commit(committable_offsets)
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
        return await self._thread.commit(offsets)
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
        return await self.call_thread(self._commit, offsets)
      File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
        result = await promise
    

    Versions

    • Python version: 3.8.11
    • Faust version: 1.10.4
    • Operating system: Linux 3.10.0-693.el7.x86_64
    • Kafka version: 2.6.0
    • aiohttp version: 3.8.3
    opened by lachuta 0
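    Kafka's consumer documentation states that the heartbeat interval must be lower than the session timeout and typically no higher than one third of it. The hypothetical helper below applies only that documented rule to Faust's broker_heartbeat_interval and broker_session_timeout (both in seconds); it is a sanity check on the reported settings, not a diagnosis of the crash.

```python
from typing import List


def check_heartbeat(heartbeat_interval: float, session_timeout: float) -> List[str]:
    """Return warnings for heartbeat/session settings (values in seconds)."""
    warnings: List[str] = []
    if heartbeat_interval >= session_timeout:
        warnings.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_interval > session_timeout / 3:
        warnings.append("heartbeat interval is above one third of the session timeout")
    return warnings


if __name__ == "__main__":
    # Values reported in this issue: broker_heartbeat_interval=6, broker_session_timeout=120
    print(check_heartbeat(6, 120))  # []
```

    The reported values pass this check, so the heartbeat and session settings do not appear to be the problem on their own.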
  • Consumer crashes sporadically and stops consuming - Crashed reason=ConsumerStoppedError()

    Checklist

    • [x] I have included information about relevant versions
    • [ ] I have verified that the issue persists when using the master branch of Faust.

    Steps to reproduce

    I have not been able to reproduce this.

    Expected behavior

    The consumers should not crash.

    Actual behavior

    We have a consumer deployed on 4 hosts, consuming from a topic with 16 partitions (4 per host). Two to three times a week, a consumer crashes and stops consuming messages, with the following in the logs:

    Relevant consumer settings: broker_heartbeat_interval: 6, broker_request_timeout: 180, broker_session_timeout: 120, broker_max_poll_records: 300

    Full traceback

    2022-11-21T00:01:02.132Z nid_martens_sa_stream_processor ERROR [logger={faust.transport.consumer}] [^---Fetcher]: Crashed reason=ConsumerStoppedError()
    Traceback (most recent call last):
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
        await task
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
        await self._drainer
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
        async for tp, message in ait:
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 640, in getmany
        records, active_partitions = await self._wait_next_records(timeout)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
        records = await self._getmany(
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1269, in _getmany
        return await self._thread.getmany(active_partitions, timeout)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
        return await self.call_thread(
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
        result = await promise
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued
        result = await maybe_async(method(*args, **kwargs))
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async
        return await res
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
        raise ConsumerStoppedError()
    aiokafka.errors.ConsumerStoppedError
    2022-11-21T00:01:02.137Z nid_martens_sa_stream_processor INFO [logger={faust.worker}] [^Worker]: Stopping...
    2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Stopping...
    2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={faust.transport.consumer}] [^---Fetcher]: Stopping...
    2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Wait for streams...
    2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor INFO [logger={aiokafka.consumer.group_coordinator}] LeaveGroup request succeeded
    2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Exception in thread
    2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Thread-1
    2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
    2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] StopIteration
    2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
    2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] FetchResponse_v4(throttle_time_ms=0, topics=[(topics='addr-pair-gather', partitions=[(partition=8, error_code=0, highwater_offset=466624, last_stable_offset=466624, aborted_transactions=NULL, message_set=b'')])])
    2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
    2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
    2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.run()
    2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
    2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
    2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
    2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
    2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] return future.result()
    2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 264, in _serve
    2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._shutdown_thread()
    2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 240, in _shutdown_thread
    2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.on_thread_stop()
    2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 309, in on_thread_stop
    2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._consumer.stop()
    2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/consumer.py", line 503, in stop
    2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetcher.close()
    2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 429, in close
    2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetch_task
    2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 557, in _fetch_requests_routine
    2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.close()
    2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 439, in close
    2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await x
    2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in _fetch_requests_routine
    2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
    2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in <genexpr>
    2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
    2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 681, in _proc_fetch_request
    2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] time_response = monotonic()
    2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] SystemError
    2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
    2022-11-21T00:01:02.177Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] <built-in function monotonic> returned a result with an error set
    2022-11-21T00:06:02.165Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
    2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
    2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (started at Mon Nov 21 00:01:02 2022) Replaying logs...
    2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (Mon Nov 21 00:01:02 2022) +consumer.commit()
    2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] -End of log-
    2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] Task traceback
    2022-11-21T00:06:02.168Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='Task-2717096' coro=<Service.stop() running at /opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py:839> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc9effbdac0>()]> cb=[shield.<locals>._inner_done_callback() at /opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/tasks.py:885]> (most recent call last):
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 839, in stop
        await self.on_stop()
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1467, in on_stop
        await self._stop_consumer()
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1493, in _stop_consumer
        await self._consumer_wait_empty(consumer, self.log)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1499, in _consumer_wait_empty
        await consumer.wait_empty()
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
        return await fun(self, *args, **kwargs)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 757, in wait_empty
        await T(self.commit_and_end_transactions)()
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 768, in commit_and_end_transactions
        await self.commit(start_new_transaction=False)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
        return await self.force_commit(
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
        return await fun(self, *args, **kwargs)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
        did_commit = await self._commit_tps(
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
        return await self._commit_offsets(
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
        did_commit = await self._commit(committable_offsets)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
        return await self._thread.commit(offsets)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
        return await self.call_thread(self._commit, offsets)
      File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
        result = await promise
    
    

    Versions

    • Python version: 3.8.11
    • Faust version: 1.10.4
    • Operating system: Linux 3.10.0-693.el7.x86_64
    • Kafka version: 2.6.0
    • aiohttp version: 3.8.3
    opened by lachuta 0
  • Couldn't set Retry Topic mechanism In Faust Streaming

    Checklist

    • [ X] I have included information about relevant versions
    • [ X] I have verified that the issue persists when using the master branch of Faust.

    Hello all, I’m using faust-streaming with FastAPI. I have two topics:

    • main_topic
    • retry_topic

    If the logic fails while processing main_topic, I want the message to go to retry_topic, and if it keeps failing there it should be retried at most 3 times. I’ve tried using sink in faust-streaming: it yields the result from my main topic to retry_topic, but I’m still not able to limit it to 3 retries.

    Is there a way to do this in Faust/Kafka Streams? I know that Celery has this feature.

    There’s also a Stack Overflow question; feel free to answer there as well: https://stackoverflow.com/questions/74360650/faust-streaming-retry-topic-mechanism

    I know this isn't the appropriate channel to ask, but the Slack link on the Faust website doesn't seem to work.
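    One common way to cap retries is to carry a retry counter with each message and send the message to a dead-letter topic once the counter reaches the limit. The sketch below is a minimal, self-contained simulation of that pattern, assuming `asyncio.Queue` objects as stand-ins for the Kafka topics; `Message`, `handle`, `retry_count`, `dead_letter_topic` and `MAX_RETRIES` are hypothetical names, not part of the Faust API.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, Tuple

# Hypothetical limit taken from the question: at most 3 retries per message.
MAX_RETRIES = 3


@dataclass
class Message:
    """Stand-in for a Kafka record: a value plus byte-valued headers."""
    value: dict
    headers: Dict[str, bytes] = field(default_factory=dict)


def retry_count(msg: Message) -> int:
    # Messages straight from main_topic carry no counter yet, so they start at 0.
    return int(msg.headers.get('retries', b'0'))


async def handle(msg: Message,
                 handler: Callable[[dict], Awaitable[None]],
                 topics: Dict[str, asyncio.Queue],
                 max_retries: int = MAX_RETRIES) -> bool:
    """Run the handler once; on failure, re-publish to retry_topic or give up."""
    try:
        await handler(msg.value)
        return True
    except Exception:
        n = retry_count(msg)
        if n < max_retries:
            # Re-publish with an incremented counter so the limit survives
            # the round trip through the retry topic.
            headers = {**msg.headers, 'retries': str(n + 1).encode()}
            await topics['retry_topic'].put(Message(msg.value, headers))
        else:
            # Retries exhausted: park the message in a dead-letter topic.
            await topics['dead_letter_topic'].put(msg)
        return False


async def demo() -> Tuple[int, int]:
    topics = {name: asyncio.Queue()
              for name in ('main_topic', 'retry_topic', 'dead_letter_topic')}
    attempts = []

    async def always_fails(value: dict) -> None:
        attempts.append(value)
        raise RuntimeError('processing failed')

    await topics['main_topic'].put(Message({'order_id': 1}))
    await handle(await topics['main_topic'].get(), always_fails, topics)
    # Stand-in for the retry_topic agent: drain until nothing is re-queued.
    while not topics['retry_topic'].empty():
        await handle(await topics['retry_topic'].get(), always_fails, topics)
    # Returns (total processing attempts, messages in the dead-letter topic).
    return len(attempts), topics['dead_letter_topic'].qsize()


if __name__ == '__main__':
    print(asyncio.run(demo()))  # (4, 1): 1 original attempt + 3 retries
```

    In a real faust-streaming app each queue would be a topic and the counter could travel in the record headers (Faust's `Topic.send()` accepts a `headers` argument, and events from `stream.events()` expose `headers`); treat that wiring as an assumption to verify against the version you run.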

    opened by rawheel 0
  • Dagger - A Faust Based Workflow Engine

    Building on the concepts of Faust, we built out a microservice orchestration/workflow engine, Dagger. Dagger operates at Faust scale to handle millions of long-running tasks. It works similarly to Java-based workflow engines like Cadence or Camunda, but integrates directly with Kafka.

    opened by patkivikram 0
  • A Faust Alternative - Fluvii

    Hi everyone!

    For those looking for a similar alternative to Faust (shout out to the fork for all their hard work), our team at Red Hat has made another open source Python client library called Fluvii, which you can check out here.

    Without rehashing too much of what's in the README, it basically is a very light transactional client library built on the confluent-kafka-python client with the capability to have state stores (tables). There's a lot more detail on the repo, so check it out...just wanted to start getting the word out!

    Thanks everyone!

    opened by truthordata 0
  • [doc] specifying language

    Note: Before submitting this pull request, please review our contributing guidelines.

    Description

    Specify the language used in code blocks.

    NOTE: All patches should be made against master, not a maintenance branch like 3.1, 2.5, etc. That is unless the bug is already fixed in master, but not in that version series.

    If it fixes a bug or resolves a feature request, be sure to link to that issue via (Fixes #4412) for example.

    opened by sixwaaaay 0
Releases (1.0.10d3)
Owner
Robinhood
A lightweight python module for building event driven distributed systems

Eventify A lightweight python module for building event driven distributed systems. Installation pip install eventify Problem Developers need a easy a

Eventify 16 Aug 18, 2022
Framework and Library for Distributed Online Machine Learning

Jubatus The Jubatus library is an online machine learning framework which runs in distributed environment. See http://jubat.us/ for details. Quick Sta

Jubatus 701 Nov 29, 2022
Privacy enhanced BitTorrent client with P2P content discovery

Tribler Towards making Bittorrent anonymous and impossible to shut down. We use our own dedicated Tor-like network for anonymous torrent downloading.

4.2k Dec 31, 2022
Run MapReduce jobs on Hadoop or Amazon Web Services

mrjob: the Python MapReduce library mrjob is a Python 2.7/3.4+ package that helps you write and run Hadoop Streaming jobs. Stable version (v0.7.4) doc

Yelp.com 2.6k Dec 22, 2022
Distributed-systems-algos - Distributed Systems Algorithms For Python

Distributed Systems Algorithms ISIS algorithm In an asynchronous system that kee

Tony Joo 2 Nov 30, 2022
Distributed Synchronization for Python

Distributed Synchronization for Python Tutti is a nearly drop-in replacement for python's built-in synchronization primitives that lets you fearlessly

Hamilton Kibbe 4 Jul 07, 2022
ZeroNet - Decentralized websites using Bitcoin crypto and BitTorrent network

ZeroNet Decentralized websites using Bitcoin crypto and the BitTorrent network - https://zeronet.io / onion Why? We believe in open, free, and uncenso

ZeroNet 17.8k Jan 03, 2023
A distributed automation framework.

Automation Kit Repository Welcome to the Automation Kit repository! Note: This package is progressing quickly but is not yet ready for full production

Automation Mojo 3 Nov 03, 2022
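BlueKing Base Computing Platform (BK-BASE) is a foundational platform focused on IT operations, building one-stop, low-barrier foundational services

BlueKing Base Computing Platform (BK-BASE) is a foundational platform focused on IT operations, building one-stop, low-barrier foundational services. By simplifying the collection and retrieval of operations data, it improves data development efficiency, helps operations staff make real-time operational decisions, and supports enterprises in transforming their business operations toward digitalization and intelligence.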

Tencent 80 Dec 16, 2022
Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.

Luigi is a Python (3.6, 3.7 tested) package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow managemen

Spotify 16.2k Jan 01, 2023
Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.

Streamparse lets you run Python code against real-time streams of data via Apache Storm. With streamparse you can create Storm bolts and spouts in Pyt

Parsely, Inc. 1.5k Dec 22, 2022
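PArallel Distributed Deep LEarning: Machine Learning Framework from Industrial Practice (PaddlePaddle core framework: high-performance single-machine and distributed training and cross-platform deployment for deep learning and machine learning)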

English | Simplified Chinese Welcome to the PaddlePaddle GitHub. PaddlePaddle, as the only independent R&D deep learning platform in China, has been officially open

19.4k Dec 30, 2022
Distributed training framework for TensorFlow, Keras, PyTorch, and Apache MXNet.

Horovod Horovod is a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. The goal of Horovod is to make dis

Horovod 12.9k Dec 29, 2022
Bittorrent software for cats

NyaaV2 Setting up for development This project uses Python 3.7. There are features used that do not exist in 3.6, so make sure to use Python 3.7. This

3k Dec 30, 2022
Deluge BitTorrent client - Git mirror, PRs only

Deluge is a BitTorrent client that utilizes a daemon/client model. It has various user interfaces available such as the GTK-UI, Web-UI and a Console-UI. It uses libtorrent at its core to handle the

Deluge team 1.3k Jan 07, 2023
PowerGym is a Gym-like environment for Volt-Var control in power distribution systems.

Overview PowerGym is a Gym-like environment for Volt-Var control in power distribution systems. The Volt-Var control targets minimizing voltage violat

Siemens 44 Jan 01, 2023
Microsoft Distributed Machine Learning Toolkit

DMTK Distributed Machine Learning Toolkit https://www.dmtk.io Please open issues in the project below. For any technical support email to

Microsoft 2.8k Nov 19, 2022
Ray provides a simple, universal API for building distributed applications.

An open source framework that provides a simple, universal API for building distributed applications. Ray is packaged with RLlib, a scalable reinforcement learning library, and Tune, a scalable hyper

23.5k Jan 05, 2023
An open source framework that provides a simple, universal API for building distributed applications. Ray is packaged with RLlib, a scalable reinforcement learning library, and Tune, a scalable hyperparameter tuning library.

Ray provides a simple, universal API for building distributed applications. Ray is packaged with the following libraries for accelerating machine lear

23.2k Dec 30, 2022
Python Stream Processing

Python Stream Processing Version: 1.10.4 Web: http://faust.readthedocs.io/ Download: http://pypi.org/project/faust Source: http://github.com/robinhood

Robinhood 6.4k Jan 07, 2023