]> git.kaiwu.me - nginx.git/commitdiff
QUIC: separate files for stream related processing.
authorVladimir Homutov <vl@nginx.com>
Tue, 13 Apr 2021 11:40:00 +0000 (14:40 +0300)
committerVladimir Homutov <vl@nginx.com>
Tue, 13 Apr 2021 11:40:00 +0000 (14:40 +0300)
auto/modules
src/event/quic/ngx_event_quic.c
src/event/quic/ngx_event_quic_connection.h
src/event/quic/ngx_event_quic_streams.c [new file with mode: 0644]
src/event/quic/ngx_event_quic_streams.h [new file with mode: 0644]

index a3518574104877daa58e3eeab60060c2a6efe1db..ba0131da65966b44f42d35319f29b0246e4aa541 100644 (file)
@@ -1345,13 +1345,15 @@ if [ $USE_OPENSSL$USE_OPENSSL_QUIC = YESYES ]; then
                      src/event/quic/ngx_event_quic_connection.h \
                      src/event/quic/ngx_event_quic_frames.h \
                      src/event/quic/ngx_event_quic_connid.h \
-                     src/event/quic/ngx_event_quic_migration.h"
+                     src/event/quic/ngx_event_quic_migration.h \
+                     src/event/quic/ngx_event_quic_streams.h"
     ngx_module_srcs="src/event/quic/ngx_event_quic.c \
                      src/event/quic/ngx_event_quic_transport.c \
                      src/event/quic/ngx_event_quic_protection.c \
                      src/event/quic/ngx_event_quic_frames.c \
                      src/event/quic/ngx_event_quic_connid.c \
-                     src/event/quic/ngx_event_quic_migration.c"
+                     src/event/quic/ngx_event_quic_migration.c \
+                     src/event/quic/ngx_event_quic_streams.c"
 
     ngx_module_libs=
     ngx_module_link=YES
index cc44250447d93db70dea769b11967cef5ec43f6d..703425085920266e373126008d263c04005fc344 100644 (file)
@@ -21,8 +21,6 @@
  */
 #define NGX_QUIC_MAX_BUFFERED    65535
 
-#define NGX_QUIC_STREAM_GONE     (void *) -1
-
 /*
  * Endpoints MUST discard packets that are too small to be valid QUIC
  * packets.  With the set of AEAD functions defined in [QUIC-TLS],
@@ -80,8 +78,6 @@ static void ngx_quic_input_handler(ngx_event_t *rev);
 
 static ngx_int_t ngx_quic_close_quic(ngx_connection_t *c, ngx_int_t rc);
 static void ngx_quic_close_timer_handler(ngx_event_t *ev);
-static ngx_int_t ngx_quic_close_streams(ngx_connection_t *c,
-    ngx_quic_connection_t *qc);
 
 static ngx_int_t ngx_quic_input(ngx_connection_t *c, ngx_buf_t *b,
     ngx_quic_conf_t *conf);
@@ -115,34 +111,12 @@ static ngx_int_t ngx_quic_handle_ack_frame_range(ngx_connection_t *c,
     ngx_msec_t *send_time);
 static void ngx_quic_rtt_sample(ngx_connection_t *c, ngx_quic_ack_frame_t *ack,
     enum ssl_encryption_level_t level, ngx_msec_t send_time);
-static void ngx_quic_handle_stream_ack(ngx_connection_t *c,
-    ngx_quic_frame_t *f);
 
 static ngx_int_t ngx_quic_handle_crypto_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
 ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
     ngx_quic_frame_t *frame, void *data);
-static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
-static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
-    ngx_quic_frame_t *frame, void *data);
 
-static ngx_int_t ngx_quic_handle_max_data_frame(ngx_connection_t *c,
-    ngx_quic_max_data_frame_t *f);
-static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
-static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
-static ngx_int_t ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f);
-static ngx_int_t ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f);
-static ngx_int_t ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f);
-static ngx_int_t ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f);
-
-static ngx_int_t ngx_quic_output(ngx_connection_t *c);
 static ngx_uint_t ngx_quic_get_padding_level(ngx_connection_t *c);
 static ngx_int_t ngx_quic_generate_ack(ngx_connection_t *c,
     ngx_quic_send_ctx_t *ctx);
@@ -160,24 +134,6 @@ static void ngx_quic_resend_frames(ngx_connection_t *c,
     ngx_quic_send_ctx_t *ctx);
 static void ngx_quic_push_handler(ngx_event_t *ev);
 
-static void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
-    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
-static ngx_quic_stream_t *ngx_quic_find_stream(ngx_rbtree_t *rbtree,
-    uint64_t id);
-static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
-    uint64_t id);
-static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
-    uint64_t id, size_t rcvbuf_size);
-static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
-    size_t size);
-static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
-    size_t size);
-static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
-    ngx_chain_t *in, off_t limit);
-static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
-static void ngx_quic_stream_cleanup_handler(void *data);
-static void ngx_quic_shutdown_quic(ngx_connection_t *c);
-
 static void ngx_quic_congestion_ack(ngx_connection_t *c,
     ngx_quic_frame_t *frame);
 static void ngx_quic_congestion_lost(ngx_connection_t *c,
@@ -1607,94 +1563,6 @@ ngx_quic_close_timer_handler(ngx_event_t *ev)
 }
 
 
-static ngx_int_t
-ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
-{
-    ngx_event_t        *rev, *wev;
-    ngx_rbtree_t       *tree;
-    ngx_rbtree_node_t  *node;
-    ngx_quic_stream_t  *qs;
-
-#if (NGX_DEBUG)
-    ngx_uint_t          ns;
-#endif
-
-    tree = &qc->streams.tree;
-
-    if (tree->root == tree->sentinel) {
-        return NGX_OK;
-    }
-
-#if (NGX_DEBUG)
-    ns = 0;
-#endif
-
-    for (node = ngx_rbtree_min(tree->root, tree->sentinel);
-         node;
-         node = ngx_rbtree_next(tree, node))
-    {
-        qs = (ngx_quic_stream_t *) node;
-
-        rev = qs->c->read;
-        rev->error = 1;
-        rev->ready = 1;
-
-        wev = qs->c->write;
-        wev->error = 1;
-        wev->ready = 1;
-
-        ngx_post_event(rev, &ngx_posted_events);
-
-        if (rev->timer_set) {
-            ngx_del_timer(rev);
-        }
-
-#if (NGX_DEBUG)
-        ns++;
-#endif
-    }
-
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic connection has %ui active streams", ns);
-
-    return NGX_AGAIN;
-}
-
-
-ngx_int_t
-ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
-{
-    ngx_event_t            *wev;
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
-    ngx_quic_connection_t  *qc;
-
-    qs = c->quic;
-    pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
-
-    frame = ngx_quic_alloc_frame(pc);
-    if (frame == NULL) {
-        return NGX_ERROR;
-    }
-
-    frame->level = ssl_encryption_application;
-    frame->type = NGX_QUIC_FT_RESET_STREAM;
-    frame->u.reset_stream.id = qs->id;
-    frame->u.reset_stream.error_code = err;
-    frame->u.reset_stream.final_size = c->sent;
-
-    ngx_quic_queue_frame(qc, frame);
-
-    wev = c->write;
-    wev->error = 1;
-    wev->ready = 1;
-
-    return NGX_OK;
-}
-
-
 static ngx_int_t
 ngx_quic_input(ngx_connection_t *c, ngx_buf_t *b, ngx_quic_conf_t *conf)
 {
@@ -3152,38 +3020,6 @@ ngx_quic_pto(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
 }
 
 
-static void
-ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
-{
-    uint64_t                sent, unacked;
-    ngx_event_t            *wev;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
-    if (sn == NULL) {
-        return;
-    }
-
-    wev = sn->c->write;
-    sent = sn->c->sent;
-    unacked = sent - sn->acked;
-
-    if (unacked >= NGX_QUIC_STREAM_BUFSIZE && wev->active) {
-        wev->ready = 1;
-        ngx_post_event(wev, &ngx_posted_events);
-    }
-
-    sn->acked += f->u.stream.length;
-
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, sn->c->log, 0,
-                   "quic stream ack len:%uL acked:%uL unacked:%uL",
-                   f->u.stream.length, sn->acked, sent - sn->acked);
-}
-
-
 static ngx_int_t
 ngx_quic_handle_crypto_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
     ngx_quic_frame_t *frame)
@@ -3334,1904 +3170,785 @@ ngx_quic_crypto_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
 }
 
 
-static ngx_int_t
-ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
-    ngx_quic_frame_t *frame)
+ngx_int_t
+ngx_quic_output(ngx_connection_t *c)
 {
-    size_t                     window;
-    uint64_t                   last;
-    ngx_buf_t                 *b;
-    ngx_pool_t                *pool;
-    ngx_connection_t          *sc;
-    ngx_quic_stream_t         *sn;
-    ngx_quic_connection_t     *qc;
-    ngx_quic_stream_frame_t   *f;
-    ngx_quic_frames_stream_t  *fs;
-
-    qc = ngx_quic_get_connection(c);
-    f = &frame->u.stream;
-
-    if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        && (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED))
-    {
-        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-        return NGX_ERROR;
-    }
-
-    /* no overflow since both values are 62-bit */
-    last = f->offset + f->length;
-
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+    off_t                   max;
+    size_t                  len, min, in_flight;
+    ssize_t                 n;
+    u_char                 *p;
+    ngx_uint_t              i, pad;
+    ngx_quic_send_ctx_t    *ctx;
+    ngx_quic_congestion_t  *cg;
+    ngx_quic_connection_t  *qc;
+    static u_char           dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
-    if (sn == NULL) {
-        sn = ngx_quic_create_client_stream(c, f->stream_id);
+    c->log->action = "sending frames";
 
-        if (sn == NULL) {
-            return NGX_ERROR;
-        }
+    qc = ngx_quic_get_connection(c);
+    cg = &qc->congestion;
 
-        if (sn == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
+    in_flight = cg->in_flight;
 
-        sc = sn->c;
-        fs = &sn->fs;
-        b = sn->b;
-        window = b->end - b->last;
+    for ( ;; ) {
+        p = dst;
 
-        if (last > window) {
-            qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
-            goto cleanup;
-        }
+        len = ngx_min(qc->ctp.max_udp_payload_size,
+                      NGX_QUIC_MAX_UDP_PAYLOAD_SIZE);
 
-        if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
-                                          sn)
-            != NGX_OK)
-        {
-            goto cleanup;
+        if (!qc->validated) {
+            max = qc->received * 3;
+            max = (c->sent >= max) ? 0 : max - c->sent;
+            len = ngx_min(len, (size_t) max);
         }
 
-        sc->listening->handler(sc);
-
-        return NGX_OK;
-    }
-
-    fs = &sn->fs;
-    b = sn->b;
-    window = (b->pos - b->start) + (b->end - b->last);
-
-    if (last > fs->received && last - fs->received > window) {
-        qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
-        return NGX_ERROR;
-    }
-
-    return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
-                                         sn);
-
-cleanup:
-
-    pool = sc->pool;
-
-    ngx_close_connection(sc);
-    ngx_destroy_pool(pool);
-
-    return NGX_ERROR;
-}
-
-
-static ngx_int_t
-ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
-{
-    uint64_t                  id;
-    ngx_buf_t                *b;
-    ngx_event_t              *rev;
-    ngx_chain_t              *cl;
-    ngx_quic_stream_t        *sn;
-    ngx_quic_connection_t    *qc;
-    ngx_quic_stream_frame_t  *f;
+        pad = ngx_quic_get_padding_level(c);
 
-    qc = ngx_quic_get_connection(c);
-    sn = data;
+        for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
 
-    f = &frame->u.stream;
-    id = f->stream_id;
+            ctx = &qc->send_ctx[i];
 
-    b = sn->b;
+            if (ngx_quic_generate_ack(c, ctx) != NGX_OK) {
+                return NGX_ERROR;
+            }
 
-    if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
-        ngx_log_error(NGX_LOG_INFO, c->log, 0,
-                      "quic no space in stream buffer");
-        return NGX_ERROR;
-    }
+            min = (i == pad && p - dst < NGX_QUIC_MIN_INITIAL_SIZE)
+                  ? NGX_QUIC_MIN_INITIAL_SIZE - (p - dst) : 0;
 
-    if ((size_t) (b->end - b->last) < f->length) {
-        b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
-        b->pos = b->start;
-    }
+            n = ngx_quic_output_packet(c, ctx, p, len, min);
+            if (n == NGX_ERROR) {
+                return NGX_ERROR;
+            }
 
-    for (cl = frame->data; cl; cl = cl->next) {
-        b->last = ngx_cpymem(b->last, cl->buf->pos,
-                             cl->buf->last - cl->buf->pos);
-    }
+            p += n;
+            len -= n;
+        }
 
-    rev = sn->c->read;
-    rev->ready = 1;
+        len = p - dst;
+        if (len == 0) {
+            break;
+        }
 
-    if (f->fin) {
-        rev->pending_eof = 1;
+        n = ngx_quic_send(c, dst, len);
+        if (n == NGX_ERROR) {
+            return NGX_ERROR;
+        }
     }
 
-    if (rev->active) {
-        rev->handler(rev);
+    if (in_flight != cg->in_flight && !qc->send_timer_set && !qc->closing) {
+        qc->send_timer_set = 1;
+        ngx_add_timer(c->read, qc->tp.max_idle_timeout);
     }
 
-    /* check if stream was destroyed by handler */
-    if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) {
-        return NGX_DONE;
-    }
+    ngx_quic_set_lost_timer(c);
 
     return NGX_OK;
 }
 
 
-static ngx_int_t
-ngx_quic_handle_max_data_frame(ngx_connection_t *c,
-    ngx_quic_max_data_frame_t *f)
+static ngx_uint_t
+ngx_quic_get_padding_level(ngx_connection_t *c)
 {
-    ngx_event_t            *wev;
-    ngx_rbtree_t           *tree;
-    ngx_rbtree_node_t      *node;
-    ngx_quic_stream_t      *qs;
+    ngx_queue_t            *q;
+    ngx_quic_frame_t       *f;
+    ngx_quic_send_ctx_t    *ctx;
     ngx_quic_connection_t  *qc;
 
-    qc = ngx_quic_get_connection(c);
-    tree = &qc->streams.tree;
+    /*
+     * 14.1.  Initial Datagram Size
+     *
+     * Similarly, a server MUST expand the payload of all UDP datagrams
+     * carrying ack-eliciting Initial packets to at least the smallest
+     * allowed maximum datagram size of 1200 bytes
+     */
 
-    if (f->max_data <= qc->streams.send_max_data) {
-        return NGX_OK;
-    }
+    qc = ngx_quic_get_connection(c);
+    ctx = ngx_quic_get_send_ctx(qc, ssl_encryption_initial);
 
-    if (qc->streams.sent >= qc->streams.send_max_data) {
+    for (q = ngx_queue_head(&ctx->frames);
+         q != ngx_queue_sentinel(&ctx->frames);
+         q = ngx_queue_next(q))
+    {
+        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-        for (node = ngx_rbtree_min(tree->root, tree->sentinel);
-             node;
-             node = ngx_rbtree_next(tree, node))
-        {
-            qs = (ngx_quic_stream_t *) node;
-            wev = qs->c->write;
+        if (f->need_ack) {
+            ctx = ngx_quic_get_send_ctx(qc, ssl_encryption_handshake);
 
-            if (wev->active) {
-                wev->ready = 1;
-                ngx_post_event(wev, &ngx_posted_events);
+            if (ngx_queue_empty(&ctx->frames)) {
+                return 0;
             }
+
+            return 1;
         }
     }
 
-    qc->streams.send_max_data = f->max_data;
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
-ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
-{
-    return NGX_OK;
+    return NGX_QUIC_SEND_CTX_LAST;
 }
 
 
 static ngx_int_t
-ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
+ngx_quic_generate_ack(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
 {
-    size_t                  n;
-    ngx_buf_t              *b;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *sn;
+    ngx_msec_t              delay;
     ngx_quic_connection_t  *qc;
 
-    qc = ngx_quic_get_connection(c);
-
-    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
-    {
-        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-        return NGX_ERROR;
+    if (!ctx->send_ack) {
+        return NGX_OK;
     }
 
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    if (ctx->level == ssl_encryption_application)  {
 
-    if (sn == NULL) {
-        sn = ngx_quic_create_client_stream(c, f->id);
+        delay = ngx_current_msec - ctx->ack_delay_start;
+        qc = ngx_quic_get_connection(c);
 
-        if (sn == NULL) {
-            return NGX_ERROR;
-        }
+        if (ctx->send_ack < NGX_QUIC_MAX_ACK_GAP
+            && delay < qc->tp.max_ack_delay)
+        {
+            if (!qc->push.timer_set && !qc->closing) {
+                ngx_add_timer(&qc->push,
+                              qc->tp.max_ack_delay - delay);
+            }
 
-        if (sn == NGX_QUIC_STREAM_GONE) {
             return NGX_OK;
         }
-
-        b = sn->b;
-        n = b->end - b->last;
-
-        sn->c->listening->handler(sn->c);
-
-    } else {
-        b = sn->b;
-        n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
     }
 
-    frame = ngx_quic_alloc_frame(c);
-    if (frame == NULL) {
+    if (ngx_quic_send_ack(c, ctx) != NGX_OK) {
         return NGX_ERROR;
     }
 
-    frame->level = pkt->level;
-    frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
-    frame->u.max_stream_data.id = f->id;
-    frame->u.max_stream_data.limit = n;
-
-    ngx_quic_queue_frame(qc, frame);
+    ctx->send_ack = 0;
 
     return NGX_OK;
 }
 
 
-static ngx_int_t
-ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
+static ssize_t
+ngx_quic_output_packet(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
+    u_char *data, size_t max, size_t min)
 {
-    uint64_t                sent;
-    ngx_event_t            *wev;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-
-    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
-    {
-        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-        return NGX_ERROR;
-    }
-
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
-
-    if (sn == NULL) {
-        sn = ngx_quic_create_client_stream(c, f->id);
-
-        if (sn == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (sn == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        if (f->limit > sn->send_max_data) {
-            sn->send_max_data = f->limit;
-        }
-
-        sn->c->listening->handler(sn->c);
-
-        return NGX_OK;
-    }
-
-    if (f->limit <= sn->send_max_data) {
-        return NGX_OK;
-    }
-
-    sent = sn->c->sent;
-
-    if (sent >= sn->send_max_data) {
-        wev = sn->c->write;
-
-        if (wev->active) {
-            wev->ready = 1;
-            ngx_post_event(wev, &ngx_posted_events);
-        }
-    }
-
-    sn->send_max_data = f->limit;
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
-ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
-{
-    ngx_event_t            *rev;
-    ngx_connection_t       *sc;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-
-    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
-    {
-        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-        return NGX_ERROR;
-    }
-
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
-
-    if (sn == NULL) {
-        sn = ngx_quic_create_client_stream(c, f->id);
-
-        if (sn == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (sn == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = sn->c;
-
-        rev = sc->read;
-        rev->error = 1;
-        rev->ready = 1;
-
-        sc->listening->handler(sc);
-
-        return NGX_OK;
-    }
-
-    rev = sn->c->read;
-    rev->error = 1;
-    rev->ready = 1;
-
-    if (rev->active) {
-        rev->handler(rev);
-    }
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
-ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
-{
-    ngx_event_t            *wev;
-    ngx_connection_t       *sc;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-
-    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
-    {
-        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-        return NGX_ERROR;
-    }
-
-    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
-
-    if (sn == NULL) {
-        sn = ngx_quic_create_client_stream(c, f->id);
-
-        if (sn == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (sn == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = sn->c;
-
-        wev = sc->write;
-        wev->error = 1;
-        wev->ready = 1;
-
-        sc->listening->handler(sc);
-
-        return NGX_OK;
-    }
-
-    wev = sn->c->write;
-    wev->error = 1;
-    wev->ready = 1;
-
-    if (wev->active) {
-        wev->handler(wev);
-    }
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
-ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f)
-{
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-
-    if (f->bidi) {
-        if (qc->streams.server_max_streams_bidi < f->limit) {
-            qc->streams.server_max_streams_bidi = f->limit;
-
-            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "quic max_streams_bidi:%uL", f->limit);
-        }
-
-    } else {
-        if (qc->streams.server_max_streams_uni < f->limit) {
-            qc->streams.server_max_streams_uni = f->limit;
-
-            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "quic max_streams_uni:%uL", f->limit);
-        }
-    }
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
-ngx_quic_output(ngx_connection_t *c)
-{
-    off_t                   max;
-    size_t                  len, min, in_flight;
-    ssize_t                 n;
-    u_char                 *p;
-    ngx_uint_t              i, pad;
-    ngx_quic_send_ctx_t    *ctx;
-    ngx_quic_congestion_t  *cg;
-    ngx_quic_connection_t  *qc;
-    static u_char           dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
-
-    c->log->action = "sending frames";
-
-    qc = ngx_quic_get_connection(c);
-    cg = &qc->congestion;
-
-    in_flight = cg->in_flight;
-
-    for ( ;; ) {
-        p = dst;
-
-        len = ngx_min(qc->ctp.max_udp_payload_size,
-                      NGX_QUIC_MAX_UDP_PAYLOAD_SIZE);
-
-        if (!qc->validated) {
-            max = qc->received * 3;
-            max = (c->sent >= max) ? 0 : max - c->sent;
-            len = ngx_min(len, (size_t) max);
-        }
-
-        pad = ngx_quic_get_padding_level(c);
-
-        for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
-
-            ctx = &qc->send_ctx[i];
-
-            if (ngx_quic_generate_ack(c, ctx) != NGX_OK) {
-                return NGX_ERROR;
-            }
-
-            min = (i == pad && p - dst < NGX_QUIC_MIN_INITIAL_SIZE)
-                  ? NGX_QUIC_MIN_INITIAL_SIZE - (p - dst) : 0;
-
-            n = ngx_quic_output_packet(c, ctx, p, len, min);
-            if (n == NGX_ERROR) {
-                return NGX_ERROR;
-            }
-
-            p += n;
-            len -= n;
-        }
-
-        len = p - dst;
-        if (len == 0) {
-            break;
-        }
-
-        n = ngx_quic_send(c, dst, len);
-        if (n == NGX_ERROR) {
-            return NGX_ERROR;
-        }
-    }
-
-    if (in_flight != cg->in_flight && !qc->send_timer_set && !qc->closing) {
-        qc->send_timer_set = 1;
-        ngx_add_timer(c->read, qc->tp.max_idle_timeout);
-    }
-
-    ngx_quic_set_lost_timer(c);
-
-    return NGX_OK;
-}
-
-
-static ngx_uint_t
-ngx_quic_get_padding_level(ngx_connection_t *c)
-{
-    ngx_queue_t            *q;
-    ngx_quic_frame_t       *f;
-    ngx_quic_send_ctx_t    *ctx;
-    ngx_quic_connection_t  *qc;
-
-    /*
-     * 14.1.  Initial Datagram Size
-     *
-     * Similarly, a server MUST expand the payload of all UDP datagrams
-     * carrying ack-eliciting Initial packets to at least the smallest
-     * allowed maximum datagram size of 1200 bytes
-     */
-
-    qc = ngx_quic_get_connection(c);
-    ctx = ngx_quic_get_send_ctx(qc, ssl_encryption_initial);
-
-    for (q = ngx_queue_head(&ctx->frames);
-         q != ngx_queue_sentinel(&ctx->frames);
-         q = ngx_queue_next(q))
-    {
-        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-        if (f->need_ack) {
-            ctx = ngx_quic_get_send_ctx(qc, ssl_encryption_handshake);
-
-            if (ngx_queue_empty(&ctx->frames)) {
-                return 0;
-            }
-
-            return 1;
-        }
-    }
-
-    return NGX_QUIC_SEND_CTX_LAST;
-}
-
-
-static ngx_int_t
-ngx_quic_generate_ack(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
-{
-    ngx_msec_t              delay;
-    ngx_quic_connection_t  *qc;
-
-    if (!ctx->send_ack) {
-        return NGX_OK;
-    }
-
-    if (ctx->level == ssl_encryption_application)  {
-
-        delay = ngx_current_msec - ctx->ack_delay_start;
-        qc = ngx_quic_get_connection(c);
-
-        if (ctx->send_ack < NGX_QUIC_MAX_ACK_GAP
-            && delay < qc->tp.max_ack_delay)
-        {
-            if (!qc->push.timer_set && !qc->closing) {
-                ngx_add_timer(&qc->push,
-                              qc->tp.max_ack_delay - delay);
-            }
-
-            return NGX_OK;
-        }
-    }
-
-    if (ngx_quic_send_ack(c, ctx) != NGX_OK) {
-        return NGX_ERROR;
-    }
-
-    ctx->send_ack = 0;
-
-    return NGX_OK;
-}
-
-
-static ssize_t
-ngx_quic_output_packet(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
-    u_char *data, size_t max, size_t min)
-{
-    size_t                  len, hlen, pad_len;
-    u_char                 *p;
-    ssize_t                 flen;
-    ngx_str_t               out, res;
-    ngx_int_t               rc;
-    ngx_uint_t              nframes;
-    ngx_msec_t              now;
-    ngx_queue_t            *q;
-    ngx_quic_frame_t       *f;
-    ngx_quic_header_t       pkt;
-    ngx_quic_congestion_t  *cg;
-    ngx_quic_connection_t  *qc;
-    static u_char           src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
-
-    if (ngx_queue_empty(&ctx->frames)) {
-        return 0;
-    }
-
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic output %s packet max:%uz min:%uz",
-                   ngx_quic_level_name(ctx->level), max, min);
-
-    qc = ngx_quic_get_connection(c);
-    cg = &qc->congestion;
-
-    hlen = (ctx->level == ssl_encryption_application)
-           ? NGX_QUIC_MAX_SHORT_HEADER
-           : NGX_QUIC_MAX_LONG_HEADER;
-
-    hlen += EVP_GCM_TLS_TAG_LEN;
-    hlen -= NGX_QUIC_MAX_CID_LEN - qc->scid.len;
-
-    ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
-
-    now = ngx_current_msec;
-    nframes = 0;
-    p = src;
-    len = 0;
-
-    for (q = ngx_queue_head(&ctx->frames);
-         q != ngx_queue_sentinel(&ctx->frames);
-         q = ngx_queue_next(q))
-    {
-        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-        if (!pkt.need_ack && f->need_ack && max > cg->window) {
-            max = cg->window;
-        }
-
-        if (hlen + len >= max) {
-            break;
-        }
-
-        if (hlen + len + f->len > max) {
-            rc = ngx_quic_split_frame(c, f, max - hlen - len);
-
-            if (rc == NGX_ERROR) {
-                return NGX_ERROR;
-            }
-
-            if (rc == NGX_DECLINED) {
-                break;
-            }
-        }
-
-        if (f->need_ack) {
-            pkt.need_ack = 1;
-        }
-
-        ngx_quic_log_frame(c->log, f, 1);
-
-        flen = ngx_quic_create_frame(p, f);
-        if (flen == -1) {
-            return NGX_ERROR;
-        }
-
-        len += flen;
-        p += flen;
-
-        f->pnum = ctx->pnum;
-        f->first = now;
-        f->last = now;
-        f->plen = 0;
-
-        nframes++;
-
-        if (f->flush) {
-            break;
-        }
-    }
-
-    if (nframes == 0) {
-        return 0;
-    }
-
-    out.data = src;
-    out.len = len;
-
-    pkt.keys = qc->keys;
-    pkt.flags = NGX_QUIC_PKT_FIXED_BIT;
-
-    if (ctx->level == ssl_encryption_initial) {
-        pkt.flags |= NGX_QUIC_PKT_LONG | NGX_QUIC_PKT_INITIAL;
-
-    } else if (ctx->level == ssl_encryption_handshake) {
-        pkt.flags |= NGX_QUIC_PKT_LONG | NGX_QUIC_PKT_HANDSHAKE;
-
-    } else {
-        if (qc->key_phase) {
-            pkt.flags |= NGX_QUIC_PKT_KPHASE;
-        }
-    }
-
-    ngx_quic_set_packet_number(&pkt, ctx);
-
-    pkt.version = qc->version;
-    pkt.log = c->log;
-    pkt.level = ctx->level;
-    pkt.dcid = qc->scid;
-    pkt.scid = qc->dcid;
-
-    pad_len = 4;
-
-    if (min) {
-        hlen = EVP_GCM_TLS_TAG_LEN
-               + ngx_quic_create_header(&pkt, NULL, out.len, NULL);
-
-        if (min > hlen + pad_len) {
-            pad_len = min - hlen;
-        }
-    }
-
-    if (out.len < pad_len) {
-        ngx_memset(p, NGX_QUIC_FT_PADDING, pad_len - out.len);
-        out.len = pad_len;
-    }
-
-    pkt.payload = out;
-
-    res.data = data;
-
-    ngx_log_debug6(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic packet tx %s bytes:%ui"
-                   " need_ack:%d number:%L encoded nl:%d trunc:0x%xD",
-                   ngx_quic_level_name(ctx->level), out.len, pkt.need_ack,
-                   pkt.number, pkt.num_len, pkt.trunc);
-
-    if (ngx_quic_encrypt(&pkt, &res) != NGX_OK) {
-        return NGX_ERROR;
-    }
-
-    ctx->pnum++;
-
-    if (pkt.need_ack) {
-        /* move frames into the sent queue to wait for ack */
-
-        if (!qc->closing) {
-            q = ngx_queue_head(&ctx->frames);
-            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-            f->plen = res.len;
-
-            do {
-                q = ngx_queue_head(&ctx->frames);
-                ngx_queue_remove(q);
-                ngx_queue_insert_tail(&ctx->sent, q);
-            } while (--nframes);
-        }
-
-        cg->in_flight += res.len;
-
-        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic congestion send if:%uz", cg->in_flight);
-    }
-
-    while (nframes--) {
-        q = ngx_queue_head(&ctx->frames);
-        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-        ngx_queue_remove(q);
-        ngx_quic_free_frame(c, f);
-    }
-
-    return res.len;
-}
-
-
-static ssize_t
-ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len)
-{
-    ngx_buf_t    b;
-    ngx_chain_t  cl, *res;
-
-    ngx_memzero(&b, sizeof(ngx_buf_t));
-
-    b.pos = b.start = buf;
-    b.last = b.end = buf + len;
-    b.last_buf = 1;
-    b.temporary = 1;
-
-    cl.buf = &b;
-    cl.next= NULL;
-
-    res = c->send_chain(c, &cl, 0);
-    if (res == NGX_CHAIN_ERROR) {
-        return NGX_ERROR;
-    }
-
-    return len;
-}
-
-
-static void
-ngx_quic_set_packet_number(ngx_quic_header_t *pkt, ngx_quic_send_ctx_t *ctx)
-{
-    uint64_t  delta;
-
-    delta = ctx->pnum - ctx->largest_ack;
-    pkt->number = ctx->pnum;
-
-    if (delta <= 0x7F) {
-        pkt->num_len = 1;
-        pkt->trunc = ctx->pnum & 0xff;
-
-    } else if (delta <= 0x7FFF) {
-        pkt->num_len = 2;
-        pkt->flags |= 0x1;
-        pkt->trunc = ctx->pnum & 0xffff;
-
-    } else if (delta <= 0x7FFFFF) {
-        pkt->num_len = 3;
-        pkt->flags |= 0x2;
-        pkt->trunc = ctx->pnum & 0xffffff;
-
-    } else {
-        pkt->num_len = 4;
-        pkt->flags |= 0x3;
-        pkt->trunc = ctx->pnum & 0xffffffff;
-    }
-}
-
-
-static void
-ngx_quic_pto_handler(ngx_event_t *ev)
-{
-    ngx_uint_t              i;
-    ngx_msec_t              now;
-    ngx_queue_t            *q, *next;
-    ngx_connection_t       *c;
-    ngx_quic_frame_t       *f;
-    ngx_quic_send_ctx_t    *ctx;
-    ngx_quic_connection_t  *qc;
-
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic pto timer");
-
-    c = ev->data;
-    qc = ngx_quic_get_connection(c);
-    now = ngx_current_msec;
-
-    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
-
-        ctx = &qc->send_ctx[i];
-
-        if (ngx_queue_empty(&ctx->sent)) {
-            continue;
-        }
-
-        q = ngx_queue_head(&ctx->sent);
-        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-        if (f->pnum <= ctx->largest_ack
-            && ctx->largest_ack != NGX_QUIC_UNSET_PN)
-        {
-            continue;
-        }
-
-        if ((ngx_msec_int_t) (f->last + ngx_quic_pto(c, ctx) - now) > 0) {
-            continue;
-        }
-
-        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic pto %s pto_count:%ui",
-                       ngx_quic_level_name(ctx->level), qc->pto_count);
-
-        for (q = ngx_queue_head(&ctx->frames);
-             q != ngx_queue_sentinel(&ctx->frames);
-             /* void */)
-        {
-            next = ngx_queue_next(q);
-            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-            if (f->type == NGX_QUIC_FT_PING) {
-                ngx_queue_remove(q);
-                ngx_quic_free_frame(c, f);
-            }
-
-            q = next;
-        }
-
-        for (q = ngx_queue_head(&ctx->sent);
-             q != ngx_queue_sentinel(&ctx->sent);
-             /* void */)
-        {
-            next = ngx_queue_next(q);
-            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-            if (f->type == NGX_QUIC_FT_PING) {
-                ngx_quic_congestion_lost(c, f);
-                ngx_queue_remove(q);
-                ngx_quic_free_frame(c, f);
-            }
-
-            q = next;
-        }
-
-        /* enforce 2 udp datagrams */
-
-        f = ngx_quic_alloc_frame(c);
-        if (f == NULL) {
-            break;
-        }
-
-        f->level = ctx->level;
-        f->type = NGX_QUIC_FT_PING;
-        f->flush = 1;
-
-        ngx_quic_queue_frame(qc, f);
-
-        f = ngx_quic_alloc_frame(c);
-        if (f == NULL) {
-            break;
-        }
-
-        f->level = ctx->level;
-        f->type = NGX_QUIC_FT_PING;
-
-        ngx_quic_queue_frame(qc, f);
-    }
-
-    qc->pto_count++;
-
-    ngx_quic_connstate_dbg(c);
-}
-
-
-static void
-ngx_quic_push_handler(ngx_event_t *ev)
-{
-    ngx_connection_t  *c;
-
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic push timer");
-
-    c = ev->data;
-
-    if (ngx_quic_output(c) != NGX_OK) {
-        ngx_quic_close_connection(c, NGX_ERROR);
-        return;
-    }
-
-    ngx_quic_connstate_dbg(c);
-}
-
-
-static
-void ngx_quic_lost_handler(ngx_event_t *ev)
-{
-    ngx_connection_t  *c;
-
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic lost timer");
-
-    c = ev->data;
-
-    if (ngx_quic_detect_lost(c) != NGX_OK) {
-        ngx_quic_close_connection(c, NGX_ERROR);
-    }
-
-    ngx_quic_connstate_dbg(c);
-}
-
-
-static ngx_int_t
-ngx_quic_detect_lost(ngx_connection_t *c)
-{
-    ngx_uint_t              i;
-    ngx_msec_t              now, wait, thr;
-    ngx_queue_t            *q;
-    ngx_quic_frame_t       *start;
-    ngx_quic_send_ctx_t    *ctx;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(c);
-    now = ngx_current_msec;
-    thr = ngx_quic_lost_threshold(qc);
-
-    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
-
-        ctx = &qc->send_ctx[i];
-
-        if (ctx->largest_ack == NGX_QUIC_UNSET_PN) {
-            continue;
-        }
-
-        while (!ngx_queue_empty(&ctx->sent)) {
-
-            q = ngx_queue_head(&ctx->sent);
-            start = ngx_queue_data(q, ngx_quic_frame_t, queue);
-
-            if (start->pnum > ctx->largest_ack) {
-                break;
-            }
-
-            wait = start->last + thr - now;
-
-            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "quic detect_lost pnum:%uL thr:%M wait:%i level:%d",
-                           start->pnum, thr, (ngx_int_t) wait, start->level);
-
-            if ((ngx_msec_int_t) wait > 0
-                && ctx->largest_ack - start->pnum < NGX_QUIC_PKT_THR)
-            {
-                break;
-            }
-
-            ngx_quic_resend_frames(c, ctx);
-        }
-    }
-
-    ngx_quic_set_lost_timer(c);
-
-    return NGX_OK;
-}
-
-
-static void
-ngx_quic_set_lost_timer(ngx_connection_t *c)
-{
-    ngx_uint_t              i;
+    size_t                  len, hlen, pad_len;
+    u_char                 *p;
+    ssize_t                 flen;
+    ngx_str_t               out, res;
+    ngx_int_t               rc;
+    ngx_uint_t              nframes;
     ngx_msec_t              now;
     ngx_queue_t            *q;
-    ngx_msec_int_t          lost, pto, w;
     ngx_quic_frame_t       *f;
-    ngx_quic_send_ctx_t    *ctx;
+    ngx_quic_header_t       pkt;
+    ngx_quic_congestion_t  *cg;
     ngx_quic_connection_t  *qc;
+    static u_char           src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
-    qc = ngx_quic_get_connection(c);
-    now = ngx_current_msec;
-
-    lost = -1;
-    pto = -1;
-
-    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
-        ctx = &qc->send_ctx[i];
-
-        if (ngx_queue_empty(&ctx->sent)) {
-            continue;
-        }
-
-        if (ctx->largest_ack != NGX_QUIC_UNSET_PN) {
-            q = ngx_queue_head(&ctx->sent);
-            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-            w = (ngx_msec_int_t) (f->last + ngx_quic_lost_threshold(qc) - now);
-
-            if (f->pnum <= ctx->largest_ack) {
-                if (w < 0 || ctx->largest_ack - f->pnum >= NGX_QUIC_PKT_THR) {
-                    w = 0;
-                }
-
-                if (lost == -1 || w < lost) {
-                    lost = w;
-                }
-            }
-        }
-
-        q = ngx_queue_last(&ctx->sent);
-        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-        w = (ngx_msec_int_t) (f->last + ngx_quic_pto(c, ctx) - now);
-
-        if (w < 0) {
-            w = 0;
-        }
-
-        if (pto == -1 || w < pto) {
-            pto = w;
-        }
-    }
-
-    if (qc->pto.timer_set) {
-        ngx_del_timer(&qc->pto);
-    }
-
-    if (lost != -1) {
-        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic lost timer lost:%M", lost);
-
-        qc->pto.handler = ngx_quic_lost_handler;
-        ngx_add_timer(&qc->pto, lost);
-        return;
-    }
-
-    if (pto != -1) {
-        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic lost timer pto:%M", pto);
-
-        qc->pto.handler = ngx_quic_pto_handler;
-        ngx_add_timer(&qc->pto, pto);
-        return;
+    if (ngx_queue_empty(&ctx->frames)) {
+        return 0;
     }
 
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic lost timer unset");
-}
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic output %s packet max:%uz min:%uz",
+                   ngx_quic_level_name(ctx->level), max, min);
 
+    qc = ngx_quic_get_connection(c);
+    cg = &qc->congestion;
 
-static void
-ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
-{
-    size_t                  n;
-    ngx_buf_t              *b;
-    ngx_queue_t            *q;
-    ngx_quic_frame_t       *f, *start;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
+    hlen = (ctx->level == ssl_encryption_application)
+           ? NGX_QUIC_MAX_SHORT_HEADER
+           : NGX_QUIC_MAX_LONG_HEADER;
 
-    qc = ngx_quic_get_connection(c);
-    q = ngx_queue_head(&ctx->sent);
-    start = ngx_queue_data(q, ngx_quic_frame_t, queue);
+    hlen += EVP_GCM_TLS_TAG_LEN;
+    hlen -= NGX_QUIC_MAX_CID_LEN - qc->scid.len;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic resend packet pnum:%uL", start->pnum);
+    ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
 
-    ngx_quic_congestion_lost(c, start);
+    now = ngx_current_msec;
+    nframes = 0;
+    p = src;
+    len = 0;
 
-    do {
+    for (q = ngx_queue_head(&ctx->frames);
+         q != ngx_queue_sentinel(&ctx->frames);
+         q = ngx_queue_next(q))
+    {
         f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-        if (f->pnum != start->pnum) {
-            break;
+        if (!pkt.need_ack && f->need_ack && max > cg->window) {
+            max = cg->window;
         }
 
-        q = ngx_queue_next(q);
-
-        ngx_queue_remove(&f->queue);
-
-        switch (f->type) {
-        case NGX_QUIC_FT_ACK:
-        case NGX_QUIC_FT_ACK_ECN:
-            if (ctx->level == ssl_encryption_application) {
-                /* force generation of most recent acknowledgment */
-                ctx->send_ack = NGX_QUIC_MAX_ACK_GAP;
-            }
-
-            ngx_quic_free_frame(c, f);
-            break;
-
-        case NGX_QUIC_FT_PING:
-        case NGX_QUIC_FT_PATH_RESPONSE:
-        case NGX_QUIC_FT_CONNECTION_CLOSE:
-            ngx_quic_free_frame(c, f);
-            break;
-
-        case NGX_QUIC_FT_MAX_DATA:
-            f->u.max_data.max_data = qc->streams.recv_max_data;
-            ngx_quic_queue_frame(qc, f);
-            break;
-
-        case NGX_QUIC_FT_MAX_STREAMS:
-        case NGX_QUIC_FT_MAX_STREAMS2:
-            f->u.max_streams.limit = f->u.max_streams.bidi
-                                     ? qc->streams.client_max_streams_bidi
-                                     : qc->streams.client_max_streams_uni;
-            ngx_quic_queue_frame(qc, f);
+        if (hlen + len >= max) {
             break;
+        }
 
-        case NGX_QUIC_FT_MAX_STREAM_DATA:
-            sn = ngx_quic_find_stream(&qc->streams.tree,
-                                      f->u.max_stream_data.id);
-            if (sn == NULL) {
-                ngx_quic_free_frame(c, f);
-                break;
-            }
-
-            b = sn->b;
-            n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
+        if (hlen + len + f->len > max) {
+            rc = ngx_quic_split_frame(c, f, max - hlen - len);
 
-            if (f->u.max_stream_data.limit < n) {
-                f->u.max_stream_data.limit = n;
+            if (rc == NGX_ERROR) {
+                return NGX_ERROR;
             }
 
-            ngx_quic_queue_frame(qc, f);
-            break;
-
-        case NGX_QUIC_FT_STREAM0:
-        case NGX_QUIC_FT_STREAM1:
-        case NGX_QUIC_FT_STREAM2:
-        case NGX_QUIC_FT_STREAM3:
-        case NGX_QUIC_FT_STREAM4:
-        case NGX_QUIC_FT_STREAM5:
-        case NGX_QUIC_FT_STREAM6:
-        case NGX_QUIC_FT_STREAM7:
-            sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
-
-            if (sn && sn->c->write->error) {
-                /* RESET_STREAM was sent */
-                ngx_quic_free_frame(c, f);
+            if (rc == NGX_DECLINED) {
                 break;
             }
-
-            /* fall through */
-
-        default:
-            ngx_queue_insert_tail(&ctx->frames, &f->queue);
         }
 
-    } while (q != ngx_queue_sentinel(&ctx->sent));
-
-    if (qc->closing) {
-        return;
-    }
-
-    ngx_post_event(&qc->push, &ngx_posted_events);
-}
-
-
-ngx_connection_t *
-ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
-{
-    size_t                  rcvbuf_size;
-    uint64_t                id;
-    ngx_quic_stream_t      *qs, *sn;
-    ngx_quic_connection_t  *qc;
-
-    qs = c->quic;
-    qc = ngx_quic_get_connection(qs->parent);
-
-    if (bidi) {
-        if (qc->streams.server_streams_bidi
-            >= qc->streams.server_max_streams_bidi)
-        {
-            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "quic too many server bidi streams:%uL",
-                           qc->streams.server_streams_bidi);
-            return NULL;
+        if (f->need_ack) {
+            pkt.need_ack = 1;
         }
 
-        id = (qc->streams.server_streams_bidi << 2)
-             | NGX_QUIC_STREAM_SERVER_INITIATED;
-
-        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic creating server bidi stream"
-                       " streams:%uL max:%uL id:0x%xL",
-                       qc->streams.server_streams_bidi,
-                       qc->streams.server_max_streams_bidi, id);
-
-        qc->streams.server_streams_bidi++;
-        rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
+        ngx_quic_log_frame(c->log, f, 1);
 
-    } else {
-        if (qc->streams.server_streams_uni
-            >= qc->streams.server_max_streams_uni)
-        {
-            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                           "quic too many server uni streams:%uL",
-                           qc->streams.server_streams_uni);
-            return NULL;
+        flen = ngx_quic_create_frame(p, f);
+        if (flen == -1) {
+            return NGX_ERROR;
         }
 
-        id = (qc->streams.server_streams_uni << 2)
-             | NGX_QUIC_STREAM_SERVER_INITIATED
-             | NGX_QUIC_STREAM_UNIDIRECTIONAL;
-
-        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic creating server uni stream"
-                       " streams:%uL max:%uL id:0x%xL",
-                       qc->streams.server_streams_uni,
-                       qc->streams.server_max_streams_uni, id);
-
-        qc->streams.server_streams_uni++;
-        rcvbuf_size = 0;
-    }
-
-    sn = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
-    if (sn == NULL) {
-        return NULL;
-    }
-
-    return sn->c;
-}
-
-
-static void
-ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
-    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
-{
-    ngx_rbtree_node_t  **p;
-    ngx_quic_stream_t   *qn, *qnt;
+        len += flen;
+        p += flen;
 
-    for ( ;; ) {
-        qn = (ngx_quic_stream_t *) node;
-        qnt = (ngx_quic_stream_t *) temp;
+        f->pnum = ctx->pnum;
+        f->first = now;
+        f->last = now;
+        f->plen = 0;
 
-        p = (qn->id < qnt->id) ? &temp->left : &temp->right;
+        nframes++;
 
-        if (*p == sentinel) {
+        if (f->flush) {
             break;
         }
-
-        temp = *p;
     }
 
-    *p = node;
-    node->parent = temp;
-    node->left = sentinel;
-    node->right = sentinel;
-    ngx_rbt_red(node);
-}
+    if (nframes == 0) {
+        return 0;
+    }
 
+    out.data = src;
+    out.len = len;
 
-static ngx_quic_stream_t *
-ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
-{
-    ngx_rbtree_node_t  *node, *sentinel;
-    ngx_quic_stream_t  *qn;
+    pkt.keys = qc->keys;
+    pkt.flags = NGX_QUIC_PKT_FIXED_BIT;
 
-    node = rbtree->root;
-    sentinel = rbtree->sentinel;
+    if (ctx->level == ssl_encryption_initial) {
+        pkt.flags |= NGX_QUIC_PKT_LONG | NGX_QUIC_PKT_INITIAL;
 
-    while (node != sentinel) {
-        qn = (ngx_quic_stream_t *) node;
+    } else if (ctx->level == ssl_encryption_handshake) {
+        pkt.flags |= NGX_QUIC_PKT_LONG | NGX_QUIC_PKT_HANDSHAKE;
 
-        if (id == qn->id) {
-            return qn;
+    } else {
+        if (qc->key_phase) {
+            pkt.flags |= NGX_QUIC_PKT_KPHASE;
         }
-
-        node = (id < qn->id) ? node->left : node->right;
     }
 
-    return NULL;
-}
-
+    ngx_quic_set_packet_number(&pkt, ctx);
 
-static ngx_quic_stream_t *
-ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
-{
-    size_t                  n;
-    uint64_t                min_id;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
+    pkt.version = qc->version;
+    pkt.log = c->log;
+    pkt.level = ctx->level;
+    pkt.dcid = qc->scid;
+    pkt.scid = qc->dcid;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL is new", id);
+    pad_len = 4;
 
-    qc = ngx_quic_get_connection(c);
+    if (min) {
+        hlen = EVP_GCM_TLS_TAG_LEN
+               + ngx_quic_create_header(&pkt, NULL, out.len, NULL);
 
-    if (qc->shutdown) {
-        return NGX_QUIC_STREAM_GONE;
+        if (min > hlen + pad_len) {
+            pad_len = min - hlen;
+        }
     }
 
-    if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
-
-        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
-            if ((id >> 2) < qc->streams.server_streams_uni) {
-                return NGX_QUIC_STREAM_GONE;
-            }
-
-            qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-            return NULL;
-        }
+    if (out.len < pad_len) {
+        ngx_memset(p, NGX_QUIC_FT_PADDING, pad_len - out.len);
+        out.len = pad_len;
+    }
 
-        if ((id >> 2) < qc->streams.client_streams_uni) {
-            return NGX_QUIC_STREAM_GONE;
-        }
+    pkt.payload = out;
 
-        if ((id >> 2) >= qc->streams.client_max_streams_uni) {
-            qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
-            return NULL;
-        }
+    res.data = data;
 
-        min_id = (qc->streams.client_streams_uni << 2)
-                 | NGX_QUIC_STREAM_UNIDIRECTIONAL;
-        qc->streams.client_streams_uni = (id >> 2) + 1;
-        n = qc->tp.initial_max_stream_data_uni;
+    ngx_log_debug6(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic packet tx %s bytes:%ui"
+                   " need_ack:%d number:%L encoded nl:%d trunc:0x%xD",
+                   ngx_quic_level_name(ctx->level), out.len, pkt.need_ack,
+                   pkt.number, pkt.num_len, pkt.trunc);
 
-    } else {
+    if (ngx_quic_encrypt(&pkt, &res) != NGX_OK) {
+        return NGX_ERROR;
+    }
 
-        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
-            if ((id >> 2) < qc->streams.server_streams_bidi) {
-                return NGX_QUIC_STREAM_GONE;
-            }
+    ctx->pnum++;
 
-            qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
-            return NULL;
-        }
+    if (pkt.need_ack) {
+        /* move frames into the sent queue to wait for ack */
 
-        if ((id >> 2) < qc->streams.client_streams_bidi) {
-            return NGX_QUIC_STREAM_GONE;
-        }
+        if (!qc->closing) {
+            q = ngx_queue_head(&ctx->frames);
+            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
+            f->plen = res.len;
 
-        if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
-            qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
-            return NULL;
+            do {
+                q = ngx_queue_head(&ctx->frames);
+                ngx_queue_remove(q);
+                ngx_queue_insert_tail(&ctx->sent, q);
+            } while (--nframes);
         }
 
-        min_id = (qc->streams.client_streams_bidi << 2);
-        qc->streams.client_streams_bidi = (id >> 2) + 1;
-        n = qc->tp.initial_max_stream_data_bidi_remote;
-    }
+        cg->in_flight += res.len;
 
-    if (n < NGX_QUIC_STREAM_BUFSIZE) {
-        n = NGX_QUIC_STREAM_BUFSIZE;
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic congestion send if:%uz", cg->in_flight);
     }
 
-    /*
-     *   2.1.  Stream Types and Identifiers
-     *
-     *   Within each type, streams are created with numerically increasing
-     *   stream IDs.  A stream ID that is used out of order results in all
-     *   streams of that type with lower-numbered stream IDs also being
-     *   opened.
-     */
-
-    for ( /* void */ ; min_id < id; min_id += 0x04) {
-
-        sn = ngx_quic_create_stream(c, min_id, n);
-        if (sn == NULL) {
-            return NULL;
-        }
-
-        sn->c->listening->handler(sn->c);
+    while (nframes--) {
+        q = ngx_queue_head(&ctx->frames);
+        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-        if (qc->shutdown) {
-            return NGX_QUIC_STREAM_GONE;
-        }
+        ngx_queue_remove(q);
+        ngx_quic_free_frame(c, f);
     }
 
-    return ngx_quic_create_stream(c, id, n);
+    return res.len;
 }
 
 
-static ngx_quic_stream_t *
-ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
+static ssize_t
+ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len)
 {
-    ngx_log_t              *log;
-    ngx_pool_t             *pool;
-    ngx_quic_stream_t      *sn;
-    ngx_pool_cleanup_t     *cln;
-    ngx_quic_connection_t  *qc;
+    ngx_buf_t    b;
+    ngx_chain_t  cl, *res;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL create", id);
+    ngx_memzero(&b, sizeof(ngx_buf_t));
 
-    qc = ngx_quic_get_connection(c);
+    b.pos = b.start = buf;
+    b.last = b.end = buf + len;
+    b.last_buf = 1;
+    b.temporary = 1;
 
-    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
-    if (pool == NULL) {
-        return NULL;
-    }
+    cl.buf = &b;
+    cl.next= NULL;
 
-    sn = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
-    if (sn == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
+    res = c->send_chain(c, &cl, 0);
+    if (res == NGX_CHAIN_ERROR) {
+        return NGX_ERROR;
     }
 
-    sn->node.key = id;
-    sn->parent = c;
-    sn->id = id;
+    return len;
+}
 
-    sn->b = ngx_create_temp_buf(pool, rcvbuf_size);
-    if (sn->b == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
 
-    ngx_queue_init(&sn->fs.frames);
+static void
+ngx_quic_set_packet_number(ngx_quic_header_t *pkt, ngx_quic_send_ctx_t *ctx)
+{
+    uint64_t  delta;
+
+    delta = ctx->pnum - ctx->largest_ack;
+    pkt->number = ctx->pnum;
 
-    log = ngx_palloc(pool, sizeof(ngx_log_t));
-    if (log == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
+    if (delta <= 0x7F) {
+        pkt->num_len = 1;
+        pkt->trunc = ctx->pnum & 0xff;
 
-    *log = *c->log;
-    pool->log = log;
+    } else if (delta <= 0x7FFF) {
+        pkt->num_len = 2;
+        pkt->flags |= 0x1;
+        pkt->trunc = ctx->pnum & 0xffff;
 
-    sn->c = ngx_get_connection(-1, log);
-    if (sn->c == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
+    } else if (delta <= 0x7FFFFF) {
+        pkt->num_len = 3;
+        pkt->flags |= 0x2;
+        pkt->trunc = ctx->pnum & 0xffffff;
+
+    } else {
+        pkt->num_len = 4;
+        pkt->flags |= 0x3;
+        pkt->trunc = ctx->pnum & 0xffffffff;
     }
+}
 
-    sn->c->quic = sn;
-    sn->c->type = SOCK_STREAM;
-    sn->c->pool = pool;
-    sn->c->ssl = c->ssl;
-    sn->c->sockaddr = c->sockaddr;
-    sn->c->listening = c->listening;
-    sn->c->addr_text = c->addr_text;
-    sn->c->local_sockaddr = c->local_sockaddr;
-    sn->c->local_socklen = c->local_socklen;
-    sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
-    sn->c->recv = ngx_quic_stream_recv;
-    sn->c->send = ngx_quic_stream_send;
-    sn->c->send_chain = ngx_quic_stream_send_chain;
+static void
+ngx_quic_pto_handler(ngx_event_t *ev)
+{
+    ngx_uint_t              i;
+    ngx_msec_t              now;
+    ngx_queue_t            *q, *next;
+    ngx_connection_t       *c;
+    ngx_quic_frame_t       *f;
+    ngx_quic_send_ctx_t    *ctx;
+    ngx_quic_connection_t  *qc;
 
-    sn->c->read->log = log;
-    sn->c->write->log = log;
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic pto timer");
 
-    log->connection = sn->c->number;
+    c = ev->data;
+    qc = ngx_quic_get_connection(c);
+    now = ngx_current_msec;
 
-    if ((id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0
-        || (id & NGX_QUIC_STREAM_SERVER_INITIATED))
-    {
-        sn->c->write->ready = 1;
-    }
+    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
 
-    if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
-        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
-            sn->send_max_data = qc->ctp.initial_max_stream_data_uni;
-        }
+        ctx = &qc->send_ctx[i];
 
-    } else {
-        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
-            sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
-        } else {
-            sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
+        if (ngx_queue_empty(&ctx->sent)) {
+            continue;
         }
-    }
-
-    cln = ngx_pool_cleanup_add(pool, 0);
-    if (cln == NULL) {
-        ngx_close_connection(sn->c);
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
 
-    cln->handler = ngx_quic_stream_cleanup_handler;
-    cln->data = sn->c;
+        q = ngx_queue_head(&ctx->sent);
+        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-    ngx_rbtree_insert(&qc->streams.tree, &sn->node);
+        if (f->pnum <= ctx->largest_ack
+            && ctx->largest_ack != NGX_QUIC_UNSET_PN)
+        {
+            continue;
+        }
 
-    return sn;
-}
+        if ((ngx_msec_int_t) (f->last + ngx_quic_pto(c, ctx) - now) > 0) {
+            continue;
+        }
 
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic pto %s pto_count:%ui",
+                       ngx_quic_level_name(ctx->level), qc->pto_count);
 
-static ssize_t
-ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
-{
-    ssize_t                 len;
-    ngx_buf_t              *b;
-    ngx_event_t            *rev;
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
-    ngx_quic_connection_t  *qc;
+        for (q = ngx_queue_head(&ctx->frames);
+             q != ngx_queue_sentinel(&ctx->frames);
+             /* void */)
+        {
+            next = ngx_queue_next(q);
+            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-    qs = c->quic;
-    b = qs->b;
-    pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
-    rev = c->read;
+            if (f->type == NGX_QUIC_FT_PING) {
+                ngx_queue_remove(q);
+                ngx_quic_free_frame(c, f);
+            }
 
-    if (rev->error) {
-        return NGX_ERROR;
-    }
+            q = next;
+        }
 
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream recv id:0x%xL eof:%d avail:%z",
-                   qs->id, rev->pending_eof, b->last - b->pos);
+        for (q = ngx_queue_head(&ctx->sent);
+             q != ngx_queue_sentinel(&ctx->sent);
+             /* void */)
+        {
+            next = ngx_queue_next(q);
+            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-    if (b->pos == b->last) {
-        rev->ready = 0;
+            if (f->type == NGX_QUIC_FT_PING) {
+                ngx_quic_congestion_lost(c, f);
+                ngx_queue_remove(q);
+                ngx_quic_free_frame(c, f);
+            }
 
-        if (rev->pending_eof) {
-            rev->eof = 1;
-            return 0;
+            q = next;
         }
 
-        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic stream id:0x%xL recv() not ready", qs->id);
-        return NGX_AGAIN;
-    }
-
-    len = ngx_min(b->last - b->pos, (ssize_t) size);
-
-    ngx_memcpy(buf, b->pos, len);
+        /* enforce 2 udp datagrams */
 
-    b->pos += len;
-    qc->streams.received += len;
+        f = ngx_quic_alloc_frame(c);
+        if (f == NULL) {
+            break;
+        }
 
-    if (b->pos == b->last) {
-        b->pos = b->start;
-        b->last = b->start;
-        rev->ready = rev->pending_eof;
-    }
+        f->level = ctx->level;
+        f->type = NGX_QUIC_FT_PING;
+        f->flush = 1;
 
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL recv len:%z of size:%uz",
-                   qs->id, len, size);
+        ngx_quic_queue_frame(qc, f);
 
-    if (!rev->pending_eof) {
-        frame = ngx_quic_alloc_frame(pc);
-        if (frame == NULL) {
-            return NGX_ERROR;
+        f = ngx_quic_alloc_frame(c);
+        if (f == NULL) {
+            break;
         }
 
-        frame->level = ssl_encryption_application;
-        frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
-        frame->u.max_stream_data.id = qs->id;
-        frame->u.max_stream_data.limit = qs->fs.received + (b->pos - b->start)
-                                         + (b->end - b->last);
+        f->level = ctx->level;
+        f->type = NGX_QUIC_FT_PING;
 
-        ngx_quic_queue_frame(qc, frame);
+        ngx_quic_queue_frame(qc, f);
     }
 
-    if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
-
-        frame = ngx_quic_alloc_frame(pc);
+    qc->pto_count++;
 
-        if (frame == NULL) {
-            return NGX_ERROR;
-        }
+    ngx_quic_connstate_dbg(c);
+}
 
-        qc->streams.recv_max_data *= 2;
 
-        frame->level = ssl_encryption_application;
-        frame->type = NGX_QUIC_FT_MAX_DATA;
-        frame->u.max_data.max_data = qc->streams.recv_max_data;
+static void
+ngx_quic_push_handler(ngx_event_t *ev)
+{
+    ngx_connection_t  *c;
 
-        ngx_quic_queue_frame(qc, frame);
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic push timer");
 
-        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic stream id:0x%xL recv: increased max_data:%uL",
-                       qs->id, qc->streams.recv_max_data);
+    c = ev->data;
+
+    if (ngx_quic_output(c) != NGX_OK) {
+        ngx_quic_close_connection(c, NGX_ERROR);
+        return;
     }
 
-    return len;
+    ngx_quic_connstate_dbg(c);
 }
 
 
-static ssize_t
-ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
+static
+void ngx_quic_lost_handler(ngx_event_t *ev)
 {
-    ngx_buf_t    b;
-    ngx_chain_t  cl;
-
-    ngx_memzero(&b, sizeof(ngx_buf_t));
-
-    b.memory = 1;
-    b.pos = buf;
-    b.last = buf + size;
+    ngx_connection_t  *c;
 
-    cl.buf = &b;
-    cl.next = NULL;
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "quic lost timer");
 
-    if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
-        return NGX_ERROR;
-    }
+    c = ev->data;
 
-    if (b.pos == buf) {
-        return NGX_AGAIN;
+    if (ngx_quic_detect_lost(c) != NGX_OK) {
+        ngx_quic_close_connection(c, NGX_ERROR);
     }
 
-    return b.pos - buf;
+    ngx_quic_connstate_dbg(c);
 }
 
 
-static ngx_chain_t *
-ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+static ngx_int_t
+ngx_quic_detect_lost(ngx_connection_t *c)
 {
-    size_t                  n, flow;
-    ngx_event_t            *wev;
-    ngx_chain_t            *cl;
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
+    ngx_uint_t              i;
+    ngx_msec_t              now, wait, thr;
+    ngx_queue_t            *q;
+    ngx_quic_frame_t       *start;
+    ngx_quic_send_ctx_t    *ctx;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-    pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
-    wev = c->write;
-
-    if (wev->error) {
-        return NGX_CHAIN_ERROR;
-    }
-
-    flow = ngx_quic_max_stream_flow(c);
-    if (flow == 0) {
-        wev->ready = 0;
-        return in;
-    }
+    qc = ngx_quic_get_connection(c);
+    now = ngx_current_msec;
+    thr = ngx_quic_lost_threshold(qc);
 
-    n = (limit && (size_t) limit < flow) ? (size_t) limit : flow;
+    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
 
-    frame = ngx_quic_alloc_frame(pc);
-    if (frame == NULL) {
-        return NGX_CHAIN_ERROR;
-    }
+        ctx = &qc->send_ctx[i];
 
-    frame->data = ngx_quic_copy_chain(pc, in, n);
-    if (frame->data == NGX_CHAIN_ERROR) {
-        return NGX_CHAIN_ERROR;
-    }
+        if (ctx->largest_ack == NGX_QUIC_UNSET_PN) {
+            continue;
+        }
 
-    for (n = 0, cl = frame->data; cl; cl = cl->next) {
-        n += ngx_buf_size(cl->buf);
-    }
+        while (!ngx_queue_empty(&ctx->sent)) {
 
-    while (in && ngx_buf_size(in->buf) == 0) {
-        in = in->next;
-    }
+            q = ngx_queue_head(&ctx->sent);
+            start = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-    frame->level = ssl_encryption_application;
-    frame->type = NGX_QUIC_FT_STREAM6; /* OFF=1 LEN=1 FIN=0 */
-    frame->u.stream.off = 1;
-    frame->u.stream.len = 1;
-    frame->u.stream.fin = 0;
+            if (start->pnum > ctx->largest_ack) {
+                break;
+            }
 
-    frame->u.stream.type = frame->type;
-    frame->u.stream.stream_id = qs->id;
-    frame->u.stream.offset = c->sent;
-    frame->u.stream.length = n;
+            wait = start->last + thr - now;
 
-    c->sent += n;
-    qc->streams.sent += n;
+            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "quic detect_lost pnum:%uL thr:%M wait:%i level:%d",
+                           start->pnum, thr, (ngx_int_t) wait, start->level);
 
-    ngx_quic_queue_frame(qc, frame);
+            if ((ngx_msec_int_t) wait > 0
+                && ctx->largest_ack - start->pnum < NGX_QUIC_PKT_THR)
+            {
+                break;
+            }
 
-    wev->ready = (n < flow) ? 1 : 0;
+            ngx_quic_resend_frames(c, ctx);
+        }
+    }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic send_chain sent:%uz", n);
+    ngx_quic_set_lost_timer(c);
 
-    return in;
+    return NGX_OK;
 }
 
 
-static size_t
-ngx_quic_max_stream_flow(ngx_connection_t *c)
+static void
+ngx_quic_set_lost_timer(ngx_connection_t *c)
 {
-    size_t                  size;
-    uint64_t                sent, unacked;
-    ngx_quic_stream_t      *qs;
+    ngx_uint_t              i;
+    ngx_msec_t              now;
+    ngx_queue_t            *q;
+    ngx_msec_int_t          lost, pto, w;
+    ngx_quic_frame_t       *f;
+    ngx_quic_send_ctx_t    *ctx;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-    qc = ngx_quic_get_connection(qs->parent);
+    qc = ngx_quic_get_connection(c);
+    now = ngx_current_msec;
 
-    size = NGX_QUIC_STREAM_BUFSIZE;
-    sent = c->sent;
-    unacked = sent - qs->acked;
+    lost = -1;
+    pto = -1;
 
-    if (qc->streams.send_max_data == 0) {
-        qc->streams.send_max_data = qc->ctp.initial_max_data;
-    }
+    for (i = 0; i < NGX_QUIC_SEND_CTX_LAST; i++) {
+        ctx = &qc->send_ctx[i];
 
-    if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit buffer size");
-        return 0;
-    }
+        if (ngx_queue_empty(&ctx->sent)) {
+            continue;
+        }
 
-    if (unacked + size > NGX_QUIC_STREAM_BUFSIZE) {
-        size = NGX_QUIC_STREAM_BUFSIZE - unacked;
-    }
+        if (ctx->largest_ack != NGX_QUIC_UNSET_PN) {
+            q = ngx_queue_head(&ctx->sent);
+            f = ngx_queue_data(q, ngx_quic_frame_t, queue);
+            w = (ngx_msec_int_t) (f->last + ngx_quic_lost_threshold(qc) - now);
 
-    if (qc->streams.sent >= qc->streams.send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit MAX_DATA");
-        return 0;
-    }
+            if (f->pnum <= ctx->largest_ack) {
+                if (w < 0 || ctx->largest_ack - f->pnum >= NGX_QUIC_PKT_THR) {
+                    w = 0;
+                }
+
+                if (lost == -1 || w < lost) {
+                    lost = w;
+                }
+            }
+        }
+
+        q = ngx_queue_last(&ctx->sent);
+        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
+        w = (ngx_msec_int_t) (f->last + ngx_quic_pto(c, ctx) - now);
+
+        if (w < 0) {
+            w = 0;
+        }
 
-    if (qc->streams.sent + size > qc->streams.send_max_data) {
-        size = qc->streams.send_max_data - qc->streams.sent;
+        if (pto == -1 || w < pto) {
+            pto = w;
+        }
     }
 
-    if (sent >= qs->send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit MAX_STREAM_DATA");
-        return 0;
+    if (qc->pto.timer_set) {
+        ngx_del_timer(&qc->pto);
     }
 
-    if (sent + size > qs->send_max_data) {
-        size = qs->send_max_data - sent;
+    if (lost != -1) {
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic lost timer lost:%M", lost);
+
+        qc->pto.handler = ngx_quic_lost_handler;
+        ngx_add_timer(&qc->pto, lost);
+        return;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic send flow:%uz", size);
+    if (pto != -1) {
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic lost timer pto:%M", pto);
+
+        qc->pto.handler = ngx_quic_pto_handler;
+        ngx_add_timer(&qc->pto, pto);
+        return;
+    }
 
-    return size;
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic lost timer unset");
 }
 
 
 static void
-ngx_quic_stream_cleanup_handler(void *data)
+ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
 {
-    ngx_connection_t *c = data;
-
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
+    size_t                  n;
+    ngx_buf_t              *b;
+    ngx_queue_t            *q;
+    ngx_quic_frame_t       *f, *start;
+    ngx_quic_stream_t      *sn;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-    pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
+    qc = ngx_quic_get_connection(c);
+    q = ngx_queue_head(&ctx->sent);
+    start = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL cleanup", qs->id);
+                   "quic resend packet pnum:%uL", start->pnum);
 
-    ngx_rbtree_delete(&qc->streams.tree, &qs->node);
-    ngx_quic_free_frames(pc, &qs->fs.frames);
+    ngx_quic_congestion_lost(c, start);
 
-    if (qc->closing) {
-        /* schedule handler call to continue ngx_quic_close_connection() */
-        ngx_post_event(pc->read, &ngx_posted_events);
-        return;
-    }
+    do {
+        f = ngx_queue_data(q, ngx_quic_frame_t, queue);
 
-    if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
-        || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
-    {
-        if (!c->read->pending_eof && !c->read->error) {
-            frame = ngx_quic_alloc_frame(pc);
-            if (frame == NULL) {
-                goto done;
-            }
+        if (f->pnum != start->pnum) {
+            break;
+        }
 
-            frame->level = ssl_encryption_application;
-            frame->type = NGX_QUIC_FT_STOP_SENDING;
-            frame->u.stop_sending.id = qs->id;
-            frame->u.stop_sending.error_code = 0x100; /* HTTP/3 no error */
+        q = ngx_queue_next(q);
 
-            ngx_quic_queue_frame(qc, frame);
-        }
-    }
+        ngx_queue_remove(&f->queue);
 
-    if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
-        frame = ngx_quic_alloc_frame(pc);
-        if (frame == NULL) {
-            goto done;
-        }
+        switch (f->type) {
+        case NGX_QUIC_FT_ACK:
+        case NGX_QUIC_FT_ACK_ECN:
+            if (ctx->level == ssl_encryption_application) {
+                /* force generation of most recent acknowledgment */
+                ctx->send_ack = NGX_QUIC_MAX_ACK_GAP;
+            }
 
-        frame->level = ssl_encryption_application;
-        frame->type = NGX_QUIC_FT_MAX_STREAMS;
+            ngx_quic_free_frame(c, f);
+            break;
 
-        if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
-            frame->u.max_streams.limit = ++qc->streams.client_max_streams_uni;
-            frame->u.max_streams.bidi = 0;
+        case NGX_QUIC_FT_PING:
+        case NGX_QUIC_FT_PATH_RESPONSE:
+        case NGX_QUIC_FT_CONNECTION_CLOSE:
+            ngx_quic_free_frame(c, f);
+            break;
 
-        } else {
-            frame->u.max_streams.limit = ++qc->streams.client_max_streams_bidi;
-            frame->u.max_streams.bidi = 1;
-        }
+        case NGX_QUIC_FT_MAX_DATA:
+            f->u.max_data.max_data = qc->streams.recv_max_data;
+            ngx_quic_queue_frame(qc, f);
+            break;
 
-        ngx_quic_queue_frame(qc, frame);
+        case NGX_QUIC_FT_MAX_STREAMS:
+        case NGX_QUIC_FT_MAX_STREAMS2:
+            f->u.max_streams.limit = f->u.max_streams.bidi
+                                     ? qc->streams.client_max_streams_bidi
+                                     : qc->streams.client_max_streams_uni;
+            ngx_quic_queue_frame(qc, f);
+            break;
 
-        if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
-            /* do not send fin for client unidirectional streams */
-            goto done;
-        }
-    }
+        case NGX_QUIC_FT_MAX_STREAM_DATA:
+            sn = ngx_quic_find_stream(&qc->streams.tree,
+                                      f->u.max_stream_data.id);
+            if (sn == NULL) {
+                ngx_quic_free_frame(c, f);
+                break;
+            }
 
-    if (c->write->error) {
-        goto done;
-    }
+            b = sn->b;
+            n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL send fin", qs->id);
+            if (f->u.max_stream_data.limit < n) {
+                f->u.max_stream_data.limit = n;
+            }
 
-    frame = ngx_quic_alloc_frame(pc);
-    if (frame == NULL) {
-        goto done;
-    }
+            ngx_quic_queue_frame(qc, f);
+            break;
 
-    frame->level = ssl_encryption_application;
-    frame->type = NGX_QUIC_FT_STREAM7; /* OFF=1 LEN=1 FIN=1 */
-    frame->u.stream.off = 1;
-    frame->u.stream.len = 1;
-    frame->u.stream.fin = 1;
+        case NGX_QUIC_FT_STREAM0:
+        case NGX_QUIC_FT_STREAM1:
+        case NGX_QUIC_FT_STREAM2:
+        case NGX_QUIC_FT_STREAM3:
+        case NGX_QUIC_FT_STREAM4:
+        case NGX_QUIC_FT_STREAM5:
+        case NGX_QUIC_FT_STREAM6:
+        case NGX_QUIC_FT_STREAM7:
+            sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
 
-    frame->u.stream.type = frame->type;
-    frame->u.stream.stream_id = qs->id;
-    frame->u.stream.offset = c->sent;
-    frame->u.stream.length = 0;
+            if (sn && sn->c->write->error) {
+                /* RESET_STREAM was sent */
+                ngx_quic_free_frame(c, f);
+                break;
+            }
 
-    ngx_quic_queue_frame(qc, frame);
+            /* fall through */
 
-done:
+        default:
+            ngx_queue_insert_tail(&ctx->frames, &f->queue);
+        }
 
-    (void) ngx_quic_output(pc);
+    } while (q != ngx_queue_sentinel(&ctx->sent));
 
-    if (qc->shutdown) {
-        ngx_quic_shutdown_quic(pc);
+    if (qc->closing) {
+        return;
     }
+
+    ngx_post_event(&qc->push, &ngx_posted_events);
 }
 
 
-static void
+void
 ngx_quic_shutdown_quic(ngx_connection_t *c)
 {
     ngx_rbtree_t           *tree;
index a14bd65b4b316d206a73a22f3766b382c0539cb6..17bc96435da6c66924279b11df8402756b0d8aec 100644 (file)
@@ -19,6 +19,7 @@ typedef struct ngx_quic_connection_s  ngx_quic_connection_t;
 #include <ngx_event_quic_frames.h>
 #include <ngx_event_quic_migration.h>
 #include <ngx_event_quic_connid.h>
+#include <ngx_event_quic_streams.h>
 
 
 #define NGX_QUIC_MAX_SHORT_HEADER            25 /* 1 flags + 20 dcid + 4 pn */
@@ -225,6 +226,9 @@ ngx_msec_t ngx_quic_pto(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx);
 ngx_int_t ngx_quic_new_sr_token(ngx_connection_t *c, ngx_str_t *cid,
     u_char *secret, u_char *token);
 
+ngx_int_t ngx_quic_output(ngx_connection_t *c);
+void ngx_quic_shutdown_quic(ngx_connection_t *c);
+
 /********************************* DEBUG *************************************/
 
 /* #define NGX_QUIC_DEBUG_PACKETS */      /* dump packet contents */
diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
new file mode 100644 (file)
index 0000000..90d6bce
--- /dev/null
@@ -0,0 +1,1268 @@
+
+/*
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+#include <ngx_event_quic_transport.h>
+#include <ngx_event_quic_connection.h>
+#include <ngx_event_quic_streams.h>
+
+
+#define NGX_QUIC_STREAM_GONE     (void *) -1
+
+
+static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
+    uint64_t id);
+static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
+    uint64_t id, size_t rcvbuf_size);
+static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
+    size_t size);
+static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
+    size_t size);
+static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
+    ngx_chain_t *in, off_t limit);
+static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
+static void ngx_quic_stream_cleanup_handler(void *data);
+
+
+ngx_connection_t *
+ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
+{
+    size_t                  rcvbuf_size;
+    uint64_t                id;
+    ngx_quic_stream_t      *qs, *sn;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    qc = ngx_quic_get_connection(qs->parent);
+
+    if (bidi) {
+        if (qc->streams.server_streams_bidi
+            >= qc->streams.server_max_streams_bidi)
+        {
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "quic too many server bidi streams:%uL",
+                           qc->streams.server_streams_bidi);
+            return NULL;
+        }
+
+        id = (qc->streams.server_streams_bidi << 2)
+             | NGX_QUIC_STREAM_SERVER_INITIATED;
+
+        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic creating server bidi stream"
+                       " streams:%uL max:%uL id:0x%xL",
+                       qc->streams.server_streams_bidi,
+                       qc->streams.server_max_streams_bidi, id);
+
+        qc->streams.server_streams_bidi++;
+        rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
+
+    } else {
+        if (qc->streams.server_streams_uni
+            >= qc->streams.server_max_streams_uni)
+        {
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "quic too many server uni streams:%uL",
+                           qc->streams.server_streams_uni);
+            return NULL;
+        }
+
+        id = (qc->streams.server_streams_uni << 2)
+             | NGX_QUIC_STREAM_SERVER_INITIATED
+             | NGX_QUIC_STREAM_UNIDIRECTIONAL;
+
+        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic creating server uni stream"
+                       " streams:%uL max:%uL id:0x%xL",
+                       qc->streams.server_streams_uni,
+                       qc->streams.server_max_streams_uni, id);
+
+        qc->streams.server_streams_uni++;
+        rcvbuf_size = 0;
+    }
+
+    sn = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
+    if (sn == NULL) {
+        return NULL;
+    }
+
+    return sn->c;
+}
+
+
+void
+ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
+    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
+{
+    ngx_rbtree_node_t  **p;
+    ngx_quic_stream_t   *qn, *qnt;
+
+    for ( ;; ) {
+        qn = (ngx_quic_stream_t *) node;
+        qnt = (ngx_quic_stream_t *) temp;
+
+        p = (qn->id < qnt->id) ? &temp->left : &temp->right;
+
+        if (*p == sentinel) {
+            break;
+        }
+
+        temp = *p;
+    }
+
+    *p = node;
+    node->parent = temp;
+    node->left = sentinel;
+    node->right = sentinel;
+    ngx_rbt_red(node);
+}
+
+
+ngx_quic_stream_t *
+ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
+{
+    ngx_rbtree_node_t  *node, *sentinel;
+    ngx_quic_stream_t  *qn;
+
+    node = rbtree->root;
+    sentinel = rbtree->sentinel;
+
+    while (node != sentinel) {
+        qn = (ngx_quic_stream_t *) node;
+
+        if (id == qn->id) {
+            return qn;
+        }
+
+        node = (id < qn->id) ? node->left : node->right;
+    }
+
+    return NULL;
+}
+
+
+ngx_int_t
+ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
+{
+    ngx_event_t        *rev, *wev;
+    ngx_rbtree_t       *tree;
+    ngx_rbtree_node_t  *node;
+    ngx_quic_stream_t  *qs;
+
+#if (NGX_DEBUG)
+    ngx_uint_t          ns;
+#endif
+
+    tree = &qc->streams.tree;
+
+    if (tree->root == tree->sentinel) {
+        return NGX_OK;
+    }
+
+#if (NGX_DEBUG)
+    ns = 0;
+#endif
+
+    for (node = ngx_rbtree_min(tree->root, tree->sentinel);
+         node;
+         node = ngx_rbtree_next(tree, node))
+    {
+        qs = (ngx_quic_stream_t *) node;
+
+        rev = qs->c->read;
+        rev->error = 1;
+        rev->ready = 1;
+
+        wev = qs->c->write;
+        wev->error = 1;
+        wev->ready = 1;
+
+        ngx_post_event(rev, &ngx_posted_events);
+
+        if (rev->timer_set) {
+            ngx_del_timer(rev);
+        }
+
+#if (NGX_DEBUG)
+        ns++;
+#endif
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic connection has %ui active streams", ns);
+
+    return NGX_AGAIN;
+}
+
+
+ngx_int_t
+ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
+{
+    ngx_event_t            *wev;
+    ngx_connection_t       *pc;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
+
+    frame = ngx_quic_alloc_frame(pc);
+    if (frame == NULL) {
+        return NGX_ERROR;
+    }
+
+    frame->level = ssl_encryption_application;
+    frame->type = NGX_QUIC_FT_RESET_STREAM;
+    frame->u.reset_stream.id = qs->id;
+    frame->u.reset_stream.error_code = err;
+    frame->u.reset_stream.final_size = c->sent;
+
+    ngx_quic_queue_frame(qc, frame);
+
+    wev = c->write;
+    wev->error = 1;
+    wev->ready = 1;
+
+    return NGX_OK;
+}
+
+
+static ngx_quic_stream_t *
+ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
+{
+    size_t                  n;
+    uint64_t                min_id;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL is new", id);
+
+    qc = ngx_quic_get_connection(c);
+
+    if (qc->shutdown) {
+        return NGX_QUIC_STREAM_GONE;
+    }
+
+    if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
+
+        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+            if ((id >> 2) < qc->streams.server_streams_uni) {
+                return NGX_QUIC_STREAM_GONE;
+            }
+
+            qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+            return NULL;
+        }
+
+        if ((id >> 2) < qc->streams.client_streams_uni) {
+            return NGX_QUIC_STREAM_GONE;
+        }
+
+        if ((id >> 2) >= qc->streams.client_max_streams_uni) {
+            qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
+            return NULL;
+        }
+
+        min_id = (qc->streams.client_streams_uni << 2)
+                 | NGX_QUIC_STREAM_UNIDIRECTIONAL;
+        qc->streams.client_streams_uni = (id >> 2) + 1;
+        n = qc->tp.initial_max_stream_data_uni;
+
+    } else {
+
+        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+            if ((id >> 2) < qc->streams.server_streams_bidi) {
+                return NGX_QUIC_STREAM_GONE;
+            }
+
+            qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+            return NULL;
+        }
+
+        if ((id >> 2) < qc->streams.client_streams_bidi) {
+            return NGX_QUIC_STREAM_GONE;
+        }
+
+        if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
+            qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
+            return NULL;
+        }
+
+        min_id = (qc->streams.client_streams_bidi << 2);
+        qc->streams.client_streams_bidi = (id >> 2) + 1;
+        n = qc->tp.initial_max_stream_data_bidi_remote;
+    }
+
+    if (n < NGX_QUIC_STREAM_BUFSIZE) {
+        n = NGX_QUIC_STREAM_BUFSIZE;
+    }
+
+    /*
+     *   2.1.  Stream Types and Identifiers
+     *
+     *   Within each type, streams are created with numerically increasing
+     *   stream IDs.  A stream ID that is used out of order results in all
+     *   streams of that type with lower-numbered stream IDs also being
+     *   opened.
+     */
+
+    for ( /* void */ ; min_id < id; min_id += 0x04) {
+
+        sn = ngx_quic_create_stream(c, min_id, n);
+        if (sn == NULL) {
+            return NULL;
+        }
+
+        sn->c->listening->handler(sn->c);
+
+        if (qc->shutdown) {
+            return NGX_QUIC_STREAM_GONE;
+        }
+    }
+
+    return ngx_quic_create_stream(c, id, n);
+}
+
+
+static ngx_quic_stream_t *
+ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
+{
+    ngx_log_t              *log;
+    ngx_pool_t             *pool;
+    ngx_quic_stream_t      *sn;
+    ngx_pool_cleanup_t     *cln;
+    ngx_quic_connection_t  *qc;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL create", id);
+
+    qc = ngx_quic_get_connection(c);
+
+    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
+    if (pool == NULL) {
+        return NULL;
+    }
+
+    sn = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
+    if (sn == NULL) {
+        ngx_destroy_pool(pool);
+        return NULL;
+    }
+
+    sn->node.key = id;
+    sn->parent = c;
+    sn->id = id;
+
+    sn->b = ngx_create_temp_buf(pool, rcvbuf_size);
+    if (sn->b == NULL) {
+        ngx_destroy_pool(pool);
+        return NULL;
+    }
+
+    ngx_queue_init(&sn->fs.frames);
+
+    log = ngx_palloc(pool, sizeof(ngx_log_t));
+    if (log == NULL) {
+        ngx_destroy_pool(pool);
+        return NULL;
+    }
+
+    *log = *c->log;
+    pool->log = log;
+
+    sn->c = ngx_get_connection(-1, log);
+    if (sn->c == NULL) {
+        ngx_destroy_pool(pool);
+        return NULL;
+    }
+
+    sn->c->quic = sn;
+    sn->c->type = SOCK_STREAM;
+    sn->c->pool = pool;
+    sn->c->ssl = c->ssl;
+    sn->c->sockaddr = c->sockaddr;
+    sn->c->listening = c->listening;
+    sn->c->addr_text = c->addr_text;
+    sn->c->local_sockaddr = c->local_sockaddr;
+    sn->c->local_socklen = c->local_socklen;
+    sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
+    sn->c->recv = ngx_quic_stream_recv;
+    sn->c->send = ngx_quic_stream_send;
+    sn->c->send_chain = ngx_quic_stream_send_chain;
+
+    sn->c->read->log = log;
+    sn->c->write->log = log;
+
+    log->connection = sn->c->number;
+
+    if ((id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0
+        || (id & NGX_QUIC_STREAM_SERVER_INITIATED))
+    {
+        sn->c->write->ready = 1;
+    }
+
+    if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
+        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+            sn->send_max_data = qc->ctp.initial_max_stream_data_uni;
+        }
+
+    } else {
+        if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+            sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
+        } else {
+            sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
+        }
+    }
+
+    cln = ngx_pool_cleanup_add(pool, 0);
+    if (cln == NULL) {
+        ngx_close_connection(sn->c);
+        ngx_destroy_pool(pool);
+        return NULL;
+    }
+
+    cln->handler = ngx_quic_stream_cleanup_handler;
+    cln->data = sn->c;
+
+    ngx_rbtree_insert(&qc->streams.tree, &sn->node);
+
+    return sn;
+}
+
+
+static ssize_t
+ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
+{
+    ssize_t                 len;
+    ngx_buf_t              *b;
+    ngx_event_t            *rev;
+    ngx_connection_t       *pc;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    b = qs->b;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
+    rev = c->read;
+
+    if (rev->error) {
+        return NGX_ERROR;
+    }
+
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream recv id:0x%xL eof:%d avail:%z",
+                   qs->id, rev->pending_eof, b->last - b->pos);
+
+    if (b->pos == b->last) {
+        rev->ready = 0;
+
+        if (rev->pending_eof) {
+            rev->eof = 1;
+            return 0;
+        }
+
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic stream id:0x%xL recv() not ready", qs->id);
+        return NGX_AGAIN;
+    }
+
+    len = ngx_min(b->last - b->pos, (ssize_t) size);
+
+    ngx_memcpy(buf, b->pos, len);
+
+    b->pos += len;
+    qc->streams.received += len;
+
+    if (b->pos == b->last) {
+        b->pos = b->start;
+        b->last = b->start;
+        rev->ready = rev->pending_eof;
+    }
+
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL recv len:%z of size:%uz",
+                   qs->id, len, size);
+
+    if (!rev->pending_eof) {
+        frame = ngx_quic_alloc_frame(pc);
+        if (frame == NULL) {
+            return NGX_ERROR;
+        }
+
+        frame->level = ssl_encryption_application;
+        frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+        frame->u.max_stream_data.id = qs->id;
+        frame->u.max_stream_data.limit = qs->fs.received + (b->pos - b->start)
+                                         + (b->end - b->last);
+
+        ngx_quic_queue_frame(qc, frame);
+    }
+
+    if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
+
+        frame = ngx_quic_alloc_frame(pc);
+
+        if (frame == NULL) {
+            return NGX_ERROR;
+        }
+
+        qc->streams.recv_max_data *= 2;
+
+        frame->level = ssl_encryption_application;
+        frame->type = NGX_QUIC_FT_MAX_DATA;
+        frame->u.max_data.max_data = qc->streams.recv_max_data;
+
+        ngx_quic_queue_frame(qc, frame);
+
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic stream id:0x%xL recv: increased max_data:%uL",
+                       qs->id, qc->streams.recv_max_data);
+    }
+
+    return len;
+}
+
+
+static ssize_t
+ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
+{
+    ngx_buf_t    b;
+    ngx_chain_t  cl;
+
+    ngx_memzero(&b, sizeof(ngx_buf_t));
+
+    b.memory = 1;
+    b.pos = buf;
+    b.last = buf + size;
+
+    cl.buf = &b;
+    cl.next = NULL;
+
+    if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (b.pos == buf) {
+        return NGX_AGAIN;
+    }
+
+    return b.pos - buf;
+}
+
+
+static ngx_chain_t *
+ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+    size_t                  n, flow;
+    ngx_event_t            *wev;
+    ngx_chain_t            *cl;
+    ngx_connection_t       *pc;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
+    wev = c->write;
+
+    if (wev->error) {
+        return NGX_CHAIN_ERROR;
+    }
+
+    flow = ngx_quic_max_stream_flow(c);
+    if (flow == 0) {
+        wev->ready = 0;
+        return in;
+    }
+
+    n = (limit && (size_t) limit < flow) ? (size_t) limit : flow;
+
+    frame = ngx_quic_alloc_frame(pc);
+    if (frame == NULL) {
+        return NGX_CHAIN_ERROR;
+    }
+
+    frame->data = ngx_quic_copy_chain(pc, in, n);
+    if (frame->data == NGX_CHAIN_ERROR) {
+        return NGX_CHAIN_ERROR;
+    }
+
+    for (n = 0, cl = frame->data; cl; cl = cl->next) {
+        n += ngx_buf_size(cl->buf);
+    }
+
+    while (in && ngx_buf_size(in->buf) == 0) {
+        in = in->next;
+    }
+
+    frame->level = ssl_encryption_application;
+    frame->type = NGX_QUIC_FT_STREAM6; /* OFF=1 LEN=1 FIN=0 */
+    frame->u.stream.off = 1;
+    frame->u.stream.len = 1;
+    frame->u.stream.fin = 0;
+
+    frame->u.stream.type = frame->type;
+    frame->u.stream.stream_id = qs->id;
+    frame->u.stream.offset = c->sent;
+    frame->u.stream.length = n;
+
+    c->sent += n;
+    qc->streams.sent += n;
+
+    ngx_quic_queue_frame(qc, frame);
+
+    wev->ready = (n < flow) ? 1 : 0;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic send_chain sent:%uz", n);
+
+    return in;
+}
+
+
+static size_t
+ngx_quic_max_stream_flow(ngx_connection_t *c)
+{
+    size_t                  size;
+    uint64_t                sent, unacked;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    qc = ngx_quic_get_connection(qs->parent);
+
+    size = NGX_QUIC_STREAM_BUFSIZE;
+    sent = c->sent;
+    unacked = sent - qs->acked;
+
+    if (qc->streams.send_max_data == 0) {
+        qc->streams.send_max_data = qc->ctp.initial_max_data;
+    }
+
+    if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit buffer size");
+        return 0;
+    }
+
+    if (unacked + size > NGX_QUIC_STREAM_BUFSIZE) {
+        size = NGX_QUIC_STREAM_BUFSIZE - unacked;
+    }
+
+    if (qc->streams.sent >= qc->streams.send_max_data) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit MAX_DATA");
+        return 0;
+    }
+
+    if (qc->streams.sent + size > qc->streams.send_max_data) {
+        size = qc->streams.send_max_data - qc->streams.sent;
+    }
+
+    if (sent >= qs->send_max_data) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic send flow hit MAX_STREAM_DATA");
+        return 0;
+    }
+
+    if (sent + size > qs->send_max_data) {
+        size = qs->send_max_data - sent;
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic send flow:%uz", size);
+
+    return size;
+}
+
+
+static void
+ngx_quic_stream_cleanup_handler(void *data)
+{
+    ngx_connection_t *c = data;
+
+    ngx_connection_t       *pc;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qs = c->quic;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL cleanup", qs->id);
+
+    ngx_rbtree_delete(&qc->streams.tree, &qs->node);
+    ngx_quic_free_frames(pc, &qs->fs.frames);
+
+    if (qc->closing) {
+        /* schedule handler call to continue ngx_quic_close_connection() */
+        ngx_post_event(pc->read, &ngx_posted_events);
+        return;
+    }
+
+    if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
+        || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
+    {
+        if (!c->read->pending_eof && !c->read->error) {
+            frame = ngx_quic_alloc_frame(pc);
+            if (frame == NULL) {
+                goto done;
+            }
+
+            frame->level = ssl_encryption_application;
+            frame->type = NGX_QUIC_FT_STOP_SENDING;
+            frame->u.stop_sending.id = qs->id;
+            frame->u.stop_sending.error_code = 0x100; /* HTTP/3 no error */
+
+            ngx_quic_queue_frame(qc, frame);
+        }
+    }
+
+    if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
+        frame = ngx_quic_alloc_frame(pc);
+        if (frame == NULL) {
+            goto done;
+        }
+
+        frame->level = ssl_encryption_application;
+        frame->type = NGX_QUIC_FT_MAX_STREAMS;
+
+        if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
+            frame->u.max_streams.limit = ++qc->streams.client_max_streams_uni;
+            frame->u.max_streams.bidi = 0;
+
+        } else {
+            frame->u.max_streams.limit = ++qc->streams.client_max_streams_bidi;
+            frame->u.max_streams.bidi = 1;
+        }
+
+        ngx_quic_queue_frame(qc, frame);
+
+        if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
+            /* do not send fin for client unidirectional streams */
+            goto done;
+        }
+    }
+
+    if (c->write->error) {
+        goto done;
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL send fin", qs->id);
+
+    frame = ngx_quic_alloc_frame(pc);
+    if (frame == NULL) {
+        goto done;
+    }
+
+    frame->level = ssl_encryption_application;
+    frame->type = NGX_QUIC_FT_STREAM7; /* OFF=1 LEN=1 FIN=1 */
+    frame->u.stream.off = 1;
+    frame->u.stream.len = 1;
+    frame->u.stream.fin = 1;
+
+    frame->u.stream.type = frame->type;
+    frame->u.stream.stream_id = qs->id;
+    frame->u.stream.offset = c->sent;
+    frame->u.stream.length = 0;
+
+    ngx_quic_queue_frame(qc, frame);
+
+done:
+
+    (void) ngx_quic_output(pc);
+
+    if (qc->shutdown) {
+        ngx_quic_shutdown_quic(pc);
+    }
+}
+
+
+ngx_int_t
+ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
+    ngx_quic_frame_t *frame)
+{
+    size_t                     window;
+    uint64_t                   last;
+    ngx_buf_t                 *b;
+    ngx_pool_t                *pool;
+    ngx_connection_t          *sc;
+    ngx_quic_stream_t         *sn;
+    ngx_quic_connection_t     *qc;
+    ngx_quic_stream_frame_t   *f;
+    ngx_quic_frames_stream_t  *fs;
+
+    qc = ngx_quic_get_connection(c);
+    f = &frame->u.stream;
+
+    if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        && (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED))
+    {
+        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+        return NGX_ERROR;
+    }
+
+    /* no overflow since both values are 62-bit */
+    last = f->offset + f->length;
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+
+    if (sn == NULL) {
+        sn = ngx_quic_create_client_stream(c, f->stream_id);
+
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+
+        if (sn == NGX_QUIC_STREAM_GONE) {
+            return NGX_OK;
+        }
+
+        sc = sn->c;
+        fs = &sn->fs;
+        b = sn->b;
+        window = b->end - b->last;
+
+        if (last > window) {
+            qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+            goto cleanup;
+        }
+
+        if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
+                                          sn)
+            != NGX_OK)
+        {
+            goto cleanup;
+        }
+
+        sc->listening->handler(sc);
+
+        return NGX_OK;
+    }
+
+    fs = &sn->fs;
+    b = sn->b;
+    window = (b->pos - b->start) + (b->end - b->last);
+
+    if (last > fs->received && last - fs->received > window) {
+        qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+        return NGX_ERROR;
+    }
+
+    return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
+                                         sn);
+
+cleanup:
+
+    pool = sc->pool;
+
+    ngx_close_connection(sc);
+    ngx_destroy_pool(pool);
+
+    return NGX_ERROR;
+}
+
+
+ngx_int_t
+ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
+{
+    uint64_t                  id;
+    ngx_buf_t                *b;
+    ngx_event_t              *rev;
+    ngx_chain_t              *cl;
+    ngx_quic_stream_t        *sn;
+    ngx_quic_connection_t    *qc;
+    ngx_quic_stream_frame_t  *f;
+
+    qc = ngx_quic_get_connection(c);
+    sn = data;
+
+    f = &frame->u.stream;
+    id = f->stream_id;
+
+    b = sn->b;
+
+    if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0,
+                      "quic no space in stream buffer");
+        return NGX_ERROR;
+    }
+
+    if ((size_t) (b->end - b->last) < f->length) {
+        b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
+        b->pos = b->start;
+    }
+
+    for (cl = frame->data; cl; cl = cl->next) {
+        b->last = ngx_cpymem(b->last, cl->buf->pos,
+                             cl->buf->last - cl->buf->pos);
+    }
+
+    rev = sn->c->read;
+    rev->ready = 1;
+
+    if (f->fin) {
+        rev->pending_eof = 1;
+    }
+
+    if (rev->active) {
+        rev->handler(rev);
+    }
+
+    /* check if stream was destroyed by handler */
+    if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) {
+        return NGX_DONE;
+    }
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_max_data_frame(ngx_connection_t *c,
+    ngx_quic_max_data_frame_t *f)
+{
+    ngx_event_t            *wev;
+    ngx_rbtree_t           *tree;
+    ngx_rbtree_node_t      *node;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+    tree = &qc->streams.tree;
+
+    if (f->max_data <= qc->streams.send_max_data) {
+        return NGX_OK;
+    }
+
+    if (qc->streams.sent >= qc->streams.send_max_data) {
+
+        for (node = ngx_rbtree_min(tree->root, tree->sentinel);
+             node;
+             node = ngx_rbtree_next(tree, node))
+        {
+            qs = (ngx_quic_stream_t *) node;
+            wev = qs->c->write;
+
+            if (wev->active) {
+                wev->ready = 1;
+                ngx_post_event(wev, &ngx_posted_events);
+            }
+        }
+    }
+
+    qc->streams.send_max_data = f->max_data;
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
+{
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
+{
+    size_t                  n;
+    ngx_buf_t              *b;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
+    {
+        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+        return NGX_ERROR;
+    }
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+    if (sn == NULL) {
+        sn = ngx_quic_create_client_stream(c, f->id);
+
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+
+        if (sn == NGX_QUIC_STREAM_GONE) {
+            return NGX_OK;
+        }
+
+        b = sn->b;
+        n = b->end - b->last;
+
+        sn->c->listening->handler(sn->c);
+
+    } else {
+        b = sn->b;
+        n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
+    }
+
+    frame = ngx_quic_alloc_frame(c);
+    if (frame == NULL) {
+        return NGX_ERROR;
+    }
+
+    frame->level = pkt->level;
+    frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+    frame->u.max_stream_data.id = f->id;
+    frame->u.max_stream_data.limit = n;
+
+    ngx_quic_queue_frame(qc, frame);
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
+{
+    uint64_t                sent;
+    ngx_event_t            *wev;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
+    {
+        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+        return NGX_ERROR;
+    }
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+    if (sn == NULL) {
+        sn = ngx_quic_create_client_stream(c, f->id);
+
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+
+        if (sn == NGX_QUIC_STREAM_GONE) {
+            return NGX_OK;
+        }
+
+        if (f->limit > sn->send_max_data) {
+            sn->send_max_data = f->limit;
+        }
+
+        sn->c->listening->handler(sn->c);
+
+        return NGX_OK;
+    }
+
+    if (f->limit <= sn->send_max_data) {
+        return NGX_OK;
+    }
+
+    sent = sn->c->sent;
+
+    if (sent >= sn->send_max_data) {
+        wev = sn->c->write;
+
+        if (wev->active) {
+            wev->ready = 1;
+            ngx_post_event(wev, &ngx_posted_events);
+        }
+    }
+
+    sn->send_max_data = f->limit;
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
+{
+    ngx_event_t            *rev;
+    ngx_connection_t       *sc;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
+    {
+        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+        return NGX_ERROR;
+    }
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+    if (sn == NULL) {
+        sn = ngx_quic_create_client_stream(c, f->id);
+
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+
+        if (sn == NGX_QUIC_STREAM_GONE) {
+            return NGX_OK;
+        }
+
+        sc = sn->c;
+
+        rev = sc->read;
+        rev->error = 1;
+        rev->ready = 1;
+
+        sc->listening->handler(sc);
+
+        return NGX_OK;
+    }
+
+    rev = sn->c->read;
+    rev->error = 1;
+    rev->ready = 1;
+
+    if (rev->active) {
+        rev->handler(rev);
+    }
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
+{
+    ngx_event_t            *wev;
+    ngx_connection_t       *sc;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
+    {
+        qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
+        return NGX_ERROR;
+    }
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+    if (sn == NULL) {
+        sn = ngx_quic_create_client_stream(c, f->id);
+
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+
+        if (sn == NGX_QUIC_STREAM_GONE) {
+            return NGX_OK;
+        }
+
+        sc = sn->c;
+
+        wev = sc->write;
+        wev->error = 1;
+        wev->ready = 1;
+
+        sc->listening->handler(sc);
+
+        return NGX_OK;
+    }
+
+    wev = sn->c->write;
+    wev->error = 1;
+    wev->ready = 1;
+
+    if (wev->active) {
+        wev->handler(wev);
+    }
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f)
+{
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    if (f->bidi) {
+        if (qc->streams.server_max_streams_bidi < f->limit) {
+            qc->streams.server_max_streams_bidi = f->limit;
+
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "quic max_streams_bidi:%uL", f->limit);
+        }
+
+    } else {
+        if (qc->streams.server_max_streams_uni < f->limit) {
+            qc->streams.server_max_streams_uni = f->limit;
+
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                           "quic max_streams_uni:%uL", f->limit);
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+void
+ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
+{
+    uint64_t                sent, unacked;
+    ngx_event_t            *wev;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = ngx_quic_get_connection(c);
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
+    if (sn == NULL) {
+        return;
+    }
+
+    wev = sn->c->write;
+    sent = sn->c->sent;
+    unacked = sent - sn->acked;
+
+    if (unacked >= NGX_QUIC_STREAM_BUFSIZE && wev->active) {
+        wev->ready = 1;
+        ngx_post_event(wev, &ngx_posted_events);
+    }
+
+    sn->acked += f->u.stream.length;
+
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, sn->c->log, 0,
+                   "quic stream ack len:%uL acked:%uL unacked:%uL",
+                   f->u.stream.length, sn->acked, sent - sn->acked);
+}
diff --git a/src/event/quic/ngx_event_quic_streams.h b/src/event/quic/ngx_event_quic_streams.h
new file mode 100644 (file)
index 0000000..1a755e9
--- /dev/null
@@ -0,0 +1,43 @@
+
+/*
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#ifndef _NGX_EVENT_QUIC_STREAMS_H_INCLUDED_
+#define _NGX_EVENT_QUIC_STREAMS_H_INCLUDED_
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+
+
+ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
+ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
+    ngx_quic_frame_t *frame, void *data);
+void ngx_quic_handle_stream_ack(ngx_connection_t *c,
+    ngx_quic_frame_t *f);
+ngx_int_t ngx_quic_handle_max_data_frame(ngx_connection_t *c,
+    ngx_quic_max_data_frame_t *f);
+ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
+ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
+ngx_int_t ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f);
+ngx_int_t ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f);
+ngx_int_t ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f);
+ngx_int_t ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
+    ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f);
+
+void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
+    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
+ngx_quic_stream_t *ngx_quic_find_stream(ngx_rbtree_t *rbtree,
+    uint64_t id);
+ngx_int_t ngx_quic_close_streams(ngx_connection_t *c,
+    ngx_quic_connection_t *qc);
+
+#endif /* _NGX_EVENT_QUIC_STREAMS_H_INCLUDED_ */