前情回顾
前文我们实现了跨服踢人逻辑,通过分布式锁锁住不同服务器相同用户登录的操作,保证逻辑的原子性。
今天我们来谈一谈心跳机制,以及为什么要有心跳机制,以及该如何实现心跳机制,而且是分布式情况下心跳配合踢人逻辑该如何实现。
心跳概念
在一个“长连接”(如 TCP 持久连接、WebSocket、gRPC 长流等)中,客户端和服务端之间会保持一个持续打开的通道,以便双方可以随时双向发送数据。与一次性请求/响应模型(短连接)相比,长连接最大的挑战在于如何做到“及时发现网络或对端异常”、并“防止连接在中间节点(如路由器、NAT、防火墙)被静默地回收”。心跳机制正是为了解决这两个核心问题而引入的:
心跳示意图

没有心跳

当没有心跳机制的时候,如果设备异常断线(拔掉网线),tcp层面可能无法立即感知,导致僵尸连接挂在服务器上。除非服务器发送数据给客户端才会感知到。或者被中间设备超时回收。
防止中间设备超时回收
许多网络设备(尤其是 NAT、负载均衡、防火墙)会对空闲连接设定一个超时阈值:
- 如果某段时间内连接上没有任何数据包经过,它会自动“回收”这条路由/会话,导致真正的数据到达时被丢弃或重置。
 - 心跳包可以视作“活动信号”,让中间设备认为连接仍在活跃,从而维持映射表或会话状态,避免意外断开。
 
服务器心跳实现
服务器可以启动一个定时器,每隔60s检测一下所有连接,判断连接是否’’活着’’, 所谓”活着”就是连接没有断开。
怎么设置”活着”呢?就是对每一个Session(会话)设置一个时间戳,这个Session收到消息后,就更新这个时间戳。
服务器定时检测当前时间和这个时间戳的差值,如果大于一个阈值就说明连接断开了。这个阈值看服务器设定,一般60s即可。
void CServer::on_timer(const boost::system::error_code& e){lock_guard<mutex> lock(_mutex);time_t now = time(nullptr);for(auto iter = _sessions.begin(); iter != _sessions.end();) {auto b_expired = iter->second->IsHeartbeatExpired(now);if (b_expired) {iter->second->Close();iter = _sessions.erase(iter);auto uid_str = iter->second->GetUserId();auto lock_key = LOCK_PREFIX+uid_str;auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);Defer defer([identifier, lock_key](){RedisMgr::GetInstance()->releaseLock(lock_key,identifier);});//写清除redis逻辑}else {++iter;}}}
大家仔细观察这个代码,有没有发现什么问题?
这段代码是先加线程锁_mutex, 然后加分布式锁lock_key
但是我们看下Session读取掉线连接信息时会清空redis信息,流程如下
void CSession::AsyncReadHead(int total_len){auto self = shared_from_this();asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();auto self = shared_from_this();//加锁清除sessionauto uid_str = std::to_string(_user_uid);auto lock_key = LOCK_PREFIX + uid_str;auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);Defer defer([identifier, lock_key, self, this]() {_server->ClearSession(_session_id);RedisMgr::GetInstance()->releaseLock(lock_key, identifier);});if (identifier.empty()) {return;}std::string redis_session_id = "";auto bsuccess = RedisMgr::GetInstance()->Get(USER_SESSION_PREFIX + uid_str, redis_session_id);if (!bsuccess) {return;}if (redis_session_id != _session_id) {//说明有客户在其他服务器异地登录了return;}RedisMgr::GetInstance()->Del(USER_SESSION_PREFIX + uid_str);//清除用户登录信息RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);return;}//....省略正常逻辑}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});}
AsyncReadHead错误处理中先加了分布式锁lock_key,再加线程锁_mutex
图示如下

上面图示已经很明显了,有概率造成死锁。
接下来谈谈死锁如何避免
如何避免死锁
线程锁避免死锁
如果是两个线程锁,避免死锁的最简单方式就是同时加锁,或者顺序一致性加锁
在 C++17 里,std::scoped_lock(也有人称它为“scope lock”)提供了对多个互斥量无死锁地一次性加锁的能力。它的核心在于内部调用了函数模板 std::lock(m1, m2, …),该函数会:
- 尝试按某种顺序非阻塞地抓取所有 mutex:
std::lock会循环地对每一个 mutex 做try_lock(),- 如果有任何一个 
try_lock()失败,就立刻释放前面已经成功抓到的所有 mutex,退避(backoff),然后重试。 
 - 保证最终所有 mutex 要么全部抓到了,要么都没抓到:
- 这样就避免了“线程 A 拿了 m1 等待 m2,而线程 B 拿了 m2 等待 m1”这种经典死锁情形。
 
 
只要你的所有代码都用同一个调用 std::scoped_lock(m1, m2, …) 的方式去加这几把锁,就不会出现交叉锁导致的死锁。
用法示例
#include <mutex>#include <thread>#include <iostream>std::mutex mtx1, mtx2;void worker1() {// 同时加 mtx1、mtx2,不会与另一个线程交叉死锁std::scoped_lock lock(mtx1, mtx2);std::cout << "worker1 got both locks\n";// … 操作受两把锁保护的资源 …}void worker2() {// 即便另一线程也是先 mtx2、后 mtx1,只要都改成 scoped_lock(mtx1, mtx2),// 底层 std::lock 会保证不会死锁std::scoped_lock lock(mtx1, mtx2);std::cout << "worker2 got both locks\n";// … 相同资源操作 …}int main() {std::thread t1(worker1);std::thread t2(worker2);t1.join();t2.join();return 0;}
为什么不会死锁?
std::scoped_lock(mtx1, mtx2)在构造时等价于:std::lock(mtx1, mtx2);
std::lock会:- 先 
try_lock()mtx1、再try_lock()mtx2。 - 如果第二步失败,就释放第一把、稍作退避后重试。
 - 直到两把都一次性成功为止。
 
- 先 
 
这样就不会出现“线程 A 拿到 mtx1 → 等 mtx2”同时“线程 B 拿到 mtx2 → 等 mtx1”互相卡死的情况。
分布式锁
要解决“分布式锁 ↔ 线程锁”互相嵌套导致死锁的问题,核心思路就是:
统一锁的获取顺序
- 始终按同一个顺序去申请锁。
 - 比如:不论是业务 A(先分布式锁后线程锁)还是心跳(先线程锁后分布式锁),都改成 “先拿分布式锁 → 再拿线程锁” 或者 “先拿线程锁 → 再拿分布式锁” 之一即可。
 - 只要保证两个场景里锁的申请顺序一致,就不会互相等待导致死锁。
 
使用带超时的尝试锁(tryLock)+ 重试/回退策略
- 对于线程锁(例如 
ReentrantLock)和分布式锁(例如Redisson的tryLock(long waitTime, long leaseTime, TimeUnit unit)),都用tryLock而非阻塞式lock()。 - 如果某把锁在指定时间内拿不到,就释放已持有的那把锁,稍微退避(sleep 随机短时长)后重试。
 - 这样可以在检测到可能的死锁倾向时主动放弃,避免无限等待。
 
- 对于线程锁(例如 
 合并锁或升级锁策略
如果分布式节点上并发线程只是共享同一把“逻辑锁”,可以考虑把本地线程锁和分布式锁做一次封装:
class CombinedLock {RLock distLock;std::mutex mtx;public void lock() {distLock.lock();mtx.lock();}public void unlock() {mtx.unlock();distLock.unlock();}}
这样业务层只用
combinedLock.lock(),根本不用关心哪把先后,底层永远是固定顺序。
只用分布式锁或只用线程锁
- 如果心跳更新 Redis 的操作本身就是分布式的,就完全用分布式锁保护它,不再加线程锁。
 - 反之,如果这段更新完全在本机线程间协作,也可直接把分布式锁封装进本地锁里,让它表现得像本地锁。
 
利用 Redis Lua 脚本保证原子性
- 将所有对 Redis 的读写操作放到一个 Lua 脚本里一次执行,借助 Redis 的单线程特性保证原子。
 - 这样就不需要额外的分布式锁,线程里也不用再加锁。
 
改造心跳服务
举例:改造心跳服务
void CServer::on_timer(const boost::system::error_code& ec) {std::vector<std::shared_ptr<CSession>> _expired_sessions;int session_count = 0;//此处加锁遍历session{lock_guard<mutex> lock(_mutex);time_t now = std::time(nullptr);for (auto iter = _sessions.begin(); iter != _sessions.end(); iter++) {auto b_expired = iter->second->IsHeartbeatExpired(now);if (b_expired) {//关闭socket, 其实这里也会触发async_read的错误处理iter->second->Close();//收集过期信息_expired_sessions.push_back(iter->second);continue;}session_count++;}}//设置session数量auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];auto count_str = std::to_string(session_count);RedisMgr::GetInstance()->HSet(LOGIN_COUNT, self_name, count_str);//处理过期session, 单独提出,防止死锁for (auto &session : _expired_sessions) {session->DealExceptionSession();}//再次设置,下一个60s检测_timer.expires_after(std::chrono::seconds(60));_timer.async_wait([this](boost::system::error_code ec) {on_timer(ec);});}
将清除逻辑提炼到函数DealExceptionSession
void CSession::DealExceptionSession(){auto self = shared_from_this();//加锁清除sessionauto uid_str = std::to_string(_user_uid);auto lock_key = LOCK_PREFIX + uid_str;auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);Defer defer([identifier, lock_key, self, this]() {_server->ClearSession(_session_id);RedisMgr::GetInstance()->releaseLock(lock_key, identifier);});if (identifier.empty()) {return;}std::string redis_session_id = "";auto bsuccess = RedisMgr::GetInstance()->Get(USER_SESSION_PREFIX + uid_str, redis_session_id);if (!bsuccess) {return;}if (redis_session_id != _session_id) {//说明有客户在其他服务器异地登录了return;}RedisMgr::GetInstance()->Del(USER_SESSION_PREFIX + uid_str);//清除用户登录信息RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);}
持有本地线程锁只做遍历、收集过期 UID,不做删除;
释放线程锁后,对每个 UID 按 “分布式锁→线程锁” 顺序逐个清理。
这样,所有“同时持有两把锁”的位置,顺序均为:
分布式锁 → 本地线程锁
从而避免死锁
提炼异常处理
比如异步读处理
void CSession::AsyncReadBody(int total_len){auto self = shared_from_this();asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();DealExceptionSession();return;}if (bytes_transfered < total_len) {std::cout << "read length not match, read [" << bytes_transfered << "] , total ["<< total_len<<"]" << endl;Close();_server->ClearSession(_session_id);return;}//判断连接无效if (!_server->CheckValid(_session_id)) {Close();return;}memcpy(_recv_msg_node->_data , _data , bytes_transfered);_recv_msg_node->_cur_len += bytes_transfered;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';cout << "receive data is " << _recv_msg_node->_data << endl;//更新session心跳时间UpdateHeartbeat();//此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));//继续监听头部接受事件AsyncReadHead(HEAD_TOTAL_LEN);}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});}
类似的还有,读头部,写数据等。
过期判断
判断现在时间和上一次心跳时间戳的差值,超过20s就认为连接过期。实际服务器心跳阈值最好60s,这里为了方便演示效果。
bool CSession::IsHeartbeatExpired(std::time_t& now) {double diff_sec = std::difftime(now, _last_heartbeat);if (diff_sec > 20) {std::cout << "heartbeat expired, session id is " << _session_id << endl;return true;}return false;}
更新心跳
void CSession::UpdateHeartbeat(){time_t now = std::time(nullptr);_last_heartbeat = now;}
在读取消息时做了更新
增加心跳请求处理
服务器增加心跳处理请求
void LogicSystem::HeartBeatHandler(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto uid = root["fromuid"].asInt();std::cout << "receive heart beat msg, uid is " << uid << std::endl;Json::Value rtvalue;rtvalue["error"] = ErrorCodes::Success;session->Send(rtvalue.toStyledString(), ID_HEARTBEAT_RSP);}void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_SEARCH_USER_REQ] = std::bind(&LogicSystem::SearchInfo, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_ADD_FRIEND_REQ] = std::bind(&LogicSystem::AddFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_AUTH_FRIEND_REQ] = std::bind(&LogicSystem::AuthFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_TEXT_CHAT_MSG_REQ] = std::bind(&LogicSystem::DealChatTextMsg, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_HEART_BEAT_REQ] = std::bind(&LogicSystem::HeartBeatHandler, this,placeholders::_1, placeholders::_2, placeholders::_3);}
客户端增加心跳处理发包和回复
发包处理
在ChatDialog构造函数中添加
_timer = new QTimer(this);connect(_timer, &QTimer::timeout, this, [this](){auto user_info = UserMgr::GetInstance()->GetUserInfo();QJsonObject textObj;textObj["fromuid"] = user_info->_uid;QJsonDocument doc(textObj);QByteArray jsonData = doc.toJson(QJsonDocument::Compact);emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_HEART_BEAT_REQ, jsonData);});_timer->start(10000);
在析构函数中添加
ChatDialog::~ChatDialog(){_timer->stop();delete ui;}
回包处理
_handlers.insert(ID_HEARTBEAT_RSP,[this](ReqId id, int len, QByteArray data){Q_UNUSED(len);qDebug() << "handle id is " << id << " data is " << data;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if (jsonDoc.isNull()) {qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject jsonObj = jsonDoc.object();if (!jsonObj.contains("error")) {int err = ErrorCodes::ERR_JSON;qDebug() << "Heart Beat Msg Failed, err is Json Parse Err" << err;return;}int err = jsonObj["error"].toInt();if (err != ErrorCodes::SUCCESS) {qDebug() << "Heart Beat Msg Failed, err is " << err;return;}qDebug() << "Receive Heart Beat Msg Success" ;});
客户端增加断线提示
在TcpMgr构造函数中添加
// 处理连接断开QObject::connect(&_socket, &QTcpSocket::disconnected, [&]() {qDebug() << "Disconnected from server.";//并且发送通知到界面emit sig_connection_closed();});
MainWindow构造函数中添加信号连接
//连接服务器断开心跳超时或异常连接信息connect(TcpMgr::GetInstance().get(),&TcpMgr::sig_connection_closed, this, &MainWindow::SlotExcepConOffline);
槽函数
void MainWindow::SlotExcepConOffline(){// 使用静态方法直接弹出一个信息框QMessageBox::information(this, "下线提示", "心跳超时或临界异常,该终端下线!");TcpMgr::GetInstance()->CloseConnection();offlineLogin();}
效果测试
为了方便测试,我们修改StatusServer中GetServer逻辑只返回第一个ChatServer1
ChatServer StatusServiceImpl::getChatServer() {std::lock_guard<std::mutex> guard(_server_mtx);auto minServer = _servers.begin()->second;auto lock_key = LOCK_COUNT;//暂时注释//auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);////利用defer解锁//Defer defer2([this, identifier, lock_key]() {// RedisMgr::GetInstance()->releaseLock(lock_key, identifier);// });//auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);//if (count_str.empty()) {// //不存在则默认设置为最大// minServer.con_count = INT_MAX;//}//else {// minServer.con_count = std::stoi(count_str);//}//// 使用范围基于for循环//for ( auto& server : _servers) {//// if (server.second.name == minServer.name) {// continue;// }// auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);// if (count_str.empty()) {// server.second.con_count = INT_MAX;// }// else {// server.second.con_count = std::stoi(count_str);// }// if (server.second.con_count < minServer.con_count) {// minServer = server.second;// }//}return minServer;}
我们启动客户端,以及服务器,先屏蔽客户端发送心跳逻辑,可以看到服务器检测心跳超时后会切断客户端连接

客户端添加心跳包发送
可以看到每隔10s客户端就发送心跳包给服务器,服务器收到后,打印日志,客户端也打印日志

优化连接数统计
之前我们统计一个服务器连接数,都是在服务器检测登录一个用户就增加连接数写入redis,以及CSession析构减少连接数写入redis, 还加了分布式锁,这种做法频繁加锁会影响效率,现在我们有了心跳检测,只需要在心跳检测结束后将统计的连接数写入redis即可
void CServer::on_timer(const boost::system::error_code& ec) {std::vector<std::shared_ptr<CSession>> _expired_sessions;int session_count = 0;//此处加锁遍历session{lock_guard<mutex> lock(_mutex);time_t now = std::time(nullptr);for (auto iter = _sessions.begin(); iter != _sessions.end(); iter++) {auto b_expired = iter->second->IsHeartbeatExpired(now);if (b_expired) {//关闭socket, 其实这里也会触发async_read的错误处理iter->second->Close();//收集过期信息_expired_sessions.push_back(iter->second);continue;}session_count++;}}//设置session数量auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];auto count_str = std::to_string(session_count);RedisMgr::GetInstance()->HSet(LOGIN_COUNT, self_name, count_str);//处理过期session, 单独提出,防止死锁for (auto &session : _expired_sessions) {session->DealExceptionSession();}//再次设置,下一个60s检测_timer.expires_after(std::chrono::seconds(60));_timer.async_wait([this](boost::system::error_code ec) {on_timer(ec);});}
状态服务器中获取连接数返回ChatServer也可以简化了
ChatServer StatusServiceImpl::getChatServer() {std::lock_guard<std::mutex> guard(_server_mtx);auto minServer = _servers.begin()->second;auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);if (count_str.empty()) {//不存在则默认设置为最大minServer.con_count = INT_MAX;}else {minServer.con_count = std::stoi(count_str);}// 使用范围基于for循环for ( auto& server : _servers) {if (server.second.name == minServer.name) {continue;}auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);if (count_str.empty()) {server.second.con_count = INT_MAX;}else {server.second.con_count = std::stoi(count_str);}if (server.second.con_count < minServer.con_count) {minServer = server.second;}}return minServer;}
哪怕我们获取的信息是旧的数据也没关系,负载分配没必要太精确,还有心跳每隔60s会更新依次连接逻辑,所以问题不大。
我们把ChatServer2的逻辑也更新成和ChatServer1,再次测试分布式情况下踢人+心跳逻辑

可以看到同账号异地登录,旧客户端收到被踢掉消息,旧的客户端关闭连接,所以弹出心跳超时或异常,该终端下线
所以我们这期踢人+心跳检测就实现了。
源码和视频
源码连接:
https://gitee.com/secondtonone1/llfcchat
视频连接:
https://www.bilibili.com/video/BV1ct5xzcEka/?vd_source=8be9e83424c2ed2c9b2a3ed1d01385e9