AI SaaS Template Docs
AI SaaS Template Docs
AI SaaS Template简介快速开始项目架构
配置管理
项目结构
数据库开发指南
API 开发指南
认证系统
文件管理
支付和账单
自定义主题

数据库开发指南

本指南详细介绍了 AI SaaS Template 中基于 Drizzle ORM 和 PostgreSQL 的数据库开发模式,包括模式设计、数据操作、类型安全查询和最佳实践。

技术栈概述

核心技术

  • 数据库: PostgreSQL (推荐 Neon/Supabase)
  • ORM: Drizzle ORM - 类型安全的轻量级 ORM
  • 迁移工具: Drizzle Kit - 数据库迁移和管理
  • 连接方式: HTTP/WebSocket 连接 (适合 serverless)
  • 类型安全: 完全的 TypeScript 类型推断

项目数据库结构

src/lib/db/
├── index.ts                # 数据库连接和配置
├── schema.ts              # 数据库表结构定义
├── types.ts               # 数据库相关类型
├── migrations/            # 数据库迁移文件
└── queries/               # 复用的查询函数
    ├── users.ts          # 用户相关查询
    ├── payments.ts       # 支付相关查询
    └── index.ts          # 查询函数导出

数据库连接配置

数据库连接设置

// src/lib/db/index.ts
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import { env } from '@/env'
import * as schema from './schema'

// 创建数据库连接
const client = postgres(env.DATABASE_URL, {
  prepare: false, // 禁用预处理语句 (Neon 兼容性)
  max: 20,        // 最大连接数
})

// 创建 Drizzle 实例
export const db = drizzle(client, { 
  schema,
  logger: env.NODE_ENV === 'development', // 开发环境启用日志
})

// 导出数据库类型
export type Database = typeof db
export * from './schema'
export * from './queries'

Drizzle 配置文件

// drizzle.config.ts
import { defineConfig } from 'drizzle-kit'
import { env } from './env'

export default defineConfig({
  schema: './src/lib/db/schema.ts',
  out: './drizzle',
  dialect: 'postgresql',
  dbCredentials: {
    url: env.DATABASE_URL,
  },
  verbose: true,
  strict: true,
})

数据库模式设计

核心数据表

// src/lib/db/schema.ts
import { 
  pgTable, 
  text, 
  timestamp, 
  boolean, 
  integer, 
  decimal,
  jsonb,
  varchar,
  unique,
  index,
} from 'drizzle-orm/pg-core'
import { createId } from '@paralleldrive/cuid2'

// 用户表
export const users = pgTable('users', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  clerkId: text('clerk_id').notNull().unique(), // Clerk 用户 ID
  email: text('email').notNull().unique(),
  name: text('name'),
  imageUrl: text('image_url'),
  role: text('role', { enum: ['user', 'admin'] }).notNull().default('user'),
  
  // 订阅信息
  subscriptionId: text('subscription_id'),
  subscriptionStatus: text('subscription_status'),
  planType: text('plan_type', { enum: ['free', 'basic', 'pro'] }).notNull().default('free'),
  
  // AI 使用统计
  aiUsageCount: integer('ai_usage_count').notNull().default(0),
  aiUsageLimit: integer('ai_usage_limit').notNull().default(10), // 免费用户限制
  
  // 时间戳
  createdAt: timestamp('created_at').notNull().defaultNow(),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  clerkIdIdx: index('users_clerk_id_idx').on(table.clerkId),
  emailIdx: index('users_email_idx').on(table.email),
}))

// AI 对话会话表
export const conversations = pgTable('conversations', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  title: text('title').notNull(),
  model: text('model').notNull(), // 使用的 AI 模型
  provider: text('provider').notNull(), // AI 提供商 (openai, anthropic, google, xai)
  
  // 会话设置
  systemPrompt: text('system_prompt'),
  temperature: decimal('temperature', { precision: 3, scale: 2 }).default('0.7'),
  maxTokens: integer('max_tokens').default(4000),
  
  // 元数据
  messageCount: integer('message_count').notNull().default(0),
  totalTokens: integer('total_tokens').notNull().default(0),
  
  createdAt: timestamp('created_at').notNull().defaultNow(),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  userIdIdx: index('conversations_user_id_idx').on(table.userId),
  createdAtIdx: index('conversations_created_at_idx').on(table.createdAt),
}))

// AI 对话消息表
export const messages = pgTable('messages', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  conversationId: text('conversation_id').notNull().references(() => conversations.id, { onDelete: 'cascade' }),
  role: text('role', { enum: ['user', 'assistant', 'system'] }).notNull(),
  content: text('content').notNull(),
  
  // Token 使用情况
  tokenCount: integer('token_count'),
  
  // 元数据
  metadata: jsonb('metadata'), // 存储额外信息
  
  createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => ({
  conversationIdIdx: index('messages_conversation_id_idx').on(table.conversationId),
  createdAtIdx: index('messages_created_at_idx').on(table.createdAt),
}))

// 支付记录表
export const payments = pgTable('payments', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  
  // Stripe 相关
  stripePaymentIntentId: text('stripe_payment_intent_id'),
  stripeCustomerId: text('stripe_customer_id'),
  stripeSubscriptionId: text('stripe_subscription_id'),
  
  // 支付信息
  amount: decimal('amount', { precision: 10, scale: 2 }).notNull(),
  currency: text('currency').notNull().default('usd'),
  status: text('status', { 
    enum: ['pending', 'succeeded', 'failed', 'canceled', 'refunded'] 
  }).notNull(),
  
  // 计划信息
  planType: text('plan_type').notNull(),
  billingPeriod: text('billing_period', { enum: ['monthly', 'yearly'] }),
  
  createdAt: timestamp('created_at').notNull().defaultNow(),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
}, (table) => ({
  userIdIdx: index('payments_user_id_idx').on(table.userId),
  statusIdx: index('payments_status_idx').on(table.status),
  stripeSubscriptionIdIdx: index('payments_stripe_subscription_id_idx').on(table.stripeSubscriptionId),
}))

// 文件上传表 (可选)
export const uploads = pgTable('uploads', {
  id: text('id').primaryKey().$defaultFn(() => createId()),
  userId: text('user_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
  
  // 文件信息
  fileName: text('file_name').notNull(),
  originalName: text('original_name').notNull(),
  mimeType: text('mime_type').notNull(),
  size: integer('size').notNull(),
  
  // 存储信息
  storageProvider: text('storage_provider').notNull().default('uploadthing'),
  storageKey: text('storage_key').notNull(),
  url: text('url').notNull(),
  
  createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => ({
  userIdIdx: index('uploads_user_id_idx').on(table.userId),
  storageKeyIdx: index('uploads_storage_key_idx').on(table.storageKey),
}))

// 系统配置表
export const systemConfig = pgTable('system_config', {
  key: text('key').primaryKey(),
  value: jsonb('value').notNull(),
  description: text('description'),
  updatedAt: timestamp('updated_at').notNull().defaultNow().$onUpdateFn(() => new Date()),
})

关系定义

// src/lib/db/schema.ts (续)
import { relations } from 'drizzle-orm'

// 定义表关系
export const usersRelations = relations(users, ({ many }) => ({
  conversations: many(conversations),
  messages: many(messages),
  payments: many(payments),
  uploads: many(uploads),
}))

export const conversationsRelations = relations(conversations, ({ one, many }) => ({
  user: one(users, {
    fields: [conversations.userId],
    references: [users.id],
  }),
  messages: many(messages),
}))

export const messagesRelations = relations(messages, ({ one }) => ({
  conversation: one(conversations, {
    fields: [messages.conversationId],
    references: [conversations.id],
  }),
}))

export const paymentsRelations = relations(payments, ({ one }) => ({
  user: one(users, {
    fields: [payments.userId],
    references: [users.id],
  }),
}))

export const uploadsRelations = relations(uploads, ({ one }) => ({
  user: one(users, {
    fields: [uploads.userId],
    references: [users.id],
  }),
}))

数据库查询操作

基础查询函数

// src/lib/db/queries/users.ts
import { eq, desc, count, and, gte, lte } from 'drizzle-orm'
import { db } from '..'
import { users, conversations, messages } from '../schema'

// 根据 Clerk ID 获取用户
export async function getUserByClerkId(clerkId: string) {
  const [user] = await db
    .select()
    .from(users)
    .where(eq(users.clerkId, clerkId))
    .limit(1)
  
  return user
}

// 创建新用户
export async function createUser(data: {
  clerkId: string
  email: string
  name?: string
  imageUrl?: string
}) {
  const [user] = await db
    .insert(users)
    .values({
      clerkId: data.clerkId,
      email: data.email,
      name: data.name,
      imageUrl: data.imageUrl,
    })
    .returning()
  
  return user
}

// 更新用户信息
export async function updateUser(clerkId: string, data: Partial<typeof users.$inferInsert>) {
  const [user] = await db
    .update(users)
    .set({ ...data, updatedAt: new Date() })
    .where(eq(users.clerkId, clerkId))
    .returning()
  
  return user
}

// 增加用户 AI 使用次数
export async function incrementUserAIUsage(userId: string) {
  const [user] = await db
    .update(users)
    .set({ 
      aiUsageCount: sql`${users.aiUsageCount} + 1`,
      updatedAt: new Date(),
    })
    .where(eq(users.id, userId))
    .returning()
  
  return user
}

// 获取用户统计信息
export async function getUserStats(userId: string) {
  const [userStats] = await db
    .select({
      conversationCount: count(conversations.id),
      messageCount: count(messages.id),
      aiUsageCount: users.aiUsageCount,
      aiUsageLimit: users.aiUsageLimit,
      planType: users.planType,
    })
    .from(users)
    .leftJoin(conversations, eq(conversations.userId, users.id))
    .leftJoin(messages, eq(messages.conversationId, conversations.id))
    .where(eq(users.id, userId))
    .groupBy(users.id, users.aiUsageCount, users.aiUsageLimit, users.planType)
  
  return userStats
}

AI 对话相关查询

// src/lib/db/queries/conversations.ts
import { eq, desc, count, sql, and } from 'drizzle-orm'
import { db } from '..'
import { conversations, messages, users } from '../schema'

// 获取用户的对话列表
export async function getUserConversations(userId: string, page = 1, limit = 20) {
  const offset = (page - 1) * limit
  
  const conversationList = await db
    .select({
      id: conversations.id,
      title: conversations.title,
      model: conversations.model,
      provider: conversations.provider,
      messageCount: conversations.messageCount,
      createdAt: conversations.createdAt,
      updatedAt: conversations.updatedAt,
    })
    .from(conversations)
    .where(eq(conversations.userId, userId))
    .orderBy(desc(conversations.updatedAt))
    .limit(limit)
    .offset(offset)
  
  // 获取总数
  const [{ total }] = await db
    .select({ total: count() })
    .from(conversations)
    .where(eq(conversations.userId, userId))
  
  return {
    conversations: conversationList,
    total,
    page,
    limit,
    totalPages: Math.ceil(total / limit),
  }
}

// 创建新对话
export async function createConversation(data: {
  userId: string
  title: string
  model: string
  provider: string
  systemPrompt?: string
  temperature?: string
  maxTokens?: number
}) {
  const [conversation] = await db
    .insert(conversations)
    .values(data)
    .returning()
  
  return conversation
}

// 获取对话详情和消息
export async function getConversationWithMessages(conversationId: string, userId: string) {
  // 获取对话信息
  const [conversation] = await db
    .select()
    .from(conversations)
    .where(and(
      eq(conversations.id, conversationId),
      eq(conversations.userId, userId)
    ))
    .limit(1)
  
  if (!conversation) {
    return null
  }
  
  // 获取消息列表
  const messageList = await db
    .select()
    .from(messages)
    .where(eq(messages.conversationId, conversationId))
    .orderBy(messages.createdAt)
  
  return {
    ...conversation,
    messages: messageList,
  }
}

// 添加消息到对话
export async function addMessageToConversation(data: {
  conversationId: string
  role: 'user' | 'assistant' | 'system'
  content: string
  tokenCount?: number
  metadata?: any
}) {
  // 使用事务确保数据一致性
  return await db.transaction(async (tx) => {
    // 插入消息
    const [message] = await tx
      .insert(messages)
      .values(data)
      .returning()
    
    // 更新对话统计
    await tx
      .update(conversations)
      .set({
        messageCount: sql`${conversations.messageCount} + 1`,
        totalTokens: data.tokenCount 
          ? sql`${conversations.totalTokens} + ${data.tokenCount}`
          : conversations.totalTokens,
        updatedAt: new Date(),
      })
      .where(eq(conversations.id, data.conversationId))
    
    return message
  })
}

// 删除对话
export async function deleteConversation(conversationId: string, userId: string) {
  const [deleted] = await db
    .delete(conversations)
    .where(and(
      eq(conversations.id, conversationId),
      eq(conversations.userId, userId)
    ))
    .returning({ id: conversations.id })
  
  return !!deleted
}

支付相关查询

// src/lib/db/queries/payments.ts
import { eq, desc, and, inArray } from 'drizzle-orm'
import { db } from '..'
import { payments, users } from '../schema'

// 创建支付记录
export async function createPayment(data: {
  userId: string
  stripePaymentIntentId?: string
  stripeCustomerId?: string
  stripeSubscriptionId?: string
  amount: string
  currency: string
  status: string
  planType: string
  billingPeriod?: string
}) {
  const [payment] = await db
    .insert(payments)
    .values(data)
    .returning()
  
  return payment
}

// 根据 Stripe 订阅 ID 获取支付记录
export async function getPaymentByStripeSubscriptionId(subscriptionId: string) {
  const [payment] = await db
    .select()
    .from(payments)
    .where(eq(payments.stripeSubscriptionId, subscriptionId))
    .limit(1)
  
  return payment
}

// 更新支付状态
export async function updatePaymentStatus(paymentId: string, status: string) {
  const [payment] = await db
    .update(payments)
    .set({ status, updatedAt: new Date() })
    .where(eq(payments.id, paymentId))
    .returning()
  
  return payment
}

// 获取用户的支付历史
export async function getUserPayments(userId: string, page = 1, limit = 20) {
  const offset = (page - 1) * limit
  
  const paymentList = await db
    .select()
    .from(payments)
    .where(eq(payments.userId, userId))
    .orderBy(desc(payments.createdAt))
    .limit(limit)
    .offset(offset)
  
  return paymentList
}

// 获取活跃订阅
export async function getActiveSubscription(userId: string) {
  const [subscription] = await db
    .select()
    .from(payments)
    .where(and(
      eq(payments.userId, userId),
      inArray(payments.status, ['succeeded']),
      eq(payments.planType, 'pro') // 或其他付费计划
    ))
    .orderBy(desc(payments.createdAt))
    .limit(1)
  
  return subscription
}

数据库迁移

生成和运行迁移

# 生成迁移文件
pnpm drizzle-kit generate

# 推送迁移到数据库
pnpm drizzle-kit push

# 查看数据库状态
pnpm drizzle-kit introspect

# 打开 Drizzle Studio (数据库管理界面)
pnpm drizzle-kit studio

迁移文件示例

-- drizzle/0001_initial.sql
CREATE TABLE IF NOT EXISTS "users" (
	"id" text PRIMARY KEY NOT NULL,
	"clerk_id" text NOT NULL,
	"email" text NOT NULL,
	"name" text,
	"image_url" text,
	"role" text DEFAULT 'user' NOT NULL,
	"subscription_id" text,
	"subscription_status" text,
	"plan_type" text DEFAULT 'free' NOT NULL,
	"ai_usage_count" integer DEFAULT 0 NOT NULL,
	"ai_usage_limit" integer DEFAULT 10 NOT NULL,
	"created_at" timestamp DEFAULT now() NOT NULL,
	"updated_at" timestamp DEFAULT now() NOT NULL,
	CONSTRAINT "users_clerk_id_unique" UNIQUE("clerk_id"),
	CONSTRAINT "users_email_unique" UNIQUE("email")
);

CREATE INDEX IF NOT EXISTS "users_clerk_id_idx" ON "users" ("clerk_id");
CREATE INDEX IF NOT EXISTS "users_email_idx" ON "users" ("email");

类型安全和验证

数据类型推断

// src/lib/db/types.ts
import type { 
  users, 
  conversations, 
  messages, 
  payments 
} from './schema'

// 推断插入类型
export type NewUser = typeof users.$inferInsert
export type NewConversation = typeof conversations.$inferInsert
export type NewMessage = typeof messages.$inferInsert
export type NewPayment = typeof payments.$inferInsert

// 推断选择类型
export type User = typeof users.$inferSelect
export type Conversation = typeof conversations.$inferSelect
export type Message = typeof messages.$inferSelect
export type Payment = typeof payments.$inferSelect

// 组合类型
export type ConversationWithMessages = Conversation & {
  messages: Message[]
}

export type UserWithStats = User & {
  conversationCount: number
  messageCount: number
}

数据验证

// src/lib/validations/database.ts
import { z } from 'zod'

// 用户验证模式
export const createUserSchema = z.object({
  clerkId: z.string().min(1),
  email: z.string().email(),
  name: z.string().optional(),
  imageUrl: z.string().url().optional(),
})

// 对话验证模式
export const createConversationSchema = z.object({
  title: z.string().min(1).max(100),
  model: z.string().min(1),
  provider: z.enum(['openai', 'anthropic', 'google', 'xai']),
  systemPrompt: z.string().optional(),
  temperature: z.coerce.number().min(0).max(2).optional(),
  maxTokens: z.number().int().min(1).max(8000).optional(),
})

// 消息验证模式
export const createMessageSchema = z.object({
  conversationId: z.string().min(1),
  role: z.enum(['user', 'assistant', 'system']),
  content: z.string().min(1),
  tokenCount: z.number().int().positive().optional(),
  metadata: z.record(z.any()).optional(),
})

性能优化

查询优化

// 使用索引优化查询
export async function getRecentConversations(userId: string) {
  return await db
    .select({
      id: conversations.id,
      title: conversations.title,
      updatedAt: conversations.updatedAt,
    })
    .from(conversations)
    .where(eq(conversations.userId, userId))
    .orderBy(desc(conversations.updatedAt)) // 利用索引
    .limit(10)
}

// 批量操作
export async function batchCreateMessages(messageData: NewMessage[]) {
  return await db
    .insert(messages)
    .values(messageData)
    .returning()
}

// 使用子查询优化复杂查询
export async function getUsersWithRecentActivity() {
  const recentActivity = db
    .select({
      userId: conversations.userId,
      lastActivity: max(conversations.updatedAt).as('lastActivity'),
    })
    .from(conversations)
    .groupBy(conversations.userId)
    .as('recentActivity')
  
  return await db
    .select({
      id: users.id,
      name: users.name,
      email: users.email,
      lastActivity: recentActivity.lastActivity,
    })
    .from(users)
    .innerJoin(recentActivity, eq(users.id, recentActivity.userId))
    .orderBy(desc(recentActivity.lastActivity))
}

连接池管理

// src/lib/db/connection.ts
import postgres from 'postgres'
import { env } from '@/env'

const connectionConfig = {
  host: env.DB_HOST,
  port: env.DB_PORT,
  database: env.DB_NAME,
  username: env.DB_USER,
  password: env.DB_PASSWORD,
  
  // 连接池配置
  max: 20,                    // 最大连接数
  idle_timeout: 20,           // 空闲超时 (秒)
  connect_timeout: 10,        // 连接超时 (秒)
  
  // SSL 配置
  ssl: env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false,
  
  // 其他优化
  prepare: false,             // 禁用预处理语句 (Neon 兼容)
  transform: postgres.camel,  // 自动转换为驼峰命名
}

export const client = postgres(env.DATABASE_URL, connectionConfig)

测试策略

数据库测试

// tests/db/users.test.ts
import { describe, it, expect, beforeEach, afterEach } from 'vitest'
import { db } from '@/lib/db'
import { users } from '@/lib/db/schema'
import { createUser, getUserByClerkId } from '@/lib/db/queries/users'

describe('User Database Operations', () => {
  beforeEach(async () => {
    // 清理测试数据
    await db.delete(users).where(eq(users.email, 'test@example.com'))
  })
  
  afterEach(async () => {
    // 测试后清理
    await db.delete(users).where(eq(users.email, 'test@example.com'))
  })
  
  it('should create a new user', async () => {
    const userData = {
      clerkId: 'clerk_test_123',
      email: 'test@example.com',
      name: 'Test User',
    }
    
    const user = await createUser(userData)
    
    expect(user).toBeDefined()
    expect(user.email).toBe(userData.email)
    expect(user.clerkId).toBe(userData.clerkId)
  })
  
  it('should retrieve user by clerk ID', async () => {
    const userData = {
      clerkId: 'clerk_test_456',
      email: 'test@example.com',
      name: 'Test User',
    }
    
    await createUser(userData)
    const user = await getUserByClerkId(userData.clerkId)
    
    expect(user).toBeDefined()
    expect(user?.email).toBe(userData.email)
  })
})

最佳实践总结

1. 模式设计原则

  • 使用有意义的表名和列名
  • 添加适当的索引优化查询性能
  • 使用外键约束确保数据完整性
  • 合理使用 JSONB 存储灵活的元数据

2. 查询优化

  • 利用 Drizzle 的类型安全查询构建器
  • 使用索引优化常用查询
  • 批量操作提高性能
  • 使用事务确保数据一致性

3. 类型安全

  • 充分利用 TypeScript 类型推断
  • 使用 Zod 验证输入数据
  • 定义清晰的数据类型和接口
  • 使用关系定义简化连表查询

4. 错误处理

  • 实现适当的错误处理和日志记录
  • 使用事务处理复杂操作
  • 处理数据库连接失败情况
  • 验证和清理用户输入

5. 性能监控

  • 监控查询性能和连接池状态
  • 定期分析慢查询
  • 使用 Drizzle Studio 进行数据管理
  • 实施适当的缓存策略

这个数据库开发指南为构建可扩展、类型安全的 AI SaaS 应用提供了完整的数据层解决方案,结合了 Drizzle ORM 的现代化特性和 PostgreSQL 的强大功能。

On this page

技术栈概述核心技术项目数据库结构数据库连接配置数据库连接设置Drizzle 配置文件数据库模式设计核心数据表关系定义数据库查询操作基础查询函数AI 对话相关查询支付相关查询数据库迁移生成和运行迁移迁移文件示例类型安全和验证数据类型推断数据验证性能优化查询优化连接池管理测试策略数据库测试最佳实践总结1. 模式设计原则2. 查询优化3. 类型安全4. 错误处理5. 性能监控