当前位置:网站首页>一个简单的websocket例子
一个简单的websocket例子
2022-07-17 12:06:00 【永远的麦田】
写的一个范例,用来测试websocket服务器用的
有自动连接部分,即如果与服务器断开,系统会再次连接websocket服务器,避免因网络问题引发数据彻底断了
需要注意的是,websocket组件必须是0.57.0,后面更新的组件没有再跟了,应该与当前的使用有冲突。
完整代码如下:
import websocket # websocket_client==0.57.0
import queue
import json
import time
import sys
from apscheduler.schedulers.background import BackgroundScheduler
try:
import thread
except ImportError:
import _thread as thread
class RecvTickWebSocket:
def __init__(self, _instrument_id):
self._instrument_id = _instrument_id # 合约代码
self.ws: websocket.WebSocketApp = None
self.queue = queue.Queue()
self.url = "ws://192.168.1.100:9001/ws"
def get_ws_status(self, ws: websocket.WebSocketApp):
if self is None or ws is None:
return False
return ws.sock is not None
def deal_data(self):
print("deal_data.......begin")
ws = self.ws
while self.get_ws_status(ws):
time_out = False
tick_data: dict = None
try:
tick_data = self.queue.get(timeout=3)
except Exception:
time_out = True
if not time_out:
print(tick_data)
print("###:[{}] deal_data end", self._instrument_id)
def __del__(self):
if self.ws is not None:
print("####: Close WS")
self.ws.close()
@property
def instrument_id(self):
return self._instrument_id
def on_message(self, message):
data = json.loads(message)
if "dataType" in data and data["dataType"] == "future_depth": # 期货实时行情
self.queue.put(data)
def on_error(self, error):
print("####### on_error #######,[{}]: {}", self._instrument_id, error)
def on_close(self):
pass
def on_open(self):
print("### on_open: {}", self.instrument_id)
scheduler = BackgroundScheduler()
scheduler.add_job(self.deal_data, args=[])
scheduler.start()
def run(*args):
msg = '{
{"symbols":"{}","subscribe":"on"}}'.format(self.instrument_id)
print("# Subscribe Msg: {}", msg)
self.ws.send(msg)
thread.start_new_thread(run, ())
def start(self):
self.ws = websocket.WebSocketApp(self.url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.on_open = self.on_open
self.ws.run_forever(ping_interval=60, ping_timeout=5)
@staticmethod
def job(instrument_id):
print("##: @@@ In JOB @@@:{}", instrument_id)
my_web = RecvTickWebSocket(instrument_id)
while True:
my_web.start()
time.sleep(1)
def do_recv_contract(instrument_id):
sched = BackgroundScheduler()
sched.add_job(RecvTickWebSocket.job, args=[instrument_id])
sched.start()
while True:
time.sleep(1)
if __name__ == '__main__':
argv = sys.argv
instrument_id = argv[1]
print("## sys.argv:{}", argv)
do_recv_contract(instrument_id)运行截图:
D:\ProgramData\Anaconda3\envs\pydev3.9\python.exe E:/work/py/strategyserver/strategyserver/websocketclient_demo2.py rb2210
## sys.argv:{} ['E:/work/py/strategyserver/strategyserver/websocketclient_demo2.py', 'rb2210']
##: @@@ In JOB @@@:{} rb2210
### on_open: {} rb2210
deal_data.......begin
# Subscribe Msg: {} {"symbols":"rb2210","subscribe":"on"}边栏推荐
- String类型函数传递问题
- 【vulnhub靶场】PRIME:1打靶过程记录
- Let, const, VaR in ES6
- Online education knowledge payment website source code system + live broadcast + applet, installation tutorial
- HCIA 复习作答 2022.7.6
- R语言使用epiDisplay包的aggregate函数将数值变量基于因子变量拆分为不同的子集,计算每个子集的汇总统计信息、设置na.rm参数为FALSE之后包含缺失值的分组的统计量的结果为NA
- Huawei wireless device configuration intelligent roaming
- Complete knapsack problem code template
- English语法_人称代词-用法
- 华为无线设备配置动态负载均衡
猜你喜欢

Flink入门到实战-阶段四(时间和窗口图解)
![[Download] take you to use FRP to achieve intranet penetration detailed tutorial!](/img/8c/f3a5dd05966bcb98f24d97e1576a7e.png)
[Download] take you to use FRP to achieve intranet penetration detailed tutorial!

Huawei wireless device configuration dynamic load balancing

2022 Hunan secondary vocational group "Cyberspace Security" packet analysis infiltration Pacpng parsing (super detailed)

Kirin Xin'an operating system derivative solution | host security reinforcement software, to achieve one click rapid reinforcement!

Ffmpeg record video, stop (vb.net, step on the pit, class library - 10)

【东北师范大学】考研初试复试资料分享

中科磐云—D模块web远程代码执行漏洞解析

Random talk on GIS data (III)

Flink introduction to actual combat - phase IV (time and window diagram)
随机推荐
Rasa 3. X learning series -rasa version 3.1.5 release
ash: /etc/apt/sources. List: insufficient permissions
2022 windows penetration test of "Cyberspace Security" of Hunan secondary vocational group (ultra detailed)
【CSP-J 2021】总结
卫星网络中基于时变图的节能资源分配策略
Huawei ascend910 running yolov3 tutorial
笔记本键盘失灵解决办法
Microsoft OneNote tutorial, how to insert mathematical formulas in OneNote?
R语言使用epiDisplay包的ordinal.or.display函数获取有序logistic回归模型的汇总统计信息(变量对应的优势比及其置信区间、以及假设检验的p值)、使用summary汇总统计
Story of status code
Random talk on GIS data (III)
SSH Connection Huawei modelarts Notebook
快速判断站点是否存活的 3 种编程实现
Online education knowledge payment website source code system + live broadcast + applet, installation tutorial
为什么磁力变速齿轮会反转?
R语言使用lm函数构建线性回归模型、使用subset函数指定对于数据集的子集构建回归模型(使用subset函数筛选满足条件的数据子集构建回归模型)
基于信道状态信息的Wi-Fi感知技术与实践
【MySQL】MySQL的增删查改(进阶)
HCIA 静态基础实验 7.8
What is pytest? Automated testing is a must