1. 数据库选型

应用层
🐘
PostgreSQL / MySQL
关系型数据库 — 用户、订单、事务、强一致性
ACID
Redis
内存缓存 — 会话、排行榜、分布式锁、消息队列
高速
🍃
MongoDB
文档数据库 — 半结构化数据、日志、配置、内容
灵活
🔍
Elasticsearch
搜索引擎 — 全文检索、日志分析、聚合统计
搜索

2. PostgreSQL + GORM

连接与配置

go get gorm.io/gorm
go get gorm.io/driver/postgres
go get gorm.io/driver/mysql
package database

import (
    "fmt"
    "time"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "gorm.io/gorm/logger"
)

type Config struct {
    Host     string
    Port     int
    User     string
    Password string
    DBName   string
    SSLMode  string
    MaxOpen  int           // 最大连接数
    MaxIdle  int           // 最大空闲连接
    MaxLife  time.Duration // 连接最大存活时间
}

func NewDB(cfg Config) (*gorm.DB, error) {
    dsn := fmt.Sprintf(
        "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=Asia/Shanghai",
        cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode,
    )

    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
        Logger: logger.Default.LogMode(logger.Info), // 开发模式打印 SQL
        NowFunc: func() time.Time {
            return time.Now().Local()
        },
        PrepareStmt: true, // 预编译语句缓存 (性能优化)
    })
    if err != nil {
        return nil, fmt.Errorf("连接数据库失败: %w", err)
    }

    // 配置连接池
    sqlDB, _ := db.DB()
    sqlDB.SetMaxOpenConns(cfg.MaxOpen)   // 最大打开连接数 (根据 DB 规格设置)
    sqlDB.SetMaxIdleConns(cfg.MaxIdle)   // 空闲连接池大小
    sqlDB.SetConnMaxLifetime(cfg.MaxLife) // 避免连接被网关强制断开
    sqlDB.SetConnMaxIdleTime(10 * time.Minute)

    return db, nil
}

// 健康检查
func Ping(db *gorm.DB) error {
    sqlDB, err := db.DB()
    if err != nil {
        return err
    }
    return sqlDB.Ping()
}

模型定义

package model

import (
    "time"
    "gorm.io/gorm"
)

// 基础模型 (嵌入到所有模型)
type Base struct {
    ID        int64          `gorm:"primarykey" json:"id"`
    CreatedAt time.Time      `json:"created_at"`
    UpdatedAt time.Time      `json:"updated_at"`
    DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // 软删除
}

type User struct {
    Base
    Name     string `gorm:"type:varchar(100);not null" json:"name"`
    Email    string `gorm:"type:varchar(200);uniqueIndex;not null" json:"email"`
    Password string `gorm:"type:varchar(255);not null" json:"-"`
    Role     string `gorm:"type:varchar(20);default:user" json:"role"`
    Active   bool   `gorm:"default:true" json:"active"`
    Posts    []Post `gorm:"foreignKey:UserID" json:"posts,omitempty"`
}

type Post struct {
    Base
    Title     string    `gorm:"type:varchar(200);not null" json:"title"`
    Content   string    `gorm:"type:text" json:"content"`
    Published bool      `gorm:"default:false" json:"published"`
    UserID    int64     `gorm:"index;not null" json:"user_id"`
    User      User      `gorm:"foreignKey:UserID" json:"user,omitempty"`
    Tags      []Tag     `gorm:"many2many:post_tags" json:"tags,omitempty"`
}

type Tag struct {
    Base
    Name  string `gorm:"type:varchar(50);uniqueIndex;not null" json:"name"`
    Posts []Post `gorm:"many2many:post_tags" json:"-"`
}

// 数据库迁移
func AutoMigrate(db *gorm.DB) error {
    return db.AutoMigrate(&User{}, &Post{}, &Tag{})
}

Repository 模式

package repository

import (
    "context"
    "errors"
    "gorm.io/gorm"
    "myapp/internal/model"
)

var ErrNotFound = errors.New("record not found")

type UserRepository interface {
    FindByID(ctx context.Context, id int64) (*model.User, error)
    FindByEmail(ctx context.Context, email string) (*model.User, error)
    List(ctx context.Context, params ListParams) ([]model.User, int64, error)
    Create(ctx context.Context, user *model.User) error
    Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error)
    Delete(ctx context.Context, id int64) error
}

type ListParams struct {
    Page     int
    PageSize int
    Search   string
    Role     string
    OrderBy  string
}

type userRepo struct {
    db *gorm.DB
}

func NewUserRepository(db *gorm.DB) UserRepository {
    return &userRepo{db: db}
}

func (r *userRepo) FindByID(ctx context.Context, id int64) (*model.User, error) {
    var user model.User
    err := r.db.WithContext(ctx).First(&user, id).Error
    if errors.Is(err, gorm.ErrRecordNotFound) {
        return nil, ErrNotFound
    }
    return &user, err
}

func (r *userRepo) FindByEmail(ctx context.Context, email string) (*model.User, error) {
    var user model.User
    err := r.db.WithContext(ctx).Where("email = ?", email).First(&user).Error
    if errors.Is(err, gorm.ErrRecordNotFound) {
        return nil, ErrNotFound
    }
    return &user, err
}

func (r *userRepo) List(ctx context.Context, params ListParams) ([]model.User, int64, error) {
    var users []model.User
    var total int64

    query := r.db.WithContext(ctx).Model(&model.User{})

    // 动态条件
    if params.Search != "" {
        query = query.Where("name ILIKE ? OR email ILIKE ?",
            "%"+params.Search+"%", "%"+params.Search+"%")
    }
    if params.Role != "" {
        query = query.Where("role = ?", params.Role)
    }

    // 先计数 (总数)
    if err := query.Count(&total).Error; err != nil {
        return nil, 0, err
    }

    // 分页查询
    offset := (params.Page - 1) * params.PageSize
    orderBy := params.OrderBy
    if orderBy == "" {
        orderBy = "created_at DESC"
    }

    err := query.Offset(offset).Limit(params.PageSize).Order(orderBy).Find(&users).Error
    return users, total, err
}

func (r *userRepo) Create(ctx context.Context, user *model.User) error {
    return r.db.WithContext(ctx).Create(user).Error
}

func (r *userRepo) Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error) {
    var user model.User
    err := r.db.WithContext(ctx).
        Model(&user).
        Where("id = ?", id).
        Updates(updates).Error
    if err != nil {
        return nil, err
    }
    return r.FindByID(ctx, id)
}

func (r *userRepo) Delete(ctx context.Context, id int64) error {
    result := r.db.WithContext(ctx).Delete(&model.User{}, id)
    if result.Error != nil {
        return result.Error
    }
    if result.RowsAffected == 0 {
        return ErrNotFound
    }
    return nil
}

// ─── 事务 ─────────────────────────────────────────────────────
func (r *userRepo) TransferWithTx(ctx context.Context, fromID, toID int64, amount float64) error {
    return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 事务内所有操作使用 tx,不用 r.db
        var from, to model.Account
        if err := tx.First(&from, fromID).Error; err != nil {
            return err
        }
        if from.Balance < amount {
            return errors.New("余额不足")
        }
        if err := tx.First(&to, toID).Error; err != nil {
            return err
        }

        if err := tx.Model(&from).Update("balance", gorm.Expr("balance - ?", amount)).Error; err != nil {
            return err // 事务自动回滚
        }
        if err := tx.Model(&to).Update("balance", gorm.Expr("balance + ?", amount)).Error; err != nil {
            return err
        }

        return nil // 返回 nil 则事务 commit
    })
}

3. Redis 缓存

go get github.com/redis/go-redis/v9
package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    "github.com/redis/go-redis/v9"
)

type RedisCache struct {
    client *redis.Client
}

func NewRedisCache(addr, password string, db int) *RedisCache {
    rdb := redis.NewClient(&redis.Options{
        Addr:         addr,
        Password:     password,
        DB:           db,
        PoolSize:     20,
        MinIdleConns: 5,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
    })
    return &RedisCache{client: rdb}
}

// 泛型缓存方法
func Get[T any](ctx context.Context, c *RedisCache, key string) (*T, error) {
    val, err := c.client.Get(ctx, key).Bytes()
    if err == redis.Nil {
        return nil, nil // 缓存未命中
    }
    if err != nil {
        return nil, fmt.Errorf("redis get: %w", err)
    }
    var v T
    if err := json.Unmarshal(val, &v); err != nil {
        return nil, err
    }
    return &v, nil
}

func Set[T any](ctx context.Context, c *RedisCache, key string, val T, ttl time.Duration) error {
    b, err := json.Marshal(val)
    if err != nil {
        return err
    }
    return c.client.Set(ctx, key, b, ttl).Err()
}

// ─── Cache-Aside 模式 ─────────────────────────────────────────
func (c *RedisCache) GetUser(ctx context.Context, id int64, fallback func() (*User, error)) (*User, error) {
    key := fmt.Sprintf("user:%d", id)

    // 1. 先查缓存
    user, err := Get[User](ctx, c, key)
    if err != nil {
        // Redis 故障降级:直接查 DB
        return fallback()
    }
    if user != nil {
        return user, nil // 缓存命中
    }

    // 2. 缓存未命中,查数据库
    user, err = fallback()
    if err != nil {
        return nil, err
    }

    // 3. 写入缓存 (异步,不阻塞主流程)
    go func() {
        Set(context.Background(), c, key, *user, 30*time.Minute)
    }()

    return user, nil
}

// ─── 分布式锁 ─────────────────────────────────────────────────
func (c *RedisCache) AcquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {
    // SET NX (仅当不存在时设置)
    return c.client.SetNX(ctx, "lock:"+key, "1", ttl).Result()
}

func (c *RedisCache) ReleaseLock(ctx context.Context, key string) error {
    return c.client.Del(ctx, "lock:"+key).Err()
}

// ─── 排行榜 (Sorted Set) ──────────────────────────────────────
func (c *RedisCache) UpdateScore(ctx context.Context, leaderboard, member string, score float64) error {
    return c.client.ZAdd(ctx, leaderboard, redis.Z{Score: score, Member: member}).Err()
}

func (c *RedisCache) GetTopN(ctx context.Context, leaderboard string, n int) ([]redis.Z, error) {
    return c.client.ZRevRangeWithScores(ctx, leaderboard, 0, int64(n-1)).Result()
}

// ─── 防重复提交 (Token Bucket) ────────────────────────────────
func (c *RedisCache) IsAllowed(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    pipe := c.client.Pipeline()
    incr := pipe.Incr(ctx, key)
    pipe.Expire(ctx, key, window)
    _, err := pipe.Exec(ctx)
    if err != nil {
        return false, err
    }
    return incr.Val() <= int64(limit), nil
}

4. 数据库迁移管理

go get github.com/golang-migrate/migrate/v4
-- migrations/000001_create_users.up.sql
CREATE TABLE IF NOT EXISTS users (
    id          BIGSERIAL PRIMARY KEY,
    name        VARCHAR(100) NOT NULL,
    email       VARCHAR(200) NOT NULL UNIQUE,
    password    VARCHAR(255) NOT NULL,
    role        VARCHAR(20)  NOT NULL DEFAULT 'user',
    active      BOOLEAN      NOT NULL DEFAULT TRUE,
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    deleted_at  TIMESTAMPTZ
);

CREATE INDEX idx_users_email   ON users(email);
CREATE INDEX idx_users_deleted ON users(deleted_at);

-- 自动更新 updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();
-- migrations/000001_create_users.down.sql
DROP TRIGGER IF EXISTS users_updated_at ON users;
DROP TABLE IF EXISTS users;
package database

import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

func RunMigrations(databaseURL string) error {
    m, err := migrate.New("file://migrations", databaseURL)
    if err != nil {
        return fmt.Errorf("创建迁移器失败: %w", err)
    }
    defer m.Close()

    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return fmt.Errorf("迁移失败: %w", err)
    }

    v, _, _ := m.Version()
    fmt.Printf("数据库迁移完成,当前版本: %d\n", v)
    return nil
}

5. MongoDB 集成

go get go.mongodb.org/mongo-driver/mongo
package mongodb

import (
    "context"
    "time"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

type LogEntry struct {
    ID        primitive.ObjectID `bson:"_id,omitempty" json:"id"`
    Level     string             `bson:"level" json:"level"`
    Message   string             `bson:"message" json:"message"`
    UserID    int64              `bson:"user_id,omitempty" json:"user_id"`
    IP        string             `bson:"ip" json:"ip"`
    Metadata  bson.M             `bson:"metadata,omitempty" json:"metadata"`
    CreatedAt time.Time          `bson:"created_at" json:"created_at"`
}

type LogRepository struct {
    col *mongo.Collection
}

func NewMongoDB(uri, database string) (*mongo.Client, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    opts := options.Client().ApplyURI(uri).
        SetMaxPoolSize(50).
        SetMinPoolSize(5).
        SetMaxConnIdleTime(30 * time.Second)

    client, err := mongo.Connect(ctx, opts)
    if err != nil {
        return nil, err
    }
    if err := client.Ping(ctx, nil); err != nil {
        return nil, err
    }
    return client, nil
}

func NewLogRepository(client *mongo.Client, dbName string) *LogRepository {
    col := client.Database(dbName).Collection("logs")
    // 创建 TTL 索引(自动 30 天后删除日志)
    col.Indexes().CreateOne(context.Background(), mongo.IndexModel{
        Keys: bson.D{{Key: "created_at", Value: 1}},
        Options: options.Index().SetExpireAfterSeconds(30 * 24 * 3600),
    })
    return &LogRepository{col: col}
}

func (r *LogRepository) Insert(ctx context.Context, entry *LogEntry) error {
    entry.ID = primitive.NewObjectID()
    entry.CreatedAt = time.Now()
    _, err := r.col.InsertOne(ctx, entry)
    return err
}

func (r *LogRepository) Query(ctx context.Context, filter bson.M, limit int) ([]LogEntry, error) {
    opts := options.Find().
        SetLimit(int64(limit)).
        SetSort(bson.D{{Key: "created_at", Value: -1}})

    cursor, err := r.col.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var entries []LogEntry
    return entries, cursor.All(ctx, &entries)
}

// 聚合查询:统计每小时错误数
func (r *LogRepository) HourlyErrorStats(ctx context.Context) ([]bson.M, error) {
    pipeline := mongo.Pipeline{
        // 过滤 error 级别
        {{Key: "$match", Value: bson.D{{Key: "level", Value: "error"}}}},
        // 按小时分组计数
        {{Key: "$group", Value: bson.D{
            {Key: "_id", Value: bson.D{
                {Key: "$dateToString", Value: bson.D{
                    {Key: "format", Value: "%Y-%m-%d %H:00"},
                    {Key: "date", Value: "$created_at"},
                }},
            }},
            {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
        }}},
        {{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
    }

    cursor, err := r.col.Aggregate(ctx, pipeline)
    if err != nil {
        return nil, err
    }
    var results []bson.M
    return results, cursor.All(ctx, &results)
}
⚠️

N+1 查询问题

获取 100 个用户时若再逐一查询他们的帖子,会产生 101 次 SQL 请求。使用 GORM 的 PreloadJoins 解决:db.Preload("Posts").Find(&users)

🎯

连接池最佳实践

MaxOpenConns 通常设为 DB 服务器 CPU 核数的 2-4 倍;MaxIdleConns 设为 MaxOpenConns 的一半;ConnMaxLifetime 设为 5-30 分钟(避免连接被防火墙关闭)。

5. 数据库迁移管理(golang-migrate)

原理:数据库迁移工具将 Schema 变更记录为版本化的 SQL 文件(up/down),与代码一起纳入版本控制。每次迁移记录在 schema_migrations 表中,确保不重复执行。

# 安装 golang-migrate CLI
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

# 创建新迁移文件
migrate create -ext sql -dir migrations -seq create_users_table
# 生成: migrations/000001_create_users_table.up.sql
#       migrations/000001_create_users_table.down.sql

# 执行迁移(升级到最新版本)
migrate -path migrations -database "postgres://user:pass@localhost/mydb?sslmode=disable" up

# 回滚最近 1 个迁移
migrate -path migrations -database "..." down 1

# 查看当前版本
migrate -path migrations -database "..." version
-- migrations/000001_create_users_table.up.sql
CREATE TABLE IF NOT EXISTS users (
    id         BIGSERIAL PRIMARY KEY,
    name       VARCHAR(100) NOT NULL,
    email      VARCHAR(255) NOT NULL UNIQUE,
    password   VARCHAR(255) NOT NULL,
    role       VARCHAR(20)  NOT NULL DEFAULT 'user',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    deleted_at TIMESTAMP WITH TIME ZONE  -- 软删除
);

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_deleted_at ON users(deleted_at) WHERE deleted_at IS NOT NULL;

-- 自动更新 updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN NEW.updated_at = NOW(); RETURN NEW; END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

-- migrations/000001_create_users_table.down.sql
DROP TABLE IF EXISTS users;
package main

import (
    "database/sql"
    "fmt"
    "log"

    "github.com/golang-migrate/migrate/v4"
    "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

// 在应用启动时自动执行迁移
func runMigrations(db *sql.DB, migrationsPath string) error {
    driver, err := postgres.WithInstance(db, &postgres.Config{})
    if err != nil {
        return fmt.Errorf("迁移驱动初始化失败: %w", err)
    }

    m, err := migrate.NewWithDatabaseInstance(
        "file://"+migrationsPath,  // 迁移文件路径
        "postgres",
        driver,
    )
    if err != nil {
        return fmt.Errorf("迁移实例创建失败: %w", err)
    }

    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return fmt.Errorf("迁移执行失败: %w", err)
    }

    v, _, _ := m.Version()
    log.Printf("数据库迁移完成,当前版本: %d\n", v)
    return nil
}

6. 泛型 Repository 模式(Go 1.18+)

泛型 Repository 将 CRUD 操作抽象为通用接口,减少每个实体都写一遍相同代码的重复工作,同时保持类型安全。

package repository

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
)

// ─── 基础模型约束 ─────────────────────────────────────────────
type Model interface {
    TableName() string
    GetID() int64
}

// ─── 泛型 Repository 基类 ─────────────────────────────────────
type BaseRepository[T Model] struct {
    db *sql.DB
}

func NewBaseRepository[T Model](db *sql.DB) *BaseRepository[T] {
    return &BaseRepository[T]{db: db}
}

// FindByID:通用按 ID 查找(需要子类提供 scan 函数)
func (r *BaseRepository[T]) FindByID(ctx context.Context, id int64,
    scan func(*sql.Row) (T, error)) (T, error) {

    var zero T
    tableName := zero.TableName()
    row := r.db.QueryRowContext(ctx,
        fmt.Sprintf("SELECT * FROM %s WHERE id = $1 AND deleted_at IS NULL", tableName),
        id,
    )
    return scan(row)
}

// List:通用分页查询
type ListParams struct {
    Page     int
    PageSize int
    OrderBy  string
    Desc     bool
}

func (r *BaseRepository[T]) Count(ctx context.Context, where string, args ...any) (int64, error) {
    var zero T
    var count int64
    query := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE deleted_at IS NULL", zero.TableName())
    if where != "" {
        query += " AND " + where
    }
    return count, r.db.QueryRowContext(ctx, query, args...).Scan(&count)
}

// ─── 具体 User Repository ─────────────────────────────────────
type UserRepository struct {
    *BaseRepository[User]
}

func NewUserRepository(db *sql.DB) *UserRepository {
    return &UserRepository{BaseRepository: NewBaseRepository[User](db)}
}

func (r *UserRepository) FindByEmail(ctx context.Context, email string) (User, error) {
    row := r.db.QueryRowContext(ctx,
        "SELECT id, name, email, role, created_at FROM users WHERE email = $1 AND deleted_at IS NULL",
        email,
    )
    return scanUser(row)
}

func (r *UserRepository) Create(ctx context.Context, u *User) error {
    return r.db.QueryRowContext(ctx,
        "INSERT INTO users (name, email, password, role) VALUES ($1, $2, $3, $4) RETURNING id, created_at",
        u.Name, u.Email, u.Password, u.Role,
    ).Scan(&u.ID, &u.CreatedAt)
}

func scanUser(row *sql.Row) (User, error) {
    var u User
    err := row.Scan(&u.ID, &u.Name, &u.Email, &u.Role, &u.CreatedAt)
    return u, err
}

// User 实现 Model 接口
func (u User) TableName() string { return "users" }
func (u User) GetID() int64      { return u.ID }