Commit b9848ca7 authored by Uday Shankar's avatar Uday Shankar Committed by Jens Axboe
Browse files

selftests: ublk: kublk: move per-thread data out of ublk_queue



Towards the goal of decoupling ublk_queues from ublk server threads,
move resources/data that should be per-thread rather than per-queue out
of ublk_queue and into a new struct ublk_thread.

Signed-off-by: default avatarUday Shankar <ushankar@purestorage.com>
Reviewed-by: default avatarMing Lei <ming.lei@redhat.com>
Link: https://lore.kernel.org/r/20250529-ublk_task_per_io-v8-5-e9d3b119336a@purestorage.com


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 8f75ba28
Loading
Loading
Loading
Loading
+119 −105
Original line number Diff line number Diff line
@@ -348,8 +348,8 @@ static void ublk_ctrl_dump(struct ublk_dev *dev)

		for (i = 0; i < info->nr_hw_queues; i++) {
			ublk_print_cpu_set(&affinity[i], buf, sizeof(buf));
			printf("\tqueue %u: tid %d affinity(%s)\n",
					i, dev->q[i].tid, buf);
			printf("\tqueue %u: affinity(%s)\n",
					i, buf);
		}
		free(affinity);
	}
@@ -419,18 +419,16 @@ static void ublk_queue_deinit(struct ublk_queue *q)
		free(q->ios[i].buf_addr);
}

static void ublk_thread_deinit(struct ublk_queue *q)
static void ublk_thread_deinit(struct ublk_thread *t)
{
	q->tid = 0;
	io_uring_unregister_buffers(&t->ring);

	io_uring_unregister_buffers(&q->ring);
	io_uring_unregister_ring_fd(&t->ring);

	io_uring_unregister_ring_fd(&q->ring);

	if (q->ring.ring_fd > 0) {
		io_uring_unregister_files(&q->ring);
		close(q->ring.ring_fd);
		q->ring.ring_fd = -1;
	if (t->ring.ring_fd > 0) {
		io_uring_unregister_files(&t->ring);
		close(t->ring.ring_fd);
		t->ring.ring_fd = -1;
	}
}

@@ -445,7 +443,6 @@ static int ublk_queue_init(struct ublk_queue *q, unsigned extra_flags)
	q->tgt_ops = dev->tgt.ops;
	q->state = 0;
	q->q_depth = depth;
	q->cmd_inflight = 0;

	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
		q->state |= UBLKSRV_NO_BUF;
@@ -470,6 +467,7 @@ static int ublk_queue_init(struct ublk_queue *q, unsigned extra_flags)
	for (i = 0; i < q->q_depth; i++) {
		q->ios[i].buf_addr = NULL;
		q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
		q->ios[i].tag = i;

		if (q->state & UBLKSRV_NO_BUF)
			continue;
@@ -490,47 +488,46 @@ static int ublk_queue_init(struct ublk_queue *q, unsigned extra_flags)
	return -ENOMEM;
}

static int ublk_thread_init(struct ublk_queue *q)
static int ublk_thread_init(struct ublk_thread *t)
{
	struct ublk_dev *dev = q->dev;
	struct ublk_dev *dev = t->dev;
	int ring_depth = dev->tgt.sq_depth, cq_depth = dev->tgt.cq_depth;
	int ret;

	q->tid = gettid();

	ret = ublk_setup_ring(&q->ring, ring_depth, cq_depth,
	ret = ublk_setup_ring(&t->ring, ring_depth, cq_depth,
			IORING_SETUP_COOP_TASKRUN |
			IORING_SETUP_SINGLE_ISSUER |
			IORING_SETUP_DEFER_TASKRUN);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup io_uring failed %d\n",
				q->dev->dev_info.dev_id, q->q_id, ret);
		ublk_err("ublk dev %d thread %d setup io_uring failed %d\n",
				dev->dev_info.dev_id, t->idx, ret);
		goto fail;
	}

	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
		ret = io_uring_register_buffers_sparse(&q->ring, q->q_depth);
		ret = io_uring_register_buffers_sparse(
			&t->ring, dev->dev_info.queue_depth);
		if (ret) {
			ublk_err("ublk dev %d queue %d register spare buffers failed %d",
					dev->dev_info.dev_id, q->q_id, ret);
			ublk_err("ublk dev %d thread %d register spare buffers failed %d",
					dev->dev_info.dev_id, t->idx, ret);
			goto fail;
		}
	}

	io_uring_register_ring_fd(&q->ring);
	io_uring_register_ring_fd(&t->ring);

	ret = io_uring_register_files(&q->ring, dev->fds, dev->nr_fds);
	ret = io_uring_register_files(&t->ring, dev->fds, dev->nr_fds);
	if (ret) {
		ublk_err("ublk dev %d queue %d register files failed %d\n",
				q->dev->dev_info.dev_id, q->q_id, ret);
		ublk_err("ublk dev %d thread %d register files failed %d\n",
				t->dev->dev_info.dev_id, t->idx, ret);
		goto fail;
	}

	return 0;
fail:
	ublk_thread_deinit(q);
	ublk_err("ublk dev %d queue %d thread init failed\n",
			dev->dev_info.dev_id, q->q_id);
	ublk_thread_deinit(t);
	ublk_err("ublk dev %d thread %d init failed\n",
			dev->dev_info.dev_id, t->idx);
	return -ENOMEM;
}

@@ -589,8 +586,10 @@ static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
	sqe->addr = ublk_auto_buf_reg_to_sqe_addr(&buf);
}

int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
int ublk_queue_io_cmd(struct ublk_io *io)
{
	struct ublk_thread *t = io->t;
	struct ublk_queue *q = ublk_io_to_queue(io);
	struct ublksrv_io_cmd *cmd;
	struct io_uring_sqe *sqe[1];
	unsigned int cmd_op = 0;
@@ -615,13 +614,13 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
	else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
		cmd_op = UBLK_U_IO_FETCH_REQ;

	if (io_uring_sq_space_left(&q->ring) < 1)
		io_uring_submit(&q->ring);
	if (io_uring_sq_space_left(&t->ring) < 1)
		io_uring_submit(&t->ring);

	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 1);
	ublk_io_alloc_sqes(io, sqe, 1);
	if (!sqe[0]) {
		ublk_err("%s: run out of sqe %d, tag %d\n",
				__func__, q->q_id, tag);
		ublk_err("%s: run out of sqe. thread %u, tag %d\n",
				__func__, t->idx, io->tag);
		return -1;
	}

@@ -636,7 +635,7 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
	sqe[0]->opcode	= IORING_OP_URING_CMD;
	sqe[0]->flags	= IOSQE_FIXED_FILE;
	sqe[0]->rw_flags	= 0;
	cmd->tag	= tag;
	cmd->tag	= io->tag;
	cmd->q_id	= q->q_id;
	if (!(q->state & UBLKSRV_NO_BUF))
		cmd->addr	= (__u64) (uintptr_t) io->buf_addr;
@@ -644,37 +643,46 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
		cmd->addr	= 0;

	if (q->state & UBLKSRV_AUTO_BUF_REG)
		ublk_set_auto_buf_reg(q, sqe[0], tag);
		ublk_set_auto_buf_reg(q, sqe[0], io->tag);

	user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, q->q_id, 0);
	user_data = build_user_data(io->tag, _IOC_NR(cmd_op), 0, q->q_id, 0);
	io_uring_sqe_set_data64(sqe[0], user_data);

	io->flags = 0;

	q->cmd_inflight += 1;
	t->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
			__func__, q->q_id, tag, cmd_op,
			io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (thread %u qid %d tag %u cmd_op %u) iof %x stopping %d\n",
			__func__, t->idx, q->q_id, io->tag, cmd_op,
			io->flags, !!(t->state & UBLKSRV_THREAD_STOPPING));
	return 1;
}

static void ublk_submit_fetch_commands(struct ublk_queue *q)
static void ublk_submit_fetch_commands(struct ublk_thread *t)
{
	/*
	 * Service exclusively the queue whose q_id matches our thread
	 * index. This may change in the future.
	 */
	struct ublk_queue *q = &t->dev->q[t->idx];
	struct ublk_io *io;
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		ublk_queue_io_cmd(q, &q->ios[i], i);
	for (i = 0; i < q->q_depth; i++) {
		io = &q->ios[i];
		io->t = t;
		ublk_queue_io_cmd(io);
	}
}

static int ublk_queue_is_idle(struct ublk_queue *q)
static int ublk_thread_is_idle(struct ublk_thread *t)
{
	return !io_uring_sq_ready(&q->ring) && !q->io_inflight;
	return !io_uring_sq_ready(&t->ring) && !t->io_inflight;
}

static int ublk_queue_is_done(struct ublk_queue *q)
static int ublk_thread_is_done(struct ublk_thread *t)
{
	return (q->state & UBLKSRV_QUEUE_STOPPING) && ublk_queue_is_idle(q);
	return (t->state & UBLKSRV_THREAD_STOPPING) && ublk_thread_is_idle(t);
}

static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
@@ -692,15 +700,16 @@ static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
		q->tgt_ops->tgt_io_done(q, tag, cqe);
}

static void ublk_handle_cqe(struct ublk_dev *dev,
static void ublk_handle_cqe(struct ublk_thread *t,
		struct io_uring_cqe *cqe, void *data)
{
	struct ublk_dev *dev = t->dev;
	unsigned q_id = user_data_to_q_id(cqe->user_data);
	struct ublk_queue *q = &dev->q[q_id];
	unsigned tag = user_data_to_tag(cqe->user_data);
	unsigned cmd_op = user_data_to_op(cqe->user_data);
	int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
		!(q->state & UBLKSRV_QUEUE_STOPPING);
		!(t->state & UBLKSRV_THREAD_STOPPING);
	struct ublk_io *io;

	if (cqe->res < 0 && cqe->res != -ENODEV)
@@ -711,7 +720,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
			__func__, cqe->res, q->q_id, tag, cmd_op,
			is_target_io(cqe->user_data),
			user_data_to_tgt_data(cqe->user_data),
			(q->state & UBLKSRV_QUEUE_STOPPING));
			(t->state & UBLKSRV_THREAD_STOPPING));

	/* Don't retrieve io in case of target io */
	if (is_target_io(cqe->user_data)) {
@@ -720,10 +729,10 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
	}

	io = &q->ios[tag];
	q->cmd_inflight--;
	t->cmd_inflight--;

	if (!fetch) {
		q->state |= UBLKSRV_QUEUE_STOPPING;
		t->state |= UBLKSRV_THREAD_STOPPING;
		io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
	}

@@ -733,7 +742,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
			q->tgt_ops->queue_io(q, tag);
	} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
		io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
		ublk_queue_io_cmd(q, io, tag);
		ublk_queue_io_cmd(io);
	} else {
		/*
		 * COMMIT_REQ will be completed immediately since no fetching
@@ -747,87 +756,92 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
	}
}

static int ublk_reap_events_uring(struct ublk_queue *q)
static int ublk_reap_events_uring(struct ublk_thread *t)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(&q->ring, head, cqe) {
		ublk_handle_cqe(q->dev, cqe, NULL);
	io_uring_for_each_cqe(&t->ring, head, cqe) {
		ublk_handle_cqe(t, cqe, NULL);
		count += 1;
	}
	io_uring_cq_advance(&q->ring, count);
	io_uring_cq_advance(&t->ring, count);

	return count;
}

static int ublk_process_io(struct ublk_queue *q)
static int ublk_process_io(struct ublk_thread *t)
{
	int ret, reapped;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight cmd %u stopping %d\n",
				q->dev->dev_info.dev_id,
				q->q_id, io_uring_sq_ready(&q->ring),
				q->cmd_inflight,
				(q->state & UBLKSRV_QUEUE_STOPPING));
	ublk_dbg(UBLK_DBG_THREAD, "dev%d-t%u: to_submit %d inflight cmd %u stopping %d\n",
				t->dev->dev_info.dev_id,
				t->idx, io_uring_sq_ready(&t->ring),
				t->cmd_inflight,
				(t->state & UBLKSRV_THREAD_STOPPING));

	if (ublk_queue_is_done(q))
	if (ublk_thread_is_done(t))
		return -ENODEV;

	ret = io_uring_submit_and_wait(&q->ring, 1);
	reapped = ublk_reap_events_uring(q);
	ret = io_uring_submit_and_wait(&t->ring, 1);
	reapped = ublk_reap_events_uring(t);

	ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d\n",
			ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
			(q->state & UBLKSRV_QUEUE_IDLE));
	ublk_dbg(UBLK_DBG_THREAD, "submit result %d, reapped %d stop %d idle %d\n",
			ret, reapped, (t->state & UBLKSRV_THREAD_STOPPING),
			(t->state & UBLKSRV_THREAD_IDLE));

	return reapped;
}

static void ublk_queue_set_sched_affinity(const struct ublk_queue *q,
static void ublk_thread_set_sched_affinity(const struct ublk_thread *t,
		cpu_set_t *cpuset)
{
        if (sched_setaffinity(0, sizeof(*cpuset), cpuset) < 0)
                ublk_err("ublk dev %u queue %u set affinity failed",
                                q->dev->dev_info.dev_id, q->q_id);
		ublk_err("ublk dev %u thread %u set affinity failed",
				t->dev->dev_info.dev_id, t->idx);
}

struct ublk_queue_info {
	struct ublk_queue 	*q;
	sem_t 			*queue_sem;
struct ublk_thread_info {
	struct ublk_dev 	*dev;
	unsigned		idx;
	sem_t 			*ready;
	cpu_set_t 		*affinity;
};

static void *ublk_io_handler_fn(void *data)
{
	struct ublk_queue_info *info = data;
	struct ublk_queue *q = info->q;
	int dev_id = q->dev->dev_info.dev_id;
	struct ublk_thread_info *info = data;
	struct ublk_thread *t = &info->dev->threads[info->idx];
	int dev_id = info->dev->dev_info.dev_id;
	int ret;

	ret = ublk_thread_init(q);
	t->dev = info->dev;
	t->idx = info->idx;

	ret = ublk_thread_init(t);
	if (ret) {
		ublk_err("ublk dev %d queue %d thread init failed\n",
				dev_id, q->q_id);
		ublk_err("ublk dev %d thread %u init failed\n",
				dev_id, t->idx);
		return NULL;
	}
	/* IO perf is sensitive with queue pthread affinity on NUMA machine*/
	ublk_queue_set_sched_affinity(q, info->affinity);
	sem_post(info->queue_sem);
	ublk_thread_set_sched_affinity(t, info->affinity);
	sem_post(info->ready);

	ublk_dbg(UBLK_DBG_QUEUE, "tid %d: ublk dev %d queue %d started\n",
			q->tid, dev_id, q->q_id);
	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
			gettid(), dev_id, t->idx);

	/* submit all io commands to ublk driver */
	ublk_submit_fetch_commands(q);
	ublk_submit_fetch_commands(t);
	do {
		if (ublk_process_io(q) < 0)
		if (ublk_process_io(t) < 0)
			break;
	} while (1);

	ublk_dbg(UBLK_DBG_QUEUE, "ublk dev %d queue %d exited\n", dev_id, q->q_id);
	ublk_thread_deinit(q);
	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %d exiting\n",
		 gettid(), dev_id, t->idx);
	ublk_thread_deinit(t);
	return NULL;
}

@@ -870,21 +884,20 @@ static int ublk_send_dev_event(const struct dev_ctx *ctx, struct ublk_dev *dev,
static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
{
	const struct ublksrv_ctrl_dev_info *dinfo = &dev->dev_info;
	struct ublk_queue_info *qinfo;
	struct ublk_thread_info *tinfo;
	unsigned extra_flags = 0;
	cpu_set_t *affinity_buf;
	void *thread_ret;
	sem_t queue_sem;
	sem_t ready;
	int ret, i;

	ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);

	qinfo = (struct ublk_queue_info *)calloc(sizeof(struct ublk_queue_info),
			dinfo->nr_hw_queues);
	if (!qinfo)
	tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
	if (!tinfo)
		return -ENOMEM;

	sem_init(&queue_sem, 0, 0);
	sem_init(&ready, 0, 0);
	ret = ublk_dev_prep(ctx, dev);
	if (ret)
		return ret;
@@ -907,17 +920,18 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
			goto fail;
		}

		qinfo[i].q = &dev->q[i];
		qinfo[i].queue_sem = &queue_sem;
		qinfo[i].affinity = &affinity_buf[i];
		pthread_create(&dev->q[i].thread, NULL,
		tinfo[i].dev = dev;
		tinfo[i].idx = i;
		tinfo[i].ready = &ready;
		tinfo[i].affinity = &affinity_buf[i];
		pthread_create(&dev->threads[i].thread, NULL,
				ublk_io_handler_fn,
				&qinfo[i]);
				&tinfo[i]);
	}

	for (i = 0; i < dinfo->nr_hw_queues; i++)
		sem_wait(&queue_sem);
	free(qinfo);
		sem_wait(&ready);
	free(tinfo);
	free(affinity_buf);

	/* everything is fine now, start us */
@@ -940,7 +954,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)

	/* wait until we are terminated */
	for (i = 0; i < dinfo->nr_hw_queues; i++)
		pthread_join(dev->q[i].thread, &thread_ret);
		pthread_join(dev->threads[i].thread, &thread_ret);
 fail:
	for (i = 0; i < dinfo->nr_hw_queues; i++)
		ublk_queue_deinit(&dev->q[i]);
+25 −12
Original line number Diff line number Diff line
@@ -51,10 +51,12 @@
#define UBLK_IO_MAX_BYTES               (1 << 20)
#define UBLK_MAX_QUEUES_SHIFT		5
#define UBLK_MAX_QUEUES                 (1 << UBLK_MAX_QUEUES_SHIFT)
#define UBLK_MAX_THREADS_SHIFT		5
#define UBLK_MAX_THREADS		(1 << UBLK_MAX_THREADS_SHIFT)
#define UBLK_QUEUE_DEPTH                1024

#define UBLK_DBG_DEV            (1U << 0)
#define UBLK_DBG_QUEUE          (1U << 1)
#define UBLK_DBG_THREAD         (1U << 1)
#define UBLK_DBG_IO_CMD         (1U << 2)
#define UBLK_DBG_IO             (1U << 3)
#define UBLK_DBG_CTRL_CMD       (1U << 4)
@@ -62,6 +64,7 @@

struct ublk_dev;
struct ublk_queue;
struct ublk_thread;

struct stripe_ctx {
	/* stripe */
@@ -130,6 +133,7 @@ struct ublk_io {

	unsigned short tgt_ios;
	void *private_data;
	struct ublk_thread *t;
};

struct ublk_tgt_ops {
@@ -168,28 +172,37 @@ struct ublk_tgt {
struct ublk_queue {
	int q_id;
	int q_depth;
	unsigned int cmd_inflight;
	unsigned int io_inflight;
	struct ublk_dev *dev;
	const struct ublk_tgt_ops *tgt_ops;
	struct ublksrv_io_desc *io_cmd_buf;
	struct io_uring ring;

	struct ublk_io ios[UBLK_QUEUE_DEPTH];
#define UBLKSRV_QUEUE_STOPPING	(1U << 0)
#define UBLKSRV_QUEUE_IDLE	(1U << 1)
#define UBLKSRV_NO_BUF		(1U << 2)
#define UBLKSRV_ZC		(1U << 3)
#define UBLKSRV_AUTO_BUF_REG		(1U << 4)
#define UBLKSRV_AUTO_BUF_REG_FALLBACK	(1U << 5)
	unsigned state;
	pid_t tid;
};

struct ublk_thread {
	struct ublk_dev *dev;
	struct io_uring ring;
	unsigned int cmd_inflight;
	unsigned int io_inflight;

	pthread_t thread;
	unsigned idx;

#define UBLKSRV_THREAD_STOPPING	(1U << 0)
#define UBLKSRV_THREAD_IDLE	(1U << 1)
	unsigned state;
};

struct ublk_dev {
	struct ublk_tgt tgt;
	struct ublksrv_ctrl_dev_info  dev_info;
	struct ublk_queue q[UBLK_MAX_QUEUES];
	struct ublk_thread threads[UBLK_MAX_THREADS];

	int fds[MAX_BACK_FILES + 1];	/* fds[0] points to /dev/ublkcN */
	int nr_fds;
@@ -214,7 +227,7 @@ struct ublk_dev {


extern unsigned int ublk_dbg_mask;
extern int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag);
extern int ublk_queue_io_cmd(struct ublk_io *io);


static inline int ublk_io_auto_zc_fallback(const struct ublksrv_io_desc *iod)
@@ -299,7 +312,7 @@ static inline struct ublk_queue *ublk_io_to_queue(const struct ublk_io *io)
static inline int ublk_io_alloc_sqes(struct ublk_io *io,
		struct io_uring_sqe *sqes[], int nr_sqes)
{
	struct io_uring *ring = &ublk_io_to_queue(io)->ring;
	struct io_uring *ring = &io->t->ring;
	unsigned left = io_uring_sq_space_left(ring);
	int i;

@@ -390,7 +403,7 @@ static inline int ublk_complete_io(struct ublk_queue *q, unsigned tag, int res)

	ublk_mark_io_done(io, res);

	return ublk_queue_io_cmd(q, io, tag);
	return ublk_queue_io_cmd(io);
}

static inline void ublk_queued_tgt_io(struct ublk_queue *q, unsigned tag, int queued)
@@ -400,7 +413,7 @@ static inline void ublk_queued_tgt_io(struct ublk_queue *q, unsigned tag, int qu
	else {
		struct ublk_io *io = ublk_get_io(q, tag);

		q->io_inflight += queued;
		io->t->io_inflight += queued;
		io->tgt_ios = queued;
		io->result = 0;
	}
@@ -410,7 +423,7 @@ static inline int ublk_completed_tgt_io(struct ublk_queue *q, unsigned tag)
{
	struct ublk_io *io = ublk_get_io(q, tag);

	q->io_inflight--;
	io->t->io_inflight--;

	return --io->tgt_ios == 0;
}