使用 Supabase Realtime 构建实时竞价引擎
Supabase Realtime 建立在 PostgreSQL 的 Write-Ahead Log(WAL)之上,使用基于 Elixir 的服务器通过 WebSocket 将数据库更改推送到连接的客户端。对于拍卖系统,这意味着每个进入数据库的投标都会立即传播到所有观看该拍卖的竞价者。没有轮询。没有单独的 pub/sub 基础设施。你的数据库就是你的事件系统。
让我们从头开始构建一个。
目录
- 架构概述
- 数据库模式和设置
- PostgreSQL 触发器进行投标逻辑
- 使用 JavaScript 的客户端订阅
- 处理竞态条件和投标验证
- 活跃竞价者的存在跟踪
- 性能调优和生产注意事项
- Supabase Realtime 与其他方案对比
- 部署和扩展你的拍卖系统
- 常见问题
架构概述
在编写任何代码之前,让我们了解我们正在构建什么以及这些部分如何组合在一起。
Supabase Realtime 为你提供三个原语,完美地映射到拍卖要求:
- Postgres 更改:订阅你的投标和拍卖表上的 INSERT、UPDATE 和 DELETE 事件。当有人放置投标时,每个订阅者都会在几毫秒内获得新行数据。
- 广播:向频道参与者发送临时消息。非常适合不需要持久化的"你已被超越"通知。
- 存在:跟踪谁当前正在观看拍卖。这让你可以在 UI 中显示"14 个竞价者正在观看",并检测幽灵会话。
数据流看起来是这样的:
- 竞价者通过你的前端提交投标
- 一个 RPC 调用或直接插入命中你的
bids表 - 一个 PostgreSQL 触发器验证投标金额并更新
auctions.current_high_bid - Supabase Realtime 捕获 WAL 更改并将其推送给该拍卖频道的所有订阅者
- 第二个触发器触发广播事件以通知前高竞价者他们已被超越
- 每个连接的客户端实时更新其 UI
从投标放置到跨所有客户端的 UI 更新的延迟通常在 100 毫秒以下。我在 Supabase Pro 层的生产中测得 p99 约为 80-90 毫秒。
为什么不直接使用轮询?
我知道你们中有些人在想"我不能每 500ms 轮询一次吗?"可以。但在单个拍卖上有 200 个并发竞价者的情况下,每秒有 400 个请求击中你的数据库用于一个拍卖。将其乘以 50 个活跃拍卖,你就有 20,000 个每秒查询——其中大多数返回没有新内容。WebSocket 翻转这个模型:当没有变化时零查询,当有变化时即时更新。
数据库模式和设置
这是我使用的模式。它刻意很简单——你可以扩展它,但核心结构处理大多数拍卖类型。
-- 拍卖表
CREATE TABLE auctions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
item_name TEXT NOT NULL,
description TEXT,
starting_price DECIMAL(12,2) NOT NULL DEFAULT 0,
current_high_bid DECIMAL(12,2) DEFAULT 0,
highest_bidder_id UUID REFERENCES auth.users(id),
min_increment DECIMAL(12,2) DEFAULT 1.00,
status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('scheduled', 'active', 'ended', 'sold', 'cancelled')),
starts_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ends_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + INTERVAL '30 minutes',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- 投标表
CREATE TABLE bids (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
auction_id UUID NOT NULL REFERENCES auctions(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES auth.users(id),
amount DECIMAL(12,2) NOT NULL,
placed_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT positive_amount CHECK (amount > 0)
);
-- 用于快速每个拍卖的投标查找的索引
CREATE INDEX idx_bids_auction_amount ON bids(auction_id, amount DESC);
CREATE INDEX idx_bids_auction_time ON bids(auction_id, placed_at DESC);
-- 关键:启用副本标识以进行 Realtime
ALTER TABLE auctions REPLICA IDENTITY FULL;
ALTER TABLE bids REPLICA IDENTITY FULL;
REPLICA IDENTITY FULL 设置是必需的。没有它,Supabase Realtime 在 UPDATE 和 DELETE 事件上只获得主键——不是完整行数据。对于拍卖系统,你需要完整的有效负载,以便客户端可以更新投标金额而无需进行单独的查询。
启用 Realtime 复制
在 Supabase 仪表板中,前往数据库 → 复制并为 auctions 和 bids 表切换复制。或者,你可以使用 SQL 执行此操作:
BEGIN;
-- 移除现有发布(如果存在)
DROP PUBLICATION IF EXISTS supabase_realtime;
-- 使用两个表创建发布
CREATE PUBLICATION supabase_realtime FOR TABLE auctions, bids;
COMMIT;
行级安全
不要跳过这个。RLS 是你的服务端验证层。
ALTER TABLE auctions ENABLE ROW LEVEL SECURITY;
ALTER TABLE bids ENABLE ROW LEVEL SECURITY;
-- 任何人都可以查看活跃拍卖
CREATE POLICY "Public auction viewing" ON auctions
FOR SELECT USING (status IN ('active', 'ended', 'sold'));
-- 认证用户可以查看活跃拍卖上的所有投标
CREATE POLICY "View bids on active auctions" ON bids
FOR SELECT USING (
EXISTS (
SELECT 1 FROM auctions
WHERE auctions.id = bids.auction_id
AND auctions.status = 'active'
)
);
-- 只有认证用户可以放置投标
CREATE POLICY "Place bids" ON bids
FOR INSERT WITH CHECK (
auth.uid() = user_id
AND EXISTS (
SELECT 1 FROM auctions
WHERE auctions.id = auction_id
AND auctions.status = 'active'
AND auctions.ends_at > NOW()
)
);
PostgreSQL 触发器进行投标逻辑
这就是真正的魔法发生的地方。数据库在服务端强制执行所有投标逻辑——客户端无法作弊。
投标验证和拍卖更新触发器
CREATE OR REPLACE FUNCTION process_new_bid()
RETURNS TRIGGER AS $$
DECLARE
v_auction auctions%ROWTYPE;
BEGIN
-- 锁定拍卖行以防止竞态条件
SELECT * INTO v_auction
FROM auctions
WHERE id = NEW.auction_id
FOR UPDATE;
-- 验证拍卖是活跃的
IF v_auction.status != 'active' THEN
RAISE EXCEPTION 'Auction is not active';
END IF;
-- 验证拍卖未结束
IF v_auction.ends_at < NOW() THEN
RAISE EXCEPTION 'Auction has ended';
END IF;
-- 验证投标金额超过当前最高价 + 最小增量
IF NEW.amount < v_auction.current_high_bid + v_auction.min_increment THEN
RAISE EXCEPTION 'Bid must be at least % higher than current high bid of %',
v_auction.min_increment, v_auction.current_high_bid;
END IF;
-- 防止自我超越
IF v_auction.highest_bidder_id = NEW.user_id THEN
RAISE EXCEPTION 'You are already the highest bidder';
END IF;
-- 使用新的高投标更新拍卖
UPDATE auctions
SET
current_high_bid = NEW.amount,
highest_bidder_id = NEW.user_id,
updated_at = NOW()
WHERE id = NEW.auction_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE TRIGGER validate_and_process_bid
BEFORE INSERT ON bids
FOR EACH ROW
EXECUTE FUNCTION process_new_bid();
那个拍卖行上的 FOR UPDATE 锁是关键。没有它,两个同时到达的投标可能都读取相同的 current_high_bid,都通过验证,并都被插入。该锁序列化访问。
广播被超越通知
这个触发器在成功投标之后触发,并向拍卖频道发送临时通知:
CREATE OR REPLACE FUNCTION notify_outbid()
RETURNS TRIGGER AS $$
DECLARE
v_previous_bidder UUID;
BEGIN
-- 找到谁刚刚被超越
SELECT user_id INTO v_previous_bidder
FROM bids
WHERE auction_id = NEW.auction_id
AND id != NEW.id
ORDER BY amount DESC
LIMIT 1;
-- 如果有前高竞价者,广播被超越通知
IF v_previous_bidder IS NOT NULL THEN
PERFORM realtime.send(
jsonb_build_object(
'auction_id', NEW.auction_id,
'new_high', NEW.amount,
'outbid_user', v_previous_bidder,
'new_leader', NEW.user_id
),
'outbid',
'auction:' || NEW.auction_id::text,
true
);
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE TRIGGER after_bid_notify
AFTER INSERT ON bids
FOR EACH ROW
EXECUTE FUNCTION notify_outbid();
使用 JavaScript 的客户端订阅
现在让我们连接前端。我会用香草 JavaScript/React 模式展示这个——相同的方法适用于你是否使用 Next.js 或任何其他框架构建。
初始化客户端
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(
process.env.NEXT_PUBLIC_SUPABASE_URL,
process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY,
{
realtime: {
params: {
eventsPerSecond: 20 // 节流拍卖流量
}
}
}
);
那个 eventsPerSecond 参数很重要。在一个热拍卖上每秒有数十个投标,你不希望每秒重新渲染 50 次。每秒 20 个更新足以进行流畅的 UI 更新。
订阅拍卖频道
function subscribeToAuction(auctionId, callbacks) {
const channel = supabase.channel(`auction:${auctionId}`);
channel
// 通过 Postgres 更改监听新投标
.on('postgres_changes', {
event: 'INSERT',
schema: 'public',
table: 'bids',
filter: `auction_id=eq.${auctionId}`
}, (payload) => {
callbacks.onNewBid(payload.new);
})
// 监听拍卖状态变化
.on('postgres_changes', {
event: 'UPDATE',
schema: 'public',
table: 'auctions',
filter: `id=eq.${auctionId}`
}, (payload) => {
callbacks.onAuctionUpdate(payload.new);
})
// 监听被超越广播通知
.on('broadcast', { event: 'outbid' }, ({ payload }) => {
callbacks.onOutbid(payload);
})
// 通过存在跟踪活跃竞价者
.on('presence', { event: 'sync' }, () => {
const state = channel.presenceState();
const bidderCount = Object.keys(state).length;
callbacks.onPresenceUpdate(bidderCount, state);
})
.subscribe(async (status) => {
if (status === 'SUBSCRIBED') {
// 跟踪此用户的存在
await channel.track({
user_id: supabase.auth.getUser()?.data?.user?.id,
status: 'watching',
joined_at: new Date().toISOString()
});
}
});
return channel;
}
拍卖订阅的 React Hook
import { useState, useEffect, useCallback } from 'react';
function useAuction(auctionId) {
const [auction, setAuction] = useState(null);
const [bids, setBids] = useState([]);
const [bidderCount, setBidderCount] = useState(0);
const [isOutbid, setIsOutbid] = useState(false);
useEffect(() => {
// 获取初始状态
async function loadAuction() {
const { data: auctionData } = await supabase
.from('auctions')
.select('*')
.eq('id', auctionId)
.single();
setAuction(auctionData);
const { data: bidData } = await supabase
.from('bids')
.select('*')
.eq('auction_id', auctionId)
.order('amount', { ascending: false })
.limit(20);
setBids(bidData || []);
}
loadAuction();
// 订阅实时更新
const channel = subscribeToAuction(auctionId, {
onNewBid: (bid) => {
setBids(prev => [bid, ...prev].slice(0, 20));
setIsOutbid(false);
},
onAuctionUpdate: (updated) => setAuction(updated),
onOutbid: (payload) => {
const currentUser = supabase.auth.getUser()?.data?.user;
if (payload.outbid_user === currentUser?.id) {
setIsOutbid(true);
}
},
onPresenceUpdate: (count) => setBidderCount(count)
});
return () => {
supabase.removeChannel(channel);
};
}, [auctionId]);
const placeBid = useCallback(async (amount) => {
const user = (await supabase.auth.getUser()).data.user;
const { data, error } = await supabase
.from('bids')
.insert({
auction_id: auctionId,
amount: parseFloat(amount),
user_id: user.id
})
.select()
.single();
if (error) throw new Error(error.message);
return data;
}, [auctionId]);
return { auction, bids, bidderCount, isOutbid, placeBid };
}
处理竞态条件和投标验证
竞态条件是拍卖系统中最大的 bug 源。以下是我如何处理它们。
服务端:PostgreSQL 做繁重工作
我们触发函数中的 SELECT ... FOR UPDATE 是第一道防线。但还有另一个模式我开始使用——高争用拍卖的咨询锁:
CREATE OR REPLACE FUNCTION place_bid_safe(
p_auction_id UUID,
p_user_id UUID,
p_amount DECIMAL
)
RETURNS TABLE(bid_id UUID, new_high DECIMAL) AS $$
DECLARE
v_lock_key BIGINT;
v_bid_id UUID;
BEGIN
-- 从拍卖 UUID 生成确定性锁键
v_lock_key := ('x' || left(p_auction_id::text, 15))::bit(64)::bigint;
-- 获取咨询锁(阻止同一拍卖的并发投标)
PERFORM pg_advisory_xact_lock(v_lock_key);
-- 现在安全插入(触发器处理验证)
INSERT INTO bids (auction_id, user_id, amount)
VALUES (p_auction_id, p_user_id, p_amount)
RETURNING id INTO v_bid_id;
RETURN QUERY
SELECT v_bid_id, p_amount;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
使用 Supabase 的 RPC 从客户端调用它:
const { data, error } = await supabase.rpc('place_bid_safe', {
p_auction_id: auctionId,
p_user_id: user.id,
p_amount: bidAmount
});
客户端:乐观 UI 带回滚
立即在 UI 中显示投标,但如果服务器拒绝它,准备好回滚:
async function handleBidSubmit(amount) {
const optimisticBid = {
id: crypto.randomUUID(),
amount,
user_id: user.id,
placed_at: new Date().toISOString(),
_optimistic: true
};
// 立即显示
setBids(prev => [optimisticBid, ...prev]);
try {
await placeBid(amount);
// 真实投标将通过 Realtime 到达并替换乐观投标
} catch (err) {
// 失败时移除乐观投标
setBids(prev => prev.filter(b => b.id !== optimisticBid.id));
showError(err.message);
}
}
活跃竞价者的存在跟踪
显示有多少人正在观看拍卖会造成紧迫感。Supabase 的存在跟踪非常简单:
// 当用户开始竞价时更新用户状态
async function updatePresenceStatus(channel, status) {
await channel.track({
user_id: user.id,
status, // 'watching', 'bidding', 'won'
last_active: new Date().toISOString()
});
}
在显示端,你可以分解存在状态以显示多少人正在活跃竞价与仅观看:
function parseBidderStats(presenceState) {
const users = Object.values(presenceState).flat();
return {
total: users.length,
bidding: users.filter(u => u.status === 'bidding').length,
watching: users.filter(u => u.status === 'watching').length
};
}
性能调优和生产注意事项
节流和防抖
竞价战可以每秒生成数十个事件。以下是我配置的内容:
- 服务端:Supabase 客户端配置上的
eventsPerSecond: 20 - 客户端:投标按钮防抖 300ms 以防止双击
- UI 更新:对投标列表动画使用
requestAnimationFrame
拍卖结束时间
不要相信客户端时钟。使用通过 pg_cron 的 PostgreSQL cron 作业:
-- 每 10 秒运行一次以关闭过期的拍卖
SELECT cron.schedule(
'close-expired-auctions',
'*/10 * * * * *',
$$
UPDATE auctions
SET status = CASE
WHEN highest_bidder_id IS NOT NULL THEN 'sold'
ELSE 'ended'
END
WHERE status = 'active'
AND ends_at <= NOW();
$$
);
防狙击延长
大多数拍卖平台在最后几秒内出现投标时延长截止日期:
-- 添加到 process_new_bid 触发器
IF v_auction.ends_at - NOW() < INTERVAL '30 seconds' THEN
UPDATE auctions
SET ends_at = ends_at + INTERVAL '30 seconds'
WHERE id = NEW.auction_id;
END IF;
Supabase Realtime 与其他方案对比
我在生产中使用过大多数这些。这是一个诚实的比较:
| 功能 | Supabase Realtime | Pusher | Ably | Firebase RTDB | Socket.io(自托管) |
|---|---|---|---|---|---|
| 原生 DB 同步 | ✅ PostgreSQL WAL | ❌ 单独服务 | ❌ 单独服务 | ✅ JSON 树 | ❌ 手动 |
| 延迟(p99) | ~80-100ms | ~60ms | ~50ms | ~100ms | ~40ms(取决于基础设施) |
| 最大事件/秒 | 200k+ | 10k(Pro) | 50k | 100k | 无限(你扩展它) |
| 身份验证集成 | 内置(RLS + JWT) | 自定义 | 基于令牌 | Firebase Auth | 自定义 |
| 存在 | ✅ 内置 | ✅ 内置 | ✅ 内置 | ✅ 内置 | ✅ 内置 |
| 免费层 | 500K MAU, 200 并发 | 100 个连接 | 600 万条消息/月 | 1GB 存储 | $0(托管成本) |
| Pro 定价 | $25/月 | $49/月 | $29/月 | 按使用付费 | ~$100-500/月(AWS) |
| 最适合 | 以 DB 为中心的实时应用 | 简单 pub/sub | 高可靠性 | 移动应用 | 完全控制 |
对于拍卖系统,Supabase 获胜,因为你的投标已经在 PostgreSQL 中。你不需要在数据库和单独的 pub/sub 系统之间同步。投标进入 DB,DB 触发 WebSocket 推送。一个真实来源。
如果你正在 headless CMS 架构上构建,Supabase 自然适配内容交付,而无需添加另一个要管理的服务。
部署和扩展你的拍卖系统
对于大多数项目,Supabase 的托管 Pro 层每月 $25 可以轻松处理高达 10,000 个每日拍卖。以下是要注意的事项:
- 连接限制:Pro 层为你提供 500 个并发 Realtime 连接。如果你需要更多,你需要升级或在客户端上实现连接池。
- WAL 大小:高流量竞价生成大量 WAL 流量。监控你的复制槽以避免磁盘膨胀。
- 频道数:每个拍卖获得自己的频道。有数千个活跃拍卖,测试你的客户端是否正确取消订阅已结束的拍卖。
对于使用 Astro 或 Next.js 构建的前端,Supabase JS 客户端工作相同——只需确保在客户端初始化它用于 Realtime 订阅。
如果你正在构建需要处理严肃规模的东西——数十万个并发竞价者——与我们联系。我们已经大规模构建了这些系统,可以帮助你避免陷阱。你也可以查看我们的 定价页面了解基于项目的参与。
常见问题
Supabase Realtime 可以处理多少并发竞价者? Supabase Realtime 在其托管平台上的分布式服务器上每秒可以处理 200,000 多个事件。每个项目的 Pro 层每月 $25 支持最多 500 个并发连接。对于更大的拍卖,Enterprise 层提供自定义限制,或者你可以在自己的基础设施上自托管 Realtime 服务器(它是开源的)。
Supabase Realtime 对实时拍卖够快吗? 是的。在我的测试中,从投标插入到客户端通知的端到端延迟平均约为 50-80 毫秒,p99 低于 100 毫秒。作为参考,人类反应时间约为 200-300 毫秒,所以投标看起来有效地是即时的。瓶颈很少是 Supabase——通常是客户端的网络连接。
当两个人同时竞价时,我如何防止竞态条件?
在触发函数内使用 PostgreSQL 的 SELECT ... FOR UPDATE 行级锁,或通过 pg_advisory_xact_lock() 使用咨询锁。这序列化每个拍卖的投标处理,以便一次只有一个投标被验证。"失败"的投标仍然被验证——它只是看到来自赢家的更新后的高投标,并要么成功(如果它仍然更高),要么失败,显示适当的错误。
我可以将 Supabase Realtime 与 Next.js 或 Astro 一起使用吗?
当然可以。@supabase/supabase-js 客户端在任何 JavaScript 环境中都可以工作。对于 Next.js,在客户端组件中初始化 Supabase 客户端(因为 Realtime 需要浏览器 WebSocket),并在 useEffect 钩子内使用它。对于 Astro,在客户端交互岛中使用它。无论你的框架选择如何,订阅代码都是相同的。
如果用户在拍卖进行中断开连接会发生什么? Supabase Realtime 自动尝试重新连接。当客户端重新连接并重新订阅时,它接收当前状态。对于关键拍卖,我建议在重新连接时也通过标准查询获取最新拍卖状态,以确保在断开连接窗口期间不会遗漏任何内容。存在系统将在超时后自动移除断开连接的用户。
我如何准确处理拍卖结束时间?
永远不要依赖客户端计时器来处理拍卖结束时间——它们可以被操纵。使用 PostgreSQL 的 pg_cron 扩展来检查和关闭过期的拍卖每 10 秒服务端。向客户端发送服务器时间戳,以便他们可以显示倒计时,但实际的结束确定始终在数据库中发生。
Supabase Realtime 对小项目是免费的吗? Supabase 的免费层包括 Realtime,最多 200 个并发连接和 500,000 个每月活跃用户。这足以用于爱好拍卖网站或 MVP。如果你运行具有有意义流量的生产拍卖平台,Pro 层每月 $25,外出费 $0.09/GB 是你想从哪里开始的地方。这明显比运行自己的 WebSocket 基础设施便宜。
我如何在本地测试实时竞价系统?
使用 Supabase CLI(supabase start)来运行启用了 Realtime 的本地 Supabase 实例。打开多个浏览器选项卡以模拟多个竞价者。对于负载测试,我使用一个简单的 Node.js 脚本,创建 100+ Supabase 客户端并让他们在计时器上相互竞价拍卖。这捕捉竞态条件并帮助你在投入生产前调优你的 eventsPerSecond 参数。