BoostServerTech Chat项目将每条消息存储在Redis中。作为一种内存数据存储,Rubén Pérez (@anarthal) 已意识到未来需要替换旧消息。以下是其原因及代码实现。
聊天消息具有特定的访问模式:仅追加、倒序读取(最新优先)、按房间划分。Redis流几乎完全符合这一要求。每个房间(聊天组)都是一个流。发送消息使用 XADD,读取历史使用 XREVRANGE。Redis为每个条目分配唯一的时间顺序ID,从而实现消息排序和基于游标的分页,无需进行模式迁移、索引决策或ORM。虽然SQL表也可以实现,但消息生成速度极快,绝大多数SQL数据库在这种插入密集的情况下会遇到瓶颈,需进行严重的性能调优,而Redis则可以原生处理。
存储消息的过程:用户发送消息时,服务器将其附加到房间的Redis流中。* 告诉Redis自动分配一个流ID:
// 构造请求
redis::request req;
for (const auto& msg : messages)
req.push("XADD", room_id, "*", "payload", serialize_redis_message(msg));
// 执行请求
redis::generic_response res;
error_code ec;
co_await conn_.async_exec(req, res, asio::redirect_error(ec));
值得注意的三点:
- 多个
XADD命令被推送到单个redis::request中,Boost.Redis通过一个连接进行管道化,即使客户端同时发送多个消息,也只需一次往返。 - 这是一个C++20协程,
co_await在Redis响应之前会挂起,但线程可以自由处理其他工作。 XADD接受任意数量的(键,值)字符串对;我们使用一个名为“payload”的单一键,里面存储序列化为JSON的消息,这允许任意嵌套。
无冗余的序列化:每条消息作为JSON有效载荷存储在流条目中。其数据结构如下:
struct redis_wire_message {
std::string_view content;
std::int64_t timestamp;
std::int64_t user_id;
};
BOOST_DESCRIBE_STRUCT(redis_wire_message, (), (content, timestamp, user_id))
该 BOOST_DESCRIBE_STRUCT 宏注册结构体成员以实现编译时反射。Boost.JSON会自动处理:boost::json::value_from(msg) 进行序列化,boost::json::try_value_to<redis_wire_message>(jv) 进行反序列化,避免手动编写 to_json/from_json 函数。
权衡:Redis将所有内容保存在内存中,这使其速度极快,但也是显而易见的问题。目前,服务器启用了Redis持久性,以保证数据在重启后依然存在。但随着消息量的增长,完全将历史保存在RAM中显得不合理。计划是将旧消息迁移到MySQL进行归档。消息层已经通过自身的服务接口隔离,因此引入分层存储策略(来自Redis的近期消息,来自MySQL的旧消息)只需触及一个组件,其他部分无需了解。
但“最终”这个过程充满了挑战。迁移边界有很多问题:是根据时间窗口迁移消息,还是根据计数阈值?是在读取时内联迁移,还是作为后台作业?当数据存在于两个地方时,基于游标的分页该如何处理?如果你构建了一个从快速短暂存储迁移到较慢持久存储的系统,是什么触发了迁移?你又遇到了哪些意外问题?Rubén非常希望听到实际有效的经验。
博主点评: 本文深入探讨了Redis流在聊天消息存储中的优势和未来的迁移策略,展示了如何利用Boost库简化序列化过程,解决高并发写入带来的挑战。同时,作者也很清晰地指出了内存存储的局限性及未来扩展的必要性,值得开发者关注。