Commit 5ff817b2 authored by NeilBrown's avatar NeilBrown Committed by Chuck Lever
Browse files

SUNRPC: add list of idle threads



Rather than searching a list of threads to find an idle one, having a
list of idle threads allows an idle thread to be found immediately.

This adds some spin_lock calls which is not ideal, but as the hold-time
is tiny it is still faster than searching a list.  A future patch will
remove them using llist.h.  This involves some subtlety and so is left
to a separate patch.

This removes the need for the RQ_BUSY flag.  The rqst is "busy"
precisely when it is not on the "idle" list.

Signed-off-by: default avatarNeilBrown <neilb@suse.de>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent fa341560
Loading
Loading
Loading
Loading
+24 −1
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ struct svc_pool {
	struct list_head	sp_sockets;	/* pending sockets */
	unsigned int		sp_nrthreads;	/* # of threads in pool */
	struct list_head	sp_all_threads;	/* all server threads */
	struct list_head	sp_idle_threads; /* idle server threads */

	/* statistics on pool operation */
	struct percpu_counter	sp_messages_arrived;
@@ -186,6 +187,7 @@ extern u32 svc_max_payload(const struct svc_rqst *rqstp);
 */
struct svc_rqst {
	struct list_head	rq_all;		/* all threads list */
	struct list_head	rq_idle;	/* On the idle list */
	struct rcu_head		rq_rcu_head;	/* for RCU deferred kfree */
	struct svc_xprt *	rq_xprt;	/* transport ptr */

@@ -262,10 +264,31 @@ enum {
	RQ_SPLICE_OK,		/* turned off in gss privacy to prevent
				 * encrypting page cache pages */
	RQ_VICTIM,		/* Have agreed to shut down */
	RQ_BUSY,		/* request is busy */
	RQ_DATA,		/* request has data */
};

/**
 * svc_thread_set_busy - mark a thread as busy
 * @rqstp: the thread which is now busy
 *
 * If rq_idle is "empty", the thread must be busy.
 */
static inline void svc_thread_set_busy(struct svc_rqst *rqstp)
{
	INIT_LIST_HEAD(&rqstp->rq_idle);
}

/**
 * svc_thread_busy - check if a thread as busy
 * @rqstp: the thread which might be busy
 *
 * If rq_idle is "empty", the thread must be busy.
 */
static inline bool svc_thread_busy(struct svc_rqst *rqstp)
{
	return list_empty(&rqstp->rq_idle);
}

#define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)

/*
+0 −1
Original line number Diff line number Diff line
@@ -1677,7 +1677,6 @@ DEFINE_SVCXDRBUF_EVENT(sendto);
	svc_rqst_flag(DROPME)						\
	svc_rqst_flag(SPLICE_OK)					\
	svc_rqst_flag(VICTIM)						\
	svc_rqst_flag(BUSY)						\
	svc_rqst_flag_end(DATA)

#undef svc_rqst_flag
+9 −5
Original line number Diff line number Diff line
@@ -510,6 +510,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		INIT_LIST_HEAD(&pool->sp_idle_threads);
		spin_lock_init(&pool->sp_lock);

		percpu_counter_init(&pool->sp_messages_arrived, 0, GFP_KERNEL);
@@ -641,7 +642,7 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)

	folio_batch_init(&rqstp->rq_fbatch);

	__set_bit(RQ_BUSY, &rqstp->rq_flags);
	svc_thread_set_busy(rqstp);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

@@ -702,10 +703,13 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
	struct svc_rqst	*rqstp;

	rcu_read_lock();
	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
		if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
			continue;

	spin_lock_bh(&pool->sp_lock);
	rqstp = list_first_entry_or_null(&pool->sp_idle_threads,
					 struct svc_rqst, rq_idle);
	if (rqstp)
		list_del_init(&rqstp->rq_idle);
	spin_unlock_bh(&pool->sp_lock);
	if (rqstp) {
		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
		wake_up_process(rqstp->rq_task);
		rcu_read_unlock();
+11 −4
Original line number Diff line number Diff line
@@ -737,8 +737,9 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
		set_current_state(TASK_IDLE);
		smp_mb__before_atomic();
		clear_bit(SP_CONGESTED, &pool->sp_flags);
		clear_bit(RQ_BUSY, &rqstp->rq_flags);
		smp_mb__after_atomic();
		spin_lock_bh(&pool->sp_lock);
		list_add(&rqstp->rq_idle, &pool->sp_idle_threads);
		spin_unlock_bh(&pool->sp_lock);

		/* Need to check should_sleep() again after
		 * setting task state in case a wakeup happened
@@ -751,8 +752,14 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp)
			cond_resched();
		}

		set_bit(RQ_BUSY, &rqstp->rq_flags);
		smp_mb__after_atomic();
		/* We *must* be removed from the list before we can continue.
		 * If we were woken, this is already done
		 */
		if (!svc_thread_busy(rqstp)) {
			spin_lock_bh(&pool->sp_lock);
			list_del_init(&rqstp->rq_idle);
			spin_unlock_bh(&pool->sp_lock);
		}
	} else {
		cond_resched();
	}