A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == ⚡ 💪 ❤️

Overview

pubsub

🗞️ FastAPI Websocket Pub/Sub

Tests Package Downloads

A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.

Supports and tested on Python >= 3.7

Installation 🛠️

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == 💪 ❤️

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on server
      async def on_event(data):
          print("We got an event! with data- ", data)
      # Subscribe for the event 
      client.subscribe("my event", on_event)
  • Publish

    • Directly from server code to connected clients.
      app = FastAPI() 
      endpoint = PubSubEndpoint()
      endpoint.register_route(app, "/pubsub")
      endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • From client to client (through the servers)
      async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
          endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app = FastAPI() 
      endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
      
      @app.websocket("/pubsub")
      async def websocket_rpc_endpoint(websocket: WebSocket):
          async with endpoint.broadcaster:
              await endpoint.main_loop(websocket)
      see examples/pubsub_broadcaster_server_example.py for full usage example

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.

Server:

import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter

from fastapi_websocket_pubsub import PubSubEndpoint
app =  FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request trigger an event
    endpoint.publish(["triggered"])

Client:

from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    # Subscribe for the event 
    client.subscribe("triggered", on_event)

More Examples

What can I do with this?

The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based on fastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based on broadcaster for syncing server instances

  • Server Endpoint:

    • Based on FastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.

  • Client :

    • Based on Tenacity: allowing configurable retries to keep to connection alive

      • see WebSocketRpcClient.init's retry_config
    • Based on python websockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config. It provides a helper logging module to control how it produces logs for you. See fastapi_websocket_rpc/logger.py. Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer. Or override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'

example:

# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)

Pull requests - welcome!

  • Please include tests for new features
You might also like...
Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

🚀   Cookiecutter Template for FastAPI + React Projects.  Using PostgreSQL, SQLAlchemy, and Docker
🚀 Cookiecutter Template for FastAPI + React Projects. Using PostgreSQL, SQLAlchemy, and Docker

FastAPI + React · A cookiecutter template for bootstrapping a FastAPI and React project using a modern stack. Features FastAPI (Python 3.8) JWT authen

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

FastAPI application and service structure for a more maintainable codebase

Abstracting FastAPI Services See this article for more information: https://camillovisini.com/article/abstracting-fastapi-services/ Poetry poetry inst

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.
Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.

Full Stack FastAPI and PostgreSQL - Base Project Generator Generate a backend and frontend stack using Python, including interactive API documentation

Releases(v0.1.20)
Twitter API monitor with fastAPI + MongoDB

Twitter API monitor with fastAPI + MongoDB You need to have a file .env with the following variables: DB_URL="mongodb+srv://mongodb_path" DB_URL2=

Leonardo Ferreira 3 Apr 08, 2022
Socket.IO integration for Flask applications.

Flask-SocketIO Socket.IO integration for Flask applications. Installation You can install this package as usual with pip: pip install flask-socketio

Miguel Grinberg 4.9k Jan 03, 2023
A FastAPI Middleware of joerick/pyinstrument to check your service performance.

fastapi_profiler A FastAPI Middleware of joerick/pyinstrument to check your service performance. 📣 Info A FastAPI Middleware of pyinstrument to check

LeoSun 107 Jan 05, 2023
Lightning FastAPI

Lightning FastAPI Lightning FastAPI framework, provides boiler plates for FastAPI based on Django Framework Explaination / | │ manage.py │ README.

Rajesh Joshi 1 Oct 15, 2021
Drop-in MessagePack support for ASGI applications and frameworks

msgpack-asgi msgpack-asgi allows you to add automatic MessagePack content negotiation to ASGI applications (Starlette, FastAPI, Quart, etc.), with a s

Florimond Manca 128 Jan 02, 2023
Пример использования GraphQL Ariadne с FastAPI и сравнение его с GraphQL Graphene FastAPI

FastAPI Ariadne Example Пример использования GraphQL Ariadne с FastAPI и сравнение его с GraphQL Graphene FastAPI - GitHub ###Запуск на локальном окру

ZeBrains Team 9 Nov 10, 2022
A FastAPI WebSocket application that makes use of ncellapp package by @hemantapkh

ncellFastAPI author: @awebisam Used FastAPI to create WS application. Ncellapp module by @hemantapkh NOTE: Not following best practices and, needs ref

Aashish Bhandari 7 Oct 01, 2021
Ansible Inventory Plugin, created to get hosts from HTTP API.

ansible-ws-inventory-plugin Ansible Inventory Plugin, created to get hosts from HTTP API. Features: Database compatible with MongoDB and Filesystem (J

Carlos Neto 0 Feb 05, 2022
A comprehensive CRUD API generator for SQLALchemy.

FastAPI Quick CRUD Introduction Advantage Constraint Getting started Installation Usage Design Path Parameter Query Parameter Request Body Upsert Intr

192 Jan 06, 2023
A utility that allows you to use DI in fastapi without Depends()

fastapi-better-di What is this ? fastapi-better-di is a utility that allows you to use DI in fastapi without Depends() Installation pip install fastap

Maxim 9 May 24, 2022
Sample FastAPI project that uses async SQLAlchemy, SQLModel, Postgres, Alembic, and Docker.

FastAPI + SQLModel + Alembic Sample FastAPI project that uses async SQLAlchemy, SQLModel, Postgres, Alembic, and Docker. Want to learn how to build th

228 Jan 02, 2023
Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

Steve Hillier 225 Jan 05, 2023
cookiecutter template for web API with python

Python project template for Web API with cookiecutter What's this This provides the project template including minimum test/lint/typechecking package

Hitoshi Manabe 4 Jan 28, 2021
Fastapi-ml-template - Fastapi ml template with python

FastAPI ML Template Run Web API Local $ sh run.sh # poetry run uvicorn app.mai

Yuki Okuda 29 Nov 20, 2022
Stac-fastapi built on Tile38 and Redis to support caching

stac-fastapi-caching Stac-fastapi built on Tile38 to support caching. This code is built on top of stac-fastapi-elasticsearch 0.1.0 with pyle38, a Pyt

Jonathan Healy 4 Apr 11, 2022
Prometheus exporter for several chia node statistics

prometheus-chia-exporter Prometheus exporter for several chia node statistics It's assumed that the full node, the harvester and the wallet run on the

30 Sep 19, 2022
Local Telegram Bot With FastAPI & Ngrok

An easy local telegram bot server with python, fastapi and ngrok.

Ömer Faruk Özdemir 7 Dec 25, 2022
Mixer -- Is a fixtures replacement. Supported Django, Flask, SqlAlchemy and custom python objects.

The Mixer is a helper to generate instances of Django or SQLAlchemy models. It's useful for testing and fixture replacement. Fast and convenient test-

Kirill Klenov 871 Dec 25, 2022
API using python and Fastapi framework

Welcome 👋 CFCApi is a API DEVELOPMENT PROJECT UNDER CODE FOR COMMUNITY ! Project Walkthrough 🚀 CFCApi run on Python using FASTapi Framework Docs The

Abhishek kushwaha 7 Jan 02, 2023
A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Laurent Savaete 562 Jan 01, 2023