Commit 17c1d665 authored by Jeff Layton's avatar Jeff Layton Committed by Chuck Lever
Browse files

sunrpc: convert queue_lock from global spinlock to per-cache-detail lock



The global queue_lock serializes all upcall queue operations across
every cache_detail instance. Convert it to a per-cache-detail spinlock
so that different caches (e.g. auth.unix.ip vs nfsd.fh) no longer
contend with each other on queue operations.

Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 6b4f16a5
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -113,6 +113,7 @@ struct cache_detail {

	/* fields for communication over channel */
	struct list_head	queue;
	spinlock_t		queue_lock;

	atomic_t		writers;		/* how many time is /channel open */
	time64_t		last_close;		/* if no writers, when did last close */
+23 −24
Original line number Diff line number Diff line
@@ -400,6 +400,7 @@ void sunrpc_init_cache_detail(struct cache_detail *cd)
{
	spin_lock_init(&cd->hash_lock);
	INIT_LIST_HEAD(&cd->queue);
	spin_lock_init(&cd->queue_lock);
	spin_lock(&cache_list_lock);
	cd->nextcheck = 0;
	cd->entries = 0;
@@ -803,8 +804,6 @@ void cache_clean_deferred(void *owner)
 *
 */

static DEFINE_SPINLOCK(queue_lock);

struct cache_queue {
	struct list_head	list;
	int			reader;	/* if 0, then request */
@@ -847,7 +846,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
	inode_lock(inode); /* protect against multiple concurrent
			      * readers on this file */
 again:
	spin_lock(&queue_lock);
	spin_lock(&cd->queue_lock);
	/* need to find next request */
	while (rp->q.list.next != &cd->queue &&
	       list_entry(rp->q.list.next, struct cache_queue, list)
@@ -856,7 +855,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
		list_move(&rp->q.list, next);
	}
	if (rp->q.list.next == &cd->queue) {
		spin_unlock(&queue_lock);
		spin_unlock(&cd->queue_lock);
		inode_unlock(inode);
		WARN_ON_ONCE(rp->offset);
		return 0;
@@ -865,7 +864,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
	WARN_ON_ONCE(rq->q.reader);
	if (rp->offset == 0)
		rq->readers++;
	spin_unlock(&queue_lock);
	spin_unlock(&cd->queue_lock);

	if (rq->len == 0) {
		err = cache_request(cd, rq);
@@ -876,9 +875,9 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,

	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
		err = -EAGAIN;
		spin_lock(&queue_lock);
		spin_lock(&cd->queue_lock);
		list_move(&rp->q.list, &rq->q.list);
		spin_unlock(&queue_lock);
		spin_unlock(&cd->queue_lock);
	} else {
		if (rp->offset + count > rq->len)
			count = rq->len - rp->offset;
@@ -888,26 +887,26 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
		rp->offset += count;
		if (rp->offset >= rq->len) {
			rp->offset = 0;
			spin_lock(&queue_lock);
			spin_lock(&cd->queue_lock);
			list_move(&rp->q.list, &rq->q.list);
			spin_unlock(&queue_lock);
			spin_unlock(&cd->queue_lock);
		}
		err = 0;
	}
 out:
	if (rp->offset == 0) {
		/* need to release rq */
		spin_lock(&queue_lock);
		spin_lock(&cd->queue_lock);
		rq->readers--;
		if (rq->readers == 0 &&
		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
			list_del(&rq->q.list);
			spin_unlock(&queue_lock);
			spin_unlock(&cd->queue_lock);
			cache_put(rq->item, cd);
			kfree(rq->buf);
			kfree(rq);
		} else
			spin_unlock(&queue_lock);
			spin_unlock(&cd->queue_lock);
	}
	if (err == -EAGAIN)
		goto again;
@@ -988,7 +987,7 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
	if (!rp)
		return mask;

	spin_lock(&queue_lock);
	spin_lock(&cd->queue_lock);

	for (cq= &rp->q; &cq->list != &cd->queue;
	     cq = list_entry(cq->list.next, struct cache_queue, list))
@@ -996,7 +995,7 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
			mask |= EPOLLIN | EPOLLRDNORM;
			break;
		}
	spin_unlock(&queue_lock);
	spin_unlock(&cd->queue_lock);
	return mask;
}

@@ -1011,7 +1010,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
	if (cmd != FIONREAD || !rp)
		return -EINVAL;

	spin_lock(&queue_lock);
	spin_lock(&cd->queue_lock);

	/* only find the length remaining in current request,
	 * or the length of the next request
@@ -1024,7 +1023,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
			len = cr->len - rp->offset;
			break;
		}
	spin_unlock(&queue_lock);
	spin_unlock(&cd->queue_lock);

	return put_user(len, (int __user *)arg);
}
@@ -1046,9 +1045,9 @@ static int cache_open(struct inode *inode, struct file *filp,
		rp->offset = 0;
		rp->q.reader = 1;

		spin_lock(&queue_lock);
		spin_lock(&cd->queue_lock);
		list_add(&rp->q.list, &cd->queue);
		spin_unlock(&queue_lock);
		spin_unlock(&cd->queue_lock);
	}
	if (filp->f_mode & FMODE_WRITE)
		atomic_inc(&cd->writers);
@@ -1064,7 +1063,7 @@ static int cache_release(struct inode *inode, struct file *filp,
	if (rp) {
		struct cache_request *rq = NULL;

		spin_lock(&queue_lock);
		spin_lock(&cd->queue_lock);
		if (rp->offset) {
			struct cache_queue *cq;
			for (cq = &rp->q; &cq->list != &cd->queue;
@@ -1086,7 +1085,7 @@ static int cache_release(struct inode *inode, struct file *filp,
			rp->offset = 0;
		}
		list_del(&rp->q.list);
		spin_unlock(&queue_lock);
		spin_unlock(&cd->queue_lock);

		if (rq) {
			cache_put(rq->item, cd);
@@ -1113,7 +1112,7 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
	struct cache_request *cr;
	LIST_HEAD(dequeued);

	spin_lock(&queue_lock);
	spin_lock(&detail->queue_lock);
	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
		if (!cq->reader) {
			cr = container_of(cq, struct cache_request, q);
@@ -1126,7 +1125,7 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
				continue;
			list_move(&cr->q.list, &dequeued);
		}
	spin_unlock(&queue_lock);
	spin_unlock(&detail->queue_lock);
	while (!list_empty(&dequeued)) {
		cr = list_entry(dequeued.next, struct cache_request, q.list);
		list_del(&cr->q.list);
@@ -1251,7 +1250,7 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
	crq->buf = buf;
	crq->len = 0;
	crq->readers = 0;
	spin_lock(&queue_lock);
	spin_lock(&detail->queue_lock);
	if (test_bit(CACHE_PENDING, &h->flags)) {
		crq->item = cache_get(h);
		list_add_tail(&crq->q.list, &detail->queue);
@@ -1259,7 +1258,7 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
	} else
		/* Lost a race, no longer PENDING, so don't enqueue */
		ret = -EAGAIN;
	spin_unlock(&queue_lock);
	spin_unlock(&detail->queue_lock);
	wake_up(&queue_wait);
	if (ret == -EAGAIN) {
		kfree(buf);