2014年11月25日火曜日

apr_bucket と apr_bucket_brigade

今度は CORE_IN入力フィルタを見ようとしたところだが、入出力フィルタではbucketとbucket brigadeの処理が重要だ。
そこで、先に少し見ておくことにした。

bucket brigade(apr_bucket_brigade)は内部にbucket(apr_bucket)の列のリング構造を持っている。
bucket情報は前後へのポインタを持っており、bucket brigade情報自体もbucket情報へのポインタを持っている。
bucket brigadeの持っている次へのポインタが、bucket列の先頭に繋がり、bucket列を順に辿って、bucketの末尾の次が再び、bucket brigadeに繋がってリング構造となる。
逆方向も同じで、bucket brigadeの前にbucket列の末尾が繋がり、前に辿っていくと、bucket列の先頭に繋がり、その前に再びbucket brigadeが位置する。
このbucket brigade自身(内部)のアドレスは有効なbucket情報ではなく、センチネル(番兵)として機能する。



brigadeが軍隊がらみの用語らしく、bucket brigadeが何を意味するのか分からなかったが、単純に "bucket brigade" で辞書を引けば載っていた。バケツリレーの列のことだった。
バケツリレーか。なるほどね。

2014年11月17日月曜日

出力フィルタ: CORE出力フィルタ

CORE出力フィルタ: ap_core_output_filter



以前、出力フィルタの概要を見た。
今度は、出力フィルタの階層の最下流にあるCORE出力フィルタを少し細かく見る。
中間層の出力フィルタは上流から受け取った bucket brigadeを加工し、新しいbucket brigadeを作成すると、これを下流の出力フィルタに渡した。
最下流では、もう渡し先の出力フィルタはないので、データをクライアントに向けて送信する。


(httpd-2.4.9/server/core_filters.c)

    372 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)


ap_filter_t *fで、この出力フィルタ(ap_core_output_filter)のフィルタ情報が渡され、
apr_bucket_brigade *new_bbには送信するbacket brigadeが渡される。

    373 {
    374     conn_rec *c = f->c;

この処理のconn_rec情報。

    375     core_net_rec *net = f->ctx;

core_net_rec情報は、process_socket()で実行されるpre_connectionフック関数のうち
core_pre_connection()で用意される。

    376     core_output_filter_ctx_t *ctx = net->out_ctx;

CORE出力フィルタ用のコンテキスト情報。最初の関数呼び出しで本フィルタ内で作成される。

    377     apr_bucket_brigade *bb = NULL;
    378     apr_bucket *bucket, *next, *flush_upto = NULL;
    379     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
    380     int eor_buckets_in_brigade, morphing_bucket_in_brigade;
    381     apr_status_t rv;
    382
    383     /* Fail quickly if the connection has already been aborted. */
    384     if (c->aborted) {

ここは異常系の処理だ。

    385         if (new_bb != NULL) {
    386             apr_brigade_cleanup(new_bb);
    387         }
    388         return APR_ECONNABORTED;
    389     }
    390
    391     if (ctx == NULL) {

コンテキスト情報 core_output_filter_ctx_t を用意する。
リクエスト処理で初めてCORE出力フィルタが呼ばれたときに行われることになる。

CORE出力フィルタのコンテキスト情報の定義は次の通り。

     81 struct core_output_filter_ctx {
     82     apr_bucket_brigade *buffered_bb;
     83     apr_bucket_brigade *tmp_flush_bb;
     84     apr_pool_t *deferred_write_pool;
     85     apr_size_t bytes_written;
     86 };

    392         ctx = apr_pcalloc(c->pool, sizeof(*ctx));
    393         net->out_ctx = (core_output_filter_ctx_t *)ctx;
    394         /*
    395          * Need to create tmp brigade with correct lifetime. Passing
    396          * NULL to apr_brigade_split_ex would result in a brigade
    397          * allocated from bb->pool which might be wrong.
    398          */

2つの作業用のbucket brigadeを作成し、コンテクスト情報に登録する。
いずれも、conn_recのプール(transactionプール)から確保している。

    399         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
    400         /* same for buffered_bb and ap_save_brigade */
    401         ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
    402     }
    403
    404     if (new_bb != NULL)
    405         bb = new_bb;

上流から渡されてきたbucket brigadeのアドレスを作業用の変数 bbにセットする。

    406
    407     if ((ctx->buffered_bb != NULL) &&
    408         !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
    409         if (new_bb != NULL) {
    410             APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
    411         }
    412         else {
    413             bb = ctx->buffered_bb;
    414         }

コンテキスト情報のbuffered_bbの内容と、今回の呼び出しで渡されてきた bucket brigadeの内容を
マージしている。
つまり、受け取ったnew_bbの前にフィルタの内部に前の処理時に留保されたbucket brigadeであるbuffered_bbを繋いでいる。

    415         c->data_in_output_filters = 0;

data_in_output_filtersは出力フィルタに未出力のデータ(bucket brigade)があるかどうかを判定している。
初期値として、なし(0)をセットする。

    416     }
    417     else if (new_bb == NULL) {

上流からbucket brigadeが渡されていない場合、
そして、buffered_bbにもデータがない場合、
処理対象となるbucketがないため、処理は行われず、直ちに成功でreturnする。

    418         return APR_SUCCESS;
    419     }

この部分のソースコードには、処理概要を説明している長めのコメントがある。
これを読めば、やっていることが分かるが、
ここでは、ソースコードに従って見ていきたいので、削った。
同じことを以下で書いている。
1)、2)だのa)、b)だのは、このコメントに書かれていた番号を該当する箇所の説明につけたものだ。

    467
    468     if (new_bb == NULL) {

1) ここは、コンテキスト情報のbufferd_bbに既にあったbucketだけが処理対象となるケース。

new_bbがNULLになるケースがどれほどあるか把握していないが、
process_socket()でCONN_STATE_WRITE_COMPLETION状態の場合に実行される
以下の989行目のコードは該当している。

(httpd-2.4.9/server/mpm/event/event.c)
    982     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
    983         ap_filter_t *output_filter = c->output_filters;
    984         apr_status_t rv;
    985         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
    986         while (output_filter->next != NULL) {
    987             output_filter = output_filter->next;
    988         }
    989         rv = output_filter->frec->filter_func.out_func(output_filter, NULL);

ここでは、無条件にデータの送信が非ブロックモードで実行される。

    469         rv = send_brigade_nonblocking(net->client_socket, bb,
    470                                       &(ctx->bytes_written), c);

send_brigade_nonblocking()処理内を辿ると、実際のクライアントへの送信処理が行われている。
net->client_socketはクライアントとの通信ソケット情報。
bbはbucket brigade。
ctx->bytes_writtenには送信済みデータ長が格納される(加算される)。
cはconn_rec情報。

    471         if (APR_STATUS_IS_EAGAIN(rv)) {
    472             rv = APR_SUCCESS;
    473         }
    474         else if (rv != APR_SUCCESS) {
    475             /* The client has aborted the connection */
    476             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    477                           "core_output_filter: writing data to the network");
    478             c->aborted = 1;
    479         }
    480         setaside_remaining_output(f, ctx, bb, c);

setaside_remaining_output()は、bucket brigade bbの未出力のデータを buffere_bbに退避する。
未出力データがあった場合には、識別用にconn_recのdata_in_output_filtersに1がセットされる。

送信処理が実行されると、このCORE出力関数からは復帰する。

    481         return rv;
    482     }

ここまでが1)の処理となる。

以下でいくつかの送信判定用の変数値が初期化される。

    483
    484     bytes_in_brigade = 0;
    485     non_file_bytes_in_brigade = 0;
    486     eor_buckets_in_brigade = 0;
    487     morphing_bucket_in_brigade = 0;

bytes_in_brigade bucket brigade内のデータバケットの lengthの合計を格納する(lengthが-1のものを除く)
non_file_bytes_in_brigade bucket brigade内のデータバケット(FILEを除く)の lengthの合計を格納する(lengthが-1のものを除く)
eor_buckets_in_brigade EORメタデータバケットの数をカウントする
morphing_bucket_in_brigade bucket brigade内のデータバケットでlengthが -1のものがあった場合に1を立てるフラグ


    488
    489     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
    490          bucket = next) {
    491         next = APR_BUCKET_NEXT(bucket);

ここで、bufferd_bbとnew_bbが連結された bucket brigade bb 内のbucketが順に処理される。

    492
    493         if (!APR_BUCKET_IS_METADATA(bucket)) {

bucketがデータタイプのバケットの場合

    494             if (bucket->length == (apr_size_t)-1) {

これはバケットのデータ長が事前には不明な場合で、
たとえば、SOCKETバケットやPIPEバケットなど、
読み込んでみないとデータ長が分からないバケットが該当する。

    495                 /*
    496                  * A setaside of morphing buckets would read everything into
    497                  * memory. Instead, we will flush everything up to and
    498                  * including this bucket.
    499                  */
    500                 morphing_bucket_in_brigade = 1;
    501             }
    502             else {

バケットのデータ長が分かっている場合は、bytes_in_brigadeに値を足しこむ。

    503                 bytes_in_brigade += bucket->length;
    504                 if (!APR_BUCKET_IS_FILE(bucket))

特にバケットがFILEバケットでない場合に、non_file_bytes_in_brigadeに値を足しこむ

    505                     non_file_bytes_in_brigade += bucket->length;
    506             }
    507         }
    508         else if (AP_BUCKET_IS_EOR(bucket)) {

bucketが EORメタデータバケットの場合
eor_buckets_in_brigadeをカウントアップする。

    509             eor_buckets_in_brigade++;
    510         }
    511

2) 上記のbucketのチェックを終えて、ここまでのデータ(bucket列)を実際に送信するかどうかの判定を行う。

    512         if (APR_BUCKET_IS_FLUSH(bucket)
    513             || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
    514             || morphing_bucket_in_brigade
    515             || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
    516             /* this segment of the brigade MUST be sent before returning. */

このif文の条件を満たすと、ここまでのbucketの送信を実行する。
下記でflush_uptoにセットされるnextが送信停止位置を示すことになる。
条件は以下のいずれかが真の場合だ。

(a)FLUSHメタデータバケットがあった場合
(b)FILEではないデータバケットのlengthの合計(non_file_bytes_in_brigade)がTHRESHOLD_MAX_BUFFER(64KB)以上になった場合

    364 #define THRESHOLD_MAX_BUFFER 65536

(d)データバケットにlengthが-1のものがあった場合(morphing_bucket_in_brigade==1)
(c)EORメタデータバケットがMAX_REQUESTS_IN_PIPELINE(5)個以上処理された場合

(eor_buckets_in_brigade)
    365 #define MAX_REQUESTS_IN_PIPELINE 5


    517
    518             if (APLOGctrace6(c)) {
    519                 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
    520                                "FLUSH bucket" :
    521                                (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
    522                                "THRESHOLD_MAX_BUFFER" :
    523                                morphing_bucket_in_brigade ? "morphing bucket" :
    524                                "MAX_REQUESTS_IN_PIPELINE";
    525                 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
    526                               "core_output_filter: flushing because of %s",
    527                               reason);
    528             }
    529             /*
    530              * Defer the actual blocking write to avoid doing many writes.
    531              */
    532             flush_upto = next;

flush_uptoに値がセットされた場合、以降の処理で、先頭からこの直前(ここで処理されているbucket)までのbucket列を出力することになる(3)。

ここで、いったん送信条件をリセットする。

    533
    534             bytes_in_brigade = 0;
    535             non_file_bytes_in_brigade = 0;
    536             eor_buckets_in_brigade = 0;
    537             morphing_bucket_in_brigade = 0;
    538         }
    539     }
    540
    541     if (flush_upto != NULL) {

3) flush_uptoがNULLでない場合、ここまでのデータ(bucket列)の送信が実行される。

    542         ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
    543                                                  ctx->tmp_flush_bb);

まず、flush_uptoから末尾までのbucket列を分割し、コンテキスト情報のtmp_flush_bbに退避する。

    544         rv = send_brigade_blocking(net->client_socket, bb,
    545                                    &(ctx->bytes_written), c);

そして、残った先頭からfulsh_uptoの前までのbucketからなる bucket brigade bb をブロックモードで送信する。

ブロックモードでの送信処理を辿ると、その内部で非ブロックモードの送信処理send_brigade_nonblocking()を呼び出している。
非ブロックでの送信処理の実行結果が、エラーで、エラーコードがEAGAIN/EWOULDBLOCK の場合に、poll関数で、書込み可能になるまで待機する(Timeoutの設定が待ち時間の上限)処理が追加されている。

    546         if (rv != APR_SUCCESS) {
    547             /* The client has aborted the connection */
    548             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    549                           "core_output_filter: writing data to the network");
    550             c->aborted = 1;
    551             return rv;
    552         }
    553         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);

送信処理を終えると、tmp_flush_bbに退避しておいた残りのbucket列を bb に繋ぎなおしている。

    554     }
    555
    556     if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {

4) FILEを含むデータバケットのlengthの合計が、THRESHOLD_MIN_WRITE(4KB)以上になった場合にも出力処理を実行する。

    363 #define THRESHOLD_MIN_WRITE 4096


ここでは、出力は非ブロックモードで行われる。

    557         rv = send_brigade_nonblocking(net->client_socket, bb,
    558                                       &(ctx->bytes_written), c);
    559         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
    560             /* The client has aborted the connection */
    561             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    562                           "core_output_filter: writing data to the network");
    563             c->aborted = 1;
    564             return rv;
    565         }
    566     }
    567


    568     setaside_remaining_output(f, ctx, bb, c);

未送信データが存在した場合buffered_bbに保持しておいて、識別用にconn_recのdata_in_output_filtersに1をセットする。

    569     return APR_SUCCESS;
    570 }


ここまでが、CORE出力関数となる。
実際のソケットの送信処理は、下記の関数内で実行されている。
いずれも server/core_filters.c ファイル内でstatic関数として定義されている。
概要は上述した通り。

  • send_brigade_nonblocking
  • send_brigade_blocking

送信の実体は、sendfile()であったり、バケットのデータをI/Oベクトルに収めてのwritev()であったりする。
これは、別途コードを追う予定。

2014年11月4日火曜日

リクエスト処理の流れ: eventMPM コネクション処理フロー

コネクション処理フロー

worker_thread()、process_socket()とlistener_thread()の処理を概観して、コネクションのconn_state_e情報と処理の流れを図にした。



accept待ちのListenソケットがlistener_threadで接続を受け付けて、workerスレッドのprocess_socketでのリクエスト処理/出力処理を経て、ソケットを閉じたり、次リクエスト受け付けを待つという流れになっている。
青い四角がworkerスレッドの処理、黄色い四角がlistenerスレッドの処理。ソケットクローズは、listenerスレッドで行われる。ただ、これは正常系の場合で、既に見てきているように、ソケットのクローズは、エラー時などには別のタイミングでも実行されている。

SUSPENDED状態を持つタイムアウト処理は、いまのところmod_dialupにしか実装は見当たらない。
タイムアウト処理をSUSPENDED状態の中で繰り返し、処理を終えると、ソケットを閉じる処理に向かっている。