A library for composing asynchronous and event-based programs using observable collections and query operator functions in Python

RxPY v3.0

For v1.X please go to the v1 branch.

RxPY v3.x runs on Python 3.6 or above. To install RxPY:

pip3 install rx

About ReactiveX

Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and pipable query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using operators, and parameterize concurrency in data/event streams using Schedulers.

import rx
from rx import operators as ops

source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))

Learning RxPY

Read the documentation to learn the principles of RxPY and get the complete reference of the available operators.

If you need to migrate code from RxPY v1.x, read the migration section.

There is also a list of third party documentation available here.


Join the conversation on Slack!

The gracious folks at PySlackers have given us a home in the #rxpy Slack channel. Please join us there for questions, conversations, and all things related to RxPY.

To join, navigate to the page above to receive an email invite. After signing up, join us in the #rxpy channel.

Please follow the community guidelines and terms of service.

Differences from .NET and RxJS

RxPY is a fairly complete implementation of Rx with more than 120 operators, and over 1300 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.

RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.

Thus .NET code such as:

var group = source.GroupBy(i => i % 3);

needs to be written with an underscore in Python:

group = source.pipe(ops.group_by(lambda i: i % 3))

With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY does not try to infer which arguments you passed to the operator and which you left out.

    Curry flip

    • Use curry flip on buffer_with_count
    • Propose some test cleanup (more readable)
    • Propose first draft for documentation on curry-flip usage
    • Propose (unnecessary?) typing overload for map_ when no mapper is provided
    • Hint on contributing - what checks to run

    opened by matiboy 1
  • [WIP] Refactor using curry_flip from Expression library


    WIP branch for refactoring RxPY to use curry_flip from the Expression library. This will remove a lot of boilerplate code and make the operator code more readable (no nested functions).

    opened by dbrattli 1
  • [Question] Shipping data between asyncio and rxpy

    My goal is to switch constantly between rxpy and asyncio, so that the two are connected and the data stream flows through the entire chain.

    In my project (an automation command-line tool for Twitter, see https://github.com/boholder/puntgun/issues/17), the initial data flows to the asynchronous HTTP client as an Observable or a normal list, and the client response flows into the processing chain as a new Observable, where it may be necessary to call the asynchronous HTTP client again. The data stream is therefore constantly switching between the two frameworks. For example, the data flow goes like this:

    usernames: list[str] -> **async client** -> list[User] -> Observable[User]  -> (1)
    (1) -> filters -> Observable[decision] (2)
    (1) -> filters that need to query API -> **async client** -> Observable[decision] (2)
    (2) -> actions -> **async client** -> exit

    After searching, I learned that:

    1. Sadly, aioreactive doesn't meet my needs: it doesn't implement several operators that I already use (share(), buffer_with_count()...). https://github.com/dbrattli/aioreactive

    2. It is impossible to transfer data to an asynchronous function via an operator, because you can't call an async function in on_next(). https://github.com/ReactiveX/RxPY/issues/649#issuecomment-1159512451

    3. But we can store the data somewhere else (an asyncio.Queue) and write a custom async operator that runs an infinite loop to perform the async work; we can even add more operators to process the result of the async work. https://github.com/ReactiveX/RxPY/issues/592#issue-1105185161 https://github.com/ReactiveX/RxPY/issues/571#issue-878054692 https://blog.oakbits.com/rxpy-and-asyncio.html
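The queue-based idea in point 3 can be sketched without RxPY at all. The example below is a minimal illustration using only the standard library, not the approach from the linked answers: `fake_api_call`, `worker`, and `bridge_demo` are hypothetical names, and the nested `on_next` function merely stands in for an observer callback. The synchronous callback only enqueues work, while an asyncio task awaits the async call and resolves a future for each item.

```python
import asyncio


async def fake_api_call(param: str) -> str:
    """Hypothetical stand-in for an asynchronous HTTP request."""
    await asyncio.sleep(0.01)
    return f"[{param}]"


async def worker(queue: asyncio.Queue) -> None:
    """Drain the queue forever, awaiting the async call for each item."""
    while True:
        param, future = await queue.get()
        try:
            future.set_result(await fake_api_call(param))
        except Exception as exc:
            future.set_exception(exc)
        finally:
            queue.task_done()


async def bridge_demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    loop = asyncio.get_running_loop()

    def on_next(param: str) -> asyncio.Future:
        # Synchronous callback (as an observer's on_next would be):
        # it cannot await, so it only puts the work on the queue.
        future = loop.create_future()
        queue.put_nowait((param, future))
        return future

    futures = [on_next(p) for p in ("0a", "0b", "1a")]
    answers = await asyncio.gather(*futures)
    task.cancel()  # stop the infinite loop once all results are in
    return list(answers)


bridge_result = asyncio.run(bridge_demo())
print(bridge_result)
```

Because a single worker drains a single queue, items are processed one at a time in arrival order; using one queue per API, as the code in this issue does, would allow different APIs to progress concurrently.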

    I experimented a bit based on this friendly answer, but there are still some things I'm not sure how to implement:

    1. When we use asyncio at the same time, can rxpy guarantee to execute all operators on all data till observer before exiting?

    2. (asyncio related) The logic of putting data in the "first segment" chain into the "second segment" chain cannot wait for the Future, and some tasks in the "second" chain are canceled before they are executed. How do I wait for all tasks to finish before exit? (A search shows that asyncio.gather() only cares about the coroutine being passed in, so manual control of the event loop might solve the problem?)

    3. (still asyncio related) asyncio needs to manually string coroutines together using await (call add()), is there a way to automatically pass the data stream from rxpy into the asyncio.queue? (still have the problem of not being able to await Future) Is this example a solution? https://github.com/ReactiveX/RxPY/blob/master/examples/asyncio/toasyncgenerator.py
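For the second question, one possible approach (a sketch with plain asyncio, under the assumption that each processing segment is fed by its own asyncio.Queue) is to wait on `Queue.join()` for every segment before cancelling the worker tasks. The names `first_stage`, `second_stage`, and `drain_demo` are hypothetical. The first stage forwards its result to the second queue before calling `task_done()`, so that once the first queue is fully drained, the second queue already accounts for all forwarded items.

```python
import asyncio


async def drain_demo() -> list:
    first: asyncio.Queue = asyncio.Queue()
    second: asyncio.Queue = asyncio.Queue()
    seen: list = []

    async def first_stage() -> None:
        while True:
            item = await first.get()
            await asyncio.sleep(0.01)       # simulated async work
            second.put_nowait(f"[{item}]")  # forward before marking done
            first.task_done()

    async def second_stage() -> None:
        while True:
            item = await second.get()
            await asyncio.sleep(0.01)       # simulated async work
            seen.append(f"[{item}]")
            second.task_done()

    tasks = [
        asyncio.create_task(first_stage()),
        asyncio.create_task(second_stage()),
    ]
    for item in ("0a", "1a"):
        first.put_nowait(item)

    await first.join()   # all first-stage items processed and forwarded
    await second.join()  # all forwarded items processed
    for task in tasks:
        task.cancel()    # only now is it safe to stop the infinite loops
    await asyncio.gather(*tasks, return_exceptions=True)
    return seen


drained = asyncio.run(drain_demo())
print(drained)
```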

    Thanks for reading my question.

    # Lines marked "assumed" were missing from the post and are reconstructed from context.
    import asyncio
    import time
    from collections import namedtuple

    import reactivex as rx
    from reactivex import operators as op
    from reactivex.disposable import Disposable
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    from reactivex.subject import Subject

    start = time.time()
    ACTION_DURATION = 1  # assumed: the ~1 s gaps in the output below suggest one second

    def ts():
        return f"{time.time() - start:.3f}"

    first_subject = Subject()
    first_async_action = Subject()
    second_subject = Subject()
    Data = namedtuple("Data", ["api", "param", "future"])

    async def async_calling_api(data: Data):
        """Some async processing, like sending/writing data."""
        print(f"{ts()} [A]sync action started  api:{data.api} param:{data.param}")
        # process the data with async function
        await asyncio.sleep(ACTION_DURATION)
        print(f"{ts()} [A]sync action finished api:{data.api} param:{data.param}")
        # process finished, return the response
        return f"[{data.param}]"

    def serialize_map_async(mapper):
        def _serialize_map_async(source):
            def on_subscribe(observer, scheduler):
                # separate different api callings into different task queues
                queues = {k: asyncio.Queue() for k in range(0, 3)}

                async def infinite_loop(q: asyncio.Queue[Data]):
                    try:  # assumed
                        while True:
                            data = await q.get()
                            resp = await mapper(data)
                            observer.on_next(resp)  # assumed
                            data.future.set_result(resp)  # assumed
                    except Exception as e:
                        observer.on_error(e)  # assumed

                def on_next(data: Data):
                    # take data from upstream ( calls on subject.on_next() trigger it )
                    # synchronous -> asynchronous by putting elements into queue
                    try:  # assumed
                        queues[data.api].put_nowait(data)  # assumed
                    except Exception as e:
                        observer.on_error(e)  # assumed

                tasks = [asyncio.create_task(infinite_loop(q)) for q in queues.values()]
                d = source.subscribe(
                    on_next=on_next,  # assumed
                    on_error=observer.on_error,  # assumed
                    on_completed=observer.on_completed,  # assumed
                )

                def dispose():
                    d.dispose()  # assumed
                    [task.cancel() for task in tasks]

                return Disposable(dispose)

            return rx.create(on_subscribe)

        return _serialize_map_async

    async def setup():
        loop = asyncio.get_event_loop()
        first_subject.pipe(  # assumed
            serialize_map_async(async_calling_api),  # assumed
            # The futures created here are not awaited, so they are not added to asyncio's chain.
            # As a result, the gather below only guarantees the first-level tasks,
            # and some second-level tasks are canceled before they are executed.
            op.do_action(lambda x: second_subject.on_next(Data(2, x, asyncio.Future()))),
        ).subscribe(
            on_next=lambda param: print(f"{ts()} [O]bserver [1] received: {param}"),
            scheduler=AsyncIOScheduler(loop),  # assumed
        )
        second_subject.pipe(serialize_map_async(async_calling_api), ).subscribe(
            on_next=lambda param: print(f"{ts()} [O]bserver [2] received: {param}"),
            scheduler=AsyncIOScheduler(loop),  # assumed
        )

    async def add(api: int, param: str):
        future = asyncio.Future()
        first_subject.on_next(Data(api, param, future))
        return await future

    async def main():
        await setup()
        # I wonder if there is a way to write "await rx.from..."
        # rx.from_iterable("a", "b").pipe(
        #  op.do(await ...)
        # )
        a = await asyncio.gather(add(0, "0a"), add(0, "0b"), add(1, "1a"), add(1, "1b"), )
        print(f"---> {a}")

    asyncio.run(main())  # assumed
    0.003 [A]sync action started  api:0 param:0a
    0.003 [A]sync action started  api:1 param:1a
    1.007 [A]sync action finished api:0 param:0a
    1.007 [O]bserver [1] received: [0a]
    1.007 [A]sync action started  api:0 param:0b
    1.007 [A]sync action finished api:1 param:1a
    1.007 [O]bserver [1] received: [1a]
    1.007 [A]sync action started  api:1 param:1b
    1.007 [A]sync action started  api:2 param:[0a]
    2.021 [A]sync action finished api:0 param:0b
    2.021 [O]bserver [1] received: [0b]
    2.021 [A]sync action finished api:1 param:1b
    2.021 [O]bserver [1] received: [1b]
    2.021 [A]sync action finished api:2 param:[0a]
    2.021 [O]bserver [2] received: [[0a]]
    2.021 [A]sync action started  api:2 param:[1a]
    ---> ['[0a]', '[0b]', '[1a]', '[1b]']
    opened by boholder 1
  • How to on_next from background thread in main thread?


    Hi, I want to pass data from a background thread into the main thread. I have the following code, but it seems that CurrentThreadScheduler() is not doing what I expect.

    I'm using reactivex version 4.2.

    # Lines marked "assumed" were missing from the post and are reconstructed from context.
    from reactivex import operators as ops
    from reactivex.scheduler import CurrentThreadScheduler, ThreadPoolScheduler
    from reactivex import Subject
    import threading
    from threading import Thread

    class MyThread(Thread):
        def __init__(self, callback):
            super().__init__()  # assumed
            self.callback = callback

        def run(self):
            self.callback("data")  # assumed

    my_subject = Subject()

    def callback(data):
        ''' This is called in a separate thread '''
        print(f"In callback: {threading.current_thread().name}")
        my_subject.on_next(data)  # assumed

    if __name__ == "__main__":
        thread_to_execute_on = CurrentThreadScheduler()
        # thread_to_execute_on = ThreadPoolScheduler(max_workers=1)
        print(f"Before stream: {threading.current_thread().name}")
        background_thread = MyThread(callback=callback)
        my_subject.pipe(ops.observe_on(thread_to_execute_on)).subscribe(  # assumed
            lambda data: print(f"In subscription: {threading.current_thread().name}")
        )
        background_thread.start()  # assumed

    The output is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: Thread-7

    I've also tried using ThreadPoolScheduler and the data is correctly passed to the threadpool thread. In that scenario, output is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: ThreadPoolExecutor-0_0

    Is there something I can use to schedule work back on the main thread? For example: ops.observe_on(MainThreadScheduler()). This seems to be quite simple to do in C# and Java.

    To be clear, the output I am after is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: MainThread
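One framework-independent way to obtain this behaviour is to have the background thread only enqueue data and let the main thread drain the queue. The sketch below uses only the standard library and is not an RxPY scheduler; `inbox`, `background`, and `run_on_main_demo` are hypothetical names, and whether RxPY ships an equivalent scheduler is not established here.

```python
import queue
import threading


def run_on_main_demo() -> list:
    inbox: queue.Queue = queue.Queue()
    log = []

    def background() -> None:
        # Runs on the worker thread: it only enqueues data and
        # never calls the handler directly.
        log.append(("callback", threading.current_thread().name))
        inbox.put("data")
        inbox.put(None)  # sentinel meaning "no more items"

    worker = threading.Thread(target=background, name="Worker")
    worker.start()

    # The calling thread drains the queue, so the handler code below
    # runs on the calling thread rather than on the worker.
    while True:
        item = inbox.get()
        if item is None:
            break
        log.append(("subscription", threading.current_thread().name))

    worker.join()
    return log


thread_log = run_on_main_demo()
for stage, thread_name in thread_log:
    print(f"In {stage}: {thread_name}")
```

In the code from this issue, the body of the drain loop is where a call such as my_subject.on_next(item) could go, so that subscribers run on the thread that owns the loop. The cost is that this thread must be dedicated to polling the queue.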
    opened by alek5k 0
  • Error in the documentation ?


    Describe the bug

    One of the first examples in the Get Started documentation fails with:

    Traceback (most recent call last):
      File "main.py", line 12, in <module>
        reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
      File "./venv/lib/python3.8/site-packages/reactivex/observable/observable.py", line 237, in pipe
        return pipe_(self, *operators)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 214, in pipe
        return compose(*fns)(__value)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 87, in _compose
        return reduce(lambda obs, op: op(obs), operators, source)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 87, in <lambda>
        return reduce(lambda obs, op: op(obs), operators, source)
    TypeError: 'Observable' object is not callable

    The next example in the list also fails in the same way.
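The traceback hints at the cause: pipe passes its first argument to compose(*fns) as the value to transform, so an operator given in first position is consumed as data instead of becoming a step. The sketch below reproduces that behaviour with simplified stand-ins written for illustration. These `compose` and `pipe` functions are modelled on the lines shown in the traceback and are not the library's code, and `double` and `describe` are hypothetical helpers.

```python
from functools import reduce


def compose(*fns):
    """Stand-in: build a new function that applies fns left to right."""
    return lambda source: reduce(lambda acc, fn: fn(acc), fns, source)


def pipe(value, *fns):
    """Stand-in: apply fns to value immediately and return the result."""
    return compose(*fns)(value)


def double(x):
    return x * 2


def describe(x):
    return f"value={x!r}"


# compose returns a reusable callable, which is what a custom operator must be.
composed_op = compose(double, describe)

# pipe with a real value applies the functions right away.
applied = pipe(21, double, describe)

# pipe with a function in first position treats that function as the value:
# the result is whatever describe returns, here a plain string, not a callable.
misused = pipe(double, describe)

print(callable(composed_op), applied, callable(misused))
```

If the library's functions behave like these stand-ins, a custom operator would need to be built with the composing function rather than with pipe. That is an inference from the traceback, not something the report confirms.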

    To Reproduce

    Copy the code from the Custom Operator part of the documentation into a file and run it.

    The line return rx.pipe( needs to be replaced with return reactivex.pipe( to work with the above imports.

    Expected behavior

    The code should run.

    Code or Screenshots

    import reactivex
    from reactivex import operators as ops

    def length_more_than_5():
        # This is the code as reported: the reactivex.pipe call below is what
        # leads to the TypeError shown in the traceback.
        return reactivex.pipe(
            ops.map(lambda s: len(s)),
            ops.filter(lambda i: i >= 5),
        )

    reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        length_more_than_5()
    ).subscribe(lambda value: print("Received {0}".format(value)))

    Additional context

    • OS: Ubuntu 22.04
    • RxPY: 4.0.4
    • Python: 3.8.10 and 3.10.4
    PR welcome 
    opened by tlebrize 1
