2014年10月27日月曜日

リクエスト処理の流れ: listener_thread(4) イベント処理

(14.3)タイムアウト処理の続き。

(14.4) イベント処理


次が、イベントチェックのepoll_waitで検知したソケットイベントを処理するwhileループだ。
numにはepoll_wait()で取得した準備済ソケット数が入っている。

   1495         while (num) {
   1496             pt = (listener_poll_type *) out_pfd->client_data;

listener_poll_type構造体 ptは、apr_pollset_add()で登録されたapr_pollfd_t構造体のclient_dataにセットされている。

(a) 確立済みソケットの処理


   1497             if (pt->type == PT_CSD) {

こちらのif文は、処理可能なソケットが、確立済みのソケットの場合だ。
workerスレッドのprocess_socketで作成されたevent_conn_state_t構造体 cs の apr_pollfd_t構造体の変数pfdが、apr_pollset_add(event_pollset, &cs->pfd) と第2引数に与えられている。これがout_pfd->client_dataを介して引き渡される。
KeepAliveの次メッセージ待ち、書込み可能待ち、LINGER待ち(標準時間のものと短時間のもの)が該当する。

   1498                 /* one of the sockets is readable */
   1499                 struct timeout_queue *remove_from_q = &write_completion_q;
   1500                 int blocking = 1;

ここで、event_conn_state_t情報のステータス別に処理が分岐する。
event_conn_state_t情報は、listener_poll_type情報のvoid*変数 batonに保持されている。

   1501                 cs = (event_conn_state_t *) pt->baton;
   1502                 switch (cs->pub.state) {
   1503                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:

これは、次リクエスト待ちのevent_conn_state_tだ。

   1504                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
   1505                     remove_from_q = &keepalive_q;
   1506                     /* don't wait for a worker for a keepalive request */

blockingを0にする。
blockingはget_worker()でアイドルworkerスレッドをチェックしたときに、アイドルworkerスレッドがない場合、アイドルworkerスレッドができるまでそこで処理をブロックするかしないかのフラグだ。
1500行目で1にセットされているが、ここで0(ブロックしない)ように変更される。

   1507                     blocking = 0;
   1508                     /* FALL THROUGH */

ここではbreakしない。下記のcase文に処理が継続する。

   1509                 case CONN_STATE_WRITE_COMPLETION:

これは、書込み可能待ちのevent_conn_state_tだ。

下記のget_worker()でアイドル状態のworkerスレッドの存否をチェックする。
CONN_STATE_WRITE_COMPLETIONの場合は、blockingが1なので、ここでブロックされてしまう。
CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合は、アイドルworkerスレッドがない場合にもすぐに抜けてくる。
この場合、workers_were_busyは1になる。have_idle_workerは0だ。

   1510                     get_worker(&have_idle_worker, blocking,
   1511                                &workers_were_busy);

keepalive_qキュー(CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合)、
または、write_completion_qキュー(CONN_STATE_WRITE_COMPLETIONの場合)からタイムアウト情報が削除される。
KeepAliveの場合、上記の通りブロックされないので、アイドルworkerスレッドがない場合にも削除される。

   1512                     apr_thread_mutex_lock(timeout_mutex);
   1513                     TO_QUEUE_REMOVE(*remove_from_q, cs);
   1514                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
   1515
   1516                     /*
   1517                      * Some of the pollset backends, like KQueue or Epoll
   1518                      * automagically remove the FD if the socket is closed,
   1519                      * therefore, we can accept _SUCCESS or _NOTFOUND,
   1520                      * and we still want to keep going
   1521                      */
   1522                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
   1523                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
   1524                                      "pollset remove failed");
   1525                         apr_thread_mutex_unlock(timeout_mutex);
   1526                         start_lingering_close_nonblocking(cs);
   1527                         break;
   1528                     }
   1529
   1530                     apr_thread_mutex_unlock(timeout_mutex);
   1531                     TO_QUEUE_ELEM_INIT(cs);
   1532                     /* If we didn't get a worker immediately for a keep-alive
   1533                      * request, we close the connection, so that the client can
   1534                      * re-connect to a different process.
   1535                      */
   1536                     if (!have_idle_worker) {

アイドルworkerスレッドはいなかった場合(この条件はブロックされないkeep-alive待ちの場合に発生する可能性がある)、コネクションを直ちに閉じる。
クライアント側はこれによって、別のプロセスに再接続するチャンスができる。

   1537                         start_lingering_close_nonblocking(cs);
   1538                         break;

ここで、breakするので、switch文から抜ける。

   1539                     }

下記は、このイベントをworkerスレッドとの通信用のメッセージキューに登録している。
out_pfdから out_pfd->client_dataに登録されている listener_poll_type情報の、そのbaton情報に登録されている。
event_conn_state_t情報を取得し、この情報(apr_socket_t、event_conn_state_t、メモリプール情報)をworker_queeに登録する。
workerスレッド側では、これを、1486行目のpush_timer2worker()のものと同じように、ap_queue_pop_something()で受け取る。

   1540                     rc = push2worker(out_pfd, event_pollset);
   1541                     if (rc != APR_SUCCESS) {
   1542                         ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1543                                      ap_server_conf, "push2worker failed");
   1544                     }
   1545                     else {
   1546                         have_idle_worker = 0;
   1547                     }

workerへのキューにイベントを登録したら、breakでswitch文を抜ける。
正常終了時にはhave_idle_workerの値を初期化(0)している。

   1548                     break;


   1549                 case CONN_STATE_LINGER_NORMAL:

これは、LINGER待ちのevent_conn_state_tだ。

   1550                 case CONN_STATE_LINGER_SHORT:

これは、LINGER待ち(短時間)のevent_conn_state_tだ。
LINGER待ちの場合、標準の待ち時間の場合も、短時間の場合も、実施する処理が同じで、
ともに、process_lingering_close()だ。

この処理では受信データの空読みを実行する。
全てデータを読み終えても、EOF等にならなければ(EAGAIN)、処理をそこで終える(引き続き読込可能となるのを待つ)。
EOF等が検知されれば、ソケットを閉じる。
また、short_linger_qキューまたは、linger_qキューのタイムアウト情報も削除される。
この処理は、このlistenrスレッドの処理内で直接実行されている。

   1551                     process_lingering_close(cs, out_pfd);
   1552                     break;
   1553                 default:

その他の状態は定義されていない。
エラーだ。

   1554                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1555                                  ap_server_conf,
   1556                                  "event_loop: unexpected state %d",
   1557                                  cs->pub.state);
   1558                     ap_assert(0);
   1559                 }

ここまでが、workerスレッドの処理で再登録された監視対象ソケットに対するイベント処理だ。

   1560             }

(b) Listenソケットの処理

   1561             else if (pt->type == PT_ACCEPT) {

こちらのif文は、監視対象のソケットが、listenしているサーバソケットの場合だった場合の処理だ。
ここではacceptが行われる。

   1562                 /* A Listener Socket is ready for an accept() */
   1563                 if (workers_were_busy) {

get_worker(非ブロックモード)で処理した場合、アイドルworkerスレッドがなければ、workers_were_busyに1がセットされている。
初期値は0なので、通常はこのif文には入らない。
ここに入る直前のget_worker()処理は、1495行目からのwhile()文のループで、前の周回で行われた処理と考えられる。
ここは、前回のチェックで、アイドルworkerスレッドがなかった場合の処理だ。

アイドルworkerスレッドがない状態の場合、新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1564                     if (!listeners_disabled)
   1565                         disable_listensocks(process_slot);
   1566                     listeners_disabled = 1;
   1567                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1568                                  "All workers busy, not accepting new conns"
   1569                                  "in this process");
   1570                 }
   1571                 else if (  (int)apr_atomic_read32(&connection_count)
   1572                            - (int)apr_atomic_read32(&lingering_count)
   1573                          > threads_per_child
   1574                            + ap_queue_info_get_idlers(worker_queue_info) *
   1575                              worker_factor / WORKER_FACTOR_SCALE)

次のif文は、現に受け付けている接続数が、AsyncRequestWorkerFactorの条件範囲に収まるかどうかをチェックしている。

connection_count 現在の接続数
lingering_count 現在リンガリングクローズ中の接続数
(connection_countに含まれる)

この差は、アクティブな接続数となる。


threads_per_child ThreadsPerChildで指定されたworkerスレッド数
ap_queue_info_get_idlers(worker_queue_info) 現在のアイドルworkerスレッド数
worker_factor / WORKER_FACTOR_SCALE syncRequestWorkerFactorの設定値
(16分の1単位で近似されている)

    158 #define WORKER_FACTOR_SCALE   16  /* scale factor to allow fractional values */

この式は、受け入れ可能な接続の上限だ。

アクティブな接続数が受け入れ可能な接続の上限を超えた場合、これ以上新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1576                 {
   1577                     if (!listeners_disabled)
   1578                         disable_listensocks(process_slot);
   1579                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1580                                  "Too many open connections (%u), "
   1581                                  "not accepting new conns in this process",
   1582                                  apr_atomic_read32(&connection_count));
   1583                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
   1584                                  "Idle workers: %u",
   1585                                  ap_queue_info_get_idlers(worker_queue_info));
   1586                     listeners_disabled = 1;
   1587                 }
   1588                 else if (listeners_disabled) {

それ以外の条件の場合、新たな接続を受け入れてよいので、
もし listeners_disabledに1がセットされていたら、0にリセットし、
enable_listensocks()でListenソケットをイベント監視対象に復帰させる。

   1589                     listeners_disabled = 0;
   1590                     enable_listensocks(process_slot);
   1591                 }
   1592                 if (!listeners_disabled) {

上記の処理を経て、ここでlisteners_disabled変数が0の場合、acceptを実行する。

   1593                     lr = (ap_listen_rec *) pt->baton;

lrは、ap_listen_rec情報だ。
この情報は、listenしているサーバソケットの情報を保持している。

listener_poll_type構造体 ptのtypeがPT_ACCEPTである情報は、
listener_thread関数の初期処理のひとつinit_pollset()関数内で作成される。
これは、Listenディレクティブで定義されているサーバソケットに対応して作成される。
ここで作成されるのは、apr_pollfd_t構造体の配列で、
このapr_pollfd_t情報のclient_data変数に、listener_poll_type情報が格納される。

    221 static apr_pollfd_t *listener_pollfd;

つまり、listener_pollfd[i].client_dataに、listener_poll_type情報は保持される。

そして、このlistener_poll_type情報のbaton変数に
ap_listen_rec情報がセットされる。
ap_listen_rec情報がまさに、Listenディレクティブの処理時に作成される情報だ。

そして、このap_listen_recのaccept_func変数に、同じくinit_pollset()関数内で
ap_unixd_accept関数ポインタがセットされている。

そして、このapr_pollfd_t情報が、
apr_pollset_add(event_pollset, &listener_pollfd[i])という形で、
apr_pollset_add()関数の第2引数に渡されて、監視対象に追加され、
1496行目で、pt変数に戻ってきている。

そして、ptのbaton変数に格納されているのが、ap_listen_rec情報だ。

   1594                     ap_pop_pool(&ptrans, worker_queue_info);

空いているトランザクションプールを取り出す。

   1595
   1596                     if (ptrans == NULL) {

このif文は空いているトランザクションプールがない場合に作成する処理だ。
トランザクションプールは、acceptするソケットごとに作成される。

   1597                         /* create a new transaction pool for each accepted socket */
   1598                         apr_allocator_t *allocator;
   1599
   1600                         apr_allocator_create(&allocator);
   1601                         apr_allocator_max_free_set(allocator,
   1602                                                    ap_max_mem_free);
   1603                         apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
   1604                         apr_allocator_owner_set(allocator, ptrans);
   1605                         if (ptrans == NULL) {
   1606                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1607                                          ap_server_conf,
   1608                                          "Failed to create transaction pool");
   1609                             signal_threads(ST_GRACEFUL);
   1610                             return NULL;
   1611                         }
   1612                     }
   1613                     apr_pool_tag(ptrans, "transaction");
   1614

そして、アイドルworkerスレッドを取得する
ここは第2引数が1なので、ブロックモードでアイドルworkerスレッドを確保することになる。
基本的には、workers_were_busyが0の場合、つまりアイドルworkerスレッドがある場合に、ここに入るので、ここではブロックモードではあるが、直ちにreturnすると期待されるだろう。

   1615                     get_worker(&have_idle_worker, 1, &workers_were_busy);

アイドルworkerスレッドが確保できたら、
次がaccept処理となる。
lr->accept_funcは、ap_unixd_accept()が実行される。
基本的にはapccet()が実行される。
合わせて、apr_socket_t情報が作成され、
このソケット情報を生成したメモリプール(ptrans)に対して、cleanup関数を登録している。
これは、socket_cleanup()関数である。
メモリプールやそのcleanup関数についてはまだ説明していないが、APRライブラリに実装されている。
やがては確認したいが、ここでは追わない。

   1616                     rc = lr->accept_func(&csd, lr, ptrans);
   1617
   1618                     /* later we trash rv and rely on csd to indicate
   1619                      * success/failure
   1620                      */
   1621                     AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
   1622
   1623                     if (rc == APR_EGENERAL) {

各種エラー時で、ここでは子プロセスのgracefulな終了を開始している。

   1624                         /* E[NM]FILE, ENOMEM, etc */
   1625                         resource_shortage = 1;
   1626                         signal_threads(ST_GRACEFUL);
   1627                     }
   1628
   1629                     if (csd != NULL) {

このif文が正常に、acceptが行われた場合の処理だ。

   1630                         conns_this_child--;

ここでは、MaxConnectionsPerChildのチェック用に、処理コネクション数をカウントダウンしている。

   1631                         rc = ap_queue_push(worker_queue, csd, NULL, ptrans);

そして、acceptした情報をworkerスレッドとの通信用のメッセージキューに登録している。
workerスレッド側では、acceptイベントを他のイベントと同様に ap_queue_pop_something()で取り出す。
ここでの1540行目との大きな違いは、event_conn_state_t情報がまだ作成されていないことだ。
ap_queue_push()関数は、第3引数がevent_conn_state_t情報だが、この値がNULLになっている。

process_socket関数の893行目

    893     if (cs == NULL) {           /* This is a new connection */

この変数csがNULLとなるのは、このケースだ。

   1632                         if (rc != APR_SUCCESS) {
   1633                             /* trash the connection; we couldn't queue the connected
   1634                              * socket to a worker
   1635                              */
   1636                             apr_socket_close(csd);
   1637                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1638                                          ap_server_conf,
   1639                                          "ap_queue_push failed");
   1640                             apr_pool_clear(ptrans);
   1641                             ap_push_pool(worker_queue_info, ptrans);
   1642                         }
   1643                         else {

正常終了時にはhave_idle_workerの値を初期化(0)している。
このまま処理を継続する。

   1644                             have_idle_worker = 0;
   1645                         }
   1646                     }
   1647                     else {

こちらは、APR_EGENERAL以外のエラーのケースだ。
処理をこのまま継続している。

   1648                         apr_pool_clear(ptrans);
   1649                         ap_push_pool(worker_queue_info, ptrans);
   1650                     }
   1651                 }
   1652             }               /* if:else on pt->type */
   1653             out_pfd++;
   1654             num--;
   1655         }                   /* while for processing poll */

1414行目のfor()の末尾だ。
ここまでが、epoll_wait()でチェックしたソケットイベント処理である。

次は、タイムアウトキューの処理に進む。

0 件のコメント:

コメントを投稿