数据库开发指南
本指南详细介绍了 AI SaaS Template 中基于 Drizzle ORM 和 PostgreSQL 的数据库开发模式,包括模式设计、数据操作、类型安全查询和最佳实践。
技术栈概述
核心技术
- 数据库: PostgreSQL (推荐 Neon/Supabase)
- ORM: Drizzle ORM - 类型安全的轻量级 ORM
- 迁移工具: Drizzle Kit - 数据库迁移和管理
- 连接方式: 示例代码使用 postgres.js (TCP 连接池);在 serverless/edge 环境中可改用 Neon 等提供的 HTTP/WebSocket 驱动
- 类型安全: 完全的 TypeScript 类型推断
项目数据库结构
src/lib/db/
├── index.ts # 数据库连接和配置
├── schema.ts # 数据库表结构定义
├── types.ts # 数据库相关类型
├── migrations/ # 数据库迁移文件
└── queries/ # 复用的查询函数
    ├── users.ts # 用户相关查询
    ├── conversations.ts # AI 对话相关查询
    ├── payments.ts # 支付相关查询
    └── index.ts # 查询函数导出
数据库连接配置
数据库连接设置
// src/lib/db/index.ts
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import { env } from '@/env'
import * as schema from './schema'
// Open the postgres.js client that Drizzle runs its queries through.
const queryClient = postgres(env.DATABASE_URL, {
  max: 20, // upper bound on pooled connections
  prepare: false, // prepared statements disabled (the guide cites Neon compatibility)
})

// SQL logging is switched on only in development.
const logQueries = env.NODE_ENV === 'development'

// Schema-aware Drizzle instance shared by the application.
export const db = drizzle(queryClient, { schema, logger: logQueries })

// Type of the shared instance, for code that takes the database as a parameter.
export type Database = typeof db

// Re-export table definitions and query helpers from the package entry point.
export * from './schema'
export * from './queries'
Drizzle 配置文件
// drizzle.config.ts
import { defineConfig } from 'drizzle-kit'
import { env } from './env'
// Drizzle Kit settings: where the schema lives, where generated output goes,
// and which PostgreSQL database to talk to.
const drizzleKitConfig = defineConfig({
  dialect: 'postgresql',
  schema: './src/lib/db/schema.ts',
  out: './drizzle',
  dbCredentials: { url: env.DATABASE_URL },
  strict: true,
  verbose: true,
})

export default drizzleKitConfig
数据库模式设计
核心数据表
// src/lib/db/schema.ts
import {
pgTable,
text,
timestamp,
boolean,
integer,
decimal,
jsonb,
varchar,
unique,
index,
} from 'drizzle-orm/pg-core'
import { createId } from '@paralleldrive/cuid2'
// Users table: one row per account, keyed internally by a cuid2 id and
// externally by the Clerk user id.
export const users = pgTable('users', {
  // Primary key; createId() supplies a value when the insert omits one.
  id: text('id').primaryKey().$defaultFn(() => createId()),
  clerkId: text('clerk_id').notNull().unique(), // Clerk user ID
  email: text('email').notNull().unique(),
  name: text('name'),
  imageUrl: text('image_url'),
  role: text('role', { enum: ['user', 'admin'] }).notNull().default('user'),
  // Subscription info
  subscriptionId: text('subscription_id'),
  subscriptionStatus: text('subscription_status'),
  planType: text('plan_type', { enum: ['free', 'basic', 'pro'] }).notNull().default('free'),
  // AI usage counters
  aiUsageCount: integer('ai_usage_count').notNull().default(0),
  aiUsageLimit: integer('ai_usage_limit').notNull().default(10), // free-tier limit
  // Timestamps
  createdAt: timestamp('created_at').notNull().defaultNow(),
  // The $onUpdateFn callback runs in application code (it is not a database trigger).
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  // NOTE(review): clerk_id and email already carry unique constraints, which
  // PostgreSQL backs with unique indexes; these two extra indexes look redundant.
  clerkIdIdx: index('users_clerk_id_idx').on(table.clerkId),
  emailIdx: index('users_email_idx').on(table.email),
}))
// AI conversation (chat session) table. Rows are removed automatically when
// the owning user is deleted (onDelete: 'cascade').
export const conversations = pgTable('conversations', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  title: text('title').notNull(),
  model: text('model').notNull(), // AI model in use
  provider: text('provider').notNull(), // AI provider (openai, anthropic, google, xai)
  // Session settings
  systemPrompt: text('system_prompt'),
  // Decimal column; note the default is given as the string '0.7'.
  temperature: decimal('temperature', { precision: 3, scale: 2 }).default('0.7'),
  maxTokens: integer('max_tokens').default(4000),
  // Metadata: denormalized counters stored on the conversation row.
  messageCount: integer('message_count').notNull().default(0),
  totalTokens: integer('total_tokens').notNull().default(0),
  createdAt: timestamp('created_at').notNull().defaultNow(),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  userIdIdx: index('conversations_user_id_idx').on(table.userId),
  createdAtIdx: index('conversations_created_at_idx').on(table.createdAt),
}))
// AI conversation message table. Each message belongs to exactly one
// conversation and is deleted with it (onDelete: 'cascade'). There is no
// direct user column; ownership is reached through the conversation.
export const messages = pgTable('messages', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  conversationId: text('conversation_id').notNull().references(() => conversations.id, { onDelete: 'cascade' }),
  role: text('role', { enum: ['user', 'assistant', 'system'] }).notNull(),
  content: text('content').notNull(),
  // Token usage (nullable)
  tokenCount: integer('token_count'),
  // Metadata
  metadata: jsonb('metadata'), // stores extra information
  createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => ({
  conversationIdIdx: index('messages_conversation_id_idx').on(table.conversationId),
  createdAtIdx: index('messages_created_at_idx').on(table.createdAt),
}))
// Payment record table. Rows are deleted together with the owning user.
export const payments = pgTable('payments', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  // Stripe identifiers (all nullable)
  stripePaymentIntentId: text('stripe_payment_intent_id'),
  stripeCustomerId: text('stripe_customer_id'),
  stripeSubscriptionId: text('stripe_subscription_id'),
  // Payment details
  // Decimal amount with two fractional digits.
  amount: decimal('amount', { precision: 10, scale: 2 }).notNull(),
  currency: text('currency').notNull().default('usd'),
  status: text('status', {
    enum: ['pending', 'succeeded', 'failed', 'canceled', 'refunded']
  }).notNull(),
  // Plan details
  // NOTE(review): free text here, whereas users.plan_type is restricted to
  // 'free' | 'basic' | 'pro' — confirm whether the same enum should apply.
  planType: text('plan_type').notNull(),
  billingPeriod: text('billing_period', { enum: ['monthly', 'yearly'] }),
  createdAt: timestamp('created_at').notNull().defaultNow(),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  userIdIdx: index('payments_user_id_idx').on(table.userId),
  statusIdx: index('payments_status_idx').on(table.status),
  stripeSubscriptionIdIdx: index('payments_stripe_subscription_id_idx').on(table.stripeSubscriptionId),
}))
// File upload table (optional). Rows are deleted together with the owning user.
export const uploads = pgTable('uploads', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  // File details
  fileName: text('file_name').notNull(),
  originalName: text('original_name').notNull(),
  mimeType: text('mime_type').notNull(),
  size: integer('size').notNull(), // presumably bytes — TODO confirm
  // Storage details
  storageProvider: text('storage_provider').notNull().default('uploadthing'),
  storageKey: text('storage_key').notNull(),
  url: text('url').notNull(),
  createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => ({
  userIdIdx: index('uploads_user_id_idx').on(table.userId),
  storageKeyIdx: index('uploads_storage_key_idx').on(table.storageKey),
}))
// System configuration table: a key/value store whose values are JSONB.
export const systemConfig = pgTable('system_config', {
  key: text('key').primaryKey(),
  value: jsonb('value').notNull(),
  description: text('description'),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
})
关系定义
// src/lib/db/schema.ts (续)
import { relations } from 'drizzle-orm'
// Table relations for the relational query API.
//
// A user owns conversations, payments and uploads. Messages are intentionally
// NOT declared here: the messages table has no user column and
// messagesRelations has no matching one(users), so Drizzle could not infer a
// users -> messages relation and would throw when it is queried. Reach a
// user's messages through their conversations instead.
export const usersRelations = relations(users, ({ many }) => ({
  conversations: many(conversations),
  payments: many(payments),
  uploads: many(uploads),
}))
// A conversation belongs to one user and holds many messages.
export const conversationsRelations = relations(conversations, (helpers) => {
  const owner = helpers.one(users, {
    fields: [conversations.userId],
    references: [users.id],
  })
  return {
    user: owner,
    messages: helpers.many(messages),
  }
})
// A message belongs to exactly one conversation.
export const messagesRelations = relations(messages, (helpers) => {
  const parent = helpers.one(conversations, {
    fields: [messages.conversationId],
    references: [conversations.id],
  })
  return { conversation: parent }
})
// A payment belongs to exactly one user.
export const paymentsRelations = relations(payments, (helpers) => {
  const payer = helpers.one(users, {
    fields: [payments.userId],
    references: [users.id],
  })
  return { user: payer }
})
// An upload belongs to exactly one user.
export const uploadsRelations = relations(uploads, (helpers) => {
  const uploader = helpers.one(users, {
    fields: [uploads.userId],
    references: [users.id],
  })
  return { user: uploader }
})
数据库查询操作
基础查询函数
// src/lib/db/queries/users.ts
import { and, count, countDistinct, desc, eq, gte, lte, sql } from 'drizzle-orm'
import { db } from '..'
import { users, conversations, messages } from '../schema'
/**
 * Look up a user by Clerk ID.
 *
 * @param clerkId Clerk user identifier stored in users.clerk_id.
 * @returns The matching row, or undefined when no user has that Clerk ID.
 */
export async function getUserByClerkId(clerkId: string) {
  const matches = await db.select().from(users).where(eq(users.clerkId, clerkId)).limit(1)
  return matches[0]
}
/**
 * Insert a new user row; every other column falls back to its schema default.
 *
 * @param data Clerk ID and email, plus optional display name and avatar URL.
 * @returns The inserted row as returned by the database.
 */
export async function createUser(data: {
  clerkId: string
  email: string
  name?: string
  imageUrl?: string
}) {
  const { clerkId, email, name, imageUrl } = data
  const inserted = await db.insert(users).values({ clerkId, email, name, imageUrl }).returning()
  return inserted[0]
}
/**
 * Update the user identified by a Clerk ID.
 *
 * @param clerkId Clerk user identifier of the row to update.
 * @param data Columns to overwrite; updatedAt is always set to the current time.
 * @returns The updated row, or undefined when no user has that Clerk ID.
 */
export async function updateUser(clerkId: string, data: Partial<typeof users.$inferInsert>) {
  const changes = { ...data, updatedAt: new Date() }
  const updated = await db.update(users).set(changes).where(eq(users.clerkId, clerkId)).returning()
  return updated[0]
}
/**
 * Add one to a user's AI usage counter.
 *
 * The increment is computed in SQL, so it is a single UPDATE statement rather
 * than a read followed by a write.
 *
 * @param userId Internal user id (users.id).
 * @returns The updated row, or undefined when the user does not exist.
 */
export async function incrementUserAIUsage(userId: string) {
  const nextCount = sql`${users.aiUsageCount} + 1`
  const rows = await db
    .update(users)
    .set({ aiUsageCount: nextCount, updatedAt: new Date() })
    .where(eq(users.id, userId))
    .returning()
  return rows[0]
}
/**
 * Aggregate usage statistics for one user.
 *
 * @param userId Internal user id (users.id).
 * @returns Conversation and message counts plus the user's AI quota fields,
 *          or undefined when the user does not exist.
 */
export async function getUserStats(userId: string) {
  const [userStats] = await db
    .select({
      // The joins below produce one row per message, so a conversation id
      // repeats once per message it contains; count each conversation once.
      conversationCount: countDistinct(conversations.id),
      // Each message appears in exactly one joined row, so a plain count is right.
      messageCount: count(messages.id),
      aiUsageCount: users.aiUsageCount,
      aiUsageLimit: users.aiUsageLimit,
      planType: users.planType,
    })
    .from(users)
    // Left joins keep users that have no conversations or messages (counts become 0).
    .leftJoin(conversations, eq(conversations.userId, users.id))
    .leftJoin(messages, eq(messages.conversationId, conversations.id))
    .where(eq(users.id, userId))
    .groupBy(users.id, users.aiUsageCount, users.aiUsageLimit, users.planType)
  return userStats
}
AI 对话相关查询
// src/lib/db/queries/conversations.ts
import { eq, desc, count, sql, and } from 'drizzle-orm'
import { db } from '..'
import { conversations, messages, users } from '../schema'
/**
 * List a user's conversations, most recently updated first, one page at a time.
 *
 * @param userId Internal user id (users.id).
 * @param page 1-based page number; values below 1 or non-finite values are treated as 1.
 * @param limit Page size; values below 1 or non-finite values fall back to 1 / 20.
 * @returns The page of conversations plus the total count and paging metadata
 *          (page and limit echo the normalized values actually used).
 */
export async function getUserConversations(userId: string, page = 1, limit = 20) {
  // Normalize paging input so a bad argument cannot produce a negative OFFSET
  // or a division by zero in totalPages.
  const safePage = Number.isFinite(page) ? Math.max(1, Math.floor(page)) : 1
  const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 20
  const offset = (safePage - 1) * safeLimit
  const ownedByUser = eq(conversations.userId, userId)

  // The page query and the count query are independent, so run them concurrently.
  const [conversationList, [{ total }]] = await Promise.all([
    db
      .select({
        id: conversations.id,
        title: conversations.title,
        model: conversations.model,
        provider: conversations.provider,
        messageCount: conversations.messageCount,
        createdAt: conversations.createdAt,
        updatedAt: conversations.updatedAt,
      })
      .from(conversations)
      .where(ownedByUser)
      // id is a tie-breaker so rows with equal updatedAt keep a stable order across pages.
      .orderBy(desc(conversations.updatedAt), desc(conversations.id))
      .limit(safeLimit)
      .offset(offset),
    db
      .select({ total: count() })
      .from(conversations)
      .where(ownedByUser),
  ])

  return {
    conversations: conversationList,
    total,
    page: safePage,
    limit: safeLimit,
    totalPages: Math.ceil(total / safeLimit),
  }
}
/**
 * Insert a new conversation for a user.
 *
 * @param data Owner, title, model and provider, plus optional session settings.
 *             temperature is passed as a string because the column is a decimal.
 * @returns The inserted row as returned by the database.
 */
export async function createConversation(data: {
  userId: string
  title: string
  model: string
  provider: string
  systemPrompt?: string
  temperature?: string
  maxTokens?: number
}) {
  const created = await db.insert(conversations).values(data).returning()
  return created[0]
}
/**
 * Load one conversation together with its messages in creation order.
 *
 * @param conversationId Id of the conversation to load.
 * @param userId Id of the user who must own the conversation.
 * @returns The conversation with a messages array, or null when no
 *          conversation with that id belongs to the given user.
 */
export async function getConversationWithMessages(conversationId: string, userId: string) {
  // Matching on both id and owner means another user's conversation is never returned.
  const ownedConversation = and(
    eq(conversations.id, conversationId),
    eq(conversations.userId, userId)
  )
  const found = await db.select().from(conversations).where(ownedConversation).limit(1)
  const conversation = found[0]
  if (!conversation) {
    return null
  }

  const history = await db
    .select()
    .from(messages)
    .where(eq(messages.conversationId, conversationId))
    .orderBy(messages.createdAt)

  return { ...conversation, messages: history }
}
/**
 * Append a message to a conversation and update the conversation's counters.
 *
 * Both writes run in one transaction, so the counters cannot drift from the
 * stored messages if either statement fails.
 *
 * @param data Message fields. metadata is arbitrary JSON-serializable data and
 *             is typed unknown (rather than any) so callers keep type checking.
 * @returns The inserted message row.
 */
export async function addMessageToConversation(data: {
  conversationId: string
  role: 'user' | 'assistant' | 'system'
  content: string
  tokenCount?: number
  metadata?: unknown
}) {
  return await db.transaction(async (tx) => {
    // Insert the message first; the foreign key rejects unknown conversation ids.
    const [message] = await tx
      .insert(messages)
      .values(data)
      .returning()

    // Update the denormalized counters. A missing tokenCount adds 0, which
    // leaves totalTokens unchanged.
    await tx
      .update(conversations)
      .set({
        messageCount: sql`${conversations.messageCount} + 1`,
        totalTokens: sql`${conversations.totalTokens} + ${data.tokenCount ?? 0}`,
        updatedAt: new Date(),
      })
      .where(eq(conversations.id, data.conversationId))

    return message
  })
}
/**
 * Delete a conversation, but only if it belongs to the given user.
 *
 * @param conversationId Id of the conversation to delete.
 * @param userId Id of the user who must own the conversation.
 * @returns true when a row was deleted, false when nothing matched.
 */
export async function deleteConversation(conversationId: string, userId: string) {
  const ownedConversation = and(
    eq(conversations.id, conversationId),
    eq(conversations.userId, userId)
  )
  const removed = await db
    .delete(conversations)
    .where(ownedConversation)
    .returning({ id: conversations.id })
  return removed.length > 0
}
支付相关查询
// src/lib/db/queries/payments.ts
import { eq, desc, and, inArray } from 'drizzle-orm'
import { db } from '..'
import { payments, users } from '../schema'
/**
 * Insert a payment record.
 *
 * status and billingPeriod are typed from the table definition instead of
 * plain string: both columns are enum-restricted, so an arbitrary string is
 * rejected by the type of values().
 *
 * @param data Payment fields; amount is a string because the column is a decimal.
 * @returns The inserted row as returned by the database.
 */
export async function createPayment(data: {
  userId: string
  stripePaymentIntentId?: string
  stripeCustomerId?: string
  stripeSubscriptionId?: string
  amount: string
  currency: string
  status: (typeof payments.$inferInsert)['status']
  planType: string
  billingPeriod?: NonNullable<(typeof payments.$inferInsert)['billingPeriod']>
}) {
  const [payment] = await db
    .insert(payments)
    .values(data)
    .returning()
  return payment
}
/**
 * Find a payment record by its Stripe subscription id.
 *
 * @param subscriptionId Value stored in payments.stripe_subscription_id.
 * @returns One matching row (no ordering is applied), or undefined when none exists.
 */
export async function getPaymentByStripeSubscriptionId(subscriptionId: string) {
  const matches = await db
    .select()
    .from(payments)
    .where(eq(payments.stripeSubscriptionId, subscriptionId))
    .limit(1)
  return matches[0]
}
/**
 * Change the status of a payment record.
 *
 * @param paymentId Id of the payment row (payments.id).
 * @param status New status; typed from the table definition because the column
 *               only accepts the enum values declared in the schema.
 * @returns The updated row, or undefined when the payment does not exist.
 */
export async function updatePaymentStatus(
  paymentId: string,
  status: (typeof payments.$inferInsert)['status']
) {
  const [payment] = await db
    .update(payments)
    .set({ status, updatedAt: new Date() })
    .where(eq(payments.id, paymentId))
    .returning()
  return payment
}
/**
 * List a user's payment history, newest first, one page at a time.
 *
 * @param userId Internal user id (users.id).
 * @param page 1-based page number; values below 1 or non-finite values are treated as 1.
 * @param limit Page size; values below 1 or non-finite values fall back to 1 / 20.
 * @returns The requested page of payment rows.
 */
export async function getUserPayments(userId: string, page = 1, limit = 20) {
  // Normalize paging input so a bad argument cannot produce a negative OFFSET or LIMIT.
  const safePage = Number.isFinite(page) ? Math.max(1, Math.floor(page)) : 1
  const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 20
  const offset = (safePage - 1) * safeLimit

  const paymentList = await db
    .select()
    .from(payments)
    .where(eq(payments.userId, userId))
    // id is a tie-breaker so rows with equal createdAt keep a stable order across pages.
    .orderBy(desc(payments.createdAt), desc(payments.id))
    .limit(safeLimit)
    .offset(offset)
  return paymentList
}
/**
 * Find the user's most recent succeeded payment for a paid plan.
 *
 * NOTE(review): a succeeded payment does not by itself prove the subscription
 * is still active (it may have expired or been cancelled) — confirm against
 * the subscription status kept on the user record or in Stripe.
 *
 * @param userId Internal user id (users.id).
 * @param planTypes Plan types that count as a paid subscription. Defaults to
 *                  ['pro'], matching the previously hard-coded behavior; pass
 *                  e.g. ['basic', 'pro'] to include other paid plans.
 * @returns The newest matching payment row, or undefined when there is none.
 */
export async function getActiveSubscription(userId: string, planTypes: string[] = ['pro']) {
  // With no plan types nothing can match; skip the query instead of sending an empty IN list.
  if (planTypes.length === 0) {
    return undefined
  }
  const [subscription] = await db
    .select()
    .from(payments)
    .where(and(
      eq(payments.userId, userId),
      eq(payments.status, 'succeeded'),
      inArray(payments.planType, planTypes)
    ))
    .orderBy(desc(payments.createdAt))
    .limit(1)
  return subscription
}
数据库迁移
生成和运行迁移
# Generate SQL migration files from the schema
pnpm drizzle-kit generate
# Apply the generated migration files to the database
pnpm drizzle-kit migrate
# Push the schema straight to the database without migration files (prototyping / local development)
pnpm drizzle-kit push
# Pull the existing database structure into Drizzle schema files (newer drizzle-kit releases name this command `pull`)
pnpm drizzle-kit introspect
# Open Drizzle Studio (database management UI)
pnpm drizzle-kit studio
迁移文件示例
-- drizzle/0001_initial.sql
-- Creates the users table with the column defaults and unique constraints
-- declared in the Drizzle schema. updated_at only has a DEFAULT here; no
-- trigger is created, so later changes to it come from the application.
CREATE TABLE IF NOT EXISTS "users" (
"id" text PRIMARY KEY NOT NULL,
"clerk_id" text NOT NULL,
"email" text NOT NULL,
"name" text,
"image_url" text,
"role" text DEFAULT 'user' NOT NULL,
"subscription_id" text,
"subscription_status" text,
"plan_type" text DEFAULT 'free' NOT NULL,
"ai_usage_count" integer DEFAULT 0 NOT NULL,
"ai_usage_limit" integer DEFAULT 10 NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "users_clerk_id_unique" UNIQUE("clerk_id"),
CONSTRAINT "users_email_unique" UNIQUE("email")
);
-- NOTE(review): the UNIQUE constraints above are already backed by unique
-- indexes, so these two additional indexes look redundant.
CREATE INDEX IF NOT EXISTS "users_clerk_id_idx" ON "users" ("clerk_id");
CREATE INDEX IF NOT EXISTS "users_email_idx" ON "users" ("email");
类型安全和验证
数据类型推断
// src/lib/db/types.ts
import type {
users,
conversations,
messages,
payments
} from './schema'
// Insert types inferred from the table definitions.
export type NewUser = typeof users.$inferInsert
export type NewConversation = typeof conversations.$inferInsert
export type NewMessage = typeof messages.$inferInsert
export type NewPayment = typeof payments.$inferInsert
// Select types inferred from the table definitions.
export type User = typeof users.$inferSelect
export type Conversation = typeof conversations.$inferSelect
export type Message = typeof messages.$inferSelect
export type Payment = typeof payments.$inferSelect
// Composite types
// A conversation row together with its messages.
export type ConversationWithMessages = Conversation & {
  messages: Message[]
}
// A full user row extended with aggregate counts.
// NOTE(review): getUserStats in this guide selects only a subset of user
// columns, so its result is not assignable to this type as-is.
export type UserWithStats = User & {
  conversationCount: number
  messageCount: number
}
数据验证
// src/lib/validations/database.ts
import { z } from 'zod'
// User validation schema: non-empty Clerk ID, a valid email address, and an
// optional name and image URL (the URL must be well-formed when present).
export const createUserSchema = z.object({
  clerkId: z.string().min(1),
  email: z.string().email(),
  name: z.string().optional(),
  imageUrl: z.string().url().optional(),
})
// Conversation validation schema.
export const createConversationSchema = z.object({
  title: z.string().min(1).max(100), // 1 to 100 characters
  model: z.string().min(1),
  provider: z.enum(['openai', 'anthropic', 'google', 'xai']),
  systemPrompt: z.string().optional(),
  // Coerced to a number in the range 0 to 2.
  // NOTE(review): the temperature column is a decimal and createConversation
  // takes it as a string — confirm the value is converted before insert.
  temperature: z.coerce.number().min(0).max(2).optional(),
  // Integer from 1 to 8000; unlike temperature, this field is not coerced.
  maxTokens: z.number().int().min(1).max(8000).optional(),
})
// Message validation schema.
export const createMessageSchema = z.object({
  conversationId: z.string().min(1),
  role: z.enum(['user', 'assistant', 'system']), // same values as the messages.role column
  content: z.string().min(1),
  tokenCount: z.number().int().positive().optional(), // a count of 0 is rejected
  metadata: z.record(z.any()).optional(), // object with arbitrary values
})
性能优化
查询优化
/**
 * Fetch the ten most recently updated conversations of a user, selecting only
 * the columns a list view needs.
 *
 * The filter is on user_id, which has an index in the schema.
 * NOTE(review): the schema shown in this guide declares no index on
 * updated_at, so the sort itself is not index-backed.
 *
 * @param userId Internal user id (users.id).
 * @returns Up to ten rows with id, title and updatedAt, newest first.
 */
export async function getRecentConversations(userId: string) {
  const listColumns = {
    id: conversations.id,
    title: conversations.title,
    updatedAt: conversations.updatedAt,
  }
  return await db
    .select(listColumns)
    .from(conversations)
    .where(eq(conversations.userId, userId))
    .orderBy(desc(conversations.updatedAt))
    .limit(10)
}
/**
 * Insert several messages with a single INSERT statement.
 *
 * NOTE(review): unlike addMessageToConversation, this does not touch the
 * conversation's messageCount / totalTokens counters — confirm that callers
 * update them separately.
 *
 * @param messageData Rows to insert; may be empty.
 * @returns The inserted rows, or an empty array when messageData is empty.
 */
export async function batchCreateMessages(messageData: NewMessage[]) {
  // Drizzle's values() throws when given an empty array, so short-circuit.
  if (messageData.length === 0) {
    return []
  }
  return await db
    .insert(messages)
    .values(messageData)
    .returning()
}
/**
 * List users that have at least one conversation, ordered by the time of
 * their latest conversation update (most recent first).
 *
 * A grouped subquery computes each user's latest updatedAt once; the inner
 * join then drops users without any conversation.
 *
 * @returns Rows with id, name, email and lastActivity.
 */
export async function getUsersWithRecentActivity() {
  const latestPerUser = db
    .select({
      userId: conversations.userId,
      lastActivity: max(conversations.updatedAt).as('lastActivity'),
    })
    .from(conversations)
    .groupBy(conversations.userId)
    .as('recentActivity')

  const selectedColumns = {
    id: users.id,
    name: users.name,
    email: users.email,
    lastActivity: latestPerUser.lastActivity,
  }

  return await db
    .select(selectedColumns)
    .from(users)
    .innerJoin(latestPerUser, eq(users.id, latestPerUser.userId))
    .orderBy(desc(latestPerUser.lastActivity))
}
连接池管理
// src/lib/db/connection.ts
import postgres from 'postgres'
import { env } from '@/env'
// postgres.js connection options.
// NOTE(review): DATABASE_URL (passed to postgres() below) already carries
// host, port, database and credentials; confirm whether the separate DB_*
// variables are needed and which source is meant to win.
const connectionConfig = {
  host: env.DB_HOST,
  port: env.DB_PORT,
  database: env.DB_NAME,
  username: env.DB_USER,
  password: env.DB_PASSWORD,
  // Connection pool settings
  max: 20, // maximum number of connections
  idle_timeout: 20, // idle timeout (seconds)
  connect_timeout: 10, // connect timeout (seconds)
  // TLS: verify the server certificate in production. The previous
  // `{ rejectUnauthorized: false }` encrypted the connection but accepted any
  // certificate, which allows man-in-the-middle attacks. If the provider uses
  // a private CA, pass `{ ca: <provider CA certificate> }` instead of
  // disabling verification.
  ssl: env.NODE_ENV === 'production' ? ('verify-full' as const) : false,
  // Other tuning
  prepare: false, // prepared statements disabled (the guide cites Neon compatibility)
  // Converts column names to camelCase in results.
  // NOTE(review): Drizzle maps column names itself — confirm this transform is
  // wanted if the client is handed to drizzle().
  transform: postgres.camel,
}
export const client = postgres(env.DATABASE_URL, connectionConfig)
测试策略
数据库测试
// tests/db/users.test.ts
// Integration tests for the user query helpers; they run against the real
// database configured for the test environment.
import { describe, it, expect, beforeEach, afterEach } from 'vitest'
import { eq } from 'drizzle-orm' // needed by the cleanup hooks below
import { db } from '@/lib/db'
import { users } from '@/lib/db/schema'
import { createUser, getUserByClerkId } from '@/lib/db/queries/users'

describe('User Database Operations', () => {
  beforeEach(async () => {
    // Remove leftovers from earlier runs so the unique email constraint cannot fail.
    await db.delete(users).where(eq(users.email, 'test@example.com'))
  })

  afterEach(async () => {
    // Clean up the row created by the test.
    await db.delete(users).where(eq(users.email, 'test@example.com'))
  })

  it('should create a new user', async () => {
    const userData = {
      clerkId: 'clerk_test_123',
      email: 'test@example.com',
      name: 'Test User',
    }
    const user = await createUser(userData)
    expect(user).toBeDefined()
    expect(user.email).toBe(userData.email)
    expect(user.clerkId).toBe(userData.clerkId)
  })

  it('should retrieve user by clerk ID', async () => {
    const userData = {
      clerkId: 'clerk_test_456',
      email: 'test@example.com',
      name: 'Test User',
    }
    await createUser(userData)
    const user = await getUserByClerkId(userData.clerkId)
    expect(user).toBeDefined()
    expect(user?.email).toBe(userData.email)
  })
})
最佳实践总结
1. 模式设计原则
- 使用有意义的表名和列名
- 添加适当的索引优化查询性能
- 使用外键约束确保数据完整性
- 合理使用 JSONB 存储灵活的元数据
2. 查询优化
- 利用 Drizzle 的类型安全查询构建器
- 使用索引优化常用查询
- 批量操作提高性能
- 使用事务确保数据一致性
3. 类型安全
- 充分利用 TypeScript 类型推断
- 使用 Zod 验证输入数据
- 定义清晰的数据类型和接口
- 使用关系定义简化连表查询
4. 错误处理
- 实现适当的错误处理和日志记录
- 使用事务处理复杂操作
- 处理数据库连接失败情况
- 验证和清理用户输入
5. 性能监控
- 监控查询性能和连接池状态
- 定期分析慢查询
- 使用 Drizzle Studio 进行数据管理
- 实施适当的缓存策略
这个数据库开发指南为构建可扩展、类型安全的 AI SaaS 应用提供了完整的数据层解决方案,结合了 Drizzle ORM 的现代化特性和 PostgreSQL 的强大功能。