RxPY - The Reactive Extensions for Python (RxPY)

Overview

The Reactive Extensions for Python (RxPY)


A library for composing asynchronous and event-based programs using observable collections and query operator functions in Python

RxPY v3.0

For v1.X please go to the v1 branch.

RxPY v3.x runs on Python 3.6 or above. To install RxPY:

pip3 install rx

About ReactiveX

Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and pipable query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using operators, and parameterize concurrency in data/event streams using Schedulers.

import rx
from rx import operators as ops

source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))

Learning RxPY

Read the documentation to learn the principles of RxPY and get the complete reference of the available operators.

If you need to migrate code from RxPY v1.x, read the migration section.

There is also a list of third party documentation available here.

Community

Join the conversation on Slack!

The gracious folks at PySlackers have given us a home in the #rxpy Slack channel. Please join us there for questions, conversations, and all things related to RxPY.

To join, navigate to the page above to receive an email invite. After signing up, join us in the #rxpy channel.

Please follow the community guidelines and terms of service.

Differences from .NET and RxJS

RxPY is a fairly complete implementation of Rx with more than 120 operators, and over 1300 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.

RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.

Thus .NET code such as:

var group = source.GroupBy(i => i % 3);

needs to be written with an _ in Python:

group = source.pipe(ops.group_by(lambda i: i % 3))

With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).

Comments
  • RxPY 3.0


    Currently working on RxPY 3.0 which is heavily inspired by RxJS 6.0. Need help with fixing things after code refactoring. New features:

    • Operators are plain and pipable functions
    • New pipe operator
    • No more method chaining (see timeflies_tkinter.py for working example)
    • Code base reduced. With pipable operators it's very easy to create your own operators
    • Backpressure removed. Can be made as extension if anyone wants to care for it.
    • Result mappers (result_mapper) will be removed to make implementation simpler.
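To illustrate why pipable operators make custom operators easy to write, here is a minimal plain-Python sketch. It is not RxPY code: iterables stand in for observables, and `pipe_sketch`, `map_sketch`, `filter_sketch`, and `lengths_of_at_least` are hypothetical names chosen for this example.

```python
from functools import reduce


def map_sketch(mapper):
    """Return an operator that applies mapper to every item of its source."""
    def operator(source):
        return (mapper(item) for item in source)
    return operator


def filter_sketch(predicate):
    """Return an operator that keeps only the items matching predicate."""
    def operator(source):
        return (item for item in source if predicate(item))
    return operator


def pipe_sketch(source, *operators):
    """Feed source through each operator, left to right."""
    return reduce(lambda acc, op: op(acc), operators, source)


def lengths_of_at_least(minimum):
    """A custom operator is just a function built from existing operators."""
    def operator(source):
        return pipe_sketch(
            source,
            map_sketch(len),
            filter_sketch(lambda n: n >= minimum),
        )
    return operator


words = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]
received = list(pipe_sketch(words, lengths_of_at_least(5)))
print(received)  # [5, 5, 5, 7]
```

Because an operator is only a function from source to source, composing operators needs no changes to any class, which is presumably what the "plain and pipable functions" item is after.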

    What remains to be done:

    • Unit tests (currently only test_map and test_filter are working). These need to be updated to the new pipeline style.
    • Documentation. The current README.md is bloated. Needs to be simplified.
    • Add lazy loading for the rest of the operators in rx/operators/__init__.py.
    • Add lazy loading for the rest of the static creation operators in rx/__init__.py.

    PRs are very much welcome to the feature/rxpy-3.0 branch.

    enhancement PR welcome 
    opened by dbrattli 32
  • fix skip import in PyQt5/PySide2 tests + typos


    When running pytest locally, I'm experiencing some weird behavior with PyQt5 & Pyside2 successive tests and the skip mechanism. I assume this is due to some conflict with the skip variable declared at the module level.

    Anyway, this PR replaces the skip variable with the pytest.importorskip function (as in the other mainloop scheduler tests), and replaces 'qt5' with 'pyqt5' to be consistent with 'pyside2'.

    opened by jcafhe 27
  • Enforce CurrentThreadScheduler to run on a dedicated thread?


    I have two concerns about the rx.concurrency.currentthreadscheduler.CurrentThreadScheduler class implementation. Both relate to the __new__ method:

        def __new__(cls) -> 'CurrentThreadScheduler':
            """Ensure that each thread has at most a single instance."""
    
            thread = threading.current_thread()
            self: 'CurrentThreadScheduler' = CurrentThreadScheduler._global.get(thread)
            if not self:
                self = super().__new__(cls)
                CurrentThreadScheduler._global[thread] = self
            return self
    

    1. Is a CurrentThreadScheduler always supposed to schedule actions on the same thread? In my opinion, the CurrentThreadScheduler is attached to some thread once the trampoline is run for the first time or "reactivated".

    2. When I tried to extend from CurrentThreadScheduler in rxbackpressure I couldn't do it because the __new__ method would always return a CurrentThreadScheduler.

    class MyCurrentThreadScheduler(SchedulerMixin, CurrentThreadScheduler):
        pass
    
    my_current_thread_scheduler = MyCurrentThreadScheduler()
    print(type(my_current_thread_scheduler))     # would print CurrentThreadScheduler
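One possible way around the second concern is to key the cached instance by class as well as by thread, so that a subclass gets an instance of its own type. The following is a stdlib-only sketch under that assumption. `PerThreadSketch` and `MyPerThreadSketch` are hypothetical stand-ins, not the actual CurrentThreadScheduler implementation.

```python
import threading
import weakref


class PerThreadSketch:
    """Hypothetical stand-in: at most one instance per (thread, class)."""

    # thread -> {class: instance}; weak keys let entries vanish with the thread
    _instances = weakref.WeakKeyDictionary()
    _lock = threading.Lock()

    def __new__(cls):
        thread = threading.current_thread()
        with PerThreadSketch._lock:
            per_class = PerThreadSketch._instances.setdefault(thread, {})
            self = per_class.get(cls)
            if self is None:
                # Create an instance of the requested class, not of the base.
                self = super().__new__(cls)
                per_class[cls] = self
        return self


class MyPerThreadSketch(PerThreadSketch):
    pass


base = PerThreadSketch()
mine = MyPerThreadSketch()

# The same subclass requested from another thread yields a different instance.
seen_in_worker = []
worker = threading.Thread(target=lambda: seen_in_worker.append(MyPerThreadSketch()))
worker.start()
worker.join()

print(type(mine).__name__)          # MyPerThreadSketch
print(mine is MyPerThreadSketch())  # True
print(seen_in_worker[0] is mine)    # False
```

This keeps the "single instance per thread" guarantee per class. Whether that is the desired semantics for the scheduler is a design question the sketch does not answer.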
    
    opened by MichaelSchneeberger 27
  • RxPY v3 Remaining Work


    This issue is for tracking the remaining work for RxPY v3.

    • [x] Fix type hints for pipe #355
    • [x] Disposables. Have been removed for RxJS 6. I think we should keep disposables, but make them simpler and remove the static Disposable object with methods and only have the simpler creation functions like create, empty etc.
    • [x] Move all observer files to core/observer.
    • [x] Fix testing/marbles.py.
    • [x] Lazy load schedulers. We have many schedulers that are not being used. Perhaps eventloop schedulers should be imported explicitly (full path) so they don't add to startup time
    • [x] Remove (result) mappers from combine_latest and with_latest_from.
    • [x] Add a starmap operator based on itertools for use with combine_latest et al.
    • [x] Remove BlockingObservable and methods and provide a blocking run() method instead.
    • [x] Remove all first arguments that accept Iterable[Observable] for all operators. Use *args instead.
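The starmap item in the checklist above builds on `itertools.starmap`, which unpacks each tuple into the arguments of a function. A minimal stdlib sketch of that idea follows. The list of tuples is only a stand-in for the tuples a combine_latest-style operator would emit once result mappers are removed.

```python
from itertools import starmap

# Each tuple stands in for one emission of a combine_latest-style operator.
latest_pairs = [(1, 10), (2, 10), (2, 20)]

# starmap unpacks every tuple into positional arguments of the mapper.
sums = list(starmap(lambda left, right: left + right, latest_pairs))
print(sums)  # [11, 12, 22]
```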

    Later, i.e. v3.1

    • Merge internal and core to internal similar to RxJS.
    • Making Observables of type Generic[T] instead of Generic[Any]. The problem is what to do with vararg operators such as zip, combine_latest etc.
    opened by dbrattli 26
  • Artifacts exposed from `core/__init__.py`


    Sorry if this is a silly question, but I am slightly confused as to what artifacts are being exposed from core/__init__.py. In some cases these are typings (such as Disposable, Scheduler, Observer) and in other cases these are base implementations (e.g. Observable) which extend the typings.

    Would it be better to consistently expose base implementations here, which are presumably more directly relevant for users, and keep the typings only in the package of the same name, for (mostly) internal use?

    opened by erikkemperman 23
  • Marbles and Marble Diagrams related code didn't work


    I can't get the expected results from any of the marbles and marble-diagram code in Getting Started.ipynb.

    In [8]:

    from rx.testing import marbles

    xs = Observable.from_marbles("a-b-c-|")
    xs.to_blocking().to_marbles()

    Out[8]:

    'a-b-c-|'

    but my output is: ''

    In [9]:

    xs = Observable.from_marbles("1-2-3-x-5")
    ys = Observable.from_marbles("1-2-3-4-5")
    xs.merge(ys).to_blocking().to_marbles()

    Out[9]:

    '11-22-33-4x'

    but my output is: ''

    I get no output from either of these marble samples. My Python is 3.4.4 (v3.4.4:737efcadf5a6, Dec 20 2015, 20:20:57) [MSC v.1600 64 bit (AMD64)] on win32 and my RxPY is Rx (1.5.2). Can anyone help me?

    opened by luaNewer 20
  • Release v3


    I feel it's time to get v3 released. Any last minute changes that need to go in first?

    PS: It started with a tweet: https://twitter.com/mainro/status/1072965958921277441

    opened by dbrattli 18
  • RxPY 3.0: Documentation


    With v3, official documentation should be provided. This issue can be used to follow the progress on this topic. The following things must be done:

    • [X] create outline
    • [X] migrate part of readme in doc
    • [X] reference doc from readme
    • [X] add a migration section
    • [X] document APIs : autodoc for all operators and classes
    • [X] choose a theme : Guzzle.
    • [x] publish the doc somewhere: https://rxpy.readthedocs.io
    • [x] add marble diagrams for operators
    • [X] add additional readings section
    • [X] license section
    • [X] contributing section
    • [X] Fix docstring errors in init.py
    • [x] spellcheck
    • [ ] final pass on each section
    documentation 
    opened by MainRo 18
  • Questions about concurrency


    Hey Dag, I have a few final questions before I finalize my material for the video shoot on Tuesday. Thanks again for all your responsiveness and help. I think RxPY is going to be helpful for a lot of folks I am speaking to.

    QUESTION 1:

    In Java Concurrency in Practice, Goetz says that for computation-intensive tasks, a roughly optimal thread pool size is (# of CPU Cores) + 1. I am guessing that when creating a ThreadPoolScheduler to support computation-intensive processes, this rule-of-thumb is still valid?

    # calculate number of CPU's and add 1, then create a ThreadPoolScheduler with that number of threads
    optimal_thread_count = multiprocessing.cpu_count() + 1
    pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
    

    QUESTION 2:

    Coming from RxJava, I noticed that subscribe_on() will take one thread from a ThreadPoolScheduler as expected.

    from rx import Observable
    from rx.concurrency import ThreadPoolScheduler
    from threading import current_thread
    import multiprocessing, time, random
    
    
    def intense_calculation(value):
        # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
        time.sleep(random.randint(5,20) * .1)
        return value
    
    # calculate number of CPU's and add 1, then create a ThreadPoolScheduler with that number of threads
    optimal_thread_count = multiprocessing.cpu_count() + 1
    pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
    
    # Create Process 1
    Observable.from_(["Alpha","Beta","Gamma","Delta","Epsilon"]) \
        .subscribe_on(pool_scheduler) \
        .map(lambda s: intense_calculation(s)) \
        .subscribe(on_next=lambda i: print("PROCESS 1: {0} {1}".format(current_thread().name, i)),
                   on_error=lambda e: print(e),
                   on_completed=lambda: print("PROCESS 1 done!"))
    
    input("Press any key to quit\n")
    

    OUTPUT:

    Press any key to quit
    PROCESS 1: Thread-1 Alpha
    PROCESS 1: Thread-1 Beta
    PROCESS 1: Thread-1 Gamma
    PROCESS 1: Thread-1 Delta
    PROCESS 1: Thread-1 Epsilon
    PROCESS 1 done!
    

    However, observe_on() behaves quite differently in how it claims threads. I understand that observe_on() will redirect emissions to a different scheduler at that point in the Observable chain, but I noticed it may use a different thread for each emission.

    from rx import Observable
    from rx.concurrency import ThreadPoolScheduler
    from threading import current_thread
    import multiprocessing, time, random
    
    
    def intense_calculation(value):
        # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
        time.sleep(random.randint(5,20) * .1)
        return value
    
    # calculate number of CPU's and add 1, then create a ThreadPoolScheduler with that number of threads
    optimal_thread_count = multiprocessing.cpu_count() + 1
    pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
    
    # Create Process 1
    Observable.from_(["Alpha","Beta","Gamma","Delta","Epsilon"]) \
        .observe_on(pool_scheduler) \
        .map(lambda s: intense_calculation(s)) \
        .subscribe(on_next=lambda i: print("PROCESS 1: {0} {1}".format(current_thread().name, i)),
                   on_error=lambda e: print(e),
                   on_completed=lambda: print("PROCESS 1 done!"))
    
    input("Press any key to quit\n")
    

    OUTPUT:

    Press any key to quit
    PROCESS 1: Thread-1 Alpha
    PROCESS 1: Thread-2 Beta
    PROCESS 1: Thread-1 Gamma
    PROCESS 1: Thread-3 Delta
    PROCESS 1: Thread-2 Epsilon
    PROCESS 1 done!
    

    Is this expected? In RxJava, subscribeOn() and observeOn() will both only claim one thread for each subscription. I don't see an issue with observe_on() using multiple threads for a single subscription as long as the Observable contract is still being adhered to and the emissions stay serialized. Is this the behavior you intended by design?
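One plausible way to picture the two outputs above, without claiming this is how ThreadPoolScheduler works internally, is the difference between submitting a whole subscription to a pool as one task and submitting each emission as its own task. The sketch below uses only `concurrent.futures` from the standard library. `work` and `whole_stream` are hypothetical helpers, and the pool size follows the (CPU cores + 1) rule of thumb from Question 1.

```python
import multiprocessing
import threading
import time
from concurrent.futures import ThreadPoolExecutor

ITEMS = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]


def work(value):
    """Stand-in for a long-running calculation; reports the thread it ran on."""
    time.sleep(0.01)
    return value, threading.current_thread().name


def whole_stream():
    """One scheduled action that processes every item in turn."""
    return [work(item) for item in ITEMS]


pool_size = multiprocessing.cpu_count() + 1
with ThreadPoolExecutor(max_workers=pool_size) as pool:
    # subscribe_on-like: the whole subscription is one task, so one thread.
    one_task = pool.submit(whole_stream).result()

    # observe_on-like: each item is its own task and may land on any worker.
    futures = [pool.submit(work, item) for item in ITEMS]
    per_item = [future.result() for future in futures]

one_task_threads = {name for _, name in one_task}
per_item_values = [value for value, _ in per_item]

print(one_task_threads)  # a single thread name
print(per_item_values)   # item order is preserved by reading futures in order
```

In the per-item variant the thread names may or may not differ from run to run. That depends on how the executor reuses idle workers.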

    opened by thomasnield 14
  • Subscribe refactor


    Ref: https://github.com/ReactiveX/RxPY/issues/430 https://github.com/ReactiveX/RxPY/issues/293#issuecomment-457502444

    I'm a bit conflicted about submitting this -- on the one hand, it seems like too big a patch just before release, on the other hand it would change the API surface and therefore this is probably also something that could not land just after a major release...

    But personally I think something like this would improve the API, and if I understand the earlier discussion properly this might be pretty close to consensus on how things ought to be. It seems to me the main reason this wasn't attempted earlier is that it would be quite a tedious thing to do. And so it was :-)

    But I got things working, as far as I'm aware, and thus I've decided to just throw this out there, curious what people think.

    The idea is basically to adapt the principal user-facing API (Observable) to promote the "callback style" of subscribing to an observable. The "observer style" is still supported, and used internally in two ways, but only as implementation details (namely, the operators and the observable sub-class hierarchy).

    In other words, the observer style of subscribing only remains in places that could be adapted without touching the observable API.

    For what it's worth, the changes here are plenty, but they are almost entirely "mechanical":

    • Get rid of polymorphism in Observable.subscribe, change the signature to only accept an observer. All invocations of subscribe that were using callbacks change to subscribe_.

    • Rename subscribe to subscribe_observer and rename subscribe_ to subscribe.

    • The observer argument to subscribe_observer does not have to be optional, if it is not given we may as well invoke the callback-style variant with no arguments. So, considering it compulsory, we can turn things around by moving Observable.subscribe_observer(observer) into Observer.subscribe_to(observable).

    The last step means that the typing / ABC for Observable now only knows about the callback style. To use the observer style, you have to first have an instance of Observer and subscribe via that instance.
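The end state described by the three steps above might look roughly like the following. This is a minimal sketch and not the code from the PR: `ObservableSketch` and `ObserverSketch` are hypothetical classes, and only the method names `subscribe` and `subscribe_to` are taken from the description.

```python
class ObservableSketch:
    """Hypothetical observable that only knows the callback style."""

    def __init__(self, items):
        self._items = list(items)

    def subscribe(self, on_next=None, on_error=None, on_completed=None):
        try:
            for item in self._items:
                if on_next:
                    on_next(item)
        except Exception as error:  # route failures to on_error, if given
            if on_error:
                on_error(error)
            return
        if on_completed:
            on_completed()


class ObserverSketch:
    """Observer style: the observer subscribes itself to an observable."""

    def __init__(self):
        self.events = []

    def on_next(self, value):
        self.events.append(("next", value))

    def on_error(self, error):
        self.events.append(("error", error))

    def on_completed(self):
        self.events.append(("completed", None))

    def subscribe_to(self, observable):
        # Internally this is just the callback style with bound methods.
        observable.subscribe(self.on_next, self.on_error, self.on_completed)


source = ObservableSketch([1, 2, 3])

# Callback style: the principal user-facing API.
callback_values = []
source.subscribe(on_next=callback_values.append)

# Observer style: goes through an Observer instance.
observer = ObserverSketch()
observer.subscribe_to(source)

print(callback_values)  # [1, 2, 3]
print(observer.events)
```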

    I've run the whole thing through Travis to make sure each of these steps resulted in a passing build, as evidenced by the commit history below.

    opened by erikkemperman 13
  • RxPY v3 Release It!


    The release of v3 is set to June 1, and I would really like to get it released. What are the remaining issues or blockers?

    I don't think everything should be perfect for the release. After the release we should continue to improve RxPY and target a release v3.1 etc as soon as we get feedback on the v3 release.

    Let's get it out!

    opened by dbrattli 13
  • Curry flip


    • Use curry flip on buffer_with_count
    • Propose some test cleanup (more readable)
    • Propose first draft for documentation on curry-flip usage
    • Propose (unnecessary?) typing overload for map_ when no mapper is provided
    • Hint on contributing - what checks to run

    opened by matiboy 1
  • [WIP] Refactor using curry_flip from Expression library


    WIP branch for refactoring RxPY to use curry_flip from the Expression library. This will remove a lot of boilerplate code and make the operator code more readable (no nested functions).
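To show the kind of boilerplate this would remove, here is a simplified sketch of the curry-flip idea. The decorator `curry_flip_sketch` is a hypothetical, hand-written stand-in and not the Expression library's `curry_flip`. Likewise, `buffer_with_count_sketch` works on plain lists rather than observables.

```python
from functools import wraps


def curry_flip_sketch(fn):
    """Turn fn(source, *args) into fn(*args)(source)."""
    @wraps(fn)
    def outer(*args, **kwargs):
        def inner(source):
            return fn(source, *args, **kwargs)
        return inner
    return outer


@curry_flip_sketch
def buffer_with_count_sketch(source, count):
    """Written as a flat function of (source, count), with no nested closure."""
    items = list(source)
    return [items[i:i + count] for i in range(0, len(items), count)]


# The decorated function is called operator-style: arguments first, source last.
chunks = buffer_with_count_sketch(2)([1, 2, 3, 4, 5])
print(chunks)  # [[1, 2], [3, 4], [5]]
```

The operator body stays a single flat function, and the decorator supplies the nesting that each operator would otherwise have to spell out by hand.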

    opened by dbrattli 1
  • [Question] shipping data between asyncio and rxpy


    My goal is to constantly switch between rxpy and asyncio, so that they are connected together and the data stream goes through the entire chain.

    In my project (an automation command line tool for Twitter), the initial data flows to the asynchronous HTTP client as an Observable or a normal list, and the client response flows to the processing chain as a new Observable, where there may be a need to request the asynchronous HTTP client again... https://github.com/boholder/puntgun/issues/17 Therefore the data stream is constantly switched between the two frameworks for processing. For example, the data flow goes like:

    usernames: list[str] -> **async client** -> list[User] -> Observable[User]  -> (1)
    (1) -> filters -> Observable[decision] (2)
    (1) -> filters that need to query API -> **async client** -> Observable[decision] (2)
    (2) -> actions -> **async client** -> exit
    

    After searching, I learned that:

    1. Sadly, aioreactive doesn't meet my needs: it doesn't implement several operators that I already use (share(), buffer_with_count()...). https://github.com/dbrattli/aioreactive

    2. It is impossible to transfer data to an asynchronous function via an operator, because you can't call an async function in on_next(). https://github.com/ReactiveX/RxPY/issues/649#issuecomment-1159512451

    3. But we can store the data somewhere else (an asyncio.Queue) and write a custom async operator that runs an infinite loop to perform the async work; we can even add more operators to process the async work's result. https://github.com/ReactiveX/RxPY/issues/592#issue-1105185161 https://github.com/ReactiveX/RxPY/issues/571#issue-878054692 https://blog.oakbits.com/rxpy-and-asyncio.html

    I experimented a bit based on this friendly answer, but there are still some things I'm not sure how to implement:

    1. When we use asyncio at the same time, can rxpy guarantee to execute all operators on all data till observer before exiting?

    2. (asyncio related) The logic of putting data in the "first segment" chain into the "second segment" chain cannot wait for the Future, and some tasks in the "second" chain are canceled before they are executed. How do I wait for all tasks to finish before exit? (A search shows that asyncio.gather() only cares about the coroutine being passed in, so manual control of the event loop might solve the problem?)

    3. (still asyncio related) asyncio needs coroutines to be strung together manually using await (call add()). Is there a way to automatically pass the data stream from rxpy into the asyncio.Queue? (This still has the problem of not being able to await the Future.) Is this example a solution? https://github.com/ReactiveX/RxPY/blob/master/examples/asyncio/toasyncgenerator.py

    Thanks for reading my question.

    import asyncio
    import time
    from collections import namedtuple
    
    import reactivex as rx
    from reactivex import operators as op
    from reactivex.disposable import Disposable
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    from reactivex.subject import Subject
    
    start = time.time()
    
    def ts():
        return f"{time.time() - start:.3f}"
    
    ACTION_DURATION = 1.0
    
    first_subject = Subject()
    first_async_action = Subject()
    second_subject = Subject()
    
    Data = namedtuple("Data", ["api", "param", "future"])
    
    async def async_calling_api(data: Data):
        """Some async processing, like sending/writing data."""
        print(f"{ts()} [A]sync action started  api:{data.api} param:{data.param}")
        # process the data with async function
        await asyncio.sleep(ACTION_DURATION)
        print(f"{ts()} [A]sync action finished api:{data.api} param:{data.param}")
        # process finished, return the response
        return f"[{data.param}]"
    
    def serialize_map_async(mapper):
        def _serialize_map_async(source):
            def on_subscribe(observer, scheduler):
                # separate different api callings into different task queues
                queues = {k: asyncio.Queue() for k in range(0, 3)}
    
                async def infinite_loop(q: asyncio.Queue[Data]):
                    try:
                        while True:
                            data = await q.get()
                            resp = await mapper(data)
                            observer.on_next(resp)
                            data.future.set_result(resp)
                    except Exception as e:
                        observer.on_error(e)
    
                def on_next(data: Data):
                    # take data from upstream ( calls on subject.on_next() trigger it )
                    # synchronous -> asynchronous by putting elements into queue
                    try:
                        queues[data.api].put_nowait(data)
                    except Exception as e:
                        observer.on_error(e)
    
                tasks = [asyncio.create_task(infinite_loop(q)) for q in queues.values()]
    
                d = source.subscribe(
                    on_next=on_next,
                    on_error=observer.on_error,
                    on_completed=observer.on_completed,
                )
    
                def dispose():
                    d.dispose()
                    [task.cancel() for task in tasks]
    
                return Disposable(dispose)
    
            return rx.create(on_subscribe)
    
        return _serialize_map_async
    
    
    async def setup():
        loop = asyncio.get_event_loop()
        first_subject.pipe(
            serialize_map_async(async_calling_api),
            # The futures created here are never awaited, so they are not part of asyncio's chain.
            # As a result, the gather below only guarantees the first-level tasks,
            # and some second-level tasks are cancelled before they run.
            op.do_action(lambda x: second_subject.on_next(Data(2, x, asyncio.Future()))),
        ).subscribe(
            on_next=lambda param: print(f"{ts()} [O]bserver [1] received: {param}"),
            scheduler=AsyncIOScheduler(loop)
        )
    
        second_subject.pipe(serialize_map_async(async_calling_api), ).subscribe(
            on_next=lambda param: print(f"{ts()} [O]bserver [2] received: {param}"),
            scheduler=AsyncIOScheduler(loop)
        )
    
    async def add(api: int, param: str):
        future = asyncio.Future()
        first_subject.on_next(Data(api, param, future))
        return await future
    
    async def main():
        await setup()
        # I wonder if there is a way to write "await rx.from..."
        #
        # rx.from_iterable("a", "b").pipe(
        #  op.do(await ...)
        # )
    
        a = await asyncio.gather(add(0, "0a"), add(0, "0b"), add(1, "1a"), add(1, "1b"), )
        print(f"---> {a}")
    
    asyncio.run(main())
    
    0.003 [A]sync action started  api:0 param:0a
    0.003 [A]sync action started  api:1 param:1a
    1.007 [A]sync action finished api:0 param:0a
    1.007 [O]bserver [1] received: [0a]
    1.007 [A]sync action started  api:0 param:0b
    1.007 [A]sync action finished api:1 param:1a
    1.007 [O]bserver [1] received: [1a]
    1.007 [A]sync action started  api:1 param:1b
    1.007 [A]sync action started  api:2 param:[0a]
    2.021 [A]sync action finished api:0 param:0b
    2.021 [O]bserver [1] received: [0b]
    2.021 [A]sync action finished api:1 param:1b
    2.021 [O]bserver [1] received: [1b]
    2.021 [A]sync action finished api:2 param:[0a]
    2.021 [O]bserver [2] received: [[0a]]
    2.021 [A]sync action started  api:2 param:[1a]
    ---> ['[0a]', '[0b]', '[1a]', '[1b]']
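For the "wait for all tasks to finish before exit" question, one stdlib-only approach is to track outstanding work with `asyncio.Queue.task_done()` and `Queue.join()` instead of futures. The sketch below assumes a simplified two-stage chain with one worker per stage and leaves RxPY out entirely. `call_api`, `worker`, and the queue names are hypothetical.

```python
import asyncio


async def call_api(param):
    """Hypothetical async action standing in for an HTTP call."""
    await asyncio.sleep(0.01)
    return f"[{param}]"


async def worker(inbox, outbox, results):
    """Process items forever; forward each response to the next stage, if any."""
    while True:
        param = await inbox.get()
        try:
            response = await call_api(param)
            results.append(response)
            if outbox is not None:
                outbox.put_nowait(response)
        finally:
            # Marked done only after the hand-over, so join() sees it.
            inbox.task_done()


async def main():
    first, second = asyncio.Queue(), asyncio.Queue()
    first_results, second_results = [], []
    workers = [
        asyncio.create_task(worker(first, second, first_results)),
        asyncio.create_task(worker(second, None, second_results)),
    ]
    for param in ("0a", "0b", "1a", "1b"):
        first.put_nowait(param)

    # Once `first` is drained, everything it produced is already queued in
    # `second`, so joining the stages in order covers the whole chain.
    await first.join()
    await second.join()

    # Only now is it safe to cancel the infinite worker loops.
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return first_results, second_results


first_results, second_results = asyncio.run(main())
print(first_results)
print(second_results)
```

The ordering matters: each worker forwards its response before calling `task_done()`, so there is no moment at which both queues look empty while work is still in flight.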
    
    opened by boholder 1
  • How to on_next from background thread in main thread?


    Hi, I want to pass data from a background thread into the main thread. I have the following code, but it seems that CurrentThreadScheduler() is not doing what I expect.

    I'm using reactivex version 4.2.

    from reactivex import operators as ops
    from reactivex.scheduler import CurrentThreadScheduler, ThreadPoolScheduler
    from reactivex import Subject
    import threading
    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self, callback):
            Thread.__init__(self)
            self.callback = callback
    
        def run(self):
            self.callback("hello")
    
    my_subject = Subject()
    
    def callback(data):
        ''' This is called in a separate thread '''
        print(f"In callback: {threading.current_thread().name}")
        my_subject.on_next(data)
    
    if __name__ == "__main__":
    
        thread_to_execute_on = CurrentThreadScheduler()
        # thread_to_execute_on = ThreadPoolScheduler(max_workers=1)
    
        print(f"Before stream: {threading.current_thread().name}")
        
        background_thread = MyThread(callback=callback)
        
        my_subject.pipe(
            ops.observe_on(thread_to_execute_on)
        ).subscribe(
            lambda data: print(f"In subscription: {threading.current_thread().name}")
        )
    
        background_thread.start()
        
        input()
    

    The output is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: Thread-7
    

    I've also tried using ThreadPoolScheduler and the data is correctly passed to the threadpool thread. In that scenario, output is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: ThreadPoolExecutor-0_0
    

    Is there something I can use to schedule work back on the main thread? For example: ops.observe_on(MainThreadScheduler()). This seems to be quite simple to do in C# and Java.

    To be clear, the output I am after is:

    Before stream: MainThread
    In callback: Thread-7
    In subscription: MainThread
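Independent of any scheduler, a thread can only run work handed to it if it is itself running some kind of loop that picks the work up. A common stdlib pattern for that is a `queue.Queue` drained by the consuming thread. The sketch below shows only this pattern. It does not use reactivex, and `inbox`, `STOP`, and `background_job` are hypothetical names.

```python
import queue
import threading

inbox = queue.Queue()  # thread-safe hand-off between threads
STOP = object()        # sentinel telling the consumer loop to finish


def background_job():
    """Runs on a worker thread and only enqueues data."""
    inbox.put(("hello", threading.current_thread().name))
    inbox.put(STOP)


consumer_thread = threading.current_thread().name
handled = []

worker = threading.Thread(target=background_job, name="worker-sketch")
worker.start()

# The consuming thread has to run this loop itself; other threads can only
# enqueue work for it, not execute code on it.
while True:
    message = inbox.get()
    if message is STOP:
        break
    data, produced_on = message
    handled.append((data, produced_on, threading.current_thread().name))

worker.join()
print(handled)
```

In the issue's program the main thread is blocked in input(), so nothing of this kind is running there. Whether a reactivex scheduler can take the place of the loop is not something this sketch answers.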
    
    opened by alek5k 0
  • Error in the documentation ?


    Describe the bug: One of the first examples in the Get Started documentation fails with:

    Traceback (most recent call last):
      File "main.py", line 12, in <module>
        reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
      File "./venv/lib/python3.8/site-packages/reactivex/observable/observable.py", line 237, in pipe
        return pipe_(self, *operators)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 214, in pipe
        return compose(*fns)(__value)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 87, in _compose
        return reduce(lambda obs, op: op(obs), operators, source)
      File "./venv/lib/python3.8/site-packages/reactivex/pipe.py", line 87, in <lambda>
        return reduce(lambda obs, op: op(obs), operators, source)
    TypeError: 'Observable' object is not callable
    

    The next example in the list also fails in the same way.

    To Reproduce: Copy the code from the Custom Operator part of the documentation into a file and run it.

    The line return rx.pipe( needs to be replaced with return reactivex.pipe( to work with the above imports.

    Expected behavior: The code should run.

    Code or Screenshots

    import reactivex
    from reactivex import operators as ops
    
    def length_more_than_5():
        return reactivex.pipe(
            ops.map(lambda s: len(s)),
            ops.filter(lambda i: i >= 5),
        )
    
    reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        length_more_than_5()
    ).subscribe(lambda value: print("Received {0}".format(value)))
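The traceback lines `return compose(*fns)(__value)` and `reduce(lambda obs, op: op(obs), operators, source)` suggest one possible explanation. If `pipe` treats its first argument as the value to transform, then passing only operators applies the second operator to the first, and what comes back is an observable instead of a function. The plain-Python sketch below reproduces that failure mode with hypothetical stand-ins (`StreamSketch`, `pipe_sketch`, `compose_sketch`). It does not import reactivex, and it is an assumption that the library behaves the same way.

```python
from functools import reduce


class StreamSketch:
    """Hypothetical lazy stand-in for an Observable."""

    def __init__(self, produce):
        self._produce = produce

    def pipe(self, *operators):
        return reduce(lambda obs, op: op(obs), operators, self)

    def run(self):
        return list(self._produce())


def map_sketch(mapper):
    return lambda source: StreamSketch(lambda: (mapper(x) for x in source.run()))


def filter_sketch(predicate):
    return lambda source: StreamSketch(lambda: (x for x in source.run() if predicate(x)))


def pipe_sketch(value, *operators):
    """Apply the operators to value right away (the value comes first)."""
    return reduce(lambda acc, op: op(acc), operators, value)


def compose_sketch(*operators):
    """Build a new operator; nothing runs until it is given a source."""
    return lambda source: reduce(lambda acc, op: op(acc), operators, source)


source = StreamSketch(lambda: ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"])

# Mistake: the first operator is taken as the value, so the result is a
# StreamSketch and cannot be used as an operator.
not_an_operator = pipe_sketch(map_sketch(len), filter_sketch(lambda n: n >= 5))
try:
    source.pipe(not_an_operator)
    error = None
except TypeError as exc:
    error = str(exc)
print(error)  # 'StreamSketch' object is not callable

# Working variant: compose the operators into a single operator first.
length_more_than_5 = compose_sketch(map_sketch(len), filter_sketch(lambda n: n >= 5))
received = source.pipe(length_more_than_5).run()
print(received)  # [5, 5, 5, 7]
```

If this reading is right, the documentation example would need a compose-style helper in place of `pipe` inside `length_more_than_5`.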
    

    Additional context

    • OS: Ubuntu 22.04
    • RxPY: 4.0.4
    • Python: 3.8.10 and 3.10.4
    PR welcome 
    opened by tlebrize 1
Releases(v4.0.4)
  • v4.0.4(Jul 16, 2022)

    What's Changed

    • Allow any python in the 3.x series >=3.7 by @mmv in https://github.com/ReactiveX/RxPY/pull/656

    New Contributors

    • @mmv made their first contribution in https://github.com/ReactiveX/RxPY/pull/656

    Full Changelog: https://github.com/ReactiveX/RxPY/compare/v4.0.3...v4.0.4

  • v4.0.3(Jul 2, 2022)

    What's Changed

    • fix: typing fixes for Pyright 254 by @dbrattli in https://github.com/ReactiveX/RxPY/pull/650
    • Make _T in Observable[_T] covariant by @Azureblade3808 in https://github.com/ReactiveX/RxPY/pull/654

    New Contributors

    • @Azureblade3808 made their first contribution in https://github.com/ReactiveX/RxPY/pull/654

    Full Changelog: https://github.com/ReactiveX/RxPY/compare/v4.0.2...v4.0.3

  • v4.0.2(Jun 12, 2022)

    What's Changed

    • Use deque in ReplaySubject (#646) by @timothy-shields in https://github.com/ReactiveX/RxPY/pull/648

    New Contributors

    • @timothy-shields made their first contribution in https://github.com/ReactiveX/RxPY/pull/648

    Full Changelog: https://github.com/ReactiveX/RxPY/compare/v4.0.1...v4.0.2

  • v4.0.0(Mar 19, 2022)

    What's Changed

    • Set daemon attribute instead of using setDaemon method that was deprecated in Python 3.10 by @tirkarthi in https://github.com/ReactiveX/RxPY/pull/570
    • Remove deprecated loop parameter by @carlwgeorge in https://github.com/ReactiveX/RxPY/pull/575
    • Correct example code instructions by @Julian-O in https://github.com/ReactiveX/RxPY/pull/580
    • Fix build of documentation by @MainRo in https://github.com/ReactiveX/RxPY/pull/589
    • replace coroutine decorator with async keyword by @MainRo in https://github.com/ReactiveX/RxPY/pull/593
    • Removal of misleading information in take_while documentation by @kkonrad in https://github.com/ReactiveX/RxPY/pull/596
    • rework zip fix for #537 by @MainRo in https://github.com/ReactiveX/RxPY/pull/595
    • [ReactiveX] Use pyproject.toml and poetry by @dbrattli in https://github.com/ReactiveX/RxPY/pull/603
    • [ReactiveX] Fix pipe operator by @dbrattli in https://github.com/ReactiveX/RxPY/pull/605
    • [ReactiveX] Add poetry cache by @dbrattli in https://github.com/ReactiveX/RxPY/pull/607
    • [ReactiveX] Doc build fixes by @dbrattli in https://github.com/ReactiveX/RxPY/pull/606
    • [ReactiveX] Sort imports for tests by @dbrattli in https://github.com/ReactiveX/RxPY/pull/608
    • [ReactiveX] Optimize build by @dbrattli in https://github.com/ReactiveX/RxPY/pull/609
    • Bump version by @dbrattli in https://github.com/ReactiveX/RxPY/pull/610
    • [ReactiveX] reactivex rename by @dbrattli in https://github.com/ReactiveX/RxPY/pull/611
    • [ReactiveX] Fix typing by @dbrattli in https://github.com/ReactiveX/RxPY/pull/612
    • [ReactiveX] Pre commit by @dbrattli in https://github.com/ReactiveX/RxPY/pull/613
    • ReactiveX by @dbrattli in https://github.com/ReactiveX/RxPY/pull/614
    • Add publish action and dynamic versioning by @dbrattli in https://github.com/ReactiveX/RxPY/pull/615
    • Try fixing publish version by @dbrattli in https://github.com/ReactiveX/RxPY/pull/616
    • Fix poetry version setting by @dbrattli in https://github.com/ReactiveX/RxPY/pull/617
    • Fix defer docs by @dbrattli in https://github.com/ReactiveX/RxPY/pull/618
    • Update docs for pre-release install by @dbrattli in https://github.com/ReactiveX/RxPY/pull/621
    • Fix version for docs by @dbrattli in https://github.com/ReactiveX/RxPY/pull/623
    • Fix library version by @dbrattli in https://github.com/ReactiveX/RxPY/pull/625
    • Remove poetry run from version echo by @dbrattli in https://github.com/ReactiveX/RxPY/pull/626
    • Fix wxscheduler issues by @christiansandberg in https://github.com/ReactiveX/RxPY/pull/602
    • Type fixes by @dbrattli in https://github.com/ReactiveX/RxPY/pull/629
    • Disallow untyped defs by @dbrattli in https://github.com/ReactiveX/RxPY/pull/632
    • Fix withlatestfrom by @dbrattli in https://github.com/ReactiveX/RxPY/pull/633
    • Window threading issue. Fixes #604 by @dbrattli in https://github.com/ReactiveX/RxPY/pull/627
    • Fix return annotation of Observable.await by @hamstap85 in https://github.com/ReactiveX/RxPY/pull/630
    • fix(examples): fixes timeflies_tkinter by @dbrattli in https://github.com/ReactiveX/RxPY/pull/635
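
Two of the Python-level changes listed above (setting the `daemon` attribute instead of calling `setDaemon`, and using the `async` keyword instead of the coroutine decorator) can be illustrated with the standard library alone. This is a minimal sketch, not code from the RxPY repository; `worker` and `fetch_value` are hypothetical names.

```python
import asyncio
import threading


def worker():
    """Placeholder thread body."""


thread = threading.Thread(target=worker)
# Attribute form; Thread.setDaemon(True) is deprecated since Python 3.10.
thread.daemon = True
thread.start()
thread.join()


# Native coroutine declared with the async keyword rather than a decorator.
async def fetch_value():
    await asyncio.sleep(0)
    return 42


result = asyncio.run(fetch_value())
print(thread.daemon, result)
```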

    New Contributors

    • @tirkarthi made their first contribution in https://github.com/ReactiveX/RxPY/pull/570
    • @carlwgeorge made their first contribution in https://github.com/ReactiveX/RxPY/pull/575
    • @Julian-O made their first contribution in https://github.com/ReactiveX/RxPY/pull/580
    • @kkonrad made their first contribution in https://github.com/ReactiveX/RxPY/pull/596
    • @christiansandberg made their first contribution in https://github.com/ReactiveX/RxPY/pull/602
    • @hamstap85 made their first contribution in https://github.com/ReactiveX/RxPY/pull/630

    Full Changelog: https://github.com/ReactiveX/RxPY/compare/v3.2.0...v4.0.0

  • v4.0.0b5(Mar 12, 2022)

  • v4.0.0b4(Mar 11, 2022)

  • v4.0.0b3(Mar 6, 2022)

  • v4.0.0b2(Mar 6, 2022)

  • v4.0.0b1(Mar 5, 2022)

  • v4.0.0a4(Mar 5, 2022)

    What's Changed

    • [ReactiveX] Use pyproject.toml and poetry by @dbrattli in https://github.com/ReactiveX/RxPY/pull/603
    • [ReactiveX] Fix pipe operator by @dbrattli in https://github.com/ReactiveX/RxPY/pull/605
    • [ReactiveX] Add poetry cache by @dbrattli in https://github.com/ReactiveX/RxPY/pull/607
    • [ReactiveX] Doc build fixes by @dbrattli in https://github.com/ReactiveX/RxPY/pull/606
    • [ReactiveX] Sort imports for tests by @dbrattli in https://github.com/ReactiveX/RxPY/pull/608
    • [ReactiveX] Optimize build by @dbrattli in https://github.com/ReactiveX/RxPY/pull/609
    • [ReactiveX] reactivex rename by @dbrattli in https://github.com/ReactiveX/RxPY/pull/611
    • [ReactiveX] Fix typing by @dbrattli in https://github.com/ReactiveX/RxPY/pull/612
    • [ReactiveX] Pre commit by @dbrattli in https://github.com/ReactiveX/RxPY/pull/613
    • ReactiveX by @dbrattli in https://github.com/ReactiveX/RxPY/pull/614
    • Add publish action and dynamic versioning by @dbrattli in https://github.com/ReactiveX/RxPY/pull/615

    New Contributors

    • @tirkarthi made their first contribution in https://github.com/ReactiveX/RxPY/pull/570
    • @carlwgeorge made their first contribution in https://github.com/ReactiveX/RxPY/pull/575
    • @Julian-O made their first contribution in https://github.com/ReactiveX/RxPY/pull/580
    • @kkonrad made their first contribution in https://github.com/ReactiveX/RxPY/pull/596

    Full Changelog: https://github.com/ReactiveX/RxPY/compare/v3.2.0...v4.0.0a4

  • v3.2.0(Apr 25, 2021)

    This is an improvement and bugfix release.

    Improvements

    • Add fork_join operator #533
    • Allow AsyncIOThreadSafeScheduler to schedule on its own AsyncIO event loop #567

    Fixes

    • Fix InvalidStateError in to_future #529
    • Complete as soon as any source completes in zip #525
    • Fix delay_with_mapper subscription #547
    • Fix scheduler forwarding in delay operator #555
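
As a rough analogy for the zip fix above, Python's built-in `zip` also stops as soon as its shortest input is exhausted. The sketch below uses plain lists rather than observables, so it illustrates only the pairing and completion rule and makes no claim about how RxPY's `zip` operator is implemented.

```python
from itertools import zip_longest

letters = ["a", "b", "c"]
numbers = [1, 2]

# Built-in zip stops when the shorter input ends; "c" is never paired.
pairs = list(zip(letters, numbers))

# For contrast, zip_longest keeps going and fills the gap with None.
padded = list(zip_longest(letters, numbers))

print(pairs)
print(padded)
```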

    Other

    • Do not use pytest-runner #522
  • v3.1.1(Jul 16, 2020)

    This is a bugfix and performance improvement release.

    Fixes:

    • Explicitly catch asyncio.CancelledError #498
    • Do not use start_with in reduce #488
    • Fix race condition in zip operator #505
    • Fix GEventScheduler deadlock #519
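
The first fix above mentions catching `asyncio.CancelledError` explicitly. The sketch below is not the RxPY patch; it is a stdlib-only illustration of the pattern, with hypothetical names `worker` and `main`. One reason an explicit clause can matter, stated here as an assumption about the motivation: on Python 3.8 and later `asyncio.CancelledError` derives from `BaseException`, so a bare `except Exception` does not catch it.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Handle cancellation explicitly, then re-raise so the task
        # is still reported as cancelled.
        log.append("cancelled")
        raise


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # give the worker a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("observed")
    return log


log = asyncio.run(main())
print(log)
```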

    Performance:

    • Drop reference to cached items asap in to_xxx #509

    Other:

    • Fixed several errors in documentation
    • Fixed mypy support in sdist #515
  • v3.1.0(Feb 28, 2020)

    This is a bugfix and enhancement release. The main changes are:

    • Fixed flat_map with mapper returning an asyncio Future #457
    • Fixed await on Observable that schedules later #456
    • Fixed scheduler forwarding in several operators #476 #492 #493
    • Fixed concat: subscribe scheduler not forwarded + sources iterable exhaustion #486
    • Fixed missing default scheduler in timer operator #481
    • Fixed concat_with_iterable: subscribe scheduler is not forwarded #495
    • Added subject mapper to group_by and group_by_until #467
    • Many small fixes to the documentation
  • v1.5.9(Mar 29, 2017)

    This release of RxPy has a number of enhancements and fixes, including:

    • Rolling initialization state bug with to_list() has been resolved #156

    • auto_connect() functionality added to ConnectableObservable #164

    • Several doXXX() side effect operators, as well as concat_map() and from_callable(), were added to mirror RxJava2 #166 #164

    • A to_sorted_list() operator has been added to collect emissions into a sorted list #167

    • A small set of statistical operators (median(), mode(), standard_deviation(), variance()) has been added #167

    • BlockingObservable received first(), first_or_default(), last(), and last_or_default() operators #172
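
The sorted-list and statistical operators listed above have close counterparts in the Python standard library. The sketch below works on a plain list, not an observable, and uses the population formulas (`pvariance`, `pstdev`); whether the RxPY operators used population or sample formulas is not stated in these notes, so treat that choice as an assumption.

```python
import statistics

values = [5, 2, 9, 4, 7, 4, 5, 4]

sorted_values = sorted(values)            # analogous to to_sorted_list()
median = statistics.median(values)        # middle of the sorted values
mode = statistics.mode(values)            # most frequent value
variance = statistics.pvariance(values)   # population variance
std_dev = statistics.pstdev(values)       # population standard deviation

print(sorted_values, median, mode, variance, std_dev)
```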

  • v1.1(Mar 30, 2015)

    • Transducers via Observable.transduce()
    • adapt_call no longer requires the inspect module
    • Support callable instance, instance method, and class method for adapt_call thanks to @succhiello.
    • Added example using concurrent futures for compute-intensive task parallelization, thanks to @38elements.
    • Got chess example working again under Python 2.7 thanks to @enobayram.
    • Added example for async generator.
    • Many PEP 8 fixes.
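
For readers unfamiliar with transducers, mentioned in the first item above, the sketch below shows the general idea in plain Python: transformations are written as functions that wrap a reducing step, so they compose independently of the data source. The helper names (`mapping`, `filtering`, `compose`, `append`) are hypothetical, and this is not the API that `Observable.transduce()` expected.

```python
from functools import reduce


def mapping(fn):
    """Transducer that applies fn to each item before the next step."""
    def transducer(step):
        return lambda acc, item: step(acc, fn(item))
    return transducer


def filtering(predicate):
    """Transducer that forwards only items satisfying predicate."""
    def transducer(step):
        return lambda acc, item: step(acc, item) if predicate(item) else acc
    return transducer


def compose(*transducers):
    """Compose transducers so items flow through them left to right."""
    def composed(step):
        for transducer in reversed(transducers):
            step = transducer(step)
        return step
    return composed


def append(acc, item):
    """Final reducing step: collect items into a list."""
    acc.append(item)
    return acc


# Keep even numbers, then square them.
xform = compose(filtering(lambda x: x % 2 == 0), mapping(lambda x: x * x))
result = reduce(xform(append), range(6), [])
print(result)
```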
  • v1.0.0(Dec 28, 2014)

  • v0.13(Nov 16, 2014)

    • Aligning throttle type operator naming with RxJS and RxJava
    • Added throttle_last() as alias for sample()
    • Renamed throttle() to debounce() and added throttle_with_timeout() as alias
    • Renamed any() to some()
    • Simplified sequence_equal()
    • Bugfix for take() when no count given
    • Removed internal operator final_value() which did exactly the same as last()
    • Added to_iterable() as alias to to_list()
    • Added throttle_first()
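
The renames above separate two behaviours that are easy to confuse. The sketch below models them on a list of `(timestamp, value)` pairs instead of a live stream. It follows the general Rx meaning of the operator names and is not RxPY's scheduler-based implementation; the function signatures and the sample timings are illustrative assumptions.

```python
def debounce(events, due_time):
    """Keep a value only if no newer value arrives within due_time after it."""
    kept = []
    for index, (timestamp, value) in enumerate(events):
        is_last = index == len(events) - 1
        # The last value is flushed when the stream ends.
        if is_last or events[index + 1][0] - timestamp >= due_time:
            kept.append(value)
    return kept


def throttle_first(events, window):
    """Keep the first value, then ignore values until the window has elapsed."""
    kept = []
    window_start = None
    for timestamp, value in events:
        if window_start is None or timestamp - window_start >= window:
            kept.append(value)
            window_start = timestamp
    return kept


events = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e"), (30, "f")]

debounced = debounce(events, due_time=5)
throttled = throttle_first(events, window=5)
print(debounced)
print(throttled)
```

In this model `debounce` emits the trailing value of each burst, while `throttle_first` emits the leading value of each window.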
Owner
ReactiveX
Reactive Extensions for Async Programming