2015年1月8日木曜日

CORE出力フィルタ: ソケット送信処理(2) writev系

CORE出力フィルタ: ソケット送信処理(1) の続きでwritev()系の
(1) writev_nonblocking()
とそこから呼ばれるAPRライブラリ関数の
(2) apr_socket_sendv()
を見る

(1)writev_nonblocking関数


    763 static apr_status_t writev_nonblocking(apr_socket_t *s,
    764                                        struct iovec *vec, apr_size_t nvec,
    765                                        apr_bucket_brigade *bb,
    766                                        apr_size_t *cumulative_bytes_written,
    767                                        conn_rec *c)

この関数呼び出し時に、
IOベクトル vec と IOベクトルと
bucket brigade bb内のデータバケットと対応付けられている。
対応付けられている IOベクトルとbucket brigadeの数は、 nvecで指定されている。

    768 {
    769     apr_status_t rv = APR_SUCCESS, arv;
    770     apr_size_t bytes_written = 0, bytes_to_write = 0;
    771     apr_size_t i, offset;
    772     apr_interval_time_t old_timeout;
    773
    774     arv = apr_socket_timeout_get(s, &old_timeout);

設定されているソケットタイムアウトの情報を取得する→old_timeout

    775     if (arv != APR_SUCCESS) {
    776         return arv;
    777     }
    778     arv = apr_socket_timeout_set(s, 0);

ソケットタイムアウトを0にする

    779     if (arv != APR_SUCCESS) {
    780         return arv;
    781     }
    782

bytes_to_write: 送信するデータサイズの計算。

    783     for (i = 0; i < nvec; i++) {
    784         bytes_to_write += vec[i].iov_len;
    785     }

offset: IOベクトルのインデックスをセットする。初期値が0。

    786     offset = 0;

bytes_written: 送信済データサイズ。初期値0。
以下のwhileループでは、
apr_socket_sendv()で送信処理を行い、
送信したデータバケットを削除する処理を
送信済みデータ(bytes_written)が送信データサイズ(bytes_to_write)になるまで繰り返している。

    787     while (bytes_written < bytes_to_write) {
    788         apr_size_t n = 0;
    789         rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n);
    790         if (n > 0) {

nバイトデータ送信実行時の経路

    791             bytes_written += n;

送信済みデータサイズをカウントアップする。

    792             for (i = offset; i < nvec; ) {

最初のvec[offset]は、789行目で送信されたIOベクトルの先頭。
処理の最初では、これはvec[0]。

    793                 apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
    794                 if (APR_BUCKET_IS_METADATA(bucket)) {

bucket brigadeのうち、メタデータバケットは無視してよい。
ここでbucket brigadeから切り離し、削除する。

    795                     APR_BUCKET_REMOVE(bucket);
    796                     apr_bucket_destroy(bucket);
    797                 }
    798                 else if (n >= vec[i].iov_len) {

IOベクトル vec の先頭要素のデータ長が、今回送信されたデータ長nと同じか小さい場合。
対応するbucketの持つデータはすべて送信済みなので、削除する。

    799                     APR_BUCKET_REMOVE(bucket);
    800                     apr_bucket_destroy(bucket);

次の送信済みデータを持つbucketおよび対応するIOベクトルを処理するため、offsetを+1する。

    801                     offset++;

残るデータ長を計算する。
このIOベクトル要素に格納されていたデータ長を送信済みデータ長から差し引く。
そして、792行目のforループ先頭に戻る。

    802                     n -= vec[i++].iov_len;
    803                 }
    804                 else {

IOベクトル vec の先頭要素のデータ長が、今回送信されたデータ長nより大きい場合。
この場合、処理対象のデータバケットに未送信のデータが残っている。

送信済みのデータ(nバイト)と未送信のデータとにバケットを切り離す。

    805                     apr_bucket_split(bucket, n);

送信済みのデータのバケット(元の変数bucketに残った方)を切り離し、削除する。

    806                     APR_BUCKET_REMOVE(bucket);
    807                     apr_bucket_destroy(bucket);

処理中のIOベクトルの長さと、データの先頭位置を調整する。

    808                     vec[i].iov_len -= n;
    809                     vec[i].iov_base = (char *) vec[i].iov_base + n;

これで、送信済みのnバイト分のデータバケットの削除が完了する。

    810                     break;
    811                 }
    812             }
    813         }
    814         if (rv != APR_SUCCESS) {
    815             break;
    816         }
    817     }
    818     if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
    819         ap__logio_add_bytes_out(c, bytes_written);

ap__logio_add_bytes_outはlogioモジュールが提供するオプション関数だ。
ログ書式%Oのための出力バイト数を集計する。

    820     }
    821     *cumulative_bytes_written += bytes_written;

送信済みデータサイズ(bytes_written)を引数の値に加算する。

    822
    823     arv = apr_socket_timeout_set(s, old_timeout);

ソケットタイムアウトを元に戻す(774行目に取得したold_timeout)。

    824     if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
    825         return arv;
    826     }
    827     else {
    828         return rv;
    829     }
    830 }


(2)apr_socket_sendv関数

この関数は、writev_nonblocking()から使用されている。
その際には、ソケット情報のtimeout(sock->timeout)を0に変更して使用している。
pollでの書込み可否判定と、writeに戻り値のEAGAIN/EWOULDBLOCKの両方をチェックして、全体としては、timeoutの時間は処理内で書き込み可能になるのを待つようになっている。

(apr-1.5.1/network_io/unix/sendrecv.c)

    194 apr_status_t apr_socket_sendv(apr_socket_t * sock, const struct iovec *vec,
    195                               apr_int32_t nvec, apr_size_t *len)
    196 {
    197 #ifdef HAVE_WRITEV
    198     apr_ssize_t rv;
    199     apr_size_t requested_len = 0;
    200     apr_int32_t i;
    201

送信データ長を計算する

    202     for (i = 0; i < nvec; i++) {
    203         requested_len += vec[i].iov_len;
    204     }
    205

ソケット情報のoptionsをチェックしてAPR_INCOMPLETE_WRITEフラグが立っている場合には、
フラグを消してから、poll処理(219行目)にジャンプする。

    206     if (sock->options & APR_INCOMPLETE_WRITE) {

APR_INCOMPLETE_WRITEフラグは、前回のapr_socket_sendv()処理で、クライアントが書き込み
可能にならないままだったケース(timeout時間待っても書き込み可能にならず、送信データが
残ってしまった場合)にONになる。ここでも、最初に書込み可能かどうかをチェックしている。

なお、writev_nonblocking()から呼ばれている場合、ソケット情報のtimeoutは0にセットされているので、前回のapr_socket_sendv()も同じ条件なら、ここには入らない
(timeout>0である場合に使用されるフラグ)

    207         sock->options &= ~APR_INCOMPLETE_WRITE;
    208         goto do_select;
    209     }
    210

writev()を実行する。
この処理は、EINTRエラーの場合(シグナルで割りこまれた場合)は、処理を再実行する。

    211     do {
    212         rv = writev(sock->socketdes, vec, nvec);
    213     } while (rv == -1 && errno == EINTR);
    214

以下のwhile()ループは
 (1)rv==-1(writevがエラー)
かつ
 (2) エラーがEAGAINかEWOULDBLOCKの場合(書込可能でなかった場合)
かつ
 (3)ソケットのtimeout情報が >0 の場合
に処理を繰り返すことになっている。

つまり、timeout>0の場合に、writevが書込み可能でなかったら繰り返されている。
writev_nonblocking()から呼ばれている場合は、timeout=0なので、この条件は満たさない
timeout>0がセットされている場合にも、このループ内だけ見るとまず、219行目の
apr_wait_for_io_or_timeout()内でソケットが書き込み可能になるのを待つので、
errnoがEAGAIN/EWOULDBLOCKとなってループするケースはないと考えられる。

この条件が必要なのは、212行目のwritev()で書込み可能でなかった場合と考えられる。
その場合、timeut>0であれば、書込み可能になるまで219行目で待ち、
書込み可能になれば、226行目で書き込まれる。

    215     while ((rv == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)
    216                       && (sock->timeout > 0)) {
    217         apr_status_t arv;
    218 do_select:

まず、ソケットが書き込み可能かどうかをチェックする。
poll関数で、sock->socketdesをチェックする。
この際timeoutにsock->timeoutをセットするが、上述の通り、apr_socket_sendv()から呼ばれている場合は、このtimeoutは0となっている(書込み可能でなければ直ちにrerurnする)。

    219         arv = apr_wait_for_io_or_timeout(NULL, sock, 0);
    220         if (arv != APR_SUCCESS) {

エラーのほか、
タイムアウトの場合、apr_wait_for_io_or_timeoutはAPR_TIMEUPを返すのでこの経路となる。
タイムアウトが0の場合は、書込み可能でなければ、直ちにこの経路となる。

    221             *len = 0;
    222             return arv;
    223         }
    224         else {

書込み可能だった場合に、writev()を実行する。
ここでも、シグナル割り込みは無視される(writev()を再実行する)。

    225             do {
    226                 rv = writev(sock->socketdes, vec, nvec);
    227             } while (rv == -1 && errno == EINTR);
    228         }
    229     }

writevエラーの場合、エラーコードを返す。

    230     if (rv == -1) {
    231         *len = 0;
    232         return errno;
    233     }

timeout>0が指定されており、実際の送信済データ長が、リクエストされた送信データ長を満たさなかった場合に、ソケット情報のoptionsにAPR_INCOMPLETE_WRITEフラグを立てている。


    234     if ((sock->timeout > 0) && (rv < requested_len)) {
    235         sock->options |= APR_INCOMPLETE_WRITE;
    236     }

送信したデータ長を *lenにセットし、成功でreturnする。

    237     (*len) = rv;
    238     return APR_SUCCESS;
    239 #else
    240     *len = vec[0].iov_len;
    241     return apr_socket_send(sock, vec[0].iov_base, len);
    242 #endif
    243 }

以上から、apr_socket_sendv()では、writev()を1回実行してreturnする。

0 件のコメント:

コメントを投稿