Easy to use Google Pub/Sub

Overview

Relé makes integration with Google Pub/Sub straightforward.

Motivation and Features

The Publish-Subscribe pattern and specifically the Google Cloud Pub/Sub library are very powerful tools, but you can easily cut your fingers on them. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features:

  • Powerful Publishing API
  • Highly Scalable Worker
  • Intuitive Subscription Management
  • Easily Extensible Middleware
  • Ready to go Django/Flask integration
  • CLI
  • And much more!

What it looks like

# Publish to the topic
import rele

rele.publish(topic='photo-uploaded', data={'customer_id': 123})

# Subscribe to the Pub/Sub topic
from rele import sub

@sub(topic='photo-uploaded')
def photo_uploaded(data, **kwargs):
    print(f"Customer {data['customer_id']} has uploaded an image")

What's in the name

"Relé" is Spanish for relay, a technology that has played a key role in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmission, and transistors.

Install

Relé supports Python 3.6+ and can be installed via pip:

pip install rele

or with Django integration

pip install rele[django]

or with Flask integration

pip install rele[flask]

Quickstart

Please see our documentation to get started.

You can also read more about it here


Running Tests

Does the code actually work?

  make test
Comments
  • Clean up rele.config.setup + Worker() init

    This relates to #114 & #119

    This makes all config variables nullable, falling back to the standard Google environment variables, without breaking the current API.

    The new API would look like this if you have GOOGLE_APPLICATION_CREDENTIALS set:

    rele.config.setup()
    
    w = Worker([sub1, sub2])
    w.run_forever()
    

    TBH, I think reading global configs from the environment is easier to reason about than a singleton so I'd suggest that. Any non globals should just be passed into the instances on init.

    opened by craigmulligan 8
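
The fallback described in this pull request can be illustrated with a small sketch. The `resolve_setting` helper and the `ENV_FALLBACKS` mapping are hypothetical and are not part of Relé; only the `GC_CREDENTIALS_PATH` setting name and the `GOOGLE_APPLICATION_CREDENTIALS` variable appear on this page.

```python
import os

# Hypothetical mapping from a setting name to the environment variable used
# as its fallback. This pairing is an assumption made for illustration.
ENV_FALLBACKS = {
    "GC_CREDENTIALS_PATH": "GOOGLE_APPLICATION_CREDENTIALS",
}


def resolve_setting(settings, name, environ=None):
    """Return the explicit setting if present, else the environment fallback."""
    environ = os.environ if environ is None else environ
    value = (settings or {}).get(name)
    if value is not None:
        return value
    env_name = ENV_FALLBACKS.get(name)
    return environ.get(env_name) if env_name else None


# A fake environment keeps the example independent of the real one.
fake_env = {"GOOGLE_APPLICATION_CREDENTIALS": "/secrets/service-account.json"}
explicit = resolve_setting(
    {"GC_CREDENTIALS_PATH": "/tmp/creds.json"}, "GC_CREDENTIALS_PATH", fake_env
)
fallback = resolve_setting(None, "GC_CREDENTIALS_PATH", fake_env)
```

An explicit value wins, and a missing or `None` value falls through to the environment, which is what makes every setting nullable without changing existing calls.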
  • Runtime error when trying to read project_id from default google creds

    I'm getting the following error when trying to run rele.config.setup using default credentials:

    rele.config.setup({
        "GC_CREDENTIALS_PATH": None,
        "MIDDLEWARE": [
            "rele.contrib.LoggingMiddleware",
            "rele.contrib.FlaskMiddleware",
        ],
        "APP_NAME": "smart_comms_planner",
    }, flask_app=app)
    

    output:

      File "/Users/matthewbridges/repos/smart-comms-planner/src/__init__.py", line 66, in <module>
        rele.config.setup(settings["rele"], flask_app=app)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 69, in setup
        init_global_publisher(config)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/publishing.py", line 10, in init_global_publisher
        gc_project_id=config.gc_project_id,
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 59, in gc_project_id
        return self.credentials.project_id
    AttributeError: 'Credentials' object has no attribute 'project_id'
    

    The code is incorrectly attempting to read project_id off the credentials object, when in fact it is returned as a tuple from get_google_defaults.

    I've added a proposed fix here: https://github.com/mercadona/rele/pull/195

    opened by Itsindigo 7
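
The fix described in this issue comes down to unpacking a tuple instead of reading an attribute. In the sketch below, `load_default_credentials` is a hypothetical stand-in for the real lookup (for reference, `google.auth.default()` returns a `(credentials, project_id)` pair). It is not Relé's code.

```python
class Credentials:
    """Stand-in credentials object that, as in the traceback, has no project_id."""


def load_default_credentials():
    # Hypothetical stand-in: returns (credentials, project_id) as a tuple.
    return Credentials(), "dummy-project-id"


# Unpack the tuple rather than reading credentials.project_id.
credentials, project_id = load_default_credentials()
has_attribute = hasattr(credentials, "project_id")
```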
  • Style guide

    We currently have flake8 in our linting command. We do not have isort in the project style guidelines.

    Shall we add it?

    Or try out something new like black?

    good first issue question 
    opened by andrewgy8 5
  • Configure Publisher Client timeout

    I would like to be able to configure the timeout when publishing a message in a blocking fashion. Right now, the timeout is hard-coded to 3.0 seconds.

    I propose adding a configuration so I can declare any number of seconds for the publisher.

    Ex.

    RELE = {
        ...
        'PUBLISHER_TIMEOUT': 5.0
    }
    
    enhancement 
    opened by andrewgy8 4
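
A minimal sketch of how such a setting could drive a blocking publish, assuming the publish call can be waited on like a `concurrent.futures` future. `publish_blocking` and `slow_send` are hypothetical names; only `PUBLISHER_TIMEOUT` and the 3.0 second default come from the issue.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

DEFAULT_TIMEOUT = 3.0  # the hard-coded value mentioned in the issue


def publish_blocking(send, settings):
    """Run `send` in a background thread and wait up to the configured timeout."""
    timeout = settings.get("PUBLISHER_TIMEOUT", DEFAULT_TIMEOUT)
    # Note: leaving the `with` block still waits for the thread to finish,
    # even when result() has already raised TimeoutError.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(send)
        return future.result(timeout=timeout)


def slow_send():
    # Hypothetical stand-in for a network call that takes 0.2 seconds.
    time.sleep(0.2)
    return "message-id"


result = publish_blocking(slow_send, {"PUBLISHER_TIMEOUT": 5.0})

try:
    publish_blocking(slow_send, {"PUBLISHER_TIMEOUT": 0.01})
    timed_out = False
except TimeoutError:
    timed_out = True
```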
  • Add an API to allow passing objects to middleware

    I'm adding rele to a Flask app, and the subscription callbacks need the Flask app_context. I've done this by way of middleware:

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def pre_process_message(self, subscription, message):
            from server import app
            self.ctx = app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    

    But to make this reusable across our services and other Flask apps, I'd need a way to add arbitrary data to the config that is passed to the middleware's setup method, or an easy way to call custom middleware functions.

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def setup(self, config):
            self.app = config["FLASK_APP"]
    
        def pre_process_message(self, subscription, message):
            self.ctx = self.app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    
    enhancement 
    opened by craigmulligan 4
  • Simplify initializing and running the Worker

    One consistent piece of feedback that I have received is to simplify the Worker class. Right now, you must initialize it with the subs and each individual config attribute, run setup, run start, and then sleep.

    Like this:

    worker = Worker(
        [photo_uploaded],
        config.gc_project_id,
        config.credentials,
        config.ack_deadline,
    )
    worker.setup()
    worker.start()
    sleep(120)
    

    I propose we simplify the API to run a worker to look something like this instead:

    worker = Worker([photo_uploaded], config)
    worker.run(sleep=120)
    

    run would call both setup and start, and we could add the standard sleep method. In addition we consolidate the configuration into one attribute.

    This would be backwards compatible since we will be creating a new method. And the change to Worker initialization could also fall back to the declared attributes if defined. Otherwise, use the config object.

    enhancement 
    opened by andrewgy8 4
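
The proposed `run` method can be sketched with a stand-in class. This `Worker` is not Relé's implementation: `setup()` and `start()` only record that they were called, so that the order of operations is visible.

```python
import time


class Worker:
    """Stand-in worker used only to illustrate the proposed run() method."""

    def __init__(self, subscriptions, config=None):
        self.subscriptions = subscriptions
        self.config = config
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    def start(self):
        self.calls.append("start")

    def run(self, sleep=None):
        """Call setup() and start(), then optionally sleep for `sleep` seconds."""
        self.setup()
        self.start()
        if sleep:
            time.sleep(sleep)


worker = Worker(["photo_uploaded"], config={"APP_NAME": "demo"})
worker.run(sleep=0.01)
```

Because `run` is a new method, code that still calls `setup()` and `start()` directly keeps working, which is the backwards compatibility the issue mentions.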
  • Improve sub decorator

    The sub decorator can get some ease of use improvements

    1. Use functools.wraps to preserve __name__ and __doc__ of the original function
    2. Inspect callback function signature and raise an exception if the signature is not compatible (ie. missing data)
    3. Log a warning message if the function cannot be discovered (ie. no sub in the path)

    Slightly related to the second point: @sub("topic", filter_by=42) will raise a TypeError at runtime (because filters are used as callables but they are not checked)

    enhancement hacktoberfest 
    opened by tbarbugli 3
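
Points 1 and 2 of this issue, plus the filter check, can be sketched with the standard library. `checked_sub` is a hypothetical decorator, not Relé's `sub`, and the way the filter receives the keyword arguments is an assumption. Point 3 (discovery warnings) is not covered.

```python
import functools
import inspect


def checked_sub(topic, filter_by=None):
    """Hypothetical decorator that validates its arguments at decoration time."""
    if filter_by is not None and not callable(filter_by):
        raise ValueError("filter_by must be callable")

    def decorator(func):
        # Reject callbacks that cannot receive the message payload.
        if "data" not in inspect.signature(func).parameters:
            raise TypeError(f"{func.__name__} must accept a 'data' argument")

        @functools.wraps(func)  # preserves __name__ and __doc__
        def wrapper(data, **kwargs):
            if filter_by is not None and not filter_by(kwargs):
                return None
            return func(data, **kwargs)

        wrapper.topic = topic
        return wrapper

    return decorator


@checked_sub("photo-uploaded")
def photo_uploaded(data, **kwargs):
    """Handle an uploaded photo."""
    return data["customer_id"]
```

With these checks, a call such as `checked_sub("topic", filter_by=42)` fails when the module is imported rather than when the first message arrives.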
  • Breaking with Google PubSub >= 2.0

    In the 1.0 release, I had to pin pubsub to <2.0. Otherwise, our tests would break. It seems there were major changes, and we need to update some usages in our code.

    To reproduce:

    1. Update the Google Pub/Sub library to >= 2.0.
    2. Run tests
    good first issue hacktoberfest 
    opened by andrewgy8 3
  • Execution doesn't stop when using debugger

    Hey 👋 first of all thanks for this great library!

    This might not be a strictly Relé related issue, but when I tried to pause execution to debug some issue (ie dropping a __import__("pdb").set_trace()) inside the subscription message handler function it did not stop, but just kept on processing messages.

    This might be a trivial problem to overcome, but my search engine fu failed me. I'm guessing the problem is that the subscriber handles the message in a thread, so it would somehow need to let the parent know to stop, but not really sure how to go about it.

    question 
    opened by daaain 3
  • Add unrecoverable_middleware

    This is useful if you know you will never be able to handle the message and don't have a dead-letter queue set up.

    :tophat: What?

    A custom exception and middleware to automate and ignore bad messages.

    :thinking: Why?

    I've been using this a fair amount in some of our services and thought it may be useful to others.

    opened by craigmulligan 3
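
The idea behind this pull request can be sketched as follows: a handler raises a dedicated exception, and the message is acknowledged anyway so that it is not redelivered. All names below (`UnrecoverableError`, `FakeMessage`, `process`) are hypothetical and do not claim to match the merged code.

```python
class UnrecoverableError(Exception):
    """Raised by a handler when a message can never be processed."""


class FakeMessage:
    """Stand-in for a Pub/Sub message that records whether it was acked."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


def process(handler, message, data):
    """Ack on success or on UnrecoverableError; leave other failures unacked."""
    try:
        handler(data)
    except UnrecoverableError:
        message.ack()  # drop the bad message instead of retrying forever
        return "dropped"
    except Exception:
        return "retry"  # left unacked, so it would be redelivered
    message.ack()
    return "ok"


def reject(data):
    raise UnrecoverableError("payload is malformed")


def crash(data):
    raise RuntimeError("temporary failure")


dropped_message = FakeMessage()
dropped_status = process(reject, dropped_message, {})

retry_message = FakeMessage()
retry_status = process(crash, retry_message, {})
```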
  • Add python 3.8 to travis

    :tophat: What?

    Add python 3.8 to travis.

    :thinking: Why?

    It was officially released on Oct 14, 2019

    This would allow us to officially support Python 3.8. However, it seems that grpcio is not compiled for Python 3.8, and therefore the Travis build takes much longer to complete: it ran for 5 min 40 sec.

    So either we can wait and not support 3.8 yet, or merge this and have the travis build take longer than expected.

    I leave it as a draft until we decide.

    opened by andrewgy8 3
  • [travis] PSQL is not required for tests execution

    :tophat: What?

    Remove PostgreSQL service from Travis pipeline

    :thinking: Why?

    • It's not used
    • It could give a false sense of security by suggesting that there are tests checking database features (Django DB)
    hacktoberfest 
    opened by Maks3w 0
  • [add] Python 3.10 support

    :tophat: What?

    • Run the test suite against the minimum Python version supported (3.6) and the maximum version supported (3.10)
    • Also update the dist image to bionic because it's the only one with support for all the Python versions.
    • Add the PIP classifiers for Python 3.10

    :thinking: Why?

    Be sure the library is compatible with the latest/current version of Python (3.10)

    hacktoberfest 
    opened by Maks3w 0
  • Add native async support for subscriptions

    Currently, async functions can't be used with the @sub decorator.

    @sub(topic='my-topic')
    async def async_handler(data, **kwargs):
        print('data', data)
    

    If you try, you'll get the following error.

    Configuring worker with 1 subscription(s)...
      dicom_uploads - handle_upload
    /usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
      callback(message)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

    There are some simple workarounds, but it would be nice if this was supported natively.

    import asyncio

    from rele import sub

    @sub(topic='my-topic')
    def sync_handler(data, **kwargs):
        # Run the coroutine to completion on a fresh event loop.
        return asyncio.run(async_handler(data, **kwargs))
    
    async def async_handler(data, **kwargs):
        print('data', data)
    
    opened by csaroff 6
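
The workaround in this issue can be generalized into a small adapter. The sketch below is not a Relé feature: `allow_async` is a hypothetical decorator that wraps coroutine functions with `asyncio.run`, and it assumes that no event loop is already running in the calling thread.

```python
import asyncio
import functools
import inspect


def allow_async(func):
    """Wrap coroutine functions so that a synchronous caller can invoke them."""
    if not inspect.iscoroutinefunction(func):
        return func  # plain functions are returned unchanged

    @functools.wraps(func)
    def wrapper(data, **kwargs):
        # Each call runs on its own event loop and blocks until it finishes.
        return asyncio.run(func(data, **kwargs))

    return wrapper


@allow_async
async def async_handler(data, **kwargs):
    await asyncio.sleep(0)
    return data["path"]


result = async_handler({"path": "path/to/record"})
```

Starting a new event loop per message is simple but gives no concurrency between messages in the same thread, so native support could still be preferable.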
  • Add support for arbitrary deserializers

    :tophat: What?

    Allow custom deserializers other than the default json deserializer.

    :thinking: Why?

    Some of GCP's pub/sub notifications have messages that are simple strings which can't be deserialized as json.

    For example, notifications that a record was inserted into the DICOM store come through with a string indicating the path to that record and nothing more.

    :link: https://github.com/mercadona/rele/issues/229

    opened by csaroff 8
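
A pluggable deserializer can be sketched as a callable that turns the raw message bytes into whatever the callback expects. The function names and the `deserializer` parameter below are hypothetical and only illustrate the idea, not the API added by this pull request.

```python
import json


def json_deserializer(raw):
    """Default behaviour: decode the payload as UTF-8 JSON."""
    return json.loads(raw.decode("utf-8"))


def text_deserializer(raw):
    """Alternative for payloads that are plain strings, such as a record path."""
    return raw.decode("utf-8")


def handle(raw, callback, deserializer=json_deserializer):
    """Decode the raw payload with the chosen deserializer, then call the callback."""
    return callback(deserializer(raw))


as_json = handle(b'{"customer_id": 123}', lambda data: data["customer_id"])
as_text = handle(
    b"path/to/record", lambda data: data, deserializer=text_deserializer
)
```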
  • Add subscription option to disable json parsing

    Currently rele's subscription model only supports consuming messages that are json deserializable.

    Unfortunately some of GCP's core services publish non-json messages. For example, the gcp dicom store publishes notifications to a topic when a new dicom instance was inserted, but the message field is just the dicom instance path. When attempting to consume these messages through rele, we get the following error.

    Exception raised while processing message for dicom_uploads - handle_upload: JSONDecodeError
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/rele/subscription.py", line 112, in __call__
        data = json.loads(message.data.decode("utf-8"))
      File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    Can we add an option to disable deserialization of these messages? Something like:

    from rele import sub
    
    @sub(topic='my-topic', deserialize_message=False)
    def handle_upload(data, **kwargs):
        print('Handling data', data)
    
    opened by csaroff 2
Releases(1.6.0)
  • 1.6.0(Aug 4, 2022)

  • v1.4.1(Apr 20, 2022)

    What's Changed

    • [Deleted] Delete post-publish-message-failure hook on VerboseLoggingMiddleware. (#220)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.4.0...v1.4.1

  • v1.4.0(Apr 13, 2022)

    What's Changed

    • [Added] Added a VerboseLoggingMiddleware that does not truncate message payload. (#218)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.3.0...v1.4.0

  • v1.3.0(Apr 5, 2022)

    What's Changed

    • GC Project Id & Windows support by @chirgjin-les in https://github.com/mercadona/rele/pull/215
    • Release v1.3.0 by @jonasae in https://github.com/mercadona/rele/pull/216

    New Contributors

    • @chirgjin-les made their first contribution in https://github.com/mercadona/rele/pull/215

    Full Changelog: https://github.com/mercadona/rele/compare/v1.2.0...v1.3.0

  • v1.2.0(Dec 15, 2021)

    • [CHANGED] TimeoutError from publisher (#212)
    • Added filter_subs_by setting in documentation (#208)
    • Automatic topic creation (#206)
    • Log post publish success (#204)
  • 1.1.1(Jun 28, 2021)

  • 1.1.0(Mar 10, 2021)

    • Google Pubsub 2.0 Compat (#192)
    • Add validations to the sub decorator (#189)
    • Add new post_publish_hook and deprecate the old one (#190)
    • Discover and load settings when publishing (#188)
    • Fix #180: Raise error when the config loads a repeated subscription (#187)
  • 1.0.0(Sep 25, 2020)

  • 0.14.0(Aug 5, 2020)

    • BREAKING CHANGE: Remove GC_CREDENTIALS (#174)
    • DEPRECATE: GC_PROJECT_ID setting (#178)
    • Add changelog to the docs site (#179)
    • Catch TimeoutError and run post_publish_failure when blocking (#172)
  • 0.13.0(Jul 10, 2020)

  • 0.13.dev0(Jun 16, 2020)

  • 0.12.0(Jun 12, 2020)

  • 0.11.0(Jun 4, 2020)

  • 0.10.0(Feb 4, 2020)

    • Adjust default THREADS_PER_SUBSCRIPTION (#152)
    • Add unrecoverable_middleware (#150)
    • Allow multiple filters (#148)
    • Configure timeout from .publish() (#143)
    • Dont crash when subscription topic does not exist (#142)
  • 0.9.1(Jan 2, 2020)

  • 0.9.0(Dec 20, 2019)

    • Flask support via middleware (#127)
    • Add message attributes to metrics log (#128)
    • Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139)
    • Publishing timeout while blocking (#137)
    • Clean up rele.config.setup + Worker() init (#132)
  • 0.8.1(Nov 25, 2019)

  • 0.8.0(Nov 22, 2019)

    • Worker run method (#118)
    • Add kwargs to setup method passed through to middleware (#123)
    • Add missing worker middleware hooks (#121)
    • Add 3.8 support
    • More Documentation
  • 0.7.0(Oct 21, 2019)

  • 0.6.0(Sep 23, 2019)

    • BREAKING: Remove drf as a dependency (#91)
    • Add message as a parameter for middleware hooks (#99)
    • Check setting.CONN_MAX_AGE and warn when not 0 (#97)
    • More documentation
  • 0.5.0(Aug 9, 2019)

  • 0.4.1(Jun 19, 2019)

  • 0.4.0(Jun 17, 2019)

    • Set DEFAULT_ACK_DEADLINE (#49)
    • Filter by message attributes (#66)
    • BREAKING: All Relé settings are defined in a dict (#60)

    Old structure:

    from google.oauth2 import service_account
    RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file(
        'rele/settings/dummy-credentials.json'
    )
    RELE_GC_PROJECT_ID = 'dummy-project-id'
    

    New structure:

    from google.oauth2 import service_account
    RELE = {
        'GC_CREDENTIALS': service_account.Credentials.from_service_account_file(
            'rele/settings/dummy-credentials.json'
        ),
        'GC_PROJECT_ID': 'dummy-project-id',
        'MIDDLEWARE': [
            'rele.contrib.LoggingMiddleware',
            'rele.contrib.DjangoDBMiddleware',
        ],
        'SUB_PREFIX': 'delivery',
        'APP_NAME': 'delivery',
    }
    
    • rele.contrib.middleware (#55)
    • Prefix argument in sub decorator (#47)
    • Add timestamp to the published message (#42)
    • BREAKING: Explicit publisher and subscriber configuration (#43)
    • Sphinx documentation (#27, #34, #40, #41)
    • Contributing guidelines (#32)
  • v0.3.1(Jun 5, 2019)

  • v0.3.0(May 14, 2019)

    • Ability to run in emulator mode (#12)
    • Add Travis-CI builds (#10)
    • More friendly global publish (#11)
    • Non-blocking behaviour when publishing by default (#6)