🔧 微服务与 gRPC
微服务架构设计模式、gRPC 高性能通信、服务发现、消息队列——构建可扩展的分布式系统。
1. 微服务架构
客户端 (Web / Mobile / Third-party)
发起 HTTP / gRPC 请求
↓
API 网关 (Kong / Traefik / 自研)
统一认证 · 限流 · 路由 · 负载均衡 · SSL 终止
↓
用户服务 :8081
注册、登录、Profile、权限
订单服务 :8082
下单、支付、状态跟踪
通知服务 :8083
邮件、短信、Push 推送
↓
PostgreSQL
用户数据库
PostgreSQL
订单数据库
Kafka
消息队列 · 事件总线
用户服务 / 订单服务 → 发布事件 → Kafka → 通知服务消费
微服务 vs 单体
何时选微服务
- 团队规模 > 50人,需独立部署
- 不同模块有差异化扩展需求
- 需要多语言技术栈混合
- 系统已足够复杂,单体难以维护
微服务的代价
- 分布式系统的复杂性(网络、事务)
- 服务发现、负载均衡基础设施
- 分布式链路追踪的必要性
- 数据一致性更难保证
2. gRPC 高性能 RPC
gRPC vs REST
gRPC 使用 HTTP/2 + Protobuf 二进制编码,序列化开销和报文体积通常明显小于 JSON REST(常见基准测试中约快 5-10 倍,实际收益取决于消息大小与网络环境);支持双向流;提供强类型接口契约;适合服务间通信(内部调用)。
定义 Protobuf 协议
// api/user/v1/user.proto
syntax = "proto3";
package user.v1;
option go_package = "github.com/myapp/api/user/v1;userv1";
import "google/protobuf/timestamp.proto";
// Message definitions.

// User is the user record exchanged by the UserService RPCs.
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
  // Role is carried as a free-form string; this file does not restrict its values.
  string role = 4;
  bool active = 5;
  // Uses the well-known Timestamp type imported at the top of the file.
  google.protobuf.Timestamp created_at = 6;
}
// GetUserRequest selects a single user by ID.
message GetUserRequest {
  int64 id = 1;
}
// GetUserResponse wraps the user returned by UserService.GetUser.
message GetUserResponse {
  User user = 1;
}
// ListUsersRequest carries paging and search parameters for UserService.ListUsers.
// NOTE(review): whether `page` is 0- or 1-based is not defined in this file.
message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
  string search = 3;
}
// ListUsersResponse returns a list of users together with a total count.
message ListUsersResponse {
  repeated User users = 1;
  // Presumably the number of matching users across all pages; confirm with the server.
  int64 total = 2;
}
// CreateUserRequest holds the fields supplied when creating a user.
// It is the request type of both CreateUser and the BatchProcess stream.
message CreateUserRequest {
  string name = 1;
  string email = 2;
  string password = 3;
}
// WatchUsersRequest is the request message for UserService.WatchUsers.
// It is intentionally empty for now; it must be declared because the service
// below references it and protoc rejects undefined message types.
message WatchUsersRequest {}

// Service definition.

// UserService groups the user-related RPCs.
service UserService {
  // Unary RPCs.
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc CreateUser(CreateUserRequest) returns (User);

  // Server-streaming RPC: pushes user events to the client in real time.
  rpc WatchUsers(WatchUsersRequest) returns (stream User);

  // Bidirectional-streaming RPC: batch processing.
  rpc BatchProcess(stream CreateUserRequest) returns (stream User);
}
# Install the protoc plugins that generate Go message code and gRPC stubs.
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate Go code. paths=source_relative places each generated file in the
# same relative directory as its input .proto file.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/user/v1/user.proto
gRPC 服务端实现
package grpcserver
import (
	"context"
	"errors"
	"io"
	"log"
	"net"

	pb "github.com/myapp/api/user/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"
	// NOTE: the package that declares service.UserService and
	// service.ErrNotFound must also be imported here; its import path is not
	// shown in this document.
)
// UserServer implements the generated UserService gRPC server interface by
// delegating to the application's user service.
type UserServer struct {
	// The generated Unimplemented type must be embedded; it provides forward
	// compatibility when RPCs are added to the service later.
	pb.UnimplementedUserServiceServer

	// svc is the business-logic layer the RPC handlers call into.
	svc service.UserService
}
// GetUser handles the unary GetUser RPC.
//
// A non-positive ID is rejected with codes.InvalidArgument. A lookup that
// fails with service.ErrNotFound is reported as codes.NotFound; any other
// service error is reported as codes.Internal.
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	id := req.Id
	if id <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "用户 ID 无效: %d", id)
	}

	found, err := s.svc.GetUser(ctx, id)
	switch {
	case err == nil:
		return &pb.GetUserResponse{User: toProtoUser(found)}, nil
	case errors.Is(err, service.ErrNotFound):
		return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", id)
	default:
		return nil, status.Errorf(codes.Internal, "内部错误: %v", err)
	}
}
// WatchUsers handles the server-streaming WatchUsers RPC.
//
// It subscribes to user events using the stream's context and forwards each
// event to the client. It returns nil when the event channel is closed, a
// codes.Unavailable status when a send fails, and the context's error when
// the stream context is done.
func (s *UserServer) WatchUsers(req *pb.WatchUsersRequest, stream pb.UserService_WatchUsersServer) error {
	ctx := stream.Context()
	events := s.svc.SubscribeUserEvents(ctx)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case u, open := <-events:
			if !open {
				return nil
			}
			if sendErr := stream.Send(toProtoUser(u)); sendErr != nil {
				return status.Errorf(codes.Unavailable, "发送失败: %v", sendErr)
			}
		}
	}
}
// BatchProcess handles the bidirectional-streaming BatchProcess RPC.
//
// For every request received it creates a user and sends the result back on
// the same stream. It returns nil once the client closes its send side
// (io.EOF); the first receive, create or send error ends the stream and is
// returned as-is.
func (s *UserServer) BatchProcess(stream pb.UserService_BatchProcessServer) error {
	for {
		in, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return nil
		case recvErr != nil:
			return recvErr
		}

		created, createErr := s.svc.CreateUser(stream.Context(), in)
		if createErr != nil {
			return createErr
		}

		if sendErr := stream.Send(toProtoUser(created)); sendErr != nil {
			return sendErr
		}
	}
}
// RunGRPCServer starts the gRPC server on TCP port 50051 and blocks serving
// requests. It terminates the process via log.Fatal if the port cannot be
// bound or when Serve returns.
//
// svc is the user service the registered UserServer delegates to.
func RunGRPCServer(svc service.UserService) {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		// Fail fast with a clear message instead of handing a nil listener
		// to Serve.
		log.Fatalf("gRPC 监听失败: %v", err)
	}

	// Interceptors play the role HTTP middleware plays for an HTTP server.
	srv := grpc.NewServer(
		grpc.ChainUnaryInterceptor(
			loggingInterceptor,
			recoveryInterceptor,
			authInterceptor,
		),
		grpc.ChainStreamInterceptor(
			streamLoggingInterceptor,
		),
	)

	pb.RegisterUserServiceServer(srv, &UserServer{svc: svc})
	// Server reflection lets tools such as grpcurl explore the API.
	reflection.Register(srv)

	log.Println("gRPC 服务器启动: :50051")
	log.Fatal(srv.Serve(lis))
}
gRPC 客户端
package grpcclient
import (
"context"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
pb "github.com/myapp/api/user/v1"
)
// NewUserClient creates a UserService client for addr.
//
// It returns the client, a cleanup function that closes the underlying
// connection, and any error from grpc.NewClient. The connection uses insecure
// (plaintext) transport credentials, client keepalive pings, and a 4 MB limit
// on received messages.
//
// NOTE(review): keepalive pings every 10s, including when no RPC is active,
// must be permitted by the server's keepalive enforcement policy — confirm.
func NewUserClient(addr string) (pb.UserServiceClient, func(), error) {
	const maxRecvBytes = 4 * 1024 * 1024 // 4MB

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRecvBytes)),
	}

	conn, err := grpc.NewClient(addr, opts...)
	if err != nil {
		return nil, nil, err
	}

	closeConn := func() { conn.Close() }
	return pb.NewUserServiceClient(conn), closeConn, nil
}
// ExampleCall shows how to call GetUser with a 5-second deadline and how to
// branch on the gRPC status code of a failed call. Results and errors are
// printed to standard output.
func ExampleCall(client pb.UserServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
	if err != nil {
		st, ok := status.FromError(err)
		if !ok {
			// Not a gRPC status error: report it rather than dropping it
			// silently.
			fmt.Printf("调用失败: %v\n", err)
			return
		}

		// Handle the error by gRPC status code.
		switch st.Code() {
		case codes.NotFound:
			fmt.Println("用户不存在")
		case codes.DeadlineExceeded:
			fmt.Println("请求超时")
		default:
			fmt.Printf("gRPC 错误: %s\n", st.Message())
		}
		return
	}

	// Generated getters are nil-safe, so a response without a user prints
	// empty fields instead of panicking.
	u := resp.GetUser()
	fmt.Printf("用户: %s <%s>\n", u.GetName(), u.GetEmail())
}
3. 消息队列:Kafka
go get github.com/IBM/sarama
package kafka
import (
"context"
"encoding/json"
"github.com/IBM/sarama"
)
// ─── 生产者 ───────────────────────────────────────────────────
// Producer publishes events to Kafka through a sarama synchronous producer.
type Producer struct {
	// client is the underlying sarama SyncProducer used by Publish.
	client sarama.SyncProducer
}
// NewProducer builds a Producer connected to the given brokers.
//
// The producer waits for acknowledgement from all replicas, retries a failed
// send up to 3 times, and compresses messages with Snappy. It returns the
// error from sarama.NewSyncProducer unchanged.
func NewProducer(brokers []string) (*Producer, error) {
	conf := sarama.NewConfig()
	conf.Producer.Compression = sarama.CompressionSnappy
	conf.Producer.RequiredAcks = sarama.WaitForAll // every replica must confirm
	conf.Producer.Retry.Max = 3
	// A SyncProducer requires successes to be returned.
	conf.Producer.Return.Successes = true

	sp, err := sarama.NewSyncProducer(brokers, conf)
	if err != nil {
		return nil, err
	}

	return &Producer{client: sp}, nil
}
// Event is the JSON envelope that Publish serializes as the message value.
type Event struct {
	// Type identifies the kind of event; encoded under the "type" key.
	Type string `json:"type"`

	// Payload is the event-specific data; encoded under the "payload" key.
	Payload any `json:"payload"`
}
// Publish JSON-encodes event and sends it to topic with the given message key.
//
// It returns the json.Marshal error if encoding fails, otherwise the error
// (if any) from the producer's SendMessage. The partition and offset reported
// by SendMessage are discarded.
func (p *Producer) Publish(topic string, key string, event Event) error {
	body, err := json.Marshal(event)
	if err != nil {
		return err
	}

	_, _, err = p.client.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(body),
	})
	return err
}
// ─── 消费者组 ─────────────────────────────────────────────────
// ConsumerGroup pairs a sarama consumer group with the handler that processes
// its claims. Nothing in the visible code constructs or uses this type.
type ConsumerGroup struct {
	group   sarama.ConsumerGroup
	handler sarama.ConsumerGroupHandler
}
// Handler provides the Setup, Cleanup and ConsumeClaim callbacks used when
// consuming with a sarama consumer group.
type Handler struct {
	// ready is closed by Setup.
	ready chan bool

	// process is called by ConsumeClaim for each received message.
	process func(msg *sarama.ConsumerMessage) error
}
// Setup signals readiness by closing h.ready and always returns nil.
//
// Closing an already-closed channel panics, so h.ready must be replaced with
// a fresh channel before Setup runs again.
func (h *Handler) Setup(s sarama.ConsumerGroupSession) error {
	close(h.ready)

	return nil
}
// Cleanup is a no-op; it always returns nil.
func (h *Handler) Cleanup(s sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim processes the messages of one claim until the claim's message
// channel is closed or the session context is done, then returns nil.
//
// Each message is passed to h.process. A message whose processing fails is
// logged and skipped without being marked; a successfully processed message
// is marked as consumed.
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			if err := h.process(msg); err != nil {
				// On failure the options are: retry, dead-letter queue, or
				// skip. This implementation skips.
				// NOTE(review): a later successful message marks a higher
				// offset, so a skipped message is presumably not redelivered.
				log.Printf("消息处理失败: %v, offset: %d", err, msg.Offset)
				continue
			}
			// Mark only after successful processing (at-least-once).
			// MarkMessage records the offset; it does not commit it
			// synchronously.
			session.MarkMessage(msg, "")
		case <-session.Context().Done():
			// The session is ending (e.g. rebalance or shutdown). Return
			// promptly so the consumer group is not held up.
			return nil
		}
	}
}
// NewConsumerGroup joins the consumer group groupID on the given brokers and
// consumes topics, calling processor for every message.
//
// Despite its name this function blocks: it keeps calling Consume in a loop
// and only returns when the group cannot be created or Consume returns an
// error. The group is closed before returning.
//
// New groups start from the newest offset and use the round-robin balance
// strategy.
func NewConsumerGroup(brokers []string, groupID string, topics []string, processor func(*sarama.ConsumerMessage) error) error {
	cfg := sarama.NewConfig()
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}

	group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
	if err != nil {
		// Without this check a failed connection leaves group nil and the
		// Consume call below panics.
		return err
	}
	// Best-effort close on exit; the Consume error is the one reported.
	defer group.Close()

	handler := &Handler{ready: make(chan bool), process: processor}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consume returns whenever a rebalance ends the session, so it is called
	// in a loop to rejoin the group.
	for {
		if err := group.Consume(ctx, topics, handler); err != nil {
			return err
		}
		// Setup closes handler.ready; give the next session a fresh channel.
		handler.ready = make(chan bool)
	}
}
4. 服务发现与 API 网关
package registry
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// ServiceRegistry registers, discovers and watches services stored in etcd.
type ServiceRegistry struct {
	// client is the etcd v3 client used for all operations.
	client *clientv3.Client

	// leaseID is the ID of the lease granted by the most recent Register call.
	leaseID clientv3.LeaseID
}
// ServiceInfo describes one service instance. It is stored in etcd as JSON
// under the keys given by the struct tags.
type ServiceInfo struct {
	// Name is the service name; it forms part of the etcd key.
	Name string `json:"name"`

	// Host and Port locate the instance; both form part of the etcd key.
	Host string `json:"host"`
	Port int    `json:"port"`

	// Version is stored with the instance.
	Version string `json:"version"`
}
// Register stores info in etcd under /services/<name>/<host>:<port>, bound to
// a lease with a 10-second TTL, and starts keeping that lease alive.
//
// The keepalive is started with ctx, so pass a context that lives as long as
// the registration should; a short-lived request context would stop the
// renewal. The granted lease ID is saved in r.leaseID.
//
// It returns an error if the info cannot be encoded or if granting the lease,
// writing the key, or starting the keepalive fails.
func (r *ServiceRegistry) Register(ctx context.Context, info ServiceInfo) error {
	// Encode first so that an encoding failure does not leave a lease behind.
	value, err := json.Marshal(info)
	if err != nil {
		return fmt.Errorf("encoding service info: %w", err)
	}

	// Create the lease (TTL 10 seconds).
	lease, err := r.client.Grant(ctx, 10)
	if err != nil {
		return fmt.Errorf("granting lease: %w", err)
	}
	r.leaseID = lease.ID

	key := fmt.Sprintf("/services/%s/%s:%d", info.Name, info.Host, info.Port)

	// Register the service; the key is bound to the lease.
	if _, err = r.client.Put(ctx, key, string(value), clientv3.WithLease(lease.ID)); err != nil {
		return fmt.Errorf("registering %s: %w", key, err)
	}

	// Renew the lease automatically.
	ch, err := r.client.KeepAlive(ctx, lease.ID)
	if err != nil {
		return fmt.Errorf("starting keepalive for %s: %w", key, err)
	}
	go func() {
		// Drain keepalive responses until the channel is closed.
		for range ch {
		}
	}()

	fmt.Printf("服务注册成功: %s\n", key)
	return nil
}
// Discover returns the instances registered under /services/<serviceName>/.
//
// Entries whose value cannot be decoded as ServiceInfo are skipped silently.
// The result is nil when no entry decodes. The error from the etcd Get call
// is returned unchanged.
func (r *ServiceRegistry) Discover(ctx context.Context, serviceName string) ([]ServiceInfo, error) {
	resp, err := r.client.Get(ctx, fmt.Sprintf("/services/%s/", serviceName), clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	var found []ServiceInfo
	for _, kv := range resp.Kvs {
		var entry ServiceInfo
		if json.Unmarshal(kv.Value, &entry) != nil {
			continue
		}
		found = append(found, entry)
	}
	return found, nil
}
// Watch observes changes under /services/<serviceName>/ and calls onChange
// with the freshly discovered instance list after each watch response.
//
// It blocks until the watch channel is closed. Watch errors and discovery
// failures are printed and skipped, so onChange is never called with the
// result of a failed lookup.
func (r *ServiceRegistry) Watch(ctx context.Context, serviceName string, onChange func([]ServiceInfo)) {
	prefix := fmt.Sprintf("/services/%s/", serviceName)

	for resp := range r.client.Watch(ctx, prefix, clientv3.WithPrefix()) {
		if err := resp.Err(); err != nil {
			fmt.Printf("服务监听出错: %v\n", err)
			continue
		}

		for _, event := range resp.Events {
			fmt.Printf("服务变更: %s %s\n", event.Type, event.Kv.Key)
		}

		services, err := r.Discover(ctx, serviceName)
		if err != nil {
			// Keep the caller's last known list instead of replacing it with
			// an empty one after a transient failure.
			fmt.Printf("服务发现失败: %v\n", err)
			continue
		}
		onChange(services)
	}
}
微服务设计原则
① 单一职责:每个服务只做一件事;② 松耦合:服务间通过接口或消息通信;③ 数据隔离:每个服务拥有自己的数据库;④ 失败容忍:实现熔断器(Circuit Breaker);⑤ 可观测:每个服务都暴露健康检查和 metrics。