Easy to use Google Pub/Sub

Overview

Relé makes integration with Google Pub/Sub straightforward.


Motivation and Features

The Publish-Subscribe pattern, and specifically the Google Cloud Pub/Sub library, are very powerful tools, but you can easily cut your fingers on them. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features:

  • Powerful Publishing API
  • Highly Scalable Worker
  • Intuitive Subscription Management
  • Easily Extensible Middleware
  • Ready to go Django/Flask integration
  • CLI
  • And much more!

What it looks like

# Publish to the topic
import rele

rele.publish(topic='photo-uploaded', data={'customer_id': 123})

# Subscribe to the Pub/Sub topic
from rele import sub

@sub(topic='photo-uploaded')
def photo_uploaded(data, **kwargs):
    print(f"Customer {data['customer_id']} has uploaded an image")

What's in the name

"Relé" is Spanish for relay, a technology that has played a key role in history in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmission, and transistors.

Install

Relé supports Python 3.6+ and can be installed via pip

pip install rele

or with Django integration

pip install rele[django]

or with Flask integration

pip install rele[flask]

Quickstart

Please see our documentation to get started.

You can also read more about it here


Running Tests

Does the code actually work?

  make test
Comments
  • Clean up rele.config.setup + Worker() init

    This relates to #114 & #119

    This makes all config variables nullable, falling back to the standard Google environment variables, without breaking the current API.

    The new API would look like this if you have GOOGLE_APPLICATION_CREDENTIALS set.

    rele.config.setup()
    
    w = Worker([sub1, sub2])
    w.run_forever()
    

    TBH, I think reading global configs from the environment is easier to reason about than a singleton so I'd suggest that. Any non globals should just be passed into the instances on init.
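The fallback described above could look roughly like the following. This is a minimal sketch, not Relé's implementation: the helper name `credentials_path` is hypothetical, while `GC_CREDENTIALS_PATH` and `GOOGLE_APPLICATION_CREDENTIALS` are the setting and environment variable named in this thread.

```python
import os


def credentials_path(settings=None, environ=os.environ):
    """Prefer an explicit setting, else fall back to the environment.

    Hypothetical helper: a setting left as None means "use the
    GOOGLE_APPLICATION_CREDENTIALS environment variable instead".
    """
    settings = settings or {}
    explicit = settings.get("GC_CREDENTIALS_PATH")
    if explicit is not None:
        return explicit
    return environ.get("GOOGLE_APPLICATION_CREDENTIALS")
```

Passing `environ` as a parameter keeps the lookup easy to test without mutating the real process environment.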

    opened by craigmulligan 8
  • Runtime error when trying to read project_id from default google creds

    I'm getting the following error when trying to run rele.config.setup using default credentials:

    rele.config.setup({
    	"GC_CREDENTIALS_PATH": None,
        "MIDDLEWARE": [
         	"rele.contrib.LoggingMiddleware",
            "rele.contrib.FlaskMiddleware",
         ],
         "APP_NAME": "smart_comms_planner",
    }, flask_app=app)
    

    output:

      File "/Users/matthewbridges/repos/smart-comms-planner/src/__init__.py", line 66, in <module>
        rele.config.setup(settings["rele"], flask_app=app)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 69, in setup
        init_global_publisher(config)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/publishing.py", line 10, in init_global_publisher
        gc_project_id=config.gc_project_id,
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 59, in gc_project_id
        return self.credentials.project_id
    AttributeError: 'Credentials' object has no attribute 'project_id'
    

    The code is incorrectly attempting to read project_id off the credentials object, when in fact it is returned as a tuple from get_google_defaults.

    I've added a proposed fix here: https://github.com/mercadona/rele/pull/195
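To illustrate the point about the tuple, here is a minimal sketch that unpacks the project ID from the loader's return value instead of reading it off the credentials object. The function `resolve_project_id` and the fake loader are hypothetical. In real use the loader would presumably be something like `google.auth.default`, which returns a `(credentials, project_id)` pair.

```python
from typing import Any, Callable, Optional, Tuple


def resolve_project_id(
    load_defaults: Callable[[], Tuple[Any, Optional[str]]],
) -> Tuple[Any, Optional[str]]:
    """Unpack (credentials, project_id) rather than using credentials.project_id."""
    credentials, project_id = load_defaults()
    return credentials, project_id


class FakeCredentials:
    """Stand-in for a credentials object that has no project_id attribute."""


def fake_defaults():
    # Hypothetical loader mimicking the (credentials, project_id) return shape.
    return FakeCredentials(), "dummy-project-id"


credentials, project_id = resolve_project_id(fake_defaults)
```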

    opened by Itsindigo 7
  • Style guide

    We currently have flake8 in our linting command. We do not have isort in the project style guidelines.

    Shall we add it?

    Or try out something new like black?

    good first issue question 
    opened by andrewgy8 5
  • Configure Publisher Client timeout

    I would like to be able to configure the timeout when publishing a message in a blocking fashion. Right now, the timeout is hard-coded to 3.0 seconds.

    I propose adding a configuration so I can declare any number of seconds for the publisher.

    Ex.

    RELE = {
    	...
    	'PUBLISHER_TIMEOUT': 5.0
    }
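A minimal sketch of how such a setting might be read, assuming a RELE-style dict as in the example above. The helper `publisher_timeout` and the validation rule are assumptions for illustration, not part of Relé.

```python
DEFAULT_PUBLISHER_TIMEOUT = 3.0  # the hard-coded value mentioned in this issue


def publisher_timeout(settings: dict) -> float:
    """Return PUBLISHER_TIMEOUT from the settings dict, defaulting to 3.0 seconds."""
    timeout = float(settings.get("PUBLISHER_TIMEOUT", DEFAULT_PUBLISHER_TIMEOUT))
    if timeout <= 0:
        # Assumed rule: a zero or negative timeout is treated as a config error.
        raise ValueError("PUBLISHER_TIMEOUT must be a positive number of seconds")
    return timeout
```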
    
    enhancement 
    opened by andrewgy8 4
  • Add an API to allow passing objects to middleware

    I'm adding rele to a Flask app, and the subscription callbacks need the Flask app_context. I've done this by way of middleware:

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def pre_process_message(self, subscription, message):
            from server import app
            self.ctx = app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    

    But to make this reusable across our services and other flask apps, I'd need a way to add arbitrary data to the config that is passed to middleware.setup method or have an easy way to call custom middleware functions.

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def setup(self, config):
            self.app = config["FLASK_APP"]
    
        def pre_process_message(self, subscription, message):
            self.ctx = self.app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    
    enhancement 
    opened by craigmulligan 4
  • Simplify initializing and running the Worker

    One consistent piece of feedback that I have received is to simplify the Worker class. Right now, you must initialize it with the subs and each individual config attribute, run setup, run start, and then sleep.

    Like this:

    worker = Worker(
        [photo_uploaded],
        config.gc_project_id,
        config.credentials,
        config.ack_deadline,
    )
    worker.setup()
    worker.start()
    sleep(120)
    

    I propose we simplify the API to run a worker to look something like this instead:

    worker = Worker([photo_uploaded], config)
    worker.run(sleep=120)
    

    run would call both setup and start, and we could add the standard sleep method. In addition we consolidate the configuration into one attribute.

    This would be backwards compatible since we will be creating a new method. And the change to Worker initialization could also fall back to the declared attributes if defined. Otherwise, use the config object.
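The proposed `run` method can be sketched as follows. `SketchWorker` is a hypothetical stand-in that records calls instead of talking to Pub/Sub; it only shows the order in which `run` would delegate to `setup` and `start` before sleeping.

```python
import time


class SketchWorker:
    """Hypothetical stand-in for Worker; records calls for illustration only."""

    def __init__(self, subscriptions, config=None):
        self.subscriptions = list(subscriptions)
        self.config = config
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    def start(self):
        self.calls.append("start")

    def run(self, sleep=0):
        """Convenience wrapper: setup, start, then block for `sleep` seconds."""
        self.setup()
        self.start()
        time.sleep(sleep)
        self.calls.append("done")
```

Because `run` is a new method layered on top of `setup` and `start`, existing callers that invoke those two directly would keep working.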

    enhancement 
    opened by andrewgy8 4
  • Improve sub decorator

    The sub decorator can get some ease of use improvements

    1. Use functools.wraps to preserve __name__ and __doc__ of the original function
    2. Inspect callback function signature and raise an exception if the signature is not compatible (ie. missing data)
    3. Log a warning message if the function cannot be discovered (ie. no sub in the path)

    Slightly related to the second point: @sub("topic", filter_by=42) will raise a TypeError at runtime (because filters are used as callables but they are not checked)
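The first two points, plus the filter check, could be sketched like this. `sketch_sub` is a hypothetical decorator written for illustration and is not Relé's `sub`; the choice of exception types is an assumption. The discovery warning from point 3 is not covered.

```python
import functools
import inspect


def sketch_sub(topic, filter_by=None):
    """Hypothetical decorator illustrating the checks; not Relé's real `sub`."""
    if filter_by is not None and not callable(filter_by):
        # Fail at decoration time instead of with a TypeError at runtime.
        raise ValueError("filter_by must be callable")

    def decorator(func):
        # Reject callbacks whose signature has no `data` parameter.
        if "data" not in inspect.signature(func).parameters:
            raise TypeError(f"{func.__name__} must accept a 'data' argument")

        @functools.wraps(func)  # preserves __name__ and __doc__ of the callback
        def wrapper(data, **kwargs):
            return func(data, **kwargs)

        wrapper.topic = topic
        wrapper.filter_by = filter_by
        return wrapper

    return decorator
```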

    enhancement hacktoberfest 
    opened by tbarbugli 3
  • Breaking with Google PubSub >= 2.0

    In the 1.0 release, I had to pin pubsub to <2.0. Otherwise, our tests would break. It seems there were major changes, and we need to update some usages in our code.

    To reproduce:

    1. Update the google pubsub library to >=2.0.
    2. Run tests
    good first issue hacktoberfest 
    opened by andrewgy8 3
  • Execution doesn't stop when using debugger

    Hey 👋 first of all thanks for this great library!

    This might not be a strictly Relé related issue, but when I tried to pause execution to debug some issue (ie dropping a __import__("pdb").set_trace()) inside the subscription message handler function it did not stop, but just kept on processing messages.

    This might be a trivial problem to overcome, but my search engine fu failed me. I'm guessing the problem is that the subscriber handles the message in a thread, so it would somehow need to let the parent know to stop, but not really sure how to go about it.

    question 
    opened by daaain 3
  • Add unrecoverable_middleware

    This is useful if you know you will never be able to handle the message and don't have a dead-letter queue setup.

    :tophat: What?

    A custom exception and middleware to automate and ignore bad messages.

    :thinking: Why?

    I've been using this a fair amount in some of our services and thought it may be useful to others.
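The idea can be sketched in a few lines. Everything here is hypothetical (the exception name, the `process` helper and the fake message) and only illustrates acknowledging a message that the handler declares unprocessable, so that it is not redelivered.

```python
class UnrecoverableError(Exception):
    """Hypothetical exception: the message can never be handled."""


class FakeMessage:
    """Stand-in for a Pub/Sub message exposing ack()."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


def process(handler, message, data):
    """Run the handler; ack and skip the message if it is unrecoverable."""
    try:
        handler(data)
    except UnrecoverableError:
        message.ack()  # acknowledge so the bad message is not redelivered
        return False
    return True
```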

    opened by craigmulligan 3
  • Add python 3.8 to travis

    :tophat: What?

    Add python 3.8 to travis.

    :thinking: Why?

    It was officially released on Oct 14, 2019

    This would allow us to officially add Python 3.8. However, it seems that grpcio is not compiled for Python 3.8 and therefore the travis build takes much longer to complete. It ran for 5 min 40 sec.

    So either we can wait and not support 3.8 yet, or merge this and have the travis build take longer than expected.

    I leave it as a draft until we decide.

    opened by andrewgy8 3
  • [travis] PSQL is not required for tests execution

    :tophat: What?

    Remove PostgreSQL service from Travis pipeline

    :thinking: Why?

    • It's not used
    • It could give a false sense of security by suggesting there are tests checking database features (Django DB)
    hacktoberfest 
    opened by Maks3w 0
  • [add] Python 3.10 support

    :tophat: What?

    • Run the test suite against the minimum Python version supported (3.6) and the maximum version supported (3.10)
    • Also update the dist image to bionic because it's the only one with support for all the Python versions.
    • Add the PIP classifiers for Python 3.10

    :thinking: Why?

    Be sure the library is compatible with the latest/current version of Python (3.10)

    hacktoberfest 
    opened by Maks3w 0
  • Add native async support for subscriptions

    Currently, async functions can't be used with the @sub decorator.

    @sub(topic='my-topic')
    async def async_handler(data, **kwargs):
        print('data', data)
    

    If you try, you'll get the following error.

    Configuring worker with 1 subscription(s)...
      dicom_uploads - handle_upload
    /usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
      callback(message)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

    There are some simple workarounds, but it would be nice if this was supported natively.

    import asyncio

    from rele import sub

    @sub(topic='my-topic')
    def sync_handler(data, **kwargs):
        return asyncio.run(async_handler(data, **kwargs))
    
    async def async_handler(data, **kwargs):
        print('data', data)
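
The workaround above can be generalised into a small adapter. This is a sketch only: `allow_async` is a hypothetical helper, not something Relé provides, and it assumes no event loop is already running in the thread that invokes the callback.

```python
import asyncio
import functools
import inspect


def allow_async(func):
    """Wrap a coroutine function so a synchronous caller can invoke it."""
    if not inspect.iscoroutinefunction(func):
        return func  # plain functions pass through untouched

    @functools.wraps(func)
    def sync_wrapper(data, **kwargs):
        # asyncio.run starts a fresh event loop for each message.
        return asyncio.run(func(data, **kwargs))

    return sync_wrapper


@allow_async
async def async_handler(data, **kwargs):
    await asyncio.sleep(0)
    return data["customer_id"]
```

Starting a new event loop per message is simple but not free; native support could instead share one loop across messages.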
    
    opened by csaroff 6
  • Add support for arbitrary deserializers

    :tophat: What?

    Allow custom deserializers other than the default json deserializer.

    :thinking: Why?

    Some of GCP's pub/sub notifications have messages that are simple strings which can't be deserialized as json.

    For example, notifications that a record was inserted into the dicom store come through with a string indicating the path to that record and nothing more.

    :link: https://github.com/mercadona/rele/issues/229
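A pluggable deserializer could work roughly as follows. This is a minimal sketch under assumed names (`decode_message`, `json_deserializer`, `text_deserializer`), none of which are confirmed to match the actual change; the sample path in the usage note is a placeholder.

```python
import json


def json_deserializer(raw: bytes):
    """Default behaviour: decode UTF-8 and parse as JSON."""
    return json.loads(raw.decode("utf-8"))


def text_deserializer(raw: bytes) -> str:
    """Alternative for plain-string payloads such as a record path."""
    return raw.decode("utf-8")


def decode_message(raw: bytes, deserializer=json_deserializer):
    """Apply the chosen deserializer; pass None to receive the raw bytes."""
    if deserializer is None:
        return raw
    return deserializer(raw)
```

With this shape, `decode_message(b"some/record/path", text_deserializer)` yields the string as-is, where the JSON default would raise `JSONDecodeError`.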

    opened by csaroff 8
  • Add subscription option to disable json parsing

    Currently rele's subscription model only supports consuming messages that are json deserializable.

    Unfortunately some of GCP's core services publish non-json messages. For example, the gcp dicom store publishes notifications to a topic when a new dicom instance was inserted, but the message field is just the dicom instance path. When attempting to consume these messages through rele, we get the following error.

    Exception raised while processing message for dicom_uploads - handle_upload: JSONDecodeError
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/rele/subscription.py", line 112, in __call__
        data = json.loads(message.data.decode("utf-8"))
      File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    Can we add an option to disable deserialization of these messages? Something like:

    from rele import sub
    
    @sub(topic='my-topic', deserialize_message=False)
    def handle_upload(data, **kwargs):
        print('Handling data', data)
    
    opened by csaroff 2
Releases(1.6.0)
  • 1.6.0(Aug 4, 2022)

  • v1.4.1(Apr 20, 2022)

    What's Changed

    • [Deleted] Delete post-publish-message-failure hook on VerboseLoggingMiddleware. (#220)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.4.0...v1.4.1

  • v1.4.0(Apr 13, 2022)

    What's Changed

    • [Added] Added a VerboseLoggingMiddleware that does not truncate message payload. (#218)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.3.0...v1.4.0

  • v1.3.0(Apr 5, 2022)

    What's Changed

    • GC Project Id & Windows support by @chirgjin-les in https://github.com/mercadona/rele/pull/215
    • Release v1.3.0 by @jonasae in https://github.com/mercadona/rele/pull/216

    New Contributors

    • @chirgjin-les made their first contribution in https://github.com/mercadona/rele/pull/215

    Full Changelog: https://github.com/mercadona/rele/compare/v1.2.0...v1.3.0

  • v1.2.0(Dec 15, 2021)

    • [CHANGED] TimeoutError from publisher (#212)
    • Added filter_subs_by setting in documentation (#208)
    • Automatic topic creation (#206)
    • Log post publish success (#204)
  • 1.1.1(Jun 28, 2021)

  • 1.1.0(Mar 10, 2021)

    • Google Pubsub 2.0 Compat (#192)
    • Add validations to the sub decorator (#189)
    • Add new post_publish_hook and deprecate the old one (#190)
    • Discover and load settings when publishing (#188)
    • Fix #180: Raise error when the config loads a repeated subscription (#187)
  • 1.0.0(Sep 25, 2020)

  • 0.14.0(Aug 5, 2020)

    • BREAKING CHANGE: Remove GC_CREDENTIALS (#174)
    • DEPRECATE: GC_PROJECT_ID setting (#178)
    • Add changelog to the docs site (#179)
    • Catch TimeoutError and run post_publish_failure when blocking (#172)
  • 0.13.0(Jul 10, 2020)

  • 0.13.dev0(Jun 16, 2020)

  • 0.12.0(Jun 12, 2020)

  • 0.11.0(Jun 4, 2020)

  • 0.10.0(Feb 4, 2020)

    • Adjust default THREADS_PER_SUBSCRIPTION (#152)
    • Add unrecoverable_middleware (#150)
    • Allow multiple filters (#148)
    • Configure timeout from .publish() (#143)
    • Dont crash when subscription topic does not exist (#142)
  • 0.9.1(Jan 2, 2020)

  • 0.9.0(Dec 20, 2019)

    • Flask support via middleware (#127)
    • Add message attributes to metrics log (#128)
    • Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139)
    • Publishing timeout while blocking (#137)
    • Clean up rele.config.setup + Worker() init (#132)
  • 0.8.1(Nov 25, 2019)

  • 0.8.0(Nov 22, 2019)

    • Worker run method (#118)
    • Add kwargs to setup method passed through to middleware (#123)
    • Add missing worker middleware hooks (#121)
    • Add 3.8 support
    • More Documentation
  • 0.7.0(Oct 21, 2019)

  • 0.6.0(Sep 23, 2019)

    • BREAKING: Remove drf as a dependency (#91)
    • Add message as a parameter for middleware hooks (#99)
    • Check setting.CONN_MAX_AGE and warn when not 0 (#97)
    • More documentation
  • 0.5.0(Aug 9, 2019)

  • 0.4.1(Jun 19, 2019)

  • 0.4.0(Jun 17, 2019)

    • Set DEFAULT_ACK_DEADLINE (#49)
    • Filter by message attributes (#66)
    • BREAKING: All Relé settings are defined in a dict (#60)

    Old structure:

    from google.oauth2 import service_account
    RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file(
        'rele/settings/dummy-credentials.json'
    )
    RELE_GC_PROJECT_ID = 'dummy-project-id'
    

    New structure:

    from google.oauth2 import service_account
    RELE = {
        'GC_CREDENTIALS': service_account.Credentials.from_service_account_file(
            'rele/settings/dummy-credentials.json'
        ),
        'GC_PROJECT_ID': 'dummy-project-id',
        'MIDDLEWARE': [
            'rele.contrib.LoggingMiddleware',
            'rele.contrib.DjangoDBMiddleware',
        ],
        'SUB_PREFIX': 'delivery',
        'APP_NAME': 'delivery',
    }
    
    • rele.contrib.middleware (#55)
    • Prefix argument in sub decorator (#47)
    • Add timestamp to the published message (#42)
    • BREAKING: Explicit publisher and subscriber configuration (#43)
    • Sphinx documentation (#27, #34, #40, #41)
    • Contributing guidelines (#32)
  • v0.3.1(Jun 5, 2019)

  • v0.3.0(May 14, 2019)

    • Ability to run in emulator mode (#12)
    • Add Travis-CI builds (#10)
    • More friendly global publish (#11)
    • Non-blocking behaviour when publishing by default (#6)