Commit 902ce82c authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe
Browse files

io_uring: get rid of intermediate aux cqe caches



io_post_aux_cqe(), which is used for multishot requests, delays
completions by putting CQEs into a temporary array for the purpose
completion lock/flush batching.

DEFER_TASKRUN doesn't need any locking, so for it we can put completions
directly into the CQ and defer post completion handling with a flag.
That leaves !DEFER_TASKRUN, which is not that interesting / hot for
multishot requests, so have conditional locking with deferred flush
for them.

Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Tested-by: default avatarMing Lei <ming.lei@redhat.com>
Link: https://lore.kernel.org/r/b1d05a81fd27aaa2a07f9860af13059e7ad7a890.1710799188.git.asml.silence@gmail.com


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent e5c12945
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -205,6 +205,7 @@ struct io_submit_state {

	bool			plug_started;
	bool			need_plug;
	bool			cq_flush;
	unsigned short		submit_nr;
	unsigned int		cqes_count;
	struct blk_plug		plug;
@@ -341,8 +342,6 @@ struct io_ring_ctx {
		unsigned		cq_last_tm_flush;
	} ____cacheline_aligned_in_smp;

	struct io_uring_cqe	completion_cqes[16];

	spinlock_t		completion_lock;

	/* IRQ completion list, under ->completion_lock */
+13 −49
Original line number Diff line number Diff line
@@ -630,6 +630,12 @@ static inline void __io_cq_lock(struct io_ring_ctx *ctx)
		spin_lock(&ctx->completion_lock);
}

static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
{
	if (!ctx->lockless_cq)
		spin_unlock(&ctx->completion_lock);
}

static inline void io_cq_lock(struct io_ring_ctx *ctx)
	__acquires(ctx->completion_lock)
{
@@ -882,31 +888,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
	return false;
}

static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
	__must_hold(&ctx->uring_lock)
{
	struct io_submit_state *state = &ctx->submit_state;
	unsigned int i;

	lockdep_assert_held(&ctx->uring_lock);
	for (i = 0; i < state->cqes_count; i++) {
		struct io_uring_cqe *cqe = &ctx->completion_cqes[i];

		if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
			if (ctx->lockless_cq) {
				spin_lock(&ctx->completion_lock);
				io_cqring_event_overflow(ctx, cqe->user_data,
							cqe->res, cqe->flags, 0, 0);
				spin_unlock(&ctx->completion_lock);
			} else {
				io_cqring_event_overflow(ctx, cqe->user_data,
							cqe->res, cqe->flags, 0, 0);
			}
		}
	}
	state->cqes_count = 0;
}

bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
{
	bool filled;
@@ -927,31 +908,16 @@ bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags
bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags)
{
	struct io_ring_ctx *ctx = req->ctx;
	u64 user_data = req->cqe.user_data;
	struct io_uring_cqe *cqe;
	bool posted;

	lockdep_assert(!io_wq_current_is_worker());
	lockdep_assert_held(&ctx->uring_lock);

	if (ctx->submit_state.cqes_count == ARRAY_SIZE(ctx->completion_cqes)) {
	__io_cq_lock(ctx);
		__io_flush_post_cqes(ctx);
		/* no need to flush - flush is deferred */
	posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
	ctx->submit_state.cq_flush = true;
	__io_cq_unlock_post(ctx);
	}

	/* For defered completions this is not as strict as it is otherwise,
	 * however it's main job is to prevent unbounded posted completions,
	 * and in that it works just as well.
	 */
	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
		return false;

	cqe = &ctx->completion_cqes[ctx->submit_state.cqes_count++];
	cqe->user_data = user_data;
	cqe->res = res;
	cqe->flags = cflags;
	return true;
	return posted;
}

static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
@@ -1545,9 +1511,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx)
	struct io_wq_work_node *node;

	__io_cq_lock(ctx);
	/* must come first to preserve CQE ordering in failure cases */
	if (state->cqes_count)
		__io_flush_post_cqes(ctx);
	__wq_list_for_each(node, &state->compl_reqs) {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
					    comp_list);
@@ -1569,6 +1532,7 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx)
		io_free_batch_list(ctx, state->compl_reqs.first);
		INIT_WQ_LIST(&state->compl_reqs);
	}
	ctx->submit_state.cq_flush = false;
}

static unsigned io_cqring_events(struct io_ring_ctx *ctx)
+1 −1
Original line number Diff line number Diff line
@@ -156,7 +156,7 @@ static inline void io_req_task_work_add(struct io_kiocb *req)
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
{
	if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
	    ctx->submit_state.cqes_count)
	    ctx->submit_state.cq_flush)
		__io_submit_flush_completions(ctx);
}