Streaming Market Data with Flink SQL, Part 1: Streaming VWAP
2022-07-18 12:37:00 [Big Data Grocery Store]

This article is the first in a multi-part series showing the power and expressiveness of Flink SQL applied to market data. The code and data for this series are available on GitHub. It was co-authored with Krishnen Vytelingum, who leads quantitative modeling at Simudyne. Speed is critical in financial markets. Whether the goal is to maximize alpha or minimize risk, financial technologists invest heavily in obtaining the most up-to-date insight into market conditions and quotes. Event-driven and streaming architectures make it possible to perform complex processing on events as they happen, which makes them a natural fit for financial market applications.
Flink SQL is a data processing language that enables rapid prototyping and development of event-driven and streaming applications. Flink SQL combines the simplicity and accessibility of SQL with the performance and scalability of Apache Flink, a popular distributed streaming platform. With Flink SQL, business analysts, developers, and quants alike can quickly build streaming pipelines to perform complex data analysis in real time.
In this article, we use synthetic market data generated by an agent-based model (ABM) developed by Simudyne. Rather than taking a top-down approach, an ABM models the autonomous participants (or agents) of a complex system, for example the various buyers and sellers in a financial market. Their interactions can be captured, and the resulting synthetic data sets can be analyzed for many applications, such as training models to detect emergent fraudulent behavior, or exploring "what-if" scenarios for risk management. Synthetic data generated by ABMs is particularly useful when historical data is insufficient or unavailable.
1 Streaming VWAP
Let's start with a simple example that computes the volume-weighted average price (VWAP) from a stream of trade events. VWAP is a common benchmark used in trading to gauge the market price and future direction of a security. Here, we have a data set in CSV format showing trade events for a fictitious security (SIMUl) over one trading day (October 22, 2020).
sym,prc,vol,bid_id,ask_id,buyer_id,seller_id,step,time
SIMUl,149.86,2300,P|63-m-1,P|66-l-0,P|63,P|66,380,22-Oct-2020 08:00:07.600
SIMUl,149.86,1935,P|63-m-1,P|25-l-0,P|63,P|25,380,22-Oct-2020 08:00:07.600
SIMUl,149.74,582,P|18-l-0,P|98-m-0,P|18,P|98,428,22-Oct-2020 08:00:08.560
SIMUl,149.76,2475,P|27-l-0,P|42-m-1,P|27,P|42,1021,22-Oct-2020 08:00:20.420
SIMUl,149.84,21,P|5-m-0,P|42-l-0,P|5,P|42,1078,22-Oct-2020 08:00:21.560
SIMUl,149.76,2709,P|24-l-1,P|92-m-0,P|24,P|92,1200,22-Oct-2020 08:00:24.000
SIMUl,149.84,1653,P|8-m-1,P|24-l-0,P|8,P|24,1513,22-Oct-2020 08:00:30.260
SIMUl,149.84,400,P|19-m-0,P|24-l-0,P|19,P|24,1577,22-Oct-2020 08:00:31.540
The columns are: symbol, price, volume, bid ID, ask ID, buyer ID, seller ID, step, and timestamp. The step column is an artifact of the discrete-step ABM market simulation and can be ignored for our purposes; the remaining columns are self-explanatory.
To process this data, we declare a Flink SQL table by issuing a CREATE TABLE statement. Our sample data is filesystem-based, but it is easy to change the connector type to read from other sources (for example, a Kafka topic). Note that event_time is a derived column, which is also used for the watermark. With watermarks, Flink can bound the wait time for late-arriving and out-of-order events so that the stream can make progress. Here, we declare that records arriving more than one minute behind the event_time watermark will be ignored.
CREATE TABLE trades (
  symbol STRING,
  price DOUBLE,
  vol INT,
  bid_id STRING,
  ask_id STRING,
  buyer_id STRING,
  seller_id STRING,
  step INT,
  ts_str STRING,
  event_time AS TO_TIMESTAMP(ts_str, 'dd-MMM-yyyy HH:mm:ss.SSS'),
  WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/varstream/data/trades_raw',
  'format' = 'csv'
);
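To build intuition for how the derived event_time column and the watermark interact, here is a minimal Python sketch (not Flink code; the function name filter_late is a hypothetical illustration). It parses the CSV timestamp format and drops records that arrive more than one minute behind the highest event time seen so far, mimicking the one-minute watermark declared above:

```python
from datetime import datetime, timedelta

FMT = "%d-%b-%Y %H:%M:%S.%f"  # Python analog of 'dd-MMM-yyyy HH:mm:ss.SSS'

def filter_late(rows, allowed_lateness=timedelta(minutes=1)):
    """Yield timestamps whose event time is within the watermark bound."""
    watermark = None
    for ts_str in rows:
        event_time = datetime.strptime(ts_str, FMT)
        # The watermark trails the max event time by the allowed lateness.
        if watermark is None or event_time - allowed_lateness > watermark:
            watermark = event_time - allowed_lateness
        if event_time >= watermark:
            yield ts_str

rows = [
    "22-Oct-2020 08:00:07.600",
    "22-Oct-2020 08:05:00.000",
    "22-Oct-2020 08:03:00.000",  # nearly 2 minutes behind the watermark: dropped
]
print(list(filter_late(rows)))
```

Flink's actual watermarking is more sophisticated (it is propagated per partition and drives window firing), but the bounding idea is the same.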
The VWAP formula is simple: for each trade within a given time period, multiply the price by the number of shares traded, sum these products, and divide the sum by the total number of shares traded in the period. The following streaming query shows the current VWAP, updated as new trade events arrive:
SELECT
  symbol,
  SUM(vol) AS cumulative_volume,
  SUM(price * vol) AS cumulative_pv,
  SUM(price * vol) / SUM(vol) AS vwap
FROM trades
GROUP BY symbol;
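The running aggregation this query performs can be sketched in a few lines of Python (a hypothetical illustration, not how Flink executes it): keep two running sums and emit their ratio after every trade.

```python
def streaming_vwap(trades):
    """Emit the running VWAP after each (price, vol) trade event."""
    cumulative_pv = 0.0
    cumulative_volume = 0
    for price, vol in trades:
        cumulative_pv += price * vol
        cumulative_volume += vol
        yield cumulative_pv / cumulative_volume

# First three trades from the sample data above.
trades = [(149.86, 2300), (149.86, 1935), (149.74, 582)]
for vwap in streaming_vwap(trades):
    print(round(vwap, 4))
```

After the third trade the VWAP dips slightly below 149.86, reflecting the lower-priced fill's weight in the running sums.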
2 Replaying in real time
Because the CSV file contains only a single day's worth of data for one symbol, the result updates may happen too quickly to notice: events are read from the source much faster than they occurred in real time. It is sometimes desirable to replay historical data in near real time, as if Flink were receiving the historical events right now (for example, for demos or during prototyping and development).
To address this, we provide a simple UDTF (user-defined table function) that replays historical data with an artificial delay derived from the rowtime. The UDTF takes two parameters: the second specifies the rowtime column (event_time in our example), and the first specifies the duration, in minutes, past the first rowtime after which to start applying the delay. The following snippet shows how to register the UDTF and use it in a view that applies the delay after the first 120 minutes of events have been processed. Note the use of a LATERAL TABLE join, which applies the function to every row of the main table.
-- Register the UDTF
CREATE FUNCTION replay_after AS 'varstream.ReplayAfterFunction' LANGUAGE JAVA;

-- Create a view that replays trades with a delay
CREATE VIEW trades_replay AS (
  SELECT * FROM trades
  LEFT JOIN LATERAL TABLE (replay_after(120, trades.event_time)) ON TRUE
);
You can verify that events are being replayed by issuing a simple query: SELECT * FROM trades_replay. Using this view, we can now issue the same VWAP aggregation query and watch the VWAP stream update as if the trades were happening in real time:
SELECT
  symbol,
  SUM(vol) AS cumulative_volume,
  SUM(price * vol) AS cumulative_pv,
  SUM(price * vol) / SUM(vol) AS vwap
FROM trades_replay
GROUP BY symbol;
Although this UDTF is very useful for prototyping, it is by no means intended for production use. We use it here simply to demonstrate how Flink SQL updates aggregation results as events arrive in real time.
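The replay idea itself is simple and can be sketched outside of Flink. The Python snippet below is a hypothetical stand-in for what varstream.ReplayAfterFunction does (not its actual implementation): once the configured offset past the first event time has elapsed, it sleeps for the gap between consecutive event times before yielding each row.

```python
import time
from datetime import datetime, timedelta

FMT = "%d-%b-%Y %H:%M:%S.%f"

def replay_after(rows, start_after_minutes, speed=1.0):
    """Yield timestamp strings, sleeping to reproduce the original
    inter-event gaps once start_after_minutes have elapsed past the
    first event time. speed > 1 replays faster than real time."""
    first = prev = None
    for ts_str in rows:
        event_time = datetime.strptime(ts_str, FMT)
        if first is None:
            first = event_time
        elif event_time - first > timedelta(minutes=start_after_minutes):
            gap = (event_time - prev).total_seconds() / speed
            time.sleep(max(gap, 0.0))
        prev = event_time
        yield ts_str
```

With start_after_minutes=120, the first two hours of history stream through at full speed and only the remainder is paced, matching the view definition above.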
3 Group Windows
The previous example showed how to compute a streaming VWAP over the whole day. Now suppose you want to build a trading dashboard with candlestick charts at 1-minute intervals. For that you need the VWAP, high, low, and total volume for each minute. Flink SQL makes this easy with group windows, which apply aggregate functions over time intervals in a GROUP BY clause.
The following shows how to compute the 1-minute VWAP:
CREATE VIEW vwap_1m AS (
  SELECT
    symbol,
    TUMBLE_START(event_time, INTERVAL '1' MINUTES) AS start_time,
    TUMBLE_ROWTIME(event_time, INTERVAL '1' MINUTES) AS row_time,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    SUM(price * vol) AS total_price,
    SUM(vol) AS total_vol,
    SUM(price * vol) / SUM(vol) AS vwap
  FROM trades
  GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTES), symbol
);
SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_1m;
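For intuition, a tumbling-window aggregation can be sketched in plain Python (a batch sketch of what Flink does incrementally; tumble_1m is a hypothetical name): each trade is assigned to the one-minute bucket containing its timestamp, and per-bucket aggregates are computed independently.

```python
from collections import defaultdict
from datetime import datetime

FMT = "%d-%b-%Y %H:%M:%S.%f"

def tumble_1m(trades):
    """Group (ts_str, price, vol) trades into 1-minute tumbling windows.
    Returns {window_start: (high, low, total_vol, vwap)}."""
    buckets = defaultdict(list)
    for ts_str, price, vol in trades:
        ts = datetime.strptime(ts_str, FMT)
        # Truncating to the minute gives the TUMBLE window start.
        window_start = ts.replace(second=0, microsecond=0)
        buckets[window_start].append((price, vol))
    out = {}
    for start, rows in buckets.items():
        pv = sum(p * v for p, v in rows)
        total_vol = sum(v for _, v in rows)
        out[start] = (max(p for p, _ in rows),
                      min(p for p, _ in rows),
                      total_vol,
                      pv / total_vol)
    return out
```

Note that windows never overlap: each event belongs to exactly one bucket, which is what "tumbling" means.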

The previous view calculates the VWAP of the trades occurring within each minute. If instead you want a moving VWAP (MVWAP) over several minutes, Flink SQL provides hopping group windows. The following computes a 5-minute moving VWAP with a step of 1 minute.
CREATE VIEW vwap_5m AS (
  SELECT
    symbol,
    HOP_START(event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS start_time,
    HOP_ROWTIME(event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS row_time,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    SUM(price * vol) AS total_price,
    SUM(vol) AS total_vol,
    SUM(price * vol) / SUM(vol) AS vwap
  FROM trades
  GROUP BY HOP(event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES), symbol
);
SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_5m;
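The key difference from tumbling windows is that hopping windows overlap: with a 5-minute size and 1-minute slide, every event contributes to five windows. A minimal Python sketch (hypothetical helper, not Flink internals) makes the assignment explicit:

```python
from collections import defaultdict
from datetime import datetime, timedelta

FMT = "%d-%b-%Y %H:%M:%S.%f"

def hop_vwap(trades, size_min=5, slide_min=1):
    """Assign each (ts_str, price, vol) trade to every hopping window
    covering it. Returns {window_start: vwap}."""
    buckets = defaultdict(lambda: [0.0, 0])  # window_start -> [pv, vol]
    for ts_str, price, vol in trades:
        ts = datetime.strptime(ts_str, FMT)
        aligned = ts.replace(second=0, microsecond=0)
        # A window starting at t covers [t, t + size); an event in minute m
        # falls in the windows starting at m, m - slide, ..., m - size + slide.
        for k in range(size_min // slide_min):
            start = aligned - timedelta(minutes=k * slide_min)
            buckets[start][0] += price * vol
            buckets[start][1] += vol
    return {start: pv / v for start, (pv, v) in buckets.items()}
```

Because each event is counted in size/slide windows, hopping aggregations cost proportionally more state than tumbling ones, which is worth keeping in mind when choosing window parameters.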

4 Conclusion
Flink SQL can greatly simplify and accelerate the development of streaming data pipelines. In this article, we explored different uses of the SQL GROUP BY clause to compute variations of VWAP over a stream of market data. In the next article, we will show how to derive streaming minutely samples from market data in order to calculate intraday value at risk (IVaR). We hope this series encourages you to try Flink SQL for your streaming market data applications.
Original author :Patrick Angeles& Krishnen Vytelingum
Link to the original text :https://blog.cloudera.com/streaming-market-data-with-flink-sql-part-i-streaming-vwap/