简单聊一聊gRPC
什么是RPC
常规的客户端和服务端传协议需要创建TCP连接,具体什么协议根据具体的应用场景可以做选择
常见的协议
- HTTP
RPC是远程过程调用,主要在服务端执行
rpc就是把
-
上述过程封装下,使其操作更加有优化
-
使用大家都认可的协议使其规范化
-
做成一个框架
rpc在执行过程中就像在调用本地方法一样(底层还是TCP),rpc把一些细节给封装了一下
GRPC支持多个语言,go语言的仓库为
http://github.com/grpc/grpc-go
grpc是一个框架,具体的传输数据还是要用到一些相关的戒指
grpc传输所使用的协议为protobuf
protohuf用protoc(一个程序)来编译
protobuf的go的版本安装方法如下所示:
go get github.com/golang/protobuf/protoc-gen-go
protobuf是一个轻便高效的序列化数据结构协议,可以用于网络通信和数据存储
特点:性能高,传输快,维护方便
简单跑个Demo
服务端
创建中间文件
syntax = "proto3";
package services;
message ProdRequest {
int32 prod_id = 1;
}
message ProdResponse {
int32 prod_stock = 1;
}
创建如下所示的项目路径
然后在pbfiles目录下执行命令:
protoc --go_out=../ Prod.proto
生成了如下所示的文件
在prod.proto里面添加方法
syntax = "proto3";
package services;
message ProdRequest {
int32 prod_id = 1;
}
message ProdResponse {
int32 prod_stock = 1;
}
service ProdService {
rpc GetProdStock (ProdRequest) returns (ProdResponse);
}
然后重新生成文件
protoc --go_out=plugins=grpc:../services Prod.proto
现在发现里面又多了一些东西
这里有一个接口,我们要在go里面去实现这个接口
新建一个ProdService文件,用来实现services接口
package services
import (
"context"
)
import (
"google.golang.org/grpc"
)
type ProdService struct {
}
func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest, opts ...grpc.CallOption) (*ProdResponse, error) {
return &ProdResponse{ProdStock: 20},nil
}
下面开始写服务端:
package main
import (
"google.golang.org/grpc"
"net"
)
import (
"go-grpc-learning/services"
)
func main() {
rpcServer := grpc.NewServer()
services.RegisterProdServiceServer(rpcServer,new(services.ProdService))
lis, _ := net.Listen("tcp",":8084")
rpcServer.Serve(lis)
}
这里有一个比较特殊RegisterProdServiceServer函数会提示参数不匹配,在网课中是把RegisterProdServiceServer声明的地方删掉最后一个参数
客户端
客户端的话就把服务端的Prod.pb.go文件拷贝过去就可以了
然后客户端的main函数:
package main
import (
"context"
"fmt"
"log"
)
import (
"google.golang.org/grpc"
)
import (
"grpc-client/services"
)
func main() {
connection, err := grpc.Dial(":8084")
if err != nil {
log.Fatal(err)
}
defer connection.Close()
prodClient := services.NewProdServiceClient(connection)
prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12})
if err != nil {
log.Fatal(err)
}
fmt.Println(prodRes.ProdStock)
}
这样子运行会报错:
2021/01/16 14:42:17 grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)
看提示是没有证书,正常来说grpc是需要使用证书的,然后证书的问题下一章节再说,先按照提示设置
grpc.WithInsecure()
第18行改成这样就可以了
connection, err := grpc.Dial(":8084", grpc.WithInsecure())
自签证书
一般来说在生产环境中是不允许使用自签证书的,在买域名的时候域名服务商一般会送一个证书
这个证书可以找域名服务商申请,一般是免费
加入证书代码:服务端
creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key")
if err != nil {
log.Fatal(err)
}
rpcServer := grpc.NewServer(grpc.Creds(creds))
重新运行server,现在再运行client就报错了
2021/01/16 15:47:10 rpc error: code = Unavailable desc = connection closed
client也一样的加入证书
creds, err := credentials.NewClientTLSFromFile("keys/ssl.crt","www.cjpa.top")
if err != nil {
log.Fatal(err)
}
connection, err := grpc.Dial(":8084", grpc.WithTransportCredentials(creds))
这个时候运行就可以正常接收到数据了
热身
之前服务端使用的tcp进行连接,现在尝试用一个http看看
-
为什么要使用httpserver?
因为有时候一些其他的服务可能没有用到grpc服务,而是用的http服务,为了能够让其他没有用到grpc的client也能调用到我们写的服务,需要写处理http的接口
代码如下:
func main() { creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key") if err != nil { log.Fatal(err) } rpcServer := grpc.NewServer(grpc.Creds(creds)) services.RegisterProdServiceServer(rpcServer,new(services.ProdService)) //lis, _ := net.Listen("tcp",":8084") //rpcServer.Serve(lis) mux := http.NewServeMux() mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { rpcServer.ServeHTTP(writer,request) }) httpServer := &http.Server{ Addr : ":8084", Handler: mux, } httpServer.ListenAndServe() }
用浏览器访问之后发现提示gRPC requires HTTP/2,这是因为grpc提供http服务的时候用的是http2,而普通的浏览器访问时http1.x,因此会有这个提示
解决方案:
这样就变成了用https访问
httpServer.ListenAndServeTLS("keys/ssl.crt","keys/ssl.key")
现在可以正常访问,但是还时优点小问题
下面检查一下看看请求过来的时候用了什么协议
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
fmt.Println(request.Proto)// 可以打印协议
rpcServer.ServeHTTP(writer,request)
})
发现输出的是http/2.0
总体问题不大,这个过会儿再看吧
下面用客户端访问服务端,然后把所有的信息都给打印下来
fmt.Println(request)
&{POST /services.ProdService/GetProdStock HTTP/2.0 2 0
map[
Content-Type:[application/grpc]
Te:[trailers]
User-Agent:[grpc-go/1.35.0]] 0xc000090210 <nil> -1 [] false www.cjpa.top map[] map[] <nil> map[] 127.0.0.1:53297 /services.ProdService/GetProdStock 0xc000122a50 <nil> <nil> 0xc000096080
}
可以看到Context-Type里面有grpc
后面需要做的就是怎么调用这个grpc
使用自签CA,server,Client证书和双向认证
CA证书
CA证书(root certificate)是属于根证书颁发机构(CA)的公钥证书,用以验证它所签发的证书(客户端、服务端)
生成流程
openssl
genrsa -out ca.key 2048
req -new -x509 -days 3650 -key ca.key -out ca.pem
重新生成服务端证书
genrsa -out server.key 2048
req -new -key server.key -out server.csr
x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in server.csr -out server.pem
上面的过程中都会提示要输入common name,这里统一填写localhost,写成域名也可以
生成客户端证书
ecparam -genkey -name secp384r1 -out client.key
req -new -key client.key -out client.csr
x509 -req -sha256 -CA ca.pem -CAkey ca.key -CAcreateserial -days 3650 -in client.csr -out client.pem
然后在程序中重新覆盖server.crt和server.key
服务端覆盖之前
覆盖之后
服务端代码改造
func main() {
//creds, err := credentials.NewServerTLSFromFile("keys/ssl.crt","keys/ssl.key")
//if err != nil {
// log.Fatal(err)
//}
cert, _ := tls.LoadX509KeyPair("cert/server.pem","cert/server.key")
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.pem")
certPool.AppendCertsFromPEM(ca)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
})
rpcServer := grpc.NewServer(grpc.Creds(creds))
services.RegisterProdServiceServer(rpcServer,new(services.ProdService))
//lis, _ := net.Listen("tcp",":8084")
//rpcServer.Serve(lis)
mux := http.NewServeMux()
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
fmt.Println(request)
rpcServer.ServeHTTP(writer,request)
})
httpServer := &http.Server{
Addr : ":8084",
Handler: mux,
}
//httpServer.ListenAndServe()
httpServer.ListenAndServeTLS("cert/server.pem","cert/server.key")
}
改造客户端:
和server差不多,也是把证书文件拷贝过去,不过server.key不能放进去
func main() {
//creds, err := credentials.NewClientTLSFromFile("keys/ssl.crt","www.cjpa.top")
//if err != nil {
// log.Fatal(err)
//}
cert, _ := tls.LoadX509KeyPair("cert/client.pem","cert/client.key")
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.pem")
certPool.AppendCertsFromPEM(ca)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},//服务端证书
ServerName: "localhost",//双向验证
RootCAs: certPool,
})
connection, err := grpc.Dial(":8084", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatal(err)
}
defer connection.Close()
prodClient := services.NewProdServiceClient(connection)
prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12})
if err != nil {
log.Fatal(err)
}
fmt.Println(prodRes.ProdStock)
}
现在服务正常运行了
双向认证下rpc-gateway使用
用gateway来同时提供rpc和http接口
有一个第三方库
https://github.com/grpc-ecosystem/grpc-gateway
对于普通的http请求,在grpc基础之上加一层代理并转发,转变成protobuf访问,然后再把protobuf的响应转化为http响应,返回给客户端
安装:
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
然后再拓展里面找到这个文件夹
拷贝过来
一个是正常的的prod.pb,一个是网关
下面开始改造代码
先改造pbfiles/Prod.proto
syntax = "proto3";
package services;
import "google/api/annotations.proto";
message ProdRequest {
int32 prod_id = 1;
}
message ProdResponse {
int32 prod_stock = 1;
}
service ProdService {
rpc GetProdStock (ProdRequest) returns (ProdResponse){
option (google.api.http) = {
get: "/v1/prod/{prod_id}"
};
};
}
在pbfiles里面打开终端,执行命令
# 这个会生成Prod.pb.go
protoc --go_out=plugins=grpc:../services Prod.proto
# 这会生成Prod.pd.gw.go
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto
编写http网关
func main() {
// 路由
gwmux := runtime.NewServeMux()
// 这个opt指定客户端使用的证书
opt := []grpc.DialOption{grpc.WithTransportCredentials(helper.GetClientCreds())}
// RegisterProdServiceHandlerFromEndpoint方法在Prod.pb.gw.go里面,这个Endpoint意思就是grpc的地址,和server.go里面的端口保持一致
err := services.RegisterProdServiceHandlerFromEndpoint(context.Background(), gwmux, "localhost:8084",opt)
if err != nil {
log.Fatal(err)
}
httpServer := &http.Server{
Addr: ":8085",
Handler: gwmux,
}
httpServer.ListenAndServe()
}
这样操作之后,http请求可以通过8085端口来进行访问,然后再转发给本地的8084rpc服务
在浏览器中访问8085:
语法速学
repeated
上节课是传入一个商品id获取一个库存,现在我们来获取一堆商品的库存
还是老规矩,改造代码pbfiles/Pord.proto:
syntax = "proto3";
package services;
import "google/api/annotations.proto";
message ProdRequest {
int32 prod_id = 1;
}
message QuerySize {
int32 size = 1; // 页尺寸,这个1是顺序
}
message ProdResponse {
int32 prod_stock = 1;
}
/*
返回一堆商品库存,使用了repeated修饰符
因为grpc是一个跨语言的框架,不能按照go的风格来写,repeated就代表这个属性可以重复
*/
message ProdResponseList {
repeated ProdResponse prodRes = 1;
}
service ProdService {
rpc GetProdStock (ProdRequest) returns (ProdResponse){
option (google.api.http) = {
get: "/v1/prod/{prod_id}"
};
};
rpc getProdStocks (QuerySize) returns (ProdResponseList) {
};
}
生成prod.pb.go文件:
protoc --go_out=plugins=grpc:../services Prod.proto
Services/ProdService.go里面也要实现一下上面定义的getProdStocks
func (this *ProdService) GetProdStocks(ctx context.Context,request *QuerySize) (*ProdResponseList, error) {
Prods := []*ProdResponse{
&ProdResponse{ProdStock: 28},
&ProdResponse{ProdStock: 36},
&ProdResponse{ProdStock: 12},
&ProdResponse{ProdStock: 78},
}
return &ProdResponseList{
ProdRes: Prods,
}, nil
}
然后客户端调用:
res, err := prodClient.GetProdStocks(context.Background(), &services.QuerySize{Size: 6})
if err != nil {
log.Fatal(err)
}
fmt.Println(res.ProdRes)
控制台输出如下
enum
上一个例子中我们实现了切片和数组
现在讲一下枚举类型
改造pbfiles/Prod.proto
enum ProdAreas {
A = 0;
B = 1;
C = 2;
}
message ProdRequest {
int32 prod_id = 1;
ProdAreas prod_area = 2;
}
重新生成Prod.pb.go
改造服务端代码
func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest) (*ProdResponse, error) {
var stock int32 = 0
if request.ProdArea == ProdAreas_A {
stock = 30
}else if request.ProdArea == ProdAreas_B {
stock = 40
}else if request.ProdArea == ProdAreas_C {
stock = 50
}
return &ProdResponse{ProdStock: stock},nil
}
客户端
prodRes, err := prodClient.GetProdStock(context.Background(), &services.ProdRequest{ProdId: 12, ProdArea: services.ProdAreas_A})
if err != nil {
log.Fatal(err)
}
fmt.Println(prodRes.ProdStock)
输出结果如下
导入外部proto
新建一个proto文件,专门放一些实体
syntax = "proto3";
package services;
//商品模型
message ProdModel {
int32 prod_id = 1;
string prod_name = 2;
float prod_price = 3;
}
在Prod.proto里面导入
import "Models.proto";
然后写一个rpc服务
rpc getProdInfo (ProdRequest) returns (ProdModel) {};
生成go文件
protoc --go_out=plugins=grpc:../services Prod.proto
protoc --go_out=plugins=grpc:../services Models.proto
实现getProdInfo
func (this *ProdService) GetProdInfo(ctx context.Context, request *ProdRequest) (*ProdModel, error) {
ret := ProdModel {
ProdId: 101,
ProdName: "测试商品",
ProdPrice: 20.5,
}
return &ret, nil
}
客户端调用
prod, err := prodClient.GetProdInfo(context.Background(), &services.ProdRequest{ProdId: 12})
if err != nil {
log.Fatal(err)
}
fmt.Println(prod.ProdName)
结果如下
日期类型
创建主订单模型
import "google/protobuf/timestamp.proto";
// 主订单模型
message OrderMain {
int32 order_id = 1;// 订单ID
string order_no = 2;// 订单号
int32 user_id = 3;// 购买者Id
float order_money = 4;// 商品金额
//这边还需要一个订单时间
google.protobuf.Timestamp order_time = 5;
}
日期类型在go里面的使用
import "github.com/golang/protobuf/ptypes/timestamp"
t := timestamp.Timestamp{Seconds: time.Now().Unix()}
场景练习
上面的网关用的是get方法,下面用post试试
首先改造proto代码
Pbfiles/Order.proto
syntax = "proto3";
package services;
import "google/api/annotations.proto";
import "Models.proto";
message OrderRequest {
OrderMain order_main = 1;
}
message OrderResponse {
string status = 1;
string message = 2;
}
service OrderService {
rpc NewOrder(OrderRequest) returns (OrderResponse) {
option (google.api.http) = {
post: "/v1/orders"
body: "order_main"
};
};
}
生成新的网关代码,和grpc代码
protoc --go_out=plugins=grpc:../services Prod.proto &&
protoc --go_out=plugins=grpc:../services Models.proto &&
protoc --go_out=plugins=grpc:../services Orders.proto &&
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto &&
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto
改造OrderService.go
func (this *OrderService) NewOrder(ctx context.Context,orderRequest *OrderRequest) (*OrderResponse, error) {
fmt.Println(orderRequest)
return &OrderResponse{
Status: "ok",
Message: "success",
},nil
}
编写HttpServer.go(网关代码)
func main() {
// 路由
gwmux := runtime.NewServeMux()
gRpcEndPoint := "localhost:8084"
// 这个opt指定客户端使用的证书
opt := []grpc.DialOption{grpc.WithTransportCredentials(helper.GetClientCreds())}
// 商品服务网关
// RegisterProdServiceHandlerFromEndpoint方法在Prod.pb.gw.go里面,这个Endpoint意思就是grpc的地址,和server.go里面的端口保持一致
err := services.RegisterProdServiceHandlerFromEndpoint(context.Background(), gwmux, gRpcEndPoint,opt)
if err != nil {
log.Fatal(err)
}
// 订单服务网关
err = services.RegisterOrderServiceHandlerFromEndpoint(context.Background(), gwmux, gRpcEndPoint,opt)
if err != nil {
log.Fatal(err)
}
httpServer := &http.Server{
Addr: ":8085",
Handler: gwmux,
}
httpServer.ListenAndServe()
}
postman测试跑一波
服务端
子订单模型
// 主订单模型
message OrderMain {
int32 order_id = 1;// 订单ID
string order_no = 2;// 订单号
int32 user_id = 3;// 购买者Id
float order_money = 4;// 商品金额
// 这边还需要一个订单时间
google.protobuf.Timestamp order_time = 5;
// 嵌套
repeated OrderDetail order_detail = 6;
}
// 子订单模型
message OrderDetail {
int32 detail_id = 1;
string order_no = 2;
int32 prod_id = 3;
float prod_price = 4;
int32 prod_num = 5;
}
使用第三方库进行验证
https://github.com/envoyproxy/protoc-gen-validate
这个有点鸡肋,但是也挺方便还挺好用
流模式
为什么要使用流模式?
上面的例子传输的数据比较小
基本模式是客户端请求–服务端响应
这种模式有两个问题:
- 如果传输数据比较大的时候会导致压力陡增
- 服务端需要等待客户端的包全部发送才能处理以及响应
这样很慢,grpc只吃了流模式
举个例子
假设要从库里去取一批(x万到几十万),批量查询用户的积分
用户模型如下:
message UserInfo{
int32 user_id = 1;
int32 user_score = 2;
}
我们心里期望的模式是,每次先处理1w个,然后一万一万的处理,直到结束
服务端流
服务端流就是服务端分批发送
User.proto代码改造
rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);
服务端UserService.go代码改造
//服务端流模式
func (this *UserService) GetUserScoreByServerStream(in *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error {
var score int32 = 101
users := make([]*UserInfo, 0)
for index, user := range in.Users {
user.UserScore = score
score ++
users = append(users, user)
// 每隔两条发送一次
if (index + 1) % 2 ==0 && index > 0{
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
users = (users)[0:0]
}
// 发送完之后就晴空
time.Sleep(1 * time.Second)
}
if len(users) > 0 {
err := stream.Send(&UserScoreResponse{ Users:users})
if err != nil {
return err
}
}
return nil
}
客户端扫描读取:
stream, err := userClient.GetUserScoreByServerStream(context.Background(), &req)
if err != nil {
log.Fatal(err)
}
for {
res,err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Println(res.Users)
}
客户端流
客户端分批接收
场景
客户端批量查询用户积分
- 客户端一次性把用户列表发送过去(客户端获取列表比较慢)
- 服务端查询比较快
这个时候可以使用客户端流模式,服务端就一直扫描消息,然后扫描到了就发送给客户端
改装User.proto
// 客户端流
rpc GetUserByClientStream (stream UserScoreRequest) returns (UserScoreResponse) {};
UserService.go
// 客户端流
func (this *UserService) GetUserByClientStream(stream UserService_GetUserByClientStreamServer) error {
var score int32 = 101
users := make([]*UserInfo, 0)
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&UserScoreResponse{ Users:users}) //关闭流
}
if err != nil {
return err
}
for _, user := range req.Users {
user.UserScore = score // 业务处理
score++
users = append(users, user)
}
}
return nil
}
改造客户端代码
// 客户端流发送
stream, err := userClient.GetUserByClientStream(context.Background())
if err != nil {
log.Fatal(err)
}
for j := 1; j < 4; j ++{
var req = services.UserScoreRequest{}
req.Users = make([]*services.UserInfo,0)
for i = 1; i < 6; i++ { // 加了六条消息,假设是一个耗时的过程
req.Users = append(req.Users, &services.UserInfo{UserId: i})
}
err := stream.Send(&req)
if err != nil {
log.Println(err)
}
}
// 在结束之后发送关闭信号,并且接收消息
res,_ := stream.CloseAndRecv()
fmt.Println(res.Users)
}
双向流
场景:
客户端分批查询用户积分
- 客户端分批把用户列表发送过去(客户端获取列表比较慢)
- 服务端查询积分也很慢,所以分批发送过去
此时两边都很慢,那么就使用双向流模式
改造代码user.proto
// 双向流
rpc GetUserScoreByTWS (stream UserScoreRequest) returns (stream UserScoreResponse) {};
UserService.go
//双向流
func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error{
var score int32 = 101
users := make([]*UserInfo, 0)
for {
//接收
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
for _, user := range req.Users {
user.UserScore = score // 业务处理
score++
users = append(users, user)
}
//发送
err = stream.Send(&UserScoreResponse{Users: users})
if err != nil {
log.Fatal(err)
}
users = (users)[0:0]
}
return nil
}
客户端
// 客户端流发送
stream, err := userClient.GetUserScoreByTWS(context.Background())
//stream, err := userClient.GetUserByClientStream(context.Background())
if err != nil {
log.Fatal(err)
}
for j := 1; j < 4; j ++{
var req = services.UserScoreRequest{}
req.Users = make([]*services.UserInfo,0)
for i = 1; i < 6; i++ { // 加了六条消息,假设是一个耗时的过程
req.Users = append(req.Users, &services.UserInfo{UserId: i})
}
err := stream.Send(&req)
if err != nil {
log.Println(err)
}
// 在结束之后发送关闭信号,并且接收消息
res,err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Println(res.Users)
}
可以看到,每次读去了5个