A Time Series Library for Apache Spark

Overview

Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.

Documentation Status

Requirements

Dependency Version
Spark Version 2.3 and 2.4
Scala Version 2.12
Python Version 3.5 and above

How to install

Scala artifact is published in maven central:

https://mvnrepository.com/artifact/com.twosigma/flint

Python artifact is published in PyPi:

https://pypi.org/project/ts-flint

Note you will need both Scala and Python artifact to use Flint with PySpark.

How to build

To build from source:

Scala (in top-level dir):

sbt assemblyNoTest

Python (in python subdir):

python setup.py install

or

pip install .

Python bindings

The python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started

Starting Point: TimeSeriesRDD and TimeSeriesDataFrame

The entry point into all functionalities for time series analysis in Flint is TimeSeriesRDD (for Scala) and TimeSeriesDataFrame (for Python). In high level, a TimeSeriesRDD contains an OrderedRDD which could be used to represent a sequence of ordering key-value pairs. A TimeSeriesRDD uses Long to represent timestamps in nanoseconds since epoch as keys and InternalRows as values for OrderedRDD to represent a time series data set.

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single csv file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with header and specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

One could also create a TimeSeriesRDD from a RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as parquet format file(s). The TimeSeriesRDD.fromParquet() function provides the option to specify which columns and/or the time range you are interested, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Group functions

A group function is to group rows with nearby (or exactly the same) timestamps.

  • groupByCycle A function to group rows within a cycle, i.e. rows with exactly the same timestamps. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
  • groupByInterval A function to group rows whose timestamps fall into an interval. Intervals could be defined by another TimeSeriesRDD. Its timestamps will be used to defined intervals, i.e. two sequential timestamps define an interval. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]
  • addWindows For each row, this function adds a new column whose value for a row is a list of rows within its window.
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]

Temporal Join Functions

A temporal join function is a join function defined by a matching criteria over time. A tolerance in temporal join matching criteria specifies how much it should look past or look futue.

  • leftJoin A function performs the temporal left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, append the most recent row from the right at or before the same time. An example to join two TimeSeriesRDDs is as follows.
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
  • futureLeftJoin A function performs the temporal future left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, appends the closest future row from the right at or after the same time.
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")

Summarize Functions

Summarize functions are the functions to apply summarizer(s) to rows within a certain period, like cycle, interval, windows, etc.

  • summarizeCycles A function computes aggregate statistics of rows that are within a cycle, i.e. rows share a timestamp.
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700

Similarly, we could summarize over intervals, windows, or the whole time series data set. See

  • summarizeIntervals
  • summarizeWindows
  • addSummaryColumns

One could check timeseries.summarize.summarizer for different kinds of summarizer(s), like ZScoreSummarizer, CorrelationSummarizer, NthCentralMomentSummarizer etc.

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to [email protected].

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC

Owner
Two Sigma
Two Sigma is a financial sciences company. Our scientists use rigorous inquiry, data analysis, and invention to solve tough challenges across financial services
Two Sigma
Relevance Vector Machine implementation using the scikit-learn API.

scikit-rvm scikit-rvm is a Python module implementing the Relevance Vector Machine (RVM) machine learning technique using the scikit-learn API. Quicks

James Ritchie 204 Nov 18, 2022
使用数学和计算机知识投机倒把

偷鸡不成项目集锦 坦率地讲,涉及金融市场的好策略如果公开,必然导致使用的人多,最后策略变差。所以这个仓库只收集我目前失败了的案例。 加密货币组合套利 中国体育彩票预测 我赚不上钱的项目,也许可以帮助更有能力的人去赚钱。

Roy 28 Dec 29, 2022
Evidently helps analyze machine learning models during validation or production monitoring

Evidently helps analyze machine learning models during validation or production monitoring. The tool generates interactive visual reports and JSON profiles from pandas DataFrame or csv files. Current

Evidently AI 3.1k Jan 07, 2023
Cohort Intelligence used to solve various mathematical functions

Cohort-Intelligence-for-Mathematical-Functions About Cohort Intelligence : Cohort Intelligence ( CI ) is an optimization technique. It attempts to mod

Aayush Khandekar 2 Oct 25, 2021
Reproducibility and Replicability of Web Measurement Studies

Reproducibility and Replicability of Web Measurement Studies This repository holds additional material to the paper "Reproducibility and Replicability

6 Dec 31, 2022
A linear regression model for house price prediction

Linear_Regression_Model A linear regression model for house price prediction. This code is using these packages, so please make sure your have install

ShawnWang 1 Nov 29, 2021
Examples and code for the Practical Machine Learning workshop series

Practical Machine Learning Workshop Series Practical Machine Learning for Quantitative Finance Post conference workshop at the WBS Spring Conference D

CompatibL 21 Jun 25, 2022
A flexible CTF contest platform for coming PKU GeekGame events

Project Guiding Star: the Backend A flexible CTF contest platform for coming PKU GeekGame events Still in early development Highlights Not configurabl

PKU GeekGame 14 Dec 15, 2022
Project to deploy a machine learning model based on Titanic dataset from Kaggle

kaggle_titanic_deploy Project to deploy a machine learning model based on Titanic dataset from Kaggle In this project we used the Titanic dataset from

Vivian Yamassaki 8 May 23, 2022
Predict profitability of trades based on indicator buy / sell signals

Predict profitability of trades based on indicator buy / sell signals Trade profitability analysis for trades based on various indicators signals: MAC

Tomasz Porzycki 1 Dec 15, 2021
Code for the TCAV ML interpretability project

Interpretability Beyond Feature Attribution: Quantitative Testing with Concept Activation Vectors (TCAV) Been Kim, Martin Wattenberg, Justin Gilmer, C

552 Dec 27, 2022
WAGMA-SGD is a decentralized asynchronous SGD for distributed deep learning training based on model averaging.

WAGMA-SGD is a decentralized asynchronous SGD based on wait-avoiding group model averaging. The synchronization is relaxed by making the collectives externally-triggerable, namely, a collective can b

Shigang Li 6 Jun 18, 2022
Timeseries analysis for neuroscience data

=================================================== Nitime: timeseries analysis for neuroscience data ===============================================

NIPY developers 212 Dec 09, 2022
cleanlab is the data-centric ML ops package for machine learning with noisy labels.

cleanlab is the data-centric ML ops package for machine learning with noisy labels. cleanlab cleans labels and supports finding, quantifying, and lear

Cleanlab 51 Nov 28, 2022
High performance implementation of Extreme Learning Machines (fast randomized neural networks).

High Performance toolbox for Extreme Learning Machines. Extreme learning machines (ELM) are a particular kind of Artificial Neural Networks, which sol

Anton Akusok 174 Dec 07, 2022
Model factory is a ML training platform to help engineers to build ML models at scale

Model Factory Machine learning today is powering many businesses today, e.g., search engine, e-commerce, news or feed recommendation. Training high qu

16 Sep 23, 2022
A Collection of Conference & School Notes in Machine Learning 🦄📝🎉

Machine Learning Conference & Summer School Notes. 🦄📝🎉

558 Dec 28, 2022
My capstone project for Udacity's Machine Learning Nanodegree

MLND-Capstone My capstone project for Udacity's Machine Learning Nanodegree Lane Detection with Deep Learning In this project, I use a deep learning-b

Michael Virgo 407 Dec 12, 2022
Python ML pipeline that showcases mltrace functionality.

mltrace tutorial Date: October 2021 This tutorial builds a training and testing pipeline for a toy ML prediction problem: to predict whether a passeng

Log Labs 28 Nov 09, 2022