NodeJS模块研究 - cluster

Nodejs 提供了 cluster 来支持服务集群的扩展,提高多核 CPU 的利用效率,实现负载均衡,最大程度利用机器性能。本文从以下几个方面介绍 cluster 的 API 和用法:

  • cluster 启动 HTTP 服务器
  • 如何进行广播?
  • 如何实现状态共享?
  • 如何处理进程退出?
  • 更多进程控制方法:心跳保活、自动重启、负载检测

🔍 关注公众号“心谭博客” / 👉 查看原文: xxoo521.com / 更多前端与算法的系列文章

# cluster 启动 HTTP 服务器

为了方便测试,全局安装 autocannon:

npm install -g autocannon

不借助 cluster,编写一个简单的 http 服务器:

const http = require("http");

http
  .createServer((req, res) => {
    // 模拟cpu计算
    for (let i = 0; i < 100000; ++i) {}
    res.statusCode = 200;
    res.end("hello world!");
  })
  .listen(4000);

借助 autocannon 开启 1000 个连接,每个连接的请求次数为 10 次,压测结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor

┌─────────┬──────┬──────┬────────┬────────┬──────────┬───────────┬────────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg      │ Stdev     │ Max        │
├─────────┼──────┼──────┼────────┼────────┼──────────┼───────────┼────────────┤
│ Latency │ 0 ms │ 0 ms │ 636 ms │ 650 ms │ 62.48 ms │ 197.51 ms │ 2928.64 ms │
└─────────┴──────┴──────┴────────┴────────┴──────────┴───────────┴────────────┘
┌───────────┬─────────┬─────────┬─────────┬─────────┬──────────┬────────┬─────────┐
│ Stat      │ 1%      │ 2.5%    │ 50%     │ 97.5%   │ Avg      │ Stdev  │ Min     │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Req/Sec   │ 13095   │ 13095   │ 15911   │ 16303   │ 15558.91 │ 901.48 │ 13092   │
├───────────┼─────────┼─────────┼─────────┼─────────┼──────────┼────────┼─────────┤
│ Bytes/Sec │ 1.47 MB │ 1.47 MB │ 1.78 MB │ 1.83 MB │ 1.74 MB  │ 101 kB │ 1.47 MB │
└───────────┴─────────┴─────────┴─────────┴─────────┴──────────┴────────┴─────────┘

Req/Bytes counts sampled once per second.

171k requests in 11.17s, 19.2 MB read
50 errors (0 timeouts)

然后用 cluster 模块来启动一个利用多核的 http 服务器,代码如下:

const cluster = require("cluster");
const http = require("http");
const os = require("os");

if (cluster.isMaster) {
  const cpuNum = os.cpus().length;
  for (let i = 0; i < cpuNum; ++i) {
    cluster.fork();
  }
} else {
  runServer();
}

function runServer() {
  http
    .createServer((req, res) => {
      for (let i = 0; i < 100000; ++i) {}
      res.statusCode = 200;
      res.end("hello world!");
    })
    .listen(4000);
}

同样利用 autocannon 进行测试,结果如下:

➜  _posts git:(master) ✗ autocannon -c 1000 -p 10 http://127.0.0.1:4000
Running 10s test @ http://127.0.0.1:4000
1000 connections with 10 pipelining factor

┌─────────┬──────┬──────┬────────┬────────┬─────────┬──────────┬──────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5%  │ 99%    │ Avg     │ Stdev    │ Max      │
├─────────┼──────┼──────┼────────┼────────┼─────────┼──────────┼──────────┤
│ Latency │ 0 ms │ 0 ms │ 113 ms │ 125 ms │ 11.5 ms │ 37.37 ms │ 807.5 ms │
└─────────┴──────┴──────┴────────┴────────┴─────────┴──────────┴──────────┘
┌───────────┬────────┬────────┬─────────┬─────────┬─────────┬──────────┬────────┐
│ Stat      │ 1%     │ 2.5%   │ 50%     │ 97.5%   │ Avg     │ Stdev    │ Min    │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Req/Sec   │ 43711  │ 43711  │ 97023   │ 108671  │ 90811.2 │ 16898.34 │ 43710  │
├───────────┼────────┼────────┼─────────┼─────────┼─────────┼──────────┼────────┤
│ Bytes/Sec │ 4.9 MB │ 4.9 MB │ 10.9 MB │ 12.2 MB │ 10.2 MB │ 1.89 MB  │ 4.9 MB │
└───────────┴────────┴────────┴─────────┴─────────┴─────────┴──────────┴────────┘

Req/Bytes counts sampled once per second.

908k requests in 10.7s, 102 MB read

可以看到,错误请求从 50 降低到 0,最长请求延迟从 2.9s 降低到了 0.8s,平均请求量从 1.5w 提升到了 9w,平均下载量从 1.74MB 提升到了 10.2MB。而本机的os.cpus().length返回的结果是 12,提升非常稳定,和 cpu 核数基本成正比。

从上面的实践也看到,从 cluster 开启的子进程总数量最好和 cpu 数量一样。

# 如何进行广播?

广播需要父子进程之间进行通信,多用于消息下发、数据共享。cluster 是基于 child_process 模块的,所以通信的做法和 child_process 区别不大。

在主进程中, cluster.workders 是个哈希表,可以遍历得到所有工作进程。如下所示,给所有的工作进程广播消息:

if (cluster.isMaster) {
  for (let i = 0; i < os.cpus().length; ++i) {
    cluster.fork();
  }
  // 给工作进程广播消息
  for (const id in cluster.workers) {
    cluster.workers[id].send({
      data: "msg",
    });
  }
} else if (cluster.isWorker) {
  // 工作进程接受到消息
  process.on("message", (msg) => {
    console.log("msg is", msg);
  });
}

# 如何实现状态共享?

在上一个例子中,看到了借助 cluster.workers 和事件机制,来进行消息广播。但由于集群的每个节点是“分散”,所以对于有状态的服务应该想办法解决“状态共享”这个问题。

例如有需要我们进行总访问量统计的需求,并且将当前的访问量返回给客户端。由于每个进程都承载了一部分访问,工作进程接收到请求的时候,需要向主进程上报;工作进程接收到上报,更新访问总量,并且广播给各个工作进程。这就是一个完整的消息上报 => 状态更新 => 消息广播的过程。

按照上面的思路,首先封装工作进程的 http 逻辑,如下所示:

function runServer() {
  let visitTotal = 0;
  // 接收主进程的广播
  process.on("message", (msg) => {
    if (msg.tag === "broadcast") visitTotal = msg.visitTotal;
  });

  http
    .createServer((req, res) => {
      // 消息上报给主进程
      process.send({
        tag: "report",
      });
      res.statusCode = 200;
      res.end(`visit total times is ${visitTotal + 1}`);
    })
    .listen(4000);
}

是的,就是通过传递消息上的一个字段,来标识是工作进程上报的消息还是主进程广播的消息。给主进程用的 broadcast() 函数如下:

function broadcast(workers, data) {
  for (const id in workers) {
    // 给工作进程广播消息
    workers[id].send({
      tag: "broadcast",
      ...data,
    });
  }
}

最后,主进程中需要为工作进程添加message事件的监听器,这样才能收到工作进程的消息,并且更新保存在主进程中的状态(visitTotal),完成广播。代码如下:

if (cluster.isMaster) {
  let visitTotal = 0; // 访客总人数

  const cpuNum = os.cpus().length;
  for (let i = 0; i < cpuNum; ++i) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on("message", (msg) => {
      if (msg.tag === "report") {
        ++visitTotal;
        broadcast(cluster.workers, { visitTotal });
      }
    });
  }
} else if (cluster.isWorker) {
  runServer();
}

其实,更常用的做法是专门准备一个服务器来进行统计,将服务单独部署。这里是为了深入理解和学习 cluster 模块。

# 如何处理进程退出?

cluster 模块中有 2 个 exit 事件:一个是 Worker 上的,仅用于工作进程中;另一个是主进程上,任何一个工作进程关闭都会触发。

在工作进程正常退出的时候,code 为 0,并且 Worker 上的 exitedAfterDisconnect 属性为 true。那么检测 code 和 exitedAfterDisconnect 属性,就能判断进程是否是异常退出。并且重新 fork 一个新的工作进程,来保持服务稳定运行。代码如下:

cluster.on("exit", (worker, code, signal) => {
  if (code || !worker.exitedAfterDisconnect) {
    console.log(`${worker.id} 崩溃,重启新的子进程`);
    cluster.fork();
  }
});

注意,exitedAfterDisconnect 属性在正常退出、调用 worker.kill() 或调用 worker.disconnect()时,均被设置为 true。因为调用 kill 和 disconnect 均为代码逻辑主动执行,属于程序的一部分。

# 更多进程控制方法:心跳保活、自动重启、负载检测

除了前面所讲的方法,进程控制的常见方法还有:心跳保活、自动重启、负载检测。

心跳保活:工作进程定时向主进程发送心跳包,主进程如果检测到长时间没有收到心跳包,要关闭对应的工作进程,并重启新的进程。

自动重启:给每个工作进程设置一个“生命周期”,例如 60mins。到时间后,通知主进程进行重启。

负载检测:工作进程和主进程可以定期检测 cpu 占用率、内存占用率、平均负载等指标,过高的话,则关闭重启对应工作进程。关于检测方法可以看这篇文章《NodeJS 模块研究 - os》

这些方法在 vemojs 中都有应用,具体可以看这篇文章:《VemoJS 源码拆解》

# 参考链接