Commit 8751d154, authored by Pavel Begunkov, committed by Jens Axboe
Browse files

io_uring: reduce scheduling due to tw



Every task_work will try to wake the task to be executed, which causes
excessive scheduling and additional overhead. For some tw it's
justified, but others won't do much but post a single CQE.

When a task waits for multiple cqes, every such task_work will wake it
up. Instead, the task may give a hint about how many cqes it waits for,
io_req_local_work_add() will compare against it and skip wake ups
if #cqes + #tw is not enough to satisfy the waiting condition. Task_work
that uses the optimisation should be simple enough and never post more
than one CQE. It's also ignored for non DEFER_TASKRUN rings.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/d2b77e99d1e86624d8a69f7037d764b739dcd225.1680782017.git.asml.silence@gmail.com


Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 51509400
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -296,7 +296,7 @@ struct io_ring_ctx {
		spinlock_t		completion_lock;

		bool			poll_multi_queue;
		bool			cq_waiting;
		atomic_t		cq_wait_nr;

		/*
		 * ->iopoll_list is protected by the ctx->uring_lock for
@@ -566,6 +566,7 @@ struct io_kiocb {
	atomic_t			refs;
	atomic_t			poll_refs;
	struct io_task_work		io_task_work;
	unsigned			nr_tw;
	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
	union {
		struct hlist_node	hash_node;
+47 −21
Original line number Diff line number Diff line
@@ -1300,34 +1300,58 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
	}
}

/*
 * Queue @req at the head of ctx->work_llist and decide whether
 * ctx->submitter_task has to be woken up for it.
 *
 * @req:   request whose io_task_work node is being queued
 * @flags: IOU_F_TWQ_* hints; only IOU_F_TWQ_LAZY_WAKE is examined here
 *
 * NOTE(review): this span looks like a rendered diff with the +/- markers
 * stripped, so pre- and post-patch lines appear side by side (e.g. the two
 * signatures directly below). Confirm against the real tree before relying
 * on the exact statement order shown here.
 */
static void io_req_local_work_add(struct io_kiocb *req)
static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *first;

	/* linked requests drop the lazy wake hint, so nr_tw becomes -1U below */
	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	/*
	 * Push onto the list head with try_cmpxchg(); every retry recomputes
	 * nr_tw from whichever request is at the head at that point.
	 */
	first = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (first) {
			struct io_kiocb *first_req = container_of(first,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}
		/* running count: value stored in the previous head plus this one */
		nr_tw = nr_tw_prev + 1;
		/* Large enough to fail the nr_wait comparison below */
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = -1U;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = first;
	} while (!try_cmpxchg(&ctx->work_llist.first, &first,
			      &req->io_task_work.node));

	if (first)
		return;

	/* needed for the following wake up */
	smp_mb__after_atomic();

	/* first == NULL: the list was empty before this request was added */
	if (!first) {
		if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
			io_move_task_work_from_local(ctx);
			return;
		}

		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx);
	}

	if (READ_ONCE(ctx->cq_waiting))
	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* no one is waiting */
	if (!nr_wait)
		return;
	/*
	 * Unsigned comparison: with nr_tw == -1U (no lazy hint) the
	 * "nr_wait > nr_tw" test can never be true.
	 */
	/* either not enough or the previous add has already woken it up */
	if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
		return;
	/* pairs with set_current_state() in io_cqring_wait() */
	smp_mb__after_atomic();
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

@@ -1339,7 +1363,7 @@ void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
	if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
	    (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
		rcu_read_lock();
		io_req_local_work_add(req);
		io_req_local_work_add(req, flags);
		rcu_read_unlock();
		return;
	}
@@ -2625,7 +2649,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
		unsigned long check_cq;

		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			WRITE_ONCE(ctx->cq_waiting, 1);
			int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);

			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
@@ -2634,7 +2660,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,

		ret = io_cqring_wait_schedule(ctx, &iowq);
		__set_current_state(TASK_RUNNING);
		WRITE_ONCE(ctx->cq_waiting, 0);
		atomic_set(&ctx->cq_wait_nr, 0);

		if (ret < 0)
			break;
@@ -4517,7 +4543,7 @@ static int __init io_uring_init(void)
	io_uring_optable_init();

	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
				SLAB_ACCOUNT);
				SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
	return 0;
};
__initcall(io_uring_init);
+9 −0
Original line number Diff line number Diff line
@@ -18,6 +18,15 @@
/* Flag bits accepted by the task_work queueing helpers. */
enum {
	/* bypass deferred task_work and queue it the normal way */
	IOU_F_TWQ_FORCE_NORMAL			= 1 << 0,

	/*
	 * Lazy wake hint: rather than waking the task immediately, hold off
	 * until the number of queued tw's matches the number of CQEs the
	 * task is waiting for.
	 *
	 * Must not be used with requests generating more than one CQE.
	 * Has no effect unless IORING_SETUP_DEFER_TASKRUN is set.
	 */
	IOU_F_TWQ_LAZY_WAKE			= 1 << 1,
};

enum {
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
	struct io_kiocb *notif = cmd_to_io_kiocb(nd);

	if (refcount_dec_and_test(&uarg->refcnt))
		io_req_task_work_add(notif);
		__io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}

static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ static inline void io_notif_flush(struct io_kiocb *notif)

	/* drop slot's master ref */
	if (refcount_dec_and_test(&nd->uarg.refcnt))
		io_req_task_work_add(notif);
		__io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}

static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
Loading