Easy to use Google Pub/Sub

Overview

Relé makes integration with Google PubSub straightforward and easy.

Build Status Read the Docs Code Coverage PyPI - Python Version PyPI - Downloads

Motivation and Features

The Publish-Subscribe pattern and specifically the Google Cloud Pub/Sub library are very powerful tools but you can easily cut your fingers on it. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features:

  • Powerful Publishing API
  • Highly Scalable Worker
  • Intuitive Subscription Management
  • Easily Extensible Middleware
  • Ready to go Django/Flask integration
  • CLI
  • And much more!

What it looks like

# Publish to the topic
import rele

rele.publish(topic='photo-uploaded', data={'customer_id': 123})

# Subscribe to the Pub/Sub topic
from rele import sub

@sub(topic='photo-uploaded')
def photo_uploaded(data, **kwargs):
    print(f"Customer {data['customer_id']} has uploaded an image")

What's in the name

"Relé" is Spanish for relay, a technology that has played a key role in history in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmission, and transistors.

Install

Relé supports Python 3.6+ and installing via pip

pip install rele

or with Django integration

pip install rele[django]

or with Flask integration

pip install rele[flask]

Quickstart

Please see our documentation to get started.

You can also read more about it here


Running Tests

Does the code actually work?

  make test
Comments
  • Clean up rele.config.setup + Worker() init

    Clean up rele.config.setup + Worker() init

    This relates to #114 & #119

    This makes makes all config variables nullable falling back to standard google envars, without breaking the current api.

    The new apis would look like this is you have GOOGLE_APPLICATION_CREDENTIALS set.

    rele.config.setup()
    
    w = Worker([sub1, sub2])
    w.run_forever()
    

    TBH, I think reading global configs from the environment is easier to reason about than a singleton so I'd suggest that. Any non globals should just be passed into the instances on init.

    :tophat: What?

    Provide a description of what has been implemented.

    :thinking: Why?

    Give an explanation of why.

    :link: Related issue

    Add related issue's number. Example: Fix #1

    opened by craigmulligan 8
  • Runtime error when trying to read project_id from default google creds

    Runtime error when trying to read project_id from default google creds

    I'm getting the following error when trying to run rele.config.setup using default credentials:

    rele.config.setup({
    	"GC_CREDENTIALS_PATH": None,
        "MIDDLEWARE": [
         	"rele.contrib.LoggingMiddleware",
            "rele.contrib.FlaskMiddleware",
         ],
         "APP_NAME": "smart_comms_planner",
    }, flask_app=app)
    

    output:

      File "/Users/matthewbridges/repos/smart-comms-planner/src/__init__.py", line 66, in <module>
        rele.config.setup(settings["rele"], flask_app=app)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 69, in setup
        init_global_publisher(config)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/publishing.py", line 10, in init_global_publisher
        gc_project_id=config.gc_project_id,
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 59, in gc_project_id
        return self.credentials.project_id
    AttributeError: 'Credentials' object has no attribute 'project_id'
    

    The code is incorrectly attempting to read project_id off the credentials object, when in fact it is returned as a tuple from get_google_defaults.

    I've added a proposed fix here: https://github.com/mercadona/rele/pull/195

    opened by Itsindigo 7
  • Style guide

    Style guide

    We currently have flake8 in our linting command. We do not have isort in the project style guidelines.

    Shall we add it?

    Or try out something new like black?

    good first issue question 
    opened by andrewgy8 5
  • Configure Publisher Client timeout

    Configure Publisher Client timeout

    I would like to be able to configure the timeout when publishing a message in a blocking fashion. Right now, the default and hard coded way is set to 3.0 seconds.

    I propose adding a configuration so I can declare any number of seconds for the publisher.

    Ex.

    RELE = {
    	...
    	'PUBLISHER_TIMEOUT': 5.0
    }
    
    enhancement 
    opened by andrewgy8 4
  • Add an API to allow passing objects to middleware

    Add an API to allow passing objects to middleware

    I'm adding rele to flask app, and the subscription callbacks need the flask app_context. I've done this by way of middleware:

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def pre_process_message(self, subscription, message):
            from server import app
            self.ctx = app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    

    But to make this reusable across our services and other flask apps, I'd need a way to add arbitrary data to the config that is passed to middleware.setup method or have an easy way to call custom middleware functions.

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def setup(self, config):
            self.app = config["FLASK_APP"]
    
        def pre_process_message(self, subscription, message):
            self.ctx = self.app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    
    enhancement 
    opened by craigmulligan 4
  • Simplify initializing and running the Worker

    Simplify initializing and running the Worker

    One consistent peice of feedback that I have received is simplifying the Worker class. Right now, you must Initialize with the subs, each individual config attribute, run setup, run start, and then sleep.

    Like this:

    worker = Worker(
        [photo_uploaded],
        config.gc_project_id,
        config.credentials,
        config.ack_deadline,
    )
    worker.setup()
    worker.start()
    sleep(120)
    

    I propose we simplify the API to run a worker to look something like this instead:

    worker = Worker([photo_uploaded], config)
    worker.run(sleep=120)
    

    run would call both setup and start, and we could add the standard sleep method. In addition we consolidate the configuration into one attribute.

    This would be backwards compatible since we will be creating a new method. And the change to Worker initialization could also fall back to the declared attributes if defined. Otherwise, use the config object.

    enhancement 
    opened by andrewgy8 4
  • Improve sub decorator

    Improve sub decorator

    The sub decorator can get some ease of use improvements

    1. Use functools.wraps to preserve __name__ and __doc__ of the original function
    2. Inspect callback function signature and raise an exception if the signature is not compatible (ie. missing data)
    3. Log a warning message if the function cannot be discovered (ie. no sub in the path)

    Slightly related to the second point: @sub("topic", filter_by=42) will raise a TypeError at runtime (because filters are used as callables but they are not checked)

    enhancement hacktoberfest 
    opened by tbarbugli 3
  • Breaking with Google PubSub >= 2.0

    Breaking with Google PubSub >= 2.0

    In the 1.0 release, I had to pin pubsub to <2.0. Otherwise, our tests would break. It seems there were major changes, and we need to update some usages in our code.

    To reproduce:

    1. update google pubsub library to >2.0.
    2. Run tests
    good first issue hacktoberfest 
    opened by andrewgy8 3
  • Execution doesn't stop when using debugger

    Execution doesn't stop when using debugger

    Hey 👋 first of all thanks for this great library!

    This might not be a strictly Relé related issue, but when I tried to pause execution to debug some issue (ie dropping a __import__("pdb").set_trace()) inside the subscription message handler function it did not stop, but just kept on processing messages.

    This might be a trivial problem to overcome, but my search engine fu failed me. I'm guessing the problem is that the subscriber handles the message in a thread, so it would somehow need to let the parent know to stop, but not really sure how to go about it.

    question 
    opened by daaain 3
  • Add unrecoverable_middleware

    Add unrecoverable_middleware

    This is useful if you know you will never be able to handle the message and don't have a dead-letter queue setup.

    :tophat: What?

    A custom exception and middleware to automate and ignore bad messages.

    :thinking: Why?

    I've been using this a fair amount in some of our services and thought it may be useful to others.

    opened by craigmulligan 3
  • Add python 3.8 to travis

    Add python 3.8 to travis

    :tophat: What?

    Add python 3.8 to travis.

    :thinking: Why?

    It was officially released on Oct 14, 2019

    This would allow us to officially add Python 3.8. However, it seems that grpcio is not compiled for Python 3.8 and therefore the travis build takes much longer to complete.. Ran for 5 min 40 sec.

    So either we can wait and not support 3.8 yet, or merge this and have the travis build take longer than expected.

    I leave it as a draft until we decide.

    opened by andrewgy8 3
  • [travis] PSQL is not required for tests execution

    [travis] PSQL is not required for tests execution

    :tophat: What?

    Remove PostgreSQL service from Travis pipeline

    :thinking: Why?

    • It's not used
    • It could give a false sensation of security by believing there are tests checking database features (Django DB)
    hacktoberfest 
    opened by Maks3w 0
  • [add] Python 3.10 support

    [add] Python 3.10 support

    :tophat: What?

    • Run the test suite against the minimumPython supported (3.6) and the maximum version supported (3.10)
    • Also update the dist image to bionic because it's the only one with support for all the Python versions.
    • Add the PIP classifiers for Python 3.10

    :thinking: Why?

    Be sure the library is compatible with the latest/current version of Python (3.10)

    hacktoberfest 
    opened by Maks3w 0
  • Add native async support for subscriptions

    Add native async support for subscriptions

    Currently, async functions can't be used with the @sub context manager.

    @sub(topic='my-topic'):
    async def async_handler(data, **kwargs):
        print('data', data)
    

    If you try, you'll get the following error.

    Configuring worker with 1 subscription(s)...
      dicom_uploads - handle_upload
    /usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
      callback(message)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

    There are some simple workarounds, but it would be nice if this was supported natively.

    @sub(topic='my-topic'):
    def sync_handler(data, **kwargs)
        return asyncio.run(async_handler(data, **kwargs))
    
    async def async_handler(data, **kwargs):
        print('data', data)
    
    opened by csaroff 6
  • Add support for arbitrary deserializers

    Add support for arbitrary deserializers

    :tophat: What?

    Allow custom deserializers other than the default json deserializer.

    :thinking: Why?

    Some of GCP's pub/sub notifications have messages that are simple strings which can't be deserialized as json.

    For example, notifications that a record was inserted into the dicom store comes through with a string indicating the path to that record and nothing more.

    :link: https://github.com/mercadona/rele/issues/229

    opened by csaroff 8
  • Add subscription option to disable json parsing

    Add subscription option to disable json parsing

    Currently rele's subscription model only supports consuming messages that are json deserializable.

    Unfortunately some of GCP's core services publish non-json messages. For example, the gcp dicom store publishes notifications to a topic when a new dicom instance was inserted, but the message field is just the dicom instance path. When attempting to consume these messages through rele, we get the following error.

    Exception raised while processing message for dicom_uploads - handle_upload: JSONDecodeError
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/rele/subscription.py", line 112, in __call__
        data = json.loads(message.data.decode("utf-8"))
      File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    Can we add an option to disable deserialization of these messages? Something like:

    from rele import sub
    
    @sub(topic='my-topic', deserialize_message=False):
    def handle_upload(data, **kwargs):
        print('Handling data', data)
    
    opened by csaroff 2
Releases(1.6.0)
  • 1.6.0(Aug 4, 2022)

  • v1.4.1(Apr 20, 2022)

    What's Changed

    • [Deleted] Delete post-publish-message-failure hook on VerboseLoggingMiddleware. (#220)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.4.0...v1.4.1

    Source code(tar.gz)
    Source code(zip)
  • v1.4.0(Apr 13, 2022)

    What's Changed

    • [Added] Added a VerboseLoggingMiddleware that does not truncate mesage payload. (#218)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.3.0...v1.4.0

    Source code(tar.gz)
    Source code(zip)
  • v1.3.0(Apr 5, 2022)

    What's Changed

    • GC Project Id & Windows support by @chirgjin-les in https://github.com/mercadona/rele/pull/215
    • Release v1.3.0 by @jonasae in https://github.com/mercadona/rele/pull/216

    New Contributors

    • @chirgjin-les made their first contribution in https://github.com/mercadona/rele/pull/215

    Full Changelog: https://github.com/mercadona/rele/compare/v1.2.0...v1.3.0

    Source code(tar.gz)
    Source code(zip)
  • v1.2.0(Dec 15, 2021)

    • [CHANGED] TimeotError from publisher (#212)
    • Added filter_subs_by setting in documentation (#208)
    • Automatic topic creation (#206)
    • Log post publish success (#204)
    Source code(tar.gz)
    Source code(zip)
  • 1.1.1(Jun 28, 2021)

  • 1.1.0(Mar 10, 2021)

    • Google Pubsub 2.0 Compat (#192)
    • Add validations to the sub decorator (#189)
    • Add new post_publish_hook and deprecate the old one (#190)
    • Discover and load settings when publishing (#188)
    • Fix #180: Raise error when the config loads a repeated subscription (#187)
    Source code(tar.gz)
    Source code(zip)
  • 1.0.0(Sep 25, 2020)

  • 0.14.0(Aug 5, 2020)

    • BREAKING CHANGE: Remove GC_CREDENTIALS (#174)
    • DEPRECATE: GC_PROJECT_ID setting (#178)
    • Add changelog to the docs site (#179)
    • Catch TimeoutError and run post_publish_failure when blocking (#172)
    Source code(tar.gz)
    Source code(zip)
  • 0.13.0(Jul 10, 2020)

  • 0.13.dev0(Jun 16, 2020)

  • 0.12.0(Jun 12, 2020)

  • 0.11.0(Jun 4, 2020)

  • 0.10.0(Feb 4, 2020)

    • Adjust default THREADS_PER_SUBSCRIPTION (#152)
    • Add unrecoverable_middleware (#150)
    • Allow multiple filters (#148)
    • Configure timeout from .publish() (#143)
    • Dont crash when subscription topic does not exist (#142)
    Source code(tar.gz)
    Source code(zip)
  • 0.9.1(Jan 2, 2020)

  • 0.9.0(Dec 20, 2019)

    • Flask support via middleware (#127)
    • Add message attributes to metrics log (#128)
    • Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139)
    • Publishing timeout while blocking (#137)
    • Clean up rele.config.setup + Worker() init (#132)
    Source code(tar.gz)
    Source code(zip)
  • 0.8.1(Nov 25, 2019)

  • 0.8.0(Nov 22, 2019)

    • Worker run method (#118)
    • Add kwargs to setup method passed through to middleware (#123)
    • Add missing worker middleware hooks (#121)
    • Add 3.8 support
    • More Documentation
    Source code(tar.gz)
    Source code(zip)
  • 0.7.0(Oct 21, 2019)

  • 0.6.0(Sep 23, 2019)

    • BREAKING: Remove drf as a dependency (#91)
    • Add message as a parameter for middleware hooks (#99)
    • Check setting.CONN_MAX_AGE and warn when not 0 (#97)
    • More documentation
    Source code(tar.gz)
    Source code(zip)
  • 0.5.0(Aug 9, 2019)

  • 0.4.1(Jun 19, 2019)

  • 0.4.0(Jun 17, 2019)

    • Set DEFAULT_ACK_DEADLINE (#49)
    • Filter by message attributes (#66)
    • BREAKING: All Relé settings are defined in a dict (#60)

    Old structure:

    from google.oauth2 import service_account
    RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file(
        'rele/settings/dummy-credentials.json'
    )
    RELE_GC_PROJECT_ID = 'dummy-project-id'
    

    New structure:

    from google.oauth2 import service_account
    RELE = {
        'GC_CREDENTIALS': service_account.Credentials.from_service_account_file(
            'rele/settings/dummy-credentials.json'
        ),
        'GC_PROJECT_ID': 'dummy-project-id',
        'MIDDLEWARE': [
            'rele.contrib.LoggingMiddleware',
            'rele.contrib.DjangoDBMiddleware',
        ],
        'SUB_PREFIX': 'delivery',
        'APP_NAME': 'delivery',
    }
    
    • rele.contrib.middleware (#55)
    • Prefix argument in sub decorator (#47)
    • Add timestamp to the published message (#42)
    • BREAKING: Explicit publisher and subscriber configuration (#43)
    • Sphinx documentation (#27, #34, #40, #41)
    • Contributing guidelines (#32)
    Source code(tar.gz)
    Source code(zip)
  • v0.3.1(Jun 5, 2019)

  • v0.3.0(May 14, 2019)

    • Ability to run in emulator mode (#12)
    • Add Travis-CI builds (#10)
    • More friendly global publish (#11)
    • Non-blocking behaviour when publishing by default (#6)
    Source code(tar.gz)
    Source code(zip)
An all-in-one discord bot!

Interbot Interbot is a do-it-all bot originally made for the Interbyte Studios discord server. This repo contains the code for this bot, allowing you

Logan 5 Aug 03, 2021
Neko is An Anime themed advance Telegram group management bot.

NekoRobot A modular telegram Python bot running on python3 with an sqlalchemy, mongodb database. ╒═══「 Status 」 Maintained Support Group Included Free

Lovely Prince 11 Oct 11, 2022
Cogs for Red-DiscordBot

Redbot cogs for Red-DiscordBot authored by Kreusada This is my repository for Red Discord-Bot. I built these cogs because these were the features that

Kreus Amredes 26 Nov 07, 2022
Prisma Cloud utility scripts, and a Python SDK for Prisma Cloud APIs.

pcs-toolbox Prisma Cloud utility scripts, and a Python SDK for Prisma Cloud APIs. Table of Contents Support Setup Configuration Script Usage CSPM Scri

Palo Alto Networks 34 Dec 15, 2022
To send an Instagram message using Python

To send an Instagram message using Python, you must have an Instagram account and install the Instabot library in your Python virtual environment.

Coding Taggers 1 Dec 18, 2021
A self-bot for discord, written in Python, which will send you notifications to your desktop if it detects an intruder on your discord server

A self-bot for discord, written in Python, which will send you notifications to your desktop if it detects an intruder on your discord server

LevPrav 1 Jan 11, 2022
Slack bot to automatically delete yubisneeze / accidental yubikey presses

YubiSnooze Slack bot to automatically delete yubisneeze / accidental yubikey presses. It will search using the regex "[cbdefghijklnrtuv]{44}" and if t

Andrew MacPherson 3 Feb 09, 2022
Another secured and Yet Fastest telegram userbot

Vision-UserBot A stable, simple Telegram UserBot in Pyrogram! Support Variables ➨ TG_APP_ID - Your Telegram Api id. ➨ TG_API_HASH - Your Telegram Api

TeamVision 40 Oct 24, 2022
Automates the process to obtain an appointment for NIE in spain.

get-nie-appointment A Python script that automates the process of getting an appointment for NIE assignation. It can be modified in order to change th

Ezequiel Aceto 39 Nov 28, 2022
Get charts, top artists and top songs WITHOUT LastFM API

LastFM Get charts, top artists and top songs WITHOUT LastFM API Usage Get stats (charts) We provide many filters and options to customize. Geo filter

4 Feb 11, 2022
This is a music bot for discord written in python

this is a music bot for discord written in python, it is designed for educational use ONLY, I do not take any responsibility for uses outside of educational use

5 Dec 24, 2021
Python Dialogflow CX Scripting API (SCRAPI)

Python Dialogflow CX Scripting API (SCRAPI) A high level scripting API for bot builders, developers, and maintainers. Table of Contents Introduction W

Google Cloud Platform 39 Dec 09, 2022
An API wrapper library for opensea api.

Opensea API An API wrapper library for opensea api. Installation pip3 install opensea Usage Retrieving assets: from opensea import get_assets # This

Ankush Singh 38 Jul 17, 2022
Pdisk Link Converter Telegram Bot, Convert link in a single click

Pdisk Converter Bot Make short link by using Pdisk API key Installation The Easy Way Required Variables BOT_TOKEN: Create a bot using @BotFather, and

Ayush Kumar Jaiswal 6 Jul 28, 2022
Скрипт, позволяющий импортировать плейлисты из Spotify, а также обычные треклисты в VK музыку.

vk-music-import Программа для переноса плейлистов из Spotify и текстовых треклистов в VK Музыку. Преимущества: Позволяет быстро импортировать плейлист

Mew Forest 32 Nov 23, 2022
TonplaceApi - Ton.place api wrapper

tonplaceApi ton.place/tonplaceApi Обертка для ton.place Установка pip install ht

Nickolay Samedov 3 Feb 21, 2022
Moderation By Pokemon Bot (Discord)

Moderation Bot By Pokémon Bot (Discord) Official Moderation Bot for Pokemon Bot functional and based in the Discord Server, the bot is written in Pyth

Aakash Manoj Agrawal 6 Jan 04, 2022
A chatbot that helps you set price alerts for your amazon products.

Amazon Price Alert Bot Description A Telegram chatbot that helps you set price alerts for amazon products. The bot checks the price of your watchliste

Rittik Basu 24 Dec 29, 2022
An advanced automatic top.gg dank memer voter that votes automatically for you.

Auto Dank Memer Voter An automatic dank memer voter that sends votes onto top.gg every 12 hours, unless their is captcha. I am working on a captcha de

6 Aug 27, 2022
Easy to use reaction role Discord bot written in Python.

Reaction Light - Discord Role Bot Light yet powerful reaction role bot coded in Python. Key Features Create multiple custom embedded messages with cus

eibex 109 Dec 20, 2022