🔧 微服务与 gRPC
微服务架构设计模式、gRPC 高性能通信、服务发现、消息队列——构建可扩展的分布式系统。
1. 微服务架构
客户端 (Web / Mobile / Third-party)
发起 HTTP / gRPC 请求
↓
API 网关 (Kong / Traefik / 自研)
统一认证 · 限流 · 路由 · 负载均衡 · SSL 终止
↓
用户服务 :8081
注册、登录、Profile、权限
订单服务 :8082
下单、支付、状态跟踪
通知服务 :8083
邮件、短信、Push 推送
↓
PostgreSQL
用户数据库
PostgreSQL
订单数据库
Kafka
消息队列 · 事件总线
用户服务 / 订单服务 → 发布事件 → Kafka → 通知服务消费
微服务 vs 单体
何时选微服务
- 团队规模 > 50人,需独立部署
- 不同模块有差异化扩展需求
- 需要多语言技术栈混合
- 系统已足够复杂,单体难以维护
微服务的代价
- 分布式系统的复杂性(网络、事务)
- 服务发现、负载均衡基础设施
- 分布式链路追踪的必要性
- 数据一致性更难保证
2. gRPC 高性能 RPC
gRPC vs REST
gRPC 使用 HTTP/2 + Protobuf 二进制编码,性能比 JSON REST 高 5-10 倍;支持双向流;强类型接口契约;适合服务间通信(内部调用)。
定义 Protobuf 协议
// api/user/v1/user.proto
syntax = "proto3";
package user.v1;
option go_package = "github.com/myapp/api/user/v1;userv1";
import "google/protobuf/timestamp.proto";
// 消息定义
message User {
int64 id = 1;
string name = 2;
string email = 3;
string role = 4;
bool active = 5;
google.protobuf.Timestamp created_at = 6;
}
message GetUserRequest { int64 id = 1; }
message GetUserResponse { User user = 1; }
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
string search = 3;
}
message ListUsersResponse {
repeated User users = 1;
int64 total = 2;
}
message CreateUserRequest {
string name = 1;
string email = 2;
string password = 3;
}
// 服务定义
service UserService {
rpc GetUser (GetUserRequest) returns (GetUserResponse);
rpc ListUsers (ListUsersRequest) returns (ListUsersResponse);
rpc CreateUser (CreateUserRequest) returns (User);
// 服务端流式 RPC: 实时推送用户事件
rpc WatchUsers (WatchUsersRequest) returns (stream User);
// 双向流式 RPC: 批量处理
rpc BatchProcess (stream CreateUserRequest) returns (stream User);
}
# 安装工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/user/v1/user.proto
gRPC 服务端实现
package grpcserver
import (
"context"
"net"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/reflection"
pb "github.com/myapp/api/user/v1"
)
type UserServer struct {
pb.UnimplementedUserServiceServer // 必须嵌入,实现前向兼容
svc service.UserService
}
// 实现 GetUser
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
if req.Id <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "用户 ID 无效: %d", req.Id)
}
user, err := s.svc.GetUser(ctx, req.Id)
if err != nil {
if errors.Is(err, service.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
}
return nil, status.Errorf(codes.Internal, "内部错误: %v", err)
}
return &pb.GetUserResponse{User: toProtoUser(user)}, nil
}
// 实现服务端流
func (s *UserServer) WatchUsers(req *pb.WatchUsersRequest, stream pb.UserService_WatchUsersServer) error {
eventCh := s.svc.SubscribeUserEvents(stream.Context())
for {
select {
case user, ok := <-eventCh:
if !ok {
return nil
}
if err := stream.Send(toProtoUser(user)); err != nil {
return status.Errorf(codes.Unavailable, "发送失败: %v", err)
}
case <-stream.Context().Done():
return stream.Context().Err()
}
}
}
// 实现双向流
func (s *UserServer) BatchProcess(stream pb.UserService_BatchProcessServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
user, err := s.svc.CreateUser(stream.Context(), req)
if err != nil {
return err
}
if err := stream.Send(toProtoUser(user)); err != nil {
return err
}
}
}
// 启动 gRPC 服务器
func RunGRPCServer(svc service.UserService) {
lis, _ := net.Listen("tcp", ":50051")
// 拦截器 (类似 HTTP 中间件)
srv := grpc.NewServer(
grpc.ChainUnaryInterceptor(
loggingInterceptor,
recoveryInterceptor,
authInterceptor,
),
grpc.ChainStreamInterceptor(
streamLoggingInterceptor,
),
)
pb.RegisterUserServiceServer(srv, &UserServer{svc: svc})
reflection.Register(srv) // 允许 grpcurl 等工具探索
log.Println("gRPC 服务器启动: :50051")
log.Fatal(srv.Serve(lis))
}
gRPC 客户端
package grpcclient
import (
"context"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
pb "github.com/myapp/api/user/v1"
)
func NewUserClient(addr string) (pb.UserServiceClient, func(), error) {
conn, err := grpc.NewClient(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(4*1024*1024), // 4MB
),
)
if err != nil {
return nil, nil, err
}
client := pb.NewUserServiceClient(conn)
cleanup := func() { conn.Close() }
return client, cleanup, nil
}
// 使用客户端
func ExampleCall(client pb.UserServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
// gRPC 错误码处理
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.NotFound:
fmt.Println("用户不存在")
case codes.DeadlineExceeded:
fmt.Println("请求超时")
default:
fmt.Printf("gRPC 错误: %s\n", st.Message())
}
}
return
}
fmt.Printf("用户: %s <%s>\n", user.User.Name, user.User.Email)
}
3. 消息队列:Kafka
go get github.com/IBM/sarama
package kafka
import (
"context"
"encoding/json"
"github.com/IBM/sarama"
)
// ─── 生产者 ───────────────────────────────────────────────────
type Producer struct {
client sarama.SyncProducer
}
func NewProducer(brokers []string) (*Producer, error) {
cfg := sarama.NewConfig()
cfg.Producer.RequiredAcks = sarama.WaitForAll // 所有副本确认
cfg.Producer.Retry.Max = 3
cfg.Producer.Return.Successes = true
cfg.Producer.Compression = sarama.CompressionSnappy
client, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return nil, err
}
return &Producer{client: client}, nil
}
type Event struct {
Type string `json:"type"`
Payload any `json:"payload"`
}
func (p *Producer) Publish(topic string, key string, event Event) error {
value, err := json.Marshal(event)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(value),
}
_, _, err = p.client.SendMessage(msg)
return err
}
// ─── 消费者组 ─────────────────────────────────────────────────
type ConsumerGroup struct {
group sarama.ConsumerGroup
handler sarama.ConsumerGroupHandler
}
type Handler struct {
ready chan bool
process func(msg *sarama.ConsumerMessage) error
}
func (h *Handler) Setup(s sarama.ConsumerGroupSession) error {
close(h.ready)
return nil
}
func (h *Handler) Cleanup(s sarama.ConsumerGroupSession) error { return nil }
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
if err := h.process(msg); err != nil {
// 处理失败可选择: 重试、死信队列、跳过
log.Printf("消息处理失败: %v, offset: %d", err, msg.Offset)
continue
}
// 手动提交 offset (确保至少一次消费)
session.MarkMessage(msg, "")
}
return nil
}
func NewConsumerGroup(brokers []string, groupID string, topics []string, processor func(*sarama.ConsumerMessage) error) error {
cfg := sarama.NewConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategyRoundRobin(),
}
group, _ := sarama.NewConsumerGroup(brokers, groupID, cfg)
handler := &Handler{ready: make(chan bool), process: processor}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 处理重平衡信号
for {
if err := group.Consume(ctx, topics, handler); err != nil {
return err
}
handler.ready = make(chan bool)
}
}
4. 服务发现与 API 网关
package registry
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
type ServiceRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
}
type ServiceInfo struct {
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
Version string `json:"version"`
}
// 服务注册 (带心跳)
func (r *ServiceRegistry) Register(ctx context.Context, info ServiceInfo) error {
// 创建 lease (TTL 10秒)
lease, err := r.client.Grant(ctx, 10)
if err != nil {
return err
}
r.leaseID = lease.ID
key := fmt.Sprintf("/services/%s/%s:%d", info.Name, info.Host, info.Port)
value, _ := json.Marshal(info)
// 注册服务 (绑定 lease)
_, err = r.client.Put(ctx, key, string(value),
clientv3.WithLease(lease.ID))
if err != nil {
return err
}
// 自动续租 (keepalive)
ch, err := r.client.KeepAlive(ctx, lease.ID)
if err != nil {
return err
}
go func() {
for range ch {
// 持续消费续租响应
}
}()
fmt.Printf("服务注册成功: %s\n", key)
return nil
}
// 服务发现
func (r *ServiceRegistry) Discover(ctx context.Context, serviceName string) ([]ServiceInfo, error) {
prefix := fmt.Sprintf("/services/%s/", serviceName)
resp, err := r.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
var services []ServiceInfo
for _, kv := range resp.Kvs {
var info ServiceInfo
if err := json.Unmarshal(kv.Value, &info); err == nil {
services = append(services, info)
}
}
return services, nil
}
// 监听服务变更
func (r *ServiceRegistry) Watch(ctx context.Context, serviceName string, onChange func([]ServiceInfo)) {
prefix := fmt.Sprintf("/services/%s/", serviceName)
watchCh := r.client.Watch(ctx, prefix, clientv3.WithPrefix())
for resp := range watchCh {
for _, event := range resp.Events {
fmt.Printf("服务变更: %s %s\n", event.Type, event.Kv.Key)
}
services, _ := r.Discover(ctx, serviceName)
onChange(services)
}
}
微服务设计原则
① 单一职责:每个服务只做一件事 ② 松耦合:服务间通过接口或消息通信 ③ 数据隔离:每个服务有自己的数据库 ④ 失败容忍:实现熔断器(Circuit Breaker) ⑤ 可观测:每个服务都暴露健康检查和 metrics
4. 熔断器模式(Circuit Breaker)
原理:熔断器模仿电路断路器的工作方式,保护系统免受级联故障(Cascading Failure)的影响。当下游服务出现大量错误时,熔断器"断开",后续请求直接返回错误而不再等待超时,给下游服务恢复的时间,同时防止调用方线程/goroutine 被大量阻塞。
- 关闭(Closed):正常状态,所有请求通过。统计失败次数,超过阈值后转为 Open。
- 打开(Open):熔断状态,所有请求立即失败(fail-fast),不访问下游。经过冷却时间(Reset Timeout)后转为 Half-Open。
- 半开(Half-Open):尝试恢复状态,放行少量探测请求。如果成功则转为 Closed,如果失败则重新转为 Open。
package circuitbreaker
import (
"errors"
"sync"
"time"
)
type State int
const (
StateClosed State = iota // 正常
StateOpen // 熔断
StateHalfOpen // 半开探测
)
var ErrCircuitOpen = errors.New("circuit breaker is open")
type CircuitBreaker struct {
mu sync.Mutex
state State
failureCount int // 连续失败次数
successCount int // 半开状态下连续成功次数
lastFailureTime time.Time
// 配置
maxFailures int // 失败阈值(转 Open)
halfOpenSuccess int // 半开成功阈值(转 Closed)
resetTimeout time.Duration // Open → HalfOpen 的等待时间
}
func New(maxFailures, halfOpenSuccess int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
halfOpenSuccess: halfOpenSuccess,
resetTimeout: resetTimeout,
}
}
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
state := cb.currentState()
switch state {
case StateOpen:
cb.mu.Unlock()
return ErrCircuitOpen // 快速失败
case StateHalfOpen:
// 半开状态:只允许一个探测请求
cb.state = StateHalfOpen
cb.mu.Unlock()
default:
cb.mu.Unlock()
}
err := fn() // 执行实际请求
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.onFailure()
} else {
cb.onSuccess()
}
return err
}
func (cb *CircuitBreaker) currentState() State {
if cb.state == StateOpen {
// 检查是否到了半开窗口
if time.Since(cb.lastFailureTime) > cb.resetTimeout {
cb.state = StateHalfOpen
cb.successCount = 0
}
}
return cb.state
}
func (cb *CircuitBreaker) onFailure() {
cb.successCount = 0
cb.lastFailureTime = time.Now()
switch cb.state {
case StateClosed:
cb.failureCount++
if cb.failureCount >= cb.maxFailures {
cb.state = StateOpen
}
case StateHalfOpen:
cb.state = StateOpen // 探测失败,重新熔断
}
}
func (cb *CircuitBreaker) onSuccess() {
cb.failureCount = 0
if cb.state == StateHalfOpen {
cb.successCount++
if cb.successCount >= cb.halfOpenSuccess {
cb.state = StateClosed // 探测成功,恢复
}
}
}
// ─── 使用示例 ─────────────────────────────────────────────────
func main() {
cb := New(5, 2, 10*time.Second)
callDownstream := func() error {
return cb.Execute(func() error {
// 调用下游服务
resp, err := http.Get("http://downstream-service/api")
if err != nil { return err }
resp.Body.Close()
if resp.StatusCode >= 500 {
return fmt.Errorf("server error: %d", resp.StatusCode)
}
return nil
})
}
err := callDownstream()
if errors.Is(err, ErrCircuitOpen) {
// 熔断:使用降级策略(缓存、默认值等)
fmt.Println("服务熔断,返回缓存数据")
}
}
5. 消息队列集成(Kafka / NATS)
原理:异步消息队列将服务间的同步调用解耦为生产者/消费者模型。生产者发布消息后立即返回,不等待消费者处理,实现服务解耦、削峰填谷、最终一致性。
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/nats-io/nats.go" // go get github.com/nats-io/nats.go
"github.com/nats-io/nats.go/jetstream"
)
// NATS JetStream:持久化消息队列
type OrderEvent struct {
OrderID int64 `json:"order_id"`
UserID int64 `json:"user_id"`
Amount float64 `json:"amount"`
EventType string `json:"event_type"` // "created" | "paid" | "cancelled"
}
func main() {
// 连接 NATS 服务器
nc, err := nats.Connect("nats://localhost:4222",
nats.MaxReconnects(10),
nats.ReconnectWait(2*time.Second),
)
if err != nil {
log.Fatal("NATS 连接失败:", err)
}
defer nc.Close()
// 创建 JetStream 上下文(支持持久化和 ACK)
js, err := jetstream.New(nc)
if err != nil { log.Fatal(err) }
ctx := context.Background()
// 创建 Stream(存储消息)
stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"}, // 匹配所有 orders.* 主题
MaxAge: 7 * 24 * time.Hour, // 保留 7 天
Replicas: 1, // 副本数(生产用 3)
})
if err != nil { log.Fatal(err) }
// ─── 生产者:发布订单事件 ─────────────────────────────────
publishOrder := func(event OrderEvent) error {
data, err := json.Marshal(event)
if err != nil { return err }
ack, err := js.Publish(ctx, "orders.created", data)
if err != nil { return err }
fmt.Printf("消息已发布,seq=%d\n", ack.Sequence)
return nil
}
publishOrder(OrderEvent{OrderID: 1001, UserID: 42, Amount: 299.0, EventType: "created"})
// ─── 消费者:处理订单事件 ─────────────────────────────────
cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "payment-service", // 消费者名称(持久化)
FilterSubject: "orders.created",
AckPolicy: jetstream.AckExplicitPolicy, // 必须显式 ACK
MaxDeliver: 3, // 最多重试 3 次
AckWait: 30 * time.Second,
})
if err != nil { log.Fatal(err) }
// 开始消费
cc, err := cons.Consume(func(msg jetstream.Msg) {
var event OrderEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
msg.Nak() // 处理失败,NATS 会重新投递
return
}
// 处理业务逻辑
fmt.Printf("处理订单 %d,金额 %.2f\n", event.OrderID, event.Amount)
// 处理成功,确认消息(不 ACK 则会重新投递)
msg.Ack()
})
if err != nil { log.Fatal(err) }
defer cc.Stop()
// 阻塞等待
select {}
}