当前位置:网站首页>【Golang | gRPC】gRPC-Client Streaming客户端流实战
【Golang | gRPC】gRPC-Client Streaming客户端流实战
2022-07-16 02:49:00 【田土豆】
环境:
Golang: go1.18.2 windows/amd64
grpc: v1.47.0
protobuf: v1.28.0
完整代码:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-ClientStreaming
1. 简介
前文【Golang | gRPC】HTTP的连接管理——从HTTP/1.0到HTTP/2.0的演进 简单介绍了gRPC中流模式主要分为客户端流、服务端流、双向流以及流传输模式的优点,下面通过一个demo来说明gRPC客户端流的使用
2. 实践
现有下面一种场景:服务端保存着用户的年龄信息,客户端通过stream多次发送含用户姓名的message,服务端通过stream接收message,一次性返回所有请求用户的年龄和
2.1 proto文件
2.1.1 新建gRPC-ClientStreaming文件夹,使用go mod init初始化,创建pb文件夹,新建query.proto文件
syntax = "proto3";
package pb;
option go_package= ".;pb";
// 定义查询服务包含的方法
service Query {
// 客户端流模式
rpc GetAge (stream userInfo) returns (ageInfo){}
}
// 请求用的结构体,包含一个name字段
message userInfo {
string name = 1;
}
// 响应用的结构体,包含一个age字段
message ageInfo {
int32 age = 1;
}
服务端实现一个查询(Query)服务,包含一个方法GetAge。方法GetAge的入参前加关键字stream来表明该方法启用客户端流
2.1.2 在.\gRPC-ClientStreaming\pb目录下使用protoc工具进行编译,在pb文件夹下直接生成.pb.go和_grpc.pb.go文件。关于protoc的详细使用可以查看【Golang | gRPC】使用protoc编译.proto文件
protoc --go_out=./ --go-grpc_out=./ .\query.proto

2.2 grpc.pb.go文件
2.2.1 查看query_grpc.pb.go中生成的客户端流和服务端流的接口定义以及服务端QueryServer服务的定义
// 客户端流
type Query_GetAgeClient interface {
Send(*UserInfo) error
CloseAndRecv() (*AgeInfo, error)
grpc.ClientStream
}
// 服务端流
type Query_GetAgeServer interface {
SendAndClose(*AgeInfo) error
Recv() (*UserInfo, error)
grpc.ServerStream
}
// Query服务的客户端接口
type QueryClient interface {
GetAge(ctx context.Context, opts ...grpc.CallOption) (Query_GetAgeClient, error)
}
// Query服务的服务端接口
type QueryServer interface {
GetAge(Query_GetAgeServer) error
mustEmbedUnimplementedQueryServer()
}
- 客户端流使用
Send发送message,使用CloseAndRecv接收message - 客户端
GetAge方法的第一个返回值是Query_GetAgeClient,表明生成了一条流,用于发送和接收message;如果有多个方法,则每个方法可以各自生成一条流 - 服务端
GetAge方法的入参是Query_GetAgeServer(流),具体方法需要用户自行实现,可以从流中接收和发送message
2.3 服务端
在gRPC-ClientStreaming目录下新建Server文件夹,新建main.go文件
2.3.1 下面我们通过Query这个结构体具体实现QueryServer接口
// 用户信息
var userinfo = map[string]int32{
"foo": 18,
"bar": 20,
}
type Query struct {
pb.UnimplementedQueryServer // 涉及版本兼容
}
func (q *Query) GetAge(serverStream pb.Query_GetAgeServer) error {
log.Println("start of stream")
var names_received []*pb.UserInfo
for {
userinfoRecv, err := serverStream.Recv()
// 待客户端主动关闭流后,退出for循环
if err == io.EOF {
log.Println("end of the recv direction of the stream")
break
}
log.Printf("The name of user received is %s\n", userinfoRecv.GetName())
names_received = append(names_received, userinfoRecv)
}
// 统计年龄和
var ages_sum int32
for _, v := range names_received {
ages_sum += userinfo[v.GetName()]
}
// 返回message
log.Printf("send message about the total of ages:%d ", ages_sum)
err := serverStream.SendAndClose(&pb.AgeInfo{
Age: ages_sum})
if err != nil {
log.Panic(err)
}
log.Println("end of the send direction of the stream")
return nil
}
- 服务端每收到一个message,保存其用户名,直到客户端关闭发送方向的流
Recv方法会一直阻塞直到从stream中接收到message,或者直到客户端调用CloseAndRecv方法- 当客户端调用
CloseAndRecv方法时,服务端调用Recv方法会得到io.EOF返回值 - 服务端调用
SendAndClose方法发送响应message并关闭发送方向的流
2.3.2 服务注册并启动
func main() {
// 创建socket监听器
listener, _ := net.Listen("tcp", ":1234")
// new一个gRPC服务器,用来注册服务
grpcserver := grpc.NewServer()
// 注册服务方法
pb.RegisterQueryServer(grpcserver, new(Query))
// 开启gRPC服务
_ = grpcserver.Serve(listener)
}
使用RegisterQueryServer这个方法向gRPC服务器里注册QueryServer
2.4 客户端
在gRPC-ClientStreaming目录下新建Client文件夹,新建main.go文件
2.4.1 先建立无认证的连接,生成Client,然后通过方法GetAge返回对应的流,最后通过流进行message的收发
func main() {
//建立无认证的连接
conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panic(err)
}
defer conn.Close()
client := pb.NewQueryClient(conn)
//返回GetAge方法对应的流
queryStream, _ := client.GetAge(context.Background())
// 向stream中发送message
_ = queryStream.Send(&pb.UserInfo{
Name: "foo"})
time.Sleep(time.Second)
_ = queryStream.Send(&pb.UserInfo{
Name: "bar"})
time.Sleep(time.Second)
// 发送两次数据后主动关闭流并等待接收来自server端的message
ages_sum, err := queryStream.CloseAndRecv()
if err != nil {
log.Println(err)
}
fmt.Printf("The total of ages of foo and bar is %d", ages_sum.GetAge())
}
- 客户端通过
CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message
运行结果如下:
3. 总结
- 客户端通过
Send方法多次发送message,通过CloseAndRecv方法主动关闭发送方向的流同时等待接收来自服务端的message - 服务端通过
Recv方法多次接收message,通过SendAndClose方法发送响应message并关闭发送方向的流
边栏推荐
- CRS-2674,CRS-4000
- MySQL deadlock analysis and solution
- ORA-00322&ORA-00312
- Coming home late
- Programming old driver takes you to play with completable future asynchronous programming
- 推荐一款最流行的流程图及图表工具draw.io,老掉牙的工具已无人再用
- C语言经典实例:11-20例:求二维数组最大最小值、数组求素数、编制万年历、数组元素排序、进制数的转换进制数的转换、验证哥德巴赫猜想找出次大值、使用结构体输出学生成绩、重组数组
- MySQL基础——新增与进阶查询
- 虎符发生了什么?内部人员这样说
- In depth interpretation of the investment logic of the consortium's participation in the privatization of Twitter
猜你喜欢

MySQL基础——增删查改(基础)
![[matlab] extract the data in TXT file (comma interval), and draw a two-dimensional line chart](/img/2a/a2b05561accd4e1d146625c4b5471e.png)
[matlab] extract the data in TXT file (comma interval), and draw a two-dimensional line chart

Uncover the origin of metamask: a tool set for getting started with Web3 culture

022. Detailed explanation of static and final keywords

企业如何选择合适的 Time Series Database?

使用Flink SQL传输市场数据1:传输VWAP

深度解读财团参与Twitter私有化的投资逻辑
![[flag] applet -day2](/img/60/dfa563cf24654c9881fb2c0d32e15e.png)
[flag] applet -day2
![[slow live broadcast in the post epidemic era: a good time under the lens]](/img/01/97d90068131d5357bbcec2ad513512.png)
[slow live broadcast in the post epidemic era: a good time under the lens]

Is free knowledge not worth paying attention to?
随机推荐
Airiot Q & A issue 4 | how to use data analysis engine?
NC20566 [SCOI2010]游戏
6000 Digital Collections Online seconds empty! "National treasure" digital collections look like this
揭秘MetaMask的起源:成为入门Web3文化的工具集
Banned, off the shelf! Wechat has made a move to standardize and renovate the digital collection platform!
TCP IP ICMP 以太网帧格式
ORA-00322&ORA-00312
JS数组最全方法解读!!!
Network Security Learning - permission promotion
Judge and other questions: how to determine that you are you in the program?
Rust webserver service
Miller_ Rabin Brief
Web3.0浪潮下,隐私计算与NFT的前景会是怎样
023.static and final use traps
Revelation of AI application: honey and arsenic in security market
[matlab] extract the data in TXT file (comma interval), and draw a two-dimensional line chart
Maximum return alternative method
How to make encrypted PDF files editable
编程老司机带你玩转 CompletableFuture 异步编程
推荐一款最流行的流程图及图表工具draw.io,老掉牙的工具已无人再用