前へ << C言語で ftp クライアントを作ってみよう (1) | TCP/IP エラー処理 connect 編 >> 次へ |
Solaris では Solaris7 (11/99) から /dev/poll というデバイスが追加され、 より効率のよいポーリングが可能になりました。 Linux ではカーネル 2.6 で epoll というシステムコールが導入されたようです。
今回は *BSD で利用可能な kqueue()・kevent() を使ってマルチスレッドサーバを実現してみましょう。
select() は入出力のポーリングしかできませんでしたが、 kqueue()・kevent() はそれに加えて、
int kq; kq = kqueue();ここで得られたイベントキューに、「監視してほしいモノ」と 「通知してほしい条件」を登録するわけです。
struct kevent kev; int sock = socket(AF_INET, SOCK_STREAM, 0); ... connect などの処理 .... EV_SET(&kev, sock, EVFILT_READ, EV_ADD, 0, 0, NULL);まず、「監視してほしいモノ」と「通知してほしい条件」を管理する構造体 struct kevent を宣言します。そこに EV_SET() を使ってデータをセットします。
struct kevent { uintptr_t ident; short filter; u_short flags; u_int fflags; intptr_t data; void *udata; };EV_SET() はマクロで、実際には
kev.ident = sock; kev.filter = EVFILT_READ; kev.flags = EV_ADD; kev.fflags = 0; kev.data = 0; kev.udata = NULL;と struct kevent のメンバに値を設定しているだけです。よって、 EV_SET() しただけでは何も起こりません。
kevent(kq, &kev, 1, NULL, 0, NULL);としてイベントキューに登録します (引数の意味は後述)。 これでカーネルは ディスクリプタ sock が読み取り可能になったときに通知してくれるようになりました。
struct kevent kev2; int ret; char buf[256]; int sock2; int len; ret = kevent(kq, NULL, 0, &kev2, 1, NULL); sock2 = kev2.ident; len = read(sock2, buf, sizeof(buf)); printf("受信データ=[%.*s]\n", len, buf);kevent() の第四引数 &kev2 はイベントの内容を格納するアドレス、 第五引数の 1 は「発生したイベントをひとつだけ教えてね」ということ、 第六引数 (最後の引数) NULL は、タイムアウト時間です。 タイムアウト時間を NULL にすると指定した条件を満たすイベントが発生するまで kevent() から返ってきません。
kevent() から戻ってきたということは登録したイベントが発生した、 つまり sock が読み取り可能になったということです。 入力可能になったディスクリプタは kev2.ident に格納されていますので、 そこから read() して、その結果を標準出力に書き出しています。
ここでは struct kevent kev2 を新たに宣言していますが、 struct kevent kev を使い回しても問題はありません。 単に kev と kev2 には関連性がないことを示したかっただけです。
つまり、一度セットしたイベントの内容を覚えておく必要はありません。 一度 kevent() で登録したイベントはカーネル内部に保存されます。
/* イベント登録の場合 */ kevent(kq, &kev, 1, NULL, 0, NULL); /* イベント通知待ちの場合 */ kevent(kq, NULL, 0, &kev2, 1, NULL);まとめると、
struct kevent kev; EV_SET(&kev, sock, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); /* ⇒ イベント登録。イベントが通知される */ ... EV_SET(&kev, sock, EVFILT_READ, EV_DISABLE, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); /* ⇒ イベント無効化。イベントが通知されなくなる */ ... EV_SET(&kev, sock, EVFILT_READ, EV_ENABLE, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); /* ⇒ イベント有効化。再度イベントが通知されるようになる */ ... EV_SET(&kev, sock, EVFILT_READ, EV_DELETE, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); /* ⇒ イベント削除 */なお、ディスクリプタについてのイベントは、ディスクリプタを close() すると自動的に削除されます。
1: /* 2: * $Id: echo-server-kqueue-1.c,v 1.3 2005/06/11 20:25:10 68user Exp $ 3: * 4: * kqueue・kevent を使ったマルチスレッド echo サーバ 5: * 6: * written by 68user http://X68000.q-e-d.net/~68user/ 7: */ 8: 9: #include <stdio.h> 10: #include <stdlib.h> 11: #include <time.h> 12: #include <netdb.h> 13: #include <string.h> 14: #include <sys/types.h> 15: #include <sys/socket.h> 16: #include <sys/time.h> 17: #include <unistd.h> 18: #include <netinet/in.h> 19: #include <arpa/inet.h> 20: #include <sys/event.h> 21: 22: #define BUF_LEN 256 /* バッファのサイズ */ 23: #define MAX_SOCK 256 /* 最大プロセス数 */ 24: 25: /* クライアントの情報を保持する構造体 */ 26: typedef struct CLIENT_INFO { 27: char hostname[BUF_LEN]; /* ホスト名 */ 28: char ipaddr[BUF_LEN]; /* IP アドレス */ 29: int port; /* ポート番号 */ 30: } CLIENT_INFO; 31: 32: CLIENT_INFO client_info[MAX_SOCK+1]; 33: 34: /*----------------------------------------------------- 35: 引数でリスニングソケットを受け取り、accept し、 36: client_info に新しいクライアントの情報を登録する。 37: 戻り値は新しいクライアントのソケットディスクリプタ。 38: ただしエラー発生時は -1 を返す。 39: -----------------------------------------------------*/ 40: int 41: accept_new_client(int sock){ 42: int len; 43: int new_socket; 44: struct hostent *peer_host; 45: struct sockaddr_in peer_sin; 46: 47: len = sizeof(peer_sin); 48: new_socket = accept(sock, (struct sockaddr *)&peer_sin, &len); 49: if ( new_socket == -1 ){ 50: perror("accept"); 51: exit(1); 52: } 53: /* ここから先はデバッグ用の情報取得 */ 54: peer_host = gethostbyaddr((char *)&peer_sin.sin_addr.s_addr, 55: sizeof(peer_sin.sin_addr), AF_INET); 56: 57: /* ホスト名 */ 58: strncpy(client_info[new_socket].hostname, peer_host->h_name, 59: sizeof client_info[new_socket].hostname); 60: /* IP アドレス */ 61: strncpy(client_info[new_socket].ipaddr, inet_ntoa(peer_sin.sin_addr), 62: sizeof client_info[new_socket].ipaddr); 63: /* ポート番号 */ 64: client_info[new_socket].port = ntohs(peer_sin.sin_port); 65: 66: printf("接続: %s (%s) ポート %d ディスクリプタ %d 番\n", 67: client_info[new_socket].hostname, 68: client_info[new_socket].ipaddr, 69: client_info[new_socket].port, 70: new_socket); 71: return new_socket; 72: } 73: 74: /*----------------------------------------------------- 75: 引数でソケットディスクリプタを受け取り、そのソケットから 76: read(2) で文字列を読み込み、文字列をそのままクライアントに 77: 送信する。 78: -----------------------------------------------------*/ 79: void 80: read_and_reply(int sock){ 81: int read_size; 82: int ret; 83: char buf[BUF_LEN]; 84: 85: read_size = read(sock, buf, sizeof(buf)-1); 86: 87: if ( read_size == -1 ){ 88: perror("read"); 89: } else if ( read_size == 0 ){ 90: printf("%s (%s) ポート %d ディスクリプタ %d 番からの接続が切れました。\n", 91: client_info[sock].hostname, 92: client_info[sock].ipaddr, 93: client_info[sock].port, 94: sock); 95: ret = close(sock); 96: if ( ret == -1 ){ 97: perror("close"); 98: exit(1); 99: } 100: } else { 101: printf("%s (%s) ポート %d ディスクリプタ %d 番からのメッセージ: %.*s", 102: client_info[sock].hostname, 103: client_info[sock].ipaddr, 104: client_info[sock].port, 105: sock, 106: read_size, 107: buf); 108: write(sock, buf, read_size); 109: } 110: } 111: 112: int 113: main(){ 114: int sock_optval = 1; 115: int ret; 116: int port = 5000; 117: int listening_socket; 118: struct sockaddr_in sin; 119: 120: /* リスニングソケットを作成 */ 121: listening_socket = socket(AF_INET, SOCK_STREAM, 0); 122: 123: /* ソケットオプション設定 */ 124: if ( setsockopt(listening_socket, SOL_SOCKET, SO_REUSEADDR, 125: &sock_optval, sizeof(sock_optval)) == -1 ){ 126: perror("setsockopt"); 127: exit(1); 128: } 129: /* アドレスファミリ・ポート番号・IPアドレス設定 */ 130: sin.sin_family = AF_INET; 131: sin.sin_port = htons(port); 132: sin.sin_addr.s_addr = htonl(INADDR_ANY); 133: 134: ret = bind(listening_socket, (struct sockaddr *)&sin, sizeof(sin)); 135: if ( ret == -1 ){ 136: perror("bind"); 137: exit(1); 138: } 139: 140: ret = listen(listening_socket, SOMAXCONN); 141: if ( ret == -1 ){ 142: perror("listen"); 143: exit(1); 144: } 145: printf("ポート %d を見張ります。\n", port); 146: 147: int kq; 148: struct kevent kev; 149: 150: kq = kqueue(); 151: if ( kq == -1 ){ 152: perror("kqueue"); 153: exit(1); 154: } 155: EV_SET(&kev, listening_socket, EVFILT_READ, EV_ADD, 0, 0, NULL); 156: ret = kevent(kq, &kev, 1, NULL, 0, NULL); 157: if ( ret == -1 ){ 158: perror("kevent"); 159: exit(1); 160: } 161: 162: while (1){ 163: int n; 164: struct timespec waitspec; /* kevent に待ち時間を指定するための構造体 */ 165: waitspec.tv_sec = 2; /* 待ち時間に 2.500 秒を指定 */ 166: waitspec.tv_nsec = 500000; 167: 168: n = kevent(kq, NULL, 0, &kev, 1, &waitspec); 169: 170: if ( n == -1 ){ 171: perror("kevent"); 172: exit(1); 173: 174: } else if ( n > 0 ){ 175: /* 読み込み可能なソケットが存在する */ 176: if ( kev.ident == listening_socket ){ 177: /* 新しいクライアントがやってきた */ 178: int new_sock = accept_new_client(kev.ident); 179: if ( new_sock != -1 ){ 180: /* 監視対象に新たなソケットを追加 */ 181: EV_SET(&kev, new_sock, EVFILT_READ, EV_ADD, 0, 0, NULL); 182: n = kevent(kq, &kev, 1, NULL, 0, NULL); 183: if ( n == -1 ){ 184: perror("kevent"); 185: exit(1); 186: } 187: } 188: 189: } else { 190: int sock = kev.ident; 191: printf("ディスクリプタ %d 番が読み込み可能です。\n", sock); 192: read_and_reply(sock); 193: } 194: } 195: } 196: 197: close(listening_socket); 198: return 0; 199: }
以下、kevent() の呼び出しの回数を減らす方法を紹介します。
int sock1, socke2, sock3; struct kevent kev; EV_SET(&kev, sock1, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); EV_SET(&kev, sock2, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL); EV_SET(&kev, sock3, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &kev, 1, NULL, 0, NULL);「kevent() の第三引数は登録したいイベントの個数」と書いたので、 察しのよい人は気付いているかもしれませんが、一度に複数のイベントを登録することもできます。
int sock1, socke2, sock3; struct kevent kev[3]; EV_SET(&kev[0], sock1, EVFILT_READ, EV_ADD, 0, 0, NULL); EV_SET(&kev[1], sock2, EVFILT_READ, EV_ADD, 0, 0, NULL); EV_SET(&kev[2], sock3, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, kev, 3, NULL, 0, NULL);
int i, n; struct kevent kev[10]; n = kevent(kq, NULL, 0, &kev, sizeof(kev)/sizeof(kev[0]), NULL); for ( i=0 ; i<n ; i++ ){ int sock = kev[i].ident; read(sock, buf, sizeof(buf)); ... }
ちょっとわかりづらいですが、kevent() 呼び出しは一ヶ所だけで、そこでイベント登録とイベント通知取得を同時に行っています。
int kev_regist_num; /* 登録するイベント数 */ struct kevent kev[10]; /* イベント登録・イベント通知取得共用 */ int listening_socket = socket(AF_INET, SOCK_STREAM, 0); bind(listening_socket, (struct sockaddr *)&sin, sizeof(sin)); listen(listening_socket, SOMAXCONN); EV_SET(&kev[0], listening_socket, EVFILT_READ, EV_ADD, 0, 0, NULL); kev_regist_num=1; while (1){ int n = kevent(kq, kev, kev_regist_num, kev, sizeof(kev)/sizeof(kev[0]), NULL); kev_regist_num=0; for ( int i=0 ; i<n ; i++ ){ int sock = kev[i].ident; if ( sock == listening_socket ){ int new_sock = accept(sock, (struct sockaddr *)&peer_sin, &len); EV_SET(&kev[kev_regist_num++], new_sock, EVFILT_READ, EV_ADD, 0, 0, NULL); } else { int len = read(sock, buf, sizeof(buf)); if ( len == 0 ){ close(sock); } else { write(sock, buf, len); } } } }
前へ << C言語で ftp クライアントを作ってみよう (1) | TCP/IP エラー処理 connect 編 >> 次へ |
ご意見・ご指摘は Twitter: @68user までお願いします。