Commit c2c9af9a authored by Bernd Schubert's avatar Bernd Schubert Committed by Miklos Szeredi
Browse files

fuse: Allow to queue fg requests through io-uring



This prepares queueing and sending foreground requests through
io-uring.

Signed-off-by: default avatarBernd Schubert <bschubert@ddn.com>
Reviewed-by: Pavel Begunkov <asml.silence@gmail.com> # io_uring
Reviewed-by: default avatarLuis Henriques <luis@igalia.com>
Signed-off-by: default avatarMiklos Szeredi <mszeredi@redhat.com>
parent ba74ba57
Loading
Loading
Loading
Loading
+180 −0
Original line number Diff line number Diff line
@@ -24,6 +24,29 @@ bool fuse_uring_enabled(void)
	return enable_uring;
}

struct fuse_uring_pdu {
	struct fuse_ring_ent *ent;
};

static const struct fuse_iqueue_ops fuse_io_uring_ops;

static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd,
				   struct fuse_ring_ent *ring_ent)
{
	struct fuse_uring_pdu *pdu =
		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);

	pdu->ent = ring_ent;
}

static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd)
{
	struct fuse_uring_pdu *pdu =
		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);

	return pdu->ent;
}

static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req,
			       int error)
{
@@ -776,6 +799,31 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
	return 0;
}

static bool is_ring_ready(struct fuse_ring *ring, int current_qid)
{
	int qid;
	struct fuse_ring_queue *queue;
	bool ready = true;

	for (qid = 0; qid < ring->nr_queues && ready; qid++) {
		if (current_qid == qid)
			continue;

		queue = ring->queues[qid];
		if (!queue) {
			ready = false;
			break;
		}

		spin_lock(&queue->lock);
		if (list_empty(&queue->ent_avail_queue))
			ready = false;
		spin_unlock(&queue->lock);
	}

	return ready;
}

/*
 * fuse_uring_req_fetch command handling
 */
@@ -784,11 +832,23 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
				   unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;
	struct fuse_iqueue *fiq = &fc->iq;

	spin_lock(&queue->lock);
	ent->cmd = cmd;
	fuse_uring_ent_avail(ent, queue);
	spin_unlock(&queue->lock);

	if (!ring->ready) {
		bool ready = is_ring_ready(ring, queue->qid);

		if (ready) {
			WRITE_ONCE(fiq->ops, &fuse_io_uring_ops);
			WRITE_ONCE(ring->ready, true);
		}
	}
}

/*
@@ -972,3 +1032,123 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,

	return -EIOCBQUEUED;
}

static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd,
			    ssize_t ret, unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	ent->state = FRRS_USERSPACE;
	list_move(&ent->list, &queue->ent_in_userspace);
	ent->cmd = NULL;
	spin_unlock(&queue->lock);

	io_uring_cmd_done(cmd, ret, 0, issue_flags);
}

/*
 * This prepares and sends the ring request in fuse-uring task context.
 * User buffers are not mapped yet - the application does not have permission
 * to write to it - this has to be executed in ring task context.
 */
static void fuse_uring_send_in_task(struct io_uring_cmd *cmd,
				    unsigned int issue_flags)
{
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue = ent->queue;
	int err;

	if (!(issue_flags & IO_URING_F_TASK_DEAD)) {
		err = fuse_uring_prepare_send(ent, ent->fuse_req);
		if (err) {
			fuse_uring_next_fuse_req(ent, queue, issue_flags);
			return;
		}
	} else {
		err = -ECANCELED;
	}

	fuse_uring_send(ent, cmd, err, issue_flags);
}

static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring)
{
	unsigned int qid;
	struct fuse_ring_queue *queue;

	qid = task_cpu(current);

	if (WARN_ONCE(qid >= ring->nr_queues,
		      "Core number (%u) exceeds nr queues (%zu)\n", qid,
		      ring->nr_queues))
		qid = 0;

	queue = ring->queues[qid];
	WARN_ONCE(!queue, "Missing queue for qid %d\n", qid);

	return queue;
}

static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent)
{
	struct io_uring_cmd *cmd = ent->cmd;

	uring_cmd_set_ring_ent(cmd, ent);
	io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task);
}

/* queue a fuse request and send it if a ring entry is available */
void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;
	int err;

	err = -EINVAL;
	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		goto err;

	if (req->in.h.opcode != FUSE_NOTIFY_REPLY)
		req->in.h.unique = fuse_get_unique(fiq);

	spin_lock(&queue->lock);
	err = -ENOTCONN;
	if (unlikely(queue->stopped))
		goto err_unlock;

	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	if (ent)
		fuse_uring_add_req_to_ring_ent(ent, req);
	else
		list_add_tail(&req->list, &queue->fuse_req_queue);
	spin_unlock(&queue->lock);

	if (ent)
		fuse_uring_dispatch_ent(ent);

	return;

err_unlock:
	spin_unlock(&queue->lock);
err:
	req->out.h.error = err;
	clear_bit(FR_PENDING, &req->flags);
	fuse_request_end(req);
}

static const struct fuse_iqueue_ops fuse_io_uring_ops = {
	/* should be send over io-uring as enhancement */
	.send_forget = fuse_dev_queue_forget,

	/*
	 * could be send over io-uring, but interrupts should be rare,
	 * no need to make the code complex
	 */
	.send_interrupt = fuse_dev_queue_interrupt,
	.send_req = fuse_uring_queue_fuse_req,
};
+8 −0
Original line number Diff line number Diff line
@@ -117,6 +117,8 @@ struct fuse_ring {
	unsigned long teardown_time;

	atomic_t queue_refs;

	bool ready;
};

bool fuse_uring_enabled(void);
@@ -124,6 +126,7 @@ void fuse_uring_destruct(struct fuse_conn *fc);
void fuse_uring_stop_queues(struct fuse_ring *ring);
void fuse_uring_abort_end_requests(struct fuse_ring *ring);
int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req);

static inline void fuse_uring_abort(struct fuse_conn *fc)
{
@@ -147,6 +150,11 @@ static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
			   atomic_read(&ring->queue_refs) == 0);
}

static inline bool fuse_uring_ready(struct fuse_conn *fc)
{
	return fc->ring && fc->ring->ready;
}

#else /* CONFIG_FUSE_IO_URING */

struct fuse_ring;