gRPC实践练习
gRPC实践练习
参考:
- buf
- protobuf EN
- protobuf CN
- grpc-go EN
- grpc-go CN
- grpcurl
- protovalidate
- CEL by Example
- go-grpc-middleware
- OpenTelemetry
- grpc-gateway
grpc相关的内容都十分繁琐,虽然看还是能看懂但自己写就抓瞎了。所以做一个项目边练边学尝试融会贯通所学内容。
项目地址:Jinvic/grpc-practice
项目结构
项目结构没有什么最佳实践之类,按自己喜欢的来就行。
protobuf有一套自己的包机制,和go混在一起容易混淆。搞清楚package和option go_package等配置就放哪都行。
./
├── Readme.md
├── api/
│ ├── book/
│ │ └── v1/
│ │ └── book.proto
│ └── common/
│ └── v1/
│ └── types.proto
├── buf.gen.yaml
├── buf.lock
├── buf.yaml
├── cmd/
│ ├── client/
│ │ └── book/
│ └── server/
│ └── book/
│ └── main.go
├── config/
│ └── config.yml
├── go.mod
├── go.sum
├── internal/
│ ├── client/
│ │ └── book/
│ │ └── client.go
│ ├── pkg/
│ │ ├── config/
│ │ │ ├── config.go
│ │ │ └── viper.go
│ │ ├── database/
│ │ │ └── gorm.go
│ │ ├── logger/
│ │ │ ├── context.go
│ │ │ └── logger.go
│ │ └── otel/
│ │ └── otel.go
│ └── server/
│ ├── book/
│ │ ├── model/
│ │ │ ├── biz/
│ │ │ │ └── book.go
│ │ │ └── db/
│ │ │ └── book.go
│ │ ├── repo/
│ │ │ ├── book.go
│ │ │ └── converter.go
│ │ ├── server/
│ │ │ ├── di.go
│ │ │ └── server.go
│ │ ├── service/
│ │ │ ├── book.go
│ │ │ └── converter.go
│ │ └── usecase/
│ │ └── book.go
│ └── common/
│ └── interceptor/
│ ├── initialize.go
│ ├── logger.go
│ └── validator.go
└── util/
└── file/
└── file.gobuf
Buf CLI是一个protobuf构建工具,可以帮助简化配置。安装可查看教程,在此不再赘述。
buf.yaml
运行buf config init将初始化一个buf.yaml文件,内容如下:
# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml
version: v2
lint:
use:
- STANDARD
breaking:
use:
- FILE此时需要手动添加modules来指定需要处理的proto文件,例如:
version: v2
modules:
- path: api另两个选项分别为lint和breaking分别用于设置静态代码分析策略和破坏性更改检测规则,保持默认就行。详细配置可以参考文档。可以分别使用buf lint命令和buf breaking命令来进行代码检查和破坏性变更检测。
配置完成后可以使用buf build检查是否有问题。
buf.gen.yaml
如果不使用buf cli,可以直接在proto文件中配置option go_package,然后运行如下命令来生成stub文件:
protoc \
--proto_path=./api \
--go_out=./api \
--go_opt=paths=source_relative \
--go-grpc_out=./api \
--go-grpc_opt=paths=source_relative \
api/book/v1/book.proto \
api/common/v1/types.proto这套方法肉眼可见的复杂。而buf.gen.yaml就是将配置项写到文件中从而简化配置。
buf.gen.yaml没有默认内容,我直接给出一个示例进行讲解。更多配置可以查看文档。
version: v2
managed:
enabled: true
override:
- file_option: go_package
path: book/v1
value: bookstore/api/book/v1;bookv1
- file_option: go_package
path: common/v1
value: bookstore/api/common/v1;commonv1
plugins:
- remote: buf.build/protocolbuffers/go
out: api
opt:
- paths=source_relative
- remote: buf.build/grpc/go
out: api
opt:
- paths=source_relative
inputs:
- directory: apimanaged:托管模式。可以将文件和字段option统一定义在buf.gen.yaml中而不是在每个proto文件中都写一遍。例如我的配置就等价于option go_package = "bookstore/api/book/v1;bookv1";。更多配置项参见文档。 如果需要覆写多个文件的option使用managed mode会很方便,但为少数文件专门使用反而增加了复杂度。此外,虽然启用managed mode后原proto文件中可以不写相关option,但还是建议也写上保持对原生protoc的兼容等。plugins:使用插件。这里的remote指的是托管在BSR(Buf Schema Registry)上的远程插件。也可以使用protoc的内置插件和本地插件。例如使用本地插件的示例如下:- local: protoc-gen-go out: api/bookstore/v1 opt: - paths=source_relative - local: protoc-gen-go-grpc out: api/bookstore/v1 opt: - paths=source_relative如上配置等价于
protoc --go_out=api/bookstore/v1 --go_opt=paths=source_relative --go-grpc_out=api/bookstore/v1 --go-grpc_opt=paths=source_relative,可以看出就相当于把冗长的命令参数写到配置文件中来简化配置。inputs:输入列表,这个很好理解。这个配置是可选的,默认行为就是查找所有proto文件。具体可用参数参见文档。
配置完成后可以使用buf generate生成go的stub代码。
protobuf
protobuf的详细语法可以查阅文档和protobuf笔记,在次不再赘述。
这里只是结合目录结构和部分代码进行简单讲解。
./api/
├── book/
│ └── v1/
│ └── book.proto
└── common/
└── v1/
└── types.proto// api/book/v1/book.proto
syntax = "proto3";
package book.v1;
import "common/v1/types.proto";
import "google/protobuf/field_mask.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
option go_package = "bookstore/api/book/v1;bookv1";
service BookService {
rpc GetBook(GetBookRequest) returns (GetBookResponse);
rpc CreateBook(CreateBookRequest) returns (CreateBookResponse);
rpc ListBooks(ListBooksRequest) returns (ListBooksResponse);
rpc UpdateBook(UpdateBookRequest) returns (UpdateBookResponse);
rpc DeleteBook(DeleteBookRequest) returns (DeleteBookResponse);
}
message GetBookRequest {
int64 id = 1;
}
message GetBookResponse {
common.v1.Book book = 1;
}
message CreateBookRequest {
common.v1.Book book = 1;
}
message CreateBookResponse {
common.v1.Book book = 1;
}
message ListBooksRequest {
int32 page_number = 1;
int32 page_size = 2;
repeated common.v1.OrderBy order_by = 3;
map<string, google.protobuf.Value> filter = 4;
}
message ListBooksResponse {
repeated common.v1.Book books = 1;
int32 total_count = 2;
int32 page_number = 3;
int32 page_size = 4;
}
message UpdateBookRequest {
common.v1.Book book = 1;
google.protobuf.FieldMask update_mask = 2;
}
message UpdateBookResponse {
common.v1.Book book = 1;
}
message DeleteBookRequest {
int64 id = 1;
}
message DeleteBookResponse {
int64 id = 1;
google.protobuf.Timestamp deleted_at = 2;
}// api/common/v1/types.proto
syntax = "proto3";
package common.v1;
import "google/protobuf/timestamp.proto";
option go_package = "bookstore/api/common/v1;commonv1";
enum BookStatus {
BOOK_STATUS_UNSPECIFIED = 0;
BOOK_STATUS_AVAILABLE = 1; // 可用
BOOK_STATUS_UNAVAILABLE = 2; // 不可用
BOOK_STATUS_BORROWED = 3; // 借出
BOOK_STATUS_LOST = 4; // 丢失
BOOK_STATUS_RESERVED = 6; // 预约
}
message Book {
int64 id = 1;
BookStatus status = 2;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Timestamp updated_at = 4;
google.protobuf.Timestamp deleted_at = 5;
string title = 6;
string author = 7;
double price = 8;
string isbn = 9;
string publisher = 10;
google.protobuf.Timestamp published_at = 11;
}
message OrderBy {
string field = 1;
bool ascending = 2;
}package是proto之间互相引用的包名。可以注意到book.proto从types.proto引用Book时就是common.v1.Book。
import用于引入其他proto文件,可以是本地文件也可以是从其他地方引入。例如book.proto的import "common/v1/types.proto";就是本地proto,import "google/protobuf/struct.proto";就是google/protobuf里的proto。需要注意的是,即使是同一目录下的不同文件在引用时也需要显式import,这点和Go的默认行为并不一致。
这里我们在引入google/protobuf的相关文件时不需要显式声明依赖,是因为google/protobuf是内置在protobuf中的,相当与标准库。而在引入其他第三方库时,往往需要手动安装(protoc)或显式声明(buf cli)。
option之前有讲过,是语言特定的一些定义选项。
service和message分别定义rpc服务和消息结构体,repeated和enum定义数组和枚举,语法都比较简单。
gRPC服务端
关于proto部分,在之前已经介绍地差不多了。现在我们来看看如何在go中实现gorc服务端并用上之前的proto stub。
如下是我规划的项目结构,按照server → service → usecase → repo进行分层:
./
├── api/
├── cmd/
│ ├── client/
│ └── server/
│ └── book/ # 入口
└── internal/
├── client/
└── server/
└── book/
├── model/
│ ├── biz/ # 业务层模型
│ └── db/ # 数据库模型
├── repo/ # 数据仓库层
├── server/ # gRPC server
├── service/ # gRPC 服务实现
└── usecase/ # 用例层首先我们要实现在proto中定义的服务,如下:
// internal/server/book/service/book.go
type BookService struct {
bookv1.UnimplementedBookServiceServer
bu *usecase.BookUsecase
}
func NewBookService(bu *usecase.BookUsecase) *BookService {
return &BookService{bu: bu}
}
func (s *BookService) GetBook(ctx context.Context, req *bookv1.GetBookRequest) (*bookv1.GetBookResponse, error) {
bizBook, err := s.bu.GetBook(ctx, req.Id)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get book: %v", err)
}
return &bookv1.GetBookResponse{Book: BizToV1Book(bizBook)}, nil
}
func (s *BookService) CreateBook(ctx context.Context, req *bookv1.CreateBookRequest) (*bookv1.CreateBookResponse, error) {
// ...
}
func (s *BookService) ListBooks(ctx context.Context, req *bookv1.ListBooksRequest) (*bookv1.ListBooksResponse, error) {
// ...
}
func (s *BookService) UpdateBook(ctx context.Context, req *bookv1.UpdateBookRequest) (*bookv1.UpdateBookResponse, error) {
// ...
}
func (s *BookService) DeleteBook(ctx context.Context, req *bookv1.DeleteBookRequest) (*bookv1.DeleteBookResponse, error) {
// ...
}我们需要自行定义一个结构体,嵌入protobuf生成的stub代码中的UnimplementedXXXServer结构体,并实现相关方法。这就相当于handler或controller层了。
然后我们启动gprc服务。此处的依赖注入实现较简略仅作示例。生产环境可以换成wire或dig。
// internal/server/book/server/server.go
type BookServer struct {
port int
}
func NewBookServer(port int) *BookServer {
return &BookServer{port: port}
}
func (s *BookServer) Run(ctx context.Context) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
defer lis.Close()
grpcServer := grpc.NewServer()
bookv1.RegisterBookServiceServer(grpcServer, BuildBookService())
go func() {
<-ctx.Done()
log.Println("shutting down server...")
grpcServer.GracefulStop()
}()
log.Printf("server listening at port %d", s.port)
if err := grpcServer.Serve(lis); err != nil {
if err == grpc.ErrServerStopped {
log.Println("server stopped")
return nil
}
return fmt.Errorf("failed to serve: %w", err)
}
return nil
}
func BuildBookService() *service.BookService {
br := repo.NewBookRepository()
bu := usecase.NewBookUsecase(br)
return service.NewBookService(bu)
}总结流程,首先使用net.Listen()监听一个端口,然后用grpc.NewServer()声明一个grpc.Server类型的grpc服务,使用stub代码中的RegisterBookServiceServer()方法将你自定义的结构体注册到这个grpc服务中,最后用grpc.Server.Serve()在监听器上提供服务。
继续往项目里填充crud代码,运行go run cmd/server/book/main.go就可以启动这个服务了。
grpcurl
要测试启动的grpc服务是否可用,可以实现一个客户端,也可以使用grpcurl这个工具。就简单测试而言直接使用grpcurl就行。
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest启用反射服务
由于grpc使用protobuf的二进制编码,而不是json那种自描述的文本编码,所以我们需要提供proto文件或者反射服务,让客户端了解服务列表、方法列表、消息定义等。通常完整客户端都是和服务端共用相同的proto文件,而我们为了调试方便使用grpcurl时就启用反射服务。
可以使用google.golang.org/grpc/reflection包将grpc.Server注册到反射服务中:
import "google.golang.org/grpc/reflection"
func (s *BookServer) Run(ctx context.Context) error {
// ...
grpcServer := grpc.NewServer()
reflection.Register(grpcServer) // new
bookv1.RegisterBookServiceServer(grpcServer, BuildBookService())
// ...
}反射服务在收到查询请求时,会实时查询grpc.Server中已注册的所有服务,而不是快照注册时的服务列表,所以注册反射服务和注册业务服务的顺序没有影响。
grpcurl基础用法
现在我们可以通过grpcurl调用grpc服务了,用法为:grpcurl [flags] [address] [list|describe] [symbol]。
-plaintext
Use plain-text HTTP/2 when connecting to server (no TLS).由于我们实现的服务端没有启用证书,所以我们grpcurl测试时的所有命令都要添加-plaintext选项来进行明文通讯。生产环境应该启用tls加密。
可以使用list查看服务和方法列表:
$ grpcurl -plaintext localhost:8081 list
book.v1.BookService
grpc.reflection.v1.ServerReflection
grpc.reflection.v1alpha.ServerReflection
$ grpcurl -plaintext localhost:8081 list book.v1.BookService
book.v1.BookService.CreateBook
book.v1.BookService.DeleteBook
book.v1.BookService.GetBook
book.v1.BookService.ListBooks
book.v1.BookService.UpdateBook可以使用describe查看服务和方法列表:
$ grpcurl -plaintext localhost:8081 describe book.v1.BookService
book.v1.BookService is a service:
service BookService {
rpc CreateBook ( .book.v1.CreateBookRequest ) returns ( .book.v1.CreateBookResponse );
rpc DeleteBook ( .book.v1.DeleteBookRequest ) returns ( .book.v1.DeleteBookResponse );
rpc GetBook ( .book.v1.GetBookRequest ) returns ( .book.v1.GetBookResponse );
rpc ListBooks ( .book.v1.ListBooksRequest ) returns ( .book.v1.ListBooksResponse );
rpc UpdateBook ( .book.v1.UpdateBookRequest ) returns ( .book.v1.UpdateBookResponse );
}
$ grpcurl -plaintext localhost:8081 describe book.v1.BookService.GetBook
book.v1.BookService.GetBook is a method:
rpc GetBook ( .book.v1.GetBookRequest ) returns ( .book.v1.GetBookResponse );
$ grpcurl -plaintext localhost:8081 describe .book.v1.GetBookResponse
book.v1.GetBookResponse is a message:
message GetBookResponse {
.common.v1.Book book = 1;
}
$ grpcurl -plaintext localhost:8081 describe .common.v1.Book
common.v1.Book is a message:
message Book {
int64 id = 1;
.common.v1.BookStatus status = 2;
.google.protobuf.Timestamp created_at = 3;
.google.protobuf.Timestamp updated_at = 4;
.google.protobuf.Timestamp deleted_at = 5;
string title = 6;
string author = 7;
double price = 8;
string isbn = 9;
string publisher = 10;
.google.protobuf.Timestamp published_at = 11;
}最后是调用方法,可以通过-d标志传入一个json字符串作为输入参数。如果传入@字符则是从标准输入读取json作为参数,一般用于复杂json输入,也可用于测试流方法。
$ grpcurl -plaintext -d '{"id":1}' localhost:8081 book.v1.BookService/GetBook
{
"book": {
"id": "1",
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"price": 10.99,
"isbn": "978-0-7432-1967-1",
"publisher": "Scribner",
"publishedAt": "2026-04-16T01:47:46.470161300Z"
}
}
# linux
$ grpcurl -plaintext -d @ localhost:8081 book.v1.BookService/GetBook <<EOM
{"id": 1}
EOM
{
"book": {
"id": "1",
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"price": 10.99,
"isbn": "978-0-7432-1967-1",
"publisher": "Scribner",
"publishedAt": "2026-04-16T01:47:11.511262700Z"
}
}# windows(Powershell)
> @"
{"id": 1}
"@ | grpcurl -plaintext -d '@' localhost:8081 book.v1.BookService/GetBook
{
"book": {
"id": "1",
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"price": 10.99,
"isbn": "978-0-7432-1967-1",
"publisher": "Scribner",
"publishedAt": "2026-04-16T02:02:47.932611Z"
}
}grpc客户端
除了使用grpcurl直接测试grpc服务,还需要实现grpc客户端供微服务之间互相调用。如下是一个实现示例:
// internal/client/book/client.go
type Client struct {
bookv1.BookServiceClient
conn *grpc.ClientConn
}
func NewClient(host string, port int) (*Client, error) {
serverAddr := fmt.Sprintf("%s:%d", host, port)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create client: %w", err)
}
return &Client{
BookServiceClient: bookv1.NewBookServiceClient(conn),
conn: conn,
}, nil
}
func (c *Client) Close() error {
return c.conn.Close()
}
func (c *Client) GetBook(ctx context.Context, id int64) (*commonv1.Book, error) {
resp, err := c.BookServiceClient.GetBook(ctx, &bookv1.GetBookRequest{Id: id})
if err != nil {
return nil, fmt.Errorf("failed to get book: %w", err)
}
return resp.Book, nil
}
// ...逻辑还是比较清晰的。首先确认服务地址serverAddr,然后处理连接选项opts。这里我们使用的grpc.WithTransportCredentials(insecure.NewCredentials()),因为服务端没有启用tls加密。通过grpc.NewClient()获取到一个grpc连接conn,再通过bookv1.NewBookServiceClient()创建book服务的客户端。需要注意的是,要将conn变量保存下来用于关闭连接。最后我们为客户端实现各个方法就行。
流式rpc
除了常规的请求-响应,grpc还支持各种流式rpc方法,包括客户端流,服务端流和双向流等。让我们为各种流式方法设计业务场景并尝试实现:
- 客户端流:批量导入
- 服务端流:批量导出
- 双向流:心跳检测
protobuf定义流
在protobuf中,要启用流特性,需要在声明方法时添加stream关键字进行标识:
service BookService {
rpc GetBook(GetBookRequest) returns (GetBookResponse);
rpc CreateBook(CreateBookRequest) returns (CreateBookResponse);
rpc ListBooks(ListBooksRequest) returns (ListBooksResponse);
rpc UpdateBook(UpdateBookRequest) returns (UpdateBookResponse);
rpc DeleteBook(DeleteBookRequest) returns (DeleteBookResponse);
// new
rpc BatchImportBooks(stream BatchImportBooksRequest) returns (BatchImportBooksResponse);
rpc BatchExportBooks(BatchExportBooksRequest) returns (stream BatchExportBooksResponse);
rpc HeartBeat(stream HeartBeatRequest) returns (stream HeartBeatResponse);
}
message BatchImportBooksRequest {
common.v1.Book book = 1;
}
message BatchImportBooksResponse {
int32 total = 1;
int32 success = 2;
repeated string error_messages = 3;
}
message BatchExportBooksRequest {
repeated int64 book_ids = 1;
}
message BatchExportBooksResponse {
int64 book_id = 1;
common.v1.Book book = 2;
string error_message = 3;
}
message HeartBeatRequest {
// 1: ping, 2: pong
common.v1.HeartBeatType type = 1;
int64 sent_at = 2;
}
message HeartBeatResponse {
// 1: ping, 2: pong
common.v1.HeartBeatType type = 1;
int64 sent_at = 2;
int64 received_at = 3;
}
enum HeartBeatType {
HEART_BEAT_TYPE_UNSPECIFIED = 0;
HEART_BEAT_TYPE_PING = 1; // 心跳请求
HEART_BEAT_TYPE_PONG = 2; // 心跳响应
}流式rpc服务端实现
然后是三个方法的服务端go实现,方法签名如下:
func (s *BookService) BatchImportBooks(stream bookv1.BookService_BatchImportBooksServer) error
func (s *BookService) BatchExportBooks(req *bookv1.BatchExportBooksRequest, stream bookv1.BookService_BatchExportBooksServer) error
func (s *BookService) HeartBeat(stream bookv1.BookService_HeartBeatServer) error可以看到,它使用服务名_方法名Server取代了原来的request和response。查看定义可以发现请求和响应被封装到了对应的流接口中:
type BookService_BatchImportBooksServer = grpc.ClientStreamingServer[BatchImportBooksRequest, BatchImportBooksResponse]
type BookService_BatchExportBooksServer = grpc.ServerStreamingServer[BatchExportBooksResponse]
type BookService_HeartBeatServer = grpc.BidiStreamingServer[HeartBeatRequest, HeartBeatResponse]查看接口定义,可以看到它们各自定义了Send()和Recv()等相关方法用于发送和接收消息。
type ServerStreamingServer[Res any] interface {
Send(*Res) error
ServerStream
}
type ClientStreamingServer[Req any, Res any] interface {
Recv() (*Req, error)
SendAndClose(*Res) error
ServerStream
}
type BidiStreamingServer[Req any, Res any] interface {
Recv() (*Req, error)
Send(*Res) error
ServerStream
}需要注意的是,这些方法都不是并发安全的。因为对于绝大多数简单的、顺序处理的流,一个内置的sync.Mutex会带来不必要的性能损耗。同时gRPC的设计者也希望开发者明确地规划发送和接收的角色权限。所以如果在多个goroutine中都有调用同一个方法,需要自行设计同步机制。例如使用一个专用的goroutine来执行所有的Send()或Recv()操作,或者使用sync.Mutex进行保护。
按照由易到难的顺序,我们依次实现服务端流,客户端流和双向流。
首先是服务端流,只有一次输入,分多次输出结果:
func (s *BookService) BatchExportBooks(req *bookv1.BatchExportBooksRequest, stream bookv1.BookService_BatchExportBooksServer) error {
for _, bookId := range req.BookIds {
book, err := s.bu.GetBook(stream.Context(), bookId)
if err != nil {
if sendErr := stream.Send(&bookv1.BatchExportBooksResponse{
BookId: bookId,
ErrorMessage: fmt.Sprintf("failed to get book %s: %v", book.Title, err),
}); sendErr != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", sendErr)
}
continue
}
if err := stream.Send(&bookv1.BatchExportBooksResponse{Book: BizToV1Book(book)}); err != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", err)
}
}
return nil
}然后是客户端流,多次输入直到流关闭,最后一次性输出结果。
func (s *BookService) BatchImportBooks(stream bookv1.BookService_BatchImportBooksServer) error {
var total int32
var success int32
var errorMessages []string
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return status.Errorf(codes.Internal, "failed to receive request: %v", err)
}
bizBook := V1ToBizBook(req.Book)
_, err = s.bu.CreateBook(stream.Context(), bizBook)
if err != nil {
errorMessages = append(errorMessages, fmt.Sprintf("failed to create book %s: %v", bizBook.Title, err))
} else {
success++
}
total++
}
return stream.SendAndClose(&bookv1.BatchImportBooksResponse{
Total: total,
Success: success,
ErrorMessages: errorMessages,
})
}最后是双向流,双方都随时可以进行输入输出:
func (s *BookService) HeartBeat(stream bookv1.BookService_HeartBeatServer) error {
sendCh := make(chan *bookv1.HeartBeatResponse, 10)
done := make(chan struct{})
defer close(done)
// 发送消息到客户端
sender := func() {
for {
select {
case msg, ok := <-sendCh:
if !ok {
return
}
if err := stream.Send(msg); err != nil {
log.Printf("failed to send message: %v", err)
return
}
log.Println("sent message:", msg.Type)
case <-done:
log.Println("sender goroutine stopped")
return
case <-stream.Context().Done():
log.Println("client disconnected, stopping sender goroutine")
return
}
}
}
// 定时发送心跳消息
ticker := func() {
jitter := time.Duration(rand.Intn(10)) * time.Second
ticker := time.NewTicker(1*time.Minute + jitter)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sendCh <- &bookv1.HeartBeatResponse{
Type: commonv1.HeartBeatType_HEART_BEAT_TYPE_PING,
SentAt: time.Now().UnixMilli(),
}
case <-done:
log.Println("ticker goroutine stopped")
return
case <-stream.Context().Done():
log.Println("client disconnected, stopping ticker goroutine")
return
}
}
}
// 接收客户端的心跳消息
receiver := func() error {
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return status.Errorf(codes.Internal, "failed to receive request: %v", err)
}
switch req.Type {
case commonv1.HeartBeatType_HEART_BEAT_TYPE_PING:
sendCh <- &bookv1.HeartBeatResponse{
Type: commonv1.HeartBeatType_HEART_BEAT_TYPE_PONG,
SentAt: req.SentAt,
}
case commonv1.HeartBeatType_HEART_BEAT_TYPE_PONG:
receivedAt := time.Now().UnixMilli()
log.Println("received pong, latency:", receivedAt-req.SentAt)
default:
log.Println("received unknown heart beat type")
return status.Errorf(codes.InvalidArgument, "unknown heart beat type: %v", req.Type)
}
}
return nil
}
go ticker()
go sender()
return receiver()
}流式rpc客户端实现
然后我们尝试实现客户端。流式rpc的服务端和客户端实现差不多,都是操作一个流对象通过Send()和Recv()等相关方法收发信息。
客户端的各种流接口定义如下:
type ServerStreamingClient[Res any] interface {
Recv() (*Res, error)
ClientStream
}
type ClientStreamingClient[Req any, Res any] interface {
Send(*Req) error
CloseAndRecv() (*Res, error)
ClientStream
}
type BidiStreamingClient[Req any, Res any] interface {
Send(*Req) error
Recv() (*Res, error)
ClientStream
}和服务端流接口一样,这些方法也不是并发安全的,需要自行设计同步机制。
具体实现如下:
服务端流:
func (c *Client) BatchExportBooks(ctx context.Context, bookIds []int64) ([]*commonv1.Book, []string, error) {
req := &bookv1.BatchExportBooksRequest{BookIds: bookIds}
stream, err := c.BookServiceClient.BatchExportBooks(ctx, req)
if err != nil {
return nil, nil, fmt.Errorf("failed to export books: %w", err)
}
books := make([]*commonv1.Book, 0)
errorMessages := make([]string, 0)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, fmt.Errorf("failed to receive response: %w", err)
}
if resp.ErrorMessage != "" {
errorMessages = append(errorMessages, fmt.Sprintf("failed to export book with ID %d: %s", resp.BookId, resp.ErrorMessage))
continue
}
books = append(books, resp.Book)
}
return books, errorMessages, nil
}客户端流:
func (c *Client) BatchImportBooks(ctx context.Context, books []*commonv1.Book) (*bookv1.BatchImportBooksResponse, error) {
stream, err := c.BookServiceClient.BatchImportBooks(ctx)
if err != nil {
return nil, fmt.Errorf("failed to import books: %w", err)
}
for _, book := range books {
err := stream.Send(&bookv1.BatchImportBooksRequest{Book: book})
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("failed to close stream: %w", err)
}
return resp, nil
}双向流:
func (c *Client) HeartBeat(ctx context.Context) error {
stream, err := c.BookServiceClient.HeartBeat(ctx)
if err != nil {
return fmt.Errorf("failed to create heart beat stream: %w", err)
}
sendCh := make(chan *bookv1.HeartBeatRequest, 10)
done := make(chan struct{})
defer close(done)
// 发送消息到服务端
sender := func() {
for {
select {
case msg, ok := <-sendCh:
if !ok {
return
}
if err := stream.Send(msg); err != nil {
log.Printf("failed to send message: %v", err)
return
}
log.Println("sent message:", msg.Type)
case <-done:
log.Println("sender goroutine stopped")
return
case <-stream.Context().Done():
log.Println("client disconnected, stopping sender goroutine")
return
}
}
}
// 定时发送心跳消息
ticker := func() {
jitter := time.Duration(rand.Intn(10)) * time.Second
ticker := time.NewTicker(1*time.Minute + jitter)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sendCh <- &bookv1.HeartBeatRequest{
Type: commonv1.HeartBeatType_HEART_BEAT_TYPE_PING,
SentAt: time.Now().UnixMilli(),
}
case <-done:
log.Println("ticker goroutine stopped")
return
case <-stream.Context().Done():
log.Println("client disconnected, stopping ticker goroutine")
return
}
}
}
// 接收服务端的心跳消息
receiver := func() error {
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return status.Errorf(codes.Internal, "failed to receive request: %v", err)
}
switch req.Type {
case commonv1.HeartBeatType_HEART_BEAT_TYPE_PING:
sendCh <- &bookv1.HeartBeatRequest{
Type: commonv1.HeartBeatType_HEART_BEAT_TYPE_PONG,
SentAt: req.SentAt,
}
case commonv1.HeartBeatType_HEART_BEAT_TYPE_PONG:
receivedAt := time.Now().UnixMilli()
log.Println("received pong, latency:", receivedAt-req.SentAt)
default:
log.Println("received unknown heart beat type")
return status.Errorf(codes.InvalidArgument, "unknown heart beat type: %v", req.Type)
}
}
return nil
}
go ticker()
go sender()
return receiver()
}参数校验
protobuf原生不提供参数校验功能,需要引入第三方的插件(protoc-gen-validate)或者库(protovalidate)。这里我们选择protovalidate。
protovalidate的详细语法在此不作赘述,可以查阅文档进行了解。这里只简单介绍如何导入依赖和简单使用。
首先要在buf.yaml中添加依赖,并执行buf dep update更新依赖:
version: v2
modules:
- path: api
deps:
- buf.build/bufbuild/protovalidate # new
lint:
use:
- STANDARD
breaking:
use:
- FILE在proto文件中通过import buf/validate/validate.proto引入依赖,就可以添加验证规则了。添加验证后的book.proto部分代码如下:
message GetBookRequest {
int64 id = 1 [(buf.validate.field).int64.gte = 1];
}
message CreateBookRequest {
common.v1.Book book = 1 [(buf.validate.field).cel = {
id: "create_book.required_fields"
message: "title and author are required"
expression:
"this.title != ''"
"&& this.author != ''"
}];
}
message ListBooksRequest {
optional int32 page_number = 1 [(buf.validate.field).int32.gte = 1];
optional int32 page_size = 2 [(buf.validate.field).int32.gte = 1];
repeated common.v1.OrderBy order_by = 3;
map<string, google.protobuf.Value> filter = 4;
}
message UpdateBookRequest {
common.v1.Book book = 1 [(buf.validate.field).cel = {
id: "update_book.required_fields"
message: "id is required"
expression: "this.id > 0"
}];
google.protobuf.FieldMask update_mask = 2;
}可以看到,既可以直接为字段设置简单的验证规则,也可以使用cel表达式设置更为复杂的验证规则。
要在go中执行参数校验,需要安装buf.build/go/protovalidate依赖。
go get buf.build/go/protovalidate@latest可以直接使用protovalidate.Validate()方法验证proto转换的结构体:
// internal/server/book/service/book.go
func (s *BookService) GetBook(ctx context.Context, req *bookv1.GetBookRequest) (*bookv1.GetBookResponse, error) {
protovalidate.Validate(req) // new
bizBook, err := s.bu.GetBook(ctx, req.Id)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get book: %v", err)
}
return &bookv1.GetBookResponse{Book: BizToV1Book(bizBook)}, nil
}而对gRPC项目,则可以将protovalidate注册为拦截器:
// internal/server/book/server/server.go
func (s *BookServer) Run(ctx context.Context) error {
// ...
validator, err := protovalidate.New()
if err != nil {
return fmt.Errorf("failed to create validator: %w", err)
}
interceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if err := validator.Validate(req.(proto.Message)); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
}
return handler(ctx, req)
}
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
// ...
}扩展:gRPC校验方案演进史
timeline title gRPC校验方案演进史 section 2017-2019 方案A:go-proto-validators : 早期方案 : 需生成代码,与 go-grpc-middleware 配合 section 2019-2023 方案B:protoc-gen-validate (PGV) : 成为事实标准 : 规则更丰富,生成代码成熟 section 2023-Now 方案C:protovalidate : Buf 官方推出 : 无代码生成,性能更强
- 方案A:
go-proto-validators(较老,生态渐微)- 规则定义:在
.proto中使用(validator.field)选项。 - 校验执行:通过
protoc插件 生成 Go代码,代码中包含Validate()方法。 - gRPC集成:需要手动调用
msg.Validate(),或使用go-grpc-middleware/validator拦截器自动调用。 - 当前状态:维护少,不推荐新项目。作者
mwitkow已停止主动维护,且对最新protobufAPI 支持不佳。
- 规则定义:在
- 方案B:
protoc-gen-validate(PGV) (主流,但正被取代)- 规则定义:在
.proto中使用(validate.rules)选项。 - 校验执行:通过
protoc插件 生成 Go代码,代码中包含ValidateAll()等方法。 - gRPC集成:同方案A,可手动调用或使用
go-grpc-middleware/validator拦截器。 - 当前状态:非常成熟,大量项目在用。但依赖代码生成,且已被更优的方案C挑战。
- 规则定义:在
- 方案C:
protovalidate(Buf出品,现代推荐)- 规则定义:在
.proto中使用(buf.validate.field)选项 (更丰富的规则集)。 - 校验执行:无需代码生成。在运行时动态读取
.proto中的规则并执行校验。提供protovalidate.Validate(msg)函数。 - gRPC集成:可手动调用,或使用
go-grpc-middleware/protovalidate包中的拦截器。 - 当前状态:未来方向。Buf 公司官方维护,性能好,不污染生成的代码。
- 规则定义:在
总结:
- 老方案:在proto里写规则 → 生成 Validate() 方法 → 用 go-grpc-middleware 的拦截器自动调用。
- 新方案:在proto里写规则 → 直接调用 protovalidate.Validate() 或它的拦截器。
拦截器
既然上一节提到了拦截器,这一节就展开讲讲。拦截器其实就相当于其他地方的“中间件”(middleware)概念,用于在rpc调用前后执行特定操作。
客户端和服务端拦截器不同,一元和流式rpc的拦截器也不同,两两组合遍有四种拦截器:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
type UnaryServerInterceptor func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (resp any, err error)
type StreamServerInterceptor func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error服务端一元rpc拦截器
我们先看看服务端一元rpc拦截器,使用示例同上节:
// internal/server/book/server/server.go
func (s *BookServer) Run(ctx context.Context) error {
// ...
validator, err := protovalidate.New()
if err != nil {
return fmt.Errorf("failed to create validator: %w", err)
}
interceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
msg, ok := req.(proto.Message)
if !ok {
return nil, status.Errorf(codes.Internal, "unsupported message type: %T", req)
}
if err := validator.Validate(msg); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
}
return handler(ctx, req)
}
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
// ...
}这里使用grpc.UnaryInterceptor()注册拦截器。需要注意的是该方法只能注册一个拦截器,所以如果有多个拦截器需要手动处理链式调用。不过grpc在v1.28.0中更新了ChainUnaryInterceptor()方法,可以直接链式注册多个拦截器,就不再需要手动处理了。两者的定义如下:
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption拦截器全堆在server里太臃肿了,让我们更新项目结构,将拦截器统一放在统一目录下管理:
.\internal\server\common\/
└── interceptor/
├── initialize.go
└── validator.go除了手动定义拦截器,go-grpc-middleware中也提供了很多现成的拦截器可用。例如我们之前的验证拦截器就可以使用protovalidate这个子包:
// internal/server/common/interceptor/initialize.go
package interceptor
func Initialize() error {
if err := InitValidateInterceptor(); err != nil {
return err
}
return nil
}
// internal/server/common/interceptor/validator.go
package interceptor
import (
"buf.build/go/protovalidate"
protovalidate_middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/protovalidate"
"google.golang.org/grpc"
)
var (
validator protovalidate.Validator
)
func InitValidateInterceptor() error {
var err error
validator, err = protovalidate.New()
if err != nil {
return err
}
return nil
}
func ValidateUnaryInterceptor(opts ...protovalidate_middleware.Option) grpc.UnaryServerInterceptor {
return protovalidate_middleware.UnaryServerInterceptor(validator, opts...)
}
func ValidateStreamInterceptor(opts ...protovalidate_middleware.Option) grpc.StreamServerInterceptor {
return protovalidate_middleware.StreamServerInterceptor(validator, opts...)
}
// internal/server/book/server/server.go
func (s *BookServer) Run(ctx context.Context) error {
// ...
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
interceptor.ValidateUnaryInterceptor(),
),
grpc.ChainStreamInterceptor(
interceptor.ValidateStreamInterceptor(),
),
)
// ...
}客户端一元rpc拦截器
再是客户端一元rpc拦截器,例如超时控制就可以实现如下:
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
timedCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return invoker(timedCtx, method, req, reply, cc, opts...)
}
}go-grpc-middleware也封装好了超时控制和连接重试等客户端拦截器,我们可以直接集成到之前的客户端实现中:
import(
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout"
)
func NewClient(host string, port int) (*Client, error) {
serverAddr := fmt.Sprintf("%s:%d", host, port)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
// new
grpc.WithChainUnaryInterceptor(
timeout.UnaryClientInterceptor(5*time.Second),
retry.UnaryClientInterceptor(
retry.WithMax(3),
retry.WithPerRetryTimeout(2*time.Second),
),
),
}
conn, err := grpc.NewClient(serverAddr, opts...)
// ...
}go-grpc-middleware中还有很多优秀的拦截器可用,感兴趣可以探索一下。
服务端流式rpc拦截器
type UnaryServerInterceptor func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (resp any, err error)之前在服务端一元rpc拦截器中,我们通过go-grpc-middleware包直接提供了流式验证拦截器。尝试手动实现一下:
var (
validator protovalidate.Validator
)
func InitValidateInterceptor() error {
var err error
validator, err = protovalidate.New()
if err != nil {
return err
}
return nil
}
func ValidateStreamInterceptor() grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, &wrappedServerStream{
ServerStream: ss,
validator: validator,
})
}
}
type wrappedServerStream struct {
grpc.ServerStream
validator protovalidate.Validator
}
func (w *wrappedServerStream) RecvMsg(m any) error {
if err := w.ServerStream.RecvMsg(m); err != nil {
return err
}
msg, ok := m.(proto.Message)
if !ok {
return status.Errorf(codes.Internal, "unsupported message type: %T", m)
}
return w.validator.Validate(msg)
}如上,我们包装了一个wrappedServerStream结构体实现grpc.ServerStream接口。同时重写其RecvMsg()方法,添加参数验证功能。
客户端流式rpc拦截器
客户端流式 RPC 的核心是发送多条消息,校验逻辑通常在消息构造时已完成,因此 SendMsg 拦截器的使用频率低于服务端的 RecvMsg 拦截器。如下是一个数据脱敏的拦截器实现示例。和服务端类似,包装客户端流接口grpc.ClientStream并重写发送方法SendMsg():
func SanitizeStreamInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return &sanitizedClientStream{
ClientStream: clientStream,
}, nil
}
}
type sanitizedClientStream struct {
grpc.ClientStream
}
func (s *sanitizedClientStream) SendMsg(m any) error {
msg, ok := m.(proto.Message)
if !ok {
return status.Errorf(codes.Internal, "unsupported message type: %T", m)
}
switch v := msg.(type) {
case *commonv1.User:
v.RealName = sanitizeRealName(v.RealName)
v.Phone = sanitizePhone(v.Phone)
v.Email = sanitizeEmail(v.Email)
}
return s.ClientStream.SendMsg(msg)
}
// sanitizeRealName 保留首字母
func sanitizeRealName(name string) string {
name = strings.TrimSpace(name)
if len(name) <= 1 {
return name
}
runes := []rune(name)
return string(runes[0]) + strings.Repeat("*", len(runes)-1)
}
// sanitizePhone 保留前3后4
func sanitizePhone(phone string) string {
if len(phone) >= 7 {
return phone[:3] + "****" + phone[len(phone)-4:]
}
return "***"
}
// sanitizeEmail 保留首字母和域名
func sanitizeEmail(email string) string {
parts := strings.Split(email, "@")
if len(parts) != 2 {
return email
}
local := parts[0]
if len(local) <= 1 {
return email
}
return local[:1] + "***@" + parts[1]
}我们注意到,和grpc.StreamServerInterceptor中直接操作ServerStream不同,grpc.StreamClientInterceptor需要先用Streamer创建ClientStream再操作该客户端流。这意味着我们可以在流被创建之前进行拦截,例如一个限流拦截器可以实现如下:
func RateLimitStreamInterceptor(limiter *rate.Limiter) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 在创建流之前检查是否超过限流
if !limiter.Allow() {
return nil, status.Errorf(codes.ResourceExhausted, "rate limit exceeded for method: %s", method)
}
// 限流通过,创建流
return streamer(ctx, desc, cc, method, opts...)
}
}日志
日志功能实现
尝试为服务端添加日志拦截器以监控每个grpc请求的情况。这里使用的是官方的log/slog日志库。也可以使用其他日志库,除了初始化部分在grpc内的用法都差不多。
关于slog的具体用法在此不进行赘述,如下是最终的logger实现:
// internal/pkg/logger/logger.go
package logger
import (
"bookstore/internal/pkg/config"
"io"
"log/slog"
"os"
"github.com/natefinch/lumberjack"
)
var defaultLogger *slog.Logger
func InitLogger(cfg *config.Logging, logFile string) {
var level slog.Level
switch cfg.Level {
case "debug":
level = slog.LevelDebug
case "info":
level = slog.LevelInfo
case "warn":
level = slog.LevelWarn
case "error":
level = slog.LevelError
default:
level = slog.LevelDebug
}
var writer io.Writer
switch cfg.Output {
case "stdout":
writer = os.Stdout
case "file":
writer = &lumberjack.Logger{
Filename: logFile,
MaxSize: cfg.MaxSize,
MaxAge: cfg.MaxAge,
MaxBackups: cfg.MaxBackups,
Compress: cfg.Compress,
LocalTime: cfg.LocalTime,
}
default:
writer = os.Stdout
}
var handler slog.Handler
options := &slog.HandlerOptions{
AddSource: cfg.AddSource,
Level: level,
}
switch cfg.Format {
case "json":
handler = slog.NewJSONHandler(writer, options)
case "text":
handler = slog.NewTextHandler(writer, options)
default:
handler = slog.NewTextHandler(writer, options)
}
defaultLogger = slog.New(handler)
slog.SetDefault(defaultLogger)
}
func GetLogger() *slog.Logger {
return defaultLogger
}
func Debug(msg string, args ...any) { defaultLogger.Debug(msg, args...) }
func Info(msg string, args ...any) { defaultLogger.Info(msg, args...) }
func Warn(msg string, args ...any) { defaultLogger.Warn(msg, args...) }
func Error(msg string, args ...any) { defaultLogger.Error(msg, args...) }// internal/pkg/logger/context.go
package logger
import (
"context"
"log/slog"
)
type ctxKey struct{}
// WithContext 将 logger 注入到 context 中
func WithContext(ctx context.Context, logger *slog.Logger) context.Context {
return context.WithValue(ctx, ctxKey{}, logger)
}
// FromContext 从 context 中获取 logger,如果不存在则返回全局默认
func FromContext(ctx context.Context) *slog.Logger {
if logger, ok := ctx.Value(ctxKey{}).(*slog.Logger); ok {
return logger
}
return defaultLogger
}
// 带 context 的日志方法(自动携带请求 ID 等信息)
func DebugCtx(ctx context.Context, msg string, args ...any) {
FromContext(ctx).Debug(msg, args...)
}
func InfoCtx(ctx context.Context, msg string, args ...any) {
FromContext(ctx).Info(msg, args...)
}
func ErrorCtx(ctx context.Context, msg string, args ...any) {
FromContext(ctx).Error(msg, args...)
}关于logger注入context
在context.go中,选择将携带参数的logger注入上下文,方便实现请求级参数隔离,可用于链路追踪等。相比将相关参数放入上下文再在每次打印时取出参数进行组装,这样做可以减少重复劳动,降低出错的可能性。
总结:传递 Logger 而非传递参数,是将“能力”注入 Context,实现了一次组装、随处使用,避免了手动传递参数的遗漏风险。
日志拦截器实现
手动实现日志拦截器
func LoggingUnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
start := time.Now()
l := logger.FromContext(ctx).With(slog.String("method", info.FullMethod))
ctx = logger.WithContext(ctx, l)
resp, err := handler(ctx, req)
duration := time.Since(start)
statusCode := status.Code(err)
l.Info("request handled",
"duration", duration.Milliseconds(),
"status_code", statusCode.String(),
"error", err,
)
if l.Enabled(ctx, slog.LevelDebug) {
l.Debug("request details",
"request", req,
"response", resp,
)
}
return resp, err
}
}
func LoggingStreamInterceptor() grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
start := time.Now()
l := logger.FromContext(ctx).With(slog.String("method", info.FullMethod))
wrappedStream := &loggedServerStream{
ServerStream: stream,
logger: l,
}
err := handler(srv, wrappedStream)
duration := time.Since(start)
statusCode := status.Code(err)
l.Info("request handled",
"duration", duration.Milliseconds(),
"status_code", statusCode.String(),
"error", err,
"recv_count", wrappedStream.recvCount,
"send_count", wrappedStream.sendCount,
)
return err
}
}
type loggedServerStream struct {
grpc.ServerStream
logger *slog.Logger
recvCount int
sendCount int
}
func (s *loggedServerStream) RecvMsg(m any) error {
err := s.ServerStream.RecvMsg(m)
if err == nil {
s.recvCount++
s.logger.Debug("received message",
"count", s.recvCount,
"message", m,
)
}
return err
}
func (s *loggedServerStream) SendMsg(m any) error {
err := s.ServerStream.SendMsg(m)
if err == nil {
s.sendCount++
s.logger.Debug("sent message",
"count", s.sendCount,
"message", m,
)
}
return err
}
func (s *loggedServerStream) Context() context.Context {
ctx := s.ServerStream.Context()
if ctx == nil {
ctx = context.Background()
}
return logger.WithContext(ctx, s.logger)
}如上是手动实现的日志拦截器。可以在handler()前后收集信息并打印,也可以在stream重写的RecvMsg()和SendMsg()方法中进行打印。如上示例打印了请求用时和输入输出的日志。
使用go-grpc-middleware的logging包
除了手动实现,也可以直接使用go-grpc-middleware提供的日志中间件:
import "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
var (
interceptorLogger *slog.Logger
)
func InitLoggingInterceptor() error {
interceptorLogger = logger.GetLogger()
return nil
}
// InterceptorLogger 将 slog.Logger 适配为 logging.Logger 接口
func InterceptorLogger() logging.Logger {
return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
// logging.Level 与 slog.Level 数值对齐,可直接转换
interceptorLogger.Log(ctx, slog.Level(lvl), msg, fields...)
})
}
func LoggingUnaryInterceptor(opts ...logging.Option) grpc.UnaryServerInterceptor {
return logging.UnaryServerInterceptor(InterceptorLogger(), opts...)
}
func LoggingStreamInterceptor(opts ...logging.Option) grpc.StreamServerInterceptor {
return logging.StreamServerInterceptor(InterceptorLogger(), opts...)
}logging.Option配置详解
WithLogOnEvents- 配置记录哪些事件// 只记录调用开始和结束(默认) logging.WithLogOnEvents(logging.StartCall, logging.FinishCall) // 记录所有事件(包括 payload,非常详细) logging.WithLogOnEvents( logging.StartCall, logging.FinishCall, logging.PayloadReceived, logging.PayloadSent, ) // 只记录调用结束(适合生产环境) logging.WithLogOnEvents(logging.FinishCall)WithLevels- 自定义状态码到日志级别的映射logging.WithLevels(func(code codes.Code) logging.Level { switch code { case codes.OK: return logging.LevelInfo case codes.Canceled: return logging.LevelWarn case codes.Unknown, codes.Internal, codes.Unavailable: return logging.LevelError default: return logging.LevelInfo } })服务端和客户端的默认映射不同:
- 服务端默认:
OK/NotFound/Canceled→Info;DeadlineExceeded/Unavailable→Warn;Internal/Unknown→Error - 客户端默认:
OK/Canceled→Debug;Unknown/DeadlineExceeded→Info;Internal/Unavailable→Warn
- 服务端默认:
WithDurationField- 自定义耗时字段格式// 默认:{"grpc.time_ms": "12.345"} logging.WithDurationField(logging.DurationToTimeMillisFields) // 使用 duration 字符串:{"grpc.duration": "12.345ms"} logging.WithDurationField(logging.DurationToDurationField) // 自定义:{"duration_ms": 12345} logging.WithDurationField(func(duration time.Duration) logging.Fields { return logging.Fields{"duration_ms", duration.Milliseconds()} })WithErrorFields- 从错误中提取额外字段logging.WithErrorFields(func(err error) logging.Fields { // 假设你的错误类型包含额外信息 if customErr, ok := err.(interface{ Code() int }); ok { return logging.Fields{"error_code", customErr.Code()} } return nil })WithCodes- 自定义错误到状态码的映射logging.WithCodes(func(err error) codes.Code { if err == nil { return codes.OK } // 自定义错误类型 if customErr, ok := err.(interface{ GRPCStatus() *status.Status }); ok { return customErr.GRPCStatus().Code() } return codes.Unknown })WithFieldsFromContext- 从 Context 提取字段// 从 context 中提取 trace_id、user_id 等 logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { fields := logging.Fields{} if traceID := ctx.Value("trace_id"); traceID != nil { fields = append(fields, "trace_id", traceID) } if userID := ctx.Value("user_id"); userID != nil { fields = append(fields, "user_id", userID) } return fields })WithFieldsFromContextAndCallMeta- 从 Context 和调用元数据提取字段logging.WithFieldsFromContextAndCallMeta(func(ctx context.Context, c interceptors.CallMeta) logging.Fields { return logging.Fields{ "method_type", string(c.Typ), "service", c.Service, "method", c.Method, } })WithTimestampFormat- 自定义时间戳格式logging.WithTimestampFormat(time.RFC3339Nano) // 纳秒精度WithDisableLoggingFields- 禁用默认的 gRPC 字段默认会记录:
protocol、grpc.component、grpc.service、grpc.method、grpc.method_type// 禁用 method 和 method_type 字段 logging.WithDisableLoggingFields( logging.MethodFieldKey, logging.MethodTypeFieldKey, )
Context字段传递
logging包也提供了在Context中传递日志字段的机制。可以通过logging.InjectLogField()将字段注入上下文,在后续日志中自动打印。也可以使用logging.ExtractFields()从上下文中提取注入的字段。
import "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
// 在业务代码中添加字段
func (s *BookService) GetBook(ctx context.Context, req *pb.GetBookRequest) (*pb.GetBookResponse, error) {
// 添加请求特定的字段
ctx = logging.InjectLogField(ctx, "book_id", req.Id)
ctx = logging.InjectLogField(ctx, "user_id", getUserID(ctx))
// 这些字段会自动出现在后续的日志中
// ...
}
// 提取当前 context 中的字段(用于自己的日志)
fields := logging.ExtractFields(ctx)可观测性
可观测性是一个比较庞杂的概念。包括链路追踪(Trace),**指标(Metrics)和日志(Log)**等。我们通过将OpenTelemetry这个开源的可观测性框架集成到grpc中来实现可观测性。这里不展开讲解OpenTelemetry本身,详情可以查看文档。
otel初始化
OpenTelemetry Go 的日志组件仍处于Beta阶段refer,而且我们之前已经实现了一套日志功能,所以这里我们主要实现链路追踪和指标功能。大致流程为初始化resource和exporter,用两者初始化provider并设置为全局变量。
| 组件 | 作用 | 类比 |
|---|---|---|
| Resource | 标识服务的元信息(名称、版本、环境) | 快递单上的寄件人信息 |
| Exporter | 决定数据发送到哪里(OTLP、Jaeger、stdout) | 快递公司 |
| Provider | 管理遥测数据的生命周期和配置 | 快递网点 |
如下只是一个实现示例,具体各项指标需要根据项目实际情况进行自定义。
// internal/pkg/otel/otel.go
package otel
import (
"bookstore/internal/pkg/config"
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
)
func InitOtel(ctx context.Context, cfg *config.Otel, srvName string, srvVersion string, env string) (func(context.Context) error, error) {
// 创建资源
res, err := initResource(ctx, srvName, srvVersion, env)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
// 创建TracerProvider
tracerProvider, err := initTracerProvider(ctx, cfg, res)
if err != nil {
return nil, fmt.Errorf("failed to create tracer provider: %w", err)
}
// 创建MeterProvider
meterProvider, err := initMeterProvider(ctx, cfg, res)
if err != nil {
if tracerErr := tracerProvider.Shutdown(ctx); tracerErr != nil {
return nil, errors.Join(
fmt.Errorf("failed to create meter provider: %w", err),
fmt.Errorf("failed to shutdown tracer provider: %w", tracerErr),
)
}
return nil, fmt.Errorf("failed to create meter provider: %w", err)
}
otel.SetTracerProvider(tracerProvider)
otel.SetMeterProvider(meterProvider)
return func(ctx context.Context) error {
errs := make([]error, 0)
tracerErr := tracerProvider.Shutdown(ctx)
if tracerErr != nil {
errs = append(errs, fmt.Errorf("failed to shutdown tracer provider: %w", tracerErr))
}
meterErr := meterProvider.Shutdown(ctx)
if meterErr != nil {
errs = append(errs, fmt.Errorf("failed to shutdown meter provider: %w", meterErr))
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}, nil
}
// 创建资源
func initResource(ctx context.Context, srvName string, srvVersion string, env string) (*resource.Resource, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(srvName),
semconv.ServiceVersionKey.String(srvVersion),
attribute.String("deployment.environment", env),
),
resource.WithProcess(), // 添加进程信息(PID、可执行文件路径等)
resource.WithOS(), // 添加操作系统信息
resource.WithHost(), // 添加主机名
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
return resource.Merge(resource.Default(), res)
}
// 创建TracerProvider
func initTracerProvider(ctx context.Context, cfg *config.Otel, res *resource.Resource) (*trace.TracerProvider, error) {
opts := []otlptracegrpc.Option{
otlptracegrpc.WithEndpoint(cfg.OtelEndpoint),
otlptracegrpc.WithTimeout(cfg.ExportTimeout),
}
if cfg.Insecure {
opts = append(opts, otlptracegrpc.WithInsecure())
}
exporter, err := otlptracegrpc.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
}
provider := trace.NewTracerProvider(
// 设置导出器
trace.WithBatcher(exporter,
trace.WithBatchTimeout(cfg.BatchTimeout),
),
// 设置资源
trace.WithResource(res),
// 设置采样器
trace.WithSampler(trace.TraceIDRatioBased(cfg.TraceSampleRate)),
)
return provider, nil
}
// 创建MeterProvider
func initMeterProvider(ctx context.Context, cfg *config.Otel, res *resource.Resource) (*metric.MeterProvider, error) {
opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithEndpoint(cfg.OtelEndpoint),
otlpmetricgrpc.WithTimeout(cfg.ExportTimeout),
}
if cfg.Insecure {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
exporter, err := otlpmetricgrpc.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create OTLP metric exporter: %w", err)
}
provider := metric.NewMeterProvider(
// 设置导出器
metric.WithReader(metric.NewPeriodicReader(exporter,
metric.WithInterval(cfg.ExportInterval),
)),
// 设置资源
metric.WithResource(res),
)
return provider, nil
}集成otel到grpc
要将初始化完成的otel集成到grpc中,最简单的方式就是使用go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc库进行仪表化。
// internal/server/book/server/server.go
import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" // new
)
type BookServer struct {
cfg *config.Config
bookService *service.BookService
}
func NewBookServer(i do.Injector) (*BookServer, error) {
cfg := do.MustInvoke[*config.Config](i)
bookService := do.MustInvoke[*service.BookService](i)
return &BookServer{
cfg: cfg,
bookService: bookService,
}, nil
}
func (s *BookServer) Run(ctx context.Context) error {
// ...
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()), // new
grpc.ChainUnaryInterceptor(
interceptor.ValidateUnaryInterceptor(),
interceptor.LoggingUnaryInterceptor(loggingOptions...),
),
grpc.ChainStreamInterceptor(
interceptor.ValidateStreamInterceptor(),
interceptor.LoggingStreamInterceptor(loggingOptions...),
),
)
// ...
return nil
}使用 otelgrpc.NewServerHandler() 后,它会自动为每个 RPC 调用:
- 解析追踪上下文:从 gRPC Metadata 中读取
traceparent,实现链路串联 - 创建 Span:自动生成 Server Span,记录开始/结束时间
- 记录 Metrics:自动记录
rpc.server.duration、rpc.server.request.size等指标 - 传播 Baggage:如果客户端传递了业务属性,会自动放入 Context
RESTful API
我们可以使用grpc-gateway这个插件将RESTful HTTP API转换为gRPC,从而可以使用http请求访问gprc服务。
首先在buf.gen.yaml中添加插件:
plugins:
- remote: buf.build/protocolbuffers/go
out: api
opt:
- paths=source_relative
- remote: buf.build/grpc/go
out: api
opt:
- paths=source_relative
- remote: buf.build/grpc-ecosystem/gateway # new
out: api
opt:
- paths=source_relative然后在buf.yaml中添加依赖,并执行buf dep update更新依赖:
version: v2
modules:
- path: api
deps:
- buf.build/bufbuild/protovalidate
- buf.build/googleapis/googleapis # new
lint:
use:
- STANDARD
breaking:
use:
- FILE在proto文件中引入google/api/annotations.proto依赖,就可以编辑http路由规则了:
import "google/api/annotations.proto";
service BookService {
rpc GetBook(GetBookRequest) returns (GetBookResponse) {
option (google.api.http) = {get: "/v1/books/{id}"};
}
rpc CreateBook(CreateBookRequest) returns (CreateBookResponse) {
option (google.api.http) = {
post: "/v1/books"
body: "*"
};
}
rpc ListBooks(ListBooksRequest) returns (ListBooksResponse) {
option (google.api.http) = {get: "/v1/books"};
}
rpc UpdateBook(UpdateBookRequest) returns (UpdateBookResponse) {
option (google.api.http) = {
put: "/v1/books/{book.id}"
body: "*"
};
}
rpc DeleteBook(DeleteBookRequest) returns (DeleteBookResponse) {
option (google.api.http) = {delete: "/v1/books/{id}"};
}
rpc BatchImportBooks(stream BatchImportBooksRequest) returns (BatchImportBooksResponse) {}
rpc BatchExportBooks(BatchExportBooksRequest) returns (stream BatchExportBooksResponse) {}
rpc HeartBeat(stream HeartBeatRequest) returns (stream HeartBeatResponse) {}
}需要注意的是,HTTP请求和gRPC流式方法的兼容性:
| RPC 类型 | HTTP Gateway 支持 | 说明 |
|---|---|---|
| 普通 Unary RPC | ✅ 完全支持 | 请求-响应模式,完美转换 |
| 服务端流式 (Server Streaming) | ⚠️ 有限支持 | 转换为 Server-Sent Events (SSE),客户端需要特殊处理 |
| 客户端流式 (Client Streaming) | ❌ 不支持 | HTTP/1.1 无法表示客户端流 |
| 双向流式 (Bidirectional Streaming) | ❌ 不支持 | 无法通过 HTTP/1.1 实现 |
因此对于 BatchImportBooks(客户端流)、HeartBeat(双向流)等流式方法,不建议添加 HTTP 映射。
protobuf相关配置完成,接下来我们集成到go程序中,为grpc服务添加http网关。首先添加http网关的相关配置:
// internal/pkg/config/config.go
type BookService struct {
Name string `mapstructure:"name"` // 服务名称(必填)
Version string `mapstructure:"version"` // 服务版本
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
LogFile string `mapstructure:"log_file"`
HTTPPort int `mapstructure:"http_port"` // new
EnableHTTP bool `mapstructure:"enable_http"` // new
}然后实现http网关逻辑。初始化grpc-gateway的路由器mux用于将HTTP请求映射到gRPC方法,将其注册到grpc服务中,再启动一个http服务器接受http请求给mux处理。
flowchart LR
subgraph External[外部]
Client[HTTP Client
JSON]
end
subgraph Gateway[HTTP Gateway]
Router[mux Router
协议转换]
GRPCClient[gRPC Client
处理调用]
end
subgraph Backend[gRPC Backend]
Service[gRPC Service
业务逻辑]
end
Client -->|HTTP Request
JSON| Router
Router -->|转发调用| GRPCClient
GRPCClient -->|gRPC Call
Protobuf| Service
Service -->|gRPC Response
Protobuf| GRPCClient
GRPCClient -->|返回响应| Router
Router -->|HTTP Response
JSON| Client
style Router fill:#e1f5fe
style GRPCClient fill:#f3e5f5
style Service fill:#e8f5e9
import "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
func (s *BookServer) runHTTPGateway(ctx context.Context) error {
mux := runtime.NewServeMux()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
grpcEndpoint := fmt.Sprintf("localhost:%d", s.cfg.Services.Book.Port)
err := bookv1.RegisterBookServiceHandlerFromEndpoint(ctx, mux, grpcEndpoint, opts)
if err != nil {
return err
}
httpAddr := fmt.Sprintf(":%d", s.cfg.Services.Book.HTTPPort)
httpServer := &http.Server{
Addr: httpAddr,
Handler: mux,
}
go func() {
<-ctx.Done()
log.Println("shutting down http gateway...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := httpServer.Shutdown(shutdownCtx); err != nil {
log.Printf("failed to shutdown http gateway: %v", err)
}
}()
log.Printf("http gateway listening at port %d", s.cfg.Services.Book.HTTPPort)
if err := httpServer.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
log.Println("http gateway closed")
return nil
}
return fmt.Errorf("failed to serve http gateway: %w", err)
}
return nil
}最后将启动http网关集成到启动grpc服务的流程中。注意http网关应该在grpc服务之后启动,所以我这里简单加了个time.Sleep()。
func (s *BookServer) Run(ctx context.Context) error {
if s.cfg.Services.Book.EnableHTTP {
go func() {
time.Sleep(1 * time.Second) // wait for grpc server to start
if err := s.runHTTPGateway(ctx); err != nil {
log.Printf("failed to run http gateway: %v", err)
}
}()
}
...
}