mirror of https://github.com/nginx/nginx.git
HTTP/2: refactor proxy filter functions to reduce code duplication.
This commit is contained in:
parent
2501a9b9da
commit
cf1dd40948
|
@ -121,6 +121,10 @@ static ngx_int_t ngx_http_v2_proxy_non_buffered_filter(void *data,
|
|||
ssize_t bytes);
|
||||
static ngx_int_t ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p,
|
||||
ngx_buf_t *buf);
|
||||
static ngx_int_t ngx_http_v2_proxy_process_frames(ngx_http_request_t *r,
|
||||
ngx_http_v2_proxy_ctx_t *ctx, ngx_buf_t *b);
|
||||
static ngx_int_t ngx_http_v2_proxy_output_data(ngx_http_request_t *r,
|
||||
ngx_http_v2_proxy_ctx_t *ctx, ngx_buf_t *in, ngx_buf_t *out);
|
||||
|
||||
static ngx_int_t ngx_http_v2_proxy_parse_frame(ngx_http_request_t *r,
|
||||
ngx_http_v2_proxy_ctx_t *ctx, ngx_buf_t *b);
|
||||
|
@ -1709,21 +1713,20 @@ ngx_http_v2_proxy_non_buffered_filter(void *data, ssize_t bytes)
|
|||
ngx_http_v2_proxy_ctx_t *ctx = data;
|
||||
|
||||
ngx_int_t rc;
|
||||
ngx_buf_t *b, *buf;
|
||||
ngx_buf_t *buf, *b;
|
||||
ngx_chain_t *cl, **ll;
|
||||
ngx_table_elt_t *h;
|
||||
ngx_http_request_t *r;
|
||||
ngx_http_upstream_t *u;
|
||||
|
||||
r = ctx->request;
|
||||
u = r->upstream;
|
||||
b = &u->buffer;
|
||||
buf = &u->buffer;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 filter bytes:%z", bytes);
|
||||
|
||||
b->pos = b->last;
|
||||
b->last += bytes;
|
||||
buf->pos = buf->last;
|
||||
buf->last += bytes;
|
||||
|
||||
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
|
||||
ll = &cl->next;
|
||||
|
@ -1731,419 +1734,22 @@ ngx_http_v2_proxy_non_buffered_filter(void *data, ssize_t bytes)
|
|||
|
||||
for ( ;; ) {
|
||||
|
||||
if (ctx->state < ngx_http_v2_proxy_st_payload) {
|
||||
rc = ngx_http_v2_proxy_process_frames(r, ctx, buf);
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_frame(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
|
||||
if (ctx->done) {
|
||||
|
||||
if (ctx->length > 0) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream prematurely closed stream");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have finished parsing the response and the
|
||||
* remaining control frames. If there are unsent
|
||||
* control frames, post a write event to send them.
|
||||
*/
|
||||
|
||||
if (ctx->out) {
|
||||
ngx_post_event(u->peer.connection->write,
|
||||
&ngx_posted_events);
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
u->length = 0;
|
||||
|
||||
if (ctx->in == NULL
|
||||
&& ctx->output_closed
|
||||
&& !ctx->output_blocked
|
||||
&& !ctx->goaway
|
||||
&& ctx->state == ngx_http_v2_proxy_st_start)
|
||||
{
|
||||
u->keepalive = 1;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if ((ctx->type == NGX_HTTP_V2_CONTINUATION_FRAME
|
||||
&& !ctx->parsing_headers)
|
||||
|| (ctx->type != NGX_HTTP_V2_CONTINUATION_FRAME
|
||||
&& ctx->parsing_headers))
|
||||
{
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent unexpected http2 frame: %d",
|
||||
ctx->type);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_DATA_FRAME) {
|
||||
|
||||
if (ctx->stream_id != ctx->id) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent data frame "
|
||||
"for unknown stream %ui",
|
||||
ctx->stream_id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->rest > ctx->recv_window) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream violated stream flow control, "
|
||||
"received %uz data frame with window %uz",
|
||||
ctx->rest, ctx->recv_window);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->rest > ctx->connection->recv_window) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream violated connection flow control, "
|
||||
"received %uz data frame with window %uz",
|
||||
ctx->rest, ctx->connection->recv_window);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->recv_window -= ctx->rest;
|
||||
ctx->connection->recv_window -= ctx->rest;
|
||||
|
||||
if (ctx->connection->recv_window < NGX_HTTP_V2_MAX_WINDOW / 4
|
||||
|| ctx->recv_window < NGX_HTTP_V2_MAX_WINDOW / 4)
|
||||
{
|
||||
if (ngx_http_v2_proxy_send_window_update(r, ctx)
|
||||
!= NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_post_event(u->peer.connection->write,
|
||||
&ngx_posted_events);
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx->stream_id && ctx->stream_id != ctx->id) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent frame for unknown stream %ui",
|
||||
ctx->stream_id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->stream_id && ctx->done
|
||||
&& ctx->type != NGX_HTTP_V2_RST_STREAM_FRAME
|
||||
&& ctx->type != NGX_HTTP_V2_WINDOW_UPDATE_FRAME)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent frame for closed stream %ui",
|
||||
ctx->stream_id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->padding = 0;
|
||||
if (rc == NGX_DONE) {
|
||||
u->length = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
if (ctx->state == ngx_http_v2_proxy_st_padding) {
|
||||
|
||||
if (b->last - b->pos < (ssize_t) ctx->rest) {
|
||||
ctx->rest -= b->last - b->pos;
|
||||
b->pos = b->last;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
b->pos += ctx->rest;
|
||||
ctx->rest = 0;
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
|
||||
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
|
||||
ctx->done = 1;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
/* frame payload */
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_RST_STREAM_FRAME) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_rst_stream(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->error || !ctx->done) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream rejected request with error %ui",
|
||||
ctx->error);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->rst) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent frame for closed stream %ui",
|
||||
ctx->stream_id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->rst = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_GOAWAY_FRAME) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_goaway(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* If stream_id is lower than one we use, our
|
||||
* request won't be processed and needs to be retried.
|
||||
* If stream_id is greater or equal to the one we use,
|
||||
* we can continue normally (except we can't use this
|
||||
* connection for additional requests). If there is
|
||||
* a real error, the connection will be closed.
|
||||
*/
|
||||
|
||||
if (ctx->stream_id < ctx->id) {
|
||||
|
||||
/* TODO: we can retry non-idempotent requests */
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent goaway with error %ui",
|
||||
ctx->error);
|
||||
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->goaway = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_WINDOW_UPDATE_FRAME) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_window_update(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->in) {
|
||||
ngx_post_event(u->peer.connection->write, &ngx_posted_events);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_SETTINGS_FRAME) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_settings(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->in) {
|
||||
ngx_post_event(u->peer.connection->write, &ngx_posted_events);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_PING_FRAME) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_ping(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_post_event(u->peer.connection->write, &ngx_posted_events);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_PUSH_PROMISE_FRAME) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent unexpected push promise frame");
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ctx->type == NGX_HTTP_V2_HEADERS_FRAME
|
||||
|| ctx->type == NGX_HTTP_V2_CONTINUATION_FRAME)
|
||||
{
|
||||
for ( ;; ) {
|
||||
|
||||
rc = ngx_http_v2_proxy_parse_header(r, ctx, b);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == NGX_OK) {
|
||||
|
||||
/* a header line has been parsed successfully */
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 trailer: \"%V: %V\"",
|
||||
&ctx->name, &ctx->value);
|
||||
|
||||
if (ctx->name.len && ctx->name.data[0] == ':') {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent invalid "
|
||||
"trailer \"%V: %V\"",
|
||||
&ctx->name, &ctx->value);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
h = ngx_list_push(&u->headers_in.trailers);
|
||||
if (h == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
h->key = ctx->name;
|
||||
h->value = ctx->value;
|
||||
h->lowcase_key = h->key.data;
|
||||
h->hash = ngx_hash_key(h->key.data, h->key.len);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
|
||||
|
||||
/* a whole header has been parsed successfully */
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 trailer done");
|
||||
|
||||
if (ctx->end_stream) {
|
||||
ctx->done = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent trailer without "
|
||||
"end stream flag");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/* there was error while a header line parsing */
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent invalid trailer");
|
||||
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* rc == NGX_AGAIN */
|
||||
|
||||
if (ctx->rest == 0) {
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (ctx->type != NGX_HTTP_V2_DATA_FRAME) {
|
||||
|
||||
/* priority, unknown frames */
|
||||
|
||||
if (b->last - b->pos < (ssize_t) ctx->rest) {
|
||||
ctx->rest -= b->last - b->pos;
|
||||
b->pos = b->last;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
b->pos += ctx->rest;
|
||||
ctx->rest = 0;
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* data frame:
|
||||
*
|
||||
* +---------------+
|
||||
* |Pad Length? (8)|
|
||||
* +---------------+-----------------------------------------------+
|
||||
* | Data (*) ...
|
||||
* +---------------------------------------------------------------+
|
||||
* | Padding (*) ...
|
||||
* +---------------------------------------------------------------+
|
||||
*/
|
||||
|
||||
if (ctx->flags & NGX_HTTP_V2_PADDED_FLAG) {
|
||||
|
||||
if (ctx->rest == 0) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent too short http2 frame");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (b->pos == b->last) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
ctx->flags &= ~NGX_HTTP_V2_PADDED_FLAG;
|
||||
ctx->padding = *b->pos++;
|
||||
ctx->rest -= 1;
|
||||
|
||||
if (ctx->padding > ctx->rest) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent http2 frame with too long "
|
||||
"padding: %d in frame %uz",
|
||||
ctx->padding, ctx->rest);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ctx->rest == ctx->padding) {
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (b->pos == b->last) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
/* rc == NGX_OK */
|
||||
|
||||
cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
|
||||
if (cl == NULL) {
|
||||
|
@ -2153,65 +1759,20 @@ ngx_http_v2_proxy_non_buffered_filter(void *data, ssize_t bytes)
|
|||
*ll = cl;
|
||||
ll = &cl->next;
|
||||
|
||||
buf = cl->buf;
|
||||
b = cl->buf;
|
||||
|
||||
buf->flush = 1;
|
||||
buf->memory = 1;
|
||||
b->flush = 1;
|
||||
b->memory = 1;
|
||||
|
||||
buf->pos = b->pos;
|
||||
buf->tag = u->output.tag;
|
||||
b->pos = buf->pos;
|
||||
b->tag = u->output.tag;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 output buf %p", buf->pos);
|
||||
"proxy http2 output buf %p", b->pos);
|
||||
|
||||
if (b->last - b->pos < (ssize_t) ctx->rest - ctx->padding) {
|
||||
|
||||
ctx->rest -= b->last - b->pos;
|
||||
b->pos = b->last;
|
||||
buf->last = b->pos;
|
||||
|
||||
if (ctx->length != -1) {
|
||||
|
||||
if (buf->last - buf->pos > ctx->length) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent response body larger "
|
||||
"than indicated content length");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->length -= buf->last - buf->pos;
|
||||
}
|
||||
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
b->pos += ctx->rest - ctx->padding;
|
||||
buf->last = b->pos;
|
||||
ctx->rest = ctx->padding;
|
||||
|
||||
if (ctx->length != -1) {
|
||||
|
||||
if (buf->last - buf->pos > ctx->length) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent response body larger "
|
||||
"than indicated content length");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->length -= buf->last - buf->pos;
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
if (ctx->padding) {
|
||||
ctx->state = ngx_http_v2_proxy_st_padding;
|
||||
continue;
|
||||
}
|
||||
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
|
||||
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
|
||||
ctx->done = 1;
|
||||
rc = ngx_http_v2_proxy_output_data(r, ctx, buf, b);
|
||||
if (rc == NGX_ERROR || rc == NGX_AGAIN) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2225,9 +1786,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
ngx_int_t rc;
|
||||
ngx_buf_t *b, **prev;
|
||||
ngx_chain_t *cl;
|
||||
ngx_table_elt_t *h;
|
||||
ngx_http_request_t *r;
|
||||
ngx_http_upstream_t *u;
|
||||
ngx_http_v2_proxy_ctx_t *ctx;
|
||||
|
||||
if (buf->pos == buf->last) {
|
||||
|
@ -2241,7 +1800,6 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
}
|
||||
|
||||
r = ctx->request;
|
||||
u = r->upstream;
|
||||
|
||||
b = NULL;
|
||||
prev = &buf->shadow;
|
||||
|
@ -2249,6 +1807,95 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 filter bytes:%z", buf->last - buf->pos);
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
rc = ngx_http_v2_proxy_process_frames(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_DONE) {
|
||||
p->length = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* copy data frame payload for buffering */
|
||||
|
||||
cl = ngx_chain_get_free_buf(p->pool, &p->free);
|
||||
if (cl == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
b = cl->buf;
|
||||
|
||||
ngx_memzero(b, sizeof(ngx_buf_t));
|
||||
|
||||
b->pos = buf->pos;
|
||||
b->start = buf->start;
|
||||
b->end = buf->end;
|
||||
b->tag = p->tag;
|
||||
b->temporary = 1;
|
||||
b->recycled = 1;
|
||||
|
||||
*prev = b;
|
||||
prev = &b->shadow;
|
||||
|
||||
if (p->in) {
|
||||
*p->last_in = cl;
|
||||
} else {
|
||||
p->in = cl;
|
||||
}
|
||||
p->last_in = &cl->next;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 copy buf %p", b->pos);
|
||||
|
||||
rc = ngx_http_v2_proxy_output_data(r, ctx, buf, b);
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (b) {
|
||||
b->shadow = buf;
|
||||
b->last_shadow = 1;
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"input buf %p %z", b->pos, b->last - b->pos);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* there is no data record in the buf, add it to free chain */
|
||||
|
||||
if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_http_v2_proxy_process_frames(ngx_http_request_t *r,
|
||||
ngx_http_v2_proxy_ctx_t *ctx, ngx_buf_t *buf)
|
||||
{
|
||||
ngx_int_t rc;
|
||||
ngx_table_elt_t *h;
|
||||
ngx_http_upstream_t *u;
|
||||
|
||||
u = r->upstream;
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
if (ctx->state < ngx_http_v2_proxy_st_payload) {
|
||||
|
@ -2274,11 +1921,9 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
if (ctx->out) {
|
||||
ngx_post_event(u->peer.connection->write,
|
||||
&ngx_posted_events);
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
p->length = 0;
|
||||
|
||||
if (ctx->in == NULL
|
||||
&& ctx->output_closed
|
||||
&& !ctx->output_blocked
|
||||
|
@ -2288,10 +1933,10 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
u->keepalive = 1;
|
||||
}
|
||||
|
||||
break;
|
||||
return NGX_DONE;
|
||||
}
|
||||
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
|
@ -2377,7 +2022,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
if (buf->last - buf->pos < (ssize_t) ctx->rest) {
|
||||
ctx->rest -= buf->last - buf->pos;
|
||||
buf->pos = buf->last;
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
buf->pos += ctx->rest;
|
||||
|
@ -2386,7 +2031,6 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
|
||||
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
|
||||
ctx->done = 1;
|
||||
p->length = 0;
|
||||
}
|
||||
|
||||
continue;
|
||||
|
@ -2399,7 +2043,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
rc = ngx_http_v2_proxy_parse_rst_stream(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
|
@ -2421,6 +2065,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
}
|
||||
|
||||
ctx->rst = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2429,21 +2074,35 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
rc = ngx_http_v2_proxy_parse_goaway(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* If stream_id is lower than one we use, our
|
||||
* request won't be processed and needs to be retried.
|
||||
* If stream_id is greater or equal to the one we use,
|
||||
* we can continue normally (except we can't use this
|
||||
* connection for additional requests). If there is
|
||||
* a real error, the connection will be closed.
|
||||
*/
|
||||
|
||||
if (ctx->stream_id < ctx->id) {
|
||||
|
||||
/* TODO: we can retry non-idempotent requests */
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent goaway with error %ui",
|
||||
ctx->error);
|
||||
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->goaway = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2452,7 +2111,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
rc = ngx_http_v2_proxy_parse_window_update(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
|
@ -2471,7 +2130,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
rc = ngx_http_v2_proxy_parse_settings(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
|
@ -2490,7 +2149,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
rc = ngx_http_v2_proxy_parse_ping(r, ctx, buf);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
|
@ -2584,7 +2243,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (ctx->type != NGX_HTTP_V2_DATA_FRAME) {
|
||||
|
@ -2594,7 +2253,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
if (buf->last - buf->pos < (ssize_t) ctx->rest) {
|
||||
ctx->rest -= buf->last - buf->pos;
|
||||
buf->pos = buf->last;
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
buf->pos += ctx->rest;
|
||||
|
@ -2625,7 +2284,7 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
}
|
||||
|
||||
if (buf->pos == buf->last) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
ctx->flags &= ~NGX_HTTP_V2_PADDED_FLAG;
|
||||
|
@ -2644,112 +2303,83 @@ ngx_http_v2_proxy_body_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
|
|||
}
|
||||
|
||||
if (ctx->rest == ctx->padding) {
|
||||
goto done;
|
||||
|
||||
if (ctx->padding) {
|
||||
ctx->state = ngx_http_v2_proxy_st_padding;
|
||||
|
||||
} else {
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
|
||||
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
|
||||
ctx->done = 1;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (buf->pos == buf->last) {
|
||||
break;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
/* copy data frame payload for buffering */
|
||||
return NGX_OK;
|
||||
}
|
||||
}
|
||||
|
||||
cl = ngx_chain_get_free_buf(p->pool, &p->free);
|
||||
if (cl == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
b = cl->buf;
|
||||
static ngx_int_t
|
||||
ngx_http_v2_proxy_output_data(ngx_http_request_t *r,
|
||||
ngx_http_v2_proxy_ctx_t *ctx, ngx_buf_t *in, ngx_buf_t *out)
|
||||
{
|
||||
if (in->last - in->pos < (ssize_t) ctx->rest - ctx->padding) {
|
||||
|
||||
ngx_memzero(b, sizeof(ngx_buf_t));
|
||||
|
||||
b->pos = buf->pos;
|
||||
b->start = buf->start;
|
||||
b->end = buf->end;
|
||||
b->tag = p->tag;
|
||||
b->temporary = 1;
|
||||
b->recycled = 1;
|
||||
|
||||
*prev = b;
|
||||
prev = &b->shadow;
|
||||
|
||||
if (p->in) {
|
||||
*p->last_in = cl;
|
||||
} else {
|
||||
p->in = cl;
|
||||
}
|
||||
p->last_in = &cl->next;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"proxy http2 copy buf %p", b->pos);
|
||||
|
||||
if (buf->last - buf->pos < (ssize_t) ctx->rest - ctx->padding) {
|
||||
|
||||
ctx->rest -= buf->last - buf->pos;
|
||||
buf->pos = buf->last;
|
||||
b->last = buf->pos;
|
||||
|
||||
if (ctx->length != -1) {
|
||||
|
||||
if (b->last - b->pos > ctx->length) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent response body larger "
|
||||
"than indicated content length");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->length -= b->last - b->pos;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
buf->pos += ctx->rest - ctx->padding;
|
||||
b->last = buf->pos;
|
||||
ctx->rest = ctx->padding;
|
||||
ctx->rest -= in->last - in->pos;
|
||||
in->pos = in->last;
|
||||
out->last = in->pos;
|
||||
|
||||
if (ctx->length != -1) {
|
||||
|
||||
if (b->last - b->pos > ctx->length) {
|
||||
if (out->last - out->pos > ctx->length) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent response body larger "
|
||||
"than indicated content length");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->length -= b->last - b->pos;
|
||||
ctx->length -= out->last - out->pos;
|
||||
}
|
||||
|
||||
done:
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
if (ctx->padding) {
|
||||
ctx->state = ngx_http_v2_proxy_st_padding;
|
||||
continue;
|
||||
in->pos += ctx->rest - ctx->padding;
|
||||
out->last = in->pos;
|
||||
ctx->rest = ctx->padding;
|
||||
|
||||
if (ctx->length != -1) {
|
||||
|
||||
if (out->last - out->pos > ctx->length) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"upstream sent response body larger "
|
||||
"than indicated content length");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ctx->length -= out->last - out->pos;
|
||||
}
|
||||
|
||||
if (ctx->padding) {
|
||||
ctx->state = ngx_http_v2_proxy_st_padding;
|
||||
|
||||
} else {
|
||||
ctx->state = ngx_http_v2_proxy_st_start;
|
||||
|
||||
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
|
||||
ctx->done = 1;
|
||||
p->length = 0;
|
||||
//p->length = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (b) {
|
||||
b->shadow = buf;
|
||||
b->last_shadow = 1;
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
|
||||
"input buf %p %z", b->pos, b->last - b->pos);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* there is no data record in the buf, add it to free chain */
|
||||
|
||||
if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue