Fastlane

An ETL framework + monitoring UI/API (experimental project for learning purposes)

An ETL framework for building pipelines, and a Flask-based web API/UI for monitoring pipelines.

Project structure

fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
   |- migrations: (database migrations)
   |- web_api: Flask backend API
   |- web_ui: TBD

Install

  1. Clone the repository
  2. pip install -e .

Example

fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json

--source: The pipeline's source type (mysql, bigquery, and mongodb are the only implemented sources so far)

--target: The pipeline's target type (s3, influxdb, mysql, and firehose are the only implemented targets so far)

--transform: The pipeline's transform type (default is the only implemented transform so far)

--config: The path to the JSON configuration file for the pipeline

--logs_to_slack: Send error logs to Slack

--logs_to_cloudwatch: Send logs to CloudWatch

--logs_to_file: Send logs to a file
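
For orientation, the sketch below shows the rough shape of such a configuration as an annotated Python dict (on disk it is plain JSON). Every field name in it is a hypothetical illustration; the authoritative fields are whatever each source/target/transform declares in its configuration_schema() (see examples/mysql_to_athena_example.json for a real configuration).

# Hypothetical shape of a pipeline configuration, shown as a Python dict so
# it can be annotated; the real file is plain JSON, and every field name
# here is an illustrative assumption rather than fastlane's actual schema.
pipeline_config = {
    "source": {  # validated by the source's SourceConfigSchema subclass
        "host": "db.example.com",
        "database": "app",
        "table": "events",
    },
    "target": {  # validated by the target's TargetConfigSchema subclass
        "bucket": "example-data-lake",
    },
}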

Extending the framework

The ETL framework has 4 concepts:

Source

The base class fastlane.source.Source provides basic functionality and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from the source and returning it as a Python list of dictionaries.

Implementations of the Source base class must implement the following functions at a minimum:

from typing import List

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
            The Source class is an iterator, and this function is called on each iteration.
            The iterator stops (and the source worker exits) when this function returns an empty list.
            So when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing the type of source this is, for example mysql or bigquery"""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inheriting from the SourceConfigSchema base schema.
            This schema is used to validate the source's configuration, so all possible fields should be covered by
            the schema returned here."""

An example implementation of the Source interface is in fastlane.sources.impl.source_mysql
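
For a more self-contained picture, here is a minimal sketch of a hypothetical CSV-file source. The three methods are the documented interface; the config fields, the self.configuration attribute, and the csv handling are illustrative assumptions rather than fastlane's actual conventions.

import csv
from itertools import islice
from typing import List

from marshmallow import fields

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class CsvSourceConfigSchema(SourceConfigSchema):
    # Hypothetical fields: where the CSV lives and how many rows per batch.
    path = fields.Str(required=True)
    batch_size = fields.Int(missing=1000)


class CsvSource(Source):
    def extract(self) -> List[dict]:
        # Lazily open the file on first call, then return one batch per call.
        # Returning [] stops the iterator, and the source worker exits.
        if not hasattr(self, "_reader"):
            self._file = open(self.configuration["path"], newline="")  # "configuration" attribute is assumed
            self._reader = csv.DictReader(self._file)
        batch = [dict(row) for row in islice(self._reader, self.configuration.get("batch_size", 1000))]
        if not batch:
            self._file.close()
        return batch

    @utils.classproperty
    def source_type(self) -> str:
        return "csv"

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        return CsvSourceConfigSchema()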

Implementation Coverage

  • MySQL
  • BigQuery
  • MongoDB

Transform

The base class fastlane.transform.Transform provides basic functionality and defines a standard interface for transforming data so it is ready for the target. An instance of Transform is responsible only for transforming data from the source into a format compatible with the target.

Implementations of the Transform base class must implement the following functions at a minimum:

import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformations on the dataframe and return the transformed dataframe.
            Ideally the same dataframe should be transformed in place, but if a new dataframe needs to be created,
            make sure to remove the old dataframe from memory.
            This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing the type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inheriting from the TransformConfigSchema base schema.
            This schema is used to validate the transform's configuration, so all possible fields should be covered by
            the schema returned here."""

An example implementation of the Transform interface is in fastlane.transform.impl.transform_default
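
As a concrete (hypothetical) illustration, a transform that normalizes column names could look like the sketch below; the behaviour is invented, but the methods are the documented interface.

import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class NormalizeColumnsTransform(Transform):
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # Mutate the dataframe in place rather than allocating a copy,
        # per the guidance in the interface docstring above.
        df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
        df.dropna(axis="columns", how="all", inplace=True)
        return df

    @utils.classproperty
    def transform_type(self) -> str:
        return "normalize_columns"

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        # This sketch needs no fields beyond the base schema.
        return TransformConfigSchema()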

Target

The base class fastlane.target.Target provides basic functionality and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing transformed data in the destination.

Implementations of the Target base class must implement the following functions at a minimum:

import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
            This function should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used for incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing the type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier built from this specific target's configuration.
            The id should be unique across the whole target destination.
            For example, the target_id for the mysql target is built from the table and database names."""

An example implementation of the Target interface is in fastlane.target.impl.target_athena
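
Here is an equally hypothetical target that appends batches to a local JSON-lines file; the config fields and the self.configuration attribute are assumptions for illustration.

import os

import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class JsonLinesTarget(Target):
    def load(self, df: pd.DataFrame):
        # Append the transformed batch to the destination file.
        with open(self.configuration["path"], "a") as f:  # "configuration" attribute is assumed
            f.write(df.to_json(orient="records", lines=True) + "\n")

    def get_offset(self):
        # Largest incremental key already stored; None means start from scratch.
        key = self.configuration.get("incremental_key")  # hypothetical field
        if not key or not os.path.exists(self.configuration["path"]):
            return None
        return pd.read_json(self.configuration["path"], lines=True)[key].max()

    @utils.classproperty
    def target_type(self) -> str:
        return "jsonlines"

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        # The file path is unique across this destination type.
        return configuration["path"]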

Implementation Coverage

  • S3
  • InfluxDB
  • MySQL
  • Firehose

Pipeline

The fastlane.pipeline.Pipeline class drives the ETL process. It manages the source, transform, and target processes, and runs monitoring processes that give insight into the performance and state of the running pipeline.

The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:

        _________________        Queue       ____________________         Queue        ________________    load
extract | source_worker | -->  [|.|.|.|] -->| transform_worker_1 | -->  [|.|.|.|] --> | target_worker_1 | ------>
------> |_______________|                    --------------------                      ----------------    load
                                         -->| transform_worker_2 |                --> | target_worker_2 | ------>
                                             --------------------                      ----------------    load
                                         -->| transform_worker_3 |                --> | target_worker_3 | ------>
                                             --------------------                      ----------------    load
                                                                                  --> | target_worker_4 | ------>
                                                                                       ----------------
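
Each box in the diagram is a worker thread and each [|.|.|.|] a queue. Below is a stripped-down sketch of that pattern with one worker per stage; it is purely illustrative, since fastlane's actual Pipeline also handles fan-out to multiple workers and the associated shutdown bookkeeping.

import queue
import threading

STOP = object()  # sentinel telling a downstream worker to exit


def source_worker(source, out_q):
    for batch in source:  # a Source is an iterator over record batches
        out_q.put(batch)
    out_q.put(STOP)


def transform_worker(transform, in_q, out_q):
    while (batch := in_q.get()) is not STOP:
        out_q.put(transform.transform(batch))
    out_q.put(STOP)


def target_worker(target, in_q):
    while (batch := in_q.get()) is not STOP:
        target.load(batch)


def run(source, transform, target):
    # source/transform/target stand in for instances built from the
    # validated pipeline configuration.
    q1, q2 = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=source_worker, args=(source, q1)),
        threading.Thread(target=transform_worker, args=(transform, q1, q2)),
        threading.Thread(target=target_worker, args=(target, q2)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()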

Throughout the ETL process, a few small monitoring processes collect metrics at periodic intervals, such as memory usage, records loaded per second, total records loaded, and queue sizes. See fastlane.monitoring.pipeline_monitor for more details on how that's done.
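
The gist of such a monitor, as a hypothetical sketch (the attribute names on the pipeline object are assumptions, not pipeline_monitor's real API):

import time


def monitor(pipeline, interval_s: float = 5.0):
    # Periodically sample pipeline state; a real monitor would report these
    # metrics to the web API rather than print them.
    while pipeline.running:  # assumed attribute
        print({
            "queue_sizes": [q.qsize() for q in pipeline.queues],  # assumed
            "records_loaded": pipeline.records_loaded,            # assumed
        })
        time.sleep(interval_s)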

Pipelines Web API

This project includes a pipeline web API built with Flask. It is used as a backend for collecting and storing metrics from running pipelines, and to serve the pipeline monitoring web UI.

Resources

/api/pipeline

CRUD on pipelines.
Methods: POST, GET, DELETE

/api/pipelines

List pipelines.
Methods: GET

/api/pipeline/run

Invocation of a particular pipeline.
Methods: POST, PUT, GET, DELETE

/api/pipeline/run/latest

Latest invocation of a particular pipeline.
Methods: GET

/api/pipeline/run/rps

Records-per-second metrics for a particular pipeline run.
Methods: GET, POST

/api/pipeline/run/memory_usage

Memory usage metrics for a particular pipeline run.
Methods: GET, POST

/api/pipeline/run/logs

Logs (from CloudWatch) for a particular pipeline run.
Methods: GET, POST
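
For example, listing pipelines from Python might look like this (the host and port are assumptions; the README does not pin down where the Flask app runs):

import requests

resp = requests.get("http://localhost:5000/api/pipelines")  # assumed host/port
resp.raise_for_status()
print(resp.json())  # the registered pipelines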

Pipeline Web UI

Will provide a user interface for monitoring currently running pipelines, as well as for debugging and analyzing previously invoked pipelines.
