*BSD で kqueue・kevent を使ってみよう

前へ << C言語で ftp クライアントを作ってみよう (1) TCP/IP エラー処理 connect 編 >> 次へ

select() の欠点

select() は複数のディスクリプタをポーリングできる便利なシステムコールです。 しかしパフォーマンスはよくありません。理由は以下の通りです。
  • ユーザプロセスは、監視対象のディスクリプタ一覧をユーザ領域からカーネル領域にコピーする必要がある。
  • カーネルがポーリング結果をユーザ領域に返す際もコピーしなければならない。
  • カーネルは、ポーリング対象のディスクリプタを知るために、配列の全要素を調べなければならない。
  • ユーザプロセスも、入出力可能なディスクリプタを知るために、配列の全要素を調べなければならない。
  • 上記の作業は、select() を発行するたびに毎回行わなければならない。
select() のパフォーマンスが悪いことは広く知られていたので、 各 OS でいろいろな取り組みが行われてきました。

Solaris では Solaris7 (11/99) から /dev/poll というデバイスが追加され、 より効率のよいポーリングが可能になりました。 Linux ではカーネル 2.6 で epoll というシステムコールが導入されたようです。

kqueue() と kevent()

一方、FreeBSD では FreeBSD 4.1-RELEASE から kqueue()kevent() というシステムコールが追加されました。 *BSD 界にもよさが認められたようで、その後 NetBSD・OpenBSD にも移植されました。 NetBSD 1.6.1、OpenBSD 2.9 以降で利用可能です。

今回は *BSD で利用可能な kqueue()kevent() を使ってマルチスレッドサーバを実現してみましょう。

select() は入出力のポーリングしかできませんでしたが、 kqueue()kevent() はそれに加えて、

  • ファイルへの書き込み、ファイル削除など
  • プロセスの終了
  • ネットワークインタフェースのリンクアップ・リンクダウン (ネットワークの接続・切断)
などのカーネルのイベントを総合的に扱うことができる、より高機能な設計になっています。 ただしここでは select() の代替品として使う方法のみを説明します。

kqueue() とイベント登録

まず、カーネルイベントキューを取得します (以下、エラーチェックは省略してあります。 後で説明するサンプルソースではエラーチェックを行っていますので、そちらを参考にしてください)。
int kq;
kq = kqueue();
ここで得られたイベントキューに、「監視してほしいモノ」と 「通知してほしい条件」を登録するわけです。
struct kevent kev;
int sock = socket(AF_INET, SOCK_STREAM, 0);
... connect などの処理 ....
EV_SET(&kev, sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
まず、「監視してほしいモノ」と「通知してほしい条件」を管理する構造体 struct kevent を宣言します。そこに EV_SET() を使ってデータをセットします。
  • 監視してほしいモノ… sock (ソケット)
  • 通知してほしい条件 … EVFILT_READ (読み取り可能な場合)
  • これをどうしてほしいのか … EV_ADD (イベントキューに追加する)
struct kevent は以下のような構造体となっています。
struct kevent {
        uintptr_t       ident;
        short           filter;
        u_short         flags;
        u_int           fflags;
        intptr_t        data;
        void            *udata;
};
EV_SET() はマクロで、実際には
kev.ident = sock;
kev.filter = EVFILT_READ;
kev.flags = EV_ADD;
kev.fflags = 0;
kev.data = 0;
kev.udata = NULL;
struct kevent のメンバに値を設定しているだけです。よって、 EV_SET() しただけでは何も起こりません。
そしてこれを
kevent(kq, &kev, 1, NULL, 0, NULL);
としてイベントキューに登録します (引数の意味は後述)。 これでカーネルは ディスクリプタ sock が読み取り可能になったときに通知してくれるようになりました。

イベント通知待ち

通知を待つ場合は、登録時と同じく kqueue() を使います。
struct kevent kev2;
int ret;
char buf[256];
int sock2;
int len;
ret = kevent(kq, NULL, 0, &kev2, 1, NULL);
sock2 = kev2.ident;
len = read(sock2, buf, sizeof(buf));
printf("受信データ=[%.*s]\n", len, buf);
kevent() の第四引数 &kev2 はイベントの内容を格納するアドレス、 第五引数の 1 は「発生したイベントをひとつだけ教えてね」ということ、 第六引数 (最後の引数) NULL は、タイムアウト時間です。 タイムアウト時間を NULL にすると指定した条件を満たすイベントが発生するまで kevent() から返ってきません。

kevent() から戻ってきたということは登録したイベントが発生した、 つまり sock が読み取り可能になったということです。 入力可能になったディスクリプタは kev2.ident に格納されていますので、 そこから read() して、その結果を標準出力に書き出しています。

ここでは struct kevent kev2 を新たに宣言していますが、 struct kevent kev を使い回しても問題はありません。 単に kev と kev2 には関連性がないことを示したかっただけです。

つまり、一度セットしたイベントの内容を覚えておく必要はありません。 一度 kevent() で登録したイベントはカーネル内部に保存されます。

select() では、 ユーザ空間とカーネル空間のデータコピーがパフォーマンス低下の一因でしたので、 kevent() ではカーネル内部で通知を希望するイベントを覚えておくことにしたわけです。
さて、イベント登録時・イベント通知待ちいずれの場合も kevent() を使います。 引数を見比べてみましょう。
/* イベント登録の場合 */
kevent(kq, &kev, 1, NULL, 0, NULL);
/* イベント通知待ちの場合 */
kevent(kq, NULL, 0, &kev2, 1, NULL);
まとめると、
  • 第一引数はイベントキュー
  • 第二引数は登録したいイベントのアドレス
  • 第三引数は登録したいイベントの個数
  • 第四引数は通知されたイベントを格納するアドレス
  • 第五引数は通知を希望するイベントの個数
  • 第六引数はタイムアウト時間
となります。
タイムアウト時間の概念は select() と同じですが、型が異なります。 select() のタイムアウト時間は struct timeval で指定します。 一方 kevent() のタイムアウト時間は struct timespec で指定します。 struct timeval はミリ秒 (0.001 秒) 単位ですが、 struct timespec はマイクロ秒 (0.000001 秒) 単位まで指定できます。

イベントの無効化

一度登録したイベントは、struct keventflags を変更したイベントを登録することで、 無効化・有効化したり、削除することができます。
struct kevent kev;
EV_SET(&kev, sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
/* ⇒ イベント登録。イベントが通知される */
...
EV_SET(&kev, sock, EVFILT_READ, EV_DISABLE, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
/* ⇒ イベント無効化。イベントが通知されなくなる */
...
EV_SET(&kev, sock, EVFILT_READ, EV_ENABLE, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
/* ⇒ イベント有効化。再度イベントが通知されるようになる */
...
EV_SET(&kev, sock, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
/* ⇒ イベント削除 */
なお、ディスクリプタについてのイベントは、ディスクリプタを close() すると自動的に削除されます。

kqueue・kevent を使った echo サーバ

kqueue()kevent() を使った echo サーバのサンプルです。 接続してきたディスクリプタの値が MAX_SOCK (256) を越えるとバッファオーバーランが発生することに注意してください。

echo-server-kqueue-1.c

    1: /*
    2:  * $Id: echo-server-kqueue-1.c,v 1.3 2005/06/11 20:25:10 68user Exp $
    3:  *
    4:  * kqueue・kevent を使ったマルチスレッド echo サーバ
    5:  *
    6:  * written by 68user  http://X68000.q-e-d.net/~68user/
    7:  */
    8: 
    9: #include <stdio.h>
   10: #include <stdlib.h>
   11: #include <time.h>
   12: #include <netdb.h>
   13: #include <string.h>
   14: #include <sys/types.h>
   15: #include <sys/socket.h>
   16: #include <sys/time.h>
   17: #include <unistd.h>
   18: #include <netinet/in.h>
   19: #include <arpa/inet.h>
   20: #include <sys/event.h>
   21: 
   22: #define BUF_LEN  256  /* バッファのサイズ */
   23: #define MAX_SOCK 256  /* 最大プロセス数 */
   24: 
   25:                                 /* クライアントの情報を保持する構造体 */
   26: typedef struct CLIENT_INFO {
   27:     char hostname[BUF_LEN];       /* ホスト名 */
   28:     char ipaddr[BUF_LEN];         /* IP アドレス */
   29:     int port;                     /* ポート番号 */
   30: } CLIENT_INFO;
   31: 
   32: CLIENT_INFO client_info[MAX_SOCK+1];
   33: 
   34: /*-----------------------------------------------------
   35:   引数でリスニングソケットを受け取り、accept し、
   36:   client_info に新しいクライアントの情報を登録する。
   37:   戻り値は新しいクライアントのソケットディスクリプタ。
   38:   ただしエラー発生時は -1 を返す。
   39:   -----------------------------------------------------*/
   40: int
   41: accept_new_client(int sock){
   42:     int len;
   43:     int new_socket;
   44:     struct hostent *peer_host;
   45:     struct sockaddr_in peer_sin;
   46: 
   47:     len = sizeof(peer_sin);
   48:     new_socket = accept(sock, (struct sockaddr *)&peer_sin, &len);
   49:     if ( new_socket == -1 ){
   50:         perror("accept");
   51:         exit(1);
   52:     }
   53:                                 /* ここから先はデバッグ用の情報取得 */
   54:     peer_host = gethostbyaddr((char *)&peer_sin.sin_addr.s_addr,
   55:                               sizeof(peer_sin.sin_addr), AF_INET);
   56: 
   57:                                 /* ホスト名 */
   58:     strncpy(client_info[new_socket].hostname, peer_host->h_name,
   59:             sizeof client_info[new_socket].hostname);
   60:                                 /* IP アドレス */
   61:     strncpy(client_info[new_socket].ipaddr, inet_ntoa(peer_sin.sin_addr),
   62:             sizeof client_info[new_socket].ipaddr);
   63:                                 /* ポート番号 */
   64:     client_info[new_socket].port = ntohs(peer_sin.sin_port);
   65:   
   66:     printf("接続: %s (%s) ポート %d  ディスクリプタ %d 番\n",
   67:            client_info[new_socket].hostname,
   68:            client_info[new_socket].ipaddr,
   69:            client_info[new_socket].port,
   70:            new_socket);
   71:     return new_socket;
   72: }
   73: 
   74: /*-----------------------------------------------------
   75:   引数でソケットディスクリプタを受け取り、そのソケットから
   76:   read(2) で文字列を読み込み、文字列をそのままクライアントに
   77:   送信する。
   78:   -----------------------------------------------------*/
   79: void
   80: read_and_reply(int sock){
   81:     int read_size;
   82:     int ret;
   83:     char buf[BUF_LEN];
   84: 
   85:     read_size = read(sock, buf, sizeof(buf)-1);
   86: 
   87:     if ( read_size == -1 ){
   88:         perror("read");
   89:     } else if ( read_size == 0 ){
   90:         printf("%s (%s) ポート %d  ディスクリプタ %d 番からの接続が切れました。\n",
   91:                client_info[sock].hostname,
   92:                client_info[sock].ipaddr,
   93:                client_info[sock].port,
   94:                sock);
   95:         ret = close(sock);
   96:         if ( ret == -1 ){
   97:             perror("close");
   98:             exit(1);
   99:         }
  100:     } else {
  101:         printf("%s (%s) ポート %d  ディスクリプタ %d 番からのメッセージ: %.*s", 
  102:                client_info[sock].hostname,
  103:                client_info[sock].ipaddr,
  104:                client_info[sock].port,
  105:                sock,
  106:                read_size,
  107:                buf);
  108:         write(sock, buf, read_size);
  109:     }
  110: }
  111: 
  112: int
  113: main(){
  114:     int sock_optval = 1;
  115:     int ret;
  116:     int port = 5000;
  117:     int listening_socket;
  118:     struct sockaddr_in sin;
  119: 
  120:                                 /* リスニングソケットを作成 */
  121:     listening_socket = socket(AF_INET, SOCK_STREAM, 0);
  122: 
  123:                                 /* ソケットオプション設定 */
  124:     if ( setsockopt(listening_socket, SOL_SOCKET, SO_REUSEADDR,
  125:                     &sock_optval, sizeof(sock_optval)) == -1 ){
  126:         perror("setsockopt");
  127:         exit(1);
  128:     }
  129:                                 /* アドレスファミリ・ポート番号・IPアドレス設定 */
  130:     sin.sin_family = AF_INET;
  131:     sin.sin_port = htons(port);
  132:     sin.sin_addr.s_addr = htonl(INADDR_ANY);
  133: 
  134:     ret = bind(listening_socket, (struct sockaddr *)&sin, sizeof(sin));
  135:     if ( ret == -1 ){
  136:         perror("bind");
  137:         exit(1);
  138:     }
  139: 
  140:     ret = listen(listening_socket, SOMAXCONN);
  141:     if ( ret == -1 ){
  142:         perror("listen");
  143:         exit(1);
  144:     }
  145:     printf("ポート %d を見張ります。\n", port);
  146: 
  147:     int kq;
  148:     struct kevent kev;
  149: 
  150:     kq = kqueue();
  151:     if ( kq == -1 ){
  152:         perror("kqueue");
  153:         exit(1);
  154:     }
  155:     EV_SET(&kev, listening_socket, EVFILT_READ, EV_ADD, 0, 0, NULL);
  156:     ret = kevent(kq, &kev, 1, NULL, 0, NULL);
  157:     if ( ret == -1 ){
  158:         perror("kevent");
  159:         exit(1);
  160:     }
  161: 
  162:     while (1){
  163:         int n;
  164:         struct timespec waitspec;     /* kevent に待ち時間を指定するための構造体 */
  165:         waitspec.tv_sec  = 2;         /* 待ち時間に 2.500 秒を指定 */
  166:         waitspec.tv_nsec = 500000;
  167: 
  168:         n = kevent(kq, NULL, 0, &kev, 1, &waitspec);
  169: 
  170:         if ( n == -1 ){
  171:             perror("kevent");
  172:             exit(1);
  173: 
  174:         } else if ( n > 0 ){
  175:                                 /* 読み込み可能なソケットが存在する */
  176:             if ( kev.ident == listening_socket ){
  177:                                 /* 新しいクライアントがやってきた */
  178:                 int new_sock = accept_new_client(kev.ident);
  179:                 if ( new_sock != -1 ){
  180:                                 /* 監視対象に新たなソケットを追加 */
  181:                     EV_SET(&kev, new_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
  182:                     n = kevent(kq, &kev, 1, NULL, 0, NULL);
  183:                     if ( n == -1 ){
  184:                         perror("kevent");
  185:                         exit(1);
  186:                     }
  187:                 }
  188: 
  189:             } else {
  190:                 int sock = kev.ident;
  191:                 printf("ディスクリプタ %d 番が読み込み可能です。\n", sock);
  192:                 read_and_reply(sock);
  193:             }
  194:         }
  195:     }
  196:     
  197:     close(listening_socket);
  198:     return 0;
  199: }

さらなるパフォーマンスの向上を目指して

移植性のある select() を使わずに、現時点では *BSD でしか使えない kqueue・kevent を使うということは、 よほどパフォーマンスにこだわらなければならない場面だと思われます。

以下、kevent() の呼び出しの回数を減らす方法を紹介します。

複数イベントの登録

一つのイベントキューには複数のイベントを登録することができます。 例えば 3つのソケットからの読み取りを通知するイベントを登録するには、 以下のように kevent() を 3回呼びます。
int sock1, socke2, sock3;
struct kevent kev;
EV_SET(&kev, sock1, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
EV_SET(&kev, sock2, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
EV_SET(&kev, sock3, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, NULL);
kevent() の第三引数は登録したいイベントの個数」と書いたので、 察しのよい人は気付いているかもしれませんが、一度に複数のイベントを登録することもできます。
int sock1, socke2, sock3;
struct kevent kev[3];
EV_SET(&kev[0], sock1, EVFILT_READ, EV_ADD, 0, 0, NULL);
EV_SET(&kev[1], sock2, EVFILT_READ, EV_ADD, 0, 0, NULL);
EV_SET(&kev[2], sock3, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, kev, 3, NULL, 0, NULL);

イベント通知の複数取得

複数のイベントをまとめてイベントキューに登録できるように、 イベント通知も複数取得するこができます。 例えば最大 10個のイベントキューを取得するには以下のようにします。
int i, n;
struct kevent kev[10];
n = kevent(kq, NULL, 0, &kev, sizeof(kev)/sizeof(kev[0]), NULL);
for ( i=0 ; i<n ; i++ ){
    int sock = kev[i].ident;
    read(sock, buf, sizeof(buf));
    ...
}

同時にイベント登録とイベント通知を

これまた察しのいい人は気付いているでしょうが、 kevent() はイベント登録とイベント通知取得を同時に行うことができます。 以下の疑似コードは、できるだけ kevent() の呼び出し回数を少なくした echo サーバです (エラー処理は省略しています)。

ちょっとわかりづらいですが、kevent() 呼び出しは一ヶ所だけで、そこでイベント登録とイベント通知取得を同時に行っています。

int kev_regist_num;     /* 登録するイベント数 */
struct kevent kev[10];  /* イベント登録・イベント通知取得共用 */
int listening_socket = socket(AF_INET, SOCK_STREAM, 0);
bind(listening_socket, (struct sockaddr *)&sin, sizeof(sin));
listen(listening_socket, SOMAXCONN);
EV_SET(&kev[0], listening_socket, EVFILT_READ, EV_ADD, 0, 0, NULL);
kev_regist_num=1;
while (1){
    int n = kevent(kq, kev, kev_regist_num, kev, sizeof(kev)/sizeof(kev[0]), NULL);
    kev_regist_num=0;
    for ( int i=0 ; i<n ; i++ ){
        int sock = kev[i].ident;
        if ( sock == listening_socket ){
            int new_sock = accept(sock, (struct sockaddr *)&peer_sin, &len);
            EV_SET(&kev[kev_regist_num++], new_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
        } else {
            int len = read(sock, buf, sizeof(buf));
            if ( len == 0 ){
                close(sock);
            } else {
                write(sock, buf, len);
            }
        }
    }
}
前へ << C言語で ftp クライアントを作ってみよう (1) TCP/IP エラー処理 connect 編 >> 次へ

ご意見・ご指摘は Twitter: @68user までお願いします。