A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == ⚡ 💪 ❤️

Overview

pubsub

🗞️ FastAPI Websocket Pub/Sub

Tests Package Downloads

A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.

Supports and tested on Python >= 3.7

Installation 🛠️

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == 💪 ❤️

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on server
      async def on_event(data):
          print("We got an event! with data- ", data)
      # Subscribe for the event 
      client.subscribe("my event", on_event)
  • Publish

    • Directly from server code to connected clients.
      app = FastAPI() 
      endpoint = PubSubEndpoint()
      endpoint.register_route(app, "/pubsub")
      endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • From client to client (through the servers)
      async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
          endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app = FastAPI() 
      endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
      
      @app.websocket("/pubsub")
      async def websocket_rpc_endpoint(websocket: WebSocket):
          async with endpoint.broadcaster:
              await endpoint.main_loop(websocket)
      see examples/pubsub_broadcaster_server_example.py for full usage example

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.

Server:

import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter

from fastapi_websocket_pubsub import PubSubEndpoint
app =  FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request trigger an event
    endpoint.publish(["triggered"])

Client:

from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    # Subscribe for the event 
    client.subscribe("triggered", on_event)

More Examples

What can I do with this?

The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based on fastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based on broadcaster for syncing server instances

  • Server Endpoint:

    • Based on FastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.

  • Client :

    • Based on Tenacity: allowing configurable retries to keep to connection alive

      • see WebSocketRpcClient.init's retry_config
    • Based on python websockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config. It provides a helper logging module to control how it produces logs for you. See fastapi_websocket_rpc/logger.py. Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer. Or override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'

example:

# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)

Pull requests - welcome!

  • Please include tests for new features
You might also like...
Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

🚀   Cookiecutter Template for FastAPI + React Projects.  Using PostgreSQL, SQLAlchemy, and Docker
🚀 Cookiecutter Template for FastAPI + React Projects. Using PostgreSQL, SQLAlchemy, and Docker

FastAPI + React · A cookiecutter template for bootstrapping a FastAPI and React project using a modern stack. Features FastAPI (Python 3.8) JWT authen

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

FastAPI application and service structure for a more maintainable codebase

Abstracting FastAPI Services See this article for more information: https://camillovisini.com/article/abstracting-fastapi-services/ Poetry poetry inst

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.
Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.

Full Stack FastAPI and PostgreSQL - Base Project Generator Generate a backend and frontend stack using Python, including interactive API documentation

Releases(v0.1.20)
Simple FastAPI Example : Blog API using FastAPI : Beginner Friendly

fastapi_blog FastAPI : Simple Blog API with CRUD operation Steps to run the project: git clone https://github.com/mrAvi07/fastapi_blog.git cd fastapi-

Avinash Alanjkar 1 Oct 08, 2022
Cache-house - Caching tool for python, working with Redis single instance and Redis cluster mode

Caching tool for python, working with Redis single instance and Redis cluster mo

Tural 14 Jan 06, 2022
A kedro-plugin to serve Kedro Pipelines as API

General informations Software repository Latest release Total downloads Pypi Code health Branch Tests Coverage Links Documentation Deployment Activity

Yolan Honoré-Rougé 12 Jul 15, 2022
Basic fastapi blockchain - An api based blockchain with full functionality

Basic fastapi blockchain - An api based blockchain with full functionality

1 Nov 27, 2021
SQLAlchemy Admin for Starlette/FastAPI

SQLAlchemy Admin for Starlette/FastAPI SQLAdmin is a flexible Admin interface for SQLAlchemy models. Main features include: SQLAlchemy sync/async engi

Amin Alaee 683 Jan 03, 2023
Opinionated authorization package for FastAPI

FastAPI Authorization Installation pip install fastapi-authorization Usage Currently, there are two models available: RBAC: Role-based Access Control

Marcelo Trylesinski 18 Jul 04, 2022
A Sample App to Demonstrate React Native and FastAPI Integration

React Native - Service Integration with FastAPI Backend. A Sample App to Demonstrate React Native and FastAPI Integration UI Based on NativeBase toolk

YongKi Kim 4 Nov 17, 2022
京东图片点击验证码识别

京东图片验证码识别 本项目是@yqchilde 大佬的 JDMemberCloseAccount 识别图形验证码(#45)思路验证,若你也有思路可以提交Issue和PR也可以在 @yqchilde 的 TG群 找到我 声明 本脚本只是为了学习研究使用 本脚本除了采集处理验证码图片没有其他任何功能,也

AntonVanke 37 Dec 22, 2022
FastAPI backend for Repost

Repost FastAPI This is the FastAPI implementation of the Repost API. Installation Python 3 must be installed and accessible through the use of a termi

PC 7 Jun 15, 2021
스타트업 개발자 채용

스타트업 개발자 채용 大 박람회 Seed ~ Series B에 있는 스타트업을 위한 채용정보 페이지입니다. Back-end, Frontend, Mobile 등 개발자를 대상으로 진행하고 있습니다. 해당 스타트업에 종사하시는 분뿐만 아니라 채용 관련 정보를 알고 계시다면

JuHyun Lee 58 Dec 14, 2022
FastAPI with Docker and Traefik

Dockerizing FastAPI with Postgres, Uvicorn, and Traefik Want to learn how to build this? Check out the post. Want to use this project? Development Bui

51 Jan 06, 2023
Easy and secure implementation of Azure AD for your FastAPI APIs 🔒

FastAPI-Azure-auth Azure AD Authentication for FastAPI apps made easy. 🚀 Description FastAPI is a modern, fast (high-performance), web framework for

Intility 216 Dec 27, 2022
A Python framework to build Slack apps in a flash with the latest platform features.

Bolt for Python A Python framework to build Slack apps in a flash with the latest platform features. Read the getting started guide and look at our co

SlackAPI 684 Jan 09, 2023
FastAPI IPyKernel Sandbox

FastAPI IPyKernel Sandbox This repository is a light-weight FastAPI project that is meant to provide a wrapper around IPyKernel interactions. It is in

Nick Wold 2 Oct 25, 2021
Signalling for FastAPI.

fastapi-signals Signalling for FastAPI.

Henshal B 7 May 04, 2022
flask extension for integration with the awesome pydantic package

flask extension for integration with the awesome pydantic package

249 Jan 06, 2023
Learn to deploy a FastAPI application into production DigitalOcean App Platform

Learn to deploy a FastAPI application into production DigitalOcean App Platform. This is a microservice for our Try Django 3.2 project. The goal is to extract any and all text from images using a tec

Coding For Entrepreneurs 59 Nov 29, 2022
Boilerplate code for quick docker implementation of REST API with JWT Authentication using FastAPI, PostgreSQL and PgAdmin ⭐

FRDP Boilerplate code for quick docker implementation of REST API with JWT Authentication using FastAPI, PostgreSQL and PgAdmin ⛏ . Getting Started Fe

BnademOverflow 53 Dec 29, 2022
Get MODBUS data from Sofar (K-TLX) inverter through LSW-3 or LSE module

SOFAR Inverter + LSW-3/LSE Small utility to read data from SOFAR K-TLX inverters through the Solarman (LSW-3/LSE) datalogger. Two scripts to get inver

58 Dec 29, 2022
This project shows how to serve an ONNX-optimized image classification model as a web service with FastAPI, Docker, and Kubernetes.

Deploying ML models with FastAPI, Docker, and Kubernetes By: Sayak Paul and Chansung Park This project shows how to serve an ONNX-optimized image clas

Sayak Paul 104 Dec 23, 2022