Commit ccc89b9d authored by Chuck Lever's avatar Chuck Lever
Browse files

svcrdma: Add fair queuing for Send Queue access



When the Send Queue fills, multiple threads may wait for SQ slots.
The previous implementation had no ordering guarantee, allowing
starvation when one thread repeatedly acquires slots while others
wait indefinitely.

Introduce a ticket-based fair queuing system. Each waiter takes a
ticket number and is served in FIFO order. This ensures forward
progress for all waiters when SQ capacity is constrained.

The implementation has two phases:
1. Fast path: attempt to reserve SQ slots without waiting
2. Slow path: take a ticket, wait for turn, then wait for slots

The ticket system adds two atomic counters to the transport:
- sc_sq_ticket_head: next ticket to issue
- sc_sq_ticket_tail: ticket currently being served

A dedicated wait queue (sc_sq_ticket_wait) handles ticket
ordering, separate from sc_send_wait which handles SQ capacity.
This separation ensures that send completions (the high-frequency
wake source) wake only the current ticket holder rather than all
queued waiters. Ticket handoff wakes only the ticket wait queue,
and each ticket holder that exits via connection close propagates
the wake to the next waiter in line.

When a waiter successfully reserves slots, it advances the tail
counter and wakes the next waiter. This creates an orderly handoff
that prevents starvation while maintaining good throughput on the
fast path when contention is low.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent d7f3efd9
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
@@ -84,6 +84,9 @@ struct svcxprt_rdma {

	atomic_t             sc_sq_avail;	/* SQEs ready to be consumed */
	unsigned int	     sc_sq_depth;	/* Depth of SQ */
	atomic_t	     sc_sq_ticket_head;	/* Next ticket to issue */
	atomic_t	     sc_sq_ticket_tail;	/* Ticket currently serving */
	wait_queue_head_t    sc_sq_ticket_wait;	/* Ticket ordering waitlist */
	__be32		     sc_fc_credits;	/* Forward credits */
	u32		     sc_max_requests;	/* Max requests */
	u32		     sc_max_bc_requests;/* Backward credits */
@@ -306,6 +309,13 @@ extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
				    struct svc_rdma_recv_ctxt *rctxt,
				    int status);
extern void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail);
extern int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
			    const struct rpc_rdma_cid *cid, int sqecount);
extern int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
				  const struct rpc_rdma_cid *cid,
				  const struct ib_send_wr *bad_wr,
				  const struct ib_send_wr *first_wr,
				  int sqecount, int ret);
extern int svc_rdma_sendto(struct svc_rqst *);
extern int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
				   unsigned int length);
+10 −27
Original line number Diff line number Diff line
@@ -405,34 +405,17 @@ static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
	ret = svc_rdma_sq_wait(rdma, &cc->cc_cid, cc->cc_sqecount);
	if (ret < 0)
		return ret;

	cc->cc_posttime = ktime_get();
	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
	if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return svc_rdma_post_send_err(rdma, &cc->cc_cid, bad_wr,
					      first_wr, cc->cc_sqecount,
					      ret);
	return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build a bvec that covers one kvec in an xdr_buf.
+121 −41
Original line number Diff line number Diff line
@@ -294,6 +294,117 @@ void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
		wake_up(&rdma->sc_send_wait);
}

/**
 * svc_rdma_sq_wait - Wait for SQ slots using fair queuing
 * @rdma: controlling transport
 * @cid: completion ID for tracing
 * @sqecount: number of SQ entries needed
 *
 * A ticket-based system ensures fair ordering when multiple threads
 * wait for Send Queue capacity. Each waiter takes a ticket and is
 * served in order, preventing starvation.
 *
 * Protocol invariant: every ticket holder must increment
 * sc_sq_ticket_tail exactly once, whether the reservation
 * succeeds or the connection closes. Failing to advance the
 * tail stalls all subsequent waiters.
 *
 * The ticket counters are signed 32-bit atomics. After
 * wrapping through INT_MAX, the equality check
 * (tail == ticket) remains correct because both counters
 * advance monotonically and the comparison uses exact
 * equality rather than relational operators.
 *
 * Return values:
 *   %0: SQ slots were reserved successfully
 *   %-ENOTCONN: The connection was lost
 */
int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
		     const struct rpc_rdma_cid *cid, int sqecount)
{
	int ticket;

	/* Fast path: try to reserve SQ slots without waiting.
	 *
	 * A failed reservation temporarily understates sc_sq_avail
	 * until the compensating atomic_add restores it. A Send
	 * completion arriving in that window sees a lower count
	 * than reality, but the value self-corrects once the add
	 * completes. No ordering guarantee is needed here because
	 * the slow path serializes all contended waiters.
	 */
	if (likely(atomic_sub_return(sqecount, &rdma->sc_sq_avail) >= 0))
		return 0;
	atomic_add(sqecount, &rdma->sc_sq_avail);

	/* Slow path: take a ticket and wait in line */
	ticket = atomic_fetch_inc(&rdma->sc_sq_ticket_head);

	percpu_counter_inc(&svcrdma_stat_sq_starve);
	trace_svcrdma_sq_full(rdma, cid);

	/* Wait until all earlier tickets have been served */
	wait_event(rdma->sc_sq_ticket_wait,
		   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
		   atomic_read(&rdma->sc_sq_ticket_tail) == ticket);
	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
		goto out_close;

	/* It's our turn. Wait for enough SQ slots to be available. */
	while (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
		atomic_add(sqecount, &rdma->sc_sq_avail);

		wait_event(rdma->sc_send_wait,
			   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
			   atomic_read(&rdma->sc_sq_avail) >= sqecount);
		if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
			goto out_close;
	}

	/* Slots reserved successfully. Let the next waiter proceed. */
	atomic_inc(&rdma->sc_sq_ticket_tail);
	wake_up(&rdma->sc_sq_ticket_wait);
	trace_svcrdma_sq_retry(rdma, cid);
	return 0;

out_close:
	atomic_inc(&rdma->sc_sq_ticket_tail);
	wake_up(&rdma->sc_sq_ticket_wait);
	return -ENOTCONN;
}

/**
 * svc_rdma_post_send_err - Handle ib_post_send failure
 * @rdma: controlling transport
 * @cid: completion ID for tracing
 * @bad_wr: first WR that was not posted
 * @first_wr: first WR in the chain
 * @sqecount: number of SQ entries that were reserved
 * @ret: error code from ib_post_send
 *
 * Return values:
 *   %0: At least one WR was posted; a completion handles cleanup
 *   %-ENOTCONN: No WRs were posted; SQ slots are released
 */
int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
			   const struct rpc_rdma_cid *cid,
			   const struct ib_send_wr *bad_wr,
			   const struct ib_send_wr *first_wr,
			   int sqecount, int ret)
{
	trace_svcrdma_sq_post_err(rdma, cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one WR was posted, a Send completion will
	 * return the reserved SQ slots.
	 */
	if (bad_wr != first_wr)
		return 0;

	svc_rdma_wake_send_waiters(rdma, sqecount);
	return -ENOTCONN;
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: Completion Queue context
@@ -336,11 +447,6 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 * that these values remain available after the ib_post_send() call.
 * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
 *
 * Note there is potential for starvation when the Send Queue is
 * full because there is no order to when waiting threads are
 * awoken. The transport is typically provisioned with a deep
 * enough Send Queue that SQ exhaustion should be a rare event.
 *
 * Return values:
 *   %0: @ctxt's WR chain was posted successfully
 *   %-ENOTCONN: The connection was lost
@@ -362,43 +468,17 @@ int svc_rdma_post_send(struct svcxprt_rdma *rdma,
				      send_wr->sg_list[0].length,
				      DMA_TO_DEVICE);

	/* If the SQ is full, wait until an SQ entry is available */
	while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
		if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
			svc_rdma_wake_send_waiters(rdma, sqecount);

			/* When the transport is torn down, assume
			 * ib_drain_sq() will trigger enough Send
			 * completions to wake us. The XPT_CLOSE test
			 * above should then cause the while loop to
			 * exit.
			 */
			percpu_counter_inc(&svcrdma_stat_sq_starve);
			trace_svcrdma_sq_full(rdma, &cid);
			wait_event(rdma->sc_send_wait,
				   atomic_read(&rdma->sc_sq_avail) > 0);
			trace_svcrdma_sq_retry(rdma, &cid);
			continue;
		}
	ret = svc_rdma_sq_wait(rdma, &cid, sqecount);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_send(ctxt);
	ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
		if (ret) {
			trace_svcrdma_sq_post_err(rdma, &cid, ret);
			svc_xprt_deferred_close(&rdma->sc_xprt);

			/* If even one WR was posted, there will be a
			 * Send completion that bumps sc_sq_avail.
			 */
			if (bad_wr == first_wr) {
				svc_rdma_wake_send_waiters(rdma, sqecount);
				break;
			}
		}
	if (ret)
		return svc_rdma_post_send_err(rdma, &cid, bad_wr,
					      first_wr, sqecount, ret);
	return 0;
}
	return -ENOTCONN;
}

/**
 * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+5 −1
Original line number Diff line number Diff line
@@ -179,6 +179,7 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
	init_llist_head(&cma_xprt->sc_recv_ctxts);
	init_llist_head(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);
	init_waitqueue_head(&cma_xprt->sc_sq_ticket_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
@@ -477,6 +478,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
	atomic_set(&newxprt->sc_sq_ticket_head, 0);
	atomic_set(&newxprt->sc_sq_ticket_tail, 0);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
@@ -649,7 +652,8 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
	if (waitqueue_active(&rdma->sc_send_wait) ||
	    waitqueue_active(&rdma->sc_sq_ticket_wait))
		return 0;

	/* Otherwise return true. */