A Time Series Library for Apache Spark

Overview

Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.

Documentation Status

Requirements

Dependency Version
Spark Version 2.3 and 2.4
Scala Version 2.12
Python Version 3.5 and above

How to install

Scala artifact is published in maven central:

https://mvnrepository.com/artifact/com.twosigma/flint

Python artifact is published in PyPi:

https://pypi.org/project/ts-flint

Note you will need both Scala and Python artifact to use Flint with PySpark.

How to build

To build from source:

Scala (in top-level dir):

sbt assemblyNoTest

Python (in python subdir):

python setup.py install

or

pip install .

Python bindings

The python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started

Starting Point: TimeSeriesRDD and TimeSeriesDataFrame

The entry point into all functionalities for time series analysis in Flint is TimeSeriesRDD (for Scala) and TimeSeriesDataFrame (for Python). In high level, a TimeSeriesRDD contains an OrderedRDD which could be used to represent a sequence of ordering key-value pairs. A TimeSeriesRDD uses Long to represent timestamps in nanoseconds since epoch as keys and InternalRows as values for OrderedRDD to represent a time series data set.

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single csv file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with header and specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

One could also create a TimeSeriesRDD from a RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as parquet format file(s). The TimeSeriesRDD.fromParquet() function provides the option to specify which columns and/or the time range you are interested, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Group functions

A group function is to group rows with nearby (or exactly the same) timestamps.

  • groupByCycle A function to group rows within a cycle, i.e. rows with exactly the same timestamps. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
  • groupByInterval A function to group rows whose timestamps fall into an interval. Intervals could be defined by another TimeSeriesRDD. Its timestamps will be used to defined intervals, i.e. two sequential timestamps define an interval. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]
  • addWindows For each row, this function adds a new column whose value for a row is a list of rows within its window.
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]

Temporal Join Functions

A temporal join function is a join function defined by a matching criteria over time. A tolerance in temporal join matching criteria specifies how much it should look past or look futue.

  • leftJoin A function performs the temporal left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, append the most recent row from the right at or before the same time. An example to join two TimeSeriesRDDs is as follows.
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
  • futureLeftJoin A function performs the temporal future left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, appends the closest future row from the right at or after the same time.
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")

Summarize Functions

Summarize functions are the functions to apply summarizer(s) to rows within a certain period, like cycle, interval, windows, etc.

  • summarizeCycles A function computes aggregate statistics of rows that are within a cycle, i.e. rows share a timestamp.
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700

Similarly, we could summarize over intervals, windows, or the whole time series data set. See

  • summarizeIntervals
  • summarizeWindows
  • addSummaryColumns

One could check timeseries.summarize.summarizer for different kinds of summarizer(s), like ZScoreSummarizer, CorrelationSummarizer, NthCentralMomentSummarizer etc.

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to [email protected].

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC

Owner
Two Sigma
Two Sigma is a financial sciences company. Our scientists use rigorous inquiry, data analysis, and invention to solve tough challenges across financial services
Two Sigma
Conducted ANOVA and Logistic regression analysis using matplot library to visualize the result.

Intro-to-Data-Science Conducted ANOVA and Logistic regression analysis. Project ANOVA The main aim of this project is to perform One-Way ANOVA analysi

Chris Yuan 1 Feb 06, 2022
A Python-based application demonstrating various search algorithms, namely Depth-First Search (DFS), Breadth-First Search (BFS), and A* Search (Manhattan Distance Heuristic)

A Python-based application demonstrating various search algorithms, namely Depth-First Search (DFS), Breadth-First Search (BFS), and the A* Search (using the Manhattan Distance Heuristic)

17 Aug 14, 2022
A comprehensive set of fairness metrics for datasets and machine learning models, explanations for these metrics, and algorithms to mitigate bias in datasets and models.

AI Fairness 360 (AIF360) The AI Fairness 360 toolkit is an extensible open-source library containg techniques developed by the research community to h

1.9k Jan 06, 2023
Python/Sage Tool for deriving Scattering Matrices for WDF R-Adaptors

R-Solver A Python tools for deriving R-Type adaptors for Wave Digital Filters. This code is not quite production-ready. If you are interested in contr

8 Sep 19, 2022
A collection of video resources for machine learning

Machine Learning Videos This is a collection of recorded talks at machine learning conferences, workshops, seminars, summer schools, and miscellaneous

Dustin Tran 1.5k Dec 29, 2022
Responsible AI Workshop: a series of tutorials & walkthroughs to illustrate how put responsible AI into practice

Responsible AI Workshop Responsible innovation is top of mind. As such, the tech industry as well as a growing number of organizations of all kinds in

Microsoft 9 Sep 14, 2022
Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark environment.

pyspark-anonymizer Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark envir

6 Jun 30, 2022
A project based example of Data pipelines, ML workflow management, API endpoints and Monitoring.

MLOps template with examples for Data pipelines, ML workflow management, API development and Monitoring.

Utsav 33 Dec 03, 2022
Repository for DCA0305, an undergraduate course about Machine Learning Workflows and Pipelines

Federal University of Rio Grande do Norte Technology Center Department of Computer Engineering and Automation Machine Learning Based Systems Design Re

Ivanovitch Silva 81 Oct 18, 2022
Merlion: A Machine Learning Framework for Time Series Intelligence

Merlion is a Python library for time series intelligence. It provides an end-to-end machine learning framework that includes loading and transforming data, building and training models, post-processi

Salesforce 2.8k Jan 05, 2023
This repo implements a Topological SLAM: Deep Visual Odometry with Long Term Place Recognition (Loop Closure Detection)

This repo implements a topological SLAM system. Deep Visual Odometry (DF-VO) and Visual Place Recognition are combined to form the topological SLAM system.

Best of Australian Centre for Robotic Vision (ACRV) 32 Jun 23, 2022
Price Prediction model is used to develop an LSTM model to predict the future market price of Bitcoin and Ethereum.

Price Prediction model is used to develop an LSTM model to predict the future market price of Bitcoin and Ethereum.

2 Jun 14, 2022
Mortality risk prediction for COVID-19 patients using XGBoost models

Mortality risk prediction for COVID-19 patients using XGBoost models Using demographic and lab test data received from the HM Hospitales in Spain, I b

1 Jan 19, 2022
Greykite: A flexible, intuitive and fast forecasting library

The Greykite library provides flexible, intuitive and fast forecasts through its flagship algorithm, Silverkite.

LinkedIn 1.4k Jan 15, 2022
Arquivos do curso online sobre a estatística voltada para ciência de dados e aprendizado de máquina.

Estatistica para Ciência de Dados e Machine Learning Arquivos do curso online sobre a estatística voltada para ciência de dados e aprendizado de máqui

Renan Barbosa 1 Jan 10, 2022
Library for machine learning stacking generalization.

stacked_generalization Implemented machine learning *stacking technic[1]* as handy library in Python. Feature weighted linear stacking is also availab

114 Jul 19, 2022
pymc-learn: Practical Probabilistic Machine Learning in Python

pymc-learn: Practical Probabilistic Machine Learning in Python Contents: Github repo What is pymc-learn? Quick Install Quick Start Index What is pymc-

pymc-learn 196 Dec 07, 2022
Solve automatic numerical differentiation problems in one or more variables.

numdifftools The numdifftools library is a suite of tools written in _Python to solve automatic numerical differentiation problems in one or more vari

Per A. Brodtkorb 181 Dec 16, 2022
We have a dataset of user performances. The project is to develop a machine learning model that will predict the salaries of baseball players.

Salary-Prediction-with-Machine-Learning 1. Business Problem Can a machine learning project be implemented to estimate the salaries of baseball players

Ayşe Nur Türkaslan 9 Oct 14, 2022
A toolkit for geo ML data processing and model evaluation (fork of solaris)

An open source ML toolkit for overhead imagery. This is a beta version of lunular which may continue to develop. Please report any bugs through issues

Ryan Avery 4 Nov 04, 2021