Working with TCP Sockets 読書メモ 第21章 イベント駆動(Reactor)

Working with TCP Sockets 読書メモ 目次

イベント駆動(Reactor)

概要

  • Reactorパターンに基づいたイベント駆動のパターンが最近流行している
    • 具体的な実装例はNginx, Node.js等
  • このパターンはシングルスレッド・シングルプロセスで、並列性を実現する
  • このパターンでは、コネクションのライフサイクルの各ステージを任意の順番で実行可能なイベントとして扱う
  • 中心となるマルチプレクサ(Reactor)がコネクションのイベントをモニターして処理を起動する
  • 流れは以下のようになる
  1. サーバはソケットでコネクションを待ち受ける
  2. 新しいコネクションを受け付けると、モニター対象ソケットのリストに追加する
  3. サーバは有効なコネクションを監視しつつ、ソケットをlistenする
  4. 有効なコネクションが読み込み可能になった通知を受け取ると、サーバはコネクションからデータを読み取り、必要なコールバックを実行する
  5. 有効なコネクションがまだ読み込み可能であるという通知を受け取ると、サーバはコネクションからデータを読み取り、再びコールバックを実行する
  6. サーバが新しいコネクションを受け付けたら、モニター対象ソケットのリストに追加する
  7. サーバが最初の接続が書き込み可能になったというイベントを受け取ったら、レスポンスが書き込まれる

実装

require 'socket'
require_relative 'command_handler'

module FTP
  class Evented
    CHUNK_SIZE = 1024 * 16

    class Connection
      CRLF = "\r\n"
      attr_reader :client

      def initialize(io)
        @client             = io
        @request, @response = '', ''
        @handler            = CommandHandler.new(self)

        respond "220 OHAI"
        on_writable
      end

      def on_data(data)
        @request << data

        if @request.end_with?(CRLF)
          # リクエスト完了
          respond @handler.handle(@request)
          @request = ''
        end
      end

      def respond(message)
        @response << message + CRLF

        # 書込み可能なものだけすぐに書き込む
        # 残りは次にソケットが書き込み可能になった時に書き込む
        on_writable
      end

      def on_writable
        bytes = client.write_nonblock(@response)
        @response.slice!(0, bytes)
      end

      def monitor_for_reading?
        true
      end

      def monitor_for_writing?
        !(@response.empty?)
      end
    end

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      @handles = {}

      loop do
        to_read  = @handles.values.select(&:monitor_for_reading?).map(&:client)
        to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)

        readables, writables = IO.select(to_read + [@control_socket], to_write)

        readables.each do |socket|
          if socket == @control_socket
            io = @control_socket.accept
            connection = Connection.new(io)
            @handles[io.fileno] = connection
          else
            connection = @handles[socket.fileno]

            begin
              data = socket.read_nonblock(CHUNK_SIZE)
              connection.on_data(data)
            rescue Errno::EAGAIN
              # empty
            rescue EOFError
              @handles.delete(socket.fileno)
            end
          end
        end

        writables.each do |socket|
          connection = @handles[socket.fileno]
          connection.on_writable
        end
      end
    end
  end
end

server = FTP::Evented.new(4481)
server.run

考察

  • 利点:非常に高性能(数千〜数万コネクションを並列処理可能)
  • 欠点:特殊なプログラミングモデル(Reactorは絶対にブロックしてはならない)

ディスカッションに参加

1件のコメント

コメントを残す

コメントを残す