Commit 765bde47, authored by Chuck Lever, committed by Trond Myklebust

xprtrdma: Close lost-wakeup race in xprt_rdma_alloc_slot



xprt_rdma_alloc_slot() and xprt_rdma_free_slot() lack serialization
between the buffer pool and the backlog queue.  A buffer freed
after rpcrdma_buffer_get() finds the pool empty but before
rpc_sleep_on() places the task on the backlog is returned to the
pool with no waiter to wake, leaving the task stuck on the backlog
indefinitely.

After joining the backlog, re-check the pool and route any
recovered buffer through xprt_wake_up_backlog(), whose queue lock
serializes with concurrent wakeups and avoids double-assignment
of slots.
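
The interleaving described above can be replayed in a small single-threaded model. This is only an illustrative sketch, not kernel code: `struct model`, `pool_get()`, `wake_up_backlog()`, `free_slot()` and `replay_race()` are hypothetical names that loosely stand in for the buffer pool, rpcrdma_buffer_get(), xprt_wake_up_backlog() and xprt_rdma_free_slot(), and the model assumes a single waiter and no real locking.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy state: one buffer pool and at most one task on the backlog. */
struct model {
	int  pool_free;     /* buffers currently in the pool */
	bool task_waiting;  /* a task is asleep on the backlog */
	bool task_has_buf;  /* the task was handed a buffer */
};

/* Take a buffer from the pool; false means the pool is empty. */
static bool pool_get(struct model *m)
{
	if (m->pool_free == 0)
		return false;
	m->pool_free--;
	return true;
}

/* Hand a buffer to the waiter, if there is one. */
static bool wake_up_backlog(struct model *m)
{
	if (!m->task_waiting)
		return false;
	m->task_waiting = false;
	m->task_has_buf = true;
	return true;
}

/* Free path: prefer a waiter, otherwise return the buffer to the pool. */
static void free_slot(struct model *m)
{
	if (!wake_up_backlog(m))
		m->pool_free++;
}

/* Replay the racy ordering; returns true if the task ends up stuck. */
static bool replay_race(bool recheck)
{
	struct model m = { 0, false, false };

	/* 1. The allocating task finds the pool empty. */
	if (pool_get(&m))
		return false;

	/* 2. A buffer is freed before the task is queued: no waiter yet. */
	free_slot(&m);

	/* 3. The task joins the backlog. */
	m.task_waiting = true;

	/* 4. Optional re-check after queueing, as in the fix. */
	if (recheck && pool_get(&m)) {
		if (!wake_up_backlog(&m))
			m.pool_free++;	/* nobody to wake: put it back */
	}

	return m.task_waiting;
}
```

In this model, replay_race(false) leaves the task asleep while a buffer sits in the pool, and replay_race(true) hands the recovered buffer to the task through the wakeup path.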

Because xprt_rdma_free_slot() does not hold reserve_lock, the
XPRT_CONGESTED double-check in xprt_throttle_congested() is
ineffective: a task can join the backlog through that path after
free_slot has already found it empty and cleared the bit.  Avoid
this by using xprt_add_backlog_noncongested(), which queues the
task without setting XPRT_CONGESTED, so every allocation reaches
xprt_rdma_alloc_slot() and its post-sleep re-check.
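
A similar toy replay may help with the XPRT_CONGESTED case. It is a sketch under stated assumptions rather than the kernel's logic: `struct xprt_model`, `free_slot()` and `replay_throttle_race()` are hypothetical, the `congested` flag stands in for XPRT_CONGESTED, and the model assumes that a task diverted by the throttle check is queued without ever reaching the allocation path and its re-check.

```c
#include <assert.h>
#include <stdbool.h>

struct xprt_model {
	bool congested;    /* stands in for XPRT_CONGESTED */
	int  backlog_len;  /* tasks asleep on the backlog */
	int  pool_free;    /* buffers in the pool */
};

/* Free path that takes no reserve_lock: wake a waiter, or clear the
 * flag and return the buffer to the pool when the backlog is empty.
 */
static void free_slot(struct xprt_model *x)
{
	if (x->backlog_len > 0) {
		x->backlog_len--;
		return;
	}
	x->congested = false;
	x->pool_free++;
}

/* Returns how many tasks are asleep while a buffer sits in the pool. */
static int replay_throttle_race(bool set_congested_on_backlog)
{
	struct xprt_model x = { false, 0, 0 };
	bool diverted;

	/* Task A found the pool empty earlier and joined the backlog. */
	x.backlog_len++;
	if (set_congested_on_backlog)
		x.congested = true;

	/* Task B runs the throttle check and samples the flag. */
	diverted = x.congested;

	/* Two buffers are freed before B acts on what it sampled:
	 * the first wakes A, the second finds the backlog empty.
	 */
	free_slot(&x);
	free_slot(&x);

	if (diverted) {
		/* B is queued by the throttle path: no re-check follows. */
		x.backlog_len++;
	} else if (x.pool_free > 0) {
		/* B reaches the allocation path and takes the buffer. */
		x.pool_free--;
	}

	return x.pool_free > 0 ? x.backlog_len : 0;
}
```

With the flag set on queueing, the replay ends with one task asleep and one buffer idle; with the flag never set, task B always reaches the allocation path and nothing is stranded.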

Fixes: edb41e61 ("xprtrdma: Make rpc_rqst part of rpcrdma_req")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
parent 10014209
+2 −0
@@ -404,6 +404,8 @@ struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
 				unsigned int max_req);
 void			xprt_free(struct rpc_xprt *);
 void			xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+void			xprt_add_backlog_noncongested(struct rpc_xprt *xprt,
+					struct rpc_task *task);
 bool			xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
 void			xprt_cleanup_ids(void);

+16 −0
@@ -1663,6 +1663,22 @@ void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
 }
 EXPORT_SYMBOL_GPL(xprt_add_backlog);

+/**
+ * xprt_add_backlog_noncongested - queue task on backlog
+ * @xprt: transport whose backlog queue receives the task
+ * @task: task to queue
+ *
+ * Like xprt_add_backlog, but does not set XPRT_CONGESTED.
+ * For transports whose free_slot path does not synchronize
+ * with xprt_throttle_congested via reserve_lock.
+ */
+void xprt_add_backlog_noncongested(struct rpc_xprt *xprt,
+				   struct rpc_task *task)
+{
+	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
+}
+EXPORT_SYMBOL_GPL(xprt_add_backlog_noncongested);
+
 static bool __xprt_set_rq(struct rpc_task *task, void *data)
 {
 	struct rpc_rqst *req = data;
+14 −1
@@ -511,7 +511,20 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)

 out_sleep:
 	task->tk_status = -EAGAIN;
-	xprt_add_backlog(xprt, task);
+	xprt_add_backlog_noncongested(xprt, task);
+	/* A buffer freed between buffer_get and rpc_sleep_on
+	 * goes back to the pool with no waiter to wake.
+	 * Re-check after joining the backlog to close that gap.
+	 */
+	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
+	if (req) {
+		struct rpc_rqst *rqst = &req->rl_slot;
+
+		if (!xprt_wake_up_backlog(xprt, rqst)) {
+			memset(rqst, 0, sizeof(*rqst));
+			rpcrdma_buffer_put(&r_xprt->rx_buf, req);
+		}
+	}
 }

 /**