Home [翻译] 为什么C++20是最awesome的网络编程语言
Post
Cancel

[翻译] 为什么C++20是最awesome的网络编程语言

cover

这是早段时间Asio作者Christopher Kohlhoff在油管上的演讲。如果你对Asio的发展历史感兴趣,或者想了解如何使用modern的方式来进行C++20网络编程,那可以看下这篇文本翻译。

Asio的接口演变

cpp11-callback-style

先回到前C++11时代,如果要实现一个异步回调,那么一般可选三种形式:可以是operator()std::bind()或者是lambda

asio-api-old

此时,Asio提供的异步函数API如上图foo所示[1],用户提供的回调称为completion handler[2]。回调函数签名的设计也易于使用:

  • 参数列表可以提供是否成功或者传输了多少字节等信息。
  • 返回类型为void,原因是回调函数在后台执行,不需要关注它的返回。

[1] 更准确点说foo应该是initiating function,用户主动调用它来启动asynchronous operation。后面会把initiating function视为异步函数。

[2] asynchronous operation在完成时调用completion handler。这方面的概念细节可以看Asio官网的异步模型介绍。

asio-api-new

在2014-2015年,作者对API增加了返回类型推导以提高灵活性。一方面是异步函数接收completion token(而不是completion handler),另一方面返回类型是基于completion token类型得到的DEDUCED类型。

asio-api-use

新的API具有灵活性是因为Asio在内部提供了一个async_result的定制点,它通过以下三个输入来萃取生成异步操作:

  • Asio异步函数的签名。
  • Asio异步函数的实现。
  • 用户提供的completion token。

use-callback

最简单的例子就是传入一个回调(比如lambda),这个时候因为async_result定制点采用了默认的行为,推导出来的completion token等同于之前的completion handler,返回类型也同样是void,异步操作也依然是在后台运行任务以及通过回调收集得到的结果。因此对于用户来说,以前的代码并不需要作出任何改动。

use-future

那多出来的抽象不是毫无意义?当然不是。为了让你知道completion token能做什么,上图展示了一个开箱即用的use_future(它也是completion token)。use_future本身只是一个占位符而非某种回调,当作为completion token在内部输入给async_result后,生成的异步操作将返回一个future,因此你可以用它来等待操作的完成。

use-future-2

这个时候我们就多了一种新的异步编程方式。这个例子执行异步函数async_read_some(),它返回的字节数可以通过future.get()的方式去得到(而不是回调的方式)。

use-yield-context

不仅如此,我们可以轻松的支持更多异步编程方式。在这个例子中,我们支持了有栈协程的编程方式,这里的completion token并不是一个占位符,而是一个有栈协程的上下文yield,因此异步操作的实现实际是通过挂起/恢复来完成,直到异步操作完成才返回给你传输的字节数。这样做可以让你编写形如同步风格的异步代码。

use-boost-fiber

还有一个类似的是支持boost.fiber,意思相近,这里就不多介绍了。现在你可以知道,通过completion token不仅能轻易支持多种异步编程方式,还可以使得第三方库的定制变得更加简单。

use-awaitable

最后当然少不了C++20 coroutine。后面将展示如何用Asio with C++20写出awesome的代码。

Man-in-the-middle proxy

example

在后面的多个版本的代码中我们都使用同一个流程图,也就是做一个proxy server,实现

\[client \leftrightarrows server \leftrightarrows target\]

这里每个红色的有向边都代表一个异步操作,流程从最上方的async_accept()开始执行。

Step 0: C++11

在踏出第一步前,先品鉴一下C++11时代用到的代码风格。代码注释中的顺序编号可以帮助你更好理解异步代码:

#include <array>
#include <asio.hpp>
#include <iostream>
#include <memory>

using asio::buffer;
using asio::ip::tcp;

// proxy类记录client和server
class proxy : public std::enable_shared_from_this<proxy> {
  public:
  proxy(tcp::socket client)
      : client_(std::move(client))
      // server和client将运行在同一个IO context
      , server_(client_.get_executor())
  {
  }

  // 2. 这是构造的proxy首先调用的异步函数(async_connect)
  // server将作为一个客户端连接到远程的target(作者在示例中使用www.boost.org:80)
  void connect_to_server(tcp::endpoint target)
  {
    auto self = shared_from_this();
    // 当连接到target完成后,执行read_from_client()和read_from_server()
    // 实际上read_from_client()和read_from_server()内部只调用async_read_some()
    // 因此在io_context中是并发的执行2条async_read_some()异步操作链
    server_.async_connect(target, [self](std::error_code error) {
      if (!error) {
        self->read_from_client();
        self->read_from_server();
      }
    });
  }

  private:
  void stop()
  {
    client_.close();
    server_.close();
  }

  // 3. 当你使用client写入数据,比如用telnet写入"GET / HTTP/1.0"
  // Asio就会实际处理read操作,读到的数据放入data_from_client_缓冲中
  // 后续将通过write_to_server()将读到的n个字节写到远程target
  void read_from_client()
  {
    auto self = shared_from_this();
    client_.async_read_some(buffer(data_from_client_),
        [self](std::error_code error, std::size_t n) {
          if (!error) {
            self->write_to_server(n);
          } else {
            self->stop();
          }
        });
  }

  // 4. 通过async_write()将data_from_client_缓冲中的数据写到远程target
  // 写完后重新从client读取数据,形成了流程图中的循环
  void write_to_server(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(server_, buffer(data_from_client_, n),
        [self](std::error_code ec, std::size_t /*n*/) {
          if (!ec) {
            self->read_from_client();
          } else {
            self->stop();
          }
        });
  }

  // 5. 这个分叉的另一条链路
  // 当proxy server执行第4步时,远程target也会给出response
  // 因此会读数据到data_from_server_缓冲
  void read_from_server()
  {
    auto self = shared_from_this();
    server_.async_read_some(asio::buffer(data_from_server_),
        [self](std::error_code error, std::size_t n) {
          if (!error) {
            self->write_to_client(n);
          } else {
            self->stop();
          }
        });
  }

  // 6. proxy转发data_from_server_内容给client
  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(client_, buffer(data_from_server_, n),
        [self](std::error_code ec, std::size_t /*n*/) {
          if (!ec) {
            self->read_from_server();
          } else {
            self->stop();
          }
        });
  }

  tcp::socket client_;
  tcp::socket server_;
  std::array<char, 1024> data_from_client_;
  std::array<char, 1024> data_from_server_;
};

void listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  // 1. 第一个异步操作,accept得到的client在异步操作完成时传入回调
  // 然后构造proxy server,并尝试连接到target
  acceptor.async_accept(
      [&acceptor, target](std::error_code error, tcp::socket client) {
        if (!error) {
          std::make_shared<proxy>(std::move(client))->connect_to_server(target);
        }

        // 这是Asio常用的一种“递归”的写法,可以再次accept另一个client
        // 因此可以处理多个客户端
        listen(acceptor, target);
      });
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    listen(acceptor, target_endpoint);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

代码稍微有点长,但这个代码只需把流程图白框的内容视为completion handler,那就是一个照抄的流程。

NOTE: 如果有些同学不熟悉Asio,我在这里简单总结一下:

  • 如何描述异步:使用Asio的异步函数来启动异步操作,用户传入完成异步操作时需要的回调。
  • 如何描述回调:使用lambda
  • 如何描述错误:使用std::error_code
  • 异步操作在哪里执行:io_context.run()对应的线程。
  • 异步操作链之间有何关联:存在共享的proxy对象。

Step 1: coroutine

现在实现一个基本的Asio with awesome C++20代码:将回调改成协程。

#include <array>
#include <asio.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;
using asio::ip::tcp;

// 现在proxy不需要共享的读写缓冲
struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  try {
    std::array<char, 1024> data;

    for (;;) {
      auto n
          = co_await state->client.async_read_some(buffer(data), use_awaitable);

      co_await async_write(state->server, buffer(data, n), use_awaitable);
    }
  } catch (const std::exception& e) {
    state->client.close();
    state->server.close();
  }
}

// 3. 使用协程处理转发
awaitable<void> server_to_client(proxy_state_ptr state)
{
  // 协程默认使用exception来处理错误
  try {
    // 由于不需要拆分函数,因此缓冲可以是局部变量
    std::array<char, 1024> data;

    // 描述循环的异步操作链
    for (;;) {
      // 读后写的操作
      // 整个函数看起来是不是非常的“同步”?
      auto n
          = co_await state->server.async_read_some(buffer(data), use_awaitable);

      co_await async_write(state->client, buffer(data, n), use_awaitable);
    }
  } catch (const std::exception& e) {
    state->client.close();
    state->server.close();
  }
}

// 2. server主动连接到远程target
// 建立连接后并发执行client_to_server()和server_to_client()
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  co_await state->server.async_connect(target, use_awaitable);

  auto ex = state->client.get_executor();
  // 作者提示这里不应使用co_await client_to_server(state)
  // 因为并不是等到client_to_server()执行完才执行server_to_client()
  // 所以要使用co_spawn生成独立的异步操作链
  co_spawn(ex, client_to_server(state), detached);

  co_await server_to_client(state);
}

// 1. listen()需要返回awaitable<void>
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  // 现在我们不需要“递归”
  for (;;) {
    // 使用use_awaitable即C++20协程来替代回调
    // co_await会让出协程,当返回时,已经accept得到client
    // 默认client和acceptor有相同的executor
    auto client = co_await acceptor.async_accept(use_awaitable);

    auto ex = client.get_executor();
    // 使用co_spawn生成新的异步操作链,现在一条链路处理accept,另一条处理proxy
    // 首个参数接受executor,因此指定了异步操作链的执行上下文
    // 使用spawn出来的proxy()协程处理连接
    // detached表示不关心协程的结果,本质是completion token生成了一个空的completion handler
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    // 将在context执行首个listen() coroutine
    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

现在这个版本的proxy使用了use_awaitable的completion token,代码风格接近同步编程。

相比之下,可以看下有什么不同:

  • 如何描述异步:使用co_spawn创建协程,使用co_await收集异步结果。
  • 如何描述回调:没有回调!
  • 如何描述错误:使用try-catch
  • 异步操作链之间有何关联:存在共享的proxy对象,但要求的状态变少了。

当然这只是第一步,我们后面会继续打磨。

Step 2: completion token adapter

前面看到Step 1处理错误需要使用异常,这在Asio当中并非必要。你可以利用completion token的扩展性,通过封装简单的adapter来改变这种行为。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;

// 1. 新的completion token
constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    // 2. 现在返回tuple,第一个element就是error code
    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    auto ex = state->client.get_executor();
    co_spawn(ex, client_to_server(state), detached);

    co_await server_to_client(state);
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

这段代码改动很少,但利用completion token的扩展性可以轻松改变错误处理的行为。如今可以搭配结构化绑定来返回得到error code,而不是必须处理异常。

Step 3: timeout

超时管理也是必要的,有没有办法让客户端在一定时间内没操作就关掉连接呢?当然可以,这里给出了基本的操作。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
  // 1. 引入新的共享时间戳
  steady_clock::time_point deadline;
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    // 2. 在任何操作前,先更新deadline
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

// 3. 添加新的协程watchdog,超时则关闭连接
awaitable<void> watchdog(proxy_state_ptr state)
{
  asio::steady_timer timer(state->client.get_executor());

  auto now = steady_clock::now();
  while (state->deadline > now) {
    timer.expires_at(state->deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }

  state->client.close();
  state->server.close();
}

// 4. 现在proxy()里面有3条异步操操作链,相比之前的版本新增一个watchdog超时管理
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    auto ex = state->client.get_executor();
    co_spawn(ex, client_to_server(state), detached);
    co_spawn(ex, server_to_client(state), detached);

    // 当发生超时现象,这里执行close
    co_await watchdog(state);
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

思路很简单,插入一个watchdog协程即可。但这仅是一个低配版的代码,称不上awesome。作者提到如果希望更加精细的控制取消行为(per-operation cancellation)而不是直接全部关掉,也可以使用Asio的cancellation slot机制,但大部分人并不会直接使用这种偏底层的方法。

Step 4: awaitable operators

作者用了一个更加高层的抽象来执行取消操作,具体的办法就是使用逻辑或来组合异步操作。

awaitable operator

这里仍然是并行的执行,但任意一个协程完成(逻辑或短路),另外两个协程将被取消。

完整代码如下:

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
  steady_clock::time_point deadline;
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

// 这里watchdog的逻辑也简化了,只检查timer
awaitable<void> watchdog(proxy_state_ptr state)
{
  asio::steady_timer timer(state->client.get_executor());

  auto now = steady_clock::now();
  while (state->deadline > now) {
    timer.expires_at(state->deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }
}

awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    // 这里有3个并行且循环执行的异步操作链
    co_await (
        client_to_server(state) || server_to_client(state) || watchdog(state));

    // 只有任意一个协程完成才能往下走
    // 关闭连接的逻辑放在这里
    state->client.close();
    state->server.close();
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

那有没有办法是等待所有协程执行完?当然没问题,使用逻辑与即可。

Step 5: no shared state

前面我们的代码一直使用shared state的方式来描述一个proxy对象,但是显然前面版本的proxy()能通过一个co_await来保证三个异步操作链中对象的生命周期,因此我们能干掉shared state。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

// 1. 不需要区分client_to_server()和server_to_client()
// 合并为一个函数
awaitable<void> transfer(
    tcp::socket& from, tcp::socket& to, steady_clock::time_point& deadline)
{
  std::array<char, 1024> data;

  for (;;) {
    deadline = std::max(deadline, steady_clock::now() + 5s);

    auto [e1, n1]
        = co_await from.async_read_some(buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2]
        = co_await async_write(to, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

awaitable<void> watchdog(steady_clock::time_point& deadline)
{
  // 使用`this_coro::executor`来获取当前上下文对应的executor
  // 进一步干掉shared state
  asio::steady_timer timer(co_await this_coro::executor);

  auto now = steady_clock::now();
  while (deadline > now) {
    timer.expires_at(deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }
}

// 2. 使用局部变量即可
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  tcp::socket server(client.get_executor());
  steady_clock::time_point deadline {};

  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    co_await (
      transfer(client, server, deadline) ||
      transfer(server, client, deadline) ||
      watchdog(deadline)
    );
  }
  // 使用RAII管理资源,不需要手动close()
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

因为消灭了shared state,我们的代码其实是更易于维护,你甚至不需要手动执行close()

Step 6: more awaitable operators

前面说到我们希望更精细的控制每个连接的行为而不是共用同一个超时管理,这通过awesome的awaitable operator其实是轻而易举的。

step-6

现在只需每一个连接用内部的watchdog来管理即可。

Step 7: return value

刚才的示例都回避了co_await的返回值,多个awaitable允许不同逻辑的组合,那该如何返回?

一个简单的结论是:

  • 如果使用逻辑与,那就是std::tuple
  • 如果使用逻辑或,那就是std::variant
#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

awaitable<void> timeout(steady_clock::duration duration)
{
  asio::steady_timer timer(co_await this_coro::executor);
  timer.expires_after(duration);
  co_await timer.async_wait(use_nothrow_awaitable);
}

awaitable<void> transfer(tcp::socket& from, tcp::socket& to)
{
  std::array<char, 1024> data;

  for (;;) {
    // 超时管理精细到per-operation
    auto result1 = co_await (
        from.async_read_some(buffer(data), use_nothrow_awaitable) ||
        timeout(5s)
      );

    // 处理variant
    if (result1.index() == 1)
      co_return; // timed out

    auto [e1, n1] = std::get<0>(result1);
    if (e1)
      break;

    auto result2 = co_await (
        async_write(to, buffer(data, n1), use_nothrow_awaitable) ||
        timeout(1s)
      );

    if (result2.index() == 1)
      co_return; // timed out

    auto [e2, n2] = std::get<0>(result2);
    if (e2)
      break;
  }
}

// proxy实现也变得更加简单
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  tcp::socket server(client.get_executor());

  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    co_await (
        transfer(client, server) ||
        transfer(server, client)
      );
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

通过awaitable operator与推导的返回值组合,现在我们不再需要多余的watchdog,并且在超时管理上能做到更加细粒度,也更易于使用。

而这就是我们最终版本的awesome proxy server。

THE END

如果你对awesome的Asio感兴趣,那就赶紧下载吧!

官方网站:think-async.com

This post is licensed under CC BY 4.0 by the author.
Contents