你将会学到
- 一个完整的gRPC流实例,包括单向流与双向流的操作
- 如何实现gRPC流服务端代码
- 如何实现gRPC流客户端代码
准备
- 新建一个文件夹 go-grpc-simple-stream
- 在go-grpc-simple文件夹下建立三个目录: client, proto,server
- 使用
go mod
管理代码 - 在 go-grpc-simple-stream 目录下执行
go mod init go-grpc-simple-stream
编写 proto 文件
在 go-grpc-simple-stream/proto 目录下新建 hello.proto 文件
syntax = "proto3";
package hello;
service HelloService {
// 定义一个服务端推送客户的单向流
rpc ServerToClient(StreamRequest) returns (stream StreamResponse){};
// 定义一个客户端推送服务端的单向流
rpc ClientToServer(stream StreamRequest) returns (StreamResponse){};
// 定义一个服务端与客户端的双向流
rpc AllStream(stream StreamRequest) returns (stream StreamResponse){};
}
// stream 请求结构
message StreamRequest {
string data = 1;
}
// stream 响应结构
message StreamResponse {
string data = 1;
}
生成 pb go 代码
在 go-grpc-simple-stream/proto 目录下新建 gen.sh 文件
如何是 window 系统,则直接在将 protoc 命名复制在 cmd 下执行即可。
#!/usr/bin/env bash
set -x
# 第一步: 安装 protoc 插件
# 打开下面URL, 跟据自己的系统选择对应的 protoc-3.x.x-linux|osx|win
# https://github.com/protocolbuffers/protobuf/releases
# 下载完后,加入到环境变量或path中, 保证全局可用。
# 验证: protoc --version
# 第二步:引用 proto, protoc-gen-go, grpc 共3个工具包
# 安装 golang 的proto工具包
# go get -u github.com/golang/protobuf/proto
# 安装 goalng 的proto编译支持
# go get -u github.com/golang/protobuf/protoc-gen-go
# 安装 GRPC 包
# go get -u google.golang.org/grpc
protoc --go_out=plugins=grpc:. *.proto
执行以上脚本命令会在go-grpc-simple-stream/proto
目录下生成 hello.pb.go
文件
实现服务端代码
在 go-grpc-simple-stream/server目录下新建 main.go 文件
package main
import (
"fmt"
hello "go-grpc-simple-stream/proto"
"google.golang.org/grpc"
"io"
"log"
"net"
"os"
"sync"
"time"
)
const (
Addr = "localhost:3721"
)
type HelloServiceImpl struct {
}
// 服务端->客户端 推送单向流
func (h *HelloServiceImpl) ServerToClient(req *hello.StreamRequest, server hello.HelloService_ServerToClientServer) error {
// 模拟一个向客户端推送10次的单向流
var i int
for {
// 打印接受客户端的消息
log.Printf("单向流.接受客户端的消息:%s\n", req.GetData())
// 向客户端发送消息
err := server.Send(&hello.StreamResponse{Data: fmt.Sprintf("%d 单向流.%s", i, time.Now().Format("15:04:05"))})
if err != nil {
break
}
i ++
if i > 3 {
server.Send(&hello.StreamResponse{Data: "单向流.服务端推送完啦"})
break
}
time.Sleep(time.Second)
}
return nil
}
// 客户端->服务端 推送单向流
// 需要循环的接受来自客户端的消息,至到 io.EOF
func (h *HelloServiceImpl) ClientToServer(server hello.HelloService_ClientToServerServer) error {
for {
// 接受客户端的消息
data, err := server.Recv()
if err != nil {
// 无数据时跳出循环
if err == io.EOF {
break
}
return err
}
log.Printf("单向流.接受到客户端的消息:%s", data.GetData())
}
err := server.SendAndClose(&hello.StreamResponse{
Data: "单向流.接受客户端消息完毕",
})
if err != nil {
return err
}
return nil
}
// 双向流,即可以从服务端不断发送流数据,也可以不断的接受客户端发送过来的流数据。
// 所以需要处理发送与接受,需要采用两个协程处理。
func (h *HelloServiceImpl) AllStream(server hello.HelloService_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
// 处理服务端向客户端发送的流数据
go func() {
defer wg.Done()
i := 0
for {
err := server.Send(&hello.StreamResponse{
Data: fmt.Sprintf("%d,来自双向流的服务端:%s",i, time.Now().Format("2006-01-02 15:04:05")),
})
if err != nil {
break
}
i ++
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()
// 处理客户端向服务端发送过来的流数据。
go func() {
for {
data, err := server.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatalln(err)
}
log.Printf("来自客户端的消息:%s\n", data.GetData())
}
}()
wg.Wait()
return nil
}
func main() {
log.SetFlags(log.Lshortfile|log.LstdFlags)
// 构造一个 gRPC 服务对象
grpcServer := grpc.NewServer()
// 然后使用 protoc 工具生成的 go 代码函数(RegisterHelloServiceServer) 注册我们实现的 HelloServiceImpl 服务
hello.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
// 通过 grpcServer.Serve 在一个监听端口上提供 gRPC 服务
lis, err := net.Listen("tcp", Addr)
if err != nil {
log.Fatal(err)
}
// 打印一下运行时的进程ID和地址
go func() {
log.Printf("PID:%d, %s\n", os.Getpid(), Addr)
}()
err = grpcServer.Serve(lis)
if err != nil {
log.Fatal(err)
}
}
运行 go run .
实现客户端代码
在 go-grpc-simple-stream/client 目录下新建 main.go 文件
package main
import (
"context"
"fmt"
hello "go-grpc-simple-stream/proto"
"google.golang.org/grpc"
"io"
"log"
"sync"
"time"
)
const (
Addr = "localhost:3721"
)
func main() {
log.SetFlags(log.Lshortfile|log.LstdFlags)
conn, err := grpc.Dial(Addr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := hello.NewHelloServiceClient(conn)
// 获取服务端向客户端不断推送的数据流。
GetServerStream(client)
// 客户端不断的向服务器推送单向流
PutServerStream(client)
// 双向流
AllStream(client)
}
// 双向流
func AllStream(client hello.HelloServiceClient) {
all, err := client.AllStream(context.Background())
if err != nil {
log.Fatalln(err)
}
wg := sync.WaitGroup{}
wg.Add(2)
// 处理服务端->客户端的流
go func() {
defer wg.Done()
for {
resp, err := all.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatalln(err)
}
log.Printf("双向流,接受到服务的数据:%s\n", resp.GetData())
}
}()
// 处理 客户端->服务端的流
go func() {
defer wg.Done()
i := 0
for {
err = all.Send(&hello.StreamRequest{
Data: fmt.Sprintf("%d,%s", i, time.Now().Format("15:04:05")),
})
if err != nil {
log.Println(err)
break
}
i ++
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()
wg.Wait()
}
// 客户端不断的向服务器推送单向流
func PutServerStream(client hello.HelloServiceClient) {
i := 0
// 向服务端推送流
put, err := client.ClientToServer(context.Background())
if err != nil {
log.Fatalln(err)
}
for {
err = put.Send(&hello.StreamRequest{
Data: fmt.Sprintf("%d,%s", i, time.Now().Format("15:04:05")),
})
if err != nil {
break
}
i ++
if i > 3 {
break
}
time.Sleep(time.Second)
}
// 接受
resp, err := put.CloseAndRecv()
if err != nil {
if err != io.EOF {
log.Fatalln(err)
}
}
// 接受服务端响应的数据
if resp != nil {
log.Println(resp.GetData()) // 接受完毕
}
}
// 获取服务端向客户端不断推送的数据流。
func GetServerStream(client hello.HelloServiceClient) {
// 向服务器发一个数据标识
req := hello.StreamRequest{Data: "客户端"}
// 调用 ServerToClient 函数,准备接受服务端单向流
resp, err := client.ServerToClient(context.Background(), &req)
if err != nil {
log.Fatalln(err)
}
for {
data, err := resp.Recv()
if err != nil {
// 遇到 io.EOF 表示服务端流关闭
if err == io.EOF {
break
}
log.Println(err)
break
}
log.Printf("服务端推送的单向流:%s\n", data)
}
}
执行 go run .
go mod 管理代码
在 go-grpc-simple 目录下
go mod tidy
源码
github.com/go-grpc-simple-stream