Commit 9fe3eaea authored by Jens Axboe's avatar Jens Axboe
Browse files

io_uring: remove unconditional looping in local task_work handling



If we have a ton of notifications coming in, we can be looping in here
for a long time. This can be problematic for various reasons, mostly
because we can starve userspace. If the application is waiting on N
events, then only re-run if we need more events.

Fixes: c0e0d6ba ("io_uring: add IORING_SETUP_DEFER_TASKRUN")
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 670d9d3d
Loading
Loading
Loading
Loading
+29 −15
Original line number Diff line number Diff line
@@ -1386,7 +1386,20 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
	}
}

static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (llist_empty(&ctx->work_llist))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
			       int min_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
@@ -1414,18 +1427,20 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
	}
	loops++;

	if (!llist_empty(&ctx->work_llist))
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
	if (ts->locked) {
		io_submit_flush_completions(ctx);
		if (!llist_empty(&ctx->work_llist))
		if (io_run_local_work_continue(ctx, ret, min_events))
			goto again;
	}

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
					   int min_events)
{
	struct io_tw_state ts = { .locked = true, };
	int ret;
@@ -1433,20 +1448,20 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
	if (llist_empty(&ctx->work_llist))
		return 0;

	ret = __io_run_local_work(ctx, &ts);
	ret = __io_run_local_work(ctx, &ts, min_events);
	/* shouldn't happen! */
	if (WARN_ON_ONCE(!ts.locked))
		mutex_lock(&ctx->uring_lock);
	return ret;
}

static int io_run_local_work(struct io_ring_ctx *ctx)
static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};
	int ret;

	ts.locked = mutex_trylock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, &ts);
	ret = __io_run_local_work(ctx, &ts, min_events);
	if (ts.locked)
		mutex_unlock(&ctx->uring_lock);

@@ -1642,7 +1657,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
		    io_task_work_pending(ctx)) {
			u32 tail = ctx->cached_cq_tail;

			(void) io_run_local_work_locked(ctx);
			(void) io_run_local_work_locked(ctx, min);

			if (task_work_pending(current) ||
			    wq_list_empty(&ctx->iopoll_list)) {
@@ -2486,7 +2501,7 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (!llist_empty(&ctx->work_llist)) {
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx) > 0)
		if (io_run_local_work(ctx, INT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
@@ -2554,7 +2569,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
	if (!io_allowed_run_tw(ctx))
		return -EEXIST;
	if (!llist_empty(&ctx->work_llist))
		io_run_local_work(ctx);
		io_run_local_work(ctx, min_events);
	io_run_task_work();
	io_cqring_overflow_flush(ctx);
	/* if user messes with these they will just get an early return */
@@ -2592,11 +2607,10 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
		unsigned long check_cq;

		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);

			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
@@ -2615,7 +2629,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
		 */
		io_run_task_work();
		if (!llist_empty(&ctx->work_llist))
			io_run_local_work(ctx);
			io_run_local_work(ctx, nr_wait);

		/*
		 * Non-local task_work will be run on exit to userspace, but
@@ -3270,7 +3284,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,

	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
	    io_allowed_defer_tw_run(ctx))
		ret |= io_run_local_work(ctx) > 0;
		ret |= io_run_local_work(ctx, INT_MAX) > 0;
	ret |= io_cancel_defer_files(ctx, task, cancel_all);
	mutex_lock(&ctx->uring_lock);
	ret |= io_poll_remove_all(ctx, task, cancel_all);
@@ -3632,7 +3646,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
			 * it should handle ownership problems if any.
			 */
			if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
				(void)io_run_local_work_locked(ctx);
				(void)io_run_local_work_locked(ctx, min_complete);
		}
		mutex_unlock(&ctx->uring_lock);
	}