2014年10月27日月曜日

リクエスト処理の流れ: listener_thread(5) タイムアウトキューの処理

(14.4) イベント処理のつづき。

(14.5) タイムアウトキューの処理


次は、4つ用意されているタイムアウトキューに対する、タイムアウト処理だ。
タイムアウトに至る前にイベントが発生して、1655行目までのソケットイベント処理で消化されれば
特に何も行われない。
タイムアウトに至るまでに、ソケットの読み込み/書き出しが可能とならなかった場合に、それらソケットが次の処理の対象となる。

   1656
   1657         /* XXX possible optimization: stash the current time for use as
   1658          * r->request_time for new requests
   1659          */
   1660         now = apr_time_now();

まず、現在時刻を取得する。単位はマイクロ秒だ。

   1661         /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */
   1662         if (now > timeout_time) {

timeout_timeは初期値は0なので、最初はこのif文に必ず入る。
そして、timeout_timeに現在時刻のTIMEOUT_FUDGE_FACTOR後にセットしている(1664行目)。
定義を見ると、100000マイクロ秒なので、0.1秒。

   1396 #define TIMEOUT_FUDGE_FACTOR 100000

おおむね、0.1秒以上間隔をあけてチェックするということだろう。

   1663             struct process_score *ps;
   1664             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
   1665
   1666             /* handle timed out sockets */
   1667             apr_thread_mutex_lock(timeout_mutex);
   1668
   1669             /* Step 1: keepalive timeouts */
   1670             /* If all workers are busy, we kill older keep-alive connections so that they
   1671              * may connect to another process.
   1672              */

(1)KeepAliveタイムアウト

keepalive_qキューを処理する。

   1673             if (workers_were_busy && keepalive_q.count) {

この時点でworkers_were_busyが1の場合(アイドルworkerスレッドがいない場合)で、
KeepAliveタイムアウトキューに登録がある場合(次のリクエストを待っている場合)、
現在時刻(+0.1秒)からKeepAliveTimeOut先までのタイムアウトイベントを先取りして
リンガリングクローズを実行している。
上記のコメントによれば、すべてのworkerがビジーなら、古いKeep-Alive接続を終了し、
クライアントが必要になった時点で別のプロセスに接続できるようにするということだ。

しかし、KeepAliveTimeOut先でこのkeepalive_qキューのタイムアウトは登録されるのだから、
古いというか、おそらくこの時点で登録済みのすべてのKeepAlive待ちイベントが
全てstart_lingering_close_nonblocking()関数処理されると考えられる。

start_lingering_close_nonblocking()関数は、
既に見た
start_lingering_close_blocking
と異なり、shutdown(SHUT_WR)の前に未出力のデータのフラッシュを試みない。
それ以外は同様で、この処理によって、ソケットはLINGER待ちになりlinger_qキュー、またはshort_linger_qキューに
登録される。

   1674                 ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
   1675                              "All workers are busy, will close %d keep-alive "
   1676                              "connections",
   1677                              keepalive_q.count);
   1678                 process_timeout_queue(&keepalive_q,
   1679                                       timeout_time + ap_server_conf->keep_alive_timeout,
   1680                                       start_lingering_close_nonblocking);
   1681             }
   1682             else {

こちらは、通常の場合で、
timeout_time(現在時刻+0.1秒)時点でタイムアウトしているタイムアウトイベントを
同じく、start_lingering_close_nonblocking()関数処理している。


   1683                 process_timeout_queue(&keepalive_q, timeout_time,
   1684                                       start_lingering_close_nonblocking);
   1685             }
   1686             /* Step 2: write completion timeouts */

(2)書き込み完了タイムアウト


ここは、write_completion_qキューを処理する。
タイムアウト時間が来ていた場合に行う処理は、start_lingering_close_nonblocking()で、
これはKeepAlive待ちと同じだ。
通常は、shutdown(SHUT_WR)が行われ、linger_qキューに登録される。

   1687             process_timeout_queue(&write_completion_q, timeout_time,
   1688                                   start_lingering_close_nonblocking);
   1689             /* Step 3: (normal) lingering close completion timeouts */

(3)リンガリングクローズタイムアウト(標準)

linger_qキューの処理だ。
ここでのタイムアウト時の処理は、stop_lingering_close()だ。
この処理は、ソケットをクローズする。

   1690             process_timeout_queue(&linger_q, timeout_time, stop_lingering_close);
   1691             /* Step 4: (short) lingering close completion timeouts */

(4)リンガリングクローズタイムアウト(短時間)

short_linger_qキューの処理だ。
ここでのタイムアウト時の処理も、linger_qキューと同じで、stop_lingering_close()だ。
ソケットをクローズする。

   1692             process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close);
   1693

ここまででタイマーキューの処理は終わりになる。

以下は、いくつかの管理情報の更新処理だ。

   1694             ps = ap_get_scoreboard_process(process_slot);
   1695             ps->write_completion = write_completion_q.count;
   1696             ps->keep_alive = keepalive_q.count;
   1697             apr_thread_mutex_unlock(timeout_mutex);
   1698
   1699             ps->connections = apr_atomic_read32(&connection_count);
   1700             ps->suspended = apr_atomic_read32(&suspended_count);
   1701             ps->lingering_close = apr_atomic_read32(&lingering_count);
   1702         }
   1703         if (listeners_disabled && !workers_were_busy
   1704             && (int)apr_atomic_read32(&connection_count)
   1705                - (int)apr_atomic_read32(&lingering_count)
   1706                < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
   1707                  * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
   1708         {
   1709             listeners_disabled = 0;
   1710             enable_listensocks(process_slot);
   1711         }
   1712         /*
   1713          * XXX: do we need to set some timeout that re-enables the listensocks
   1714          * XXX: in case no other event occurs?
   1715          */
   1716     }     /* listener main loop */

ここまでで主ループは終わる。

listenerスレッドの終了処理では、Listenソケットをクローズし、workerスレッド向けのメッセージキューに終了通知を登録する。

   1717
   1718     close_listeners(process_slot, &closed);
   1719     ap_queue_term(worker_queue);
   1720
   1721     apr_thread_exit(thd, APR_SUCCESS);
   1722     return NULL;
   1723 }

これで、listener_threadの処理を終わる。

リクエスト処理の流れ: listener_thread(4) イベント処理

(14.3)タイムアウト処理の続き。

(14.4) イベント処理


次が、イベントチェックのepoll_waitで検知したソケットイベントを処理するwhileループだ。
numにはepoll_wait()で取得した準備済ソケット数が入っている。

   1495         while (num) {
   1496             pt = (listener_poll_type *) out_pfd->client_data;

listener_poll_type構造体 ptは、apr_pollset_add()で登録されたapr_pollfd_t構造体のclient_dataにセットされている。

(a) 確立済みソケットの処理


   1497             if (pt->type == PT_CSD) {

こちらのif文は、処理可能なソケットが、確立済みのソケットの場合だ。
workerスレッドのprocess_socketで作成されたevent_conn_state_t構造体 cs の apr_pollfd_t構造体の変数pfdが、apr_pollset_add(event_pollset, &cs->pfd) と第2引数に与えられている。これがout_pfd->client_dataを介して引き渡される。
KeepAliveの次メッセージ待ち、書込み可能待ち、LINGER待ち(標準時間のものと短時間のもの)が該当する。

   1498                 /* one of the sockets is readable */
   1499                 struct timeout_queue *remove_from_q = &write_completion_q;
   1500                 int blocking = 1;

ここで、event_conn_state_t情報のステータス別に処理が分岐する。
event_conn_state_t情報は、listener_poll_type情報のvoid*変数 batonに保持されている。

   1501                 cs = (event_conn_state_t *) pt->baton;
   1502                 switch (cs->pub.state) {
   1503                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:

これは、次リクエスト待ちのevent_conn_state_tだ。

   1504                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
   1505                     remove_from_q = &keepalive_q;
   1506                     /* don't wait for a worker for a keepalive request */

blockingを0にする。
blockingはget_worker()でアイドルworkerスレッドをチェックしたときに、アイドルworkerスレッドがない場合、アイドルworkerスレッドができるまでそこで処理をブロックするかしないかのフラグだ。
1500行目で1にセットされているが、ここで0(ブロックしない)ように変更される。

   1507                     blocking = 0;
   1508                     /* FALL THROUGH */

ここではbreakしない。下記のcase文に処理が継続する。

   1509                 case CONN_STATE_WRITE_COMPLETION:

これは、書込み可能待ちのevent_conn_state_tだ。

下記のget_worker()でアイドル状態のworkerスレッドの存否をチェックする。
CONN_STATE_WRITE_COMPLETIONの場合は、blockingが1なので、ここでブロックされてしまう。
CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合は、アイドルworkerスレッドがない場合にもすぐに抜けてくる。
この場合、workers_were_busyは1になる。have_idle_workerは0だ。

   1510                     get_worker(&have_idle_worker, blocking,
   1511                                &workers_were_busy);

keepalive_qキュー(CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合)、
または、write_completion_qキュー(CONN_STATE_WRITE_COMPLETIONの場合)からタイムアウト情報が削除される。
KeepAliveの場合、上記の通りブロックされないので、アイドルworkerスレッドがない場合にも削除される。

   1512                     apr_thread_mutex_lock(timeout_mutex);
   1513                     TO_QUEUE_REMOVE(*remove_from_q, cs);
   1514                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
   1515
   1516                     /*
   1517                      * Some of the pollset backends, like KQueue or Epoll
   1518                      * automagically remove the FD if the socket is closed,
   1519                      * therefore, we can accept _SUCCESS or _NOTFOUND,
   1520                      * and we still want to keep going
   1521                      */
   1522                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
   1523                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
   1524                                      "pollset remove failed");
   1525                         apr_thread_mutex_unlock(timeout_mutex);
   1526                         start_lingering_close_nonblocking(cs);
   1527                         break;
   1528                     }
   1529
   1530                     apr_thread_mutex_unlock(timeout_mutex);
   1531                     TO_QUEUE_ELEM_INIT(cs);
   1532                     /* If we didn't get a worker immediately for a keep-alive
   1533                      * request, we close the connection, so that the client can
   1534                      * re-connect to a different process.
   1535                      */
   1536                     if (!have_idle_worker) {

アイドルworkerスレッドはいなかった場合(この条件はブロックされないkeep-alive待ちの場合に発生する可能性がある)、コネクションを直ちに閉じる。
クライアント側はこれによって、別のプロセスに再接続するチャンスができる。

   1537                         start_lingering_close_nonblocking(cs);
   1538                         break;

ここで、breakするので、switch文から抜ける。

   1539                     }

下記は、このイベントをworkerスレッドとの通信用のメッセージキューに登録している。
out_pfdから out_pfd->client_dataに登録されている listener_poll_type情報の、そのbaton情報に登録されている。
event_conn_state_t情報を取得し、この情報(apr_socket_t、event_conn_state_t、メモリプール情報)をworker_queeに登録する。
workerスレッド側では、これを、1486行目のpush_timer2worker()のものと同じように、ap_queue_pop_something()で受け取る。

   1540                     rc = push2worker(out_pfd, event_pollset);
   1541                     if (rc != APR_SUCCESS) {
   1542                         ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1543                                      ap_server_conf, "push2worker failed");
   1544                     }
   1545                     else {
   1546                         have_idle_worker = 0;
   1547                     }

workerへのキューにイベントを登録したら、breakでswitch文を抜ける。
正常終了時にはhave_idle_workerの値を初期化(0)している。

   1548                     break;


   1549                 case CONN_STATE_LINGER_NORMAL:

これは、LINGER待ちのevent_conn_state_tだ。

   1550                 case CONN_STATE_LINGER_SHORT:

これは、LINGER待ち(短時間)のevent_conn_state_tだ。
LINGER待ちの場合、標準の待ち時間の場合も、短時間の場合も、実施する処理が同じで、
ともに、process_lingering_close()だ。

この処理では受信データの空読みを実行する。
全てデータを読み終えても、EOF等にならなければ(EAGAIN)、処理をそこで終える(引き続き読込可能となるのを待つ)。
EOF等が検知されれば、ソケットを閉じる。
また、short_linger_qキューまたは、linger_qキューのタイムアウト情報も削除される。
この処理は、このlistenrスレッドの処理内で直接実行されている。

   1551                     process_lingering_close(cs, out_pfd);
   1552                     break;
   1553                 default:

その他の状態は定義されていない。
エラーだ。

   1554                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1555                                  ap_server_conf,
   1556                                  "event_loop: unexpected state %d",
   1557                                  cs->pub.state);
   1558                     ap_assert(0);
   1559                 }

ここまでが、workerスレッドの処理で再登録された監視対象ソケットに対するイベント処理だ。

   1560             }

(b) Listenソケットの処理

   1561             else if (pt->type == PT_ACCEPT) {

こちらのif文は、監視対象のソケットが、listenしているサーバソケットの場合だった場合の処理だ。
ここではacceptが行われる。

   1562                 /* A Listener Socket is ready for an accept() */
   1563                 if (workers_were_busy) {

get_worker(非ブロックモード)で処理した場合、アイドルworkerスレッドがなければ、workers_were_busyに1がセットされている。
初期値は0なので、通常はこのif文には入らない。
ここに入る直前のget_worker()処理は、1495行目からのwhile()文のループで、前の周回で行われた処理と考えられる。
ここは、前回のチェックで、アイドルworkerスレッドがなかった場合の処理だ。

アイドルworkerスレッドがない状態の場合、新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1564                     if (!listeners_disabled)
   1565                         disable_listensocks(process_slot);
   1566                     listeners_disabled = 1;
   1567                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1568                                  "All workers busy, not accepting new conns"
   1569                                  "in this process");
   1570                 }
   1571                 else if (  (int)apr_atomic_read32(&connection_count)
   1572                            - (int)apr_atomic_read32(&lingering_count)
   1573                          > threads_per_child
   1574                            + ap_queue_info_get_idlers(worker_queue_info) *
   1575                              worker_factor / WORKER_FACTOR_SCALE)

次のif文は、現に受け付けている接続数が、AsyncRequestWorkerFactorの条件範囲に収まるかどうかをチェックしている。

connection_count 現在の接続数
lingering_count 現在リンガリングクローズ中の接続数
(connection_countに含まれる)

この差は、アクティブな接続数となる。


threads_per_child ThreadsPerChildで指定されたworkerスレッド数
ap_queue_info_get_idlers(worker_queue_info) 現在のアイドルworkerスレッド数
worker_factor / WORKER_FACTOR_SCALE syncRequestWorkerFactorの設定値
(16分の1単位で近似されている)

    158 #define WORKER_FACTOR_SCALE   16  /* scale factor to allow fractional values */

この式は、受け入れ可能な接続の上限だ。

アクティブな接続数が受け入れ可能な接続の上限を超えた場合、これ以上新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1576                 {
   1577                     if (!listeners_disabled)
   1578                         disable_listensocks(process_slot);
   1579                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1580                                  "Too many open connections (%u), "
   1581                                  "not accepting new conns in this process",
   1582                                  apr_atomic_read32(&connection_count));
   1583                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
   1584                                  "Idle workers: %u",
   1585                                  ap_queue_info_get_idlers(worker_queue_info));
   1586                     listeners_disabled = 1;
   1587                 }
   1588                 else if (listeners_disabled) {

それ以外の条件の場合、新たな接続を受け入れてよいので、
もし listeners_disabledに1がセットされていたら、0にリセットし、
enable_listensocks()でListenソケットをイベント監視対象に復帰させる。

   1589                     listeners_disabled = 0;
   1590                     enable_listensocks(process_slot);
   1591                 }
   1592                 if (!listeners_disabled) {

上記の処理を経て、ここでlisteners_disabled変数が0の場合、acceptを実行する。

   1593                     lr = (ap_listen_rec *) pt->baton;

lrは、ap_listen_rec情報だ。
この情報は、listenしているサーバソケットの情報を保持している。

listener_poll_type構造体 ptのtypeがPT_ACCEPTである情報は、
listener_thread関数の初期処理のひとつinit_pollset()関数内で作成される。
これは、Listenディレクティブで定義されているサーバソケットに対応して作成される。
ここで作成されるのは、apr_pollfd_t構造体の配列で、
このapr_pollfd_t情報のclient_data変数に、listener_poll_type情報が格納される。

    221 static apr_pollfd_t *listener_pollfd;

つまり、listener_pollfd[i].client_dataに、listener_poll_type情報は保持される。

そして、このlistener_poll_type情報のbaton変数に
ap_listen_rec情報がセットされる。
ap_listen_rec情報がまさに、Listenディレクティブの処理時に作成される情報だ。

そして、このap_listen_recのaccept_func変数に、同じくinit_pollset()関数内で
ap_unixd_accept関数ポインタがセットされている。

そして、このapr_pollfd_t情報が、
apr_pollset_add(event_pollset, &listener_pollfd[i])という形で、
apr_pollset_add()関数の第2引数に渡されて、監視対象に追加され、
1496行目で、pt変数に戻ってきている。

そして、ptのbaton変数に格納されているのが、ap_listen_rec情報だ。

   1594                     ap_pop_pool(&ptrans, worker_queue_info);

空いているトランザクションプールを取り出す。

   1595
   1596                     if (ptrans == NULL) {

このif文は空いているトランザクションプールがない場合に作成する処理だ。
トランザクションプールは、acceptするソケットごとに作成される。

   1597                         /* create a new transaction pool for each accepted socket */
   1598                         apr_allocator_t *allocator;
   1599
   1600                         apr_allocator_create(&allocator);
   1601                         apr_allocator_max_free_set(allocator,
   1602                                                    ap_max_mem_free);
   1603                         apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
   1604                         apr_allocator_owner_set(allocator, ptrans);
   1605                         if (ptrans == NULL) {
   1606                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1607                                          ap_server_conf,
   1608                                          "Failed to create transaction pool");
   1609                             signal_threads(ST_GRACEFUL);
   1610                             return NULL;
   1611                         }
   1612                     }
   1613                     apr_pool_tag(ptrans, "transaction");
   1614

そして、アイドルworkerスレッドを取得する
ここは第2引数が1なので、ブロックモードでアイドルworkerスレッドを確保することになる。
基本的には、workers_were_busyが0の場合、つまりアイドルworkerスレッドがある場合に、ここに入るので、ここではブロックモードではあるが、直ちにreturnすると期待されるだろう。

   1615                     get_worker(&have_idle_worker, 1, &workers_were_busy);

アイドルworkerスレッドが確保できたら、
次がaccept処理となる。
lr->accept_funcは、ap_unixd_accept()が実行される。
基本的にはapccet()が実行される。
合わせて、apr_socket_t情報が作成され、
このソケット情報を生成したメモリプール(ptrans)に対して、cleanup関数を登録している。
これは、socket_cleanup()関数である。
メモリプールやそのcleanup関数についてはまだ説明していないが、APRライブラリに実装されている。
やがては確認したいが、ここでは追わない。

   1616                     rc = lr->accept_func(&csd, lr, ptrans);
   1617
   1618                     /* later we trash rv and rely on csd to indicate
   1619                      * success/failure
   1620                      */
   1621                     AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
   1622
   1623                     if (rc == APR_EGENERAL) {

各種エラー時で、ここでは子プロセスのgracefulな終了を開始している。

   1624                         /* E[NM]FILE, ENOMEM, etc */
   1625                         resource_shortage = 1;
   1626                         signal_threads(ST_GRACEFUL);
   1627                     }
   1628
   1629                     if (csd != NULL) {

このif文が正常に、acceptが行われた場合の処理だ。

   1630                         conns_this_child--;

ここでは、MaxConnectionsPerChildのチェック用に、処理コネクション数をカウントダウンしている。

   1631                         rc = ap_queue_push(worker_queue, csd, NULL, ptrans);

そして、acceptした情報をworkerスレッドとの通信用のメッセージキューに登録している。
workerスレッド側では、acceptイベントを他のイベントと同様に ap_queue_pop_something()で取り出す。
ここでの1540行目との大きな違いは、event_conn_state_t情報がまだ作成されていないことだ。
ap_queue_push()関数は、第3引数がevent_conn_state_t情報だが、この値がNULLになっている。

process_socket関数の893行目

    893     if (cs == NULL) {           /* This is a new connection */

この変数csがNULLとなるのは、このケースだ。

   1632                         if (rc != APR_SUCCESS) {
   1633                             /* trash the connection; we couldn't queue the connected
   1634                              * socket to a worker
   1635                              */
   1636                             apr_socket_close(csd);
   1637                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1638                                          ap_server_conf,
   1639                                          "ap_queue_push failed");
   1640                             apr_pool_clear(ptrans);
   1641                             ap_push_pool(worker_queue_info, ptrans);
   1642                         }
   1643                         else {

正常終了時にはhave_idle_workerの値を初期化(0)している。
このまま処理を継続する。

   1644                             have_idle_worker = 0;
   1645                         }
   1646                     }
   1647                     else {

こちらは、APR_EGENERAL以外のエラーのケースだ。
処理をこのまま継続している。

   1648                         apr_pool_clear(ptrans);
   1649                         ap_push_pool(worker_queue_info, ptrans);
   1650                     }
   1651                 }
   1652             }               /* if:else on pt->type */
   1653             out_pfd++;
   1654             num--;
   1655         }                   /* while for processing poll */

1414行目のfor()の末尾だ。
ここまでが、epoll_wait()でチェックしたソケットイベント処理である。

次は、タイムアウトキューの処理に進む。

リクエスト処理の流れ: listener_thread(3) タイムアウト

(14.2) イベントチェック処理の続きだ。

(14.3) タイムアウト処理



ここで、スキップリストを処理する。
タイムアウトに至ったコールバック関数を実行している。

先にタイムアウトイベントまでの時間をチェックして、epollでは最大その時間を待つように
していた。
先のepoll_waitでタイムアウトが発生していれば、ここでタイムアウトの処理が実行されるはずだ。
もし、監視対象のソケットでイベントが発生していれば、タイムアウトより早く処理を終えているので、ここでのタイムアウト処理が行われない可能性がある。
しかし、既にタイムアウト時刻を過ぎて、timeout_intervalに1がセットされていた場合は、ここでタイムアウトが発生するだろう。

   1480         now = apr_time_now();
   1481         apr_thread_mutex_lock(g_timer_skiplist_mtx);
   1482         ep = apr_skiplist_peek(timer_skiplist);
   1483         while (ep) {
   1484             if (ep->when < now + EVENT_FUDGE_FACTOR) {

このif文では、スキップリストに登録されていたタイムアウト処理の実行時刻が、現在時刻(now)から
EVENT_FUDGE_FACTORまでの範囲に収まっているか、既に過ぎていた場合に、タイムアウト処理を開始する。

   1485                 apr_skiplist_pop(timer_skiplist, NULL);

スキップリストからタイムアウトイベント(ep)を取り出す。

   1486                 push_timer2worker(ep);

push_timer2worker()は、タイムアウトイベントepをworkerスレッドとの通信用のメッセージキューに登録している。
workerスレッド側ではこれをap_queue_pop_something()で取り出す。
ここで、タイムアウトイベントは、第5引数(timer_event_t **te)に渡される。

以下 workerスレッド内の該当の処理だ。

   1773         rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
  :
   1803         if (te != NULL) {

この変数teにタイムアウトイベントがセットされる。
このif文にはタイムアウトイベントが渡されてきたときに入ることになる。

   1804             te->cbfunc(te->baton);
   1805

これが、そのタイムアウトイベントに登録されているコールバック関数の実行だ。

以下では、実行後に処理済みのタイムアウトイベントを再利用できるように、timer_free_ringに返している。

   1806             {
   1807                 apr_thread_mutex_lock(g_timer_skiplist_mtx);
   1808                 APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t, link);
   1809                 apr_thread_mutex_unlock(g_timer_skiplist_mtx);
   1810             }
   1811         }

   1487             }
   1488             else {
   1489                 break;
   1490             }
   1491             ep = apr_skiplist_peek(timer_skiplist);

そして、次のタイムアウトイベントをチェックする。
この1483行目からのwhile()ループで、タイムアウトしたタイムアウトイベントがworkerスレッドに全て渡されている。

   1492         }
   1493         apr_thread_mutex_unlock(g_timer_skiplist_mtx);
   1494

次は、イベント処理に進む。

リクエスト処理の流れ: listener_thread(2) イベントチェック

(14.1) 主ループからの続きだ。

(14.2) イベントチェック処理


監視対象ソケットの接続受付、データ受信、書込み許可などのイベントをチェックする。
以下の処理がイベントチェック処理apr_pollset_poll()だ。

   1460         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);

第一引数のevent_pollsetは、apr_pollset_t型の変数で、event.cのファイル内にファイルスコープで定義されている。

    256 static apr_pollset_t *event_pollset;

この構造体は、start_threads()関数で、生成される。

   1883     int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
  :
   1921     /* Create the main pollset */
   1922     for (i = 0; i < sizeof(good_methods) / sizeof(void*); i++) {
   1923         rv = apr_pollset_create_ex(&event_pollset,
   1924                             threads_per_child*2, /* XXX don't we need more, to handle
   1925                                                 * connections in K-A or lingering
   1926                                                 * close?
   1927                                                 */
   1928                             pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
   1929                             good_methods[i]);
   1930         if (rv == APR_SUCCESS) {
   1931             break;
   1932         }
   1933     }

現時点ではこの詳細は見ないが、good_methodsを順に試して、使えるものが見つかったら 1931行目でbreakしている。
CentOS 6で実行してみると、APR_POLLSET_EPOLLが使用されている。
event_pollset.providerには、このepoll用の必要な関数が指定される。

(apr-1.5.0/poll/unix/epoll.c)


    311 static apr_pollset_provider_t impl = {
    312     impl_pollset_create,
    313     impl_pollset_add,
    314     impl_pollset_remove,
    315     impl_pollset_poll,
    316     impl_pollset_cleanup,
    317     "epoll"
    318 };
    319
    320 apr_pollset_provider_t *apr_pollset_provider_epoll = &impl;

apr_pollset_poll()に戻ると、これは、このapr_pollset_provider_epollに定義されている poll関数、impl_pollset_poll()を実行する。

(apr-1.5.0/poll/unix/pollset.c)


    346 APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
    347                                            apr_interval_time_t timeout,
    348                                            apr_int32_t *num,
    349                                            const apr_pollfd_t **descriptors)
    350 {
    351     return (*pollset->provider->poll)(pollset, timeout, num, descriptors);
    352 }

impl_pollset_poll関数のソースは次の通り。

(apr-1.5.0/poll/unix/epoll.c)


    243 static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
    244                                            apr_interval_time_t timeout,
    245                                            apr_int32_t *num,
    246                                            const apr_pollfd_t **descriptors)
    247 {
    248     int ret, i, j;
    249     apr_status_t rv = APR_SUCCESS;
    250     apr_pollfd_t *fdptr;
    251
    252     if (timeout > 0) {
    253         timeout /= 1000;
    254     }
    255
    256     ret = epoll_wait(pollset->p->epoll_fd, pollset->p->pollset, pollset->nalloc,
    257                      timeout);
    258     (*num) = ret;
    259
    260     if (ret < 0) {
    261         rv = apr_get_netos_error();
    262     }
    263     else if (ret == 0) {
    264         rv = APR_TIMEUP;
    265     }
    266     else {
    267         for (i = 0, j = 0; i < ret; i++) {
    268             if (pollset->flags & APR_POLLSET_NOCOPY) {
    269                 fdptr = (apr_pollfd_t *)(pollset->p->pollset[i].data.ptr);
    270             }
    271             else {
    272                 fdptr = &(((pfd_elem_t *) (pollset->p->pollset[i].data.ptr))->pfd);
    273             }
    274             /* Check if the polled descriptor is our
    275              * wakeup pipe. In that case do not put it result set.
    276              */
    277             if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
    278                 fdptr->desc_type == APR_POLL_FILE &&
    279                 fdptr->desc.f == pollset->wakeup_pipe[0]) {
    280                 apr_pollset_drain_wakeup_pipe(pollset);
    281                 rv = APR_EINTR;
    282             }
    283             else {
    284                 pollset->p->result_set[j] = *fdptr;
    285                 pollset->p->result_set[j].rtnevents =
    286                     get_epoll_revent(pollset->p->pollset[i].events);
    287                 j++;
    288             }
    289         }
    290         if (((*num) = j)) { /* any event besides wakeup pipe? */
    291             rv = APR_SUCCESS;
    292
    293             if (descriptors) {
    294                 *descriptors = pollset->p->result_set;
    295             }
    296         }
    297     }
    298
    299     if (!(pollset->flags & APR_POLLSET_NOCOPY)) {
    300         pollset_lock_rings();
    301
    302         /* Shift all PFDs in the Dead Ring to the Free Ring */
    303         APR_RING_CONCAT(&(pollset->p->free_ring), &(pollset->p->dead_ring), pfd_elem_t, link);
    304
    305         pollset_unlock_rings();
    306     }
    307
    308     return rv;
    309 }

event_pollset.p->epoll_fdがepollのディスクリプタで、
event_pollset.p->pollsetに利用可能なイベントが格納されて返される。

epollの監視対象ファイルディスクリプタは、apr_pollset_add()関数で追加され、apr_pollset_remove()関数で除去される。

上記のimpl_pollset_poll()ではこのpollsetをすべて返すのではなく、一部のイベントを除去している。
それが、コメント中に書かれている「wakeup pipe」だ。
これが処理可能なイベントに入っていた場合には、rvにAPR_EINTRがセットされ、apr_pollset_drain_wakeup_pipe() が実行される。
この「wakeup pipe」は apr_pollset_create_ex()関数で、APR_POLLSET_WAKEABLEが指定されている場合に生成されているが、event MPMでは指定されていないようだ。

従って、このpollsetの内容が、descriptorsにセットされる。
呼び出し元(1460行)の変数名で書けば、out_pfdとなる。
そして、numには、out_pfdにセットされた利用可能なイベント数が返される。
timeout_intervalはepoll_wait()で指定されるタイムアウトが渡される。
これは、リクエスト処理の流れ: listener_thread(1) メインループの開始 で見たスキップリストに登録されているタイムアウトまでの時間に基づいて設定された。

   1461         if (rc != APR_SUCCESS) {

ここは成功しなかった場合の経路だ。

   1462             if (APR_STATUS_IS_EINTR(rc)) {

シグナルによる割り込みがあった場合には、continueにより処理を続ける。
1414行目からのforループの先頭に帰っている。

   1463                 continue;
   1464             }
   1465             if (!APR_STATUS_IS_TIMEUP(rc)) {

タイムアウト以外のエラーだった場合は
異常発生とみなして、子プロセスのgracefulな終了を開始している。
1469行目のsignal_threads()では処理中でlistener_may_exit変数に1がセットされる。

   1466                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
   1467                              "apr_pollset_poll failed.  Attempting to "
   1468                              "shutdown process gracefully");
   1469                 signal_threads(ST_GRACEFUL);
   1470             }
   1471         }
   1472
   1473         if (listener_may_exit) {

ここで再び、終了チェックが行われる。
以下は、1417行目からの処理と同一だ。

   1474             close_listeners(process_slot, &closed);
   1475             if (terminate_mode == ST_UNGRACEFUL
   1476                 || apr_atomic_read32(&connection_count) == 0)
   1477                 break;
   1478         }
   1479

次はタイムアウト処理に進む。

リクエスト処理の流れ: listener_thread(1) メインループの開始

listenerスレッドは各子プロセスに1つ存在している。
worker MPMでは、listenしているサーバ用のソケットを監視して、クライアントからの接続を受け付け通信用のソケットを用意し、これをworkerスレッドにスレッド間通信用のキューを介して引き渡していた。
これが、event MPMでは、そのほかにも、クライアントからのFIN待ちや、Keep-Alive接続における、次のリクエストの受信待ち等の監視も行うようになっている。

(14) listener_thread


(httpd-2.4.9/server/mpm/event/event.c)
   1368 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
   1369 {

init_pollset()関数では、Listenディレクティブで設定されているサーバソケットでlistener_poll_type情報を作成する。
初期設定では、このListenソケットはすべてlistenerスレッドでのイベント監視対象。

   1399     rc = init_pollset(tpool);
   1400     if (rc != APR_SUCCESS) {
   1401         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
   1402                      "failed to initialize pollset, "
   1403                      "attempting to shutdown process gracefully");
   1404         signal_threads(ST_GRACEFUL);
   1405         return NULL;
   1406     }


引き続き主ループに進む。
今回はここを見ていく。

長い関数なので、投稿を分けたい。

(1)メインループの開始(本投稿)

(14.1) 主ループ


   1414     for (;;) {
   1415         int workers_were_busy = 0;

workers_were_busyは、この子プロセスのworkerスレッドに処理可能な空きがあるかどうかのフラグだ。
0は空きがある(ビジーではない)。
get_worker()関数でセットされる。

get_worker()関数は、workerスレッドの空き(アイドルスレッド)の有無をチェックする。
これは2つのモードで動作する。

  • 空きがない場合に、空きができるまでブロックするのがブロックモード(第2引数が1)
  • 空きがなければ、APR_EAGAINで直ちに返ってくるのが非ブロックモード(第2引数が0)

このlistenerスレッド関数ではいずれのモードも利用されている。

そして、get_worker()で第3引数に与えられている workers_were_busy は以下のケースで1がセットされる。

  • ブロックモードで、アイドルworkerスレッドがなく空き待ちを実行した場合
    (ブロックモードの場合、アイドルworkerスレッドがなく空き待ちを実行すると、正常な状態なら実際にアイドルworkerスレッドが得られてからreturnする)
  • 非ブロックモードでアイドルworkerスレッドがなく、直ちにAPR_EAGAINで返った場合


   1416         if (listener_may_exit) {

listener_may_exit はプロセスの終了フラグ。
このフラグが立っている場合、listenerスレッドは終了処理を始める。
このif文は終了処理だ。

とりあえず、今回はリクエスト処理の流れを追いたいので見ない。

   1417             close_listeners(process_slot, &closed);
   1418             if (terminate_mode == ST_UNGRACEFUL
   1419                 || apr_atomic_read32(&connection_count) == 0)
   1420                 break;
   1421         }
   1422
   1423         if (conns_this_child <= 0)

conns_this_childは、MaxConnectionsPerChildのチェック用の変数だ。
初期値にMaxConnectionsPerChildをセットし、接続を受け付けるごとにカウントダウンする(1630行目)。
値が0以下だと、処理数がMaxConnectionsPerChildを越えたことになる。
以下の関数check_infinite_requests()では、MaxConnectionsPerChildに0以外の値が指定されていた場合に、停止処理を実行している。
(MaxConnectionsPerChildが0の場合は無制限にリクエストを処理できる)

   1424             check_infinite_requests();
   1425
   1426         now = apr_time_now();

現在時刻を変数nowにセット。

   1427         if (APLOGtrace6(ap_server_conf)) {

このif文はTRACEログの出力が有効な場合に実行される。
LogLevel mpm_event:trace6
と指定してやると、この条件に入る(省略)。

   1443         }
   1444

以下ではスキップリストのチェックを行っている。
スキップリストはタイムアウト処理を管理している。
これは apr_skiplist とmod_dialupモジュールで簡単には見ている。

スキップリストをチェックして、次のタイムアウト処理までの時間を確認する。

   1445         apr_thread_mutex_lock(g_timer_skiplist_mtx);
   1446         te = apr_skiplist_peek(timer_skiplist);
   1447         if (te) {
   1448             if (te->when > now) {
   1449                 timeout_interval = te->when - now;
   1450             }
   1451             else {
   1452                 timeout_interval = 1;
   1453             }
   1454         }
   1455         else {
   1456             timeout_interval = apr_time_from_msec(100);
   1457         }
   1458         apr_thread_mutex_unlock(g_timer_skiplist_mtx);
   1459

timeout_interval 変数に、その時間をセットする。
既に指定のタイムアウト時刻を過ぎていた場合には、1をセットしている。
タイムアウトイベントがなければ、100ミリ秒がセットされる。

続いて、イベントチェック処理に進む。