Commit da64d6db authored by Breno Leitao's avatar Breno Leitao Committed by Jens Axboe
Browse files

io_uring: One wqe per wq



Right now io_wq allocates one io_wqe per NUMA node.  As io_wq is now
bound to a task, the task basically uses only the NUMA local io_wqe, and
almost never changes NUMA nodes, thus, the other wqes are mostly
unused.

Allocate just one io_wqe embedded into io_wq, and uses all possible cpus
(cpu_possible_mask) in the io_wqe->cpumask.

Signed-off-by: default avatarBreno Leitao <leitao@debian.org>
Link: https://lore.kernel.org/r/20230310201107.4020580-1-leitao@debian.org


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent c56e022c
Loading
Loading
Loading
Loading
+70 −110
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
@@ -96,8 +97,6 @@ struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[IO_WQ_ACCT_NR];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

@@ -127,7 +126,7 @@ struct io_wq {

	struct task_struct *task;

	struct io_wqe *wqes[];
	struct io_wqe wqe;
};

static enum cpuhp_state io_wq_online;
@@ -754,7 +753,7 @@ static void create_worker_cont(struct callback_head *cb)
	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
@@ -804,7 +803,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
@@ -823,7 +822,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
@@ -961,7 +960,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	struct io_wqe *wqe = &wq->wqe;

	io_wqe_enqueue(wqe, work);
}
@@ -1083,7 +1082,7 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;
	struct io_wqe *wqe = &wq->wqe;

	/*
	 * First check pending list, if we're lucky we can just remove it
@@ -1098,9 +1097,6 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
	 * Do both of these while holding the wqe->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

	io_wqe_cancel_pending_work(wqe, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;
@@ -1110,7 +1106,6 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
	raw_spin_unlock(&wqe->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
@@ -1140,15 +1135,16 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	int ret, i;
	struct io_wq *wq;
	struct io_wqe *wqe;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1159,22 +1155,13 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;
	wqe = &wq->wqe;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
	if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
		goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wqe->node = alloc_node;
	cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
	wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
@@ -1192,7 +1179,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
	raw_spin_lock_init(&wqe->lock);
	INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
	INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
@@ -1201,12 +1187,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}

	free_cpumask_var(wq->wqe.cpu_mask);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
@@ -1247,48 +1229,36 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)

static void io_wq_exit_workers(struct io_wq *wq)
{
	int node;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
	spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
	list_del_init(&wq->wqe.wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);
	}

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};
	struct io_wqe *wqe = &wq->wqe;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wqe_cancel_pending_work(wqe, &match);
	free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}
@@ -1323,11 +1293,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}
@@ -1348,18 +1316,15 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;
	struct io_wqe *wqe = &wq->wqe;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

	if (mask)
		cpumask_copy(wqe->cpu_mask, mask);
	else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
		cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
	rcu_read_unlock();

	return 0;
}

@@ -1369,9 +1334,10 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wqe *wqe = &wq->wqe;
	struct io_wqe_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	bool first_node = true;
	int i, node;
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
@@ -1386,21 +1352,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
		prev[i] = 0;

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wqe->acct[i];
			if (first_node)
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wqe->lock);
		first_node = false;
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)