1. 微服务架构

客户端 (Web / Mobile / Third-party)

发起 HTTP / gRPC 请求

API 网关  (Kong / Traefik / 自研)

统一认证 · 限流 · 路由 · 负载均衡 · SSL 终止

用户服务 :8081

注册、登录、Profile、权限

订单服务 :8082

下单、支付、状态跟踪

通知服务 :8083

邮件、短信、Push 推送

PostgreSQL 用户数据库
PostgreSQL 订单数据库
Kafka 消息队列 · 事件总线
用户服务 / 订单服务 → 发布事件 → Kafka → 通知服务消费

微服务 vs 单体

何时选微服务

  • 团队规模 > 50人,需独立部署
  • 不同模块有差异化扩展需求
  • 需要多语言技术栈混合
  • 系统已足够复杂,单体难以维护

微服务的代价

  • 分布式系统的复杂性(网络、事务)
  • 服务发现、负载均衡基础设施
  • 分布式链路追踪的必要性
  • 数据一致性更难保证

2. gRPC 高性能 RPC

gRPC vs REST

gRPC 使用 HTTP/2 + Protobuf 二进制编码,性能比 JSON REST 高 5-10 倍;支持双向流;强类型接口契约;适合服务间通信(内部调用)。

定义 Protobuf 协议

// api/user/v1/user.proto
syntax = "proto3";

package user.v1;
option go_package = "github.com/myapp/api/user/v1;userv1";

import "google/protobuf/timestamp.proto";

// 消息定义
message User {
  int64  id         = 1;
  string name       = 2;
  string email      = 3;
  string role       = 4;
  bool   active     = 5;
  google.protobuf.Timestamp created_at = 6;
}

message GetUserRequest  { int64 id = 1; }
message GetUserResponse { User user = 1; }

message ListUsersRequest {
  int32  page      = 1;
  int32  page_size = 2;
  string search    = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  int64         total = 2;
}

message CreateUserRequest {
  string name     = 1;
  string email    = 2;
  string password = 3;
}

// 服务定义
service UserService {
  rpc GetUser    (GetUserRequest)    returns (GetUserResponse);
  rpc ListUsers  (ListUsersRequest)  returns (ListUsersResponse);
  rpc CreateUser (CreateUserRequest) returns (User);

  // 服务端流式 RPC: 实时推送用户事件
  rpc WatchUsers (WatchUsersRequest) returns (stream User);

  // 双向流式 RPC: 批量处理
  rpc BatchProcess (stream CreateUserRequest) returns (stream User);
}
# 安装工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       api/user/v1/user.proto

gRPC 服务端实现

package grpcserver

import (
    "context"
    "net"
    "log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/grpc/reflection"
    pb "github.com/myapp/api/user/v1"
)

type UserServer struct {
    pb.UnimplementedUserServiceServer // 必须嵌入,实现前向兼容
    svc service.UserService
}

// 实现 GetUser
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    if req.Id <= 0 {
        return nil, status.Errorf(codes.InvalidArgument, "用户 ID 无效: %d", req.Id)
    }

    user, err := s.svc.GetUser(ctx, req.Id)
    if err != nil {
        if errors.Is(err, service.ErrNotFound) {
            return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
        }
        return nil, status.Errorf(codes.Internal, "内部错误: %v", err)
    }

    return &pb.GetUserResponse{User: toProtoUser(user)}, nil
}

// 实现服务端流
func (s *UserServer) WatchUsers(req *pb.WatchUsersRequest, stream pb.UserService_WatchUsersServer) error {
    eventCh := s.svc.SubscribeUserEvents(stream.Context())
    for {
        select {
        case user, ok := <-eventCh:
            if !ok {
                return nil
            }
            if err := stream.Send(toProtoUser(user)); err != nil {
                return status.Errorf(codes.Unavailable, "发送失败: %v", err)
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}

// 实现双向流
func (s *UserServer) BatchProcess(stream pb.UserService_BatchProcessServer) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        user, err := s.svc.CreateUser(stream.Context(), req)
        if err != nil {
            return err
        }

        if err := stream.Send(toProtoUser(user)); err != nil {
            return err
        }
    }
}

// 启动 gRPC 服务器
func RunGRPCServer(svc service.UserService) {
    lis, _ := net.Listen("tcp", ":50051")

    // 拦截器 (类似 HTTP 中间件)
    srv := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            loggingInterceptor,
            recoveryInterceptor,
            authInterceptor,
        ),
        grpc.ChainStreamInterceptor(
            streamLoggingInterceptor,
        ),
    )

    pb.RegisterUserServiceServer(srv, &UserServer{svc: svc})
    reflection.Register(srv) // 允许 grpcurl 等工具探索

    log.Println("gRPC 服务器启动: :50051")
    log.Fatal(srv.Serve(lis))
}

gRPC 客户端

package grpcclient

import (
    "context"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
    pb "github.com/myapp/api/user/v1"
)

func NewUserClient(addr string) (pb.UserServiceClient, func(), error) {
    conn, err := grpc.NewClient(addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                10 * time.Second,
            Timeout:             3 * time.Second,
            PermitWithoutStream: true,
        }),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(4*1024*1024), // 4MB
        ),
    )
    if err != nil {
        return nil, nil, err
    }

    client := pb.NewUserServiceClient(conn)
    cleanup := func() { conn.Close() }
    return client, cleanup, nil
}

// 使用客户端
func ExampleCall(client pb.UserServiceClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
    if err != nil {
        // gRPC 错误码处理
        st, ok := status.FromError(err)
        if ok {
            switch st.Code() {
            case codes.NotFound:
                fmt.Println("用户不存在")
            case codes.DeadlineExceeded:
                fmt.Println("请求超时")
            default:
                fmt.Printf("gRPC 错误: %s\n", st.Message())
            }
        }
        return
    }
    fmt.Printf("用户: %s <%s>\n", user.User.Name, user.User.Email)
}

3. 消息队列:Kafka

go get github.com/IBM/sarama
package kafka

import (
    "context"
    "encoding/json"
    "github.com/IBM/sarama"
)

// ─── 生产者 ───────────────────────────────────────────────────
type Producer struct {
    client sarama.SyncProducer
}

func NewProducer(brokers []string) (*Producer, error) {
    cfg := sarama.NewConfig()
    cfg.Producer.RequiredAcks = sarama.WaitForAll  // 所有副本确认
    cfg.Producer.Retry.Max = 3
    cfg.Producer.Return.Successes = true
    cfg.Producer.Compression = sarama.CompressionSnappy

    client, err := sarama.NewSyncProducer(brokers, cfg)
    if err != nil {
        return nil, err
    }
    return &Producer{client: client}, nil
}

type Event struct {
    Type    string `json:"type"`
    Payload any    `json:"payload"`
}

func (p *Producer) Publish(topic string, key string, event Event) error {
    value, err := json.Marshal(event)
    if err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder(key),
        Value: sarama.ByteEncoder(value),
    }

    _, _, err = p.client.SendMessage(msg)
    return err
}

// ─── 消费者组 ─────────────────────────────────────────────────
type ConsumerGroup struct {
    group   sarama.ConsumerGroup
    handler sarama.ConsumerGroupHandler
}

type Handler struct {
    ready   chan bool
    process func(msg *sarama.ConsumerMessage) error
}

func (h *Handler) Setup(s sarama.ConsumerGroupSession) error {
    close(h.ready)
    return nil
}
func (h *Handler) Cleanup(s sarama.ConsumerGroupSession) error { return nil }
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        if err := h.process(msg); err != nil {
            // 处理失败可选择: 重试、死信队列、跳过
            log.Printf("消息处理失败: %v, offset: %d", err, msg.Offset)
            continue
        }
        // 手动提交 offset (确保至少一次消费)
        session.MarkMessage(msg, "")
    }
    return nil
}

func NewConsumerGroup(brokers []string, groupID string, topics []string, processor func(*sarama.ConsumerMessage) error) error {
    cfg := sarama.NewConfig()
    cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
    cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
        sarama.NewBalanceStrategyRoundRobin(),
    }

    group, _ := sarama.NewConsumerGroup(brokers, groupID, cfg)
    handler := &Handler{ready: make(chan bool), process: processor}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 处理重平衡信号
    for {
        if err := group.Consume(ctx, topics, handler); err != nil {
            return err
        }
        handler.ready = make(chan bool)
    }
}

4. 服务发现与 API 网关

package registry

import (
    "context"
    "fmt"
    "time"
    clientv3 "go.etcd.io/etcd/client/v3"
)

type ServiceRegistry struct {
    client *clientv3.Client
    leaseID clientv3.LeaseID
}

type ServiceInfo struct {
    Name    string `json:"name"`
    Host    string `json:"host"`
    Port    int    `json:"port"`
    Version string `json:"version"`
}

// 服务注册 (带心跳)
func (r *ServiceRegistry) Register(ctx context.Context, info ServiceInfo) error {
    // 创建 lease (TTL 10秒)
    lease, err := r.client.Grant(ctx, 10)
    if err != nil {
        return err
    }
    r.leaseID = lease.ID

    key := fmt.Sprintf("/services/%s/%s:%d", info.Name, info.Host, info.Port)
    value, _ := json.Marshal(info)

    // 注册服务 (绑定 lease)
    _, err = r.client.Put(ctx, key, string(value),
        clientv3.WithLease(lease.ID))
    if err != nil {
        return err
    }

    // 自动续租 (keepalive)
    ch, err := r.client.KeepAlive(ctx, lease.ID)
    if err != nil {
        return err
    }

    go func() {
        for range ch {
            // 持续消费续租响应
        }
    }()

    fmt.Printf("服务注册成功: %s\n", key)
    return nil
}

// 服务发现
func (r *ServiceRegistry) Discover(ctx context.Context, serviceName string) ([]ServiceInfo, error) {
    prefix := fmt.Sprintf("/services/%s/", serviceName)
    resp, err := r.client.Get(ctx, prefix, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    var services []ServiceInfo
    for _, kv := range resp.Kvs {
        var info ServiceInfo
        if err := json.Unmarshal(kv.Value, &info); err == nil {
            services = append(services, info)
        }
    }
    return services, nil
}

// 监听服务变更
func (r *ServiceRegistry) Watch(ctx context.Context, serviceName string, onChange func([]ServiceInfo)) {
    prefix := fmt.Sprintf("/services/%s/", serviceName)
    watchCh := r.client.Watch(ctx, prefix, clientv3.WithPrefix())

    for resp := range watchCh {
        for _, event := range resp.Events {
            fmt.Printf("服务变更: %s %s\n", event.Type, event.Kv.Key)
        }
        services, _ := r.Discover(ctx, serviceName)
        onChange(services)
    }
}
🎯

微服务设计原则

① 单一职责:每个服务只做一件事  ② 松耦合:服务间通过接口或消息通信  ③ 数据隔离:每个服务有自己的数据库  ④ 失败容忍:实现熔断器(Circuit Breaker)  ⑤ 可观测:每个服务都暴露健康检查和 metrics