2014年12月15日月曜日

CORE出力フィルタ: ソケット送信処理(1)

CORE出力フィルタ(core_output_filter)から呼び出されているソケット送信処理を実行する関数は以下の2つだった。

(1) send_brigade_nonblocking
(2) send_brigade_blocking

これを追ってみる。

(1)send_brigade_nonblocking関数


bucket brigade bb の先頭から、データを持っているデータバケットを対象に送信を行う。
ソケット送信を実行すると、この関数は(未送信のバケットがあっても)復帰する。
送信済みのbucketは、bbから外され、破棄される。

データバケットはFILEバケットをEnableSendFile on でsendfile()で送信する場合は、
ファイルサイズが256バイト以上であれば、sendfile()で送信する。
sendfileで完全に送信が完了しなかった場合には、FILEバケットが分割され、未送信のFILEバケットが bucket brigade bb に残される。

それ以外のデータバケットの場合、writev()で送信する。この時データバケットごとに
I/Oベクトルの配列にデータをセットし、一定数を超えた場合にはソケット送信を実行する。

あるいは、bucket brigade bbのすべてを処理終えたした時にも、送信データがあればソケット送信を行う。

全データが送信できなかった場合には、送信済みのデータバケットは破棄され、未送信の
データバケットが bucket brigade bb に残される。

以下詳細を見てみる。

(httpd-2.4.9/server/core_filters.c)

    616 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
    617                                              apr_bucket_brigade *bb,
    618                                              apr_size_t *bytes_written,
    619                                              conn_rec *c)

apr_socket_t *s 通信用のソケット情報
apr_bucket_brigade *bb 送信データ用bucket brigade
apr_size_t *bytes_written 出力データサイズ(関数側で返す)
conn_rec *c コネクション情報

    620 {
    621     apr_bucket *bucket, *next;
    622     apr_status_t rv;
    623     struct iovec vec[MAX_IOVEC_TO_WRITE];
    624     apr_size_t nvec = 0;
    625
    626     remove_empty_buckets(bb);

remove_empty_buckets(bb)は、bucket brigade bbを先頭から辿り、
メタデータバケットかlength==0のデータバケットが続いている間、
そのバケットを bb から取り除き、破棄している。

length!=0のデータバケットが残った場合に、bucket brigadeが処理される。

    627
    628     for (bucket = APR_BRIGADE_FIRST(bb);
    629          bucket != APR_BRIGADE_SENTINEL(bb);
    630          bucket = next) {
    631         next = APR_BUCKET_NEXT(bucket);
    632 #if APR_HAS_SENDFILE

CentOSにはsendfileは存在する

    633         if (APR_BUCKET_IS_FILE(bucket)) {

FILEバケットの場合、
(1)EnableSendFileディレクティブが offではない場合
かつ
(2)サイズがAP_MIN_SENDFILE_BYTES(256)バイト以上の場合
sendfileを使った処理を行う。

EnableSendfiFileがonであっても小さなファイルの送信にはsendfileは使われないようだ。

    634             apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
    635             apr_file_t *fd = file_bucket->fd;
    636             /* Use sendfile to send this file unless:
    637              *   - the platform doesn't support sendfile,
    638              *   - the file is too small for sendfile to be useful, or
    639              *   - sendfile is disabled in the httpd config via "EnableSendfile off"
    640              */
    641
    642             if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
    643                 (bucket->length >= AP_MIN_SENDFILE_BYTES)) {


     56 #define AP_MIN_SENDFILE_BYTES           (256)


    644                 if (nvec > 0) {

nvecは初期値は0
bucket brigadeを処理していく中で、I/Oベクトルにデータを追加している。
そのバッファされているI/Oベクトルのデータを出力しておく。

    645                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
    646                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
    647                     nvec = 0;
    648                     if (rv != APR_SUCCESS) {
    649                         (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
    650                         return rv;
    651                     }
    652                 }
    653                 rv = sendfile_nonblocking(s, bucket, bytes_written, c);

FILEバケットをsendfileで送信する。
出力可能でなければ、ただちに復帰する。
ファイル全体を送信できた場合には、このbucketをbucket brigadeから取り除いて、apr_bucket_destroy()
を行う。
一部のみ送信できた場合は、このbucketを送信済みの部分と残った部分の2つに分割し、
送信済みのバケットを取り除き、apr_bucket_destroyを行う。


    654                 if (nvec > 0) {


ここには来ないのではないか(647行目で0になっている)。


    655                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
    656                 }
    657                 if (rv != APR_SUCCESS) {
    658                     return rv;
    659                 }
    660                 break;


出力したら、処理を終える(forループを抜ける)。




    661             }
    662         }
    663 #endif /* APR_HAS_SENDFILE */
    664         /* didn't sendfile */
    665         if (!APR_BUCKET_IS_METADATA(bucket)) {

このif文はデータバケットの場合の処理

    666             const char *data;
    667             apr_size_t length;
    668
    669             /* Non-blocking read first, in case this is a morphing
    670              * bucket type. */
    671             rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);

データバケットからデータを読み込む。
読込は非ブロックモードで行っている。

    672             if (APR_STATUS_IS_EAGAIN(rv)) {

上流から渡ってきているbucket brigadeにPIPEバケットやSOCKETバケットがあった場合、非ブロックで読み込んで、読み込みデータがなければ、この経路に入る。

    673                 /* Read would block; flush any pending data and retry. */
    674                 if (nvec) {

ここは、既にデータバケットを処理しており、未送信のものが存在している場合だ。

    675                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);

I/Oベクトルのデータのソケット送信を実施する。

    676                     if (rv) {

ここは rv!=APR_SUCCESS(0)の場合の経路となる。

    677                         return rv;
    678                     }
    679                     nvec = 0;

nvecの値は0でリセットする。
この経路では、データがソケット送信された場合にも、引き続き処理が行われている。

    680                 }
    681
    682                 rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);

そして、次にブロックモードでデータを読み込む。

    683             }
    684             if (rv != APR_SUCCESS) {

671行目のapr_bucket_read(APR_NONBLOCK_READ)の戻り値がEAGAIN以外のエラーか、
682行目のapr_bucket_read(APR_BLOCK_READ)がエラーの場合に
この経路に入る。

ここでは、エラーでreturn している。

    685                 return rv;
    686             }
    687
    688             /* reading may have split the bucket, so recompute next: */
    689             next = APR_BUCKET_NEXT(bucket);
    690             vec[nvec].iov_base = (char *)data;
    691             vec[nvec].iov_len = length;
    692             nvec++;

読み込んだデータをI/Oベクトルに追加する。

    693             if (nvec == MAX_IOVEC_TO_WRITE) {

I/Oベクトルの要素数がMAX_IOVEC_TO_WRITEになったら、出力を実行する。

    694                 rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);

writev()関数を使って、I/Oベクトルの情報を出力する。
出力可能でなけばただちに復帰する。
出力完了したデータサイズに応じて、bucket brugade bb のlengthを消化して、
bucketをbb から取り除き、apr_bucket_destroy()を行う。
bucketのデータの一部しか送信できていない場合には、そのbucketを分割し、
送信済みのbucketを取り外して、apr_bucket_destroy()を行っている。

    695                 nvec = 0;

nvecは出力後、0にリセットする。

    696                 if (rv != APR_SUCCESS) {
    697                     return rv;
    698                 }
    699                 break;

出力したら、処理を終える(forループを抜ける)。

    700             }
    701         }
    702     }
    703
    704     if (nvec > 0) {

bucket brigadeを最後までチェックして、MAX_IOVEC_TO_WRITEにならなかった場合に
ここで出力する。

    705         rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
    706         if (rv != APR_SUCCESS) {
    707             return rv;
    708         }
    709     }
    710
    711     remove_empty_buckets(bb);
    712
    713     return APR_SUCCESS;
    714 }


(2)send_brigade_blocking関数


送信処理だが、こちらはブロックモードだ。
実装では、send_brigade_nonblocking()を呼んでいる。

ブロックモードでは、send_brigade_nonblocking()を実行して、結果がEAGAINの場合(書込みできない)、apr_socket_t情報のtimeoutの設定値を取得して、poll()で書込み可能になるまで待機する。

timeoutになる前に書込み可能になれば、再度send_brigade_nonblocking()を実行する。
timeoutした場合は、APR_TIMEUPでreturnする。
その他のエラーの場合もエラーでreturnする。

つまり、send_brigade_nonblockingと異なり、非ブロックで書き込みを行い、ソケットの書込み準備ができていない場合には、書込み可能になるまで、Timeoutディレクティブ等で指定されているタイムアウト値に従って待機するようになっている。

    726 static apr_status_t send_brigade_blocking(apr_socket_t *s,
    727                                           apr_bucket_brigade *bb,
    728                                           apr_size_t *bytes_written,
    729                                           conn_rec *c)
    730 {
    731     apr_status_t rv;
    732
    733     rv = APR_SUCCESS;
    734     while (!APR_BRIGADE_EMPTY(bb)) {
    735         rv = send_brigade_nonblocking(s, bb, bytes_written, c);

ここでsend_brigade_nonblocking()が実行されている。

    736         if (rv != APR_SUCCESS) {
    737             if (APR_STATUS_IS_EAGAIN(rv)) {

ソケットの書込み準備ができていなかった場合にこの経路に入る。


    738                 /* Wait until we can send more data */
    739                 apr_int32_t nsds;
    740                 apr_interval_time_t timeout;
    741                 apr_pollfd_t pollset;
    742
    743                 pollset.p = c->pool;
    744                 pollset.desc_type = APR_POLL_SOCKET;
    745                 pollset.reqevents = APR_POLLOUT;
    746                 pollset.desc.s = s;
    747                 apr_socket_timeout_get(s, &timeout);

apr_socket_t情報のtimeout設定を取得している。

    748                 do {
    749                     rv = apr_poll(&pollset, 1, &nsds, timeout);

ソケットが書き込み可能になるまで、所定の時間待つ。
シグナルで割りこまれた場合(EINTR)には処理を継続する。

    750                 } while (APR_STATUS_IS_EINTR(rv));
    751                 if (rv != APR_SUCCESS) {

apr_poll()でEINTR以外の何らかのエラーが発生したか、タイムアウトの場合の経路だ。

    752                     break;
    753                 }
    754             }
    755             else {

send_brigade_nonblockingがEAGAIN以外のエラーだった場合の経路になる

    756                 break;
    757             }
    758         }

send_brigade_nonblocking()が成功した場合、あるいは、ソケットが書き込み可能になった場合、
bucket brigadeが空になるまでsend_brigade_nonblocking()処理を継続する。

    759     }

send_brigade_nonblocking処理は、送信済みのbucketを破棄するので、すべて送信し終えれば、bucket brigadeが空となり
この734行からのwhile()ループを抜ける。

    760     return rv;
    761 }

以上、2関数を見たが、ここでも最終的なソケット送信関数は呼ばれていなかった。
ここでは送信処理として次の関数から呼ばれてる。

  • sendfile_nonblocking
  • writev_nonblocking

実際のところ、上記関数の内部を見ても、ソケット送信関数は呼ばれていない。
それぞれは、さらに次のAPRライブラリ関数を使用し、そこでようやく直接ソケットディスクリプタを扱う関数にたどり着く。

  • apr_socket_sendfile
  • apr_socket_sendv

もう少しなので、辿ってみることにする。

0 件のコメント:

コメントを投稿