🗄️ 数据库集成
掌握 GORM ORM、Redis 缓存、MongoDB 文档存储,以及连接池优化、事务管理、数据迁移最佳实践。
1. 数据库选型
应用层
🐘
PostgreSQL / MySQL
关系型数据库 — 用户、订单、事务、强一致性
ACID
⚡
Redis
内存缓存 — 会话、排行榜、分布式锁、消息队列
高速
🍃
MongoDB
文档数据库 — 半结构化数据、日志、配置、内容
灵活
🔍
Elasticsearch
搜索引擎 — 全文检索、日志分析、聚合统计
搜索
2. PostgreSQL + GORM
连接与配置
go get gorm.io/gorm
go get gorm.io/driver/postgres
go get gorm.io/driver/mysql
package database
import (
"fmt"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// Config holds the PostgreSQL connection parameters and the connection-pool
// limits consumed by NewDB.
type Config struct {
	// Server address.
	Host string
	Port int

	// Credentials, target database and the sslmode value placed in the DSN.
	User, Password, DBName, SSLMode string

	// Pool limits: maximum open connections and maximum idle connections.
	MaxOpen, MaxIdle int
	// MaxLife is the maximum lifetime of a single pooled connection.
	MaxLife time.Duration
}
// NewDB opens a PostgreSQL connection through GORM and configures the
// underlying database/sql connection pool from cfg.
//
// It returns an error when the connection cannot be opened or when the
// underlying *sql.DB cannot be obtained; in both cases the returned *gorm.DB
// is nil.
func NewDB(cfg Config) (*gorm.DB, error) {
	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=Asia/Shanghai",
		cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode,
	)
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
		Logger: logger.Default.LogMode(logger.Info), // development mode: log every SQL statement
		NowFunc: func() time.Time {
			return time.Now().Local()
		},
		PrepareStmt: true, // cache prepared statements (performance optimisation)
	})
	if err != nil {
		return nil, fmt.Errorf("连接数据库失败: %w", err)
	}

	// Configure the connection pool. The error must be checked: on failure
	// sqlDB is nil and the setter calls below would panic.
	sqlDB, err := db.DB()
	if err != nil {
		return nil, fmt.Errorf("获取底层连接池失败: %w", err)
	}
	sqlDB.SetMaxOpenConns(cfg.MaxOpen)    // upper bound on open connections (size to the DB instance)
	sqlDB.SetMaxIdleConns(cfg.MaxIdle)    // size of the idle pool
	sqlDB.SetConnMaxLifetime(cfg.MaxLife) // recycle before a gateway force-closes the connection
	sqlDB.SetConnMaxIdleTime(10 * time.Minute)
	return db, nil
}
// Ping is a health check: it obtains the *sql.DB behind db and pings it,
// returning the first error encountered.
func Ping(db *gorm.DB) error {
	conn, err := db.DB()
	if err == nil {
		err = conn.Ping()
	}
	return err
}
模型定义
package model
import (
"time"
"gorm.io/gorm"
)
// Base is the set of common columns meant to be embedded in every model.
type Base struct {
// ID is the primary key.
ID int64 `gorm:"primarykey" json:"id"`
// CreatedAt and UpdatedAt are the row timestamps exposed in JSON.
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // soft-delete marker; indexed and omitted from JSON
}
// User is an account record. It embeds Base and owns a list of Posts linked
// through Post.UserID.
type User struct {
Base
Name string `gorm:"type:varchar(100);not null" json:"name"`
// Email carries a unique index.
Email string `gorm:"type:varchar(200);uniqueIndex;not null" json:"email"`
// Password is never serialised to JSON (json:"-").
Password string `gorm:"type:varchar(255);not null" json:"-"`
// Role defaults to "user" at the database level.
Role string `gorm:"type:varchar(20);default:user" json:"role"`
// Active defaults to true at the database level.
Active bool `gorm:"default:true" json:"active"`
// Posts is the has-many association keyed by Post.UserID.
Posts []Post `gorm:"foreignKey:UserID" json:"posts,omitempty"`
}
// Post is an article written by a User. It embeds Base, belongs to one User
// through UserID and is linked to Tags through the post_tags join table.
type Post struct {
Base
Title string `gorm:"type:varchar(200);not null" json:"title"`
Content string `gorm:"type:text" json:"content"`
// Published defaults to false at the database level.
Published bool `gorm:"default:false" json:"published"`
// UserID is the indexed foreign key to the owning User.
UserID int64 `gorm:"index;not null" json:"user_id"`
User User `gorm:"foreignKey:UserID" json:"user,omitempty"`
// Tags is the many-to-many association stored in post_tags.
Tags []Tag `gorm:"many2many:post_tags" json:"tags,omitempty"`
}
// Tag is a label attached to Posts through the post_tags join table.
type Tag struct {
Base
// Name carries a unique index.
Name string `gorm:"type:varchar(50);uniqueIndex;not null" json:"name"`
// Posts is the reverse side of the many-to-many association; omitted from JSON.
Posts []Post `gorm:"many2many:post_tags" json:"-"`
}
// AutoMigrate runs GORM's automatic schema migration for the User, Post and
// Tag models and returns any error reported by GORM.
func AutoMigrate(db *gorm.DB) error {
	models := []any{&User{}, &Post{}, &Tag{}}
	return db.AutoMigrate(models...)
}
Repository 模式
package repository
import (
"context"
"errors"
"gorm.io/gorm"
"myapp/internal/model"
)
// ErrNotFound is the sentinel error the repository returns when a requested
// record does not exist.
var ErrNotFound = errors.New("record not found")
// UserRepository abstracts persistence of model.User records.
type UserRepository interface {
// FindByID looks a user up by primary key.
FindByID(ctx context.Context, id int64) (*model.User, error)
// FindByEmail looks a user up by e-mail address.
FindByEmail(ctx context.Context, email string) (*model.User, error)
// List returns one page of users plus the total number of matching rows.
List(ctx context.Context, params ListParams) ([]model.User, int64, error)
// Create inserts user.
Create(ctx context.Context, user *model.User) error
// Update applies the column/value pairs in updates to the user with the
// given id and returns the reloaded record.
Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error)
// Delete removes the user with the given id.
Delete(ctx context.Context, id int64) error
}
// ListParams describes the paging, filtering and ordering options accepted
// by UserRepository.List.
type ListParams struct {
	// Page is the 1-based page number; PageSize is the number of rows per page.
	Page, PageSize int

	// Search filters on name/email, Role filters on the role column, and
	// OrderBy selects the sort order. Empty values disable the respective
	// filter or select the default order.
	Search, Role, OrderBy string
}
// userRepo is the GORM-backed implementation of UserRepository.
type userRepo struct {
// db is the GORM handle every query is issued through.
db *gorm.DB
}
// NewUserRepository returns a UserRepository that issues its queries
// through db.
func NewUserRepository(db *gorm.DB) UserRepository {
	repo := new(userRepo)
	repo.db = db
	return repo
}
// FindByID loads the user with the given primary key.
//
// It returns ErrNotFound when no row matches. On any error the returned
// pointer is nil, so callers never receive a zero-valued user next to a
// non-nil error.
func (r *userRepo) FindByID(ctx context.Context, id int64) (*model.User, error) {
	var user model.User
	if err := r.db.WithContext(ctx).First(&user, id).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, ErrNotFound
		}
		return nil, err
	}
	return &user, nil
}
// FindByEmail loads the user whose email column equals email.
//
// It returns ErrNotFound when no row matches. On any error the returned
// pointer is nil, so callers never receive a zero-valued user next to a
// non-nil error.
func (r *userRepo) FindByEmail(ctx context.Context, email string) (*model.User, error) {
	var user model.User
	if err := r.db.WithContext(ctx).Where("email = ?", email).First(&user).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, ErrNotFound
		}
		return nil, err
	}
	return &user, nil
}
const (
	// defaultUserListPageSize is used when ListParams.PageSize is not positive.
	defaultUserListPageSize = 20
	// defaultUserListOrder is used when ListParams.OrderBy is empty or not allowlisted.
	defaultUserListOrder = "created_at DESC"
)

// userListOrderClauses is the allowlist of ORDER BY clauses accepted by List:
// every column of the users table, optionally followed by a sort direction.
// GORM's Order inserts its string argument into the SQL text, so
// caller-supplied values must never reach it unchecked.
var userListOrderClauses = func() map[string]struct{} {
	columns := []string{"id", "name", "email", "role", "active", "created_at", "updated_at"}
	suffixes := []string{"", " ASC", " DESC", " asc", " desc"}
	allowed := make(map[string]struct{}, len(columns)*len(suffixes))
	for _, col := range columns {
		for _, suffix := range suffixes {
			allowed[col+suffix] = struct{}{}
		}
	}
	return allowed
}()

// List returns one page of users together with the total number of rows that
// match the filters.
//
// Filters: a non-empty params.Search matches name or email case-insensitively
// (ILIKE); a non-empty params.Role matches the role column exactly.
// Paging: params.Page below 1 is treated as 1 and a non-positive
// params.PageSize falls back to defaultUserListPageSize.
// Ordering: params.OrderBy is honoured only when it is an allowlisted
// "<column>[ ASC|DESC]" clause; anything else falls back to
// defaultUserListOrder.
//
// On error the slice is nil and the total is 0.
func (r *userRepo) List(ctx context.Context, params ListParams) ([]model.User, int64, error) {
	page := params.Page
	if page < 1 {
		page = 1
	}
	pageSize := params.PageSize
	if pageSize < 1 {
		pageSize = defaultUserListPageSize
	}
	orderBy := defaultUserListOrder
	if _, ok := userListOrderClauses[params.OrderBy]; ok {
		orderBy = params.OrderBy
	}

	query := r.db.WithContext(ctx).Model(&model.User{})

	// Dynamic filters.
	if params.Search != "" {
		pattern := "%" + params.Search + "%"
		query = query.Where("name ILIKE ? OR email ILIKE ?", pattern, pattern)
	}
	if params.Role != "" {
		query = query.Where("role = ?", params.Role)
	}

	// Count first so the total reflects the filters but not the paging.
	var total int64
	if err := query.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	// Fetch the requested page.
	var users []model.User
	offset := (page - 1) * pageSize
	if err := query.Offset(offset).Limit(pageSize).Order(orderBy).Find(&users).Error; err != nil {
		return nil, 0, err
	}
	return users, total, nil
}
// Create inserts user through GORM and returns the error reported by the
// insert, if any.
func (r *userRepo) Create(ctx context.Context, user *model.User) error {
	result := r.db.WithContext(ctx).Create(user)
	return result.Error
}
// Update applies the column/value pairs in updates to the row whose id
// column equals id, then reloads the record through FindByID and returns it.
// A failed UPDATE is returned as-is with a nil user.
func (r *userRepo) Update(ctx context.Context, id int64, updates map[string]any) (*model.User, error) {
	var target model.User
	result := r.db.WithContext(ctx).Model(&target).Where("id = ?", id).Updates(updates)
	if result.Error != nil {
		return nil, result.Error
	}
	return r.FindByID(ctx, id)
}
// Delete removes the user with the given primary key. It returns the GORM
// error when the statement fails and ErrNotFound when no row was affected.
func (r *userRepo) Delete(ctx context.Context, id int64) error {
	outcome := r.db.WithContext(ctx).Delete(&model.User{}, id)
	switch {
	case outcome.Error != nil:
		return outcome.Error
	case outcome.RowsAffected == 0:
		return ErrNotFound
	default:
		return nil
	}
}
// ─── Transactions ─────────────────────────────────────────────

// TransferWithTx moves amount from account fromID to account toID inside a
// single database transaction.
//
// The debit is one conditional UPDATE (balance >= amount is checked in the
// WHERE clause), so the balance check and the write cannot be separated by a
// concurrent transfer. The two rows are updated in ascending ID order so that
// opposing transfers acquire their row locks in the same order.
//
// Errors: a negative or NaN amount is rejected before the transaction starts;
// gorm.ErrRecordNotFound is returned when either account is missing;
// "余额不足" is returned when the source balance is too low. Any error rolls
// the transaction back.
//
// NOTE(review): amount is a float64; confirm that binary floating point is
// acceptable for the balance column.
func (r *userRepo) TransferWithTx(ctx context.Context, fromID, toID int64, amount float64) error {
	// amount != amount is true only for NaN.
	if amount < 0 || amount != amount {
		return errors.New("转账金额无效")
	}

	return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		// Every statement inside the transaction goes through tx, never r.db.
		debit := func() error {
			res := tx.Model(&model.Account{}).
				Where("id = ? AND balance >= ?", fromID, amount).
				Update("balance", gorm.Expr("balance - ?", amount))
			if res.Error != nil {
				return res.Error
			}
			if res.RowsAffected > 0 {
				return nil
			}
			// Nothing was updated: tell a missing account from a low balance.
			var from model.Account
			if err := tx.First(&from, fromID).Error; err != nil {
				return err
			}
			return errors.New("余额不足")
		}
		credit := func() error {
			res := tx.Model(&model.Account{}).
				Where("id = ?", toID).
				Update("balance", gorm.Expr("balance + ?", amount))
			if res.Error != nil {
				return res.Error
			}
			if res.RowsAffected == 0 {
				return gorm.ErrRecordNotFound
			}
			return nil
		}

		steps := [2]func() error{debit, credit}
		if toID < fromID {
			// Touch the lower ID first; a failed debit still rolls back the credit.
			steps = [2]func() error{credit, debit}
		}
		for _, step := range steps {
			if err := step(); err != nil {
				return err // a non-nil return rolls the transaction back
			}
		}
		return nil // a nil return commits
	})
}
3. Redis 缓存
go get github.com/redis/go-redis/v9
package cache
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// RedisCache wraps a go-redis client and is the receiver for the cache,
// lock, leaderboard and rate-limit helpers in this package.
type RedisCache struct {
// client is the underlying go-redis client.
client *redis.Client
}
// NewRedisCache builds a RedisCache for the server at addr, authenticating
// with password and selecting logical database db. Pool size and timeouts
// are fixed by this constructor.
func NewRedisCache(addr, password string, db int) *RedisCache {
	opts := redis.Options{
		Addr:     addr,
		Password: password,
		DB:       db,

		// Connection pool.
		PoolSize:     20,
		MinIdleConns: 5,

		// Timeouts.
		DialTimeout:  5 * time.Second,
		ReadTimeout:  3 * time.Second,
		WriteTimeout: 3 * time.Second,
	}
	return &RedisCache{client: redis.NewClient(&opts)}
}
// Get reads key from Redis and JSON-decodes the stored bytes into a new T.
//
// A cache miss is reported as (nil, nil). Any other Redis failure is wrapped
// with the "redis get" prefix; a JSON decoding failure is returned unwrapped.
func Get[T any](ctx context.Context, c *RedisCache, key string) (*T, error) {
	raw, err := c.client.Get(ctx, key).Bytes()
	switch {
	case err == redis.Nil:
		return nil, nil // cache miss
	case err != nil:
		return nil, fmt.Errorf("redis get: %w", err)
	}

	decoded := new(T)
	if err := json.Unmarshal(raw, decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
// Set JSON-encodes val and stores it under key with the given ttl. It
// returns the encoding error or the Redis error, whichever occurs first.
func Set[T any](ctx context.Context, c *RedisCache, key string, val T, ttl time.Duration) error {
	payload, err := json.Marshal(val)
	if err == nil {
		err = c.client.Set(ctx, key, payload, ttl).Err()
	}
	return err
}
// ─── Cache-Aside pattern ──────────────────────────────────────

// GetUser returns the user with the given id, consulting Redis first and
// calling fallback (normally the database lookup) on a miss.
//
//   - Cache error: degrade gracefully and return whatever fallback returns.
//   - Cache hit: return the cached value.
//   - Cache miss: call fallback; on success populate the cache in the
//     background with a 30-minute TTL and return the value.
//
// A fallback that reports "no user" as (nil, nil) is passed through
// unchanged and nothing is cached.
func (c *RedisCache) GetUser(ctx context.Context, id int64, fallback func() (*User, error)) (*User, error) {
	key := fmt.Sprintf("user:%d", id)

	// 1. Try the cache.
	cached, err := Get[User](ctx, c, key)
	if err != nil {
		// Redis is failing: go straight to the source of truth.
		return fallback()
	}
	if cached != nil {
		return cached, nil // cache hit
	}

	// 2. Cache miss: load from the database.
	user, err := fallback()
	if err != nil {
		return nil, err
	}
	if user == nil {
		// Dereferencing nil in the goroutine below would crash the process.
		return nil, nil
	}

	// 3. Populate the cache asynchronously so the caller is not blocked.
	// The goroutine works on a copy so it never races with the caller
	// mutating the returned user, and the write is bounded by a timeout
	// because the request context may already be finished.
	snapshot := *user
	go func() {
		writeCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		// Best effort: a failed cache write only costs a later cache miss.
		_ = Set(writeCtx, c, key, snapshot, 30*time.Minute)
	}()
	return user, nil
}
// ─── Distributed lock ─────────────────────────────────────────

// AcquireLock tries to take the lock named key for at most ttl by issuing
// SET NX on "lock:"+key. It reports true when the key was set and false
// when it already existed.
//
// NOTE(review): the stored value is the constant "1", so the lock carries no
// owner identity.
func (c *RedisCache) AcquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {
	lockKey := "lock:" + key
	cmd := c.client.SetNX(ctx, lockKey, "1", ttl) // set only if absent
	return cmd.Result()
}
// ReleaseLock deletes "lock:"+key and returns the Redis error, if any.
//
// NOTE(review): the key is deleted unconditionally, so a caller whose lock
// has already expired can remove a lock now held by someone else.
func (c *RedisCache) ReleaseLock(ctx context.Context, key string) error {
	lockKey := "lock:" + key
	cmd := c.client.Del(ctx, lockKey)
	return cmd.Err()
}
// ─── Leaderboard (sorted set) ─────────────────────────────────

// UpdateScore sets member's score in the sorted set named leaderboard via
// ZADD and returns the Redis error, if any.
func (c *RedisCache) UpdateScore(ctx context.Context, leaderboard, member string, score float64) error {
	entry := redis.Z{Score: score, Member: member}
	cmd := c.client.ZAdd(ctx, leaderboard, entry)
	return cmd.Err()
}
// GetTopN returns up to n members of the sorted set leaderboard, highest
// score first, together with their scores.
//
// A non-positive n yields an empty result without contacting Redis. Without
// this guard n == 0 would request the range [0, -1], which Redis interprets
// as "the whole set".
func (c *RedisCache) GetTopN(ctx context.Context, leaderboard string, n int) ([]redis.Z, error) {
	if n <= 0 {
		return nil, nil
	}
	return c.client.ZRevRangeWithScores(ctx, leaderboard, 0, int64(n)-1).Result()
}
// ─── Rate limiting (fixed-window counter) ─────────────────────

// fixedWindowScript increments the counter at KEYS[1] and, only when the
// increment created the key, gives it a TTL of ARGV[1] milliseconds. It runs
// as one Lua script so the increment and the expiry are applied together.
const fixedWindowScript = `
local n = redis.call('INCR', KEYS[1])
if n == 1 then
  redis.call('PEXPIRE', KEYS[1], ARGV[1])
end
return n`

// IsAllowed reports whether the caller identified by key is still within
// limit calls for the current window.
//
// The window opens on the first call and closes when the key expires after
// window has elapsed. The TTL is set once per window rather than refreshed on
// every call, because refreshing it would keep a steadily retrying client
// blocked indefinitely. Windows shorter than one millisecond are rounded up
// to one millisecond.
func (c *RedisCache) IsAllowed(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
	ttlMillis := window.Milliseconds()
	if ttlMillis < 1 {
		ttlMillis = 1 // PEXPIRE with a non-positive TTL would delete the key immediately
	}

	count, err := c.client.Eval(ctx, fixedWindowScript, []string{key}, ttlMillis).Int64()
	if err != nil {
		return false, err
	}
	return count <= int64(limit), nil
}
4. 数据库迁移管理
go get github.com/golang-migrate/migrate/v4
-- migrations/000001_create_users.up.sql
-- Creates the users table, its soft-delete index and the trigger that keeps
-- updated_at current. Every statement is written so it can be re-run safely.
CREATE TABLE IF NOT EXISTS users (
    id         BIGSERIAL    PRIMARY KEY,
    name       VARCHAR(100) NOT NULL,
    email      VARCHAR(200) NOT NULL UNIQUE,
    password   VARCHAR(255) NOT NULL,
    role       VARCHAR(20)  NOT NULL DEFAULT 'user',
    active     BOOLEAN      NOT NULL DEFAULT TRUE,
    created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    deleted_at TIMESTAMPTZ
);

-- No separate index on email: the UNIQUE constraint above already creates
-- one, and a second index would only slow down writes.
CREATE INDEX IF NOT EXISTS idx_users_deleted ON users(deleted_at);

-- Automatically refresh updated_at on every row update.
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop first so re-running the migration does not fail on an existing trigger.
DROP TRIGGER IF EXISTS users_updated_at ON users;
CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();
-- migrations/000001_create_users.down.sql
-- Reverts the up migration: removes the trigger, the table and the trigger
-- function that the up migration created.
DROP TRIGGER IF EXISTS users_updated_at ON users;
DROP TABLE IF EXISTS users;
DROP FUNCTION IF EXISTS update_updated_at();
package database
import (
	"errors"
	"fmt"

	"github.com/golang-migrate/migrate/v4"
	_ "github.com/golang-migrate/migrate/v4/database/postgres"
	_ "github.com/golang-migrate/migrate/v4/source/file"
)
// RunMigrations applies every pending migration found in the local
// "migrations" directory to the database at databaseURL and prints the
// resulting schema version.
//
// "No change" is treated as success. An error is returned when the migrator
// cannot be created, when a migration fails, or when the resulting version
// cannot be read (a database with no applied migration is not an error and
// is reported as version 0).
func RunMigrations(databaseURL string) error {
	m, err := migrate.New("file://migrations", databaseURL)
	if err != nil {
		return fmt.Errorf("创建迁移器失败: %w", err)
	}
	// Errors from Close are deliberately ignored: by then the migration
	// result is already decided.
	defer m.Close()

	if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
		return fmt.Errorf("迁移失败: %w", err)
	}

	v, _, err := m.Version()
	if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
		return fmt.Errorf("读取迁移版本失败: %w", err)
	}
	fmt.Printf("数据库迁移完成,当前版本: %d\n", v)
	return nil
}
5. MongoDB 集成
go get go.mongodb.org/mongo-driver/mongo
package mongodb
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// LogEntry is one log document as stored by LogRepository.
type LogEntry struct {
// ID is the document's _id; LogRepository.Insert assigns a fresh value.
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
Level string `bson:"level" json:"level"`
Message string `bson:"message" json:"message"`
// UserID is omitted from the stored document when zero.
UserID int64 `bson:"user_id,omitempty" json:"user_id"`
IP string `bson:"ip" json:"ip"`
// Metadata holds free-form extra fields; omitted from the stored document when empty.
Metadata bson.M `bson:"metadata,omitempty" json:"metadata"`
// CreatedAt is set by LogRepository.Insert and is the key of the TTL index.
CreatedAt time.Time `bson:"created_at" json:"created_at"`
}
// LogRepository stores and queries LogEntry documents in a MongoDB collection.
type LogRepository struct {
// col is the collection all operations run against.
col *mongo.Collection
}
// NewMongoDB connects to the MongoDB deployment at uri, verifies the
// connection with a ping and returns the client. Connecting and pinging
// share a 10-second budget.
//
// When the ping fails the half-initialised client is disconnected before the
// error is returned, so its connections and background goroutines are not
// leaked.
//
// The database parameter is not used by this function; it is kept so the
// signature stays unchanged for existing callers.
func NewMongoDB(uri, database string) (*mongo.Client, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	opts := options.Client().ApplyURI(uri).
		SetMaxPoolSize(50).
		SetMinPoolSize(5).
		SetMaxConnIdleTime(30 * time.Second)

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		return nil, err
	}
	if err := client.Ping(ctx, nil); err != nil {
		// ctx may already be expired, so clean up under a fresh deadline.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer closeCancel()
		_ = client.Disconnect(closeCtx) // best effort; the ping error is what matters
		return nil, err
	}
	return client, nil
}
// NewLogRepository returns a LogRepository backed by the "logs" collection
// of database dbName.
//
// It also asks MongoDB to create a TTL index on created_at that expires
// documents after 30 days. Index creation is best effort: it is bounded by a
// 10-second timeout so the constructor cannot block indefinitely, and a
// failure is ignored because the signature has no way to report it.
func NewLogRepository(client *mongo.Client, dbName string) *LogRepository {
	col := client.Database(dbName).Collection("logs")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// TTL index: log documents are removed automatically after 30 days.
	_, _ = col.Indexes().CreateOne(ctx, mongo.IndexModel{
		Keys:    bson.D{{Key: "created_at", Value: 1}},
		Options: options.Index().SetExpireAfterSeconds(30 * 24 * 3600),
	})
	return &LogRepository{col: col}
}
// Insert stores entry in the collection. It overwrites entry.ID with a new
// ObjectID and entry.CreatedAt with the current time before inserting, and
// returns the driver error, if any.
func (r *LogRepository) Insert(ctx context.Context, entry *LogEntry) error {
	entry.ID, entry.CreatedAt = primitive.NewObjectID(), time.Now()
	if _, err := r.col.InsertOne(ctx, entry); err != nil {
		return err
	}
	return nil
}
// Query returns the log entries matching filter, newest first (created_at
// descending), with limit passed to the driver as the find limit.
//
// On error the returned slice is nil.
func (r *LogRepository) Query(ctx context.Context, filter bson.M, limit int) ([]LogEntry, error) {
	opts := options.Find().
		SetLimit(int64(limit)).
		SetSort(bson.D{{Key: "created_at", Value: -1}})

	cursor, err := r.col.Find(ctx, filter, opts)
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx)

	// Decode first, then return. Writing "return entries, cursor.All(...)"
	// would depend on whether entries is read before or after the call fills
	// it, which the Go specification leaves unspecified.
	var entries []LogEntry
	if err := cursor.All(ctx, &entries); err != nil {
		return nil, err
	}
	return entries, nil
}
// HourlyErrorStats aggregates the documents whose level is "error" into
// per-hour buckets. Each result has _id set to the hour formatted as
// "%Y-%m-%d %H:00" and count set to the number of documents in that hour;
// results are sorted by _id ascending.
//
// On error the returned slice is nil.
func (r *LogRepository) HourlyErrorStats(ctx context.Context) ([]bson.M, error) {
	pipeline := mongo.Pipeline{
		// Keep only error-level entries.
		{{Key: "$match", Value: bson.D{{Key: "level", Value: "error"}}}},
		// Group by hour and count.
		{{Key: "$group", Value: bson.D{
			{Key: "_id", Value: bson.D{
				{Key: "$dateToString", Value: bson.D{
					{Key: "format", Value: "%Y-%m-%d %H:00"},
					{Key: "date", Value: "$created_at"},
				}},
			}},
			{Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
		}}},
		{{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
	}

	cursor, err := r.col.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx)

	// Decode first, then return. Writing "return results, cursor.All(...)"
	// would depend on whether results is read before or after the call fills
	// it, which the Go specification leaves unspecified.
	var results []bson.M
	if err := cursor.All(ctx, &results); err != nil {
		return nil, err
	}
	return results, nil
}
N+1 查询问题
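获取 100 个用户时若再逐一查询他们的帖子,会产生 101 次 SQL 请求(1 次查用户 + 100 次查帖子)。对 Posts 这类一对多关联,使用 GORM 的 Preload 解决:db.Preload("Posts").Find(&users) 只需 2 次查询(用户 1 次,帖子用 IN 条件 1 次)。Joins 预加载只适用于一对一 / belongs-to 关联,例如 db.Joins("User").Find(&posts)。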
连接池最佳实践
MaxOpenConns 通常设为 DB 服务器 CPU 核数的 2-4 倍;MaxIdleConns 设为 MaxOpenConns 的一半;ConnMaxLifetime 设为 5-30 分钟(避免连接被防火墙关闭)。