Commit f46b9cdb authored by David Wei's avatar David Wei Committed by Jens Axboe
Browse files

io_uring: limit local tw done



Instead of eagerly running all available local tw, limit the amount of
local tw done to the max of IO_LOCAL_TW_DEFAULT_MAX (20) or wait_nr. The
value of 20 is chosen as a reasonable heuristic to allow enough work
batching but also keep latency down.

Add a retry_llist that maintains a list of local tw that couldn't be
done in time. No synchronisation is needed since it is only modified
within the task context.

Signed-off-by: default avatarDavid Wei <dw@davidwei.uk>
Link: https://lore.kernel.org/r/20241120221452.3762588-3-dw@davidwei.uk


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 40cfe553
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -335,6 +335,7 @@ struct io_ring_ctx {
	 */
	struct {
		struct llist_head	work_llist;
		struct llist_head	retry_llist;
		unsigned long		check_cq;
		atomic_t		cq_wait_nr;
		atomic_t		cq_timeouts;
+32 −11
Original line number Diff line number Diff line
@@ -122,6 +122,7 @@

#define IO_COMPL_BATCH			32
#define IO_REQ_ALLOC_BATCH		8
#define IO_LOCAL_TW_DEFAULT_MAX		20

struct io_defer_entry {
	struct list_head	list;
@@ -1256,6 +1257,8 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1270,37 +1273,55 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
	return false;
}

static int __io_run_local_work_loop(struct llist_node **node,
				    struct io_tw_state *ts,
				    int events)
{
	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				req, ts);
		*node = next;
		if (--events <= 0)
			break;
	}

	return events;
}

static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
			       int min_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;
	int ret, limit;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again:
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	while (node) {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				req, ts);
		ret++;
		node = next;
	}
	ret = __io_run_local_work_loop(&node, ts, ret);
	ctx->retry_llist.first = node;
	loops++;

	ret = limit - ret;
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
+1 −1
Original line number Diff line number Diff line
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)

static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
{
	return !llist_empty(&ctx->work_llist);
	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
}

static inline bool io_task_work_pending(struct io_ring_ctx *ctx)