1. 数据库选型

应用层
🐘
PostgreSQL / MySQL
关系型数据库 — 用户、订单、事务、强一致性
ACID
Redis
内存缓存 — 会话、排行榜、分布式锁、消息队列
高速
🍃
MongoDB
文档数据库 — 半结构化数据、日志、配置、内容
灵活
🔍
Elasticsearch
搜索引擎 — 全文检索、日志分析、聚合统计
搜索

2. PostgreSQL + GORM

连接与配置

go get gorm.io/gorm
go get gorm.io/driver/postgres
go get gorm.io/driver/mysql
package database

import (
    "fmt"
    "time"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "gorm.io/gorm/logger"
)

// Config carries the PostgreSQL connection parameters and the connection-pool
// limits consumed by NewDB.
type Config struct {
    // Connection parameters; NewDB formats them into the DSN.
    Host                            string
    Port                            int
    User, Password, DBName, SSLMode string

    // Pool limits; NewDB hands them to the connection-pool setters.
    MaxOpen, MaxIdle int           // maximum open connections, maximum idle connections
    MaxLife          time.Duration // maximum lifetime of a single connection
}

// NewDB opens a PostgreSQL connection through GORM and applies the
// connection-pool limits from cfg.
//
// The DSN pins the session time zone to Asia/Shanghai. SQL statements are
// logged at Info level and prepared statements are cached.
//
// It returns an error when the connection cannot be opened or when the
// underlying connection pool cannot be obtained from GORM.
func NewDB(cfg Config) (*gorm.DB, error) {
    dsn := fmt.Sprintf(
        "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=Asia/Shanghai",
        cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode,
    )

    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
        Logger: logger.Default.LogMode(logger.Info), // print SQL (development mode)
        NowFunc: func() time.Time {
            return time.Now().Local()
        },
        PrepareStmt: true, // cache prepared statements
    })
    if err != nil {
        return nil, fmt.Errorf("连接数据库失败: %w", err)
    }

    // Configure the connection pool. The error must be checked: on failure
    // sqlDB is unusable and calling its setters would panic.
    sqlDB, err := db.DB()
    if err != nil {
        return nil, fmt.Errorf("获取底层连接池失败: %w", err)
    }
    sqlDB.SetMaxOpenConns(cfg.MaxOpen)    // size according to the database server's capacity
    sqlDB.SetMaxIdleConns(cfg.MaxIdle)    // idle pool size
    sqlDB.SetConnMaxLifetime(cfg.MaxLife) // recycle before a gateway drops the connection
    sqlDB.SetConnMaxIdleTime(10 * time.Minute)

    return db, nil
}

// Ping is a health check: it obtains the connection pool behind db and pings
// the database, returning the first error encountered.
func Ping(db *gorm.DB) error {
    pool, poolErr := db.DB()
    if poolErr == nil {
        return pool.Ping()
    }
    return poolErr
}

模型定义

package model

import (
    "time"
    "gorm.io/gorm"
)

// Base holds the columns shared by every model in this package; User, Post
// and Tag embed it.
type Base struct {
    // ID is the primary key.
    ID        int64          `gorm:"primarykey" json:"id"`
    // CreatedAt and UpdatedAt are the row timestamps.
    CreatedAt time.Time      `json:"created_at"`
    UpdatedAt time.Time      `json:"updated_at"`
    // DeletedAt is indexed and hidden from JSON output.
    DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // soft delete
}

// User is an application account. Email carries a unique index, Password is
// never serialized to JSON, and Posts is the has-many association keyed by
// Post.UserID.
type User struct {
    Base
    Name     string `gorm:"type:varchar(100);not null" json:"name"`
    Email    string `gorm:"type:varchar(200);uniqueIndex;not null" json:"email"`
    Password string `gorm:"type:varchar(255);not null" json:"-"`
    Role     string `gorm:"type:varchar(20);default:user" json:"role"`
    Active   bool   `gorm:"default:true" json:"active"`
    Posts    []Post `gorm:"foreignKey:UserID" json:"posts,omitempty"`
}

// Post is an article written by a User. UserID is the indexed foreign key for
// the User association, and Tags is a many-to-many association stored in the
// post_tags join table.
type Post struct {
    Base
    Title     string    `gorm:"type:varchar(200);not null" json:"title"`
    Content   string    `gorm:"type:text" json:"content"`
    Published bool      `gorm:"default:false" json:"published"`
    UserID    int64     `gorm:"index;not null" json:"user_id"`
    User      User      `gorm:"foreignKey:UserID" json:"user,omitempty"`
    Tags      []Tag     `gorm:"many2many:post_tags" json:"tags,omitempty"`
}

// Tag is a uniquely named label. Posts is the reverse side of the post_tags
// many-to-many association and is omitted from JSON output.
type Tag struct {
    Base
    Name  string `gorm:"type:varchar(50);uniqueIndex;not null" json:"name"`
    Posts []Post `gorm:"many2many:post_tags" json:"-"`
}

// AutoMigrate runs GORM's automatic schema migration for the User, Post and
// Tag models, in that order.
func AutoMigrate(db *gorm.DB) error {
    models := []any{&User{}, &Post{}, &Tag{}}
    return db.AutoMigrate(models...)
}

Repository 模式

package repository

import (
    "context"
    "errors"
    "gorm.io/gorm"
    "myapp/internal/model"
)

// ErrNotFound is the sentinel error this package returns when no matching
// record exists; compare with errors.Is.
var ErrNotFound = errors.New("record not found")

// UserRepository is the persistence contract for model.User records.
// NewUserRepository returns the GORM-backed implementation.
type UserRepository interface {
    // FindByID looks a user up by primary key.
    FindByID(ctx context.Context, id int64) (*model.User, error)
    // FindByEmail looks a user up by e-mail address.
    FindByEmail(ctx context.Context, email string) (*model.User, error)
    // List returns one page of users plus the total number of matches.
    List(ctx context.Context, params ListParams) ([]model.User, int64, error)
    // Create inserts user.
    Create(ctx context.Context, user *model.User) error
    // Update applies the column/value pairs in updates to the user with the
    // given id and returns the reloaded record.
    Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error)
    // Delete removes the user with the given id.
    Delete(ctx context.Context, id int64) error
}

// ListParams describes the filtering, ordering and pagination of a List call.
type ListParams struct {
    // Page is the 1-based page number.
    Page     int
    // PageSize is the number of rows per page.
    PageSize int
    // Search, when non-empty, is matched case-insensitively as a substring of
    // the name and email columns.
    Search   string
    // Role, when non-empty, restricts the result to that exact role.
    Role     string
    // OrderBy is the requested sort order; List falls back to
    // "created_at DESC" when it is empty.
    OrderBy  string
}

// userRepo is the GORM-backed implementation of UserRepository.
type userRepo struct {
    db *gorm.DB // handle every query in this repository starts from
}

// NewUserRepository builds a UserRepository that runs its queries on db.
func NewUserRepository(db *gorm.DB) UserRepository {
    repo := userRepo{db: db}
    return &repo
}

// FindByID loads the user whose primary key equals id.
//
// It returns ErrNotFound when no such row exists. On any other failure it
// returns a nil user together with the underlying error, so callers never see
// a half-initialised record next to a non-nil error.
func (r *userRepo) FindByID(ctx context.Context, id int64) (*model.User, error) {
    var user model.User
    if err := r.db.WithContext(ctx).First(&user, id).Error; err != nil {
        if errors.Is(err, gorm.ErrRecordNotFound) {
            return nil, ErrNotFound
        }
        return nil, err
    }
    return &user, nil
}

// FindByEmail loads the user whose email column equals email.
//
// It returns ErrNotFound when no such row exists. On any other failure it
// returns a nil user together with the underlying error, so callers never see
// a half-initialised record next to a non-nil error.
func (r *userRepo) FindByEmail(ctx context.Context, email string) (*model.User, error) {
    var user model.User
    if err := r.db.WithContext(ctx).Where("email = ?", email).First(&user).Error; err != nil {
        if errors.Is(err, gorm.ErrRecordNotFound) {
            return nil, ErrNotFound
        }
        return nil, err
    }
    return &user, nil
}

// List returns one page of users matching params, together with the total
// number of matching rows before pagination.
//
// Filtering: a non-empty Search is matched case-insensitively as a substring
// of name or email; a non-empty Role must match exactly.
//
// Pagination: Page values below 1 are treated as 1 and PageSize values below
// 1 fall back to 20, so the offset can never be negative.
//
// Ordering: OrderBy ends up in the ORDER BY clause, which cannot be
// parameterised, so it is validated against an allow-list (see
// sanitizeUserOrderBy). Anything else falls back to "created_at DESC".
//
// On error the returned slice is nil and the total is 0.
func (r *userRepo) List(ctx context.Context, params ListParams) ([]model.User, int64, error) {
    const defaultPageSize = 20

    page, pageSize := params.Page, params.PageSize
    if page < 1 {
        page = 1
    }
    if pageSize < 1 {
        pageSize = defaultPageSize
    }

    query := r.db.WithContext(ctx).Model(&model.User{})

    // Dynamic filters.
    if params.Search != "" {
        pattern := "%" + params.Search + "%"
        query = query.Where("name ILIKE ? OR email ILIKE ?", pattern, pattern)
    }
    if params.Role != "" {
        query = query.Where("role = ?", params.Role)
    }

    // Count first so the total reflects the filters but not the pagination.
    var total int64
    if err := query.Count(&total).Error; err != nil {
        return nil, 0, err
    }

    var users []model.User
    err := query.
        Order(sanitizeUserOrderBy(params.OrderBy)).
        Offset((page - 1) * pageSize).
        Limit(pageSize).
        Find(&users).Error
    if err != nil {
        return nil, 0, err
    }
    return users, total, nil
}

// sanitizeUserOrderBy turns a caller-supplied sort expression into a safe
// ORDER BY clause. It accepts "<column>" or "<column> ASC|DESC" (direction in
// upper or lower case) for the known user columns and returns
// "created_at DESC" for everything else, including the empty string.
func sanitizeUserOrderBy(orderBy string) string {
    const fallback = "created_at DESC"

    // Split on the first space without pulling in extra imports.
    column, direction := orderBy, ""
    for i := 0; i < len(orderBy); i++ {
        if orderBy[i] == ' ' {
            column, direction = orderBy[:i], orderBy[i+1:]
            break
        }
    }

    switch column {
    case "id", "name", "email", "role", "active", "created_at", "updated_at":
    default:
        return fallback
    }

    switch direction {
    case "":
        return column
    case "ASC", "asc":
        return column + " ASC"
    case "DESC", "desc":
        return column + " DESC"
    default:
        return fallback
    }
}

// Create inserts user and returns the error reported by GORM, if any.
func (r *userRepo) Create(ctx context.Context, user *model.User) error {
    result := r.db.WithContext(ctx).Create(user)
    return result.Error
}

// Update writes the column/value pairs in updates to the user row whose id
// matches, then reloads the row through FindByID and returns it. A failure of
// the UPDATE is returned as-is; otherwise the result is whatever FindByID
// reports.
func (r *userRepo) Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error) {
    var target model.User
    scoped := r.db.WithContext(ctx).Model(&target).Where("id = ?", id)
    if updateErr := scoped.Updates(updates).Error; updateErr != nil {
        return nil, updateErr
    }
    return r.FindByID(ctx, id)
}

// Delete removes the user with the given id. It returns the GORM error if the
// statement fails and ErrNotFound if no row was affected.
func (r *userRepo) Delete(ctx context.Context, id int64) error {
    outcome := r.db.WithContext(ctx).Delete(&model.User{}, id)
    switch {
    case outcome.Error != nil:
        return outcome.Error
    case outcome.RowsAffected == 0:
        return ErrNotFound
    default:
        return nil
    }
}

// ─── Transactions ─────────────────────────────────────────────

// TransferWithTx moves amount from account fromID to account toID inside a
// single database transaction. Returning an error from the callback rolls the
// transaction back; returning nil commits it.
//
// amount must be a positive number. Zero, negative and NaN amounts are
// rejected up front, because a negative amount would silently reverse the
// direction of the transfer.
//
// The debit is issued as a guarded UPDATE ("balance >= amount") and its
// affected-row count is checked. The earlier in-memory balance comparison is
// only a fast path: it can be stale when two transfers run concurrently, and
// without the guard both could pass the check and overdraw the account.
func (r *userRepo) TransferWithTx(ctx context.Context, fromID, toID int64, amount float64) error {
    if !(amount > 0) { // written this way so NaN is rejected too
        return errors.New("转账金额必须大于 0")
    }

    return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // Every statement inside the transaction must use tx, not r.db.
        var from, to model.Account
        if err := tx.First(&from, fromID).Error; err != nil {
            return err
        }
        if from.Balance < amount {
            return errors.New("余额不足")
        }
        if err := tx.First(&to, toID).Error; err != nil {
            return err
        }

        // Debit atomically: the row is only updated while it still holds
        // enough funds at the moment the UPDATE executes.
        debit := tx.Model(&from).
            Where("balance >= ?", amount).
            Update("balance", gorm.Expr("balance - ?", amount))
        if debit.Error != nil {
            return debit.Error // the transaction is rolled back
        }
        if debit.RowsAffected == 0 {
            return errors.New("余额不足")
        }

        if err := tx.Model(&to).Update("balance", gorm.Expr("balance + ?", amount)).Error; err != nil {
            return err
        }

        return nil // nil commits the transaction
    })
}

3. Redis 缓存

go get github.com/redis/go-redis/v9
package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    "github.com/redis/go-redis/v9"
)

// RedisCache wraps a go-redis client and groups the cache, lock, leaderboard
// and rate-limit helpers defined in this package.
type RedisCache struct {
    client *redis.Client // connection-pooled client created by NewRedisCache
}

// NewRedisCache builds a RedisCache for the server at addr, authenticating
// with password and selecting logical database db. The pool holds up to 20
// connections with at least 5 kept idle; dialing times out after 5 seconds
// and reads/writes after 3 seconds.
func NewRedisCache(addr, password string, db int) *RedisCache {
    opts := &redis.Options{
        Addr:     addr,
        Password: password,
        DB:       db,

        PoolSize:     20,
        MinIdleConns: 5,

        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
    }
    return &RedisCache{client: redis.NewClient(opts)}
}

// Generic cache helpers.

// Get reads key from Redis and JSON-decodes the value into a new T.
// A cache miss is reported as (nil, nil). Redis failures are wrapped with the
// "redis get" prefix and decode failures are returned unchanged.
func Get[T any](ctx context.Context, c *RedisCache, key string) (*T, error) {
    raw, err := c.client.Get(ctx, key).Bytes()
    switch {
    case err == redis.Nil:
        return nil, nil // cache miss
    case err != nil:
        return nil, fmt.Errorf("redis get: %w", err)
    }

    decoded := new(T)
    if decodeErr := json.Unmarshal(raw, decoded); decodeErr != nil {
        return nil, decodeErr
    }
    return decoded, nil
}

// Set JSON-encodes val and stores it under key with the given ttl. It returns
// the encoding error, or else the error reported by Redis.
func Set[T any](ctx context.Context, c *RedisCache, key string, val T, ttl time.Duration) error {
    payload, err := json.Marshal(val)
    if err == nil {
        err = c.client.Set(ctx, key, payload, ttl).Err()
    }
    return err
}

// ─── Cache-aside pattern ──────────────────────────────────────

// GetUser returns the user with the given id, reading the "user:<id>" cache
// entry first and calling fallback (normally a database lookup) on a miss.
//
//   - Cache hit: the cached user is returned and fallback is not called.
//   - Redis or decode failure: degrade gracefully and return fallback's
//     result directly, without touching the cache.
//   - Cache miss: call fallback and, when it yields a user, store a copy in
//     Redis for 30 minutes in the background.
//
// A fallback that returns (nil, nil) is passed through as-is and nothing is
// cached; previously this dereferenced a nil pointer inside the goroutine.
func (c *RedisCache) GetUser(ctx context.Context, id int64, fallback func() (*User, error)) (*User, error) {
    key := fmt.Sprintf("user:%d", id)

    // 1. Try the cache.
    user, err := Get[User](ctx, c, key)
    if err != nil {
        // Redis is unavailable: go straight to the source of truth.
        return fallback()
    }
    if user != nil {
        return user, nil // cache hit
    }

    // 2. Cache miss: load from the database.
    user, err = fallback()
    if err != nil {
        return nil, err
    }
    if user == nil {
        return nil, nil
    }

    // 3. Populate the cache asynchronously so the caller is not blocked.
    // The value is copied before the goroutine starts, so a caller that
    // mutates the returned *User cannot race with the serialisation.
    snapshot := *user
    go func() {
        // Best effort: a failed cache write only costs a later cache miss.
        // context.Background is used because ctx may be cancelled as soon as
        // the caller returns.
        _ = Set(context.Background(), c, key, snapshot, 30*time.Minute)
    }()

    return user, nil
}

// ─── Distributed lock ─────────────────────────────────────────

// AcquireLock tries to take the lock named key by setting "lock:<key>" only
// if it does not exist yet (SET NX), with ttl as its expiry. It reports
// whether the lock was obtained.
//
// NOTE(review): the stored value is the constant "1", so the lock carries no
// owner identity.
func (c *RedisCache) AcquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {
    lockKey := "lock:" + key
    cmd := c.client.SetNX(ctx, lockKey, "1", ttl)
    return cmd.Result()
}

// ReleaseLock deletes "lock:<key>" unconditionally.
//
// NOTE(review): no ownership check is made, so calling this after the lock's
// TTL has expired can delete a lock that another client has since acquired.
func (c *RedisCache) ReleaseLock(ctx context.Context, key string) error {
    lockKey := "lock:" + key
    cmd := c.client.Del(ctx, lockKey)
    return cmd.Err()
}

// ─── Leaderboard (sorted set) ─────────────────────────────────

// UpdateScore sets member's score in the sorted set named leaderboard via
// ZADD.
func (c *RedisCache) UpdateScore(ctx context.Context, leaderboard, member string, score float64) error {
    entry := redis.Z{Score: score, Member: member}
    return c.client.ZAdd(ctx, leaderboard, entry).Err()
}

// GetTopN returns the n highest-scored members of leaderboard, best first,
// together with their scores.
//
// A non-positive n yields an empty result without contacting Redis.
// Previously n == 0 produced the range 0..-1, which Redis interprets as
// "the whole set", so asking for zero entries returned every member.
func (c *RedisCache) GetTopN(ctx context.Context, leaderboard string, n int) ([]redis.Z, error) {
    if n <= 0 {
        return []redis.Z{}, nil
    }
    return c.client.ZRevRangeWithScores(ctx, leaderboard, 0, int64(n-1)).Result()
}

// ─── Rate limiting / duplicate-submit guard (fixed-window counter) ───

// IsAllowed counts a hit on key and reports whether the caller is still
// within limit hits for the current window.
//
// This is a fixed-window counter, not a token bucket. The first hit creates
// the key with a TTL of window and every hit increments it. Once the TTL
// expires the counter starts again from zero.
//
// The TTL is set only when the key is created (SET NX). Previously EXPIRE ran
// on every call, so a client that kept calling pushed the expiry forward
// indefinitely and could stay blocked forever. Both commands run in one
// MULTI/EXEC transaction so the key cannot be created by INCR without a TTL.
func (c *RedisCache) IsAllowed(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    pipe := c.client.TxPipeline()
    pipe.SetNX(ctx, key, 0, window) // starts the window only if none is open
    incr := pipe.Incr(ctx, key)     // INCR leaves an existing TTL untouched
    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }
    return incr.Val() <= int64(limit), nil
}

4. 数据库迁移管理

go get github.com/golang-migrate/migrate/v4
-- migrations/000001_create_users.up.sql
-- Creates the users table, its indexes and the trigger that maintains
-- updated_at. Every statement is idempotent, matching the IF NOT EXISTS on the
-- table itself, so re-running the script does not fail.
CREATE TABLE IF NOT EXISTS users (
    id          BIGSERIAL PRIMARY KEY,
    name        VARCHAR(100) NOT NULL,
    email       VARCHAR(200) NOT NULL UNIQUE,
    password    VARCHAR(255) NOT NULL,
    role        VARCHAR(20)  NOT NULL DEFAULT 'user',
    active      BOOLEAN      NOT NULL DEFAULT TRUE,
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    deleted_at  TIMESTAMPTZ
);

-- NOTE(review): the UNIQUE constraint on email already creates an index, so
-- idx_users_email is redundant; it is kept here to preserve the schema.
CREATE INDEX IF NOT EXISTS idx_users_email   ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_deleted ON users(deleted_at);

-- Keep updated_at current on every row update.
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop first so the CREATE below cannot fail when the trigger already exists.
DROP TRIGGER IF EXISTS users_updated_at ON users;
CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();
-- migrations/000001_create_users.down.sql
-- Reverts 000001_create_users.up.sql completely: the trigger, the table
-- (with its indexes) and the trigger function that the up script created.
DROP TRIGGER IF EXISTS users_updated_at ON users;
DROP TABLE IF EXISTS users;
DROP FUNCTION IF EXISTS update_updated_at();
package database

import (
    "fmt"

    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

// RunMigrations applies every pending migration found in the local
// "migrations" directory to the database at databaseURL and prints the
// resulting schema version.
//
// "No change" is not treated as an error. The version lookup is checked
// too: any failure other than "no migration applied yet" is returned, and a
// dirty schema is reported as an error instead of being printed as a
// successful run.
func RunMigrations(databaseURL string) error {
    m, err := migrate.New("file://migrations", databaseURL)
    if err != nil {
        return fmt.Errorf("创建迁移器失败: %w", err)
    }
    defer m.Close()

    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return fmt.Errorf("迁移失败: %w", err)
    }

    v, dirty, err := m.Version()
    if err != nil && err != migrate.ErrNilVersion {
        return fmt.Errorf("读取迁移版本失败: %w", err)
    }
    if dirty {
        return fmt.Errorf("数据库迁移处于 dirty 状态,当前版本: %d", v)
    }

    fmt.Printf("数据库迁移完成,当前版本: %d\n", v)
    return nil
}

5. MongoDB 集成

go get go.mongodb.org/mongo-driver/mongo
package mongodb

import (
    "context"
    "time"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// LogEntry is one log document stored in MongoDB. UserID and Metadata are
// omitted from the stored document when empty.
type LogEntry struct {
    ID        primitive.ObjectID `bson:"_id,omitempty" json:"id"`
    Level     string             `bson:"level" json:"level"`
    Message   string             `bson:"message" json:"message"`
    UserID    int64              `bson:"user_id,omitempty" json:"user_id"`
    IP        string             `bson:"ip" json:"ip"`
    Metadata  bson.M             `bson:"metadata,omitempty" json:"metadata"`
    // CreatedAt is the field the TTL index and the sort in Query are built on.
    CreatedAt time.Time          `bson:"created_at" json:"created_at"`
}

// LogRepository reads and writes LogEntry documents in a single collection.
type LogRepository struct {
    col *mongo.Collection // the "logs" collection selected by NewLogRepository
}

// NewMongoDB connects to the MongoDB deployment at uri and verifies the
// connection with a ping, all within a 10-second budget. The pool is limited
// to 50 connections, keeps at least 5, and closes connections idle for more
// than 30 seconds.
//
// The database parameter is currently unused; callers select the database
// later (see NewLogRepository).
//
// If the ping fails the client is disconnected before returning, so its
// connection pool and background monitors are not leaked.
func NewMongoDB(uri, database string) (*mongo.Client, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    opts := options.Client().ApplyURI(uri).
        SetMaxPoolSize(50).
        SetMinPoolSize(5).
        SetMaxConnIdleTime(30 * time.Second)

    client, err := mongo.Connect(ctx, opts)
    if err != nil {
        return nil, err
    }
    if err := client.Ping(ctx, nil); err != nil {
        // ctx may already have expired, so use a fresh, short-lived context
        // for the cleanup. The ping error is the one worth reporting.
        closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer closeCancel()
        _ = client.Disconnect(closeCtx)
        return nil, err
    }
    return client, nil
}

// NewLogRepository returns a repository bound to the "logs" collection of
// database dbName and ensures a TTL index on created_at that expires entries
// after 30 days.
//
// Index creation is bounded to 10 seconds so an unreachable server cannot
// stall construction indefinitely. It is best effort: the signature has no
// error return, so a failure is deliberately ignored and only means old
// entries are not expired automatically.
func NewLogRepository(client *mongo.Client, dbName string) *LogRepository {
    col := client.Database(dbName).Collection("logs")

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // TTL index: documents are removed 30 days after created_at.
    _, _ = col.Indexes().CreateOne(ctx, mongo.IndexModel{
        Keys:    bson.D{{Key: "created_at", Value: 1}},
        Options: options.Index().SetExpireAfterSeconds(30 * 24 * 3600),
    })
    return &LogRepository{col: col}
}

// Insert stores entry in the collection. It overwrites entry.ID with a newly
// generated ObjectID and entry.CreatedAt with the current time before
// writing, so the caller's struct is modified.
func (r *LogRepository) Insert(ctx context.Context, entry *LogEntry) error {
    entry.ID, entry.CreatedAt = primitive.NewObjectID(), time.Now()
    if _, insertErr := r.col.InsertOne(ctx, entry); insertErr != nil {
        return insertErr
    }
    return nil
}

// Query returns up to limit log entries matching filter, newest first
// (sorted by created_at descending).
//
// The cursor is decoded in its own statement before returning. The former
// `return entries, cursor.All(ctx, &entries)` relied on the slice variable
// being read after the call, an order the Go specification leaves
// unspecified. On a decode error the result is nil.
func (r *LogRepository) Query(ctx context.Context, filter bson.M, limit int) ([]LogEntry, error) {
    opts := options.Find().
        SetLimit(int64(limit)).
        SetSort(bson.D{{Key: "created_at", Value: -1}})

    cursor, err := r.col.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var entries []LogEntry
    if err := cursor.All(ctx, &entries); err != nil {
        return nil, err
    }
    return entries, nil
}

// HourlyErrorStats aggregates the number of "error"-level log entries per
// hour. Each result document has _id set to the hour bucket formatted as
// "%Y-%m-%d %H:00" and count set to the number of entries in that bucket;
// results are sorted by bucket ascending.
//
// The cursor is decoded in its own statement before returning. The former
// `return results, cursor.All(ctx, &results)` relied on the slice variable
// being read after the call, an order the Go specification leaves
// unspecified. On a decode error the result is nil.
func (r *LogRepository) HourlyErrorStats(ctx context.Context) ([]bson.M, error) {
    pipeline := mongo.Pipeline{
        // Keep only error-level entries.
        {{Key: "$match", Value: bson.D{{Key: "level", Value: "error"}}}},
        // Group by hour and count.
        {{Key: "$group", Value: bson.D{
            {Key: "_id", Value: bson.D{
                {Key: "$dateToString", Value: bson.D{
                    {Key: "format", Value: "%Y-%m-%d %H:00"},
                    {Key: "date", Value: "$created_at"},
                }},
            }},
            {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
        }}},
        {{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
    }

    cursor, err := r.col.Aggregate(ctx, pipeline)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var results []bson.M
    if err := cursor.All(ctx, &results); err != nil {
        return nil, err
    }
    return results, nil
}
⚠️

N+1 查询问题

获取 100 个用户时若再逐一查询他们的帖子,会产生 101 次 SQL 请求(1 次查用户 + 100 次查帖子)。使用 GORM 的 Preload 或 Joins 解决:db.Preload("Posts").Find(&users)

🎯

连接池最佳实践

MaxOpenConns 通常设为 DB 服务器 CPU 核数的 2-4 倍;MaxIdleConns 设为 MaxOpenConns 的一半;ConnMaxLifetime 设为 5-30 分钟(避免连接被防火墙关闭)。