(14.4) イベント処理
次が、イベントチェックのepoll_waitで検知したソケットイベントを処理するwhileループだ。
numにはepoll_wait()で取得した準備済ソケット数が入っている。
1495 while (num) { 1496 pt = (listener_poll_type *) out_pfd->client_data;
listener_poll_type構造体 ptは、apr_pollset_add()で登録されたapr_pollfd_t構造体のclient_dataにセットされている。
(a) 確立済みソケットの処理
1497 if (pt->type == PT_CSD) {
こちらのif文は、処理可能なソケットが、確立済みのソケットの場合だ。
workerスレッドのprocess_socketで作成されたevent_conn_state_t構造体 cs の apr_pollfd_t構造体の変数pfdが、apr_pollset_add(event_pollset, &cs->pfd) と第2引数に与えられている。これがout_pfd->client_dataを介して引き渡される。
KeepAliveの次メッセージ待ち、書込み可能待ち、LINGER待ち(標準時間のものと短時間のもの)が該当する。
1498 /* one of the sockets is readable */ 1499 struct timeout_queue *remove_from_q = &write_completion_q; 1500 int blocking = 1;
ここで、event_conn_state_t情報のステータス別に処理が分岐する。
event_conn_state_t情報は、listener_poll_type情報のvoid*変数 batonに保持されている。
1501 cs = (event_conn_state_t *) pt->baton;
1502 switch (cs->pub.state) {
1503 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
これは、次リクエスト待ちのevent_conn_state_tだ。
1504 cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
1505 remove_from_q = &keepalive_q;
1506 /* don't wait for a worker for a keepalive request */
blockingを0にする。
blockingはget_worker()でアイドルworkerスレッドをチェックしたときに、アイドルworkerスレッドがない場合、アイドルworkerスレッドができるまでそこで処理をブロックするかしないかのフラグだ。
1500行目で1にセットされているが、ここで0(ブロックしない)ように変更される。
1507 blocking = 0;
1508 /* FALL THROUGH */
ここではbreakしない。下記のcase文に処理が継続する。
1509 case CONN_STATE_WRITE_COMPLETION:
これは、書込み可能待ちのevent_conn_state_tだ。
下記のget_worker()でアイドル状態のworkerスレッドの存否をチェックする。
CONN_STATE_WRITE_COMPLETIONの場合は、blockingが1なので、ここでブロックされてしまう。
CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合は、アイドルworkerスレッドがない場合にもすぐに抜けてくる。
この場合、workers_were_busyは1になる。have_idle_workerは0だ。
1510 get_worker(&have_idle_worker, blocking, 1511 &workers_were_busy);
keepalive_qキュー(CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合)、
または、write_completion_qキュー(CONN_STATE_WRITE_COMPLETIONの場合)からタイムアウト情報が削除される。
KeepAliveの場合、上記の通りブロックされないので、アイドルworkerスレッドがない場合にも削除される。
1512 apr_thread_mutex_lock(timeout_mutex);
1513 TO_QUEUE_REMOVE(*remove_from_q, cs);
1514 rc = apr_pollset_remove(event_pollset, &cs->pfd);
1515
1516 /*
1517 * Some of the pollset backends, like KQueue or Epoll
1518 * automagically remove the FD if the socket is closed,
1519 * therefore, we can accept _SUCCESS or _NOTFOUND,
1520 * and we still want to keep going
1521 */
1522 if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
1523 ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
1524 "pollset remove failed");
1525 apr_thread_mutex_unlock(timeout_mutex);
1526 start_lingering_close_nonblocking(cs);
1527 break;
1528 }
1529
1530 apr_thread_mutex_unlock(timeout_mutex);
1531 TO_QUEUE_ELEM_INIT(cs);
1532 /* If we didn't get a worker immediately for a keep-alive
1533 * request, we close the connection, so that the client can
1534 * re-connect to a different process.
1535 */
1536 if (!have_idle_worker) {
アイドルworkerスレッドはいなかった場合(この条件はブロックされないkeep-alive待ちの場合に発生する可能性がある)、コネクションを直ちに閉じる。
クライアント側はこれによって、別のプロセスに再接続するチャンスができる。
1537 start_lingering_close_nonblocking(cs);
1538 break;
ここで、breakするので、switch文から抜ける。
1539 }
下記は、このイベントをworkerスレッドとの通信用のメッセージキューに登録している。
out_pfdから out_pfd->client_dataに登録されている listener_poll_type情報の、そのbaton情報に登録されている。
event_conn_state_t情報を取得し、この情報(apr_socket_t、event_conn_state_t、メモリプール情報)をworker_queeに登録する。
workerスレッド側では、これを、1486行目のpush_timer2worker()のものと同じように、ap_queue_pop_something()で受け取る。
1540 rc = push2worker(out_pfd, event_pollset);
1541 if (rc != APR_SUCCESS) {
1542 ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
1543 ap_server_conf, "push2worker failed");
1544 }
1545 else {
1546 have_idle_worker = 0;
1547 }
workerへのキューにイベントを登録したら、breakでswitch文を抜ける。
正常終了時にはhave_idle_workerの値を初期化(0)している。
1548 break;
1549 case CONN_STATE_LINGER_NORMAL:
これは、LINGER待ちのevent_conn_state_tだ。
1550 case CONN_STATE_LINGER_SHORT:
これは、LINGER待ち(短時間)のevent_conn_state_tだ。
LINGER待ちの場合、標準の待ち時間の場合も、短時間の場合も、実施する処理が同じで、
ともに、process_lingering_close()だ。
この処理では受信データの空読みを実行する。
全てデータを読み終えても、EOF等にならなければ(EAGAIN)、処理をそこで終える(引き続き読込可能となるのを待つ)。
EOF等が検知されれば、ソケットを閉じる。
また、short_linger_qキューまたは、linger_qキューのタイムアウト情報も削除される。
この処理は、このlistenrスレッドの処理内で直接実行されている。
1551 process_lingering_close(cs, out_pfd); 1552 break; 1553 default:
その他の状態は定義されていない。
エラーだ。
1554 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, 1555 ap_server_conf, 1556 "event_loop: unexpected state %d", 1557 cs->pub.state); 1558 ap_assert(0); 1559 }
ここまでが、workerスレッドの処理で再登録された監視対象ソケットに対するイベント処理だ。
1560 }
(b) Listenソケットの処理
1561 else if (pt->type == PT_ACCEPT) {
こちらのif文は、監視対象のソケットが、listenしているサーバソケットの場合だった場合の処理だ。
ここではacceptが行われる。
1562 /* A Listener Socket is ready for an accept() */
1563 if (workers_were_busy) {
get_worker(非ブロックモード)で処理した場合、アイドルworkerスレッドがなければ、workers_were_busyに1がセットされている。
初期値は0なので、通常はこのif文には入らない。
ここに入る直前のget_worker()処理は、1495行目からのwhile()文のループで、前の周回で行われた処理と考えられる。
ここは、前回のチェックで、アイドルworkerスレッドがなかった場合の処理だ。
アイドルworkerスレッドがない状態の場合、新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。
1564 if (!listeners_disabled) 1565 disable_listensocks(process_slot); 1566 listeners_disabled = 1; 1567 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, 1568 "All workers busy, not accepting new conns" 1569 "in this process"); 1570 } 1571 else if ( (int)apr_atomic_read32(&connection_count) 1572 - (int)apr_atomic_read32(&lingering_count) 1573 > threads_per_child 1574 + ap_queue_info_get_idlers(worker_queue_info) * 1575 worker_factor / WORKER_FACTOR_SCALE)
次のif文は、現に受け付けている接続数が、AsyncRequestWorkerFactorの条件範囲に収まるかどうかをチェックしている。
connection_count | 現在の接続数 |
lingering_count | 現在リンガリングクローズ中の接続数 (connection_countに含まれる) |
この差は、アクティブな接続数となる。
threads_per_child | ThreadsPerChildで指定されたworkerスレッド数 |
ap_queue_info_get_idlers(worker_queue_info) | 現在のアイドルworkerスレッド数 |
worker_factor / WORKER_FACTOR_SCALE | syncRequestWorkerFactorの設定値 (16分の1単位で近似されている) |
158 #define WORKER_FACTOR_SCALE 16 /* scale factor to allow fractional values */
この式は、受け入れ可能な接続の上限だ。
アクティブな接続数が受け入れ可能な接続の上限を超えた場合、これ以上新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。
1576 { 1577 if (!listeners_disabled) 1578 disable_listensocks(process_slot); 1579 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, 1580 "Too many open connections (%u), " 1581 "not accepting new conns in this process", 1582 apr_atomic_read32(&connection_count)); 1583 ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, 1584 "Idle workers: %u", 1585 ap_queue_info_get_idlers(worker_queue_info)); 1586 listeners_disabled = 1; 1587 } 1588 else if (listeners_disabled) {
それ以外の条件の場合、新たな接続を受け入れてよいので、
もし listeners_disabledに1がセットされていたら、0にリセットし、
enable_listensocks()でListenソケットをイベント監視対象に復帰させる。
1589 listeners_disabled = 0; 1590 enable_listensocks(process_slot); 1591 } 1592 if (!listeners_disabled) {
上記の処理を経て、ここでlisteners_disabled変数が0の場合、acceptを実行する。
1593 lr = (ap_listen_rec *) pt->baton;
lrは、ap_listen_rec情報だ。
この情報は、listenしているサーバソケットの情報を保持している。
listener_poll_type構造体 ptのtypeがPT_ACCEPTである情報は、
listener_thread関数の初期処理のひとつinit_pollset()関数内で作成される。
これは、Listenディレクティブで定義されているサーバソケットに対応して作成される。
ここで作成されるのは、apr_pollfd_t構造体の配列で、
このapr_pollfd_t情報のclient_data変数に、listener_poll_type情報が格納される。
221 static apr_pollfd_t *listener_pollfd;
つまり、listener_pollfd[i].client_dataに、listener_poll_type情報は保持される。
そして、このlistener_poll_type情報のbaton変数に
ap_listen_rec情報がセットされる。
ap_listen_rec情報がまさに、Listenディレクティブの処理時に作成される情報だ。
そして、このap_listen_recのaccept_func変数に、同じくinit_pollset()関数内で
ap_unixd_accept関数ポインタがセットされている。
そして、このapr_pollfd_t情報が、
apr_pollset_add(event_pollset, &listener_pollfd[i])という形で、
apr_pollset_add()関数の第2引数に渡されて、監視対象に追加され、
1496行目で、pt変数に戻ってきている。
そして、ptのbaton変数に格納されているのが、ap_listen_rec情報だ。
1594 ap_pop_pool(&ptrans, worker_queue_info);
空いているトランザクションプールを取り出す。
1595 1596 if (ptrans == NULL) {
このif文は空いているトランザクションプールがない場合に作成する処理だ。
トランザクションプールは、acceptするソケットごとに作成される。
1597 /* create a new transaction pool for each accepted socket */ 1598 apr_allocator_t *allocator; 1599 1600 apr_allocator_create(&allocator); 1601 apr_allocator_max_free_set(allocator, 1602 ap_max_mem_free); 1603 apr_pool_create_ex(&ptrans, pconf, NULL, allocator); 1604 apr_allocator_owner_set(allocator, ptrans); 1605 if (ptrans == NULL) { 1606 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, 1607 ap_server_conf, 1608 "Failed to create transaction pool"); 1609 signal_threads(ST_GRACEFUL); 1610 return NULL; 1611 } 1612 } 1613 apr_pool_tag(ptrans, "transaction"); 1614
そして、アイドルworkerスレッドを取得する
ここは第2引数が1なので、ブロックモードでアイドルworkerスレッドを確保することになる。
基本的には、workers_were_busyが0の場合、つまりアイドルworkerスレッドがある場合に、ここに入るので、ここではブロックモードではあるが、直ちにreturnすると期待されるだろう。
1615 get_worker(&have_idle_worker, 1, &workers_were_busy);
アイドルworkerスレッドが確保できたら、
次がaccept処理となる。
lr->accept_funcは、ap_unixd_accept()が実行される。
基本的にはapccet()が実行される。
合わせて、apr_socket_t情報が作成され、
このソケット情報を生成したメモリプール(ptrans)に対して、cleanup関数を登録している。
これは、socket_cleanup()関数である。
メモリプールやそのcleanup関数についてはまだ説明していないが、APRライブラリに実装されている。
やがては確認したいが、ここでは追わない。
1616 rc = lr->accept_func(&csd, lr, ptrans);
1617
1618 /* later we trash rv and rely on csd to indicate
1619 * success/failure
1620 */
1621 AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
1622
1623 if (rc == APR_EGENERAL) {
各種エラー時で、ここでは子プロセスのgracefulな終了を開始している。
1624 /* E[NM]FILE, ENOMEM, etc */ 1625 resource_shortage = 1; 1626 signal_threads(ST_GRACEFUL); 1627 } 1628 1629 if (csd != NULL) {
このif文が正常に、acceptが行われた場合の処理だ。
1630 conns_this_child--;
ここでは、MaxConnectionsPerChildのチェック用に、処理コネクション数をカウントダウンしている。
1631 rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
そして、acceptした情報をworkerスレッドとの通信用のメッセージキューに登録している。
workerスレッド側では、acceptイベントを他のイベントと同様に ap_queue_pop_something()で取り出す。
ここでの1540行目との大きな違いは、event_conn_state_t情報がまだ作成されていないことだ。
ap_queue_push()関数は、第3引数がevent_conn_state_t情報だが、この値がNULLになっている。
process_socket関数の893行目
893 if (cs == NULL) { /* This is a new connection */
この変数csがNULLとなるのは、このケースだ。
1632 if (rc != APR_SUCCESS) { 1633 /* trash the connection; we couldn't queue the connected 1634 * socket to a worker 1635 */ 1636 apr_socket_close(csd); 1637 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, 1638 ap_server_conf, 1639 "ap_queue_push failed"); 1640 apr_pool_clear(ptrans); 1641 ap_push_pool(worker_queue_info, ptrans); 1642 } 1643 else {
正常終了時にはhave_idle_workerの値を初期化(0)している。
このまま処理を継続する。
1644 have_idle_worker = 0;
1645 }
1646 }
1647 else {
こちらは、APR_EGENERAL以外のエラーのケースだ。
処理をこのまま継続している。
1648 apr_pool_clear(ptrans); 1649 ap_push_pool(worker_queue_info, ptrans); 1650 } 1651 } 1652 } /* if:else on pt->type */ 1653 out_pfd++; 1654 num--; 1655 } /* while for processing poll */
1414行目のfor()の末尾だ。
ここまでが、epoll_wait()でチェックしたソケットイベント処理である。
次は、タイムアウトキューの処理に進む。
0 件のコメント:
コメントを投稿