Commit 71eb6b6b authored by Kent Overstreet, committed by Christian Brauner
Browse files

fs/aio: obey min_nr when doing wakeups



I've been observing workloads where IPIs due to wakeups in
aio_complete() are ~15% of total CPU time in the profile. Most of those
wakeups are unnecessary when completion batching is in use in
io_getevents().

This plumbs min_nr through via the wait queue entry, so that aio_complete()
can avoid doing unnecessary wakeups.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
Link: https://lore.kernel.org/r/20231122234257.179390-1-kent.overstreet@linux.dev


Cc: Benjamin LaHaise <bcrl@kvack.org>
Cc: Christian Brauner <brauner@kernel.org>
Cc: <linux-aio@kvack.org>
Cc: <linux-fsdevel@vger.kernel.org>
Signed-off-by: Christian Brauner <brauner@kernel.org>
parent b7638ad0
Loading
Loading
Loading
Loading
+57 −10
Original line number Diff line number Diff line
@@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
	kmem_cache_free(kiocb_cachep, iocb);
}

/*
 * Per-waiter state for read_events(): a wait queue entry bundled with the
 * number of events the waiter still needs. aio_complete() walks ctx->wait
 * and only wakes a waiter once the events available in the ring reach that
 * waiter's min_nr, so waiters asking for a batch are not woken per event.
 */
struct aio_waiter {
	/* Entry queued on ctx->wait; aio_complete() recovers the waiter from it. */
	struct wait_queue_entry	w;
	/* Events still needed; read_events() sets this to min_nr minus events already read. */
	size_t			min_nr;
};

/* aio_complete
 *	Called when the io request on the given iocb is complete.
 */
@@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb)
	struct kioctx	*ctx = iocb->ki_ctx;
	struct aio_ring	*ring;
	struct io_event	*ev_page, *event;
	unsigned tail, pos, head;
	unsigned tail, pos, head, avail;
	unsigned long	flags;

	/*
@@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
	ctx->completed_events++;
	if (ctx->completed_events > 1)
		refill_reqs_available(ctx, head, tail);

	avail = tail > head
		? tail - head
		: tail + ctx->nr_events - head;
	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -1176,8 +1185,18 @@ static void aio_complete(struct aio_kiocb *iocb)
	 */
	smp_mb();

	if (waitqueue_active(&ctx->wait))
		wake_up(&ctx->wait);
	if (waitqueue_active(&ctx->wait)) {
		struct aio_waiter *curr, *next;
		unsigned long flags;

		spin_lock_irqsave(&ctx->wait.lock, flags);
		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
			if (avail >= curr->min_nr) {
				list_del_init_careful(&curr->w.entry);
				wake_up_process(curr->w.private);
			}
		spin_unlock_irqrestore(&ctx->wait.lock, flags);
	}
}

static inline void iocb_put(struct aio_kiocb *iocb)
@@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
			struct io_event __user *event,
			ktime_t until)
{
	long ret = 0;
	struct hrtimer_sleeper	t;
	struct aio_waiter	w;
	long ret = 0, ret2 = 0;

	/*
	 * Note that aio_read_events() is being called as the conditional - i.e.
@@ -1306,12 +1327,38 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
	 * the ringbuffer empty. So in practice we should be ok, but it's
	 * something to be aware of when touching this code.
	 */
	if (until == 0)
	aio_read_events(ctx, min_nr, nr, event, &ret);
	else
		wait_event_interruptible_hrtimeout(ctx->wait,
				aio_read_events(ctx, min_nr, nr, event, &ret),
				until);
	if (until == 0 || ret < 0 || ret >= min_nr)
		return ret;

	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
	if (until != KTIME_MAX) {
		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
	}

	init_wait(&w.w);

	while (1) {
		unsigned long nr_got = ret;

		w.min_nr = min_nr - ret;

		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE);
		if (!ret2 && !t.task)
			ret2 = -ETIME;

		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
			break;

		if (nr_got == ret)
			schedule();
	}

	finish_wait(&ctx->wait, &w.w);
	hrtimer_cancel(&t.timer);
	destroy_hrtimer_on_stack(&t.timer);

	return ret;
}