WebSocket node.js ws 註解整理 – 1

這裡 註解 整理在Node.js中,WebSocket client/server常用的實現 ws 套件的程式碼

https://github.com/websockets/ws

關於WebSocket protocol,可參考另一篇文章的整理

index.js

'use strict';

const WebSocket = require('./lib/websocket');

WebSocket.createWebSocketStream = require('./lib/stream');
WebSocket.Server = require('./lib/websocket-server');
WebSocket.Receiver = require('./lib/receiver');
WebSocket.Sender = require('./lib/sender');

module.exports = WebSocket;

所有的implementation在lib資料夾,大約3600行,算是一個輕量的實現,可先從index.js export的WebSocket開始追起,這也是一般js WebSocket client使用的class,WebSocket client API在實現上也是參考 browser使用的API,所以https://developer.mozilla.org/en-US/docs/Web/API/WebSocket 也要一併參考

以下逐行整理註解lib/websocket.js

lib/websocket.js

'use strict';

const EventEmitter = require('events');
const https = require('https');
const http = require('http');
const net = require('net');
const tls = require('tls');
const { randomBytes, createHash } = require('crypto');
const { URL } = require('url');

const PerMessageDeflate = require('./permessage-deflate');
const Receiver = require('./receiver');
const Sender = require('./sender');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  GUID,
  kStatusCode,
  kWebSocket,
  NOOP
} = require('./constants');
//在Node.js中,繼承EventEmitter會有addListener,removeListener。引入 event-target主要是為了實作相容於web的addEventListener, removeEventListener介面
const { addEventListener, removeEventListener } = require('./event-target');
const { format, parse } = require('./extension');
const { toBuffer } = require('./buffer-util');

//下面定義的4個readyStates可參考 https://developer.mozilla.org/en-US/docs/Web/API/WebSocket ,這4個state在rfc 6455也有明確指出
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
const protocolVersions = [8, 13];
const closeTimeout = 30 * 1000;

上面可以看到支援的兩個version 8、13

8主要是對應https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08 開始的版本到 -12 ,13主要是對應 https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-13 開始的版本,不過實際上支援的程度或相容性仍要看實作細節

/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
...
}

繼承 EventEmitter,可參考 Nodejs Events整理的說明

constructor的部分:

  /**
   * Create a new `WebSocket`.
   *
   * @param {(String|url.URL)} address The URL to which to connect
   * @param {(String|String[])} [protocols] The subprotocols
   * @param {Object} [options] Connection options
   */
  constructor(address, protocols, options) {
    super(); //ES6繼承使用的語法

    //BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'] 預設使用nodebuffer
    this._binaryType = BINARY_TYPES[0];
    //預設的 close code 1006 代表未完成close,這邊主要是先定義一個fail state,操作成功或是有其他狀態變更才更新
    this._closeCode = 1006;
    //下面4個主要是紀錄close的訊息、close的情形
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = '';
    this._closeTimer = null;
    
    this._extensions = {};
    this._protocol = '';
    //RFC6455 p14 A connection is
   defined to initially be in a CONNECTING state.
    this._readyState = WebSocket.CONNECTING;
    //這邊可以作為實現網路參考,一個receiver, 一個sender
    this._receiver = null;
    this._sender = null;

    //underlying socket
    this._socket = null;

    if (address !== null) {
      this._bufferedAmount = 0;
      this._isServer = false;
      this._redirects = 0;

      //protocols如果是array就展開成字串 , 分隔
      if (Array.isArray(protocols)) {
        protocols = protocols.join(', ');
      } else if (typeof protocols === 'object' && protocols !== null) {
        //如果是object,就判定該argument指的是原本第三個的options,protocols從缺
        options = protocols;
        protocols = undefined;
      }
      initAsClient(this, address, protocols, options);
    } else {
      //WebSocket的address 明確地傳null,會當成server端的connection
      this._isServer = true;
    }
  }

接下來從constructor內的initAsClient往下追

/**
 * Initialize a WebSocket client.
 *
 * @param {WebSocket} websocket The client to initialize
 * @param {(String|url.URL)} address The URL to which to connect
 * @param {String} [protocols] The subprotocols
 * @param {Object} [options] Connection options
    //以下說明options
 * @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable
 *     permessage-deflate
 * @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the
 *     handshake request
 * @param {Number} [options.protocolVersion=13] Value of the
 *     `Sec-WebSocket-Version` header
 * @param {String} [options.origin] Value of the `Origin` or
 *     `Sec-WebSocket-Origin` header
 * @param {Number} [options.maxPayload=104857600] The maximum allowed message
 *     size
 * @param {Boolean} [options.followRedirects=false] Whether or not to follow
 *     redirects
 * @param {Number} [options.maxRedirects=10] The maximum number of redirects
 *     allowed
 * @private
 */


function initAsClient(websocket, address, protocols, options) {
  const opts = {
    protocolVersion: protocolVersions[1], //預設 = 13
    maxPayload: 100 * 1024 * 1024, //預設 = 100MB
    perMessageDeflate: true, //預設開啟permessagedeflate
    followRedirects: false, //預設關閉redirect
    maxRedirects: 10, //預設最多10次redirects
    ...options, //這裡是merge options,options內有定義的field會蓋過前面寫的
    createConnection: undefined,
    socketPath: undefined,
    hostname: undefined,
    protocol: undefined,
    timeout: undefined,
    method: undefined,
    host: undefined,
    path: undefined,
    port: undefined
  };
  //如果protocolVersion不支援,throw RangeError
  if (!protocolVersions.includes(opts.protocolVersion)) {
    throw new RangeError(
      `Unsupported protocol version: ${opts.protocolVersion} ` +
        `(supported versions: ${protocolVersions.join(', ')})`
    );
  }
  let parsedUrl;

  //設定websocket需要的_url value
  //利用URL物件解析傳入的address
  if (address instanceof URL) {
    parsedUrl = address;
    websocket._url = address.href;
  } else {
    parsedUrl = new URL(address);
    websocket._url = address;
  }
  //ws支援unix domain socket
  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
  //unix socket需要有pathname,其他需要有host
  if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
    throw new Error(`Invalid URL: ${websocket.url}`);
  }
  //這邊主要判斷走一般tcp or tls連線
  const isSecure =
    parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  const defaultPort = isSecure ? 443 : 80;
  const key = randomBytes(16).toString('base64'); //gen dynamic key for Sec-WebSocket-Key
  const get = isSecure ? https.get : http.get;
  let perMessageDeflate;
  opts.createConnection = isSecure ? tlsConnect : netConnect;

  opts.defaultPort = opts.defaultPort || defaultPort;
  opts.port = parsedUrl.port || defaultPort;
  opts.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  opts.headers = {
    'Sec-WebSocket-Version': opts.protocolVersion,
    'Sec-WebSocket-Key': key,
    Connection: 'Upgrade',
    Upgrade: 'websocket',
    ...opts.headers
  };
  opts.path = parsedUrl.pathname + parsedUrl.search;
  opts.timeout = opts.handshakeTimeout;
  //如果有開permessagedeflate
  if (opts.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
      false,
      opts.maxPayload
    );
   //這邊用es6 [] literal syntax是指 PerMessageDeflate.extensionName的值當成key
    opts.headers['Sec-WebSocket-Extensions'] = format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }
  //subprotocol
  if (protocols) {
    opts.headers['Sec-WebSocket-Protocol'] = protocols;
  }
  //指定Origin,主要是pass一些server的檢查
  if (opts.origin) {
    if (opts.protocolVersion < 13) {
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
    } else {
      opts.headers.Origin = opts.origin;
    }
  }  
  if (parsedUrl.username || parsedUrl.password) {
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }
  
  if (isUnixSocket) {
    const parts = opts.path.split(':');

    opts.socketPath = parts[0];
    opts.path = parts[1];
  } 

以下設置http req,透過http.get function,可參考: https://nodejs.org/docs/latest-v12.x/api/http.html#http_http_request_options_callback

  let req = (websocket._req = get(opts)); //發起連線

  if (opts.timeout) {
    //起始連接失敗 abortHandshake
    req.on('timeout', () => {
      abortHandshake(websocket, req, 'Opening handshake has timed out');
    });
  }

  //下面這段需要特別注意,主要是要清楚每一步代表的意思
  //例如 req.on('error' 發生的時機點
  //以及 req = null的時機點
  //以及 req.aborted == true的條件
  req.on('error', (err) => {
    if (req === null || req.aborted) return; //忽略abort的case,在ws, follow redirect是先abort再重新initAsClient
    req = websocket._req = null; //代表error event只會處理一次
    websocket._readyState = WebSocket.CLOSING;
    websocket.emit('error', err);
    websocket.emitClose(); //此處readyState會變為CLOSED
  });

上面的error event handler會檢查req是否為null,在ws中,client的req被set成null只有兩處,一個是error handler內的,主要是不做重複的處理,另外一個是收到upgrade之後,這邊可以理解成當client收到upgrade時,就完全主動掌握req的流程了(open handshake走到最後一步,server已經接受所以回傳Upgrade,等待client驗證完response header後,就切入socket),所以在切入socket前如果有錯誤,client就直接下abortHandshake,最後將控制權轉給socket,並且這段是synchronous code,到轉給socket前,不會進入到event handler。而後續的網路error,則由socket的error event handler處理 (ref websocket.js #174)

下面整理req收到response的整理,需注意再下面還有upgrade event,這兩種情況不會同時發生(有upgrade就不會emit response event),所以response可以理解為server reject(沒送101 switch protocol upgrade)

  //req收到response event的處理
  req.on('response', (res) => {
    const location = res.headers.location;
    const statusCode = res.statusCode;

    if (
      location && //有redirect header
      opts.followRedirects && //有設follow redirect
      statusCode >= 300 && //status code = 3xx
      statusCode < 400
    ) {
      if (++websocket._redirects > opts.maxRedirects) {
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
        return;
      }
      //取消req,重新發起request
      req.abort();

      const addr = new URL(location, address);

      initAsClient(websocket, addr, protocols, options);
    } else if (!websocket.emit('unexpected-response', req, res)) {
      //emit return false代表沒有註冊該event name的listener
      abortHandshake(
        websocket,
        req,
        `Unexpected server response: ${res.statusCode}`
      );
    }
  });

這邊是收到Upgrade後的處理,在ws基本上就是丟棄req物件,改以底層的socket來做傳輸

  req.on('upgrade', (res, socket, head) => {
    websocket.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the `upgrade`
    // event.
    //
    if (websocket.readyState !== WebSocket.CONNECTING) return;

    req = websocket._req = null; //關掉req的處理,這邊的操作是安全的,在nodejs lib _http_client.js 裡面 發出upgrade event後,再來就是'close' event

    //計算一下expect key
    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    //以下check 其他server response header
    const serverProt = res.headers['sec-websocket-protocol'];
    const protList = (protocols || '').split(/, */);
    let protError;

    if (!protocols && serverProt) {
      protError = 'Server sent a subprotocol but none was requested';
    } else if (protocols && !serverProt) {
      protError = 'Server sent no subprotocol';
    } else if (serverProt && !protList.includes(serverProt)) {
      protError = 'Server sent an invalid subprotocol';
    }

    if (protError) {
      abortHandshake(websocket, socket, protError);
      return;
    }
    //set handshaked subprotocol
    if (serverProt) websocket._protocol = serverProt;

    if (perMessageDeflate) {
      try {
        //檢查server extension response
        const extensions = parse(res.headers['sec-websocket-extensions']);
        //如果server有支援permessagedeflate的話...
        if (extensions[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
          websocket._extensions[
            PerMessageDeflate.extensionName
          ] = perMessageDeflate;
        }
      } catch (err) {
        abortHandshake(
          websocket,
          socket,
          'Invalid Sec-WebSocket-Extensions header'
        );
        return;
      }
    }
    //交給socket處理
    websocket.setSocket(socket, head, opts.maxPayload);
  });
}

Upgrade後就轉交給setSocket處理,這邊主要初始化sender, receiver,註冊相關的event handler/listener,然後WebSocket emit ‘open’

  /**
   * Set up the socket and the internal resources.
   *
   * @param {net.Socket} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Number} [maxPayload=0] The maximum allowed message size
   * @private
   */
  setSocket(socket, head, maxPayload) {
    //建構receiver: writable stream
    const receiver = new Receiver(
      this.binaryType, //buffer type
      this._extensions, //handshaked extensions
      this._isServer, //server or client, for mask
      maxPayload //maxpayload, default 100MB, 另外雖然RFC可以支援到64bit的長度,實際上的最大限制會被語言或是runtime限制住,例如js的max integer 2^53-1
    );
    //建構sender
    this._sender = new Sender(socket, this._extensions);
    this._receiver = receiver;
    this._socket = socket;

    receiver[kWebSocket] = this; //kWebSocket = Symbol('websocket')
    socket[kWebSocket] = this;

    receiver.on('conclude', receiverOnConclude); //conclude是結束的意思
    receiver.on('drain', receiverOnDrain); //writable stream event
    receiver.on('error', receiverOnError); //writable stream event
    receiver.on('message', receiverOnMessage); //receiver event
    receiver.on('ping', receiverOnPing); //receiver event
    receiver.on('pong', receiverOnPong); //receiver event

    socket.setTimeout(0); //turn off idle timeout
    socket.setNoDelay();  //disable the use of Nagle's algorithm.

    //這一行比較特別,他把head放回去,後面說明
    if (head.length > 0) socket.unshift(head);

    //register socket event listener
    socket.on('close', socketOnClose);
    socket.on('data', socketOnData);
    socket.on('end', socketOnEnd);
    socket.on('error', socketOnError);

    //finally, change to OPEN state, emit 'open'
    this._readyState = WebSocket.OPEN;
    this.emit('open');
  }

上面有一行 if (head.length > 0) socket.unshift(head); 是將head的data放回socket buffer,這邊的head不是指http header之類的,他是在http client upgrade時parse超過的部分,因為socket data來的時候有可能在http header後body也一起帶過來了,畢竟對server來說,當client request合格後,server side的websocket connection在送完open handshake response就是OPEN狀態了,後面可以繼續送body,所以從client角度來看,有可能同時收到server handshake response + websocket data(body),可參考:

If the server finishes these steps without aborting the WebSocket handshake, the server considers the WebSocket connection to be established and that the WebSocket connection is in the OPEN state.

RFC 6455 p25

在initAsClient裡建連線使用opts.createConnection變數,傳入http.get,opts.createConnection = isSecure ? tlsConnect : netConnect;

/**
 * Create a `net.Socket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect(options) {
  options.path = options.socketPath;
  return net.connect(options); //這裡的net.connect是 net.createConnection()返回一個socket
}

/**
 * Create a `tls.TLSSocket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect(options) {
  options.path = undefined;

  //這邊是設定SNI (Server Name Indication) extension,因為tls connection先於http request,所以對於virtual host來說,建TLS需要再加上SNI
  if (!options.servername && options.servername !== '') {
    options.servername = net.isIP(options.host) ? '' : options.host;
  }

  return tls.connect(options); //tls.connect返回tls.TLSSocket
}

ws透過createConnection參數省掉了custom http agent,可以理解為直接透過net.connect的raw connection,而避開的http.Agent的一些機制的實現,例如keep alive connection,事實上websocket的連線也不需要。

接下來整理有關close connection的部分,close大概分幾種狀態

  • abort handshake
  • 使用者下close()
  • tcp socket收到close
  • 收到對方送來close

abortHandshake發生的時機點主要是在websocket open handshake完成之前

/**
 * Abort the handshake and emit an error.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
 *     socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake(websocket, stream, message) {
  //更改狀態CLOSING
  websocket._readyState = WebSocket.CLOSING;
  //在err物件加上call此function的stack trace
  const err = new Error(message);
  Error.captureStackTrace(err, abortHandshake);
  
  //這裡用stream.setHeader判斷是否可以response header
  //以client來說 當收到server upgrade header時,就代表無法再response header
  //如果此時要abort 就是立即斷線
  if (stream.setHeader) {
    stream.abort();
    stream.once('abort', websocket.emitClose.bind(websocket));
    websocket.emit('error', err);
  } else {
    stream.destroy(err);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
  }
}

在ws中abortHandshake分成req 或是 socket,在程式碼內是用stream.setHeader來判斷,req.abort裡面也會做socket.destroy,唯一的差別是在req.abort時,socket不一定存在(http req.abort有做相關檢查),兩個API都是等同於直接斷線,因為從client的角度來看,在open handshake完成之前(before ‘open’ state),關閉連線就是直接斷線,沒有其他做法(沒有辦法做close handshake,也沒有辦法像server可以response status code)

再來整理使用者下close()的情形,須注意此public method – close() 在內部的關閉連線也是統一呼叫此method,所以不僅只是外部使用者呼叫而已

 /**
   * Start a closing handshake.
   *
   *          +----------+   +-----------+   +----------+
   *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
   *    |     +----------+   +-----------+   +----------+     |
   *          +----------+   +-----------+         |
   * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
   *          +----------+   +-----------+   |
   *    |           |                        |   +---+        |
   *                +------------------------+-->|fin| - - - -
   *    |         +---+                      |   +---+
   *     - - - - -|fin|<---------------------+
   *              +---+
   *
   * @param {Number} [code] Status code explaining why the connection is closing
   * @param {String} [data] A string explaining why the connection is closing
   * @public
   */

  close(code, data) {
    //close handshake要在state OPEN後才有意義
    //CLOSED狀態:已關閉連線 不需要處理
    if (this.readyState === WebSocket.CLOSED) return;
    //CONNETING: 在open之前,使用abortHandshake處理
    if (this.readyState === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }
    //CLOSING狀態: 這個狀態比較複雜,因為很多情況會跑到此狀態
    //包括: req.on('error' event listener
    //      abortHandshake
    //這裡比較需要注意的是: 下面開始就是走 close handshake 這邊需要檢查是否有機會讓未open handshake完成的連線走到下面的步驟,這邊看不太出來,還需要再檢查確認,後面再補充說明

    //RFC要求,close handshake要收到close + 送出一次 close算完成
    //下面會要判斷 已sent & recv close再socket.end()是因為
    //receiverOnConclude把關閉socket連線的工作交給close() 可參考websocket.js L779
    //以下處理先送close再收到close然後斷線的情形 CASE 1
    if (this.readyState === WebSocket.CLOSING) {
      if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
      //如果已經是CLOSING狀態,代表呼叫已經send close,並且設定timeout timer,所以後面不用再處理了
      return;
    }

    this._readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      //socket error,不能確定close frame是否成功送出
      if (err) return;

      this._closeFrameSent = true;

      //以下處理收到close並已送出close然後斷線的情形 CASE 2
      if (this._closeFrameReceived) this._socket.end();
    });

    //
    // Specify a timeout for the closing handshake to complete.
    //
    //這邊可以理解為送出close後,timeout後強制將socket destroy,清除資源
    this._closeTimer = setTimeout(
      this._socket.destroy.bind(this._socket),
      closeTimeout //預設 30 seconds
    );
  }

在上方有個地方判斷closeFrameSent + closeFrameReceived就做socket.end,socket.end是half-close,也就是送出FIN,ws統一在close()裡判斷是否要關閉連線,close handshake無論是 recv close + send close -> 斷線 or send close + recv close ->斷線 這兩種斷線的case分別在close()處理了

另外是sender.close()收到 socket error時,因為不確定close frame是否正確送出,所以就直接返回,此時_closeFrameSent = false,流程上有以下可能

  1. 先收到close,回覆close,進入CLOSING,送出可能失敗了,對方收不到close等到timeout後就TCP斷線,本地端timeout也會強制destroy socket
  2. 先送出close,但失敗了,已進入CLOSING狀態,本地端timeout後會強制destroy socket

其實還有其他組合,例如兩邊同時送close frame,送失敗的那一方會收到close frame,但是因為進入CLOSING狀態而closeFrameSent是false,代表closeFrameSent不可能再被設定了(CLOSING狀態之後的close()就是return),這邊就依賴timeout timer處理

除了使用者主動下close()外,以及abortHandshake情形,其他的case都是被動close,無論是網路斷線,或是收到close frame後進行close handshake,以下整理出所有被動引發close的條件

receiver.on('conclude', receiverOnConclude); //收到close frame
receiver.on('error', receiverOnError); //parse frame error
socket.on('close', socketOnClose); //Emitted once the socket is fully closed. The argument hadError is a boolean which says if the socket was closed due to a transmission error.
socket.on('end', socketOnEnd); //Emitted when the other end of the socket sends a FIN packet, thus ending the readable side of the socket.
socket.on('error', socketOnError); //Emitted when an error occurs. The 'close' event will be called directly following this event.

以下分別註解對應的event listener

以下是收到close frame時

/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * @param {Number} code The status code
 * @param {String} reason The reason for closing
 * @private
 */
function receiverOnConclude(code, reason) {
  const websocket = this[kWebSocket];

  //收到close frame,停止後續的data event接收
  websocket._socket.removeListener('data', socketOnData);  
  websocket._socket.resume();//**這裡resume()是socket.pause()暫停'data'後想要繼續接收'data' event時需要的,這行可能不必要呼叫
  //收到close frame設定相關資訊
  websocket._closeFrameReceived = true;
  websocket._closeMessage = reason;
  websocket._closeCode = code;
  //如果有帶close status code/reason,就回覆同樣的內容
  if (code === 1005) websocket.close();
  else websocket.close(code, reason);
}

以下是receiver parse error時

/**
 * The listener of the `Receiver` `'error'` event.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError(err) {
  const websocket = this[kWebSocket];
  //停止繼續收data event
  websocket._socket.removeListener('data', socketOnData);

  websocket._readyState = WebSocket.CLOSING;
  websocket._closeCode = err[kStatusCode]; //這邊的err[kStatusCode]是由receiver所設定的,根據parse error的結果設定不同的code
  websocket.emit('error', err);
  //直接socket destroy
  websocket._socket.destroy();
}

以下是socket斷線時,這邊的錯誤處理比較多細節須注意: socket ‘close’ event: Emitted once the socket is fully closed. The argument hadError is a boolean which says if the socket was closed due to a transmission error

/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * @private
 */
function socketOnClose() {
  const websocket = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('end', socketOnEnd);

  //這邊需特別注意是否重複走入CLOSING狀態或是是否有機會與CLOSED衝突,主要的關鍵點在socket close event可能發生的時間點,待之後另文分析
  websocket._readyState = WebSocket.CLOSING;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk and emitted synchronously in a single
  // `'data'` event.
  //
  websocket._socket.read(); //這裡的目的主要是清read buffer給receiver的writable stream,是synchronous call to 'data' event
  websocket._receiver.end(); //no more data

  this.removeListener('data', socketOnData); //移除data event listener
  this[kWebSocket] = undefined;

  clearTimeout(websocket._closeTimer); //關掉timeout timer,因為socket收到close會正常關閉,不須再destroy
  //什麼時候writableState finished or errorEmitted?
  //errorEmitted: 當stream writer送出error event, 發生於receiver _write 返回error時,例如receiver parse frame error
  //finished: 當stream writer送出finish event
  if (
    websocket._receiver._writableState.finished ||
    websocket._receiver._writableState.errorEmitted
  ) {
    websocket.emitClose(); //emit close!
  } else {
   //否則就等待 error or finish event,等到後就emit close
    websocket._receiver.on('error', receiverOnFinish); //will emit close
    websocket._receiver.on('finish', receiverOnFinish); //will emit close
  }
}

/**
 * The listener of the `Receiver` `'finish'` event.
 *
 * @private
 */
function receiverOnFinish() {
  this[kWebSocket].emitClose();
}

上面因為牽涉到writable stream和socket本身的error handling,有點錯綜複雜,最主要的原因在於使用者必須要很清楚每個event發生的原因和時間點,以及呼叫了close destroy end之類的function的流程細節是什麼、會收到什麼event、順序是什麼,不然使用上一定會漏掉很多情形

socket ‘end’ event是收到TCP FIN

/**
 * The listener of the `net.Socket` `'end'` event.
 *
 * @private
 */
function socketOnEnd() {
  const websocket = this[kWebSocket];

  websocket._readyState = WebSocket.CLOSING;
  websocket._receiver.end();
  this.end();
}

下面是當發生socket error時的處理,需要注意的是程式為了只處理一次error event,移除了如果沒有listener listen ‘error’ event會導致process exit: If an EventEmitter does not have at least one listener registered for the 'error' event, and an 'error' event is emitted, the error is thrown, a stack trace is printed, and the Node.js process exits.

/**
 * The listener of the `net.Socket` `'error'` event.
 *
 * @private
 */
function socketOnError() {
  const websocket = this[kWebSocket];

  this.removeListener('error', socketOnError);
  //上面解除socketOnError要特別小心,因為nodejs的eventemitter預設的行為是如果沒有任何'error' listener,會直接process exit
  this.on('error', NOOP);
  //socket都error了,直接destroy
  if (websocket) {
    websocket._readyState = WebSocket.CLOSING;
    this.destroy();
  }
}

close部分最後會進入CLOSED狀態,這部分是透過emitClose來更新,也就是當WebSocket發出’close’ event時

  /**
   * Emit the `'close'` event.
   *
   * @private
   */
  emitClose() {
    if (!this._socket) {
      this._readyState = WebSocket.CLOSED;
      this.emit('close', this._closeCode, this._closeMessage);
      return;
    }

    if (this._extensions[PerMessageDeflate.extensionName]) {
      this._extensions[PerMessageDeflate.extensionName].cleanup();
    }

    this._receiver.removeAllListeners();
    this._readyState = WebSocket.CLOSED;
    this.emit('close', this._closeCode, this._closeMessage);
  }

close error handling還有一種情況是當使用者主動呼叫send()或其他會送出data的method,但是readyState不是OPEN,這部分在sendAfterClose處理

/**
 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
 * when the `readyState` attribute is `CLOSING` or `CLOSED`.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {*} [data] The data to send
 * @param {Function} [cb] Callback
 * @private
 */
function sendAfterClose(websocket, data, cb) {
  if (data) {
    const length = toBuffer(data).length;
    
    //
    // The `_bufferedAmount` property is used only when the peer is a client and
    // the opening handshake fails. Under these circumstances, in fact, the
    // `setSocket()` method is not called, so the `_socket` and `_sender`
    // properties are set to `null`.
    //
    //更新bufferAmount, ws內部有分handshake完前後,參考不同的變數,可參考下面 bufferedAmount()的實作
    if (websocket._socket) websocket._sender._bufferedBytes += length;
    else websocket._bufferedAmount += length;
  }

  if (cb) {
    const err = new Error(
      `WebSocket is not open: readyState ${websocket.readyState} ` +
        `(${readyStates[websocket.readyState]})`
    );
    cb(err);
  }
}

//這邊順便註記一下bufferedAmount,handshake完成之前(socket = null)是記在一個內部變數
  /**
   * @type {Number}
   */
  get bufferedAmount() {
    if (!this._socket) return this._bufferedAmount;

    return this._socket._writableState.length + this._sender._bufferedBytes;
  }

WebSocket還提供發送訊息的method如send ping pong,這邊介紹send

  /**
   * Send a data message.
   *
   * @param {*} data The message to send
   * @param {Object} [options] Options object
   * @param {Boolean} [options.compress] Specifies whether or not to compress
   *     `data`
   * @param {Boolean} [options.binary] Specifies whether `data` is binary or
   *     text
   * @param {Boolean} [options.fin=true] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when data is written out
   * @public
   */
  send(data, options, cb) {    
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }
    //options is optional
    if (typeof options === 'function') {
      cb = options;
      options = {};
    }
    //這邊只有判斷data是number時,轉成string
    if (typeof data === 'number') data = data.toString();

    //如果狀態是CLOSING CLOSED
    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    const opts = {
      binary: typeof data !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true,
      ...options
    };

    if (!this._extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }
    //交給sender處理,這邊的細節再留到解析sender.js再說明
    this._sender.send(data || EMPTY_BUFFER, opts, cb);
  }

}

This entry was posted in nodejs. Bookmark the permalink.

Leave a Reply