Commit abe54c16 authored by Uday Shankar's avatar Uday Shankar Committed by Jens Axboe
Browse files

selftests: ublk: kublk: decouple ublk_queues from ublk server threads



Add support in kublk for decoupled ublk_queues and ublk server threads.
kublk now has two modes of operation:

- (preexisting mode) threads and queues are paired 1:1, and each thread
  services all the I/Os of one queue
- (new mode) thread and queue counts are independently configurable.
  threads service I/Os in a way that balances load across threads even
  if load is not balanced over queues.

The default is the preexisting mode. The new mode is activated by
passing the --per_io_tasks flag.

Signed-off-by: default avatarUday Shankar <ushankar@purestorage.com>
Reviewed-by: default avatarMing Lei <ming.lei@redhat.com>
Link: https://lore.kernel.org/r/20250529-ublk_task_per_io-v8-6-e9d3b119336a@purestorage.com


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent b9848ca7
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de

	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);

	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
	sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
	sqe[0]->user_data = build_user_data(tag,
			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -66,7 +66,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
	sqe[1]->flags |= IOSQE_FIXED_FILE | IOSQE_IO_HARDLINK;
	sqe[1]->user_data = build_user_data(tag, ublk_op, 0, q->q_id, 1);

	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
	sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);

	return 2;
+88 −17
Original line number Diff line number Diff line
@@ -505,8 +505,11 @@ static int ublk_thread_init(struct ublk_thread *t)
	}

	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
		unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
		unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
		max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
		ret = io_uring_register_buffers_sparse(
			&t->ring, dev->dev_info.queue_depth);
			&t->ring, max_nr_ios_per_thread);
		if (ret) {
			ublk_err("ublk dev %d thread %d register spare buffers failed %d",
					dev->dev_info.dev_id, t->idx, ret);
@@ -578,7 +581,7 @@ static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
	if (q->tgt_ops->buf_index)
		buf.index = q->tgt_ops->buf_index(q, tag);
	else
		buf.index = tag;
		buf.index = q->ios[tag].buf_index;

	if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
		buf.flags = UBLK_AUTO_BUF_REG_FALLBACK;
@@ -660,20 +663,46 @@ int ublk_queue_io_cmd(struct ublk_io *io)

static void ublk_submit_fetch_commands(struct ublk_thread *t)
{
	struct ublk_queue *q;
	struct ublk_io *io;
	int i = 0, j = 0;

	if (t->dev->per_io_tasks) {
		/*
	 * Service exclusively the queue whose q_id matches our thread
	 * index. This may change in the future.
		 * Lexicographically order all the (qid,tag) pairs, with
		 * qid taking priority (so (1,0) > (0,1)). Then make
		 * this thread the daemon for every Nth entry in this
		 * list (N is the number of threads), starting at this
		 * thread's index. This ensures that each queue is
		 * handled by as many ublk server threads as possible,
		 * so that load that is concentrated on one or a few
		 * queues can make use of all ublk server threads.
		 */
		const struct ublksrv_ctrl_dev_info *dinfo = &t->dev->dev_info;
		int nr_ios = dinfo->nr_hw_queues * dinfo->queue_depth;
		for (i = t->idx; i < nr_ios; i += t->dev->nthreads) {
			int q_id = i / dinfo->queue_depth;
			int tag = i % dinfo->queue_depth;
			q = &t->dev->q[q_id];
			io = &q->ios[tag];
			io->t = t;
			io->buf_index = j++;
			ublk_queue_io_cmd(io);
		}
	} else {
		/*
		 * Service exclusively the queue whose q_id matches our
		 * thread index.
		 */
		struct ublk_queue *q = &t->dev->q[t->idx];
	struct ublk_io *io;
	int i = 0;

		for (i = 0; i < q->q_depth; i++) {
			io = &q->ios[i];
			io->t = t;
			io->buf_index = i;
			ublk_queue_io_cmd(io);
		}
	}
}

static int ublk_thread_is_idle(struct ublk_thread *t)
{
@@ -826,6 +855,7 @@ static void *ublk_io_handler_fn(void *data)
		return NULL;
	}
	/* IO perf is sensitive with queue pthread affinity on NUMA machine*/
	if (info->affinity)
		ublk_thread_set_sched_affinity(t, info->affinity);
	sem_post(info->ready);

@@ -893,7 +923,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)

	ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);

	tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
	tinfo = calloc(sizeof(struct ublk_thread_info), dev->nthreads);
	if (!tinfo)
		return -ENOMEM;

@@ -919,17 +949,29 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
				 dinfo->dev_id, i);
			goto fail;
		}
	}

	for (i = 0; i < dev->nthreads; i++) {
		tinfo[i].dev = dev;
		tinfo[i].idx = i;
		tinfo[i].ready = &ready;

		/*
		 * If threads are not tied 1:1 to queues, setting thread
		 * affinity based on queue affinity makes little sense.
		 * However, thread CPU affinity has significant impact
		 * on performance, so to compare fairly, we'll still set
		 * thread CPU affinity based on queue affinity where
		 * possible.
		 */
		if (dev->nthreads == dinfo->nr_hw_queues)
			tinfo[i].affinity = &affinity_buf[i];
		pthread_create(&dev->threads[i].thread, NULL,
				ublk_io_handler_fn,
				&tinfo[i]);
	}

	for (i = 0; i < dinfo->nr_hw_queues; i++)
	for (i = 0; i < dev->nthreads; i++)
		sem_wait(&ready);
	free(tinfo);
	free(affinity_buf);
@@ -953,7 +995,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
		ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);

	/* wait until we are terminated */
	for (i = 0; i < dinfo->nr_hw_queues; i++)
	for (i = 0; i < dev->nthreads; i++)
		pthread_join(dev->threads[i].thread, &thread_ret);
 fail:
	for (i = 0; i < dinfo->nr_hw_queues; i++)
@@ -1063,6 +1105,7 @@ static int ublk_stop_io_daemon(const struct ublk_dev *dev)

static int __cmd_dev_add(const struct dev_ctx *ctx)
{
	unsigned nthreads = ctx->nthreads;
	unsigned nr_queues = ctx->nr_hw_queues;
	const char *tgt_type = ctx->tgt_type;
	unsigned depth = ctx->queue_depth;
@@ -1086,6 +1129,23 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
		return -EINVAL;
	}

	/* default to 1:1 threads:queues if nthreads is unspecified */
	if (!nthreads)
		nthreads = nr_queues;

	if (nthreads > UBLK_MAX_THREADS) {
		ublk_err("%s: %u is too many threads (max %u)\n",
				__func__, nthreads, UBLK_MAX_THREADS);
		return -EINVAL;
	}

	if (nthreads != nr_queues && !ctx->per_io_tasks) {
		ublk_err("%s: threads %u must be same as queues %u if "
			"not using per_io_tasks\n",
			__func__, nthreads, nr_queues);
		return -EINVAL;
	}

	dev = ublk_ctrl_init();
	if (!dev) {
		ublk_err("%s: can't alloc dev id %d, type %s\n",
@@ -1109,6 +1169,8 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
	if ((features & UBLK_F_QUIESCE) &&
			(info->flags & UBLK_F_USER_RECOVERY))
		info->flags |= UBLK_F_QUIESCE;
	dev->nthreads = nthreads;
	dev->per_io_tasks = ctx->per_io_tasks;
	dev->tgt.ops = ops;
	dev->tgt.sq_depth = depth;
	dev->tgt.cq_depth = depth;
@@ -1307,6 +1369,7 @@ static int cmd_dev_get_features(void)
		[const_ilog2(UBLK_F_UPDATE_SIZE)] = "UPDATE_SIZE",
		[const_ilog2(UBLK_F_AUTO_BUF_REG)] = "AUTO_BUF_REG",
		[const_ilog2(UBLK_F_QUIESCE)] = "QUIESCE",
		[const_ilog2(UBLK_F_PER_IO_DAEMON)] = "PER_IO_DAEMON",
	};
	struct ublk_dev *dev;
	__u64 features = 0;
@@ -1401,8 +1464,10 @@ static void __cmd_create_help(char *exe, bool recovery)
			exe, recovery ? "recover" : "add");
	printf("\t[--foreground] [--quiet] [-z] [--auto_zc] [--auto_zc_fallback] [--debug_mask mask] [-r 0|1 ] [-g]\n");
	printf("\t[-e 0|1 ] [-i 0|1]\n");
	printf("\t[--nthreads threads] [--per_io_tasks]\n");
	printf("\t[target options] [backfile1] [backfile2] ...\n");
	printf("\tdefault: nr_queues=2(max 32), depth=128(max 1024), dev_id=-1(auto allocation)\n");
	printf("\tdefault: nthreads=nr_queues");

	for (i = 0; i < sizeof(tgt_ops_list) / sizeof(tgt_ops_list[0]); i++) {
		const struct ublk_tgt_ops *ops = tgt_ops_list[i];
@@ -1459,6 +1524,8 @@ int main(int argc, char *argv[])
		{ "auto_zc",		0,	NULL,  0 },
		{ "auto_zc_fallback", 	0,	NULL,  0 },
		{ "size",		1,	NULL, 's'},
		{ "nthreads",		1,	NULL,  0 },
		{ "per_io_tasks",	0,	NULL,  0 },
		{ 0, 0, 0, 0 }
	};
	const struct ublk_tgt_ops *ops = NULL;
@@ -1534,6 +1601,10 @@ int main(int argc, char *argv[])
				ctx.flags |= UBLK_F_AUTO_BUF_REG;
			if (!strcmp(longopts[option_idx].name, "auto_zc_fallback"))
				ctx.auto_zc_fallback = 1;
			if (!strcmp(longopts[option_idx].name, "nthreads"))
				ctx.nthreads = strtol(optarg, NULL, 10);
			if (!strcmp(longopts[option_idx].name, "per_io_tasks"))
				ctx.per_io_tasks = 1;
			break;
		case '?':
			/*
+5 −0
Original line number Diff line number Diff line
@@ -80,6 +80,7 @@ struct dev_ctx {
	char tgt_type[16];
	unsigned long flags;
	unsigned nr_hw_queues;
	unsigned short nthreads;
	unsigned queue_depth;
	int dev_id;
	int nr_files;
@@ -89,6 +90,7 @@ struct dev_ctx {
	unsigned int	fg:1;
	unsigned int	recovery:1;
	unsigned int	auto_zc_fallback:1;
	unsigned int	per_io_tasks:1;

	int _evtfd;
	int _shmid;
@@ -131,6 +133,7 @@ struct ublk_io {

	int result;

	unsigned short buf_index;
	unsigned short tgt_ios;
	void *private_data;
	struct ublk_thread *t;
@@ -203,6 +206,8 @@ struct ublk_dev {
	struct ublksrv_ctrl_dev_info  dev_info;
	struct ublk_queue q[UBLK_MAX_QUEUES];
	struct ublk_thread threads[UBLK_MAX_THREADS];
	unsigned nthreads;
	unsigned per_io_tasks;

	int fds[MAX_BACK_FILES + 1];	/* fds[0] points to /dev/ublkcN */
	int nr_fds;
+3 −3
Original line number Diff line number Diff line
@@ -62,7 +62,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)

	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);

	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
	io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
	sqe[0]->user_data = build_user_data(tag,
			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
	sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
@@ -70,7 +70,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)
	__setup_nop_io(tag, iod, sqe[1], q->q_id);
	sqe[1]->flags |= IOSQE_IO_HARDLINK;

	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
	io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
	sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);

	// buf register is marked as IOSQE_CQE_SKIP_SUCCESS
@@ -136,7 +136,7 @@ static unsigned short ublk_null_buf_index(const struct ublk_queue *q, int tag)
{
	if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
		return (unsigned short)-1;
	return tag;
	return q->ios[tag].buf_index;
}

const struct ublk_tgt_ops null_tgt_ops = {
+2 −2
Original line number Diff line number Diff line
@@ -141,7 +141,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, s->nr + extra);

	if (zc) {
		io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
		io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, io->buf_index);
		sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
		sqe[0]->user_data = build_user_data(tag,
			ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -167,7 +167,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
	if (zc) {
		struct io_uring_sqe *unreg = sqe[s->nr + 1];

		io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, tag);
		io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, io->buf_index);
		unreg->user_data = build_user_data(
			tag, ublk_cmd_op_nr(unreg->cmd_op), 0, q->q_id, 1);
	}