2014年12月22日月曜日

apr_file_t 情報と FILEデータバケット

ここでは apr_bucketとapr_bucket_brigade で調べたデータバケットのひとつであるFILEデータバケットを見ていく。

(1)apr_bucket_file情報

FILEデータバケットを見てみる。
FILEデータバケットの格納データはapr_bucket_file情報だ。

apr_bucket_file情報は、以下の情報を持っていて、バケット情報のタイプ依存変数 data にセットされる。

(apr-util-1.5.4/include/apr_brigade.h)
    612 struct apr_bucket_file {
    613     /** Number of buckets using this memory */
    614     apr_bucket_refcount  refcount;

共有リソース用の参照回数の情報

    615     /** The file this bucket refers to */
    616     apr_file_t *fd;

FILEバケット情報の処理対象のapr_file_t情報がセットされる

    617     /** The pool into which any needed structures should
    618      *  be created while reading from this file bucket */
    619     apr_pool_t *readpool;
    620 #if APR_HAS_MMAP
    621     /** Whether this bucket should be memory-mapped if
    622      *  a caller tries to read from it */
    623     int can_mmap;

mmapが使用できるかどうかのフラグ。
centos環境ではAPR_HAS_MMAPは定義されているので、この値は1が初期値。

    624 #endif /* APR_HAS_MMAP */
    625 };

この構造体の先頭の変数、refcountは、この情報が複数のバケットで共有されるタイプのリソースであることを意味している。
この変数には次の構造体 apr_bucket_refcount を使ってアクセスされる。

(include/apr_buckets.h)

    524 typedef struct apr_bucket_refcount apr_bucket_refcount;

    531 struct apr_bucket_refcount {
    532     /** The number of references to this bucket */
    533     int          refcount;
    534 };

(2)FILEデータバケットタイプ情報

FILEデータバケットのバケットタイプ apr_bucket_type_t情報は次の通り。

(apr-util-1.5.4/buckets/apr_buckets_file.c)

    221 APU_DECLARE_DATA const apr_bucket_type_t apr_bucket_type_file = {
    222     "FILE", 5, APR_BUCKET_DATA,
    223     file_bucket_destroy,
    224     file_bucket_read,
    225     file_bucket_setaside,
    226     apr_bucket_shared_split,
    227     apr_bucket_shared_copy
    228 };

これを見ると、split関数とcopy関数に共有リソースの処理用関数がそのまま使われている。
また、file_bucket_destroy()からも、共有リソース破棄用の関数(apr_bucket_shared_destroy)が呼び出されている。

共有リソースというのは、参照するデータを複数のバケットで共有するための仕組みで、refcountはその名の通り、このリソースを参照しているバケット数を格納するようになっている。
初期値は1で、コピー(copy)や、分割(split)によって、その数はカウントアップされる。

(2.1)copy関数

下記はcopy関数だ。

apr-util-1.5.4/buckets/apr_buckets_refcount.c)

     33 APU_DECLARE_NONSTD(apr_status_t) apr_bucket_shared_copy(apr_bucket *a,
     34                                                         apr_bucket **b)
     35 {
     36     apr_bucket_refcount *r = a->data;
     37
     38     apr_bucket_simple_copy(a, b);

apr_bucket_simple_copy(a,b)は、 apr_bucket *bにapr_bucketの領域を確保し、値を*aからコピーする処理だ。

(apr-util-1.5.4/buckets/apr_buckets_simple.c)

     19 APU_DECLARE_NONSTD(apr_status_t) apr_bucket_simple_copy(apr_bucket *a,
     20                                                         apr_bucket **b)
     21 {
     22     *b = apr_bucket_alloc(sizeof(**b), a->list); /* XXX: check for failure? */
     23     **b = *a;
     24
     25     return APR_SUCCESS;
     26 }

この時、タイプ依存情報のdataのアドレスもコピーされるので、a,bは同じapr_bucket_file情報を参照するようになる。

     39     r->refcount++;

そして、データ依存関数のrefcountをカウントアップする。

     40
     41     return APR_SUCCESS;
     42 }


(2.2)split関数

続いて、split関数だ

(apr-util-1.5.4/buckets/apr_buckets_refcount.c)

     19 APU_DECLARE_NONSTD(apr_status_t) apr_bucket_shared_split(apr_bucket *a,
     20                                                          apr_size_t point)
     21 {
     22     apr_bucket_refcount *r = a->data;
     23     apr_status_t rv;
     24
     25     if ((rv = apr_bucket_simple_split(a, point)) != APR_SUCCESS) {

apr_bucket_simple_splitでは、内部でapr_bucket_simple_copy()を実行し、同じapr_bucketを複製する。
そして、バケット情報のstartとlengthを調整している。

(apr-util-1.5.4/buckets/apr_buckets_simple.c)

     28 APU_DECLARE_NONSTD(apr_status_t) apr_bucket_simple_split(apr_bucket *a,
     29                                                          apr_size_t point)
     30 {
     31     apr_bucket *b;
     32
     33     if (point > a->length) {

pointがバケットaのサイズより大きければ、エラーで返している。

     34         return APR_EINVAL;
     35     }
     36
     37     apr_bucket_simple_copy(a, &b);

バケットaの複製バケットbを作成する。

     38
     39     a->length  = point;

バケットaの方のlengthをpointにおきかえる。

     40     b->length -= point;

バケットbの方のlengthはpoint分、引き算する。

     41     b->start  += point;

バケットbの先頭を元のaの先頭から、point分、先に進める。

     42
     43     APR_BUCKET_INSERT_AFTER(a, b);

そして、バケットbをbucket brigadeの バケットaの後ろに挿入する。

     44
     45     return APR_SUCCESS;
     46 }

バケットaは、aの先頭から、pointまでのデータを含むようになり、
aから複製されたバケットbは、先頭がa+pointに、そして、もとの長さ(length)のpointから後ろ分を格納する。
新しいバケットbは、bucket brigadeのaの後ろに追加される。

     26         return rv;
     27     }
     28     r->refcount++;

apr_bucket_simple_split()でもタイプ異存情報dataは同じアドレスの情報を参照している。
そこでrefcountをカウントアップしている。

     29
     30     return APR_SUCCESS;
     31 }

(2.3)destroy関数

バケットを破棄(destroy)するときにマイナスされていく。refcount==0となると、このリソースを参照しているバケットがないことになる。
FILEデータタイプの場合、その場合に、実際のそのリソースを破棄する処理を行うようになっている。

(apr-util-1.5.4/buckets/apr_buckets_file.c)

    32 static void file_bucket_destroy(void *data)
     33 {
     34     apr_bucket_file *f = data;
     35
     36     if (apr_bucket_shared_destroy(f)) {
     37         /* no need to close the file here; it will get
     38          * done automatically when the pool gets cleaned up */
     39         apr_bucket_free(f);
     40     }
     41 }

この36行目 apr_bucket_shared_destroy(f)が、共有リソースのrefcountをマイナスする処理で、マイナスした結果が0だと真を返す。
refcountが0になると(返り値が真だと)、この関数では、apr_bucket_free()を実行するようになっている。

(2.4)read関数


read関数は、次のようになっている

(apr-util-1.5.4/buckets/apr_buckets_file.c)

     75 static apr_status_t file_bucket_read(apr_bucket *e, const char **str,
     76                                      apr_size_t *len, apr_read_type_e block)

FILEデータバケットを読み込み、データのアドレスをstrにセットする。
lenは、読み込みデータ長が格納される。
読み込み時のブロックモードをblockに指定する。

     77 {
     78     apr_bucket_file *a = e->data;
     79     apr_file_t *f = a->fd;
     80     apr_bucket *b = NULL;
     81     char *buf;
     82     apr_status_t rv;
     83     apr_size_t filelength = e->length;  /* bytes remaining in file past offset */
     84     apr_off_t fileoffset = e->start;

     86     apr_int32_t flags;

     88

     90     if (file_make_mmap(e, filelength, fileoffset, a->readpool)) {

file_make_mmapでは、FILEデータバケットをMMAPデータバケットに変換している。
FILEデータバケットの can_mmapが0の場合にはMMAPに変換できないが、
初期値では、ビルド環境依存になるが、mmapが利用可能(APR_HAS_MMAPが定義されている)なら、このフラグは1で初期化されている。

ちなみに、MMAPデータバケットには参照できるデータサイズの上限が決まっている。
APR_MMAP_LIMITバイト(MMAP_LIMITまたは、4*1024*1024(4MB))だ。
これを越えるファイルは珍しくないだろう。
この場合、FILEデータオブジェクトが、4MBまででsplitされ、先頭の4MBがMMAPデータバケットに変換される。

     91         return apr_bucket_read(e, str, len, block);

その後、MMAPデータバケットのread()関数を実行している。

     92     }

ここ以降は、MMAPデータバケットに変換できない場合の処理だ。

今回は割愛する。

    :
    154 }

(2.5)setaside関数

setaside関数は次のようになっている。

(apr-util-1.5.4/buckets/apr_buckets_file.c)

    201 static apr_status_t file_bucket_setaside(apr_bucket *data, apr_pool_t *reqpool)

バケット dataを メモリプールreqpoolに確保しなおす処理だ。
現在使用されるメモリプールが短命な場合に不都合が生じる(バケットが使用される予定があるのに破棄されてしまう)ようなケースで、より長命なプール上に退避するために使用する。

    202 {
    203     apr_bucket_file *a = data->data;
    204     apr_file_t *fd = NULL;
    205     apr_file_t *f = a->fd;
    206     apr_pool_t *curpool = apr_file_pool_get(f);

apr_foo_pool_get()はgrepしても見つからない。
APR_POOL_DECLARE_ACCESSOR(foo)マクロ
および
APR_POOL_IMPLEMENT_ACCESSOR(foo)マクロ
で定義されており、apr_foo_t構造体のpoolを返す。

    207
    208     if (apr_pool_is_ancestor(curpool, reqpool)) {

メモリプールは、aprライブラリが提供する機能で、親子(parent-child)関係と兄弟姉妹(sibling)関係を持っている。
apr_pool_create_ex()で引数にメモリプールが指定されると、生成されるプールの親にそのメモリプールがセットされ、親メモリプール側には子として新しいメモリプールがセットされる。
親メモリプールが複数の子メモリプールを持つ場合には、子メモリプールには兄弟関係を持つメモリプールのリストが接続される。
メモリプールが破棄される場合、そのメモリプールの子孫のプールもその兄弟も同時に破棄される。
reqpoolに確保しなおす要求の目的はより長命なプールにバケットを移すことであるから、reqpool自体が現在確保されているcurpoolの子孫であった場合には、確保しなおしても意味がないことになる。

apr_pool_is_ancestor()は、reqpoolのparentを辿って、curpoolが現れないかどうかをチェックする。
もし現れた場合(reqpoolがcurpoolの子孫の場合)、この関数では処理を行わない。
なお、reqpoolとcurpoolが同じプールである場合もこの関数は真を返す。

    209         return APR_SUCCESS;
    210     }
    211

以降は、reqpoolがcurpoolの子孫でない場合の処理だ。

    212     if (!apr_pool_is_ancestor(a->readpool, reqpool)) {

apr_bucket_file情報の内部に持っているメモリプール readpool も 親子関係をチェックする。

    213         a->readpool = reqpool;
    214     }
    215
    216     apr_file_setaside(&fd, f, reqpool);

apr_file_t情報をreqpoolに退避する。
aprライブラリ関数で定義されている。

    217     a->fd = fd;

apr_bucket_file情報のapr_file_t情報を fdを退避先のfdに更新して、退避処理が終わる。

    218     return APR_SUCCESS;
    219 }

(3)apr_file_t情報

apr_bucket_file情報のファイル情報は、apr_file_t情報が持っている。
これは次のような構造体だ。

(apr-1.5.1/include/arch/unix/apr_arch_file_io.h)

     93 struct apr_file_t {
     94     apr_pool_t *pool;
     95     int filedes;

ファイルディスクリプタがセットされる。
FILEデータバケットがcopy()やsplit()で新たなapr_file_t情報を生成しては、内部で次々にファイルをオープンすることになりかねない
共有リソース化することでそういった事象が回避できる。

     96     char *fname;

ファイル名

     97     apr_int32_t flags;

apr_file_open()で指定されたフラグが保持されている。
EnableSendfileが有効な場合には、APR_SENDFILE_ENABLEDフラグもONとなっている。

     98     int eof_hit;
     99     int is_pipe;
    100     apr_interval_time_t timeout;
    101     int buffered;

入出力をバッファモードで処理するかどうかの識別。
フラグにAPR_FOPEN_BUFFEREDか、APR_BUFFEREDを指定した場合に有効となる。

    102     enum {BLK_UNKNOWN, BLK_OFF, BLK_ON } blocking;
    103     int ungetchar;    /* Last char provided by an unget op. (-1 = no char)*/

    105     /* if there is a timeout set, then this pollset is used */
    106     apr_pollset_t *pollset;

    108     /* Stuff for buffered mode */
    109     char *buffer;
    110     apr_size_t bufpos;        /* Read/Write position in buffer */
    111     apr_size_t bufsize;       /* The size of the buffer */
    112     unsigned long dataRead;   /* amount of valid data read into buffer */
    113     int direction;            /* buffer being used for 0 = read, 1 = write */
    114     apr_off_t filePtr;        /* position in file of handle */

    116     struct apr_thread_mutex_t *thlock;

マルチスレッドでの排他制御用

    118 };

(4)apr_file_t情報のセットアップ : default_handler

FILEデータバケットは、apr_brigade_insert_file()で作成される。

(apr-util-1.5.4/buckets/apr_brigade.c)

    707 APU_DECLARE(apr_bucket *) apr_brigade_insert_file(apr_bucket_brigade *bb,
    708                                                   apr_file_t *f,
    709                                                   apr_off_t start,
    710                                                   apr_off_t length,
    711                                                   apr_pool_t *p)

この関数の引数には、apr_file_t情報が渡されている。

apr_bucket_brigade *bb 生成されたFILEデータバケットはこのbucket brigade bbの末尾に追加される。

apr_file_t *f 生成するFILEデータバケットに格納するapr_file_t情報

apr_off_t start 生成するFILEデータバケットは、対象のapr_file_t情報のatartバイト目からを参照する。

apr_off_t length 生成するFILEデータバケットは、対象のapr_file_t情報のstartバイト目から、lengthバイト分を参照する。

apr_pool_t *p 処理で使用するメモリプール

FILEデータバケットが内部で参照するデータ長には上限が設けられている。
1GBだ。

(apr-util-1.5.4/buckets/apr_brigade.c)

    704 /* A "safe" maximum bucket size, 1Gb */
    705 #define MAX_BUCKET_SIZE (0x40000000)

例えば、10GBのファイルの場合、1GB単位に分割されて、10個のFILEデータバケットが生成されている。
このとき、バケットの格納データ自体は同じ、apr_file_tで、バケット情報に持つstartとlengthが連続したデータを参照するように調整されることになる。
つまり、1つめのバケットはstart=0でlength=4GB、2つめのバケットは、startが4GBでlengthは同じく4GBと続き、10個のFILEバケットが生成される。

このapr_brigade_insert_file()関数の利用例を見てみる。
例えば、ハンドラフック関数であるdefault_handler()で使われている。

(httpd-2.4.9/server/core.c)

   4248 static int default_handler(request_rec *r)
   4249 {
    :
   4281     if (r->method_number == M_GET || r->method_number == M_POST) {

GET/POSTリクエストの処理の経路となる。

    :
   4330         if ((status = apr_file_open(&fd, r->filename, APR_READ | APR_BINARY

   4332                             | AP_SENDFILE_ENABLED(d->enable_sendfile)

   4334                                     , 0, r->pool)) != APR_SUCCESS) {

apr_file_open()で、r->filenameに指定されたファイルをオープンする。
この処理で生成されるのがapr_file_t情報で、fdに格納される。
メモリ領域は、r->poolを使用する。
この処理では、 (APR_READ|APR_BINARY) → (O_RDONLY|O_BINARY) に変換され、ファイルをオープンする際のフラグに使用される。
ファイル識別子は、apr_file_t情報のfiledes変数にセットされる。
また、open()では使用されないAP_SENDFILE_ENABLEDフラグを含め、フラグ情報は apr_file_t情報のflagsにセットされる。
初期値では、blockingはBLK_ON(ブロック有効)。
bufferedは、APR_FOPEN_BUFFEREDフラグの有無で判定されるが、ここでは指定されていないので、0。
bufferやbuffsizeはAPR_FOPEN_BUFFEREDが有効な場合に使用さえるが、ここでは使用されない。
その他の変数が初期化される。
そして、このapr_file_t情報のcleanup関数が登録される。
以下、apr_file_openのコードを抜粋引用しておく。

(apr-1.5.1/file_io/unix/open.c)

     90 APR_DECLARE(apr_status_t) apr_file_open(apr_file_t **new,
     91                                         const char *fname,
     92                                         apr_int32_t flag,
     93                                         apr_fileperms_t perm,
     94                                         apr_pool_t *pool)
     95 {
     96     apr_os_file_t fd;
     97     int oflags = 0;

     99     apr_thread_mutex_t *thlock;
    100     apr_status_t rv;

    102
    103     if ((flag & APR_FOPEN_READ) && (flag & APR_FOPEN_WRITE)) {
    104         oflags = O_RDWR;
    105     }
    106     else if (flag & APR_FOPEN_READ) {
    107         oflags = O_RDONLY;
    108     }
    109     else if (flag & APR_FOPEN_WRITE) {
    110         oflags = O_WRONLY;
    111     }
    112     else {
    113         return APR_EACCES;
    114     }
     :

    133     if (flag & APR_FOPEN_BINARY) {
    134         oflags |= O_BINARY;
    135     }
     :

    171
    172     if (perm == APR_OS_DEFAULT) {
    173         fd = open(fname, oflags, 0666);
    174     }
    175     else {
    176         fd = open(fname, oflags, apr_unix_perms2mode(perm));
    177     }
    178     if (fd < 0) {
    179        return errno;
    180     }
    181     if (!(flag & APR_FOPEN_NOCLEANUP)) {

    183         static int has_o_cloexec = 0;
    184         if (!has_o_cloexec)

    186         {
    187             int flags;
    188
    189             if ((flags = fcntl(fd, F_GETFD)) == -1) {
    190                 close(fd);
    191                 return errno;
    192             }
    193             if ((flags & FD_CLOEXEC) == 0) {
    194                 flags |= FD_CLOEXEC;
    195                 if (fcntl(fd, F_SETFD, flags) == -1) {
    196                     close(fd);
    197                     return errno;
    198                 }
    199             }

    201             else {
    202                 has_o_cloexec = 1;
    203             }

    205         }
    206     }
    207
    208     (*new) = (apr_file_t *)apr_pcalloc(pool, sizeof(apr_file_t));
    209     (*new)->pool = pool;
    210     (*new)->flags = flag;
    211     (*new)->filedes = fd;
    212
    213     (*new)->fname = apr_pstrdup(pool, fname);
    214
    215     (*new)->blocking = BLK_ON;
    216     (*new)->buffered = (flag & APR_FOPEN_BUFFERED) > 0;
    217
    218     if ((*new)->buffered) {
    219         (*new)->buffer = apr_palloc(pool, APR_FILE_DEFAULT_BUFSIZE);
    220         (*new)->bufsize = APR_FILE_DEFAULT_BUFSIZE;

    222         if ((*new)->flags & APR_FOPEN_XTHREAD) {
    223             (*new)->thlock = thlock;
    224         }

    226     }
    227     else {
    228         (*new)->buffer = NULL;
    229     }
    230
    231     (*new)->is_pipe = 0;
    232     (*new)->timeout = -1;
    233     (*new)->ungetchar = -1;
    234     (*new)->eof_hit = 0;
    235     (*new)->filePtr = 0;
    236     (*new)->bufpos = 0;
    237     (*new)->dataRead = 0;
    238     (*new)->direction = 0;

    240     /* Start out with no pollset.  apr_wait_for_io_or_timeout() will
    241      * initialize the pollset if needed.
    242      */
    243     (*new)->pollset = NULL;

    245     if (!(flag & APR_FOPEN_NOCLEANUP)) {
    246         apr_pool_cleanup_register((*new)->pool, (void *)(*new),
    247                                   apr_unix_file_cleanup,
    248                                   apr_unix_child_file_cleanup);
    249     }
    250     return APR_SUCCESS;
    251 }

この処理が失敗した場合がこのif文内の経路だ。
エラーログを出力して、HTTP_FORBIDDENが戻り値になっている。

   4335             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, APLOGNO(00132)
   4336                           "file permissions deny server access: %s", r->filename);
   4337             return HTTP_FORBIDDEN;
   4338         }
    :

ファイルオープン後の処理に進む。

   4350         bb = apr_brigade_create(r->pool, c->bucket_alloc);

bucket brigade bbを作成する。
生成するのはrequestプール(r->pool)上になっている。

   4351
   4352         if ((errstatus = ap_meets_conditions(r)) != OK) {
   4353             apr_file_close(fd);
   4354             r->status = errstatus;
   4355         }
   4356         else {
   4357             e = apr_brigade_insert_file(bb, fd, 0, r->finfo.size, r->pool);

ここで、先に生成したapr_file_t情報 fdを指定して、FILEデータバケットを生成し、bucket brigade bbに追加する処理が行われる。
第3引数は対象ファイルのデータの先頭位置を指定している。0は先頭ということだ。
第4引数は、データサイズ。r->finfo.sizeには、ファイルサイズがセット済みだ。
この関数では、つまり、r->filenameで用意されたファイル全体をFILEデータバケットとして、bucket brigadeに追加している。

   4358
   4359 #if APR_HAS_MMAP
   4360             if (d->enable_mmap == ENABLE_MMAP_OFF) {
   4361                 (void)apr_bucket_file_enable_mmap(e, 0);
   4362             }
   4363 #endif
   4364         }
   4365
   4366         e = apr_bucket_eos_create(c->bucket_alloc);
   4367         APR_BRIGADE_INSERT_TAIL(bb, e);

EOSメタデータバケットを作成し、さらにbucket brigade bbに追加する。

   4368
   4369         status = ap_pass_brigade(r->output_filters, bb);

そして、この処理では、生成したbucket brigade bbを出力フィルタに引き渡している。
出力フィルタは、最終的にはCORE出力フィルタに引き渡され、AP_SENDFILE_ENABLED フラグが有効かどうかによって最終的な処理の方式は異なるが、このFILEデータバケットはソケットからクライアントにデータとして送信されることになる。

    :


今回はここまで

2014年12月15日月曜日

CORE出力フィルタ: ソケット送信処理(1)

CORE出力フィルタ(core_output_filter)から呼び出されているソケット送信処理を実行する関数は以下の2つだった。

(1) send_brigade_nonblocking
(2) send_brigade_blocking

これを追ってみる。

(1)send_brigade_nonblocking関数


bucket brigade bb の先頭から、データを持っているデータバケットを対象に送信を行う。
ソケット送信を実行すると、この関数は(未送信のバケットがあっても)復帰する。
送信済みのbucketは、bbから外され、破棄される。

データバケットはFILEバケットをEnableSendFile on でsendfile()で送信する場合は、
ファイルサイズが256バイト以上であれば、sendfile()で送信する。
sendfileで完全に送信が完了しなかった場合には、FILEバケットが分割され、未送信のFILEバケットが bucket brigade bb に残される。

それ以外のデータバケットの場合、writev()で送信する。この時データバケットごとに
I/Oベクトルの配列にデータをセットし、一定数を超えた場合にはソケット送信を実行する。

あるいは、bucket brigade bbのすべてを処理終えたした時にも、送信データがあればソケット送信を行う。

全データが送信できなかった場合には、送信済みのデータバケットは破棄され、未送信の
データバケットが bucket brigade bb に残される。

以下詳細を見てみる。

(httpd-2.4.9/server/core_filters.c)

    616 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
    617                                              apr_bucket_brigade *bb,
    618                                              apr_size_t *bytes_written,
    619                                              conn_rec *c)

apr_socket_t *s 通信用のソケット情報
apr_bucket_brigade *bb 送信データ用bucket brigade
apr_size_t *bytes_written 出力データサイズ(関数側で返す)
conn_rec *c コネクション情報

    620 {
    621     apr_bucket *bucket, *next;
    622     apr_status_t rv;
    623     struct iovec vec[MAX_IOVEC_TO_WRITE];
    624     apr_size_t nvec = 0;
    625
    626     remove_empty_buckets(bb);

remove_empty_buckets(bb)は、bucket brigade bbを先頭から辿り、
メタデータバケットかlength==0のデータバケットが続いている間、
そのバケットを bb から取り除き、破棄している。

length!=0のデータバケットが残った場合に、bucket brigadeが処理される。

    627
    628     for (bucket = APR_BRIGADE_FIRST(bb);
    629          bucket != APR_BRIGADE_SENTINEL(bb);
    630          bucket = next) {
    631         next = APR_BUCKET_NEXT(bucket);
    632 #if APR_HAS_SENDFILE

CentOSにはsendfileは存在する

    633         if (APR_BUCKET_IS_FILE(bucket)) {

FILEバケットの場合、
(1)EnableSendFileディレクティブが offではない場合
かつ
(2)サイズがAP_MIN_SENDFILE_BYTES(256)バイト以上の場合
sendfileを使った処理を行う。

EnableSendfiFileがonであっても小さなファイルの送信にはsendfileは使われないようだ。

    634             apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
    635             apr_file_t *fd = file_bucket->fd;
    636             /* Use sendfile to send this file unless:
    637              *   - the platform doesn't support sendfile,
    638              *   - the file is too small for sendfile to be useful, or
    639              *   - sendfile is disabled in the httpd config via "EnableSendfile off"
    640              */
    641
    642             if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
    643                 (bucket->length >= AP_MIN_SENDFILE_BYTES)) {


     56 #define AP_MIN_SENDFILE_BYTES           (256)


    644                 if (nvec > 0) {

nvecは初期値は0
bucket brigadeを処理していく中で、I/Oベクトルにデータを追加している。
そのバッファされているI/Oベクトルのデータを出力しておく。

    645                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
    646                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
    647                     nvec = 0;
    648                     if (rv != APR_SUCCESS) {
    649                         (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
    650                         return rv;
    651                     }
    652                 }
    653                 rv = sendfile_nonblocking(s, bucket, bytes_written, c);

FILEバケットをsendfileで送信する。
出力可能でなければ、ただちに復帰する。
ファイル全体を送信できた場合には、このbucketをbucket brigadeから取り除いて、apr_bucket_destroy()
を行う。
一部のみ送信できた場合は、このbucketを送信済みの部分と残った部分の2つに分割し、
送信済みのバケットを取り除き、apr_bucket_destroyを行う。


    654                 if (nvec > 0) {


ここには来ないのではないか(647行目で0になっている)。


    655                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
    656                 }
    657                 if (rv != APR_SUCCESS) {
    658                     return rv;
    659                 }
    660                 break;


出力したら、処理を終える(forループを抜ける)。




    661             }
    662         }
    663 #endif /* APR_HAS_SENDFILE */
    664         /* didn't sendfile */
    665         if (!APR_BUCKET_IS_METADATA(bucket)) {

このif文はデータバケットの場合の処理

    666             const char *data;
    667             apr_size_t length;
    668
    669             /* Non-blocking read first, in case this is a morphing
    670              * bucket type. */
    671             rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);

データバケットからデータを読み込む。
読込は非ブロックモードで行っている。

    672             if (APR_STATUS_IS_EAGAIN(rv)) {

上流から渡ってきているbucket brigadeにPIPEバケットやSOCKETバケットがあった場合、非ブロックで読み込んで、読み込みデータがなければ、この経路に入る。

    673                 /* Read would block; flush any pending data and retry. */
    674                 if (nvec) {

ここは、既にデータバケットを処理しており、未送信のものが存在している場合だ。

    675                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);

I/Oベクトルのデータのソケット送信を実施する。

    676                     if (rv) {

ここは rv!=APR_SUCCESS(0)の場合の経路となる。

    677                         return rv;
    678                     }
    679                     nvec = 0;

nvecの値は0でリセットする。
この経路では、データがソケット送信された場合にも、引き続き処理が行われている。

    680                 }
    681
    682                 rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);

そして、次にブロックモードでデータを読み込む。

    683             }
    684             if (rv != APR_SUCCESS) {

671行目のapr_bucket_read(APR_NONBLOCK_READ)の戻り値がEAGAIN以外のエラーか、
682行目のapr_bucket_read(APR_BLOCK_READ)がエラーの場合に
この経路に入る。

ここでは、エラーでreturn している。

    685                 return rv;
    686             }
    687
    688             /* reading may have split the bucket, so recompute next: */
    689             next = APR_BUCKET_NEXT(bucket);
    690             vec[nvec].iov_base = (char *)data;
    691             vec[nvec].iov_len = length;
    692             nvec++;

読み込んだデータをI/Oベクトルに追加する。

    693             if (nvec == MAX_IOVEC_TO_WRITE) {

I/Oベクトルの要素数がMAX_IOVEC_TO_WRITEになったら、出力を実行する。

    694                 rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);

writev()関数を使って、I/Oベクトルの情報を出力する。
出力可能でなけばただちに復帰する。
出力完了したデータサイズに応じて、bucket brugade bb のlengthを消化して、
bucketをbb から取り除き、apr_bucket_destroy()を行う。
bucketのデータの一部しか送信できていない場合には、そのbucketを分割し、
送信済みのbucketを取り外して、apr_bucket_destroy()を行っている。

    695                 nvec = 0;

nvecは出力後、0にリセットする。

    696                 if (rv != APR_SUCCESS) {
    697                     return rv;
    698                 }
    699                 break;

出力したら、処理を終える(forループを抜ける)。

    700             }
    701         }
    702     }
    703
    704     if (nvec > 0) {

bucket brigadeを最後までチェックして、MAX_IOVEC_TO_WRITEにならなかった場合に
ここで出力する。

    705         rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
    706         if (rv != APR_SUCCESS) {
    707             return rv;
    708         }
    709     }
    710
    711     remove_empty_buckets(bb);
    712
    713     return APR_SUCCESS;
    714 }


(2)send_brigade_blocking関数


送信処理だが、こちらはブロックモードだ。
実装では、send_brigade_nonblocking()を呼んでいる。

ブロックモードでは、send_brigade_nonblocking()を実行して、結果がEAGAINの場合(書込みできない)、apr_socket_t情報のtimeoutの設定値を取得して、poll()で書込み可能になるまで待機する。

timeoutになる前に書込み可能になれば、再度send_brigade_nonblocking()を実行する。
timeoutした場合は、APR_TIMEUPでreturnする。
その他のエラーの場合もエラーでreturnする。

つまり、send_brigade_nonblockingと異なり、非ブロックで書き込みを行い、ソケットの書込み準備ができていない場合には、書込み可能になるまで、Timeoutディレクティブ等で指定されているタイムアウト値に従って待機するようになっている。

    726 static apr_status_t send_brigade_blocking(apr_socket_t *s,
    727                                           apr_bucket_brigade *bb,
    728                                           apr_size_t *bytes_written,
    729                                           conn_rec *c)
    730 {
    731     apr_status_t rv;
    732
    733     rv = APR_SUCCESS;
    734     while (!APR_BRIGADE_EMPTY(bb)) {
    735         rv = send_brigade_nonblocking(s, bb, bytes_written, c);

ここでsend_brigade_nonblocking()が実行されている。

    736         if (rv != APR_SUCCESS) {
    737             if (APR_STATUS_IS_EAGAIN(rv)) {

ソケットの書込み準備ができていなかった場合にこの経路に入る。


    738                 /* Wait until we can send more data */
    739                 apr_int32_t nsds;
    740                 apr_interval_time_t timeout;
    741                 apr_pollfd_t pollset;
    742
    743                 pollset.p = c->pool;
    744                 pollset.desc_type = APR_POLL_SOCKET;
    745                 pollset.reqevents = APR_POLLOUT;
    746                 pollset.desc.s = s;
    747                 apr_socket_timeout_get(s, &timeout);

apr_socket_t情報のtimeout設定を取得している。

    748                 do {
    749                     rv = apr_poll(&pollset, 1, &nsds, timeout);

ソケットが書き込み可能になるまで、所定の時間待つ。
シグナルで割りこまれた場合(EINTR)には処理を継続する。

    750                 } while (APR_STATUS_IS_EINTR(rv));
    751                 if (rv != APR_SUCCESS) {

apr_poll()でEINTR以外の何らかのエラーが発生したか、タイムアウトの場合の経路だ。

    752                     break;
    753                 }
    754             }
    755             else {

send_brigade_nonblockingがEAGAIN以外のエラーだった場合の経路になる

    756                 break;
    757             }
    758         }

send_brigade_nonblocking()が成功した場合、あるいは、ソケットが書き込み可能になった場合、
bucket brigadeが空になるまでsend_brigade_nonblocking()処理を継続する。

    759     }

send_brigade_nonblocking処理は、送信済みのbucketを破棄するので、すべて送信し終えれば、bucket brigadeが空となり
この734行からのwhile()ループを抜ける。

    760     return rv;
    761 }

以上、2関数を見たが、ここでも最終的なソケット送信関数は呼ばれていなかった。
ここでは送信処理として次の関数から呼ばれてる。

  • sendfile_nonblocking
  • writev_nonblocking

実際のところ、上記関数の内部を見ても、ソケット送信関数は呼ばれていない。
それぞれは、さらに次のAPRライブラリ関数を使用し、そこでようやく直接ソケットディスクリプタを扱う関数にたどり着く。

  • apr_socket_sendfile
  • apr_socket_sendv

もう少しなので、辿ってみることにする。

2014年12月8日月曜日

入力フィルタ: CORE_IN入力フィルタ

CORE_IN入力フィルタ: ap_core_input_filter


CORE_IN入力フィルタは、入力フィルタの最下層のネットワークフィルタになる。
この下には入力フィルタはないので、CIRE_IN入力フィルタでは、ネットワークからクライアントの送ってきたデータを受信し、処理して、上位の入力フィルタに引き渡す処理が行われる。
以前、入力フィルタの概要で少し見たが、今回はもう少し細かく全体を見ていく。

     94 apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
     95                                   ap_input_mode_t mode, apr_read_type_e block,
     96                                   apr_off_t readbytes)

引数は次の通り。

引数 説明
ap_filter_t *f 入力フィルタ情報。fはap_core_input_filterに関する情報を格納する。
apr_bucket_brigade *b 吸い上げたデータを格納する宛先のbucket brigade。
ap_input_mode_t mode 入力モード。
AP_MODE_READBYTES 入力フィルタは最大readbytesバイト読み込んでreturn する
AP_MODE_GETLINE 入力フィルタは、1行(CRLF)分のデータを読み込んでreturnする。(1行が長すぎたり、あるいは、CRLFがないような場合には部分的なデータを読み込んでreturnする可能性がある)
AP_MODE_EATCRLF 入力フィルタは、CRLFを見つけたら消費(除去)する
AP_MODE_SPECULATIVE 入力フィルタの読み込みは投機的なものとみなされる。読み込まれたデータは、以降の別のモードでの処理のために蓄積される。
AP_MODE_EXHAUSTIVE 入力フィルタの読み込みは完全なものとみなされる。それ以上読込できなくなるまで読込が行われる。このモードは特に注意して扱うこと。
AP_MODE_INIT 入力フィルタはコネクションを初期化する。NNTP over SSL やFTP over SSL等で必要。
apr_read_type_e block 読込タイプ
APR_BLOCK_READ ブロックモード
APR_NONBLOCK_READ 非ブロックモード
apr_off_t readbytes 入力データサイズ


2014年12月1日月曜日

apr_socket_t 情報と SOCKETデータバケット

(1)SOCKETデータバケットタイプ情報

SOCKETデータバケットのバケットタイプ情報は次の通り。
(apr-util-1.5.4/buckets/apr_buckets_socket.c)

    107 APU_DECLARE_DATA const apr_bucket_type_t apr_bucket_type_socket = {
    108     "SOCKET", 5, APR_BUCKET_DATA,
    109     apr_bucket_destroy_noop,
    110     socket_bucket_read,
    111     apr_bucket_setaside_notimpl,
    112     apr_bucket_split_notimpl,
    113     apr_bucket_copy_notimpl
    114 };

(2)apr_socket_t 情報のセットアップ


SOCKETデータバケットで、独自の実装を持っているのはread関数だけだ。
それをみておきたいが、その前に、ソケット情報(apr_socket_t)を生成する listenerスレッドのacceptの処理部分を見たい。
listenerスレッドのListenソケットの処理ではacceptの処理について触れた。


(httpd-2.4.9/server/mpm/event/event.c)
   1368 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
   1369 {
    :
   1616                     rc = lr->accept_func(&csd, lr, ptrans);

accept_func()の実体は、ap_unixd_accept()だ。
(httpd-2.4.9/server/mpm/event/event.c)
   1122 static apr_status_t init_pollset(apr_pool_t *p)
   1123 {
    :
   1151         lr->accept_func = ap_unixd_accept;

ap_unixd_accept()関数で、ソケットがacceptされ、apr_socket_t情報が作成される。
このapr_socket_t情報は、listenerスレッドの、void*型の変数csdにアドレスがセットされる。
(httpd-2.4.9/os/unix/unixd.c)
    291 AP_DECLARE(apr_status_t) ap_unixd_accept(void **accepted, ap_listen_rec *lr,
    292                                          apr_pool_t *ptrans)
    293 {
    294     apr_socket_t *csd;
    :
    300     *accepted = NULL;
    301     status = apr_socket_accept(&csd, lr->sd, ptrans);
    302     if (status == APR_SUCCESS) {
    303         *accepted = csd;
この*acceptedが、listenerスレッドの処理のcsdとなっている。


apr_socket_t情報は以下の通り。
(apr-1.5.1/include/arch/unix/apr_arch_networkio.h)

    103 struct apr_socket_t {
    104     apr_pool_t *pool;
    105     int socketdes;
    106     int type;
    107     int protocol;
    108     apr_sockaddr_t *local_addr;
    109     apr_sockaddr_t *remote_addr;
    110     apr_interval_time_t timeout;
    111 #ifndef HAVE_POLL
     :
    113 #endif
    114     int local_port_unknown;
    115     int local_interface_unknown;
    116     int remote_addr_unknown;
    117     apr_int32_t options;
    118     apr_int32_t inherit;
    119     sock_userdata_t *userdata;
    120 #ifndef WAITIO_USES_POLL
     :
    123 #endif
    124 };

もう少し先を辿る。
apr_socket_accept()だ。

2014年11月25日火曜日

apr_bucket と apr_bucket_brigade

今度は CORE_IN入力フィルタを見ようとしたところだが、入出力フィルタではbucketとbucket brigadeの処理が重要だ。
そこで、先に少し見ておくことにした。

bucket brigade(apr_bucket_brigade)は内部にbucket(apr_bucket)の列のリング構造を持っている。
bucket情報は前後へのポインタを持っており、bucket brigade情報自体もbucket情報へのポインタを持っている。
bucket brigadeの持っている次へのポインタが、bucket列の先頭に繋がり、bucket列を順に辿って、bucketの末尾の次が再び、bucket brigadeに繋がってリング構造となる。
逆方向も同じで、bucket brigadeの前にbucket列の末尾が繋がり、前に辿っていくと、bucket列の先頭に繋がり、その前に再びbucket brigadeが位置する。
このbucket brigade自身(内部)のアドレスは有効なbucket情報ではなく、センチネル(番兵)として機能する。



brigadeが軍隊がらみの用語らしく、bucket brigadeが何を意味するのか分からなかったが、単純に "bucket brigade" で辞書を引けば載っていた。バケツリレーの列のことだった。
バケツリレーか。なるほどね。

2014年11月17日月曜日

出力フィルタ: CORE出力フィルタ

CORE出力フィルタ: ap_core_output_filter



以前、出力フィルタの概要を見た。
今度は、出力フィルタの階層の最下流にあるCORE出力フィルタを少し細かく見る。
中間層の出力フィルタは上流から受け取った bucket brigadeを加工し、新しいbucket brigadeを作成すると、これを下流の出力フィルタに渡した。
最下流では、もう渡し先の出力フィルタはないので、データをクライアントに向けて送信する。


(httpd-2.4.9/server/core_filters.c)

    372 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)


ap_filter_t *fで、この出力フィルタ(ap_core_output_filter)のフィルタ情報が渡され、
apr_bucket_brigade *new_bbには送信するbacket brigadeが渡される。

    373 {
    374     conn_rec *c = f->c;

この処理のconn_rec情報。

    375     core_net_rec *net = f->ctx;

core_net_rec情報は、process_socket()で実行されるpre_connectionフック関数のうち
core_pre_connection()で用意される。

    376     core_output_filter_ctx_t *ctx = net->out_ctx;

CORE出力フィルタ用のコンテキスト情報。最初の関数呼び出しで本フィルタ内で作成される。

    377     apr_bucket_brigade *bb = NULL;
    378     apr_bucket *bucket, *next, *flush_upto = NULL;
    379     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
    380     int eor_buckets_in_brigade, morphing_bucket_in_brigade;
    381     apr_status_t rv;
    382
    383     /* Fail quickly if the connection has already been aborted. */
    384     if (c->aborted) {

ここは異常系の処理だ。

    385         if (new_bb != NULL) {
    386             apr_brigade_cleanup(new_bb);
    387         }
    388         return APR_ECONNABORTED;
    389     }
    390
    391     if (ctx == NULL) {

コンテキスト情報 core_output_filter_ctx_t を用意する。
リクエスト処理で初めてCORE出力フィルタが呼ばれたときに行われることになる。

CORE出力フィルタのコンテキスト情報の定義は次の通り。

     81 struct core_output_filter_ctx {
     82     apr_bucket_brigade *buffered_bb;
     83     apr_bucket_brigade *tmp_flush_bb;
     84     apr_pool_t *deferred_write_pool;
     85     apr_size_t bytes_written;
     86 };

    392         ctx = apr_pcalloc(c->pool, sizeof(*ctx));
    393         net->out_ctx = (core_output_filter_ctx_t *)ctx;
    394         /*
    395          * Need to create tmp brigade with correct lifetime. Passing
    396          * NULL to apr_brigade_split_ex would result in a brigade
    397          * allocated from bb->pool which might be wrong.
    398          */

2つの作業用のbucket brigadeを作成し、コンテクスト情報に登録する。
いずれも、conn_recのプール(transactionプール)から確保している。

    399         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
    400         /* same for buffered_bb and ap_save_brigade */
    401         ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
    402     }
    403
    404     if (new_bb != NULL)
    405         bb = new_bb;

上流から渡されてきたbucket brigadeのアドレスを作業用の変数 bbにセットする。

    406
    407     if ((ctx->buffered_bb != NULL) &&
    408         !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
    409         if (new_bb != NULL) {
    410             APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
    411         }
    412         else {
    413             bb = ctx->buffered_bb;
    414         }

コンテキスト情報のbuffered_bbの内容と、今回の呼び出しで渡されてきた bucket brigadeの内容を
マージしている。
つまり、受け取ったnew_bbの前にフィルタの内部に前の処理時に留保されたbucket brigadeであるbuffered_bbを繋いでいる。

    415         c->data_in_output_filters = 0;

data_in_output_filtersは出力フィルタに未出力のデータ(bucket brigade)があるかどうかを判定している。
初期値として、なし(0)をセットする。

    416     }
    417     else if (new_bb == NULL) {

上流からbucket brigadeが渡されていない場合、
そして、buffered_bbにもデータがない場合、
処理対象となるbucketがないため、処理は行われず、直ちに成功でreturnする。

    418         return APR_SUCCESS;
    419     }

この部分のソースコードには、処理概要を説明している長めのコメントがある。
これを読めば、やっていることが分かるが、
ここでは、ソースコードに従って見ていきたいので、削った。
同じことを以下で書いている。
1)、2)だのa)、b)だのは、このコメントに書かれていた番号を該当する箇所の説明につけたものだ。

    467
    468     if (new_bb == NULL) {

1) ここは、コンテキスト情報のbufferd_bbに既にあったbucketだけが処理対象となるケース。

new_bbがNULLになるケースがどれほどあるか把握していないが、
process_socket()でCONN_STATE_WRITE_COMPLETION状態の場合に実行される
以下の989行目のコードは該当している。

(httpd-2.4.9/server/mpm/event/event.c)
    982     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
    983         ap_filter_t *output_filter = c->output_filters;
    984         apr_status_t rv;
    985         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
    986         while (output_filter->next != NULL) {
    987             output_filter = output_filter->next;
    988         }
    989         rv = output_filter->frec->filter_func.out_func(output_filter, NULL);

ここでは、無条件にデータの送信が非ブロックモードで実行される。

    469         rv = send_brigade_nonblocking(net->client_socket, bb,
    470                                       &(ctx->bytes_written), c);

send_brigade_nonblocking()処理内を辿ると、実際のクライアントへの送信処理が行われている。
net->client_socketはクライアントとの通信ソケット情報。
bbはbucket brigade。
ctx->bytes_writtenには送信済みデータ長が格納される(加算される)。
cはconn_rec情報。

    471         if (APR_STATUS_IS_EAGAIN(rv)) {
    472             rv = APR_SUCCESS;
    473         }
    474         else if (rv != APR_SUCCESS) {
    475             /* The client has aborted the connection */
    476             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    477                           "core_output_filter: writing data to the network");
    478             c->aborted = 1;
    479         }
    480         setaside_remaining_output(f, ctx, bb, c);

setaside_remaining_output()は、bucket brigade bbの未出力のデータを buffere_bbに退避する。
未出力データがあった場合には、識別用にconn_recのdata_in_output_filtersに1がセットされる。

送信処理が実行されると、このCORE出力関数からは復帰する。

    481         return rv;
    482     }

ここまでが1)の処理となる。

以下でいくつかの送信判定用の変数値が初期化される。

    483
    484     bytes_in_brigade = 0;
    485     non_file_bytes_in_brigade = 0;
    486     eor_buckets_in_brigade = 0;
    487     morphing_bucket_in_brigade = 0;

bytes_in_brigade bucket brigade内のデータバケットの lengthの合計を格納する(lengthが-1のものを除く)
non_file_bytes_in_brigade bucket brigade内のデータバケット(FILEを除く)の lengthの合計を格納する(lengthが-1のものを除く)
eor_buckets_in_brigade EORメタデータバケットの数をカウントする
morphing_bucket_in_brigade bucket brigade内のデータバケットでlengthが -1のものがあった場合に1を立てるフラグ


    488
    489     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
    490          bucket = next) {
    491         next = APR_BUCKET_NEXT(bucket);

ここで、bufferd_bbとnew_bbが連結された bucket brigade bb 内のbucketが順に処理される。

    492
    493         if (!APR_BUCKET_IS_METADATA(bucket)) {

bucketがデータタイプのバケットの場合

    494             if (bucket->length == (apr_size_t)-1) {

これはバケットのデータ長が事前には不明な場合で、
たとえば、SOCKETバケットやPIPEバケットなど、
読み込んでみないとデータ長が分からないバケットが該当する。

    495                 /*
    496                  * A setaside of morphing buckets would read everything into
    497                  * memory. Instead, we will flush everything up to and
    498                  * including this bucket.
    499                  */
    500                 morphing_bucket_in_brigade = 1;
    501             }
    502             else {

バケットのデータ長が分かっている場合は、bytes_in_brigadeに値を足しこむ。

    503                 bytes_in_brigade += bucket->length;
    504                 if (!APR_BUCKET_IS_FILE(bucket))

特にバケットがFILEバケットでない場合に、non_file_bytes_in_brigadeに値を足しこむ

    505                     non_file_bytes_in_brigade += bucket->length;
    506             }
    507         }
    508         else if (AP_BUCKET_IS_EOR(bucket)) {

bucketが EORメタデータバケットの場合
eor_buckets_in_brigadeをカウントアップする。

    509             eor_buckets_in_brigade++;
    510         }
    511

2) 上記のbucketのチェックを終えて、ここまでのデータ(bucket列)を実際に送信するかどうかの判定を行う。

    512         if (APR_BUCKET_IS_FLUSH(bucket)
    513             || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
    514             || morphing_bucket_in_brigade
    515             || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
    516             /* this segment of the brigade MUST be sent before returning. */

このif文の条件を満たすと、ここまでのbucketの送信を実行する。
下記でflush_uptoにセットされるnextが送信停止位置を示すことになる。
条件は以下のいずれかが真の場合だ。

(a)FLUSHメタデータバケットがあった場合
(b)FILEではないデータバケットのlengthの合計(non_file_bytes_in_brigade)がTHRESHOLD_MAX_BUFFER(64KB)以上になった場合

    364 #define THRESHOLD_MAX_BUFFER 65536

(d)データバケットにlengthが-1のものがあった場合(morphing_bucket_in_brigade==1)
(c)EORメタデータバケットがMAX_REQUESTS_IN_PIPELINE(5)個以上処理された場合

(eor_buckets_in_brigade)
    365 #define MAX_REQUESTS_IN_PIPELINE 5


    517
    518             if (APLOGctrace6(c)) {
    519                 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
    520                                "FLUSH bucket" :
    521                                (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
    522                                "THRESHOLD_MAX_BUFFER" :
    523                                morphing_bucket_in_brigade ? "morphing bucket" :
    524                                "MAX_REQUESTS_IN_PIPELINE";
    525                 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
    526                               "core_output_filter: flushing because of %s",
    527                               reason);
    528             }
    529             /*
    530              * Defer the actual blocking write to avoid doing many writes.
    531              */
    532             flush_upto = next;

flush_uptoに値がセットされた場合、以降の処理で、先頭からこの直前(ここで処理されているbucket)までのbucket列を出力することになる(3)。

ここで、いったん送信条件をリセットする。

    533
    534             bytes_in_brigade = 0;
    535             non_file_bytes_in_brigade = 0;
    536             eor_buckets_in_brigade = 0;
    537             morphing_bucket_in_brigade = 0;
    538         }
    539     }
    540
    541     if (flush_upto != NULL) {

3) flush_uptoがNULLでない場合、ここまでのデータ(bucket列)の送信が実行される。

    542         ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
    543                                                  ctx->tmp_flush_bb);

まず、flush_uptoから末尾までのbucket列を分割し、コンテキスト情報のtmp_flush_bbに退避する。

    544         rv = send_brigade_blocking(net->client_socket, bb,
    545                                    &(ctx->bytes_written), c);

そして、残った先頭からfulsh_uptoの前までのbucketからなる bucket brigade bb をブロックモードで送信する。

ブロックモードでの送信処理を辿ると、その内部で非ブロックモードの送信処理send_brigade_nonblocking()を呼び出している。
非ブロックでの送信処理の実行結果が、エラーで、エラーコードがEAGAIN/EWOULDBLOCK の場合に、poll関数で、書込み可能になるまで待機する(Timeoutの設定が待ち時間の上限)処理が追加されている。

    546         if (rv != APR_SUCCESS) {
    547             /* The client has aborted the connection */
    548             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    549                           "core_output_filter: writing data to the network");
    550             c->aborted = 1;
    551             return rv;
    552         }
    553         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);

送信処理を終えると、tmp_flush_bbに退避しておいた残りのbucket列を bb に繋ぎなおしている。

    554     }
    555
    556     if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {

4) FILEを含むデータバケットのlengthの合計が、THRESHOLD_MIN_WRITE(4KB)以上になった場合にも出力処理を実行する。

    363 #define THRESHOLD_MIN_WRITE 4096


ここでは、出力は非ブロックモードで行われる。

    557         rv = send_brigade_nonblocking(net->client_socket, bb,
    558                                       &(ctx->bytes_written), c);
    559         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
    560             /* The client has aborted the connection */
    561             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
    562                           "core_output_filter: writing data to the network");
    563             c->aborted = 1;
    564             return rv;
    565         }
    566     }
    567


    568     setaside_remaining_output(f, ctx, bb, c);

未送信データが存在した場合buffered_bbに保持しておいて、識別用にconn_recのdata_in_output_filtersに1をセットする。

    569     return APR_SUCCESS;
    570 }


ここまでが、CORE出力関数となる。
実際のソケットの送信処理は、下記の関数内で実行されている。
いずれも server/core_filters.c ファイル内でstatic関数として定義されている。
概要は上述した通り。

  • send_brigade_nonblocking
  • send_brigade_blocking

送信の実体は、sendfile()であったり、バケットのデータをI/Oベクトルに収めてのwritev()であったりする。
これは、別途コードを追う予定。

2014年11月4日火曜日

リクエスト処理の流れ: eventMPM コネクション処理フロー

コネクション処理フロー

worker_thread()、process_socket()とlistener_thread()の処理を概観して、コネクションのconn_state_e情報と処理の流れを図にした。



accept待ちのListenソケットがlistener_threadで接続を受け付けて、workerスレッドのprocess_socketでのリクエスト処理/出力処理を経て、ソケットを閉じたり、次リクエスト受け付けを待つという流れになっている。
青い四角がworkerスレッドの処理、黄色い四角がlistenerスレッドの処理。ソケットクローズは、listenerスレッドで行われる。ただ、これは正常系の場合で、既に見てきているように、ソケットのクローズは、エラー時などには別のタイミングでも実行されている。

SUSPENDED状態を持つタイムアウト処理は、いまのところmod_dialupにしか実装は見当たらない。
タイムアウト処理をSUSPENDED状態の中で繰り返し、処理を終えると、ソケットを閉じる処理に向かっている。


2014年10月27日月曜日

リクエスト処理の流れ: listener_thread(5) タイムアウトキューの処理

(14.4) イベント処理のつづき。

(14.5) タイムアウトキューの処理


次は、4つ用意されているタイムアウトキューに対する、タイムアウト処理だ。
タイムアウトに至る前にイベントが発生して、1655行目までのソケットイベント処理で消化されれば
特に何も行われない。
タイムアウトに至るまでに、ソケットの読み込み/書き出しが可能とならなかった場合に、それらソケットが次の処理の対象となる。

   1656
   1657         /* XXX possible optimization: stash the current time for use as
   1658          * r->request_time for new requests
   1659          */
   1660         now = apr_time_now();

まず、現在時刻を取得する。単位はマイクロ秒だ。

   1661         /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */
   1662         if (now > timeout_time) {

timeout_timeは初期値は0なので、最初はこのif文に必ず入る。
そして、timeout_timeに現在時刻のTIMEOUT_FUDGE_FACTOR後にセットしている(1664行目)。
定義を見ると、100000マイクロ秒なので、0.1秒。

   1396 #define TIMEOUT_FUDGE_FACTOR 100000

おおむね、0.1秒以上間隔をあけてチェックするということだろう。

   1663             struct process_score *ps;
   1664             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
   1665
   1666             /* handle timed out sockets */
   1667             apr_thread_mutex_lock(timeout_mutex);
   1668
   1669             /* Step 1: keepalive timeouts */
   1670             /* If all workers are busy, we kill older keep-alive connections so that they
   1671              * may connect to another process.
   1672              */

(1)KeepAliveタイムアウト

keepalive_qキューを処理する。

   1673             if (workers_were_busy && keepalive_q.count) {

この時点でworkers_were_busyが1の場合(アイドルworkerスレッドがいない場合)で、
KeepAliveタイムアウトキューに登録がある場合(次のリクエストを待っている場合)、
現在時刻(+0.1秒)からKeepAliveTimeOut先までのタイムアウトイベントを先取りして
リンガリングクローズを実行している。
上記のコメントによれば、すべてのworkerがビジーなら、古いKeep-Alive接続を終了し、
クライアントが必要になった時点で別のプロセスに接続できるようにするということだ。

しかし、KeepAliveTimeOut先でこのkeepalive_qキューのタイムアウトは登録されるのだから、
古いというか、おそらくこの時点で登録済みのすべてのKeepAlive待ちイベントが
全てstart_lingering_close_nonblocking()関数処理されると考えられる。

start_lingering_close_nonblocking()関数は、
既に見た
start_lingering_close_blocking
と異なり、shutdown(SHUT_WR)の前に未出力のデータのフラッシュを試みない。
それ以外は同様で、この処理によって、ソケットはLINGER待ちになりlinger_qキュー、またはshort_linger_qキューに
登録される。

   1674                 ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
   1675                              "All workers are busy, will close %d keep-alive "
   1676                              "connections",
   1677                              keepalive_q.count);
   1678                 process_timeout_queue(&keepalive_q,
   1679                                       timeout_time + ap_server_conf->keep_alive_timeout,
   1680                                       start_lingering_close_nonblocking);
   1681             }
   1682             else {

こちらは、通常の場合で、
timeout_time(現在時刻+0.1秒)時点でタイムアウトしているタイムアウトイベントを
同じく、start_lingering_close_nonblocking()関数処理している。


   1683                 process_timeout_queue(&keepalive_q, timeout_time,
   1684                                       start_lingering_close_nonblocking);
   1685             }
   1686             /* Step 2: write completion timeouts */

(2)書き込み完了タイムアウト


ここは、write_completion_qキューを処理する。
タイムアウト時間が来ていた場合に行う処理は、start_lingering_close_nonblocking()で、
これはKeepAlive待ちと同じだ。
通常は、shutdown(SHUT_WR)が行われ、linger_qキューに登録される。

   1687             process_timeout_queue(&write_completion_q, timeout_time,
   1688                                   start_lingering_close_nonblocking);
   1689             /* Step 3: (normal) lingering close completion timeouts */

(3)リンガリングクローズタイムアウト(標準)

linger_qキューの処理だ。
ここでのタイムアウト時の処理は、stop_lingering_close()だ。
この処理は、ソケットをクローズする。

   1690             process_timeout_queue(&linger_q, timeout_time, stop_lingering_close);
   1691             /* Step 4: (short) lingering close completion timeouts */

(4)リンガリングクローズタイムアウト(短時間)

short_linger_qキューの処理だ。
ここでのタイムアウト時の処理も、linger_qキューと同じで、stop_lingering_close()だ。
ソケットをクローズする。

   1692             process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close);
   1693

ここまででタイマーキューの処理は終わりになる。

以下は、いくつかの管理情報の更新処理だ。

   1694             ps = ap_get_scoreboard_process(process_slot);
   1695             ps->write_completion = write_completion_q.count;
   1696             ps->keep_alive = keepalive_q.count;
   1697             apr_thread_mutex_unlock(timeout_mutex);
   1698
   1699             ps->connections = apr_atomic_read32(&connection_count);
   1700             ps->suspended = apr_atomic_read32(&suspended_count);
   1701             ps->lingering_close = apr_atomic_read32(&lingering_count);
   1702         }
   1703         if (listeners_disabled && !workers_were_busy
   1704             && (int)apr_atomic_read32(&connection_count)
   1705                - (int)apr_atomic_read32(&lingering_count)
   1706                < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
   1707                  * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
   1708         {
   1709             listeners_disabled = 0;
   1710             enable_listensocks(process_slot);
   1711         }
   1712         /*
   1713          * XXX: do we need to set some timeout that re-enables the listensocks
   1714          * XXX: in case no other event occurs?
   1715          */
   1716     }     /* listener main loop */

ここまでで主ループは終わる。

listenerスレッドの終了処理では、Listenソケットをクローズし、workerスレッド向けのメッセージキューに終了通知を登録する。

   1717
   1718     close_listeners(process_slot, &closed);
   1719     ap_queue_term(worker_queue);
   1720
   1721     apr_thread_exit(thd, APR_SUCCESS);
   1722     return NULL;
   1723 }

これで、listener_threadの処理を終わる。

リクエスト処理の流れ: listener_thread(4) イベント処理

(14.3)タイムアウト処理の続き。

(14.4) イベント処理


次が、イベントチェックのepoll_waitで検知したソケットイベントを処理するwhileループだ。
numにはepoll_wait()で取得した準備済ソケット数が入っている。

   1495         while (num) {
   1496             pt = (listener_poll_type *) out_pfd->client_data;

listener_poll_type構造体 ptは、apr_pollset_add()で登録されたapr_pollfd_t構造体のclient_dataにセットされている。

(a) 確立済みソケットの処理


   1497             if (pt->type == PT_CSD) {

こちらのif文は、処理可能なソケットが、確立済みのソケットの場合だ。
workerスレッドのprocess_socketで作成されたevent_conn_state_t構造体 cs の apr_pollfd_t構造体の変数pfdが、apr_pollset_add(event_pollset, &cs->pfd) と第2引数に与えられている。これがout_pfd->client_dataを介して引き渡される。
KeepAliveの次メッセージ待ち、書込み可能待ち、LINGER待ち(標準時間のものと短時間のもの)が該当する。

   1498                 /* one of the sockets is readable */
   1499                 struct timeout_queue *remove_from_q = &write_completion_q;
   1500                 int blocking = 1;

ここで、event_conn_state_t情報のステータス別に処理が分岐する。
event_conn_state_t情報は、listener_poll_type情報のvoid*変数 batonに保持されている。

   1501                 cs = (event_conn_state_t *) pt->baton;
   1502                 switch (cs->pub.state) {
   1503                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:

これは、次リクエスト待ちのevent_conn_state_tだ。

   1504                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
   1505                     remove_from_q = &keepalive_q;
   1506                     /* don't wait for a worker for a keepalive request */

blockingを0にする。
blockingはget_worker()でアイドルworkerスレッドをチェックしたときに、アイドルworkerスレッドがない場合、アイドルworkerスレッドができるまでそこで処理をブロックするかしないかのフラグだ。
1500行目で1にセットされているが、ここで0(ブロックしない)ように変更される。

   1507                     blocking = 0;
   1508                     /* FALL THROUGH */

ここではbreakしない。下記のcase文に処理が継続する。

   1509                 case CONN_STATE_WRITE_COMPLETION:

これは、書込み可能待ちのevent_conn_state_tだ。

下記のget_worker()でアイドル状態のworkerスレッドの存否をチェックする。
CONN_STATE_WRITE_COMPLETIONの場合は、blockingが1なので、ここでブロックされてしまう。
CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合は、アイドルworkerスレッドがない場合にもすぐに抜けてくる。
この場合、workers_were_busyは1になる。have_idle_workerは0だ。

   1510                     get_worker(&have_idle_worker, blocking,
   1511                                &workers_were_busy);

keepalive_qキュー(CONN_STATE_CHECK_REQUEST_LINE_READABLEの場合)、
または、write_completion_qキュー(CONN_STATE_WRITE_COMPLETIONの場合)からタイムアウト情報が削除される。
KeepAliveの場合、上記の通りブロックされないので、アイドルworkerスレッドがない場合にも削除される。

   1512                     apr_thread_mutex_lock(timeout_mutex);
   1513                     TO_QUEUE_REMOVE(*remove_from_q, cs);
   1514                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
   1515
   1516                     /*
   1517                      * Some of the pollset backends, like KQueue or Epoll
   1518                      * automagically remove the FD if the socket is closed,
   1519                      * therefore, we can accept _SUCCESS or _NOTFOUND,
   1520                      * and we still want to keep going
   1521                      */
   1522                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
   1523                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
   1524                                      "pollset remove failed");
   1525                         apr_thread_mutex_unlock(timeout_mutex);
   1526                         start_lingering_close_nonblocking(cs);
   1527                         break;
   1528                     }
   1529
   1530                     apr_thread_mutex_unlock(timeout_mutex);
   1531                     TO_QUEUE_ELEM_INIT(cs);
   1532                     /* If we didn't get a worker immediately for a keep-alive
   1533                      * request, we close the connection, so that the client can
   1534                      * re-connect to a different process.
   1535                      */
   1536                     if (!have_idle_worker) {

アイドルworkerスレッドはいなかった場合(この条件はブロックされないkeep-alive待ちの場合に発生する可能性がある)、コネクションを直ちに閉じる。
クライアント側はこれによって、別のプロセスに再接続するチャンスができる。

   1537                         start_lingering_close_nonblocking(cs);
   1538                         break;

ここで、breakするので、switch文から抜ける。

   1539                     }

下記は、このイベントをworkerスレッドとの通信用のメッセージキューに登録している。
out_pfdから out_pfd->client_dataに登録されている listener_poll_type情報の、そのbaton情報に登録されている。
event_conn_state_t情報を取得し、この情報(apr_socket_t、event_conn_state_t、メモリプール情報)をworker_queeに登録する。
workerスレッド側では、これを、1486行目のpush_timer2worker()のものと同じように、ap_queue_pop_something()で受け取る。

   1540                     rc = push2worker(out_pfd, event_pollset);
   1541                     if (rc != APR_SUCCESS) {
   1542                         ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1543                                      ap_server_conf, "push2worker failed");
   1544                     }
   1545                     else {
   1546                         have_idle_worker = 0;
   1547                     }

workerへのキューにイベントを登録したら、breakでswitch文を抜ける。
正常終了時にはhave_idle_workerの値を初期化(0)している。

   1548                     break;


   1549                 case CONN_STATE_LINGER_NORMAL:

これは、LINGER待ちのevent_conn_state_tだ。

   1550                 case CONN_STATE_LINGER_SHORT:

これは、LINGER待ち(短時間)のevent_conn_state_tだ。
LINGER待ちの場合、標準の待ち時間の場合も、短時間の場合も、実施する処理が同じで、
ともに、process_lingering_close()だ。

この処理では受信データの空読みを実行する。
全てデータを読み終えても、EOF等にならなければ(EAGAIN)、処理をそこで終える(引き続き読込可能となるのを待つ)。
EOF等が検知されれば、ソケットを閉じる。
また、short_linger_qキューまたは、linger_qキューのタイムアウト情報も削除される。
この処理は、このlistenrスレッドの処理内で直接実行されている。

   1551                     process_lingering_close(cs, out_pfd);
   1552                     break;
   1553                 default:

その他の状態は定義されていない。
エラーだ。

   1554                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1555                                  ap_server_conf,
   1556                                  "event_loop: unexpected state %d",
   1557                                  cs->pub.state);
   1558                     ap_assert(0);
   1559                 }

ここまでが、workerスレッドの処理で再登録された監視対象ソケットに対するイベント処理だ。

   1560             }

(b) Listenソケットの処理

   1561             else if (pt->type == PT_ACCEPT) {

こちらのif文は、監視対象のソケットが、listenしているサーバソケットの場合だった場合の処理だ。
ここではacceptが行われる。

   1562                 /* A Listener Socket is ready for an accept() */
   1563                 if (workers_were_busy) {

get_worker(非ブロックモード)で処理した場合、アイドルworkerスレッドがなければ、workers_were_busyに1がセットされている。
初期値は0なので、通常はこのif文には入らない。
ここに入る直前のget_worker()処理は、1495行目からのwhile()文のループで、前の周回で行われた処理と考えられる。
ここは、前回のチェックで、アイドルworkerスレッドがなかった場合の処理だ。

アイドルworkerスレッドがない状態の場合、新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1564                     if (!listeners_disabled)
   1565                         disable_listensocks(process_slot);
   1566                     listeners_disabled = 1;
   1567                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1568                                  "All workers busy, not accepting new conns"
   1569                                  "in this process");
   1570                 }
   1571                 else if (  (int)apr_atomic_read32(&connection_count)
   1572                            - (int)apr_atomic_read32(&lingering_count)
   1573                          > threads_per_child
   1574                            + ap_queue_info_get_idlers(worker_queue_info) *
   1575                              worker_factor / WORKER_FACTOR_SCALE)

次のif文は、現に受け付けている接続数が、AsyncRequestWorkerFactorの条件範囲に収まるかどうかをチェックしている。

connection_count 現在の接続数
lingering_count 現在リンガリングクローズ中の接続数
(connection_countに含まれる)

この差は、アクティブな接続数となる。


threads_per_child ThreadsPerChildで指定されたworkerスレッド数
ap_queue_info_get_idlers(worker_queue_info) 現在のアイドルworkerスレッド数
worker_factor / WORKER_FACTOR_SCALE syncRequestWorkerFactorの設定値
(16分の1単位で近似されている)

    158 #define WORKER_FACTOR_SCALE   16  /* scale factor to allow fractional values */

この式は、受け入れ可能な接続の上限だ。

アクティブな接続数が受け入れ可能な接続の上限を超えた場合、これ以上新たな接続を受け入れない。
そのため、disable_listensocks()でイベント監視対象からListenソケットを取り除く。
そして、listeners_disabled変数に1をセットする。

   1576                 {
   1577                     if (!listeners_disabled)
   1578                         disable_listensocks(process_slot);
   1579                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
   1580                                  "Too many open connections (%u), "
   1581                                  "not accepting new conns in this process",
   1582                                  apr_atomic_read32(&connection_count));
   1583                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
   1584                                  "Idle workers: %u",
   1585                                  ap_queue_info_get_idlers(worker_queue_info));
   1586                     listeners_disabled = 1;
   1587                 }
   1588                 else if (listeners_disabled) {

それ以外の条件の場合、新たな接続を受け入れてよいので、
もし listeners_disabledに1がセットされていたら、0にリセットし、
enable_listensocks()でListenソケットをイベント監視対象に復帰させる。

   1589                     listeners_disabled = 0;
   1590                     enable_listensocks(process_slot);
   1591                 }
   1592                 if (!listeners_disabled) {

上記の処理を経て、ここでlisteners_disabled変数が0の場合、acceptを実行する。

   1593                     lr = (ap_listen_rec *) pt->baton;

lrは、ap_listen_rec情報だ。
この情報は、listenしているサーバソケットの情報を保持している。

listener_poll_type構造体 ptのtypeがPT_ACCEPTである情報は、
listener_thread関数の初期処理のひとつinit_pollset()関数内で作成される。
これは、Listenディレクティブで定義されているサーバソケットに対応して作成される。
ここで作成されるのは、apr_pollfd_t構造体の配列で、
このapr_pollfd_t情報のclient_data変数に、listener_poll_type情報が格納される。

    221 static apr_pollfd_t *listener_pollfd;

つまり、listener_pollfd[i].client_dataに、listener_poll_type情報は保持される。

そして、このlistener_poll_type情報のbaton変数に
ap_listen_rec情報がセットされる。
ap_listen_rec情報がまさに、Listenディレクティブの処理時に作成される情報だ。

そして、このap_listen_recのaccept_func変数に、同じくinit_pollset()関数内で
ap_unixd_accept関数ポインタがセットされている。

そして、このapr_pollfd_t情報が、
apr_pollset_add(event_pollset, &listener_pollfd[i])という形で、
apr_pollset_add()関数の第2引数に渡されて、監視対象に追加され、
1496行目で、pt変数に戻ってきている。

そして、ptのbaton変数に格納されているのが、ap_listen_rec情報だ。

   1594                     ap_pop_pool(&ptrans, worker_queue_info);

空いているトランザクションプールを取り出す。

   1595
   1596                     if (ptrans == NULL) {

このif文は空いているトランザクションプールがない場合に作成する処理だ。
トランザクションプールは、acceptするソケットごとに作成される。

   1597                         /* create a new transaction pool for each accepted socket */
   1598                         apr_allocator_t *allocator;
   1599
   1600                         apr_allocator_create(&allocator);
   1601                         apr_allocator_max_free_set(allocator,
   1602                                                    ap_max_mem_free);
   1603                         apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
   1604                         apr_allocator_owner_set(allocator, ptrans);
   1605                         if (ptrans == NULL) {
   1606                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1607                                          ap_server_conf,
   1608                                          "Failed to create transaction pool");
   1609                             signal_threads(ST_GRACEFUL);
   1610                             return NULL;
   1611                         }
   1612                     }
   1613                     apr_pool_tag(ptrans, "transaction");
   1614

そして、アイドルworkerスレッドを取得する
ここは第2引数が1なので、ブロックモードでアイドルworkerスレッドを確保することになる。
基本的には、workers_were_busyが0の場合、つまりアイドルworkerスレッドがある場合に、ここに入るので、ここではブロックモードではあるが、直ちにreturnすると期待されるだろう。

   1615                     get_worker(&have_idle_worker, 1, &workers_were_busy);

アイドルworkerスレッドが確保できたら、
次がaccept処理となる。
lr->accept_funcは、ap_unixd_accept()が実行される。
基本的にはapccet()が実行される。
合わせて、apr_socket_t情報が作成され、
このソケット情報を生成したメモリプール(ptrans)に対して、cleanup関数を登録している。
これは、socket_cleanup()関数である。
メモリプールやそのcleanup関数についてはまだ説明していないが、APRライブラリに実装されている。
やがては確認したいが、ここでは追わない。

   1616                     rc = lr->accept_func(&csd, lr, ptrans);
   1617
   1618                     /* later we trash rv and rely on csd to indicate
   1619                      * success/failure
   1620                      */
   1621                     AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
   1622
   1623                     if (rc == APR_EGENERAL) {

各種エラー時で、ここでは子プロセスのgracefulな終了を開始している。

   1624                         /* E[NM]FILE, ENOMEM, etc */
   1625                         resource_shortage = 1;
   1626                         signal_threads(ST_GRACEFUL);
   1627                     }
   1628
   1629                     if (csd != NULL) {

このif文が正常に、acceptが行われた場合の処理だ。

   1630                         conns_this_child--;

ここでは、MaxConnectionsPerChildのチェック用に、処理コネクション数をカウントダウンしている。

   1631                         rc = ap_queue_push(worker_queue, csd, NULL, ptrans);

そして、acceptした情報をworkerスレッドとの通信用のメッセージキューに登録している。
workerスレッド側では、acceptイベントを他のイベントと同様に ap_queue_pop_something()で取り出す。
ここでの1540行目との大きな違いは、event_conn_state_t情報がまだ作成されていないことだ。
ap_queue_push()関数は、第3引数がevent_conn_state_t情報だが、この値がNULLになっている。

process_socket関数の893行目

    893     if (cs == NULL) {           /* This is a new connection */

この変数csがNULLとなるのは、このケースだ。

   1632                         if (rc != APR_SUCCESS) {
   1633                             /* trash the connection; we couldn't queue the connected
   1634                              * socket to a worker
   1635                              */
   1636                             apr_socket_close(csd);
   1637                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
   1638                                          ap_server_conf,
   1639                                          "ap_queue_push failed");
   1640                             apr_pool_clear(ptrans);
   1641                             ap_push_pool(worker_queue_info, ptrans);
   1642                         }
   1643                         else {

正常終了時にはhave_idle_workerの値を初期化(0)している。
このまま処理を継続する。

   1644                             have_idle_worker = 0;
   1645                         }
   1646                     }
   1647                     else {

こちらは、APR_EGENERAL以外のエラーのケースだ。
処理をこのまま継続している。

   1648                         apr_pool_clear(ptrans);
   1649                         ap_push_pool(worker_queue_info, ptrans);
   1650                     }
   1651                 }
   1652             }               /* if:else on pt->type */
   1653             out_pfd++;
   1654             num--;
   1655         }                   /* while for processing poll */

1414行目のfor()の末尾だ。
ここまでが、epoll_wait()でチェックしたソケットイベント処理である。

次は、タイムアウトキューの処理に進む。