1. 分布式系统理论

CAP 定理

三者只能选二

  • Consistency — 一致性:所有节点同一时刻看到相同数据
  • Availability — 可用性:每个请求都得到响应
  • Partition tolerance — 分区容忍:网络分区时系统仍运行

网络分区在分布式系统中不可避免,因此 P 必须保证;真正的取舍发生在分区期间:要么牺牲可用性以保证一致性(CP,如 etcd、ZooKeeper),要么牺牲强一致性以保证可用性(AP,如 Cassandra、DynamoDB)。

BASE 理论(实践)

  • Basically Available — 基本可用
  • Soft State — 软状态
  • Eventually Consistent — 最终一致性

大多数互联网系统采用 BASE,通过 Saga 模式、事件溯源等实现分布式事务的最终一致性。

Saga 模式(分布式事务)

package saga

import (
    "context"
    "fmt"
)

// OrderSaga implements the orchestration style of the saga pattern: a single
// central orchestrator (this type) drives every step through the services below.
type OrderSaga struct {
    inventorySvc InventoryService
    paymentSvc   PaymentService
    shippingSvc  ShippingService
    orderRepo    OrderRepository // NOTE(review): not referenced by Execute in this view
}

// SagaStep is one step of a saga: a forward action plus the compensating
// action Execute runs to undo it when a later step fails.
type SagaStep struct {
    name       string                                            // label printed in progress output
    execute    func(ctx context.Context, data *OrderData) error // forward action
    compensate func(ctx context.Context, data *OrderData) error // compensating (undo) action
}

// Execute runs the order saga: reserve inventory, charge the user, then create
// the shipment. If a step fails, every step that already succeeded is
// compensated in reverse order and an error wrapping the failing step's error
// is returned.
//
// Compensation failures are printed and also wrapped into the returned error,
// so callers can tell a clean rollback from one that left the order in an
// inconsistent state that needs manual intervention. errors.Is / errors.As
// still match the original step error.
//
// NOTE(review): compensations run with the same ctx as the forward steps. If the
// failure was caused by ctx being cancelled or timing out, compensations that
// honor ctx will fail as well; consider a detached context for rollback.
func (s *OrderSaga) Execute(ctx context.Context, order *OrderData) error {
    steps := []SagaStep{
        {
            name:       "扣减库存",
            execute:    func(ctx context.Context, d *OrderData) error { return s.inventorySvc.Reserve(ctx, d.ProductID, d.Quantity) },
            compensate: func(ctx context.Context, d *OrderData) error { return s.inventorySvc.Release(ctx, d.ProductID, d.Quantity) },
        },
        {
            name:       "扣款",
            execute:    func(ctx context.Context, d *OrderData) error { return s.paymentSvc.Charge(ctx, d.UserID, d.Amount) },
            compensate: func(ctx context.Context, d *OrderData) error { return s.paymentSvc.Refund(ctx, d.UserID, d.Amount) },
        },
        {
            name:       "创建物流",
            execute:    func(ctx context.Context, d *OrderData) error { return s.shippingSvc.Create(ctx, d) },
            compensate: func(ctx context.Context, d *OrderData) error { return s.shippingSvc.Cancel(ctx, d) },
        },
    }

    for i, step := range steps {
        fmt.Printf("执行步骤: %s\n", step.name)
        err := step.execute(ctx, order)
        if err == nil {
            continue
        }
        fmt.Printf("步骤 %s 失败: %v,开始补偿\n", step.name, err)

        // steps[:i] are exactly the steps that completed; undo them newest-first.
        var compErr error
        for j := i - 1; j >= 0; j-- {
            done := steps[j]
            fmt.Printf("补偿: %s\n", done.name)
            if e := done.compensate(ctx, order); e != nil {
                fmt.Printf("补偿失败: %v (需人工介入)\n", e)
                wrapped := fmt.Errorf("补偿 %s 失败: %w", done.name, e)
                if compErr == nil {
                    compErr = wrapped
                } else {
                    compErr = fmt.Errorf("%w; %w", compErr, wrapped)
                }
            }
        }

        sagaErr := fmt.Errorf("saga 失败于步骤 %s: %w", step.name, err)
        if compErr != nil {
            // Surface incomplete rollback to the caller instead of only printing it.
            return fmt.Errorf("%w; %w", sagaErr, compErr)
        }
        return sagaErr
    }

    fmt.Println("Saga 执行成功")
    return nil
}

2. Go 性能分析与调优

pprof 内置性能分析器

package main

import (
    "net/http"
    _ "net/http/pprof"  // 注册 pprof 路由
    "runtime"
    "time"
)

func main() {
    // The blank net/http/pprof import registers its handlers on
    // http.DefaultServeMux, which this server uses because Handler is nil.
    // The endpoints are unauthenticated, so listen on loopback only; exposing
    // them in production requires an authenticating proxy in front.
    srv := &http.Server{
        Addr: "localhost:6060",
        // Bound slow-header clients. No WriteTimeout, so 30s CPU profiles can finish.
        ReadHeaderTimeout: 5 * time.Second,
    }
    serveErr := make(chan error, 1)
    go func() {
        serveErr <- srv.ListenAndServe()
    }()

    // Trigger a garbage collection manually.
    runtime.GC()

    // Fetch profiling data over HTTP:
    // CPU profile: curl "localhost:6060/debug/pprof/profile?seconds=30" > cpu.pprof
    // Heap:        curl "localhost:6060/debug/pprof/heap" > heap.pprof
    // Goroutines:  curl "localhost:6060/debug/pprof/goroutine" > goroutine.pprof
    // Flame graph: go tool pprof -http=:8088 cpu.pprof

    // Keep the process alive so the profiling server can actually be queried;
    // ListenAndServe only returns on failure (e.g. the port is already in use).
    panic(<-serveErr)
}

// BenchmarkHandler measures the handler returned by setupHandler serving
// GET /api/v1/users under parallel load, reporting allocations and req/s.
func BenchmarkHandler(b *testing.B) {
    handler := setupHandler()

    b.ReportAllocs()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        // One request per goroutine: sharing a single *http.Request across
        // concurrent ServeHTTP calls is a data race if the handler (or any
        // middleware) mutates it, e.g. via ParseForm.
        req := httptest.NewRequest("GET", "/api/v1/users", nil)
        for pb.Next() {
            w := httptest.NewRecorder()
            handler.ServeHTTP(w, req)
        }
    })

    // With RunParallel, b.N counts iterations across all goroutines, so this is
    // aggregate throughput over wall-clock time.
    b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "req/s")
}

常见性能优化技巧

package optimization

import (
    "bytes"
    "strings"
    "sync"
    "unsafe"
)

// ─── 1. 减少内存分配 ──────────────────────────────────────────
// goodSlice returns the sequence 0, 1, ..., n-1.
//
// The backing array is allocated once at its final size, so filling it never
// triggers a grow-and-copy. make([]int, 0, n) followed by append is the
// equivalent pattern when elements are produced by append.
func goodSlice(n int) []int {
    seq := make([]int, n)
    for idx := range seq {
        seq[idx] = idx
    }
    return seq
}

// ─── 2. 字符串拼接 ─────────────────────────────────────────────
// buildString concatenates parts into a single string.
//
// strings.Builder avoids the quadratic copying of repeated `+=`. Growing it to
// the exact total length up front means its buffer is allocated exactly once.
// Each WriteString still copies the part into that buffer, but String()
// returns the buffer without a further copy.
func buildString(parts []string) string {
    total := 0
    for _, p := range parts {
        total += len(p)
    }

    var sb strings.Builder
    sb.Grow(total)
    for _, p := range parts {
        sb.WriteString(p)
    }
    return sb.String()
}

// ─── 3. sync.Pool 对象复用 ────────────────────────────────────
// bufPool recycles *bytes.Buffer values across processData calls to cut
// per-call allocations.
var bufPool = sync.Pool{
    New: func() any { return new(bytes.Buffer) },
}

// processData copies data through a pooled buffer and returns it as a string.
//
// Buffers that grew beyond maxPooledBufCap are dropped instead of being put
// back. Otherwise a single oversized input would keep that much memory alive in
// the pool for as long as the pool retains the buffer.
func processData(data []byte) string {
    const maxPooledBufCap = 64 << 10 // 64 KiB

    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset()
    defer func() {
        if buf.Cap() <= maxPooledBufCap {
            bufPool.Put(buf)
        }
    }()

    buf.Write(data)
    // ... processing goes here ...

    // String copies the bytes, so the result stays valid after buf is reused.
    return buf.String()
}

// ─── 4. 零拷贝字符串转换 ──────────────────────────────────────
// 警告:仅在确定字符串不会被修改时使用
// bytesToStringUnsafe returns a string that shares b's memory, without copying.
//
// The caller must not modify b while the returned string is in use: Go assumes
// strings are immutable, and writes through b would show up in (and could
// corrupt) the string. A nil or empty b yields "".
func bytesToStringUnsafe(b []byte) string {
    // unsafe.SliceData handles nil and empty slices; &b[0] panics with an index
    // out of range when len(b) == 0.
    return unsafe.String(unsafe.SliceData(b), len(b))
}

// stringToBytesUnsafe exposes the bytes backing s as a []byte without copying.
//
// The returned slice aliases the string's memory and must be treated as
// read-only. Writing to it breaks string immutability and may crash if s lives
// in read-only memory (for example a string literal).
func stringToBytesUnsafe(s string) []byte {
    n := len(s)
    data := unsafe.StringData(s)
    return unsafe.Slice(data, n)
}

// ─── 5. 减少锁竞争 ─────────────────────────────────────────────
// Cache is a concurrency-safe key/value store backed by sync.Map.
//
// sync.Map pays off when keys are written once and then read many times, or
// when goroutines touch disjoint key sets. For other workloads a plain map with
// a separate mutex is usually the better choice.
type Cache struct {
    m sync.Map
}

// Get returns the value stored under key and whether it was present.
func (c *Cache) Get(key string) (any, bool) {
    v, found := c.m.Load(key)
    return v, found
}

// Set stores val under key, replacing any previous value.
func (c *Cache) Set(key string, val any) {
    c.m.Store(key, val)
}

// ─── 6. goroutine 池 ──────────────────────────────────────────
// WorkerPool runs submitted tasks on a fixed set of goroutines, fed by a
// buffered queue holding up to 1000 pending tasks.
type WorkerPool struct {
    tasks chan func()
    wg    sync.WaitGroup // tracks worker goroutines so Close can wait for them
}

// NewWorkerPool starts a pool with the given number of workers. Values below 1
// are treated as 1: a pool with no workers would never run any task, and
// Submit would block forever once the queue filled up.
func NewWorkerPool(workers int) *WorkerPool {
    if workers < 1 {
        workers = 1
    }
    p := &WorkerPool{tasks: make(chan func(), 1000)}
    p.wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer p.wg.Done()
            for task := range p.tasks {
                task()
            }
        }()
    }
    return p
}

// Submit queues task for execution, blocking while the queue is full.
// It must not be called after Close: sending on the closed queue panics.
func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

// Close stops accepting tasks and blocks until every queued task has run and
// all workers have exited. Call it at most once, and never from inside a task,
// because that task's worker would wait on itself.
func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

3. 链路追踪:OpenTelemetry

go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/sdk/trace
go get go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
package tracing

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
    "go.opentelemetry.io/otel/trace"
)

// InitTracer installs a global TracerProvider that batches spans and exports
// them over OTLP/gRPC, without TLS, to endpoint. It samples 10% of root traces;
// child spans follow their parent's sampling decision. The returned func is
// the provider's Shutdown, which callers should invoke on exit so buffered
// spans are flushed.
//
// NOTE(review): no global TextMapPropagator is registered here. Unless
// otel.SetTextMapPropagator is called elsewhere, trace context will not cross
// service boundaries; confirm.
func InitTracer(serviceName, endpoint string) (func(context.Context) error, error) {
    exp, err := otlptracegrpc.New(
        context.Background(),
        otlptracegrpc.WithEndpoint(endpoint),
        otlptracegrpc.WithInsecure(),
    )
    if err != nil {
        return nil, err
    }

    res := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceName(serviceName),
        semconv.ServiceVersion("1.0.0"),
        attribute.String("env", "production"),
    )
    // 10% of new root traces; children inherit the parent's decision.
    sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))

    provider := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithSampler(sampler),
        sdktrace.WithResource(res),
    )
    otel.SetTracerProvider(provider)
    return provider.Shutdown, nil
}

// GetUser loads the user with the given id via db.QueryUser inside a "GetUser"
// span. The span carries user.id, plus user.email on success. On failure the
// error is recorded on the span, the span status is set to Error, and the
// error is returned.
//
// NOTE(review): user.email is personal data exported to the tracing backend;
// confirm this is acceptable under the applicable privacy policy.
// NOTE(review): codes.Error comes from go.opentelemetry.io/otel/codes, which is
// missing from this package's import list; add it or this will not compile.
func GetUser(ctx context.Context, id int64) (*User, error) {
    ctx, span := otel.Tracer("user-service").Start(ctx, "GetUser",
        trace.WithAttributes(attribute.Int64("user.id", id)),
    )
    defer span.End()

    // ctx now carries this span; passing it on lets an instrumented DB client
    // create child spans under it.
    u, err := db.QueryUser(ctx, id)
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return nil, err
    }

    span.SetAttributes(attribute.String("user.email", u.Email))
    return u, nil
}

// ginHeaderCarrier adapts a gin request's headers to OpenTelemetry's
// TextMapCarrier interface (Get/Set/Keys), so the global propagator can read
// and write trace headers.
type ginHeaderCarrier struct{ c *gin.Context }

func (h ginHeaderCarrier) Get(key string) string { return h.c.Request.Header.Get(key) }

func (h ginHeaderCarrier) Set(key, value string) { h.c.Request.Header.Set(key, value) }

func (h ginHeaderCarrier) Keys() []string {
    keys := make([]string, 0, len(h.c.Request.Header))
    for k := range h.c.Request.Header {
        keys = append(keys, k)
    }
    return keys
}

// OtelMiddleware returns Gin middleware that wraps each request in a server
// span named after the matched route template (c.FullPath()).
//
// The caller's trace context is first extracted from the request headers via
// the globally registered propagator, so the span joins the upstream trace
// instead of always starting a new one. This only takes effect once a
// propagator has been registered with otel.SetTextMapPropagator.
func OtelMiddleware(serviceName string) gin.HandlerFunc {
    tracer := otel.Tracer(serviceName)
    return func(c *gin.Context) {
        ctx := otel.GetTextMapPropagator().Extract(c.Request.Context(), ginHeaderCarrier{c})

        // FullPath is "" when no route matched (e.g. 404s). Avoid an empty span
        // name without falling back to the raw URL path, whose cardinality is unbounded.
        name := c.FullPath()
        if name == "" {
            name = "HTTP " + c.Request.Method
        }

        ctx, span := tracer.Start(ctx, name,
            trace.WithSpanKind(trace.SpanKindServer),
            trace.WithAttributes(attribute.String("http.method", c.Request.Method)),
        )
        defer span.End()

        c.Request = c.Request.WithContext(ctx)
        c.Next()

        span.SetAttributes(attribute.Int("http.status_code", c.Writer.Status()))
    }
}

4. 熔断器与服务韧性

package resilience

import (
    "errors"
    "sync"
    "time"
)

// State is the state of a CircuitBreaker.
type State int

const (
    StateClosed   State = iota // closed: calls pass through
    StateHalfOpen              // half-open: trial calls probe whether the dependency recovered
    StateOpen                  // open: calls fail fast with ErrCircuitOpen
)

// CircuitBreaker stops calling a failing dependency once failures reach
// maxFailures and fails fast for resetTimeout. It then admits trial calls
// (half-open) and closes again after halfOpenMax successes. Any failure while
// half-open re-opens it immediately.
//
// All fields are guarded by mu.
type CircuitBreaker struct {
    mu           sync.Mutex
    state        State
    failCount    int       // failure counter; cleared by a success while closed and when half-open closes
    successCount int       // successes in the current half-open period
    lastFailTime time.Time // most recent failure; the open period is measured from it

    maxFailures  int           // failures that trip closed -> open
    resetTimeout time.Duration // time spent open before moving to half-open
    halfOpenMax  int           // half-open successes required to close again
}

// NewCircuitBreaker returns a closed breaker that opens after maxFailures
// consecutive failures and probes again after resetTimeout. Three successful
// trial calls in half-open state close it.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        halfOpenMax:  3,
    }
}

// ErrCircuitOpen is returned by Execute, without calling fn, while the breaker is open.
var ErrCircuitOpen = errors.New("熔断器断开,服务不可用")

// Execute calls fn unless the breaker is open, in which case it returns
// ErrCircuitOpen immediately. fn's outcome is recorded as a success or failure,
// and its error is returned unchanged.
//
// fn runs without holding mu, so slow calls do not serialize callers. As a
// consequence, every caller is admitted while half-open (there is no cap on
// concurrent trial calls), and a call admitted before the breaker tripped may
// complete after it did.
func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    state := cb.currentState()
    cb.mu.Unlock()

    if state == StateOpen {
        return ErrCircuitOpen
    }

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.onFailure()
        return err
    }
    cb.onSuccess()
    return nil
}

// currentState returns the breaker's state. It first moves open -> half-open
// once resetTimeout has elapsed since the last failure. Callers must hold cb.mu.
func (cb *CircuitBreaker) currentState() State {
    if cb.state == StateOpen && time.Since(cb.lastFailTime) > cb.resetTimeout {
        cb.state = StateHalfOpen
        cb.successCount = 0
    }
    return cb.state
}

// onFailure records a failed call. Callers must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
    cb.failCount++
    cb.lastFailTime = time.Now()
    // A failed trial call means the dependency has not recovered. Re-open
    // immediately instead of relying on failCount, which may have been cleared.
    if cb.state == StateHalfOpen || cb.failCount >= cb.maxFailures {
        cb.state = StateOpen
    }
}

// onSuccess records a successful call. Callers must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
    switch cb.state {
    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.halfOpenMax {
            cb.state = StateClosed
            cb.failCount = 0
        }
    case StateClosed:
        cb.failCount = 0
    case StateOpen:
        // A late success from a call admitted before the breaker tripped must
        // not clear failCount while the breaker is open.
    }
}

// CallExternalService fetches user info for userID through the circuit breaker
// cb. While the breaker is open (ErrCircuitOpen) the remote call is skipped and
// getCachedUser's result is returned as a fallback. Any other outcome of the
// remote call, success or error, is returned as-is.
func CallExternalService(cb *CircuitBreaker, userID int64) (*UserInfo, error) {
    var info *UserInfo
    callErr := cb.Execute(func() error {
        u, err := externalClient.GetUser(userID)
        info = u
        return err
    })

    if !errors.Is(callErr, ErrCircuitOpen) {
        return info, callErr
    }
    // Breaker open: degrade to cached data instead of failing outright.
    return getCachedUser(userID)
}

5. 生产环境清单

🔒 安全

  • HTTPS 强制开启
  • 所有密钥通过环境变量注入
  • 数据库最小权限账号
  • SQL 注入防护
  • 速率限制
  • 安全响应头

⚡ 性能

  • 数据库连接池配置
  • Redis 缓存热点数据
  • 慢查询监控与优化
  • 合理的超时设置
  • 开启 HTTP/2
  • 静态资源 CDN

📊 可观测性

  • 结构化日志 (JSON)
  • Prometheus 指标暴露
  • 链路追踪集成
  • 健康检查端点
  • 告警规则配置
  • 错误预算监控

🚀 部署

  • Docker 多阶段构建
  • 优雅关机处理
  • 零停机滚动更新
  • 自动回滚策略
  • 数据库迁移管理
  • 多副本高可用
🎯

性能调优方法论

① 先测量,再优化(不要猜测瓶颈) ② 找到最慢的 1% 请求的原因  ③ 优化顺序:算法 > 数据库查询 > 缓存 > 代码层面 > 基础设施  ④ 每次只改一个变量,对比前后 benchmark  ⑤ 对性能敏感路径写 benchmark test