今度は CORE_IN入力フィルタを見ようとしたところだが、入出力フィルタではbucketとbucket brigadeの処理が重要だ。
そこで、先に少し見ておくことにした。
bucket brigade(apr_bucket_brigade)は内部にbucket(apr_bucket)の列のリング構造を持っている。
bucket情報は前後へのポインタを持っており、bucket brigade情報自体もbucket情報へのポインタを持っている。
bucket brigadeの持っている次へのポインタが、bucket列の先頭に繋がり、bucket列を順に辿って、bucketの末尾の次が再び、bucket brigadeに繋がってリング構造となる。
逆方向も同じで、bucket brigadeの前にbucket列の末尾が繋がり、前に辿っていくと、bucket列の先頭に繋がり、その前に再びbucket brigadeが位置する。
このbucket brigade自身(内部)のアドレスは有効なbucket情報ではなく、センチネル(番兵)として機能する。
brigadeが軍隊がらみの用語らしく、bucket brigadeが何を意味するのか分からなかったが、単純に "bucket brigade" で辞書を引けば載っていた。バケツリレーの列のことだった。
バケツリレーか。なるほどね。
2014年11月25日火曜日
2014年11月17日月曜日
出力フィルタ: CORE出力フィルタ
CORE出力フィルタ: ap_core_output_filter
以前、出力フィルタの概要を見た。
今度は、出力フィルタの階層の最下流にあるCORE出力フィルタを少し細かく見る。
中間層の出力フィルタは上流から受け取った bucket brigadeを加工し、新しいbucket brigadeを作成すると、これを下流の出力フィルタに渡した。
最下流では、もう渡し先の出力フィルタはないので、データをクライアントに向けて送信する。
(httpd-2.4.9/server/core_filters.c)
372 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
ap_filter_t *fで、この出力フィルタ(ap_core_output_filter)のフィルタ情報が渡され、
apr_bucket_brigade *new_bbには送信するbacket brigadeが渡される。
373 { 374 conn_rec *c = f->c;
この処理のconn_rec情報。
375 core_net_rec *net = f->ctx;
core_net_rec情報は、process_socket()で実行されるpre_connectionフック関数のうち
core_pre_connection()で用意される。
376 core_output_filter_ctx_t *ctx = net->out_ctx;
CORE出力フィルタ用のコンテキスト情報。最初の関数呼び出しで本フィルタ内で作成される。
377 apr_bucket_brigade *bb = NULL; 378 apr_bucket *bucket, *next, *flush_upto = NULL; 379 apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; 380 int eor_buckets_in_brigade, morphing_bucket_in_brigade; 381 apr_status_t rv; 382 383 /* Fail quickly if the connection has already been aborted. */ 384 if (c->aborted) {
ここは異常系の処理だ。
385 if (new_bb != NULL) { 386 apr_brigade_cleanup(new_bb); 387 } 388 return APR_ECONNABORTED; 389 } 390 391 if (ctx == NULL) {
コンテキスト情報 core_output_filter_ctx_t を用意する。
リクエスト処理で初めてCORE出力フィルタが呼ばれたときに行われることになる。
CORE出力フィルタのコンテキスト情報の定義は次の通り。
81 struct core_output_filter_ctx { 82 apr_bucket_brigade *buffered_bb; 83 apr_bucket_brigade *tmp_flush_bb; 84 apr_pool_t *deferred_write_pool; 85 apr_size_t bytes_written; 86 };
392 ctx = apr_pcalloc(c->pool, sizeof(*ctx)); 393 net->out_ctx = (core_output_filter_ctx_t *)ctx; 394 /* 395 * Need to create tmp brigade with correct lifetime. Passing 396 * NULL to apr_brigade_split_ex would result in a brigade 397 * allocated from bb->pool which might be wrong. 398 */
2つの作業用のbucket brigadeを作成し、コンテクスト情報に登録する。
いずれも、conn_recのプール(transactionプール)から確保している。
399 ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc); 400 /* same for buffered_bb and ap_save_brigade */ 401 ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc); 402 } 403 404 if (new_bb != NULL) 405 bb = new_bb;
上流から渡されてきたbucket brigadeのアドレスを作業用の変数 bbにセットする。
406 407 if ((ctx->buffered_bb != NULL) && 408 !APR_BRIGADE_EMPTY(ctx->buffered_bb)) { 409 if (new_bb != NULL) { 410 APR_BRIGADE_PREPEND(bb, ctx->buffered_bb); 411 } 412 else { 413 bb = ctx->buffered_bb; 414 }
コンテキスト情報のbuffered_bbの内容と、今回の呼び出しで渡されてきた bucket brigadeの内容を
マージしている。
つまり、受け取ったnew_bbの前にフィルタの内部に前の処理時に留保されたbucket brigadeであるbuffered_bbを繋いでいる。
415 c->data_in_output_filters = 0;
data_in_output_filtersは出力フィルタに未出力のデータ(bucket brigade)があるかどうかを判定している。
初期値として、なし(0)をセットする。
416 } 417 else if (new_bb == NULL) {
上流からbucket brigadeが渡されていない場合、
そして、buffered_bbにもデータがない場合、
処理対象となるbucketがないため、処理は行われず、直ちに成功でreturnする。
418 return APR_SUCCESS; 419 }
この部分のソースコードには、処理概要を説明している長めのコメントがある。
これを読めば、やっていることが分かるが、
ここでは、ソースコードに従って見ていきたいので、削った。
同じことを以下で書いている。
1)、2)だのa)、b)だのは、このコメントに書かれていた番号を該当する箇所の説明につけたものだ。
467 468 if (new_bb == NULL) {
1) ここは、コンテキスト情報のbufferd_bbに既にあったbucketだけが処理対象となるケース。
new_bbがNULLになるケースがどれほどあるか把握していないが、
process_socket()でCONN_STATE_WRITE_COMPLETION状態の場合に実行される
以下の989行目のコードは該当している。
(httpd-2.4.9/server/mpm/event/event.c) 982 if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) { 983 ap_filter_t *output_filter = c->output_filters; 984 apr_status_t rv; 985 ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c); 986 while (output_filter->next != NULL) { 987 output_filter = output_filter->next; 988 } 989 rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
ここでは、無条件にデータの送信が非ブロックモードで実行される。
469 rv = send_brigade_nonblocking(net->client_socket, bb,
470 &(ctx->bytes_written), c);
send_brigade_nonblocking()処理内を辿ると、実際のクライアントへの送信処理が行われている。
net->client_socketはクライアントとの通信ソケット情報。
bbはbucket brigade。
ctx->bytes_writtenには送信済みデータ長が格納される(加算される)。
cはconn_rec情報。
471 if (APR_STATUS_IS_EAGAIN(rv)) {
472 rv = APR_SUCCESS;
473 }
474 else if (rv != APR_SUCCESS) {
475 /* The client has aborted the connection */
476 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
477 "core_output_filter: writing data to the network");
478 c->aborted = 1;
479 }
480 setaside_remaining_output(f, ctx, bb, c);
setaside_remaining_output()は、bucket brigade bbの未出力のデータを buffere_bbに退避する。
未出力データがあった場合には、識別用にconn_recのdata_in_output_filtersに1がセットされる。
送信処理が実行されると、このCORE出力関数からは復帰する。
481 return rv; 482 }
ここまでが1)の処理となる。
以下でいくつかの送信判定用の変数値が初期化される。
483 484 bytes_in_brigade = 0; 485 non_file_bytes_in_brigade = 0; 486 eor_buckets_in_brigade = 0; 487 morphing_bucket_in_brigade = 0;
bytes_in_brigade | bucket brigade内のデータバケットの lengthの合計を格納する(lengthが-1のものを除く) |
non_file_bytes_in_brigade | bucket brigade内のデータバケット(FILEを除く)の lengthの合計を格納する(lengthが-1のものを除く) |
eor_buckets_in_brigade | EORメタデータバケットの数をカウントする |
morphing_bucket_in_brigade | bucket brigade内のデータバケットでlengthが -1のものがあった場合に1を立てるフラグ |
488 489 for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); 490 bucket = next) { 491 next = APR_BUCKET_NEXT(bucket);
ここで、bufferd_bbとnew_bbが連結された bucket brigade bb 内のbucketが順に処理される。
492
493 if (!APR_BUCKET_IS_METADATA(bucket)) {
bucketがデータタイプのバケットの場合
494 if (bucket->length == (apr_size_t)-1) {
これはバケットのデータ長が事前には不明な場合で、
たとえば、SOCKETバケットやPIPEバケットなど、
読み込んでみないとデータ長が分からないバケットが該当する。
495 /*
496 * A setaside of morphing buckets would read everything into
497 * memory. Instead, we will flush everything up to and
498 * including this bucket.
499 */
500 morphing_bucket_in_brigade = 1;
501 }
502 else {
バケットのデータ長が分かっている場合は、bytes_in_brigadeに値を足しこむ。
503 bytes_in_brigade += bucket->length;
504 if (!APR_BUCKET_IS_FILE(bucket))
特にバケットがFILEバケットでない場合に、non_file_bytes_in_brigadeに値を足しこむ
505 non_file_bytes_in_brigade += bucket->length;
506 }
507 }
508 else if (AP_BUCKET_IS_EOR(bucket)) {
bucketが EORメタデータバケットの場合
eor_buckets_in_brigadeをカウントアップする。
509 eor_buckets_in_brigade++;
510 }
511
2) 上記のbucketのチェックを終えて、ここまでのデータ(bucket列)を実際に送信するかどうかの判定を行う。
512 if (APR_BUCKET_IS_FLUSH(bucket) 513 || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER 514 || morphing_bucket_in_brigade 515 || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { 516 /* this segment of the brigade MUST be sent before returning. */
このif文の条件を満たすと、ここまでのbucketの送信を実行する。
下記でflush_uptoにセットされるnextが送信停止位置を示すことになる。
条件は以下のいずれかが真の場合だ。
(a)FLUSHメタデータバケットがあった場合
(b)FILEではないデータバケットのlengthの合計(non_file_bytes_in_brigade)がTHRESHOLD_MAX_BUFFER(64KB)以上になった場合
364 #define THRESHOLD_MAX_BUFFER 65536
(d)データバケットにlengthが-1のものがあった場合(morphing_bucket_in_brigade==1)
(c)EORメタデータバケットがMAX_REQUESTS_IN_PIPELINE(5)個以上処理された場合
(eor_buckets_in_brigade) 365 #define MAX_REQUESTS_IN_PIPELINE 5
517
518 if (APLOGctrace6(c)) {
519 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
520 "FLUSH bucket" :
521 (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
522 "THRESHOLD_MAX_BUFFER" :
523 morphing_bucket_in_brigade ? "morphing bucket" :
524 "MAX_REQUESTS_IN_PIPELINE";
525 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
526 "core_output_filter: flushing because of %s",
527 reason);
528 }
529 /*
530 * Defer the actual blocking write to avoid doing many writes.
531 */
532 flush_upto = next;
flush_uptoに値がセットされた場合、以降の処理で、先頭からこの直前(ここで処理されているbucket)までのbucket列を出力することになる(3)。
ここで、いったん送信条件をリセットする。
533
534 bytes_in_brigade = 0;
535 non_file_bytes_in_brigade = 0;
536 eor_buckets_in_brigade = 0;
537 morphing_bucket_in_brigade = 0;
538 }
539 }
540
541 if (flush_upto != NULL) {
3) flush_uptoがNULLでない場合、ここまでのデータ(bucket列)の送信が実行される。
542 ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
543 ctx->tmp_flush_bb);
まず、flush_uptoから末尾までのbucket列を分割し、コンテキスト情報のtmp_flush_bbに退避する。
544 rv = send_brigade_blocking(net->client_socket, bb,
545 &(ctx->bytes_written), c);
そして、残った先頭からfulsh_uptoの前までのbucketからなる bucket brigade bb をブロックモードで送信する。
ブロックモードでの送信処理を辿ると、その内部で非ブロックモードの送信処理send_brigade_nonblocking()を呼び出している。
非ブロックでの送信処理の実行結果が、エラーで、エラーコードがEAGAIN/EWOULDBLOCK の場合に、poll関数で、書込み可能になるまで待機する(Timeoutの設定が待ち時間の上限)処理が追加されている。
546 if (rv != APR_SUCCESS) {
547 /* The client has aborted the connection */
548 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
549 "core_output_filter: writing data to the network");
550 c->aborted = 1;
551 return rv;
552 }
553 APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
送信処理を終えると、tmp_flush_bbに退避しておいた残りのbucket列を bb に繋ぎなおしている。
554 }
555
556 if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
4) FILEを含むデータバケットのlengthの合計が、THRESHOLD_MIN_WRITE(4KB)以上になった場合にも出力処理を実行する。
363 #define THRESHOLD_MIN_WRITE 4096
ここでは、出力は非ブロックモードで行われる。
557 rv = send_brigade_nonblocking(net->client_socket, bb,
558 &(ctx->bytes_written), c);
559 if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
560 /* The client has aborted the connection */
561 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
562 "core_output_filter: writing data to the network");
563 c->aborted = 1;
564 return rv;
565 }
566 }
567
568 setaside_remaining_output(f, ctx, bb, c);
未送信データが存在した場合buffered_bbに保持しておいて、識別用にconn_recのdata_in_output_filtersに1をセットする。
569 return APR_SUCCESS; 570 }
ここまでが、CORE出力関数となる。
実際のソケットの送信処理は、下記の関数内で実行されている。
いずれも server/core_filters.c ファイル内でstatic関数として定義されている。
概要は上述した通り。
- send_brigade_nonblocking
- send_brigade_blocking
送信の実体は、sendfile()であったり、バケットのデータをI/Oベクトルに収めてのwritev()であったりする。
これは、別途コードを追う予定。
2014年11月4日火曜日
リクエスト処理の流れ: eventMPM コネクション処理フロー
コネクション処理フロー
worker_thread()、process_socket()とlistener_thread()の処理を概観して、コネクションのconn_state_e情報と処理の流れを図にした。accept待ちのListenソケットがlistener_threadで接続を受け付けて、workerスレッドのprocess_socketでのリクエスト処理/出力処理を経て、ソケットを閉じたり、次リクエスト受け付けを待つという流れになっている。
青い四角がworkerスレッドの処理、黄色い四角がlistenerスレッドの処理。ソケットクローズは、listenerスレッドで行われる。ただ、これは正常系の場合で、既に見てきているように、ソケットのクローズは、エラー時などには別のタイミングでも実行されている。
SUSPENDED状態を持つタイムアウト処理は、いまのところmod_dialupにしか実装は見当たらない。
タイムアウト処理をSUSPENDED状態の中で繰り返し、処理を終えると、ソケットを閉じる処理に向かっている。
登録:
投稿 (Atom)