beast网络库实现websocket服务器

简介

使用beast网络库实现websocket服务器,一般来说websocket是一个长连接的协议,但是自动包含了解包处理,当我们在浏览器输入一个http请求时如果是以ws开头的如ws://127.0.0.1:9501就是请求本地9501端口的websocket服务器处理。而beast为我们提供了websocket的处理方案,我们可以在http服务器的基础上升级协议为websocket,处理部分websocket请求。如果服务器收到的是普通的http请求则按照http请求处理。我们可以从官方文档中按照示例逐步搭建websocket服务器。

构造websocket

在开始前我们先准备几个变量

  1. #include <boost/beast.hpp>
  2. #include <boost/beast/ssl.hpp>
  3. #include <boost/asio.hpp>
  4. #include <boost/asio/ssl.hpp>
  5. namespace net = boost::asio;
  6. namespace beast = boost::beast;
  7. using namespace boost::beast;
  8. using namespace boost::beast::websocket;
  9. net::io_context ioc;
  10. tcp_stream sock(ioc);
  11. net::ssl::context ctx(net::ssl::context::tlsv12);

WebSocket连接需要一个有状态对象,由Beast中的一个类模板websocket::stream表示。该接口使用分层流模型。一个websocket stream对象包含另一个流对象,称为“下一层”,它用于执行I/O操作。以下是每个模板参数的描述:

  1. namespace boost {
  2. namespace beast {
  3. namespace websocket {
  4. template<
  5. class NextLayer,
  6. bool deflateSupported = true>
  7. class stream;
  8. } // websocket
  9. } // beast
  10. } // boost

这段代码定义了Beast库中WebSocket实现的命名空间。其中,websocket命名空间下包含一个模板类stream,用于表示WebSocket连接。

stream类有两个模板参数:NextLayerdeflateSupported。其中,NextLayer表示WebSocket连接使用的下一层流类型,例如TCP套接字或TLS握手后的数据流;而deflateSupported则是一个bool值,表示是否支持WebSocket协议内置的压缩功能。

这些代码所在的三个命名空间分别是boostbeastwebsocket,是为了防止与其他库或用户代码发生名称冲突而创建的。将Beast库放在beast命名空间下是为了与Boost库本身分离,方便管理和维护。
当创建一个WebSocket流对象时,构造函数提供的任何参数都会被传递给下一层对象的构造函数。以下示例代码声明了一个基于TCP/IP套接字和I/O上下文的WebSocket流对象:

  1. stream<tcp_stream> ws(ioc);

上述代码创建了一个基于TCP流的WebSocket流对象,使用了指定的I/O上下文,该代码中的stream是Beast库中WebSocket流类模板的别名,其下一层流类型为tcp_stream。

需要注意的是,WebSocket流使用自己特定的超时功能来管理连接。如果使用tcp_stream或basic_stream类模板与WebSocket流一起使用,那么在连接建立后应该禁用TCP或basic流上的超时设置,否则流的行为将是不确定的。

这是因为WebSocket协议本身包含了超时机制,当流上发生超时时,WebSocket库会优先处理超时并关闭连接,而不会将超时事件传递给下层TCP或basic流。如果同时在WebSocket和TCP或basic流上启用超时设置,就可能出现冲突和未定义的行为。

因此,当使用WebSocket流时,应该避免在底层的TCP或basic流上设置超时,而是可以通过WebSocket流对象的set_option函数来设置WebSocket连接的超时时间。这样可以确保WebSocket连接中的超时机制正常工作,并且不会干扰底层流的超时设置。

与大多数I/O对象一样,WebSocket流对象也不是线程安全的。如果两个不同的线程同时访问该对象,则会产生未定义行为。

对于多线程程序,可以通过在构造tcp_stream对象时使用executor(如strand)来保证线程安全。下面的代码声明了一个使用strand来调用所有完成处理程序的WebSocket流:

  1. stream<tcp_stream> ws(net::make_strand(ioc));

如果下一层流支持移动构造,那么WebSocket流对象可以从一个已移动的对象中构造。

这意味着,在创建WebSocket流对象时,可以将下一层流对象的所有权转移到WebSocket流对象中,而不需要进行复制或重新分配。这种方式可以避免额外的内存开销和数据拷贝操作,提高程序运行效率。

例如,可以使用std::move函数将一个已存在的TCP套接字对象移动到WebSocket流中:

  1. stream<tcp_stream> ws(std::move(sock));

可以通过调用WebSocket流对象的next_layer函数来访问下一层流对象。

  1. ws.next_layer().close();

使用SSL

使用net::ssl::stream类模板作为流的模板类型,并且将net::io_contextnet::ssl::context参数传递给包装流的构造函数。

  1. stream<ssl_stream<tcp_stream>> wss(net::make_strand(ioc), ctx);

当然如果websocket stream 使用SSL类型需要包含<boost/beast/websocket/ssl.hpp>
next_layer() 函数用于访问底层的 SSL 流。ssl::stream 类中的 next_layer() 函数返回对底层 ssl_stream 的引用,它代表了建立在 SSL/TLS 层之上的网络流。

  1. wss.next_layer().handshake(net::ssl::stream_base::client);

在上述声明的多层流(如 SSL 流)中,使用 next_layer 进行链式调用访问每个层可能会很麻烦。为了简化这个过程,Boost.Beast 提供了 get_lowest_layer() 函数,用于获取多层流中的最底层流。

通过调用 get_lowest_layer() 函数,您可以直接获取多层流中的最底层流,而无需逐层调用 next_layer()。这对于取消所有未完成的 I/O 操作非常有用,例如在关闭连接之前取消所有挂起的异步操作。

  1. get_lowest_layer(wss).cancel();

连接

在进行 WebSocket 通信之前,需要首先连接 WebSocket 流,然后执行 WebSocket 握手。WebSocket 流将建立连接的任务委托给下一层流。例如,如果下一层是可连接的流或套接字对象,则可以访问它以调用必要的连接函数。以下是作为客户端执行的示例代码

  1. stream<tcp_stream> ws(ioc);
  2. net::ip::tcp::resolver resolver(ioc);
  3. get_lowest_layer(ws).connect(resolver.resolve("example.com", "ws"));

对于服务器接收连接,在WebSocket服务器中使用一个acceptor来接受传入的连接。当建立了一个传入连接时,可以从acceptor返回的socket构造WebSocket流。

  1. net::ip::tcp::acceptor acceptor(ioc);
  2. acceptor.bind(net::ip::tcp::endpoint(net::ip::tcp::v4(), 0));
  3. acceptor.listen();
  4. stream<tcp_stream> ws(acceptor.accept());

也可以通过使用acceptor成员函数的另一个重载,将传入连接直接接受到WebSocket流拥有的socket中

  1. stream<tcp_stream> ws(net::make_strand(ioc));
  2. acceptor.accept(get_lowest_layer(ws).socket());

握手

websocket通过握手将http升级为websocket协议,一个websocket协议如下

  1. GET / HTTP/1.1
  2. Host: www.example.com
  3. Upgrade: websocket
  4. Connection: upgrade
  5. Sec-WebSocket-Key: 2pGeTR0DsE4dfZs2pH+8MA==
  6. Sec-WebSocket-Version: 13
  7. User-Agent: Boost.Beast/216

先说客户端如何升级
这段代码使用websocket::stream的成员函数handshake和async_handshake,用于使用所需的主机名和目标字符串发送请求。该代码连接到从主机名查找返回的IP地址,然后在客户端角色中执行WebSocket握手

  1. stream<tcp_stream> ws(ioc);
  2. net::ip::tcp::resolver resolver(ioc);
  3. get_lowest_layer(ws).connect(resolver.resolve("www.example.com", "ws"));
  4. ws.handshake(
  5. "www.example.com", // The Host field
  6. "/" // The request-target
  7. );

在客户端收到来自服务器的HTTP Upgrade响应并指示成功升级时,调用者可能希望对接收到的HTTP响应消息进行额外的验证。例如,检查对基本身份验证挑战的响应是否有效。为了实现这一目的,handshake成员函数提供了重载形式,允许调用者将接收到的HTTP消息存储在类型为response_type的输出引用参数中。

  1. response_type res;
  2. ws.handshake(
  3. res, // Receives the HTTP response
  4. "www.example.com", // The Host field
  5. "/" // The request-target
  6. );

所以上述握手函数根据自己的需求调用一个即可。

再说服务器如何升级

对于接受传入连接的服务器,websocket::stream可以读取传入的升级请求并自动回复。如果握手符合要求,流将发送带有101切换协议状态码的升级响应。如果握手不符合要求,或者超出了调用者之前设置的流选项允许的参数范围,流将发送一个带有表示错误的状态码的HTTP响应。根据保持活动设置,连接可能保持打开状态,以进行后续握手尝试。在接收到升级请求握手后,由实现创建和发送的典型HTTP升级响应如下所示:

  1. HTTP/1.1 101 Switching Protocols
  2. Upgrade: websocket
  3. Connection: upgrade
  4. Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  5. Server: Boost.Beast

stream的accept和async_accept成员函数用于从已连接到传入对等方的流中读取WebSocket HTTP升级请求握手,然后发送WebSocket HTTP升级响应。示例如下

  1. ws.accept();

在某些情况下,服务器可能需要从流中读取数据,并在稍后决定将缓冲的字节解释为WebSocket升级请求。为了满足这种需求,accept和async_accept提供了接受额外缓冲区参数的重载版本。

以下是一个示例,展示了服务器如何将初始的HTTP请求头部读取到一个动态缓冲区中,然后稍后使用缓冲的数据尝试进行WebSocket升级:

  1. std::string s;
  2. net::read_until(sock, net::dynamic_buffer(s), "\r\n\r\n");
  3. ws.accept(net::buffer(s));

在实现同时支持WebSocket的HTTP服务器时,服务器通常需要从客户端读取HTTP请求。为了检测传入的HTTP请求是否是WebSocket升级请求,可以使用函数is_upgrade。

一旦调用者确定HTTP请求是WebSocket升级请求,就会提供额外的accept和async_accept重载版本,这些版本接收整个HTTP请求头作为一个对象,以进行握手处理。通过手动读取请求,程序可以处理普通的HTTP请求以及升级请求。程序还可以根据HTTP字段强制执行策略,例如基本身份验证。在这个示例中,首先使用HTTP算法读取请求,然后将其传递给新构建的流:

  1. flat_buffer buffer;
  2. http::request<http::string_body> req;
  3. http::read(sock, buffer, req);
  4. if(websocket::is_upgrade(req))
  5. {
  6. stream<tcp_stream> ws(std::move(sock));
  7. BOOST_ASSERT(buffer.size() == 0);
  8. ws.accept(req);
  9. }
  10. else
  11. {
  12. }

所以综上所述,在构建websocket升级时,可以用两种方式,一种是websocket来accept,另一种是在处理http的请求时将请求升级为websocket。
而这两种我们在实战的代码中都有实现,可以下载源码查看。

收发数据

当我们建立好websocket的握手后,就可以通过读写操作收发数据了。

  1. flat_buffer buffer;
  2. ws.read(buffer);
  3. ws.text(ws.got_text());
  4. ws.write(buffer.data());
  5. buffer.consume(buffer.size());

上面的代码片段采用同步读和同步写的方式,根据接收消息的类型设置 WebSocket 连接的模式。
回显接收到的消息给对等端。
清空缓冲区,以便下一次读取可以开始构建新的消息。

但有些场景我们不能通过buffer一次性的读出数据

这是一些使用场景,这些场景中无法预先缓冲整个消息:

  1. 向终端流式传输多媒体:在流式传输多媒体到终端时,通常不适合或不可能预先缓冲整个消息。例如,在实时视频流或音频流的传输过程中,数据可能以非常大的速率产生,并且需要立即传输给接收端进行实时播放。由于数据量巨大且需即时传输,预先缓冲整个消息可能会导致延迟或资源耗尽。

  2. 发送超出内存容量的消息:有时候需要发送的消息太大,无法一次性完全存储在内存中。这可能发生在需要传输大型文件或大量数据的情况下。如果尝试将整个消息加载到内存中,可能会导致内存溢出或系统性能下降。在这种情况下,需要通过分块或逐步读取的方式来发送消息,以便逐步加载和传输数据。

  3. 提供增量结果:在某些情况下,需要在处理过程中提供增量的结果,而不是等待整个处理完成后再返回结果。这可以在长时间运行的计算、搜索或处理任务中发生。通过逐步提供部分结果,可以让用户或应用程序更早地获得一些数据,并可以在处理过程中进行进一步的操作或显示。这种方式可以改善用户体验,并减少等待时间。

在这些特定的使用场景中,需要采用逐步处理、流式传输或增量输出的方式,而不是依赖于预先缓冲整个消息。这样可以提高性能、降低内存消耗,并满足特定需求。

如下是asio提供的官方案例,通过流式读取的方式获取对端信息

  1. multi_buffer buffer;
  2. do
  3. {
  4. ws.read_some(buffer, 512);
  5. }
  6. while(! ws.is_message_done());
  7. ws.binary(ws.got_binary());
  8. buffers_suffix<multi_buffer::const_buffers_type> cb{buffer.data()};
  9. for(;;)
  10. {
  11. if(buffer_bytes(cb) > 512)
  12. {
  13. ws.write_some(false, buffers_prefix(512, cb));
  14. cb.consume(512);
  15. }
  16. else
  17. {
  18. ws.write_some(true, cb);
  19. break;
  20. }
  21. }
  22. buffer.consume(buffer.size());

这段代码涉及 WebSocket 数据的读取和写入操作。以下是对每个部分的解释:

1 multi_buffer buffer;
这行代码定义了一个名为 buffermulti_buffer 对象,用于存储读取的 WebSocket 数据。

2 do { ... } while(! ws.is_message_done());
这部分代码使用一个循环,连续调用 ws.read_some 方法从 WebSocket 连接中读取数据,并将其存储到 buffer 中,直到 WebSocket 消息全部接收完成。

3 ws.binary(ws.got_binary());
此行代码将 WebSocket 连接设置为二进制模式,以便在后续的写入操作中正确处理二进制数据。

4 buffers_suffix<multi_buffer::const_buffers_type> cb{buffer.data()};
这行代码创建了一个 cb 对象,它是 buffer 的后缀子序列。它提供了对 buffer 中已接收数据的访问。

5 if(buffer_bytes(cb) > 512) { ... } else { ... }
这段代码检查 cb 中的数据量是否大于 512 字节。如果是,将执行 if 语句块;否则,将执行 else 语句块。

6 ws.write_some(false, buffers_prefix(512, cb));
如果 cb 中的数据量大于 512 字节,此行代码将发送 cb 的前缀(前 512 字节)到 WebSocket 连接中,并保留剩余的数据。

7 cb.consume(512);
这行代码告知 cb 对象消耗了前 512 字节的数据,以便在后续迭代中更新迭代范围。

8 ws.write_some(true, cb);
如果 cb 中的数据量少于等于 512 字节,此行代码将发送 cb 中的所有数据到 WebSocket 连接中。

buffer.consume(buffer.size());
这行代码清空 buffer 中存储的数据,使其为空。

总体来说,这段代码的作用是读取 WebSocket 数据并将其写回 WebSocket 连接,确保数据按照预期进行处理。最后,通过 buffer.consume(buffer.size()) 清空缓冲区,准备下一次数据读取操作。

关闭连接

当我们想关闭连接时,可以通过close 或 async_close 的关闭函数。
具体而言,WebSocket 协议规定了两种关闭方式:

close
这是一个同步操作的函数,用于关闭 WebSocket 会话。当调用 close 时,客户端或服务器会向对方发送一个关闭帧,并且在收到对方的关闭帧后,会话将被正常关闭。

async_close
这是一个异步操作的函数,用于关闭 WebSocket 会话。与 close 不同,async_close 是一个非阻塞操作,它不会等待对方的关闭帧,而是立即返回,并触发一个异步关闭操作。这样可以在关闭过程中继续处理其他任务,而不必等待关闭完成。

这些关闭函数允许主机在 WebSocket 会话中发起关闭请求,以便安全地终止连接。应用程序可以根据需要选择适合的关闭函数,具体取决于其对同步或异步关闭的要求。

因为一个websocket会包含多个层,所以我们可以通过获取最底层,再执行关闭。这样保证所有层都安全关闭。

  1. get_lowest_layer(wss).close();

总结

上面讲述了beast网络库websocket的使用方法,我通过实战,利用异步函数封装了一个websocket服务器,可以从下面的源码连接里看看项目代码。

视频连接https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101

源码链接https://gitee.com/secondtonone1/boostasio-learn

热门评论

热门文章

  1. 解密定时器的实现细节

    喜欢(566) 浏览(3503)
  2. slice介绍和使用

    喜欢(521) 浏览(2497)
  3. C++ 类的继承封装和多态

    喜欢(588) 浏览(5022)
  4. windows环境搭建和vscode配置

    喜欢(587) 浏览(2842)
  5. Linux环境搭建和编码

    喜欢(594) 浏览(12336)

最新评论

  1. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  2. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  3. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  4. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊

个人公众号

个人微信