需求分析
我们希望客户端在登录后,从服务器拉取聊天信息,并且展示。常规的设计中,客户端本地也会有一个数据库,缓存上一次获取的最后的聊天信息,如果客户端下线了,再次登录,只需要从服务器拉取未接受的数据即可。
所以综合考虑过后将需求列出
- 客户端本地数据库缓存已经接受的消息(以后再做)
- 客户端登录后,将本地数据的最大的消息id发送给服务器,服务器根据这个id去数据库查找,找到比这个id大的消息,将消息回传给客户端
- 客户端登录后,先加载旧的数据,再差异加载未读取的数据即可。
客户端本地数据库存储放在之后实现,所以我们客户端目前只发送消息id为0即可。
数据模型设计
- 消息唯一标识
- 在服务器端的 MySQL 表里,为每条消息分配一个全局唯一的自增主键(
message_id),再配合时间戳(created_at)。 - 客户端本地用同样的
message_id做主键,这样就能很方便地做增量同步与去重。
- 在服务器端的 MySQL 表里,为每条消息分配一个全局唯一的自增主键(
- 会话/用户维度的索引
- 如果支持多对多(群聊),再维护一个会话表(
thread_id)和用户—会话关联表。 - 查询和分页时,都按
(thread_id, message_id)或(thread_id, created_at)建复合索引,加速筛选。
- 如果支持多对多(群聊),再维护一个会话表(
同步流程
客户端登录时
从本地 SQLite 加载最近 N 条消息(按
message_id或时间倒序),渲染到界面。读取本地记录的「每个会话已同步到的最大
message_id」,发送给服务器:{"action": "fetch_messages","thread_id": 123,"since_id": 3456}
服务器端响应
- 查询
WHERE thread_id=123 AND message_id>3456 ORDER BY message_id ASC LIMIT 1000 - 返回消息列表(可以分页返回,大量时前端可循环拉取,或返回
has_more标记)。
- 查询
客户端接收并保存
- 将服务器返回的消息批量插入本地 SQLite,注意用「主键冲突忽略(
INSERT OR IGNORE)」防止重复。 - 更新本地「已同步最大
message_id」。
- 将服务器返回的消息批量插入本地 SQLite,注意用「主键冲突忽略(
后续聊天时
- 新消息既推到服务器,也实时写入本地 SQLite。
- 如果走长连接(
Asio+ 自定义协议或使用 WebSocket),服务器收到新消息后直接广播给在线客户端,并提示客户端写到本地。 - 如果客户端离线,新消息积累在服务器,下一次登录再按 above 流程拉取。
聊天消息表
下面给出消息聊天表的字段和解释,包含了message_id, thread_id以及常见的其他字段
CREATE TABLE `chat_message` (`message_id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`thread_id` BIGINT UNSIGNED NOT NULL,`sender_id` BIGINT UNSIGNED NOT NULL,`recv_id` BIGINT UNSIGNED NOT NULL,`content` TEXT NOT NULL,`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,`status` TINYINT NOT NULL DEFAULT 0 COMMENT '0=未读 1=已读 2=撤回',PRIMARY KEY (`message_id`),KEY `idx_thread_created` (`thread_id`, `created_at`),KEY `idx_thread_message` (`thread_id`, `message_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
字段说明
message_id:全局自增主键,唯一标识一条消息。thread_id:会话(单聊、群聊)ID,同一会话下的所有消息共用一个thread_id。sender_id:发送者用户 ID,指向用户表的主键。recv_id: 接收者用户ID,指向用户表主键content:消息正文,TEXT 类型,适合存储普通文字。created_at:消息创建时间,自动记录插入时刻。updated_at:消息更新时间,可用于标记“撤回”(status 变更)、编辑等操作。status:消息状态,用于标记未读/已读/撤回等(也可扩展更多状态)。
索引设计
- 主键索引:
PRIMARY KEY (message_id)用于唯一检索消息。 - 会话+时间索引:
KEY (thread_id, created_at)支持按会话分页、按时间范围查询。 - 会话+消息ID 索引:
KEY (thread_id, message_id)支持按message_id做增量拉取(WHERE thread_id=… AND message_id > since_id)。
可选扩展
- 群聊用户表:如果支持群聊,需要一个
thread_member表,记录每个thread_id下的成员及其角色。 - 附件支持:若要存储图片/文件,可额外建
message_attachment表,字段例如attachment_id、message_id、file_url、file_type。 - 已读回执:单独设计
message_read表,记录哪些用户在何时已读了该消息,字段如(message_id, user_id, read_at)。
会话消息表
全局聊天线程表
建立chat_thread主表,给它一个全局自增id,记录所有私聊/群聊的线程统一入口
CREATE TABLE chat_thread (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`type` ENUM('private','group') NOT NULL,`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id));
单聊表设计
对于单聊,只有两个人,所以可以直接在private_chat表中定义两个字段存储user1_id和user2_id,这样能直接确定参与者
CREATE TABLE `private_chat` (`thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用chat_thread.id',`user1_id` BIGINT UNSIGNED NOT NULL,`user2_id` BIGINT UNSIGNED NOT NULL,`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`thread_id`),UNIQUE KEY `uniq_private_thread` (`user1_id`, `user2_id`), -- 保证每对用户只能有一个私聊会话-- 以下两行就是我们要额外加的复合索引KEY `idx_private_user1_thread` (`user1_id`, `thread_id`),KEY `idx_private_user2_thread` (`user2_id`, `thread_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- 通过
user1_id和user2_id唯一确定一个单聊会话 - 询某两个用户的单聊时,直接
SELECT即可。
群聊表设计
群聊相较于单聊要复杂一些,需要记录每个群聊的多名成员及其角色、权限等信息
先建一个独立的会话(线程)表:
CREATE TABLE `group_chat` (`thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用chat_thread.id',`name` VARCHAR(255) DEFAULT NULL COMMENT '群聊名称',`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`thread_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- 群聊会话表只存储群聊本身的信息(如群名称、创建时间等),
thread_id是唯一标识符
群聊成员表设计
- 群聊成员表用于存储群聊中各成员的信息(包括角色、加入时间、禁言等)。
CREATE TABLE `group_chat_member` (`thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用 group_chat_thread.thread_id',`user_id` BIGINT UNSIGNED NOT NULL COMMENT '引用 user.user_id',`role` TINYINT NOT NULL DEFAULT 0 COMMENT '0=普通成员,1=管理员,2=创建者',`joined_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,`muted_until` TIMESTAMP NULL COMMENT '如果被禁言,可存到什么时候',PRIMARY KEY (`thread_id`, `user_id`),KEY `idx_user_threads` (`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
前端聊天框调整
回顾
我们先回顾一下之前设计的聊天框

对于我们自己发出的信息,我们可以实现这样一个网格布局管理

NameLabel用来显示用户的名字,Bubble用来显示聊天信息,Spacer是个弹簧,保证将NameLabel``,IconLabel,Bubble等挤压到右侧。
如果是别人发出的消息,我们设置这样一个网格布局

增加状态标签
因为自己发送的时候要增加发送状态(发送失败,未读,已读)三种,所以考虑将自己发送的消息改为如下

大体结构如下
列0 列1 列2 列3┌───────┬───────────┬────────────┬──────────┐行 0 │ │ (空) │ m_pNameLabel │ m_pIconLabel ││ │ (col=1, │ (右对齐+8px)│ (跨两行、靠上) ││ │ 未放置)│ │ │├───────┼───────────┼────────────┴──────────┤行 1 │ pSpacer│ m_pStatusLabel │ m_pBubble │ m_pIconLabel ││ │ (row=1, │ (聊天气泡) │ (继续占位) ││ │ col=1) │ │ │└───────┴───────────┴──────────────────────┘
代码修改如下
ChatItemBase::ChatItemBase(ChatRole role, QWidget *parent): QWidget(parent), m_role(role){m_pNameLabel = new QLabel();m_pNameLabel->setObjectName("chat_user_name");QFont font("Microsoft YaHei");font.setPointSize(9);m_pNameLabel->setFont(font);m_pNameLabel->setFixedHeight(20);m_pIconLabel = new QLabel();m_pIconLabel->setScaledContents(true);m_pIconLabel->setFixedSize(42, 42);m_pBubble = new QWidget();QGridLayout *pGLayout = new QGridLayout();pGLayout->setVerticalSpacing(3);pGLayout->setHorizontalSpacing(3);pGLayout->setMargin(3);QSpacerItem*pSpacer = new QSpacerItem(40, 20, QSizePolicy::Expanding, QSizePolicy::Minimum);//添加状态图标控件m_pStatusLabel = new QLabel();m_pStatusLabel->setFixedSize(16, 16);m_pStatusLabel->setScaledContents(true);if(m_role == ChatRole::Self){m_pNameLabel->setContentsMargins(0,0,8,0);m_pNameLabel->setAlignment(Qt::AlignRight);//名字标签pGLayout->addWidget(m_pNameLabel, 0,2, 1,1);//icon 头像pGLayout->addWidget(m_pIconLabel, 0, 3, 2,1, Qt::AlignTop);//第 0 列:依然是 pSpacer,占用第 1 行,第 0 列pGLayout->addItem(pSpacer, 1, 0, 1, 1);//气泡控件pGLayout->addWidget(m_pBubble, 1,2, 1,1);//状态图标pGLayout->addWidget(m_pStatusLabel, 1, 1, 1, 1, Qt::AlignCenter);pGLayout->setColumnStretch(0, 2);pGLayout->setColumnStretch(1, 0); // status 图标 (固定大小)pGLayout->setColumnStretch(2, 3); // 名字 + 气泡 (主要拉伸区域)pGLayout->setColumnStretch(3, 0); // 头像 (固定大小)}else{m_pNameLabel->setContentsMargins(8,0,0,0);m_pNameLabel->setAlignment(Qt::AlignLeft);pGLayout->addWidget(m_pIconLabel, 0, 0, 2,1, Qt::AlignTop);pGLayout->addWidget(m_pNameLabel, 0,1, 1,1);pGLayout->addWidget(m_pBubble, 1,1, 1,1);pGLayout->addItem(pSpacer, 2, 2, 1, 1);pGLayout->setColumnStretch(1, 3);pGLayout->setColumnStretch(2, 2);}this->setLayout(pGLayout);}
增加接口设置状态
void ChatItemBase::setStatus(int status) {if (status == MsgStatus::UN_READ) {m_pStatusLabel->setPixmap(QPixmap(":/res/unread.png"));return;}if (status == MsgStatus::SEND_FAILE) {m_pStatusLabel->setPixmap(QPixmap(":/res/send_fail.png"));return;}if (status == MsgStatus::READED) {m_pStatusLabel->setPixmap(QPixmap(":/res/readed.png"));return;}}
客户端同步流程
客户端本地会有sql记录该用户所有聊天记录最后收到的消息信息,包括message_id,thread_id等,每次客户端登录将本地最大messag_id和thread_id发送给服务器,服务器按照每个thread_id将信息恢复给客户端,可支持分页返回。
举例
比如第一次请求,客户端携带message_id为1001,thread_id为22,那么服务器就会去chat_message中升序查找,比message_id(1001)大且thread_id为22的消息,返回20条
客户端拿到20条消息后,可根据最后一个消息messag_id继续请求消息。
所以我们得出一个结论要拉取消息就要有thread_id以及message_id。
接下来的情形分为两种
情况1
本地有thread_id,但是在该用户A离线的时候B用户给他发消息,因为他们之前没有聊过天,所以此时B会通知服务器在private_chat表中创建新的thread_id,但是A本地数据库没有这个thread_id,所以A需要在登录时拉取.
拉取就传递目前A本地数据库中最大的thead_id以及自己的user_id给服务器,服务器去查找比这个thread_id大的会话列表返回即可,采取分页的方式,每次加载100个,并配合load_more字段通知客户端是否继续拉取
如果load_more字段为true则客户端继续拉取,传递上次服务器给它同步的最大的thread_id,服务器继续返回比thread_id大的会话列表。
直到load_more为false,客户端不再拉取。
情况2
如果客户端换了新机器,本地没有记录信息,那么就需要在用户登录后向服务器发送user_id和thread_id,thread_id 请求从 0 开始,服务器将返回该用户的所有聊天thread_id,必须分页返回,并且携带 load_more 字段,字段和上面类似。
一个服务器返回的数据格式如下
{"error":0,"uid" : 1001,"load_more":true,"threads":[{"thread_id": 1001,"type": "private","user1_id": 1019,"user2_id": 1020},{"thread_id": 1002,"type": "group","user1_id": 0,"user2_id": 0,},{"thread_id": 1003,"type": "private","user1_id": 1019,"user2_id": 1021},{"thread_id": 1004,"type": "group","user1_id": 0,"user2_id": 0}]}
可采用如下sql语句查询
-- 1) CTE 把私聊/群聊合并好WITH all_threads AS (SELECTthread_id,'private' AS type,user1_id,user2_idFROM private_chatWHERE (user1_id = :me OR user2_id = :me)AND thread_id > :last_idUNION ALLSELECTthread_id,'group' AS type,NULL AS user1_id,NULL AS user2_idFROM group_chat_memberWHERE user_id = :meAND thread_id > :last_id)-- 2) 按 thread_id 升序,取 page_size+1 条SELECT *FROM all_threadsORDER BY thread_idLIMIT :page_size + 1;
然后在服务端(伪代码)处理结果:
def fetch_threads(me, last_id, page_size):rows = db.query(sql, { "me": me, "last_id": last_id, "page_size": page_size })# rows 最多有 page_size+1 条if len(rows) > page_size:load_more = Truerows = rows[:-1] # 丢掉第 page_size+1 条else:load_more = False# 更新下一次游标:取最后一条的 thread_idif rows:next_last_id = rows[-1]["thread_id"]else:next_last_id = last_idreturn {"data": rows,"next_last_id": next_last_id,"load_more": load_more}
说明
- 为什么要多取 1 条?
- 取
page_size + 1条后,如果结果确实多出那 1 条,就说明“在本页之后”还有数据; - 如果正好只有
page_size条或更少,就可以断定已经取尽。
- 取
- 游标(cursor)模式 vs OFFSET
- 用游标(
thread_id > last_id)可以保证性能,避免大 OFFSET 带来的全表扫描。 - 每次请求只跑新数据所在的索引范围。
- 用游标(
- 客户端流程
- 初次加载:传
last_id = 0; - 点「加载更多」:传上次接口返回的
next_last_id; - 收到
load_more = false:表示已到末尾,不要再发更多请求。
- 初次加载:传
当然为了提升效率,可以在用户登录后,选择是否同步消息的勾选框
如果勾选则调用上述sql语句查询该用户所有chat_thread返回。
如果没勾选,就不用加载chat_thread。
重构聊天item
需要重构聊天左侧item列表结构,以支持聊天消息记录持久化存储。
默认情况下,会检索本地客户端是否有聊天记录信息,
如果没有则需要请求所有thread_id列表,然后更新左侧item列表。
如果有,也需要差异化加载 thread_id 列表,比如说 A 下线了,B 和 A 通信,A 之前没有收到过 B 的信息,所以也要拉取所有新建立的会话。
所以当务之急是先把这个聊天列表加载好
因为我们没有为客户端设置本地数据库,所以我们默认每次用户登录都请求一下所有thread_id列表,这样方便测试效果
Server返回聊天列表
Server需要根据用户uid返回他的聊天列表
1 注册消息
_fun_callbacks[ID_LOAD_CHAT_THREAD_REQ] = std::bind(&LogicSystem::GetUserThreadsHandler, this,placeholders::_1, placeholders::_2, placeholders::_3);
2 实现获取聊天记录逻辑
void LogicSystem::GetUserThreadsHandler(std::shared_ptr<CSession> session,const short& msg_id, const string& msg_data){//从数据库加chat_threads记录Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["uid"].asInt();std::cout << "get uid threads " << uid << std::endl;Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["uid"] = uid;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_LOAD_CHAT_THREAD_RSP);});std::vector<std::shared_ptr<ChatThreadInfo>> threads;bool res = GetUserThreads(uid, threads);if (!res) {rtvalue["error"] = ErrorCodes::UidInvalid;return;}//整理threads数据写入json返回for (auto& thread : threads) {Json::Value thread_value;thread_value["thread_id"] = int(thread->_thread_id);thread_value["type"] = thread->_type;thread_value["user1_id"] = thread->_user1_id;thread_value["user2_id"] = thread->_user2_id;rtvalue["threads"].append(thread_value);}}bool LogicSystem::GetUserThreads(int userId,std::vector<std::shared_ptr<ChatThreadInfo>>& threads){return MysqlMgr::GetInstance()->GetUserThreads(userId, threads);}
3 数据库加载聊天
// 新增两个输出参数:loadMore, nextLastIdbool MysqlDao::GetUserThreads(int64_t userId,int64_t lastId,int pageSize,std::vector<std::shared_ptr<ChatThreadInfo>>& threads,bool& loadMore,int64_t& nextLastId){// 初始状态loadMore = false;nextLastId = lastId;threads.clear();auto con = pool_->getConnection();if (!con) {return false;}Defer defer([this, &con]() {pool_->returnConnection(std::move(con));});auto& conn = con->_con;try {// 准备分页查询:CTE + UNION ALL + ORDER + LIMIT N+1std::string sql ="WITH all_threads AS ( "" SELECT thread_id, 'private' AS type, user1_id, user2_id "" FROM private_chat "" WHERE (user1_id = ? OR user2_id = ?) "" AND thread_id > ? "" UNION ALL "" SELECT thread_id, 'group' AS type, 0 AS user1_id, 0 AS user2_id "" FROM group_chat_member "" WHERE user_id = ? "" AND thread_id > ? "") ""SELECT thread_id, type, user1_id, user2_id "" FROM all_threads "" ORDER BY thread_id "" LIMIT ?;";std::unique_ptr<sql::PreparedStatement> pstmt(conn->prepareStatement(sql));// 绑定参数:? 对应 (userId, userId, lastId, userId, lastId, pageSize+1)int idx = 1;pstmt->setInt64(idx++, userId); // private.user1_idpstmt->setInt64(idx++, userId); // private.user2_idpstmt->setInt64(idx++, lastId); // private.thread_id > lastIdpstmt->setInt64(idx++, userId); // group.user_idpstmt->setInt64(idx++, lastId); // group.thread_id > lastIdpstmt->setInt(idx++, pageSize + 1); // LIMIT pageSize+1// 执行std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());// 先把所有行读到临时容器std::vector<std::shared_ptr<ChatThreadInfo>> tmp;while (res->next()) {auto cti = std::make_shared<ChatThreadInfo>();cti->_thread_id = res->getInt64("thread_id");cti->_type = res->getString("type");cti->_user1_id = res->getInt64("user1_id");cti->_user2_id = res->getInt64("user2_id");tmp.push_back(cti);}// 判断是否多取到一条if ((int)tmp.size() > pageSize) {loadMore = true;tmp.pop_back(); // 丢掉第 pageSize+1 条}// 如果还有数据,更新 nextLastId 为最后一条的 thread_idif (!tmp.empty()) {nextLastId = tmp.back()->_thread_id;}// 移入输出向量threads = std::move(tmp);}catch (sql::SQLException& e) {std::cerr << "SQLException: " << e.what()<< " (MySQL error code: " << e.getErrorCode()<< ", SQLState: " << e.getSQLState() << ")\n";return false;}return true;}
客户端请求聊天列表
1 完善loading对话框
完善加载对话框,调整下布局,增加一个label和旋转gif的布局

布局界面

接下来调整下代码
#ifndef LOADINGDLG_H#define LOADINGDLG_H#include <QDialog>namespace Ui {class LoadingDlg;}class LoadingDlg : public QDialog{Q_OBJECTpublic:explicit LoadingDlg(QWidget *parent = nullptr, QString tip = "Loading...");~LoadingDlg();private:Ui::LoadingDlg *ui;};#endif // LOADINGDLG_H
具体实现
LoadingDlg::LoadingDlg(QWidget *parent, QString tip):QDialog(parent),ui(new Ui::LoadingDlg){ui->setupUi(this);// 1. 让这个 Widget 透明背景、无边框、拦截底部事件setWindowFlags(Qt::Dialog | Qt::FramelessWindowHint | Qt::WindowSystemMenuHint | Qt::WindowStaysOnTopHint);setAttribute(Qt::WA_TranslucentBackground);// 设置背景透明// 2. 让它覆盖父窗口整个面积if (parent) {// 获取屏幕尺寸setFixedSize(parent->size()); // 设置对话框为全屏尺寸}if (parent) {QPoint topLeft = parent->mapToGlobal(QPoint(0, 0));move(topLeft);}// 3. 半透明黑色背景(alpha = 128,大约 50% 透明度)// setStyleSheet("background-color: rgba(0, 0, 0, 128);");QMovie *movie = new QMovie(":/res/loading2.gif"); // 加载动画的资源文件ui->loading_lb->setMovie(movie);movie->start();// 3. 告诉 QMovie:将解码后的每一帧缩放到 100×100(固定大小)movie->setScaledSize(ui->loading_lb->size());ui->status_lb->setText(tip);}LoadingDlg::~LoadingDlg(){delete ui;}
2 加载聊天记录
之前没有从数据库加载聊天记录,只是模拟从本地好友中加载为聊天记录了,现在需要将这部分从ChatDialog构造函数中移除
改为从服务器申请,并且此时展示LoadingDlg对话框,直到获取记录后,将LoadingDlg移除。
因为获取服务器记录是通过网络获取的,所以在客户端的TcpMgr中通过信号发送给ChatDialog界面
所以ChatDialog的构造函数改为如下
ChatDialog::ChatDialog(QWidget* parent) :QDialog(parent),ui(new Ui::ChatDialog), _b_loading(false), _mode(ChatUIMode::ChatMode),_state(ChatUIMode::ChatMode), _last_widget(nullptr), _cur_chat_uid(0), _loading_dlg(nullptr){ui->setupUi(this);ui->add_btn->SetState("normal", "hover", "press");ui->add_btn->setProperty("state", "normal");QAction* searchAction = new QAction(ui->search_edit);searchAction->setIcon(QIcon(":/res/search.png"));ui->search_edit->addAction(searchAction, QLineEdit::LeadingPosition);ui->search_edit->setPlaceholderText(QStringLiteral("搜索"));// 创建一个清除动作并设置图标QAction* clearAction = new QAction(ui->search_edit);clearAction->setIcon(QIcon(":/res/close_transparent.png"));// 初始时不显示清除图标// 将清除动作添加到LineEdit的末尾位置ui->search_edit->addAction(clearAction, QLineEdit::TrailingPosition);// 当需要显示清除图标时,更改为实际的清除图标connect(ui->search_edit, &QLineEdit::textChanged, [clearAction](const QString& text) {if (!text.isEmpty()) {clearAction->setIcon(QIcon(":/res/close_search.png"));}else {clearAction->setIcon(QIcon(":/res/close_transparent.png")); // 文本为空时,切换回透明图标}});// 连接清除动作的触发信号到槽函数,用于清除文本connect(clearAction, &QAction::triggered, [this, clearAction]() {ui->search_edit->clear();clearAction->setIcon(QIcon(":/res/close_transparent.png")); // 清除文本后,切换回透明图标ui->search_edit->clearFocus();//清除按钮被按下则不显示搜索框ShowSearch(false);});ui->search_edit->SetMaxLength(15);//连接加载信号和槽connect(ui->chat_user_list, &ChatUserList::sig_loading_chat_user, this, &ChatDialog::slot_loading_chat_user);//模拟加载自己头像QString head_icon = UserMgr::GetInstance()->GetIcon();QPixmap pixmap(head_icon); // 加载图片QPixmap scaledPixmap = pixmap.scaled(ui->side_head_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小ui->side_head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上ui->side_head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小ui->side_chat_lb->setProperty("state", "normal");ui->side_chat_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");ui->side_contact_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");ui->side_settings_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");AddLBGroup(ui->side_chat_lb);AddLBGroup(ui->side_contact_lb);AddLBGroup(ui->side_settings_lb);connect(ui->side_chat_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_chat);connect(ui->side_contact_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_contact);connect(ui->side_settings_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_setting);//链接搜索框输入变化connect(ui->search_edit, &QLineEdit::textChanged, this, &ChatDialog::slot_text_changed);ShowSearch(false);//检测鼠标点击位置判断是否要清空搜索框this->installEventFilter(this); // 安装事件过滤器//设置聊天label选中状态ui->side_chat_lb->SetSelected(true);//设置选中条目SetSelectChatItem();//更新聊天界面信息SetSelectChatPage();//连接加载联系人的信号和槽函数connect(ui->con_user_list, &ContactUserList::sig_loading_contact_user,this, &ChatDialog::slot_loading_contact_user);//连接联系人页面点击好友申请条目的信号connect(ui->con_user_list, &ContactUserList::sig_switch_apply_friend_page,this, &ChatDialog::slot_switch_apply_friend_page);//连接清除搜索框操作connect(ui->friend_apply_page, &ApplyFriendPage::sig_show_search, this, &ChatDialog::slot_show_search);//为searchlist 设置search editui->search_list->SetSearchEdit(ui->search_edit);//连接申请添加好友信号connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_friend_apply, this, &ChatDialog::slot_apply_friend);//连接认证添加好友信号connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_add_auth_friend, this, &ChatDialog::slot_add_auth_friend);//链接自己认证回复信号connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_auth_rsp, this,&ChatDialog::slot_auth_rsp);//连接点击联系人item发出的信号和用户信息展示槽函数connect(ui->con_user_list, &ContactUserList::sig_switch_friend_info_page,this, &ChatDialog::slot_friend_info_page);//设置中心部件为chatpageui->stackedWidget->setCurrentWidget(ui->chat_page);//连接searchlist跳转聊天信号connect(ui->search_list, &SearchList::sig_jump_chat_item, this, &ChatDialog::slot_jump_chat_item);//连接好友信息界面发送的点击事件connect(ui->friend_info_page, &FriendInfoPage::sig_jump_chat_item, this,&ChatDialog::slot_jump_chat_item_from_infopage);//连接聊天列表点击信号connect(ui->chat_user_list, &QListWidget::itemClicked, this, &ChatDialog::slot_item_clicked);//连接对端消息通知connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_text_chat_msg,this, &ChatDialog::slot_text_chat_msg);connect(ui->chat_page, &ChatPage::sig_append_send_chat_msg, this, &ChatDialog::slot_append_send_chat_msg);_timer = new QTimer(this);connect(_timer, &QTimer::timeout, this, [this]() {auto user_info = UserMgr::GetInstance()->GetUserInfo();QJsonObject textObj;textObj["fromuid"] = user_info->_uid;QJsonDocument doc(textObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_HEART_BEAT_REQ, jsonData);});_timer->start(10000);//连接tcp返回的加载聊天回复connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_load_chat_thread,this, &ChatDialog::slot_load_chat_thread);}
当用户登录成功后会切换到聊天页面,此时请求聊天列表
void MainWindow::SlotSwitchChat(){_chat_dlg = new ChatDialog();_chat_dlg->setWindowFlags(Qt::CustomizeWindowHint|Qt::FramelessWindowHint);setCentralWidget(_chat_dlg);_chat_dlg->show();_login_dlg->hide();this->setMinimumSize(QSize(1050,900));this->setMaximumSize(QWIDGETSIZE_MAX, QWIDGETSIZE_MAX);_ui_status = CHAT_UI;_chat_dlg->loadChatList();}
通过发送请求获取聊天记录
void ChatDialog::loadChatList(){showLoadingDlg(true);//发送请求逻辑QJsonObject jsonObj;auto uid = UserMgr::GetInstance()->GetUid();jsonObj["uid"] = uid;int last_chat_thread_id = UserMgr::GetInstance()->GetLastChatThreadId();jsonObj["thread_id"] = last_chat_thread_id;QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);}
TCPMgr注册从服务器获取回复的消息处理
_handlers.insert(ID_LOAD_CHAT_THREAD_RSP, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "chat thread json parse failed " << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "get chat thread rsp failed, error is " << err;return;}qDebug() << "Receive chat thread rsp Success";auto thread_array = jsonObj["threads"].toArray();std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads;for (const QJsonValue& value : thread_array) {auto cti = std::make_shared<ChatThreadInfo>();cti->_thread_id = value["thread_id"].toInt();cti->_type = value["type"].toString();cti->_user1_id = value["user1_id"].toInt();cti->_user2_id = value["user2_id"].toInt();chat_threads.push_back(cti);}bool load_more = jsonObj["load_more"].toBool();int next_last_id = jsonObj["next_last_id"].toInt();//发送信号通知界面emit sig_load_chat_thread(load_more, next_last_id, chat_threads);});
ChatDialog接收TcpMgr发送的sig_load_chat_thread消息,然后触发如下函数,该函数主要加载聊天列表并且消除加载动画
void ChatDialog::slot_load_chat_thread(bool load_more, int last_thread_id,std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads){for (auto& cti : chat_threads) {//先处理单聊,群聊跳过,以后添加if (cti->_type == "group") {continue;}auto uid = UserMgr::GetInstance()->GetUid();auto other_uid = 0;if (uid == cti->_user1_id) {other_uid = cti->_user2_id;}else {other_uid = cti->_user1_id;}auto friend_info = UserMgr::GetInstance()->GetFriendById(other_uid);if (!friend_info) {continue;}auto* chat_user_wid = new ChatUserWid();auto user_info = std::make_shared<UserInfo>(friend_info);chat_user_wid->SetInfo(user_info);QListWidgetItem* item = new QListWidgetItem;//qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();item->setSizeHint(chat_user_wid->sizeHint());ui->chat_user_list->addItem(item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_items_added.insert(user_info->_uid, item);auto chat_thread_data = std::make_shared<ChatThreadData>();chat_thread_data->_user1_id = uid;chat_thread_data->_user2_id = other_uid;chat_thread_data->_last_msg_id = 0;chat_thread_data->_thread_id = cti->_thread_id;UserMgr::GetInstance()->AddChatThreadData(chat_thread_data);}UserMgr::GetInstance()->SetLastChatThreadId(last_thread_id);if (load_more) {//发送请求逻辑QJsonObject jsonObj;auto uid = UserMgr::GetInstance()->GetUid();jsonObj["uid"] = uid;jsonObj["thread_id"] = last_thread_id;QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);return;}//更新聊天界面信息SetSelectChatItem();SetSelectChatPage();showLoadingDlg(false);}
数据库构建
去navicat中执行上面数据模型设计中提到的几个sql语句
1 创建chat_thread
2 创建group_chat

成功后显示

3 创建group_member

成功后显示表

4 创建私聊表

成功后显示

注意: 创建后没有数据,数据是我自己添加的,为了方便测试
开启服务器,客户端登陆后加载数据
效果如下

首次单聊
A和B是好友,首次单聊,A发送给服务器创建聊天的请求。
服务器根据A的创建请求创建私聊,然后返回给客户端A。
注意
因为聊天服务是异步的,而且是分布式的,所以有可能对方B就在此时发送消息给A,服务器已经创建好了,或者服务器正在调用sql创建。
所以对于创建请求,sql需要先查询是否已经被其他人创建了thread_id, 我们可以制定一个规则,任何一方创建thread_id,在写入私聊表private_chat时都需要保证最小的uid为uid1_id, 大的在uid2_id, 这样查询的时候也方便。
这个查询要加行级锁,避免分布式造成数据混乱。
总结
所以创建单聊时,要先去private_chat表根据uid查询,如果查到了则返回这个thread_id, 这个查询要加行级锁。
如果没查到,则在chat_thread表创建thread_id并且插入private_chat表
思路
我们整理下思路
1) 查询是否已存在私聊会话,如果存在则加锁行并返回 thread_id
SELECT thread_idFROM private_chatWHERE (user1_id = LEAST(:user1_id, :user2_id) AND user2_id = GREATEST(:user1_id, :user2_id))FOR UPDATE; -- 使用行级锁,避免并发冲突
查询时使用 LEAST 和 GREATEST 来保证无论是 user1_id 还是 user2_id,都将较小的 ID 存放在 user1_id,较大的存放在 user2_id。这样可以避免不同的用户顺序导致查找不到匹配的记录。
FOR UPDATE 关键字会锁定这些查询行,确保在事务结束之前不会有其他并发的操作修改数据。
- 如果未找到数据(查询返回空),则插入新记录:
-- 1. 在 chat_thread 表中创建新记录INSERT INTO chat_thread (type, created_at)VALUES ('private', NOW());-- 2. 获取新插入的 thread_id(假设你可以通过 LAST_INSERT_ID 获取)SELECT LAST_INSERT_ID();
- 将新生成的 thread_id 插入 private_chat 表
INSERT INTO private_chat (thread_id, user1_id, user2_id, created_at)VALUES (:new_thread_id, LEAST(:user1_id, :user2_id), GREATEST(:user1_id, :user2_id), NOW());
使用 INSERT INTO chat_thread 创建新的聊天记录,并使用 LAST_INSERT_ID() 获取新生成的 thread_id。
将新 thread_id 插入到 private_chat 表中,同时使用 LEAST 和 GREATEST 确保较小的 ID 存入 user1_id,较大的存入 user2_id。
问题分析
- 行级锁的生命周期:
行级锁(通过FOR UPDATE获得的锁)只在当前事务中有效。当查询结束后,锁会被释放。也就是说,如果我们查询了是否存在private_chat的记录并加了锁,但在查询完成后进行插入chat_thread和private_chat的操作时,其他并发请求可能会先插入新的私聊记录,从而造成数据冲突。 - 可能的并发问题:
例如:- 线程 A 执行查询,锁定了
private_chat表的行; - 线程 B 也执行了相同的查询,发现没有记录,于是开始插入
chat_thread; - 线程 A 完成插入
chat_thread和private_chat,但线程 B 也在此时完成了它的插入,导致private_chat表中出现两个重复的记录。
- 线程 A 执行查询,锁定了
解决方案
为了确保并发操作的安全性,我们可以使用 事务 来保证在查询、插入 chat_thread 和 private_chat 表的过程中,数据的一致性和原子性。具体步骤如下:
方案:使用事务(Atomic Transaction)
我们可以使用 事务 来确保操作的一致性,整个操作从查询到插入都在一个事务中进行。这样即使存在多个并发请求,也能保证同一时间只有一个请求可以成功创建 chat_thread 和 private_chat。
关键改动:
- 在查询时加行级锁。
- 确保所有的数据库操作(查询和插入)都在一个事务中进行,这样可以防止并发插入的问题。
- 使用事务提交(
commit)和回滚(rollback)确保数据一致性。
关键代码
bool MysqlDao::CreatePrivateChat(int user1_id, int user2_id, int& thread_id){auto con = pool_->getConnection();if (!con) {return false;}Defer defer([this, &con]() {pool_->returnConnection(std::move(con));});auto& conn = con->_con;try {// 开启事务conn->setAutoCommit(false);// 1. 查询是否已存在私聊并加行级锁int uid1 = std::min(user1_id, user2_id);int uid2 = std::max(user1_id, user2_id);std::string check_sql ="SELECT thread_id FROM private_chat ""WHERE (user1_id = ? AND user2_id = ?) ""FOR UPDATE;";std::unique_ptr<sql::PreparedStatement> pstmt(conn->prepareStatement(check_sql));pstmt->setInt64(1, uid1);pstmt->setInt64(2, uid2);std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());if (res->next()) {// 如果已存在,返回该 thread_idthread_id = res->getInt("thread_id");conn->commit(); // 提交事务return true;}// 2. 如果未找到,创建新的 chat_thread 和 private_chat 记录// 在 chat_thread 表插入新记录std::string insert_chat_thread_sql ="INSERT INTO chat_thread (type, created_at) VALUES ('private', NOW());";std::unique_ptr<sql::PreparedStatement> pstmt_insert_thread(conn->prepareStatement(insert_chat_thread_sql));pstmt_insert_thread->executeUpdate();// 获取新插入的 thread_idstd::string get_last_insert_id_sql = "SELECT LAST_INSERT_ID();";std::unique_ptr<sql::PreparedStatement> pstmt_last_insert_id(conn->prepareStatement(get_last_insert_id_sql));std::unique_ptr<sql::ResultSet> res_last_id(pstmt_last_insert_id->executeQuery());res_last_id->next();thread_id = res_last_id->getInt(1);// 3. 在 private_chat 表插入新记录std::string insert_private_chat_sql ="INSERT INTO private_chat (thread_id, user1_id, user2_id, created_at) ""VALUES (?, ?, ?, NOW());";std::unique_ptr<sql::PreparedStatement> pstmt_insert_private(conn->prepareStatement(insert_private_chat_sql));pstmt_insert_private->setInt64(1, thread_id);pstmt_insert_private->setInt64(2, uid1);pstmt_insert_private->setInt64(3, uid2);pstmt_insert_private->executeUpdate();// 提交事务conn->commit();return true;}catch (sql::SQLException& e) {std::cerr << "SQLException: " << e.what() << std::endl;conn->rollback();return false;}return false;}bool MysqlMgr::CreatePrivateChat(int user1_id, int user2_id, int& thread_id){return _dao.CreatePrivateChat(user1_id, user2_id, thread_id);}
LogicSystem添加创建聊天的回调函数,并且注册
void LogicSystem::CreatePrivateChat(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data){Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["uid"].asInt();auto other_id = root["other_id"].asInt();Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["uid"] = uid;rtvalue["other_id"] = other_id;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_LOAD_CHAT_THREAD_RSP);});int thread_id = 0;bool res = MysqlMgr::GetInstance()->CreatePrivateChat(uid, other_id, thread_id);if (!res) {rtvalue["error"] = ErrorCodes::CREATE_CHAT_FAILED;return;}rtvalue["thread_id"] = thread_id;}_fun_callbacks[ID_CREATE_PRIVATE_CHAT_REQ] = std::bind(&LogicSystem::CreatePrivateChat, this,placeholders::_1, placeholders::_2, placeholders::_3);
客户端完善
在好友信息界面
void FriendInfoPage::on_msg_chat_clicked(){qDebug() << "msg chat btn clicked";emit sig_jump_chat_item(_user_info);}
追踪这个信号,我们完善槽函数
void ChatDialog::slot_jump_chat_item_from_infopage(std::shared_ptr<UserInfo> user_info){qDebug() << "slot jump chat item " << endl;auto thread_id = UserMgr::GetInstance()->GetThreadIdByUid(user_info->_uid);if (thread_id != -1) {auto find_iter = _chat_thread_items.find(thread_id);if (find_iter != _chat_thread_items.end()) {qDebug() << "jump to chat item , uid is " << user_info->_uid;ui->chat_user_list->scrollToItem(find_iter.value());ui->side_chat_lb->SetSelected(true);SetSelectChatItem(user_info->_uid);//更新聊天界面信息SetSelectChatPage(user_info->_uid);slot_side_chat();return;} //说明之前有缓存过聊天列表,只是被删除了,那么重新加进来即可else {auto* chat_user_wid = new ChatUserWid();chat_user_wid->SetInfo(user_info);QListWidgetItem* item = new QListWidgetItem;qDebug() << "chat_user_wid sizeHint is " << chat_user_wid->sizeHint();ui->chat_user_list->insertItem(0, item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_thread_items.insert(thread_id, item);ui->side_chat_lb->SetSelected(true);SetSelectChatItem(user_info->_uid);//更新聊天界面信息SetSelectChatPage(user_info->_uid);slot_side_chat();return;}}//如果没找到,则发送创建请求auto uid = UserMgr::GetInstance()->GetUid();QJsonObject jsonObj;jsonObj["uid"] = uid;jsonObj["other_id"] = user_info->_uid;QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_CREATE_PRIVATE_CHAT_REQ, jsonData);}
客户端注册服务器返回的消息ID_CREATE_PRIVATE_CHAT_RSP,进行处理
_handlers.insert(ID_CREATE_PRIVATE_CHAT_RSP, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "parse create private chat json parse failed " << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "get create private chat failed, error is " << err;return;}qDebug() << "Receive create private chat rsp Success";int uid = jsonObj["uid"].toInt();int other_id = jsonObj["other_id"].toInt();int thread_id = jsonObj["thread_id"].toInt();//发送信号通知界面emit sig_create_private_chat(uid, other_id, thread_id);});
编写槽函数和sig_create_private_chat连接,并且增加聊天条目
//连接tcp返回的创建私聊的回复connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_create_private_chat,this, &ChatDialog::slot_create_private_chat);
具体处理的槽函数
void ChatDialog::slot_create_private_chat(int uid, int other_id, int thread_id){auto* chat_user_wid = new ChatUserWid();auto user_info = UserMgr::GetInstance()->GetFriendById(other_id);chat_user_wid->SetInfo(user_info);QListWidgetItem* item = new QListWidgetItem;item->setSizeHint(chat_user_wid->sizeHint());qDebug() << "chat_user_wid sizeHint is " << chat_user_wid->sizeHint();ui->chat_user_list->insertItem(0, item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_thread_items.insert(thread_id, item);auto chat_thread_data = std::make_shared<ChatThreadData>();chat_thread_data->_user1_id = uid;chat_thread_data->_user2_id = other_id;chat_thread_data->_last_msg_id = 0;chat_thread_data->_thread_id = thread_id;UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, other_id);ui->side_chat_lb->SetSelected(true);SetSelectChatItem(user_info->_uid);//更新聊天界面信息SetSelectChatPage(user_info->_uid);slot_side_chat();return;}
聊天消息重构
ChaUserWid重构
之前我们的会话列表由一个一个的ChatUserWid构成

原来的ChatUserWid内部存储的是UserInfo结构,目前我们已经增加了ChatThread数据库内容,所以要将会话列表的每个ChatUserWid中存储ChatThreadData结构。
接下来我们定义这几个结构
class ChatUserWid : public ListItemBase{Q_OBJECTpublic:explicit ChatUserWid(QWidget *parent = nullptr);~ChatUserWid();QSize sizeHint() const override;void SetChatData(std::shared_ptr<ChatThreadData> chat_data);std::shared_ptr<ChatThreadData> GetChatData();void ShowRedPoint(bool bshow);void updateLastMsg(std::vector<std::shared_ptr<TextChatData>> msgs);private:Ui::ChatUserWid *ui;std::shared_ptr<ChatThreadData> _chat_data;};
具体定义
void ChatUserWid::SetChatData(std::shared_ptr<ChatThreadData> chat_data) {_chat_data = chat_data;auto other_id = _chat_data->GetOtherId();auto other_info = UserMgr::GetInstance()->GetFriendById(other_id);// 加载图片QPixmap pixmap(other_info->_icon);// 设置图片自动缩放ui->icon_lb->setPixmap(pixmap.scaled(ui->icon_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation));ui->icon_lb->setScaledContents(true);ui->user_name_lb->setText(other_info->_name);ui->user_chat_lb->setText(chat_data->GetLastMsg());}std::shared_ptr<ChatThreadData> ChatUserWid::GetChatData(){return _chat_data;}
这样我们就将聊天会话的信息写入到了ChatUserWid这样一个个小条目了。
消息类抽象
因为我们将来要存储文本,文件以及图片不同类型的消息,那么就将原来的消息抽象出一个基类
class ChatDataBase {public:ChatDataBase(int msg_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type,QString content,int _send_uid);ChatDataBase(QString unique_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type,QString content, int send_uid);int GetMsgId() { return _msg_id; }int GetThreadId() { return _thread_id; }ChatFormType GetFormType() { return _form_type; }ChatMsgType GetMsgType() { return _msg_type; }QString GetContent() { return _content; }int GetSendUid() { return _send_uid; }QString GetMsgContent(){return _content;}void SetUniqueId(int unique_id);QString GetUniqueId();private://客户端本地唯一标识QString _unique_id;//消息idint _msg_id;//会话idint _thread_id;//群聊还是私聊ChatFormType _form_type;//文本信息为0,图片为1,文件为2ChatMsgType _msg_type;QString _content;//发送者idint _send_uid;};
然后基于上面的基类,我们可以定义不同类型的消息,如文本消息
class TextChatData : public ChatDataBase {public:TextChatData(int msg_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type, QString content,int send_uid):ChatDataBase(msg_id, thread_id, form_type, msg_type, content, send_uid){}TextChatData(QString unique_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type, QString content,int send_uid):ChatDataBase(unique_id, thread_id, form_type, msg_type, content, send_uid){}};
有了这个文本消息后,我们可以将基类指针ChatDataBase存储起来,将来通过实现虚函数,进行多态调用.
ChatThreadData聊天线程
聊天线程数据,重构和完善
//客户端本地存储的聊天线程数据结构class ChatThreadData {public:ChatThreadData(int other_id, int thread_id, int last_msg_id):_other_id(other_id), _thread_id(thread_id), _last_msg_id(last_msg_id){}void AddMsg(std::shared_ptr<ChatDataBase> msg);void SetLastMsgId(int msg_id);void SetOtherId(int other_id);int GetOtherId();QString GetGroupName();QMap<int, std::shared_ptr<ChatDataBase>> GetMsgMap();int GetThreadId();QMap<int, std::shared_ptr<ChatDataBase>>& GetMsgMapRef();void AppendMsg(int msg_id, std::shared_ptr<ChatDataBase> base_msg);QString GetLastMsg();private://如果是私聊,则为对方的id;如果是群聊,则为0int _other_id;int _last_msg_id;int _thread_id;QString _last_msg;//群聊信息,成员列表std::vector<int> _group_members;//群聊名称QString _group_name;//缓存消息map,抽象为基类,因为会有图片等其他类型消息QMap<int, std::shared_ptr<ChatDataBase>> _msg_map;};
具体实现
void ChatThreadData::AddMsg(std::shared_ptr<ChatDataBase> msg){_msg_map.insert(msg->GetMsgId(), msg);}void ChatThreadData::SetLastMsgId(int msg_id){_last_msg_id = msg_id;}void ChatThreadData::SetOtherId(int other_id){_other_id = other_id;}int ChatThreadData::GetOtherId() {return _other_id;}QString ChatThreadData::GetGroupName(){return _group_name;}QMap<int, std::shared_ptr<ChatDataBase>> ChatThreadData::GetMsgMap() {return _msg_map;}int ChatThreadData::GetThreadId(){return _thread_id;}QMap<int, std::shared_ptr<ChatDataBase>>& ChatThreadData::GetMsgMapRef(){return _msg_map;}void ChatThreadData::AppendMsg(int msg_id, std::shared_ptr<ChatDataBase> base_msg) {_msg_map.insert(msg_id, base_msg);_last_msg = base_msg->GetMsgContent();_last_msg_id = msg_id;}QString ChatThreadData::GetLastMsg(){return _last_msg;}
好友认证
对于好友认证时,如果双方通过,也要默认建立聊天消息,并且产生会话列表.
我们先从这块接入聊天消息列表,完善整体流程
proto协议修改
因为认证添加好友后,会生成两条聊天信息(比如,我们已经是好友了等),同时通知给对方,协议格式增加和修改如下
message AddFriendMsg{int32 sender_id = 1;string unique_id = 2;int32 msg_id = 3;int32 thread_id = 4;string msgcontent = 5;}message AuthFriendReq{int32 fromuid = 1;int32 touid = 2;repeated AddFriendMsg textmsgs = 3;}message AuthFriendRsp{int32 error = 1;int32 fromuid = 2;int32 touid = 3;}
服务器接收好友申请
服务器收到A向B添加好友的请求,会更新数据库申请记录,同时转发给B
void LogicSystem::AddFriendApply(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data){Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["uid"].asInt();auto desc = root["applyname"].asString();auto bakname = root["bakname"].asString();auto touid = root["touid"].asInt();std::cout << "user login uid is " << uid << " applydesc is "<< desc << " bakname is " << bakname << " touid is " << touid << endl;Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_ADD_FRIEND_RSP);});//先更新数据库MysqlMgr::GetInstance()->AddFriendApply(uid, touid, desc, bakname);//查询redis 查找touid对应的server ipauto to_str = std::to_string(touid);auto to_ip_key = USERIPPREFIX + to_str;std::string to_ip_value = "";bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);if (!b_ip) {return;}auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];std::string base_key = USER_BASE_INFO + std::to_string(uid);auto apply_info = std::make_shared<UserInfo>();bool b_info = GetBaseInfo(base_key, uid, apply_info);//直接通知对方有申请消息if (to_ip_value == self_name) {auto session = UserMgr::GetInstance()->GetSession(touid);if (session) {//在内存中则直接发送通知对方Json::Value notify;notify["error"] = ErrorCodes::Success;notify["applyuid"] = uid;notify["name"] = apply_info->name;notify["desc"] = desc;if (b_info) {notify["icon"] = apply_info->icon;notify["sex"] = apply_info->sex;notify["nick"] = apply_info->nick;}std::string return_str = notify.toStyledString();session->Send(return_str, ID_NOTIFY_ADD_FRIEND_REQ);}return ;}AddFriendReq add_req;add_req.set_applyuid(uid);add_req.set_touid(touid);add_req.set_name(apply_info->name);add_req.set_desc(desc);if (b_info) {add_req.set_icon(apply_info->icon);add_req.set_sex(apply_info->sex);add_req.set_nick(apply_info->nick);}//发送通知ChatGrpcClient::GetInstance()->NotifyAddFriend(to_ip_value,add_req);}
如果不在一个服务器,则通过grpc通知对端所在服务器, 对端服务器收到后,组织消息转发
Status ChatServiceImpl::NotifyAddFriend(ServerContext* context, const AddFriendReq* request, AddFriendRsp* reply){//查找用户是否在本服务器auto touid = request->touid();auto session = UserMgr::GetInstance()->GetSession(touid);Defer defer([request, reply]() {reply->set_error(ErrorCodes::Success);reply->set_applyuid(request->applyuid());reply->set_touid(request->touid());});//用户不在内存中则直接返回if (session == nullptr) {return Status::OK;}//在内存中则直接发送通知对方Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["applyuid"] = request->applyuid();rtvalue["name"] = request->name();rtvalue["desc"] = request->desc();rtvalue["icon"] = request->icon();rtvalue["sex"] = request->sex();rtvalue["nick"] = request->nick();std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_NOTIFY_ADD_FRIEND_REQ);return Status::OK;}
服务器收到同意申请
当B客户同意添加好友,会将请求发送给服务器
服务器收到后会执行
void LogicSystem::AuthFriendApply(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["fromuid"].asInt();auto touid = root["touid"].asInt();auto back_name = root["back"].asString();std::cout << "from " << uid << " auth friend to " << touid << std::endl;Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;auto user_info = std::make_shared<UserInfo>();std::string base_key = USER_BASE_INFO + std::to_string(touid);bool b_info = GetBaseInfo(base_key, touid, user_info);if (b_info) {rtvalue["name"] = user_info->name;rtvalue["nick"] = user_info->nick;rtvalue["icon"] = user_info->icon;rtvalue["sex"] = user_info->sex;rtvalue["uid"] = touid;}else {rtvalue["error"] = ErrorCodes::UidInvalid;}Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_AUTH_FRIEND_RSP);});//先更新数据库, 放到事务中,此处不再处理//MysqlMgr::GetInstance()->AuthFriendApply(uid, touid);std::vector<std::shared_ptr<AddFriendMsg>> chat_datas;//更新数据库添加好友MysqlMgr::GetInstance()->AddFriend(uid, touid,back_name, chat_datas);//查询redis 查找touid对应的server ipauto to_str = std::to_string(touid);auto to_ip_key = USERIPPREFIX + to_str;std::string to_ip_value = "";bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);if (!b_ip) {return;}auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];//直接通知对方有认证通过消息if (to_ip_value == self_name) {auto session = UserMgr::GetInstance()->GetSession(touid);if (session) {//在内存中则直接发送通知对方Json::Value notify;notify["error"] = ErrorCodes::Success;notify["fromuid"] = uid;notify["touid"] = touid;std::string base_key = USER_BASE_INFO + std::to_string(uid);auto user_info = std::make_shared<UserInfo>();bool b_info = GetBaseInfo(base_key, uid, user_info);if (b_info) {notify["name"] = user_info->name;notify["nick"] = user_info->nick;notify["icon"] = user_info->icon;notify["sex"] = user_info->sex;}else {notify["error"] = ErrorCodes::UidInvalid;}for(auto & chat_data : chat_datas){Json::Value chat;chat["sender"] = chat_data->sender_id();chat["msg_id"] = chat_data->msg_id();chat["thread_id"] = chat_data->thread_id();chat["unique_id"] = chat_data->unique_id();chat["msg_content"] = chat_data->msgcontent();notify["chat_datas"].append(chat);rtvalue["chat_datas"].append(chat);}std::string return_str = notify.toStyledString();session->Send(return_str, ID_NOTIFY_AUTH_FRIEND_REQ);}return ;}AuthFriendReq auth_req;auth_req.set_fromuid(uid);auth_req.set_touid(touid);for(auto& chat_data : chat_datas){auto text_msg = auth_req.add_textmsgs();text_msg->CopyFrom(*chat_data);Json::Value chat;chat["sender"] = chat_data->sender_id();chat["msg_id"] = chat_data->msg_id();chat["thread_id"] = chat_data->thread_id();chat["unique_id"] = chat_data->unique_id();chat["msg_content"] = chat_data->msgcontent();rtvalue["chat_datas"].append(chat);}//发送通知ChatGrpcClient::GetInstance()->NotifyAuthFriend(to_ip_value, auth_req);}
数据库处理
bool MysqlDao::AddFriend(const int& from, const int& to, std::string back_name,std::vector<std::shared_ptr<AddFriendMsg>>& chat_datas) {auto con = pool_->getConnection();if (con == nullptr) {return false;}Defer defer([this, &con]() {pool_->returnConnection(std::move(con));});try {//开始事务con->_con->setAutoCommit(false);std::string reverse_back;std::string apply_desc;{// 1. 锁定并读取std::unique_ptr<sql::PreparedStatement> selStmt(con->_con->prepareStatement("SELECT back_name, descs ""FROM friend_apply ""WHERE from_uid = ? AND to_uid = ? ""FOR UPDATE"));selStmt->setInt(1, to);selStmt->setInt(2, from);std::unique_ptr<sql::ResultSet> rsSel(selStmt->executeQuery());if (rsSel->next()) {reverse_back = rsSel->getString("back_name");apply_desc = rsSel->getString("descs");}else {// 没有对应的申请记录,直接 rollback 并返回失败con->_con->rollback();return false;}}{// 2. 执行真正的更新std::unique_ptr<sql::PreparedStatement> updStmt(con->_con->prepareStatement("UPDATE friend_apply ""SET status = 1 ""WHERE from_uid = ? AND to_uid = ?"));updStmt->setInt(1, to);updStmt->setInt(2, from);if (updStmt->executeUpdate() != 1) {// 更新行数不对,回滚con->_con->rollback();return false;}}{// 3. 准备第一个SQL语句, 插入认证方好友数据std::unique_ptr<sql::PreparedStatement> pstmt(con->_con->prepareStatement("INSERT IGNORE INTO friend(self_id, friend_id, back) ""VALUES (?, ?, ?) "));//反过来的申请时from,验证时topstmt->setInt(1, from); // from idpstmt->setInt(2, to);pstmt->setString(3, back_name);// 执行更新int rowAffected = pstmt->executeUpdate();if (rowAffected < 0) {con->_con->rollback();return false;}//准备第二个SQL语句,插入申请方好友数据std::unique_ptr<sql::PreparedStatement> pstmt2(con->_con->prepareStatement("INSERT IGNORE INTO friend(self_id, friend_id, back) ""VALUES (?, ?, ?) "));//反过来的申请时from,验证时topstmt2->setInt(1, to); // from idpstmt2->setInt(2, from);pstmt2->setString(3, reverse_back);// 执行更新int rowAffected2 = pstmt2->executeUpdate();if (rowAffected2 < 0) {con->_con->rollback();return false;}}// 4. 创建 chat_threadlong long threadId = 0;{std::unique_ptr<sql::PreparedStatement> threadStmt(con->_con->prepareStatement("INSERT INTO chat_thread (type, created_at) VALUES ('private', NOW());"));threadStmt->executeUpdate();std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());std::unique_ptr<sql::ResultSet> rs(stmt->executeQuery("SELECT LAST_INSERT_ID()"));if (rs->next()) {threadId = rs->getInt64(1);}else {return false;}}// 5. 插入 private_chat{std::unique_ptr<sql::PreparedStatement> pcStmt(con->_con->prepareStatement("INSERT INTO private_chat(thread_id, user1_id, user2_id) VALUES (?, ?, ?)"));pcStmt->setInt64(1, threadId);pcStmt->setInt(2, from);pcStmt->setInt(3, to);if (pcStmt->executeUpdate() < 0) return false;}// 6. 可选:插入初始消息到 chat_messageif (apply_desc.empty() == false){std::unique_ptr<sql::PreparedStatement> msgStmt(con->_con->prepareStatement("INSERT INTO chat_message(thread_id, sender_id, recv_id, content,created_at, updated_at, status) VALUES (?, ?, ?, ?,NOW(),NOW(),?)"));msgStmt->setInt64(1, threadId);msgStmt->setInt(2, to);msgStmt->setInt(3, from);msgStmt->setString(4, apply_desc);msgStmt->setInt(5, 0);if (msgStmt->executeUpdate() < 0) { return false; }std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());std::unique_ptr<sql::ResultSet> rs(stmt->executeQuery("SELECT LAST_INSERT_ID()"));if (rs->next()) {auto messageId = rs->getInt64(1);auto tx_data = std::make_shared<AddFriendMsg>();tx_data->set_sender_id(to);tx_data->set_msg_id(messageId);tx_data->set_msgcontent(apply_desc);tx_data->set_thread_id(threadId);tx_data->set_unique_id("");std::cout << "addfriend insert message success" << std::endl;chat_datas.push_back(tx_data);}else {return false;}}{std::unique_ptr<sql::PreparedStatement> msgStmt(con->_con->prepareStatement("INSERT INTO chat_message(thread_id, sender_id, recv_id, content, created_at, updated_at, status) VALUES (?, ?, ?, ?,NOW(),NOW(),?)"));msgStmt->setInt64(1, threadId);msgStmt->setInt(2, from);msgStmt->setInt(3, to);msgStmt->setString(4, "We are friends now!");msgStmt->setInt(5, 0);if (msgStmt->executeUpdate() < 0) { return false; }std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());std::unique_ptr<sql::ResultSet> rs(stmt->executeQuery("SELECT LAST_INSERT_ID()"));if (rs->next()) {auto messageId = rs->getInt64(1);auto tx_data = std::make_shared<AddFriendMsg>();tx_data->set_sender_id(from);tx_data->set_msg_id(messageId);tx_data->set_msgcontent("We are friends now!");tx_data->set_thread_id(threadId);tx_data->set_unique_id("");chat_datas.push_back(tx_data);}else {return false;}}// 提交事务con->_con->commit();std::cout << "addfriend insert friends success" << std::endl;return true;}catch (sql::SQLException& e) {// 如果发生错误,回滚事务if (con) {con->_con->rollback();}std::cerr << "SQLException: " << e.what();std::cerr << " (MySQL error code: " << e.getErrorCode();std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl;return false;}return true;}
服务器收到同意通知
B同意A的申请,此时B所在的服务器会将同意的通知发送到A所在的服务器
下面是A所在的服务器收到请求后,发送通知给A的逻辑
Status ChatServiceImpl::NotifyAuthFriend(ServerContext* context, const AuthFriendReq* request,AuthFriendRsp* reply) {//查找用户是否在本服务器auto touid = request->touid();auto fromuid = request->fromuid();auto session = UserMgr::GetInstance()->GetSession(touid);Defer defer([request, reply]() {reply->set_error(ErrorCodes::Success);reply->set_fromuid(request->fromuid());reply->set_touid(request->touid());});//用户不在内存中则直接返回if (session == nullptr) {return Status::OK;}//在内存中则直接发送通知对方Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["fromuid"] = request->fromuid();rtvalue["touid"] = request->touid();std::string base_key = USER_BASE_INFO + std::to_string(fromuid);auto user_info = std::make_shared<UserInfo>();bool b_info = GetBaseInfo(base_key, fromuid, user_info);if (b_info) {rtvalue["name"] = user_info->name;rtvalue["nick"] = user_info->nick;rtvalue["icon"] = user_info->icon;rtvalue["sex"] = user_info->sex;}else {rtvalue["error"] = ErrorCodes::UidInvalid;}for(auto& msg : request->textmsgs()) {Json::Value chat;chat["sender"] = msg.sender_id();chat["msg_id"] = msg.msg_id();chat["thread_id"] = msg.thread_id();chat["unique_id"] = msg.unique_id();chat["msg_content"] = msg.msgcontent();rtvalue["chat_datas"].append(chat);}std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_NOTIFY_AUTH_FRIEND_REQ);return Status::OK;}
客户端收到好友同意回复
当A申请B加好友,B同意后,服务器会回复给B消息,这样B的客户端要处理同意的回包
_handlers.insert(ID_AUTH_FRIEND_RSP, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "Auth Friend Failed, err is Json Parse Err" << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "Auth Friend Failed, err is " << err;return;}auto name = jsonObj["name"].toString();auto nick = jsonObj["nick"].toString();auto icon = jsonObj["icon"].toString();auto sex = jsonObj["sex"].toInt();auto uid = jsonObj["uid"].toInt();std::vector<std::shared_ptr<TextChatData>> chat_datas;for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {auto send_uid = data["sender"].toInt();auto msg_id = data["msg_id"].toInt();auto thread_id = data["thread_id"].toInt();auto unique_id = data["unique_id"].toInt();auto msg_content = data["msg_content"].toString();auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, msg_content, send_uid);chat_datas.push_back(chat_data);}auto rsp = std::make_shared<AuthRsp>(uid, name, nick, icon, sex);rsp->SetChatDatas(chat_datas);emit sig_auth_rsp(rsp);qDebug() << "Auth Friend Success " ;});
界面和好友状态更新
void ChatDialog::slot_auth_rsp(std::shared_ptr<AuthRsp> auth_rsp){qDebug() << "receive slot_auth_rsp uid is " << auth_rsp->_uid<< " name is " << auth_rsp->_name << " nick is " << auth_rsp->_nick;//判断如果已经是好友则跳过auto bfriend = UserMgr::GetInstance()->CheckFriendById(auth_rsp->_uid);if (bfriend) {return;}UserMgr::GetInstance()->AddFriend(auth_rsp);int randomValue = QRandomGenerator::global()->bounded(100); // 生成0到99之间的随机整数int str_i = randomValue % strs.size();int head_i = randomValue % heads.size();int name_i = randomValue % names.size();auto* chat_user_wid = new ChatUserWid();auto chat_thread_data = std::make_shared<ChatThreadData>(auth_rsp->_uid, auth_rsp->_thread_id, 0);UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, auth_rsp->_uid);for (auto& chat_msg : auth_rsp->_chat_datas) {chat_thread_data->AppendMsg(chat_msg->GetMsgId(), chat_msg);}chat_user_wid->SetChatData(chat_thread_data);QListWidgetItem* item = new QListWidgetItem;//qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();item->setSizeHint(chat_user_wid->sizeHint());ui->chat_user_list->insertItem(0, item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_thread_items.insert(auth_rsp->_thread_id, item);}
客户端收到好友同意通知
A加B为好友,B同意后,服务器通知A,以下为A收到通知后的处理
_handlers.insert(ID_NOTIFY_AUTH_FRIEND_REQ, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "Auth Friend Failed, err is " << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "Auth Friend Failed, err is " << err;return;}int from_uid = jsonObj["fromuid"].toInt();QString name = jsonObj["name"].toString();QString nick = jsonObj["nick"].toString();QString icon = jsonObj["icon"].toString();int sex = jsonObj["sex"].toInt();std::vector<std::shared_ptr<TextChatData>> chat_datas;for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {auto send_uid = data["sender"].toInt();auto msg_id = data["msg_id"].toInt();auto thread_id = data["thread_id"].toInt();auto unique_id = data["unique_id"].toInt();auto msg_content = data["msg_content"].toString();auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, msg_content, send_uid);chat_datas.push_back(chat_data);}auto auth_info = std::make_shared<AuthInfo>(from_uid,name,nick, icon, sex);auth_info->SetChatDatas(chat_datas);emit sig_add_auth_friend(auth_info);});
界面添加好友会话状态更新
void ChatDialog::slot_add_auth_friend(std::shared_ptr<AuthInfo> auth_info) {qDebug() << "receive slot_add_auth__friend uid is " << auth_info->_uid<< " name is " << auth_info->_name << " nick is " << auth_info->_nick;//判断如果已经是好友则跳过auto bfriend = UserMgr::GetInstance()->CheckFriendById(auth_info->_uid);if (bfriend) {return;}UserMgr::GetInstance()->AddFriend(auth_info);auto* chat_user_wid = new ChatUserWid();auto chat_thread_data = std::make_shared<ChatThreadData>(auth_info->_uid, auth_info->_thread_id, 0);UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, auth_info->_uid);for (auto& chat_msg : auth_info->_chat_datas) {chat_thread_data->AppendMsg(chat_msg->GetMsgId(), chat_msg);}chat_user_wid->SetChatData(chat_thread_data);QListWidgetItem* item = new QListWidgetItem;//qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();item->setSizeHint(chat_user_wid->sizeHint());ui->chat_user_list->insertItem(0, item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_thread_items.insert(auth_info->_thread_id, item);}
效果展示

GRPC同步认证消息认证
分布式认证就是让两个客户端分别登录不同的服务器,注意因为我们修改了连接检测和记录的方式,改为通过心跳定时更新,为了避免两个客户端同时登录到一个服务器的情况,可以在一个客户端登录服务器后,另一个客户端延迟一分钟登录。
同时要注意StatusServer要将getChatServer这个函数打开
ChatServer StatusServiceImpl::getChatServer() {std::lock_guard<std::mutex> guard(_server_mtx);auto minServer = _servers.begin()->second;auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);if (count_str.empty()) {//不存在则默认设置为最大minServer.con_count = INT_MAX;}else {minServer.con_count = std::stoi(count_str);}// 使用范围基于for循环for ( auto& server : _servers) {if (server.second.name == minServer.name) {continue;}auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);if (count_str.empty()) {server.second.con_count = INT_MAX;}else {server.second.con_count = std::stoi(count_str);}if (server.second.con_count < minServer.con_count) {minServer = server.second;}}return minServer;}
两个客户端登录后,确保后台看到两个用户登录不同的Server
1019用户登录Server2

1002用户登录Server1

这样二者都登陆成功了,然后任意一方向对方发送添加好友请求,另一方同意,看到的效果如下

聊天记录增量加载
客户端逻辑
聊天记录增量加载,可以在加载完聊天会话列表后,继续分页加载聊天信息。
因为qt支持信号和槽函数机制,所以我们可以加载完会话列表后发送, 在UserMgr中设置一个当前加载的_cur_load_chat_index用来记录将要加载的会话消息。
我们对外暴露两个接口,分别是获取当前要加载会话信息,和下次加载的会话信息
std::shared_ptr<ChatThreadData> UserMgr::GetCurLoadData(){if (_cur_load_chat_index >= _chat_thread_ids.size()) {return nullptr;}auto iter = _chat_map.find(_chat_thread_ids[_cur_load_chat_index]);if (iter == _chat_map.end()) {return nullptr;}return iter.value();}
然后封装加载消息的函数
void ChatDialog::loadChatMsg() {//发送聊天记录请求_cur_load_chat = UserMgr::GetInstance()->GetCurLoadData();if (_cur_load_chat == nullptr) {return;}showLoadingDlg(true);//发送请求给服务器//发送请求逻辑QJsonObject jsonObj;jsonObj["thread_id"] = _cur_load_chat->GetThreadId();jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);}
接下来我们在加载完会话列表后调用这个函数
void ChatDialog::slot_load_chat_thread(bool load_more, int last_thread_id,std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads){for (auto& cti : chat_threads) {//先处理单聊,群聊跳过,以后添加if (cti->_type == "group") {continue;}auto uid = UserMgr::GetInstance()->GetUid();auto other_uid = 0;if (uid == cti->_user1_id) {other_uid = cti->_user2_id;}else {other_uid = cti->_user1_id;}auto chat_thread_data = std::make_shared<ChatThreadData>(other_uid, cti->_thread_id, 0);UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, other_uid);auto* chat_user_wid = new ChatUserWid();chat_user_wid->SetChatData(chat_thread_data);QListWidgetItem* item = new QListWidgetItem;//qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();item->setSizeHint(chat_user_wid->sizeHint());ui->chat_user_list->addItem(item);ui->chat_user_list->setItemWidget(item, chat_user_wid);_chat_thread_items.insert(cti->_thread_id, item);}UserMgr::GetInstance()->SetLastChatThreadId(last_thread_id);if (load_more) {//发送请求逻辑QJsonObject jsonObj;auto uid = UserMgr::GetInstance()->GetUid();jsonObj["uid"] = uid;jsonObj["thread_id"] = last_thread_id;QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);return;}showLoadingDlg(false);//继续加载聊天数据loadChatMsg();}
在收到服务器回复时处理消息
_handlers.insert(ID_LOAD_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "parse create private chat json parse failed " << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "get create private chat failed, error is " << err;return;}qDebug() << "Receive create private chat rsp Success";int thread_id = jsonObj["thread_id"].toInt();int last_msg_id = jsonObj["last_message_id"].toInt();bool load_more = jsonObj["load_more"].toBool();std::vector<std::shared_ptr<TextChatData>> chat_datas;for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {auto send_uid = data["sender"].toInt();auto msg_id = data["msg_id"].toInt();auto thread_id = data["thread_id"].toInt();auto unique_id = data["unique_id"].toInt();auto msg_content = data["msg_content"].toString();QString chat_time = data["chat_time"].toString();auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, msg_content, send_uid, chat_time);chat_datas.push_back(chat_data);}//发送信号通知界面emit sig_load_chat_msg(thread_id, last_msg_id, load_more, chat_datas);});
界面收到sig_load_chat_msg后添加消息,并且判断是否还有剩余消息加载
void ChatDialog::slot_load_chat_msg(int thread_id, int msg_id, bool load_more, std::vector<std::shared_ptr<TextChatData>> msglists){_cur_load_chat->SetLastMsgId(msg_id);//加载聊天信息for (auto& chat_msg : msglists) {_cur_load_chat->AppendMsg(chat_msg->GetMsgId(), chat_msg);}//还有未加载完的消息,就继续加载if (load_more) {//发送请求给服务器//发送请求逻辑QJsonObject jsonObj;jsonObj["thread_id"] = _cur_load_chat->GetThreadId();jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);return;}//获取下一个chat_thread_cur_load_chat = UserMgr::GetInstance()->GetNextLoadData();//都加载完了if(!_cur_load_chat){//更新聊天界面信息SetSelectChatItem();SetSelectChatPage();showLoadingDlg(false);return;}//继续加载下一个聊天//发送请求给服务器//发送请求逻辑QJsonObject jsonObj;jsonObj["thread_id"] = _cur_load_chat->GetThreadId();jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();QJsonDocument doc(jsonObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);}
服务器逻辑
注册消息
_fun_callbacks[ID_LOAD_CHAT_MSG_REQ] = std::bind(&LogicSystem::LoadChatMsg, this,placeholders::_1, placeholders::_2, placeholders::_3);
具体逻辑处理
void LogicSystem::LoadChatMsg(std::shared_ptr<CSession> session,const short& msg_id, const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto thread_id = root["thread_id"].asInt();auto message_id = root["message_id"].asInt();Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["thread_id"] = thread_id;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_LOAD_CHAT_MSG_RSP);});int page_size = 10;std::shared_ptr<PageResult> res = MysqlMgr::GetInstance()->LoadChatMsg(thread_id, message_id, page_size);if (!res) {rtvalue["error"] = ErrorCodes::LOAD_CHAT_FAILED;return;}rtvalue["last_message_id"] = res->next_cursor;rtvalue["load_more"] = res->load_more;for (auto& chat : res->messages) {Json::Value chat_data;chat_data["sender"] = chat.sender_id;chat_data["msg_id"] = chat.message_id;chat_data["thread_id"] = chat.thread_id;chat_data["unique_id"] = 0;chat_data["msg_content"] = chat.content;chat_data["chat_time"] = chat.chat_time;rtvalue["chat_datas"].append(chat_data);}}
数据库新增根据thread_id和message_id返回分页数据
std::shared_ptr<PageResult> MysqlMgr::LoadChatMsg(int threadId, int lastId, int pageSize){return _dao.LoadChatMsg(threadId, lastId, pageSize);}
具体在MysqlDao层面实现分页加载
std::shared_ptr<PageResult> MysqlDao::LoadChatMsg(int thread_id, int last_message_id, int page_size){auto con = pool_->getConnection();if (!con) {return nullptr;}Defer defer([this, &con]() {pool_->returnConnection(std::move(con));});auto& conn = con->_con;try {auto page_res = std::make_shared<PageResult>();page_res->load_more = false;page_res->next_cursor = last_message_id;// SQL:多取一条,用于判断是否还有更多const std::string sql = R"(SELECT message_id, thread_id, sender_id, recv_id, content,created_at, updated_at, statusFROM chat_messageWHERE thread_id = ?AND message_id > ?ORDER BY message_id ASCLIMIT ?)";uint32_t fetch_limit = page_size + 1;auto pstmt = std::unique_ptr<sql::PreparedStatement>(conn->prepareStatement(sql));pstmt->setInt(1, thread_id);pstmt->setInt(2, last_message_id);pstmt->setInt(3, fetch_limit);auto rs = std::unique_ptr<sql::ResultSet>(pstmt->executeQuery());// 读取 fetch_limit 条记录while (rs->next()) {ChatMessage msg;msg.message_id = rs->getUInt64("message_id");msg.thread_id = rs->getUInt64("thread_id");msg.sender_id = rs->getUInt64("sender_id");msg.recv_id = rs->getUInt64("recv_id");msg.content = rs->getString("content");msg.chat_time = rs->getString("created_at");msg.status = rs->getInt("status");page_res->messages.push_back(std::move(msg));}return page_res;}catch (sql::SQLException& e) {std::cerr << "SQLException: " << e.what() << std::endl;conn->rollback();return nullptr;}return nullptr;}
效果展示

发送和接收消息同步
客户端缓存发送消息
我们需要在客户端缓存一下发送的消息,等到服务器回复后再将收到的消息放入ChatThreadData中。
为了标识消息的唯一性,我们需要在客户端生成唯一unique_id,构造成ChatTextData先放到ChatThreadData中存起来。
//已发送的消息,还未收到回应的。QMap<QString, std::shared_ptr<TextChatData>> _msg_unrsp_map;
实现发送逻辑
void ChatPage::on_send_btn_clicked(){if (_chat_data == nullptr) {qDebug() << "friend_info is empty";return;}auto user_info = UserMgr::GetInstance()->GetUserInfo();auto pTextEdit = ui->chatEdit;ChatRole role = ChatRole::Self;QString userName = user_info->_name;QString userIcon = user_info->_icon;const QVector<MsgInfo>& msgList = pTextEdit->getMsgList();QJsonObject textObj;QJsonArray textArray;int txt_size = 0;auto thread_id = _chat_data->GetThreadId();for(int i=0; i<msgList.size(); ++i){//消息内容长度不合规就跳过if(msgList[i].content.length() > 1024){continue;}QString type = msgList[i].msgFlag;ChatItemBase *pChatItem = new ChatItemBase(role);pChatItem->setUserName(userName);pChatItem->setUserIcon(QPixmap(userIcon));QWidget *pBubble = nullptr;//生成唯一idQUuid uuid = QUuid::createUuid();//转为字符串QString uuidString = uuid.toString();if(type == "text"){pBubble = new TextBubble(role, msgList[i].content);if(txt_size + msgList[i].content.length()> 1024){textObj["fromuid"] = user_info->_uid;textObj["touid"] = _chat_data->GetOtherId();textObj["thread_id"] = thread_id;textObj["text_array"] = textArray;QJsonDocument doc(textObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送并清空之前累计的文本列表txt_size = 0;textArray = QJsonArray();textObj = QJsonObject();//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);}//将bubble和uid绑定,以后可以等网络返回消息后设置是否送达//_bubble_map[uuidString] = pBubble;txt_size += msgList[i].content.length();QJsonObject obj;QByteArray utf8Message = msgList[i].content.toUtf8();auto content = QString::fromUtf8(utf8Message);obj["content"] = content;obj["unique_id"] = uuidString;textArray.append(obj);//todo... 注意,此处先按私聊处理auto txt_msg = std::make_shared<TextChatData>(uuidString, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, content, user_info->_uid, 0);//将未回复的消息加入到未回复列表中,以便后续处理_chat_data->AppendUnRspMsg(uuidString,txt_msg);}else if(type == "image"){pBubble = new PictureBubble(QPixmap(msgList[i].content) , role);}else if(type == "file"){}//发送消息if(pBubble != nullptr){pChatItem->setWidget(pBubble);pChatItem->setStatus(0);ui->chat_data_list->appendChatItem(pChatItem);_unrsp_item_map[uuidString] = pChatItem;}}qDebug() << "textArray is " << textArray ;//发送给服务器textObj["text_array"] = textArray;textObj["fromuid"] = user_info->_uid;textObj["touid"] = _chat_data->GetOtherId();textObj["thread_id"] = thread_id;QJsonDocument doc(textObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);//发送并清空之前累计的文本列表txt_size = 0;textArray = QJsonArray();textObj = QJsonObject();//发送tcp请求给chat serveremit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);}
相比于之前,我们在json中增加了unique_id和thread_id字段,服务器收到后根据thread_id生成消息放入到数据库,并携带unique_id回传给客户端。
客户端缓存消息放入UserMgr中
//将未回复的消息加入到未回复列表中,以便后续处理_chat_data->AppendUnRspMsg(uuidString,txt_msg);
此外,客户端需要设置聊天文本状态为未回复
pChatItem->setStatus(0);
切换聊天不丢失状态
如果此时切换页面,再切回来,也要保证之前服务器未回复的消息能重新加载
切换的逻辑在
void ChatDialog::SetSelectChatPage(int thread_id){if (ui->chat_user_list->count() <= 0) {return;}if (thread_id == 0) {auto item = ui->chat_user_list->item(0);//转为widgetQWidget* widget = ui->chat_user_list->itemWidget(item);if (!widget) {return;}auto con_item = qobject_cast<ChatUserWid*>(widget);if (!con_item) {return;}//设置信息auto chat_data = con_item->GetChatData();ui->chat_page->SetChatData(chat_data);return;}auto find_iter = _chat_thread_items.find(thread_id);if (find_iter == _chat_thread_items.end()) {return;}//转为widgetQWidget* widget = ui->chat_user_list->itemWidget(find_iter.value());if (!widget) {return;}//判断转化为自定义的widget// 对自定义widget进行操作, 将item 转化为基类ListItemBaseListItemBase* customItem = qobject_cast<ListItemBase*>(widget);if (!customItem) {qDebug() << "qobject_cast<ListItemBase*>(widget) is nullptr";return;}auto itemType = customItem->GetItemType();if (itemType == CHAT_USER_ITEM) {auto con_item = qobject_cast<ChatUserWid*>(customItem);if (!con_item) {return;}//设置信息auto chat_data = con_item->GetChatData();ui->chat_page->SetChatData(chat_data);return;}}
其中SetChatData是设置页面聊天信息列表
void ChatPage::SetChatData(std::shared_ptr<ChatThreadData> chat_data) {_chat_data = chat_data;auto other_id = _chat_data->GetOtherId();if(other_id == 0) {//说明是群聊ui->title_lb->setText(_chat_data->GetGroupName());//todo...加载群聊信息和成员信息return;}//私聊auto friend_info = UserMgr::GetInstance()->GetFriendById(other_id);if (friend_info == nullptr) {return;}ui->title_lb->setText(friend_info->_name);ui->chat_data_list->removeAllItem();_unrsp_item_map.clear();for(auto & msg : chat_data->GetMsgMapRef()){AppendChatMsg(msg);}for (auto& msg : chat_data->GetMsgUnRspRef()) {AppendChatMsg(msg);}}
这样我们可以加载服务器已经回复的和服务器未回复的。保证完全,具体添加逻辑
void ChatPage::AppendChatMsg(std::shared_ptr<ChatDataBase> msg){auto self_info = UserMgr::GetInstance()->GetUserInfo();ChatRole role;if (msg->GetSendUid() == self_info->_uid) {role = ChatRole::Self;ChatItemBase* pChatItem = new ChatItemBase(role);pChatItem->setUserName(self_info->_name);pChatItem->setUserIcon(QPixmap(self_info->_icon));QWidget* pBubble = nullptr;if (msg->GetMsgType() == ChatMsgType::TEXT) {pBubble = new TextBubble(role, msg->GetMsgContent());}pChatItem->setWidget(pBubble);auto status = msg->GetStatus();pChatItem->setStatus(status);ui->chat_data_list->appendChatItem(pChatItem);if (status == 0) {_unrsp_item_map[msg->GetUniqueId()] = pChatItem;}}else {role = ChatRole::Other;ChatItemBase* pChatItem = new ChatItemBase(role);auto friend_info = UserMgr::GetInstance()->GetFriendById(msg->GetSendUid());if (friend_info == nullptr) {return;}pChatItem->setUserName(friend_info->_name);pChatItem->setUserIcon(QPixmap(friend_info->_icon));QWidget* pBubble = nullptr;if (msg->GetMsgType() == ChatMsgType::TEXT) {pBubble = new TextBubble(role, msg->GetMsgContent());}pChatItem->setWidget(pBubble);auto status = msg->GetStatus();pChatItem->setStatus(status);ui->chat_data_list->appendChatItem(pChatItem);if (status == 0) {_unrsp_item_map[msg->GetUniqueId()] = pChatItem;}}}
其中_unrsp_item_map是聊天页面上的服务器未回复的聊天记录的,每次切换页面清掉,再重新创建加载。
这么做效率不高,后期给大家介绍Module View Delegate模式去优化聊天数据加载和管理。
这里先把持久化存储功能先实现再说。
客户端收到服务器回复
收到服务器回复后,需要组织数据发送给ChatDialog界面,将未回复的消息更新为已回复。
_handlers.insert(ID_TEXT_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "Chat Msg Rsp Failed, err is Json Parse Err" << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "Chat Msg Rsp Failed, err is " << err;return;}qDebug() << "Receive Text Chat Rsp Success " ;//收到消息后转发给页面auto thread_id = jsonObj["thread_id"].toInt();auto sender = jsonObj["fromuid"].toInt();std::vector<std::shared_ptr<TextChatData>> chat_datas;for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {auto msg_id = data["message_id"].toInt();auto unique_id = data["unique_id"].toString();auto msg_content = data["content"].toString();QString chat_time = data["chat_time"].toString();int status = data["status"].toInt();auto chat_data = std::make_shared<TextChatData>(msg_id,unique_id, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, msg_content, sender, status, chat_time);chat_datas.push_back(chat_data);}//发送信号通知界面emit sig_chat_msg_rsp(thread_id, chat_datas);});
将信号和槽函数连接
connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_chat_msg_rsp, this, &ChatDialog::slot_add_chat_msg);
会触发槽函数, 槽函数内部检测消息,将消息存储到已经回复列表中。
void ChatDialog::slot_add_chat_msg(int thread_id, std::vector<std::shared_ptr<TextChatData>> msglists) {auto chat_data = UserMgr::GetInstance()->GetChatThreadByThreadId(thread_id);if (chat_data == nullptr) {return;}//将消息放入数据中管理for (auto& msg : msglists) {chat_data->MoveMsg(msg);if (_cur_chat_thread_id != thread_id) {continue;}//更新聊天界面信息ui->chat_page->UpdateChatStatus(msg->GetUniqueId(),msg->GetStatus());}}
转移逻辑, 其实就是去未回复中查找对应消息,如果有就移动到已回复列表,如果没有就直接将回复的消息插入已回复列表中
void ChatThreadData::MoveMsg(std::shared_ptr<ChatDataBase> msg) {auto iter = _msg_unrsp_map.find(msg->GetUniqueId());if (iter == _msg_unrsp_map.end()) {AddMsg(msg);return;}iter.value()->SetStatus(2);AddMsg(iter.value());_msg_unrsp_map.erase(iter);}void ChatThreadData::AddMsg(std::shared_ptr<ChatDataBase> msg){_msg_map.insert(msg->GetMsgId(), msg);_last_msg = msg->GetMsgContent();_last_msg_id = msg->GetMsgId();}
对端收到消息通知
客户端对端收到通知消息
_handlers.insert(ID_NOTIFY_TEXT_CHAT_MSG_REQ, [this](ReqId id, int len, QByteArray data) {Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "Notify Chat Msg Failed, err is Json Parse Err" << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "Notify Chat Msg Failed, err is " << err;return;}qDebug() << "Receive Text Chat Notify Success " ;//收到消息后转发给页面auto thread_id = jsonObj["thread_id"].toInt();auto sender = jsonObj["fromuid"].toInt();std::vector<std::shared_ptr<TextChatData>> chat_datas;for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {auto msg_id = data["message_id"].toInt();auto unique_id = data["unique_id"].toString();auto msg_content = data["content"].toString();QString chat_time = data["chat_time"].toString();int status = data["status"].toInt();auto chat_data = std::make_shared<TextChatData>(msg_id, unique_id, thread_id, ChatFormType::PRIVATE,ChatMsgType::TEXT, msg_content, sender, status, chat_time);chat_datas.push_back(chat_data);}emit sig_text_chat_msg(chat_datas);});
这个消息连接槽函数
//连接对端消息通知connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_text_chat_msg,this, &ChatDialog::slot_text_chat_msg);
因为被通知,可能此时不在对应的会话中
void ChatDialog::slot_text_chat_msg(std::vector<std::shared_ptr<TextChatData>> msglists){for (auto& msg : msglists) {//更新数据auto thread_id = msg->GetThreadId();auto thread_data = UserMgr::GetInstance()->GetChatThreadByThreadId(thread_id);thread_data->AddMsg(msg);if (_cur_chat_thread_id != thread_id) {continue;}ui->chat_page->AppendChatMsg(msg);}}
服务器逻辑
服务器在收到聊天消息后要将消息入库,并且判断对方是否通服,如果不是一个服务器,则用grpc通知对方所在的服务器,再通过对方服务器的Session通知对方。
如果是同一个服务器,则直接通过Session通知对方
void LogicSystem::DealChatTextMsg(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["fromuid"].asInt();auto touid = root["touid"].asInt();const Json::Value arrays = root["text_array"];Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["fromuid"] = uid;rtvalue["touid"] = touid;auto thread_id = root["thread_id"].asInt();rtvalue["thread_id"] = thread_id;std::vector<std::shared_ptr<ChatMessage>> chat_datas;auto timestamp = getCurrentTimestamp();for (const auto& txt_obj : arrays) {auto content = txt_obj["content"].asString();auto unique_id = txt_obj["unique_id"].asString();std::cout << "content is " << content << std::endl;std::cout << "unique_id is " << unique_id << std::endl;auto chat_msg = std::make_shared<ChatMessage>();chat_msg->chat_time = timestamp;chat_msg->sender_id = uid;chat_msg->recv_id = touid;chat_msg->unique_id = unique_id;chat_msg->thread_id = thread_id;chat_msg->content = content;chat_msg->status = 2;chat_datas.push_back(chat_msg);}//插入数据库MysqlMgr::GetInstance()->AddChatMsg(chat_datas);for (const auto& chat_data : chat_datas) {Json::Value chat_msg;chat_msg["message_id"] = chat_data->message_id;chat_msg["unique_id"] = chat_data->unique_id;chat_msg["content"] = chat_data->content;chat_msg["status"] = chat_data->status;chat_msg["chat_time"] = chat_data->chat_time;rtvalue["chat_datas"].append(chat_msg);}Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_TEXT_CHAT_MSG_RSP);});//查询redis 查找touid对应的server ipauto to_str = std::to_string(touid);auto to_ip_key = USERIPPREFIX + to_str;std::string to_ip_value = "";bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);if (!b_ip) {return;}auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];//直接通知对方有认证通过消息if (to_ip_value == self_name) {auto session = UserMgr::GetInstance()->GetSession(touid);if (session) {//在内存中则直接发送通知对方std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_NOTIFY_TEXT_CHAT_MSG_REQ);}return ;}TextChatMsgReq text_msg_req;text_msg_req.set_fromuid(uid);text_msg_req.set_touid(touid);text_msg_req.set_thread_id(thread_id);for (const auto& chat_data : chat_datas) {auto *text_msg = text_msg_req.add_textmsgs();text_msg->set_unique_id(chat_data->unique_id);text_msg->set_msgcontent(chat_data->content);text_msg->set_msg_id(chat_data->message_id);text_msg->set_chat_time(chat_data->chat_time);}//发送通知 todo...ChatGrpcClient::GetInstance()->NotifyTextChatMsg(to_ip_value, text_msg_req, rtvalue);}
数据库处理
bool MysqlMgr::AddChatMsg(std::vector<std::shared_ptr<ChatMessage>>& chat_datas) {return _dao.AddChatMsg(chat_datas);}
Dao层做了详细的数据库操作
bool MysqlDao::AddChatMsg(std::vector<std::shared_ptr<ChatMessage>>& chat_datas) {auto con = pool_->getConnection();if (!con) {return false;}Defer defer([this, &con]() {pool_->returnConnection(std::move(con));});auto& conn = con->_con;try {//关闭自动提交,以手动管理事务conn->setAutoCommit(false);auto pstmt = std::unique_ptr<sql::PreparedStatement>(conn->prepareStatement("INSERT INTO chat_message ""(thread_id, sender_id, recv_id, content, created_at, updated_at, status) ""VALUES (?, ?, ?, ?, ?, ?, ?)"));for (auto& msg : chat_datas) {// 普通字段pstmt->setUInt64(1, msg->thread_id);pstmt->setUInt64(2, msg->sender_id);pstmt->setUInt64(3, msg->recv_id);pstmt->setString(4, msg->content);pstmt->setString(5, msg->chat_time); // created_atpstmt->setString(6, msg->chat_time); // updated_atpstmt->setInt(7, msg->status);pstmt->executeUpdate();// 2. 取 LAST_INSERT_ID()std::unique_ptr<sql::Statement> keyStmt(conn->createStatement());std::unique_ptr<sql::ResultSet> rs(keyStmt->executeQuery("SELECT LAST_INSERT_ID()"));if (rs->next()) {msg->message_id = rs->getUInt64(1);}else {continue;}}conn->commit();return true;}catch (sql::SQLException& e) {std::cerr << "SQLException: " << e.what() << std::endl;conn->rollback();return false;}return true;}
grpc协议完善
message TextChatMsgReq {int32 fromuid = 1;int32 touid = 2;int32 thread_id = 3;repeated TextChatData textmsgs = 4;}message TextChatData{string unique_id = 1;int32 msg_id = 2;string msgcontent = 3;string chat_time = 4;}message TextChatMsgRsp {int32 error = 1;int32 fromuid = 2;int32 touid = 3;int32 thread_id = 4;repeated TextChatData textmsgs = 5;}
对端服务器处理
如果客户不在本服,则通知对端服务处理
Status ChatServiceImpl::NotifyTextChatMsg(::grpc::ServerContext* context,const TextChatMsgReq* request, TextChatMsgRsp* reply) {//查找用户是否在本服务器auto touid = request->touid();auto session = UserMgr::GetInstance()->GetSession(touid);reply->set_error(ErrorCodes::Success);//用户不在内存中则直接返回if (session == nullptr) {return Status::OK;}//在内存中则直接发送通知对方Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;rtvalue["fromuid"] = request->fromuid();rtvalue["touid"] = request->touid();rtvalue["thread_id"] = request->thread_id();//将聊天数据组织为数组Json::Value text_array;for (auto& msg : request->textmsgs()) {Json::Value element;element["content"] = msg.msgcontent();element["unique_id"] = msg.unique_id();element["message_id"] = msg.msg_id();element["chat_time"] = msg.chat_time();text_array.append(element);}rtvalue["chat_datas"] = text_array;std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_NOTIFY_TEXT_CHAT_MSG_REQ);return Status::OK;}
验证效果

待完善部分
目前切换页面会将之前的记录删掉,这样每次重新加载会影响效率。
考虑以后采用多页缓存机制。
以后用Model View Delegate改造数据存储模式。
使用 Model/View 架构(QListView + QAbstractListModel + Delegate)
- 思路:不要手动往布局里插 widget,而是把 “一条聊天消息” 抽象成一个数据结构,存到自定义的
QAbstractListModel。 - 在右侧放一个
QListView,并为它写一个QStyledItemDelegate,统一负责绘制消息气泡、头像、时间等。 - 优点:Qt 的视图会自动做 行缓存(view recycling)、懒加载 等优化,数据量大也能保持流畅。
- 切换用户:只需
model->setMessages(userMessages)(内部发beginResetModel()/endResetModel()),视图自动刷新。
方案一:在同一个 Model 里 reset 数据
维护一个消息列表
class ChatModel : public QAbstractListModel {QVector<Message> m_msgs;public:// 必要的 override:rowCount(), data(), roleNames()...void setMessages(const QVector<Message>& msgs) {beginResetModel();m_msgs = msgs;endResetModel();}};
切换用户时
// 假设你有一个 ChatModel* model 和 QListView* listView// listView->setModel(model); // 已经在初始化时做过一次void onUserClicked(const User& u) {QVector<Message> msgs = loadMessagesFromDb(u.id);model->setMessages(msgs);// 可选:滚到最底部listView->scrollToBottom();}
优点
- 结构简单,一处 model,view 自动刷新。
- 不需要销毁或创建 widget,性能最佳。
方案二:每个用户一个 Model,切换指针
如果你希望把每个用户的数据和 model 分开管理,也可以为每个用户维护独立的 ChatModel:
QMap<UserId, ChatModel*> modelPool;void onUserClicked(const User& u) {if (!modelPool.contains(u.id)) {// 第一次点击,创建并加载ChatModel* m = new ChatModel(this);m->setMessages(loadMessagesFromDb(u.id));modelPool[u.id] = m;}listView->setModel(modelPool[u.id]);listView->scrollToBottom();}
- 优点:切换立刻就有缓存好的数据,不用每次都从数据库/网络加载。
- 缺点:如果用户特别多,内存开销会比较大。
更细粒度的更新
如果你不想一次 beginResetModel()/endResetModel() 重刷全表,还可以在 model 里实现增删改接口:
void ChatModel::appendMessage(const Message& m) {beginInsertRows(QModelIndex(), m_msgs.size(), m_msgs.size());m_msgs.append(m);endInsertRows();}void ChatModel::clearMessages() {beginRemoveRows(QModelIndex(), 0, m_msgs.size()-1);m_msgs.clear();endRemoveRows();}
- 切换用户时先
clearMessages(),然后循环appendMessage()。 - 这样 view 能做更细粒度的动画或局部刷新。
总结
- 最简单:一个 model,内部维护
QVector<Message>,切换时调用setMessages()。 - 缓存多用户:给每个用户分配一个 model,切换时调用
listView->setModel(...)。 - 增量更新:用
beginInsertRows/beginRemoveRows实现局部刷新。
选哪种方案,取决于你的聊天数据量和内存/加载开销:
- 少量用户、消息量大 → 方案一(reset)+ 分页加载
- 用户量多、切换频繁 → 方案二(model 池)
- 想要炫酷的动画或更精细性能 → 增量更新。