1. 微服务架构

客户端 (Web / Mobile / Third-party)

发起 HTTP / gRPC 请求

API 网关  (Kong / Traefik / 自研)

统一认证 · 限流 · 路由 · 负载均衡 · SSL 终止

用户服务 :8081

注册、登录、Profile、权限

订单服务 :8082

下单、支付、状态跟踪

通知服务 :8083

邮件、短信、Push 推送

PostgreSQL 用户数据库
PostgreSQL 订单数据库
Kafka 消息队列 · 事件总线
用户服务 / 订单服务 → 发布事件 → Kafka → 通知服务消费

微服务 vs 单体

何时选微服务

  • 团队规模 > 50人,需独立部署
  • 不同模块有差异化扩展需求
  • 需要多语言技术栈混合
  • 系统已足够复杂,单体难以维护

微服务的代价

  • 分布式系统的复杂性(网络、事务)
  • 服务发现、负载均衡基础设施
  • 分布式链路追踪的必要性
  • 数据一致性更难保证

2. gRPC 高性能 RPC

gRPC vs REST

gRPC 使用 HTTP/2 + Protobuf 二进制编码,性能比 JSON REST 高 5-10 倍;支持双向流;强类型接口契约;适合服务间通信(内部调用)。

定义 Protobuf 协议

// api/user/v1/user.proto
syntax = "proto3";

package user.v1;
option go_package = "github.com/myapp/api/user/v1;userv1";

import "google/protobuf/timestamp.proto";

// Message definitions.
//
// User is the user record carried in UserService responses and streams.
message User {
  int64  id         = 1;
  string name       = 2;
  string email      = 3;
  string role       = 4;
  bool   active     = 5;
  // Creation time, encoded with the well-known Timestamp type imported above.
  google.protobuf.Timestamp created_at = 6;
}

// GetUserRequest identifies one user by id; GetUserResponse wraps the User
// returned for that request.
message GetUserRequest  { int64 id = 1; }
message GetUserResponse { User user = 1; }

// ListUsersRequest carries paging and filter parameters for ListUsers.
// NOTE(review): whether page is 0- or 1-based and how search is matched are
// not specified here; confirm against the server implementation.
message ListUsersRequest {
  int32  page      = 1;
  int32  page_size = 2;
  string search    = 3;
}

// ListUsersResponse returns a list of users together with a total count.
// NOTE(review): total is presumably the number of matches across all pages
// rather than len(users); confirm against the server implementation.
message ListUsersResponse {
  repeated User users = 1;
  int64         total = 2;
}

// CreateUserRequest is the input for CreateUser and for each item sent on the
// BatchProcess stream.
// NOTE(review): password travels as a plain string field, so this presumably
// relies on transport-level encryption; confirm TLS is enforced.
message CreateUserRequest {
  string name     = 1;
  string email    = 2;
  string password = 3;
}

// WatchUsersRequest is the request message for WatchUsers. It carries no
// fields yet, but it must be declared: the service below references it and
// protoc rejects an rpc whose request type is undefined.
message WatchUsersRequest {}

// Service definition.
service UserService {
  rpc GetUser    (GetUserRequest)    returns (GetUserResponse);
  rpc ListUsers  (ListUsersRequest)  returns (ListUsersResponse);
  rpc CreateUser (CreateUserRequest) returns (User);

  // Server-streaming RPC: pushes user events to the client as they happen.
  rpc WatchUsers (WatchUsersRequest) returns (stream User);

  // Bidirectional-streaming RPC: batch processing.
  rpc BatchProcess (stream CreateUserRequest) returns (stream User);
}
# Install the protoc plugins that generate Go message types and gRPC stubs
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate Go code for the user service proto
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       api/user/v1/user.proto

gRPC 服务端实现

package grpcserver

import (
    "context"
    "net"
    "log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/grpc/reflection"
    pb "github.com/myapp/api/user/v1"
)

// UserServer implements the generated UserService server interface by
// delegating each RPC to the wrapped service.UserService.
type UserServer struct {
    pb.UnimplementedUserServiceServer // must be embedded for forward compatibility
    svc service.UserService
}

// GetUser handles the unary GetUser RPC.
//
// It rejects non-positive ids with codes.InvalidArgument, maps
// service.ErrNotFound to codes.NotFound, and reports every other service
// error as codes.Internal.
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    id := req.Id
    if id <= 0 {
        return nil, status.Errorf(codes.InvalidArgument, "用户 ID 无效: %d", id)
    }

    found, err := s.svc.GetUser(ctx, id)
    switch {
    case err == nil:
        return &pb.GetUserResponse{User: toProtoUser(found)}, nil
    case errors.Is(err, service.ErrNotFound):
        return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", id)
    default:
        return nil, status.Errorf(codes.Internal, "内部错误: %v", err)
    }
}

// WatchUsers handles the server-streaming RPC: it subscribes to user events
// and forwards each one to the client until the event channel is closed
// (returns nil), a send fails (codes.Unavailable), or the stream context ends
// (returns the context error).
func (s *UserServer) WatchUsers(req *pb.WatchUsersRequest, stream pb.UserService_WatchUsersServer) error {
    ctx := stream.Context()
    events := s.svc.SubscribeUserEvents(ctx)

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case evt, open := <-events:
            if !open {
                return nil
            }
            sendErr := stream.Send(toProtoUser(evt))
            if sendErr != nil {
                return status.Errorf(codes.Unavailable, "发送失败: %v", sendErr)
            }
        }
    }
}

// BatchProcess handles the bidirectional-streaming RPC: every request read
// from the stream is passed to the service's CreateUser and the result is
// sent straight back. It returns nil once the client closes its side
// (io.EOF); any receive, create, or send error ends the stream with that
// error.
func (s *UserServer) BatchProcess(stream pb.UserService_BatchProcessServer) error {
    for {
        in, err := stream.Recv()
        switch {
        case err == io.EOF:
            return nil
        case err != nil:
            return err
        }

        created, err := s.svc.CreateUser(stream.Context(), in)
        if err != nil {
            return err
        }

        if sendErr := stream.Send(toProtoUser(created)); sendErr != nil {
            return sendErr
        }
    }
}

// RunGRPCServer listens on TCP :50051, registers a UserServer backed by svc
// together with the reflection service, and serves until the server stops.
// It never returns normally: a listen or serve failure terminates the process
// through the log package's Fatal functions.
func RunGRPCServer(svc service.UserService) {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // Previously this error was discarded, so a port that was already in
        // use produced a nil listener and an obscure failure inside Serve.
        log.Fatalf("gRPC 监听 :50051 失败: %v", err)
    }

    // Interceptors (the gRPC counterpart of HTTP middleware).
    srv := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            loggingInterceptor,
            recoveryInterceptor,
            authInterceptor,
        ),
        grpc.ChainStreamInterceptor(
            streamLoggingInterceptor,
        ),
    )

    pb.RegisterUserServiceServer(srv, &UserServer{svc: svc})
    reflection.Register(srv) // lets tools such as grpcurl explore the API

    log.Println("gRPC 服务器启动: :50051")
    log.Fatal(srv.Serve(lis))
}

gRPC 客户端

package grpcclient

import (
    "context"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
    pb "github.com/myapp/api/user/v1"
)

// NewUserClient creates a UserService client for addr.
//
// The connection uses insecure (plaintext) transport credentials, client
// keepalive pings, and a 4MB cap on received messages. The returned func
// closes the underlying connection; on error both the client and the func
// are nil.
func NewUserClient(addr string) (pb.UserServiceClient, func(), error) {
    const maxRecvBytes = 4 * 1024 * 1024 // 4MB

    keepaliveParams := keepalive.ClientParameters{
        Time:                10 * time.Second,
        Timeout:             3 * time.Second,
        PermitWithoutStream: true,
    }
    opts := []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepaliveParams),
        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRecvBytes)),
    }

    conn, err := grpc.NewClient(addr, opts...)
    if err != nil {
        return nil, nil, err
    }

    return pb.NewUserServiceClient(conn), func() { conn.Close() }, nil
}

// ExampleCall shows a unary call with a 5-second deadline and gRPC status
// code handling. It fetches user 1 and prints either the user or a message
// describing the failure.
func ExampleCall(client pb.UserServiceClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
    if err != nil {
        st, ok := status.FromError(err)
        if !ok {
            // Not a gRPC status error (for example one produced by a client
            // interceptor); report it instead of dropping it silently.
            fmt.Printf("调用失败: %v\n", err)
            return
        }
        // Map gRPC status codes to user-facing messages.
        switch st.Code() {
        case codes.NotFound:
            fmt.Println("用户不存在")
        case codes.DeadlineExceeded:
            fmt.Println("请求超时")
        default:
            fmt.Printf("gRPC 错误: %s\n", st.Message())
        }
        return
    }

    // Generated getters are nil-safe, so a response without a user prints
    // empty fields instead of panicking on a nil pointer.
    u := resp.GetUser()
    fmt.Printf("用户: %s <%s>\n", u.GetName(), u.GetEmail())
}

3. 消息队列:Kafka

go get github.com/IBM/sarama
package kafka

import (
    "context"
    "encoding/json"
    "github.com/IBM/sarama"
)

// ─── Producer ─────────────────────────────────────────────────

// Producer publishes events through a sarama synchronous producer.
type Producer struct {
    client sarama.SyncProducer
}

// NewProducer connects a synchronous producer to brokers.
//
// The producer waits for acknowledgement from all replicas, retries a failed
// send up to 3 times, and compresses messages with Snappy. Return.Successes
// is enabled, which the synchronous producer requires.
func NewProducer(brokers []string) (*Producer, error) {
    conf := sarama.NewConfig()
    conf.Producer.Compression = sarama.CompressionSnappy
    conf.Producer.Return.Successes = true
    conf.Producer.Retry.Max = 3
    conf.Producer.RequiredAcks = sarama.WaitForAll // every replica must confirm

    syncProducer, err := sarama.NewSyncProducer(brokers, conf)
    if err != nil {
        return nil, err
    }

    producer := &Producer{client: syncProducer}
    return producer, nil
}

// Event is the JSON envelope published to Kafka: a type tag plus an arbitrary
// payload.
type Event struct {
    Type    string `json:"type"`
    Payload any    `json:"payload"`
}

// Publish JSON-encodes event and sends it to topic with key as the message
// key, blocking until the synchronous producer reports the outcome. It
// returns the encoding error or the send error, whichever occurs first.
func (p *Producer) Publish(topic string, key string, event Event) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    _, _, err = p.client.SendMessage(&sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder(key),
        Value: sarama.ByteEncoder(payload),
    })
    return err
}

// ─── Consumer group ───────────────────────────────────────────

// ConsumerGroup pairs a sarama consumer group with its handler.
// NOTE(review): no code visible in this snippet constructs or uses this type;
// confirm whether it is still needed.
type ConsumerGroup struct {
    group   sarama.ConsumerGroup
    handler sarama.ConsumerGroupHandler
}

// Handler adapts a plain per-message callback to the three-method
// sarama.ConsumerGroupHandler interface.
type Handler struct {
    ready   chan bool                               // closed by Setup when a session starts
    process func(msg *sarama.ConsumerMessage) error // invoked once per message
}

// Setup runs at the start of a session and signals readiness by closing
// h.ready. The channel must be replaced before the next session, because
// closing it twice would panic.
func (h *Handler) Setup(s sarama.ConsumerGroupSession) error {
    close(h.ready)
    return nil
}

// Cleanup runs at the end of a session; there is nothing to release.
func (h *Handler) Cleanup(s sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes the messages of one claimed partition until the
// claim's channel is closed or the session context is cancelled.
//
// Watching session.Context() lets the handler exit promptly when a rebalance
// starts; a loop that only ranges over claim.Messages() can hold up the
// rebalance until it times out.
//
// A message is marked as consumed only after process succeeds. A failed
// message is logged and skipped: marking any later message on the same
// partition moves the offset past it, so failures are NOT redelivered. Add
// retries or a dead-letter topic inside process if that matters.
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }
            if err := h.process(msg); err != nil {
                log.Printf("消息处理失败: %v, offset: %d", err, msg.Offset)
                continue
            }
            // Mark the offset; the actual commit happens according to the
            // consumer's offset-commit configuration.
            session.MarkMessage(msg, "")
        case <-session.Context().Done():
            return nil
        }
    }
}

// NewConsumerGroup joins groupID on brokers and consumes topics, calling
// processor for every message. Despite its name it does not return a value
// to use later: it blocks, re-entering Consume after every rebalance, and
// returns only when the group cannot be created or Consume reports an error.
//
// New groups start from the newest offset and partitions are assigned with
// the round-robin strategy.
func NewConsumerGroup(brokers []string, groupID string, topics []string, processor func(*sarama.ConsumerMessage) error) error {
    cfg := sarama.NewConfig()
    cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
    cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
        sarama.NewBalanceStrategyRoundRobin(),
    }

    group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
    if err != nil {
        // Previously ignored: an unreachable broker or invalid config left
        // group nil and the first Consume call panicked.
        return err
    }
    // Leave the group and release its connections on the way out. The close
    // error is dropped because the Consume error being returned is the one
    // the caller needs.
    defer group.Close()

    handler := &Handler{ready: make(chan bool), process: processor}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Consume returns whenever a rebalance ends the session, so it has to be
    // called again in a loop to rejoin.
    for {
        if err := group.Consume(ctx, topics, handler); err != nil {
            return err
        }
        // Setup closed the previous channel; install a fresh one for the
        // next session.
        handler.ready = make(chan bool)
    }
}

4. 服务发现与 API 网关

package registry

import (
    "context"
    "fmt"
    "time"
    clientv3 "go.etcd.io/etcd/client/v3"
)

// ServiceRegistry registers and discovers services through etcd.
type ServiceRegistry struct {
    client *clientv3.Client
    leaseID clientv3.LeaseID // lease of the most recent Register call
}

// ServiceInfo describes one service instance; it is stored in etcd as JSON.
type ServiceInfo struct {
    Name    string `json:"name"`
    Host    string `json:"host"`
    Port    int    `json:"port"`
    Version string `json:"version"`
}

// Register publishes info under /services/<name>/<host>:<port>, bound to a
// lease with a 10-second TTL, and starts keepalives for that lease.
//
// ctx also governs the keepalive: cancelling it stops renewal, after which
// the lease, and with it the registration, expires. Register returns the
// first error from encoding, Grant, Put, or KeepAlive.
func (r *ServiceRegistry) Register(ctx context.Context, info ServiceInfo) error {
    key := fmt.Sprintf("/services/%s/%s:%d", info.Name, info.Host, info.Port)

    // Encode before touching etcd so an encoding failure is reported instead
    // of registering an empty value, and no lease is created for nothing.
    value, err := json.Marshal(info)
    if err != nil {
        return err
    }

    // Create the lease (TTL 10 seconds).
    lease, err := r.client.Grant(ctx, 10)
    if err != nil {
        return err
    }
    r.leaseID = lease.ID

    // Register the service, bound to the lease.
    _, err = r.client.Put(ctx, key, string(value),
        clientv3.WithLease(lease.ID))
    if err != nil {
        return err
    }

    // Renew the lease automatically (keepalive).
    ch, err := r.client.KeepAlive(ctx, lease.ID)
    if err != nil {
        return err
    }

    go func() {
        for range ch {
            // Keep draining keepalive responses.
        }
        // The channel closed. If ctx is still live this was not a requested
        // shutdown, so make the lost registration visible.
        if ctx.Err() == nil {
            fmt.Printf("服务续租已停止: %s\n", key)
        }
    }()

    fmt.Printf("服务注册成功: %s\n", key)
    return nil
}

// Discover returns every instance registered under /services/<serviceName>/.
// Entries whose value is not valid ServiceInfo JSON are skipped. The result
// is nil when nothing decodable is registered.
func (r *ServiceRegistry) Discover(ctx context.Context, serviceName string) ([]ServiceInfo, error) {
    resp, err := r.client.Get(ctx, fmt.Sprintf("/services/%s/", serviceName), clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    var found []ServiceInfo
    for i := range resp.Kvs {
        var entry ServiceInfo
        if json.Unmarshal(resp.Kvs[i].Value, &entry) != nil {
            continue
        }
        found = append(found, entry)
    }
    return found, nil
}

// Watch blocks, observing /services/<serviceName>/ and calling onChange with
// a freshly discovered instance list after every batch of changes. It returns
// when the watch channel closes, for example because ctx was cancelled.
//
// onChange is called only with a successfully fetched list. A failed watch
// response or a failed refresh is logged and skipped, so a transient etcd
// error can no longer hand the callback a nil list and wipe its endpoints.
func (r *ServiceRegistry) Watch(ctx context.Context, serviceName string, onChange func([]ServiceInfo)) {
    prefix := fmt.Sprintf("/services/%s/", serviceName)
    watchCh := r.client.Watch(ctx, prefix, clientv3.WithPrefix())

    for resp := range watchCh {
        if err := resp.Err(); err != nil {
            fmt.Printf("服务监听出错: %v\n", err)
            continue
        }
        for _, event := range resp.Events {
            fmt.Printf("服务变更: %s %s\n", event.Type, event.Kv.Key)
        }

        services, err := r.Discover(ctx, serviceName)
        if err != nil {
            fmt.Printf("刷新服务列表失败: %v\n", err)
            continue
        }
        onChange(services)
    }
}
🎯

微服务设计原则

① 单一职责:每个服务只做一件事  ② 松耦合:服务间通过接口或消息通信  ③ 数据隔离:每个服务有自己的数据库  ④ 失败容忍:实现熔断器(Circuit Breaker)  ⑤ 可观测:每个服务都暴露健康检查和 metrics

5. 熔断器模式(Circuit Breaker)

原理:熔断器模仿电路断路器的工作方式,保护系统免受级联故障(Cascading Failure)的影响。当下游服务出现大量错误时,熔断器"断开",后续请求直接返回错误而不再等待超时,给下游服务恢复的时间,同时防止调用方线程/goroutine 被大量阻塞。

package circuitbreaker

import (
    "errors"
    "sync"
    "time"
)

// State is the operating mode of a CircuitBreaker.
type State int

const (
    StateClosed   State = iota // normal: requests pass through
    StateOpen                  // tripped: requests fail fast
    StateHalfOpen              // probing: one trial request at a time
)

// ErrCircuitOpen is returned by Execute when a request is rejected without
// being run, either because the breaker is open or because a half-open probe
// is already in flight.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreaker guards calls to an unreliable dependency. It is safe for
// concurrent use and must not be copied after first use.
type CircuitBreaker struct {
    mu              sync.Mutex
    state           State
    failureCount    int  // consecutive failures while closed
    successCount    int  // consecutive successes while half-open
    probing         bool // true while a half-open probe is in flight
    lastFailureTime time.Time

    // Configuration.
    maxFailures     int           // failures needed to trip (Closed → Open)
    halfOpenSuccess int           // probe successes needed to recover (HalfOpen → Closed)
    resetTimeout    time.Duration // wait before Open → HalfOpen
}

// New returns a closed breaker that opens after maxFailures consecutive
// failures, starts probing resetTimeout after the last failure, and closes
// again after halfOpenSuccess consecutive successful probes.
func New(maxFailures, halfOpenSuccess int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:     maxFailures,
        halfOpenSuccess: halfOpenSuccess,
        resetTimeout:    resetTimeout,
    }
}

// Execute runs fn under the breaker and returns fn's error.
//
// While the breaker is open, fn is not called and ErrCircuitOpen is returned.
// While it is half-open, exactly one caller at a time is admitted as a probe;
// concurrent callers get ErrCircuitOpen, so a recovering dependency is not
// hit by the full request load. A panic in fn is recorded as a failure and
// then propagates to the caller.
func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    isProbe := false
    switch cb.currentState() {
    case StateOpen:
        cb.mu.Unlock()
        return ErrCircuitOpen // fail fast
    case StateHalfOpen:
        if cb.probing {
            cb.mu.Unlock()
            return ErrCircuitOpen // a probe is already running
        }
        cb.probing = true
        isProbe = true
    }
    cb.mu.Unlock()

    var err error
    finished := false
    // Record the outcome in a defer so the probe slot is released even when
    // fn panics; otherwise the breaker would reject every later request.
    defer func() {
        cb.mu.Lock()
        defer cb.mu.Unlock()

        if isProbe {
            cb.probing = false
        }
        if !finished || err != nil {
            cb.onFailure()
        } else {
            cb.onSuccess()
        }
    }()

    err = fn() // the lock is not held while the request runs
    finished = true
    return err
}

// currentState returns the state, first moving Open to HalfOpen once
// resetTimeout has elapsed since the last failure. Callers must hold cb.mu.
func (cb *CircuitBreaker) currentState() State {
    if cb.state == StateOpen {
        // Has the half-open window been reached?
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.state = StateHalfOpen
            cb.successCount = 0
        }
    }
    return cb.state
}

// onFailure records a failed call. Callers must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
    cb.successCount = 0
    cb.lastFailureTime = time.Now()

    switch cb.state {
    case StateClosed:
        cb.failureCount++
        if cb.failureCount >= cb.maxFailures {
            cb.state = StateOpen
        }
    case StateHalfOpen:
        cb.state = StateOpen // probe failed: trip again
    }
}

// onSuccess records a successful call. Callers must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
    cb.failureCount = 0
    if cb.state == StateHalfOpen {
        cb.successCount++
        if cb.successCount >= cb.halfOpenSuccess {
            cb.state = StateClosed // enough probes succeeded: recover
        }
    }
}

// ─── Usage example ────────────────────────────────────────────

// main wraps one HTTP call to a downstream service in a circuit breaker and
// falls back to a degraded response when the breaker rejects the call.
func main() {
    cb := New(5, 2, 10*time.Second)

    // http.Get uses the default client, which has no timeout. A hung
    // downstream would then block the caller forever and the breaker would
    // never see a failure, so use a client with an explicit timeout.
    client := &http.Client{Timeout: 3 * time.Second}

    callDownstream := func() error {
        return cb.Execute(func() error {
            // Call the downstream service.
            resp, err := client.Get("http://downstream-service/api")
            if err != nil {
                return err
            }
            defer resp.Body.Close()
            if resp.StatusCode >= 500 {
                return fmt.Errorf("server error: %d", resp.StatusCode)
            }
            return nil
        })
    }

    err := callDownstream()
    switch {
    case err == nil:
        // Success: nothing to do in this example.
    case errors.Is(err, ErrCircuitOpen):
        // Breaker open: use a fallback (cache, default value, ...).
        fmt.Println("服务熔断,返回缓存数据")
    default:
        // A real downstream failure; report it instead of dropping it.
        fmt.Printf("调用下游失败: %v\n", err)
    }
}

6. 消息队列集成(Kafka / NATS)

原理:异步消息队列将服务间的同步调用解耦为生产者/消费者模型。生产者发布消息后立即返回,不等待消费者处理,实现服务解耦、削峰填谷、最终一致性。

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/nats-io/nats.go"  // go get github.com/nats-io/nats.go
    "github.com/nats-io/nats.go/jetstream"
)

// NATS JetStream:持久化消息队列

// OrderEvent is the JSON payload published on the orders.* subjects.
// NOTE(review): Amount is a float64; if it represents money, confirm that
// floating-point rounding is acceptable for downstream consumers.
type OrderEvent struct {
    OrderID   int64  `json:"order_id"`
    UserID    int64  `json:"user_id"`
    Amount    float64 `json:"amount"`
    EventType string `json:"event_type"` // "created" | "paid" | "cancelled"
}

// main demonstrates a JetStream round trip: it connects to NATS, ensures the
// ORDERS stream exists, publishes one order event, and then consumes
// orders.created messages with explicit acknowledgement until the process is
// killed.
func main() {
    // Connect to the NATS server.
    nc, err := nats.Connect("nats://localhost:4222",
        nats.MaxReconnects(10),
        nats.ReconnectWait(2*time.Second),
    )
    if err != nil {
        log.Fatal("NATS 连接失败:", err)
    }
    defer nc.Close()

    // Create the JetStream handle (persistence and ACK support).
    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }
    ctx := context.Background()

    // Create the stream that stores the messages.
    stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"orders.>"}, // every subject below orders.
        MaxAge:   7 * 24 * time.Hour,   // keep for 7 days
        Replicas: 1,                    // replica count (use 3 in production)
    })
    if err != nil {
        log.Fatal(err)
    }

    // ─── Producer: publish an order event ────────────────────────
    publishOrder := func(event OrderEvent) error {
        data, err := json.Marshal(event)
        if err != nil {
            return err
        }
        ack, err := js.Publish(ctx, "orders.created", data)
        if err != nil {
            return err
        }
        fmt.Printf("消息已发布,seq=%d\n", ack.Sequence)
        return nil
    }

    // The publish result was previously discarded, so a failed publish went
    // unnoticed.
    if err := publishOrder(OrderEvent{OrderID: 1001, UserID: 42, Amount: 299.0, EventType: "created"}); err != nil {
        log.Printf("发布订单事件失败: %v", err)
    }

    // ─── Consumer: process order events ──────────────────────────
    cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Name:          "payment-service", // consumer name (durable)
        FilterSubject: "orders.created",
        AckPolicy:     jetstream.AckExplicitPolicy, // explicit ACK required
        MaxDeliver:    3,                           // at most 3 delivery attempts
        AckWait:       30 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start consuming.
    cc, err := cons.Consume(func(msg jetstream.Msg) {
        var event OrderEvent
        if err := json.Unmarshal(msg.Data(), &event); err != nil {
            // A payload that does not decode will never decode. Terminate it
            // rather than Nak it, which would only redeliver the same bytes
            // until MaxDeliver is exhausted.
            log.Printf("订单事件格式错误,已丢弃: %v", err)
            if termErr := msg.Term(); termErr != nil {
                log.Printf("终止消息失败: %v", termErr)
            }
            return
        }

        // Business logic.
        fmt.Printf("处理订单 %d,金额 %.2f\n", event.OrderID, event.Amount)

        // Acknowledge on success. An unacknowledged message is redelivered
        // after AckWait, so a failed ACK is worth logging.
        if err := msg.Ack(); err != nil {
            log.Printf("确认消息失败: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cc.Stop()

    // Block forever. The deferred Stop and Close above run only if this is
    // replaced by a real shutdown signal.
    select {}
}