mirror of
https://git.kernel.org/pub/scm/linux/kernel/git/netdev/net.git/
synced 2026-04-18 06:33:43 -04:00
Enable flexible thread-to-queue mapping in batch I/O mode to support arbitrary combinations of threads and queues, improving resource utilization and scalability.

Key improvements:
- Support N:M thread-to-queue mapping (previously limited to 1:1)
- Dynamic buffer allocation based on the queues actually assigned to each thread
- Thread-safe queue preparation with spinlock protection
- Buffer index calculation that accounts for multiple queues per thread
- Enhanced validation of thread/queue combination constraints

Implementation details:
- Add a q_thread_map matrix to track queue-to-thread assignments
- Allocate commit and fetch buffers dynamically per thread
- Use a round-robin queue assignment algorithm for load balancing
- Add a per-queue spinlock to prevent race conditions during preparation
- Compute buffer indices from the queue's position within its thread

This enables efficient configurations such as:
- 4 threads serving 1 queue
- 1 thread serving 4 queues
- Any other N:M combination for optimal resource matching

Testing:
- Added test_batch_02.sh: 4 threads vs 1 queue
- Added test_batch_03.sh: 1 thread vs 4 queues
- Validates correctness across different mapping scenarios

Signed-off-by: Ming Lei <ming.lei@redhat.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
608 lines
15 KiB
C
608 lines
15 KiB
C
/* SPDX-License-Identifier: MIT */
|
|
/*
|
|
* Description: UBLK_F_BATCH_IO buffer management
|
|
*/
|
|
|
|
#include "kublk.h"
|
|
|
|
static inline void *ublk_get_commit_buf(struct ublk_thread *t,
|
|
unsigned short buf_idx)
|
|
{
|
|
unsigned idx;
|
|
|
|
if (buf_idx < t->commit_buf_start ||
|
|
buf_idx >= t->commit_buf_start + t->nr_commit_buf)
|
|
return NULL;
|
|
idx = buf_idx - t->commit_buf_start;
|
|
return t->commit_buf + idx * t->commit_buf_size;
|
|
}
|
|
|
|
/*
|
|
* Allocate one buffer for UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS
|
|
*
|
|
* Buffer index is returned.
|
|
*/
|
|
static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t)
|
|
{
|
|
int idx = allocator_get(&t->commit_buf_alloc);
|
|
|
|
if (idx >= 0)
|
|
return idx + t->commit_buf_start;
|
|
return UBLKS_T_COMMIT_BUF_INV_IDX;
|
|
}
|
|
|
|
/*
|
|
* Free one commit buffer which is used by UBLK_U_IO_PREP_IO_CMDS or
|
|
* UBLK_U_IO_COMMIT_IO_CMDS
|
|
*/
|
|
static inline void ublk_free_commit_buf(struct ublk_thread *t,
|
|
unsigned short i)
|
|
{
|
|
unsigned short idx = i - t->commit_buf_start;
|
|
|
|
ublk_assert(idx < t->nr_commit_buf);
|
|
ublk_assert(allocator_get_val(&t->commit_buf_alloc, idx) != 0);
|
|
|
|
allocator_put(&t->commit_buf_alloc, idx);
|
|
}
|
|
|
|
static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev)
|
|
{
|
|
if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_USER_COPY |
|
|
UBLK_F_AUTO_BUF_REG))
|
|
return 8;
|
|
|
|
/* one extra 8bytes for carrying buffer address */
|
|
return 16;
|
|
}
|
|
|
|
static unsigned ublk_commit_buf_size(struct ublk_thread *t)
|
|
{
|
|
struct ublk_dev *dev = t->dev;
|
|
unsigned elem_size = ublk_commit_elem_buf_size(dev);
|
|
unsigned int total = elem_size * dev->dev_info.queue_depth;
|
|
unsigned int page_sz = getpagesize();
|
|
|
|
return round_up(total, page_sz);
|
|
}
|
|
|
|
static void free_batch_commit_buf(struct ublk_thread *t)
|
|
{
|
|
if (t->commit_buf) {
|
|
unsigned buf_size = ublk_commit_buf_size(t);
|
|
unsigned int total = buf_size * t->nr_commit_buf;
|
|
|
|
munlock(t->commit_buf, total);
|
|
free(t->commit_buf);
|
|
}
|
|
allocator_deinit(&t->commit_buf_alloc);
|
|
free(t->commit);
|
|
}
|
|
|
|
static int alloc_batch_commit_buf(struct ublk_thread *t)
|
|
{
|
|
unsigned buf_size = ublk_commit_buf_size(t);
|
|
unsigned int total = buf_size * t->nr_commit_buf;
|
|
unsigned int page_sz = getpagesize();
|
|
void *buf = NULL;
|
|
int i, ret, j = 0;
|
|
|
|
t->commit = calloc(t->nr_queues, sizeof(*t->commit));
|
|
for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
|
|
if (t->q_map[i])
|
|
t->commit[j++].q_id = i;
|
|
}
|
|
|
|
allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);
|
|
|
|
t->commit_buf = NULL;
|
|
ret = posix_memalign(&buf, page_sz, total);
|
|
if (ret || !buf)
|
|
goto fail;
|
|
|
|
t->commit_buf = buf;
|
|
|
|
/* lock commit buffer pages for fast access */
|
|
if (mlock(t->commit_buf, total))
|
|
ublk_err("%s: can't lock commit buffer %s\n", __func__,
|
|
strerror(errno));
|
|
|
|
return 0;
|
|
|
|
fail:
|
|
free_batch_commit_buf(t);
|
|
return ret;
|
|
}
|
|
|
|
static unsigned int ublk_thread_nr_queues(const struct ublk_thread *t)
|
|
{
|
|
int i;
|
|
int ret = 0;
|
|
|
|
for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++)
|
|
ret += !!t->q_map[i];
|
|
|
|
return ret;
|
|
}
|
|
|
|
void ublk_batch_prepare(struct ublk_thread *t)
|
|
{
|
|
/*
|
|
* We only handle single device in this thread context.
|
|
*
|
|
* All queues have same feature flags, so use queue 0's for
|
|
* calculate uring_cmd flags.
|
|
*
|
|
* This way looks not elegant, but it works so far.
|
|
*/
|
|
struct ublk_queue *q = &t->dev->q[0];
|
|
|
|
/* cache nr_queues because we don't support dynamic load-balance yet */
|
|
t->nr_queues = ublk_thread_nr_queues(t);
|
|
|
|
t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
|
|
t->commit_buf_size = ublk_commit_buf_size(t);
|
|
t->commit_buf_start = t->nr_bufs;
|
|
t->nr_commit_buf = 2 * t->nr_queues;
|
|
t->nr_bufs += t->nr_commit_buf;
|
|
|
|
t->cmd_flags = 0;
|
|
if (ublk_queue_use_auto_zc(q)) {
|
|
if (ublk_queue_auto_zc_fallback(q))
|
|
t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK;
|
|
} else if (!ublk_queue_no_buf(q))
|
|
t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;
|
|
|
|
t->state |= UBLKS_T_BATCH_IO;
|
|
|
|
ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n",
|
|
__func__, t->idx,
|
|
t->nr_commit_buf, t->commit_buf_size,
|
|
t->nr_bufs);
|
|
}
|
|
|
|
static void free_batch_fetch_buf(struct ublk_thread *t)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < t->nr_fetch_bufs; i++) {
|
|
io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
|
|
munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
|
|
free(t->fetch[i].fetch_buf);
|
|
}
|
|
free(t->fetch);
|
|
}
|
|
|
|
static int alloc_batch_fetch_buf(struct ublk_thread *t)
|
|
{
|
|
/* page aligned fetch buffer, and it is mlocked for speedup delivery */
|
|
unsigned pg_sz = getpagesize();
|
|
unsigned buf_size = round_up(t->dev->dev_info.queue_depth * 2, pg_sz);
|
|
int ret;
|
|
int i = 0;
|
|
|
|
/* double fetch buffer for each queue */
|
|
t->nr_fetch_bufs = t->nr_queues * 2;
|
|
t->fetch = calloc(t->nr_fetch_bufs, sizeof(*t->fetch));
|
|
|
|
/* allocate one buffer for each queue */
|
|
for (i = 0; i < t->nr_fetch_bufs; i++) {
|
|
t->fetch[i].fetch_buf_size = buf_size;
|
|
|
|
if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
|
|
t->fetch[i].fetch_buf_size))
|
|
return -ENOMEM;
|
|
|
|
/* lock fetch buffer page for fast fetching */
|
|
if (mlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size))
|
|
ublk_err("%s: can't lock fetch buffer %s\n", __func__,
|
|
strerror(errno));
|
|
t->fetch[i].br = io_uring_setup_buf_ring(&t->ring, 1,
|
|
i, IOU_PBUF_RING_INC, &ret);
|
|
if (!t->fetch[i].br) {
|
|
ublk_err("Buffer ring register failed %d\n", ret);
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int ublk_batch_alloc_buf(struct ublk_thread *t)
|
|
{
|
|
int ret;
|
|
|
|
ublk_assert(t->nr_commit_buf < 2 * UBLK_MAX_QUEUES);
|
|
|
|
ret = alloc_batch_commit_buf(t);
|
|
if (ret)
|
|
return ret;
|
|
return alloc_batch_fetch_buf(t);
|
|
}
|
|
|
|
/*
 * Release every batch I/O buffer owned by thread @t. The commit state is
 * released first and the fetch buffers (with their buffer rings) second.
 */
void ublk_batch_free_buf(struct ublk_thread *t)
{
	free_batch_commit_buf(t);	/* PREP/COMMIT buffers + allocator */
	free_batch_fetch_buf(t);	/* FETCH buffers + buffer rings */
}
|
|
|
|
/*
 * Fill in a ublk batch uring_cmd SQE (PREP, COMMIT or FETCH).
 *
 * The command is issued on fixed file 0 (the ublk char device). The header
 * records q_id, elem_bytes and nr_elem, with flags and reserved cleared.
 * user_data encodes buf_idx, the ioctl number, nr_elem and q_id so the
 * completion handlers can recover them. Each call increments
 * t->cmd_inflight.
 */
static void ublk_init_batch_cmd(struct ublk_thread *t, __u16 q_id,
				struct io_uring_sqe *sqe, unsigned op,
				unsigned short elem_bytes,
				unsigned short nr_elem,
				unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;
	__u64 user_data;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	ublk_set_sqe_cmd_op(sqe, op);

	sqe->fd = 0;	/* dev->fds[0] */
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->flags = IOSQE_FIXED_FILE;

	cmd->q_id = q_id;
	cmd->flags = 0;
	cmd->reserved = 0;
	cmd->elem_bytes = elem_bytes;
	cmd->nr_elem = nr_elem;

	user_data = build_user_data(buf_idx, _IOC_NR(op), nr_elem, q_id, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	t->cmd_inflight += 1;

	/* __u64 is not necessarily unsigned long: print via unsigned long long */
	ublk_dbg(UBLK_DBG_IO_CMD, "%s: thread %u qid %d cmd_op %x data %llx "
			"nr_elem %u elem_bytes %u buf_size %u buf_idx %d "
			"cmd_inflight %u\n",
			__func__, t->idx, q_id, op, (unsigned long long)user_data,
			cmd->nr_elem, cmd->elem_bytes,
			nr_elem * elem_bytes, buf_idx, t->cmd_inflight);
}
|
|
|
|
static void ublk_setup_commit_sqe(struct ublk_thread *t,
|
|
struct io_uring_sqe *sqe,
|
|
unsigned short buf_idx)
|
|
{
|
|
struct ublk_batch_io *cmd;
|
|
|
|
cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);
|
|
|
|
/* Use plain user buffer instead of fixed buffer */
|
|
cmd->flags |= t->cmd_flags;
|
|
}
|
|
|
|
static void ublk_batch_queue_fetch(struct ublk_thread *t,
|
|
struct ublk_queue *q,
|
|
unsigned short buf_idx)
|
|
{
|
|
unsigned short nr_elem = t->fetch[buf_idx].fetch_buf_size / 2;
|
|
struct io_uring_sqe *sqe;
|
|
|
|
io_uring_buf_ring_add(t->fetch[buf_idx].br, t->fetch[buf_idx].fetch_buf,
|
|
t->fetch[buf_idx].fetch_buf_size,
|
|
0, 0, 0);
|
|
io_uring_buf_ring_advance(t->fetch[buf_idx].br, 1);
|
|
|
|
ublk_io_alloc_sqes(t, &sqe, 1);
|
|
|
|
ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_FETCH_IO_CMDS, 2, nr_elem,
|
|
buf_idx);
|
|
|
|
sqe->rw_flags= IORING_URING_CMD_MULTISHOT;
|
|
sqe->buf_group = buf_idx;
|
|
sqe->flags |= IOSQE_BUFFER_SELECT;
|
|
|
|
t->fetch[buf_idx].fetch_buf_off = 0;
|
|
}
|
|
|
|
void ublk_batch_start_fetch(struct ublk_thread *t)
|
|
{
|
|
int i;
|
|
int j = 0;
|
|
|
|
for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
|
|
if (t->q_map[i]) {
|
|
struct ublk_queue *q = &t->dev->q[i];
|
|
|
|
/* submit two fetch commands for each queue */
|
|
ublk_batch_queue_fetch(t, q, j++);
|
|
ublk_batch_queue_fetch(t, q, j++);
|
|
}
|
|
}
|
|
}
|
|
|
|
static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
|
|
struct ublk_queue *q,
|
|
const struct io_uring_cqe *cqe)
|
|
{
|
|
unsigned short buf_idx = user_data_to_tag(cqe->user_data);
|
|
unsigned start = t->fetch[buf_idx].fetch_buf_off;
|
|
unsigned end = start + cqe->res;
|
|
void *buf = t->fetch[buf_idx].fetch_buf;
|
|
int i;
|
|
|
|
if (cqe->res < 0)
|
|
return buf_idx;
|
|
|
|
if ((end - start) / 2 > q->q_depth) {
|
|
ublk_err("%s: fetch duplicated ios offset %u count %u\n", __func__, start, cqe->res);
|
|
|
|
for (i = start; i < end; i += 2) {
|
|
unsigned short tag = *(unsigned short *)(buf + i);
|
|
|
|
ublk_err("%u ", tag);
|
|
}
|
|
ublk_err("\n");
|
|
}
|
|
|
|
for (i = start; i < end; i += 2) {
|
|
unsigned short tag = *(unsigned short *)(buf + i);
|
|
|
|
if (tag >= q->q_depth)
|
|
ublk_err("%s: bad tag %u\n", __func__, tag);
|
|
|
|
if (q->tgt_ops->queue_io)
|
|
q->tgt_ops->queue_io(t, q, tag);
|
|
}
|
|
t->fetch[buf_idx].fetch_buf_off = end;
|
|
return buf_idx;
|
|
}
|
|
|
|
static int __ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
|
|
{
|
|
unsigned short nr_elem = q->q_depth;
|
|
unsigned short buf_idx = ublk_alloc_commit_buf(t);
|
|
struct io_uring_sqe *sqe;
|
|
void *buf;
|
|
int i;
|
|
|
|
ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
|
|
|
|
ublk_io_alloc_sqes(t, &sqe, 1);
|
|
|
|
ublk_assert(nr_elem == q->q_depth);
|
|
buf = ublk_get_commit_buf(t, buf_idx);
|
|
for (i = 0; i < nr_elem; i++) {
|
|
struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(
|
|
buf + i * t->commit_buf_elem_size);
|
|
struct ublk_io *io = &q->ios[i];
|
|
|
|
elem->tag = i;
|
|
elem->result = 0;
|
|
|
|
if (ublk_queue_use_auto_zc(q))
|
|
elem->buf_index = ublk_batch_io_buf_idx(t, q, i);
|
|
else if (!ublk_queue_no_buf(q))
|
|
elem->buf_addr = (__u64)io->buf_addr;
|
|
}
|
|
|
|
sqe->addr = (__u64)buf;
|
|
sqe->len = t->commit_buf_elem_size * nr_elem;
|
|
|
|
ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_PREP_IO_CMDS,
|
|
t->commit_buf_elem_size, nr_elem, buf_idx);
|
|
ublk_setup_commit_sqe(t, sqe, buf_idx);
|
|
return 0;
|
|
}
|
|
|
|
int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
|
|
{
|
|
int ret = 0;
|
|
|
|
pthread_spin_lock(&q->lock);
|
|
if (q->flags & UBLKS_Q_PREPARED)
|
|
goto unlock;
|
|
ret = __ublk_batch_queue_prep_io_cmds(t, q);
|
|
if (!ret)
|
|
q->flags |= UBLKS_Q_PREPARED;
|
|
unlock:
|
|
pthread_spin_unlock(&q->lock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
|
|
const struct io_uring_cqe *cqe,
|
|
unsigned op)
|
|
{
|
|
unsigned short buf_idx = user_data_to_tag(cqe->user_data);
|
|
|
|
if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS))
|
|
ublk_assert(cqe->res == 0);
|
|
else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
|
|
int nr_elem = user_data_to_tgt_data(cqe->user_data);
|
|
|
|
ublk_assert(cqe->res == t->commit_buf_elem_size * nr_elem);
|
|
} else
|
|
ublk_assert(0);
|
|
|
|
ublk_free_commit_buf(t, buf_idx);
|
|
}
|
|
|
|
void ublk_batch_compl_cmd(struct ublk_thread *t,
|
|
const struct io_uring_cqe *cqe)
|
|
{
|
|
unsigned op = user_data_to_op(cqe->user_data);
|
|
struct ublk_queue *q;
|
|
unsigned buf_idx;
|
|
unsigned q_id;
|
|
|
|
if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS) ||
|
|
op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
|
|
t->cmd_inflight--;
|
|
ublk_batch_compl_commit_cmd(t, cqe, op);
|
|
return;
|
|
}
|
|
|
|
/* FETCH command is per queue */
|
|
q_id = user_data_to_q_id(cqe->user_data);
|
|
q = &t->dev->q[q_id];
|
|
buf_idx = ublk_compl_batch_fetch(t, q, cqe);
|
|
|
|
if (cqe->res < 0 && cqe->res != -ENOBUFS) {
|
|
t->cmd_inflight--;
|
|
t->state |= UBLKS_T_STOPPING;
|
|
} else if (!(cqe->flags & IORING_CQE_F_MORE) || cqe->res == -ENOBUFS) {
|
|
t->cmd_inflight--;
|
|
ublk_batch_queue_fetch(t, q, buf_idx);
|
|
}
|
|
}
|
|
|
|
static void __ublk_batch_commit_io_cmds(struct ublk_thread *t,
|
|
struct batch_commit_buf *cb)
|
|
{
|
|
struct io_uring_sqe *sqe;
|
|
unsigned short buf_idx;
|
|
unsigned short nr_elem = cb->done;
|
|
|
|
/* nothing to commit */
|
|
if (!nr_elem) {
|
|
ublk_free_commit_buf(t, cb->buf_idx);
|
|
return;
|
|
}
|
|
|
|
ublk_io_alloc_sqes(t, &sqe, 1);
|
|
buf_idx = cb->buf_idx;
|
|
sqe->addr = (__u64)cb->elem;
|
|
sqe->len = nr_elem * t->commit_buf_elem_size;
|
|
|
|
/* commit isn't per-queue command */
|
|
ublk_init_batch_cmd(t, cb->q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
|
|
t->commit_buf_elem_size, nr_elem, buf_idx);
|
|
ublk_setup_commit_sqe(t, sqe, buf_idx);
|
|
}
|
|
|
|
void ublk_batch_commit_io_cmds(struct ublk_thread *t)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < t->nr_queues; i++) {
|
|
struct batch_commit_buf *cb = &t->commit[i];
|
|
|
|
if (cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX)
|
|
__ublk_batch_commit_io_cmds(t, cb);
|
|
}
|
|
|
|
}
|
|
|
|
static void __ublk_batch_init_commit(struct ublk_thread *t,
|
|
struct batch_commit_buf *cb,
|
|
unsigned short buf_idx)
|
|
{
|
|
/* so far only support 1:1 queue/thread mapping */
|
|
cb->buf_idx = buf_idx;
|
|
cb->elem = ublk_get_commit_buf(t, buf_idx);
|
|
cb->done = 0;
|
|
cb->count = t->commit_buf_size /
|
|
t->commit_buf_elem_size;
|
|
}
|
|
|
|
/* COMMIT_IO_CMDS is per-queue command, so use its own commit buffer */
|
|
static void ublk_batch_init_commit(struct ublk_thread *t,
|
|
struct batch_commit_buf *cb)
|
|
{
|
|
unsigned short buf_idx = ublk_alloc_commit_buf(t);
|
|
|
|
ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
|
|
ublk_assert(!ublk_batch_commit_prepared(cb));
|
|
|
|
__ublk_batch_init_commit(t, cb, buf_idx);
|
|
}
|
|
|
|
void ublk_batch_prep_commit(struct ublk_thread *t)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < t->nr_queues; i++)
|
|
t->commit[i].buf_idx = UBLKS_T_COMMIT_BUF_INV_IDX;
|
|
}
|
|
|
|
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
|
|
unsigned tag, int res)
|
|
{
|
|
unsigned q_t_idx = ublk_queue_idx_in_thread(t, q);
|
|
struct batch_commit_buf *cb = &t->commit[q_t_idx];
|
|
struct ublk_batch_elem *elem;
|
|
struct ublk_io *io = &q->ios[tag];
|
|
|
|
if (!ublk_batch_commit_prepared(cb))
|
|
ublk_batch_init_commit(t, cb);
|
|
|
|
ublk_assert(q->q_id == cb->q_id);
|
|
|
|
elem = (struct ublk_batch_elem *)(cb->elem + cb->done * t->commit_buf_elem_size);
|
|
elem->tag = tag;
|
|
elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
|
|
elem->result = res;
|
|
|
|
if (!ublk_queue_no_buf(q))
|
|
elem->buf_addr = (__u64) (uintptr_t) io->buf_addr;
|
|
|
|
cb->done += 1;
|
|
ublk_assert(cb->done <= cb->count);
|
|
}
|
|
|
|
/*
 * Build the queue-to-thread map used by batch I/O.
 *
 * @q_thread_map: [thread][queue] matrix. NOTE(review): entries are only
 *                ever set here, never cleared, so the caller presumably
 *                passes a zeroed matrix.
 * @nthreads:     number of I/O threads
 * @queues:       number of hardware queues
 *
 * Phase 1 walks threads and queues together round-robin until every queue
 * has a thread and every thread has a queue, for example:
 *   - 2 threads, 4 queues: T0=[Q0,Q2], T1=[Q1,Q3]
 *   - 4 threads, 2 queues: T0=[Q0], T1=[Q1], T2=[Q0], T3=[Q1]
 *   - 3 threads, 3 queues: T0=[Q0], T1=[Q1], T2=[Q2] (1:1 mapping)
 *
 * Phase 2 rewrites each marked entry as the 1-based position of that queue
 * among the queues the thread handles. For example, thread 0 handling
 * queues [0,2] ends up with q_thread_map[0][0] = 1 and
 * q_thread_map[0][2] = 2. Zero keeps meaning "not handled by this thread".
 * NOTE(review): positions are stored in an unsigned char, which assumes a
 * thread never handles more than 255 queues.
 *
 * Non-positive @nthreads or @queues leave the map untouched.
 */
void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES],
			  int nthreads, int queues)
{
	int i, j;

	/* nothing to map, and the modulo below would divide by zero */
	if (nthreads <= 0 || queues <= 0)
		return;

	/* Phase 1: mark which queues each thread handles */
	for (i = 0, j = 0; i < queues || j < nthreads; i++, j++)
		q_thread_map[j % nthreads][i % queues] = 1;

	/* Phase 2: turn the marks into per-thread sequential indices */
	for (j = 0; j < nthreads; j++) {
		unsigned char seq = 1;

		for (i = 0; i < queues; i++) {
			if (q_thread_map[j][i])
				q_thread_map[j][i] = seq++;
		}
	}
}
|