Commit facc4e3c authored by Jeff Layton's avatar Jeff Layton Committed by Chuck Lever
Browse files

sunrpc: split cache_detail queue into request and reader lists



Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.

Readers now track their position via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.

This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.

Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 552d0e17
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -113,9 +113,11 @@ struct cache_detail {
	int			entries;

	/* fields for communication over channel */
	struct list_head	queue;
	struct list_head	requests;
	struct list_head	readers;
	spinlock_t		queue_lock;
	wait_queue_head_t	queue_wait;
	u64			next_seqno;

	atomic_t		writers;		/* how many time is /channel open */
	time64_t		last_close;		/* if no writers, when did last close */
+59 −84
Original line number Diff line number Diff line
@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
void sunrpc_init_cache_detail(struct cache_detail *cd)
{
	spin_lock_init(&cd->hash_lock);
	INIT_LIST_HEAD(&cd->queue);
	INIT_LIST_HEAD(&cd->requests);
	INIT_LIST_HEAD(&cd->readers);
	spin_lock_init(&cd->queue_lock);
	init_waitqueue_head(&cd->queue_wait);
	cd->next_seqno = 0;
	spin_lock(&cache_list_lock);
	cd->nextcheck = 0;
	cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
 * On read, you get a full request, or block.
 * On write, an update request is processed.
 * Poll works if anything to read, and always allows write.
 *
 * Implemented by linked list of requests.  Each open file has
 * a ->private that also exists in this list.  New requests are added
 * to the end and may wakeup and preceding readers.
 * New readers are added to the head.  If, on read, an item is found with
 * CACHE_UPCALLING clear, we free it from the list.
 *
 */

struct cache_queue {
	struct list_head	list;
	int			reader;	/* if 0, then request */
};
struct cache_request {
	struct cache_queue	q;
	struct list_head	list;
	struct cache_head	*item;
	char			*buf;
	int			len;
	int			readers;
	u64			seqno;
};
struct cache_reader {
	struct cache_queue	q;
	struct list_head	list;
	int			offset;	/* if non-0, we have a refcnt on next request */
	u64			next_seqno;
};

static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
	return PAGE_SIZE - len;
}

static struct cache_request *
cache_next_request(struct cache_detail *cd, u64 seqno)
{
	struct cache_request *rq;

	list_for_each_entry(rq, &cd->requests, list)
		if (rq->seqno >= seqno)
			return rq;
	return NULL;
}

static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
			  loff_t *ppos, struct cache_detail *cd)
{
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 again:
	spin_lock(&cd->queue_lock);
	/* need to find next request */
	while (rp->q.list.next != &cd->queue &&
	       list_entry(rp->q.list.next, struct cache_queue, list)
	       ->reader) {
		struct list_head *next = rp->q.list.next;
		list_move(&rp->q.list, next);
	}
	if (rp->q.list.next == &cd->queue) {
	rq = cache_next_request(cd, rp->next_seqno);
	if (!rq) {
		spin_unlock(&cd->queue_lock);
		inode_unlock(inode);
		WARN_ON_ONCE(rp->offset);
		return 0;
	}
	rq = container_of(rp->q.list.next, struct cache_request, q.list);
	WARN_ON_ONCE(rq->q.reader);
	if (rp->offset == 0)
		rq->readers++;
	spin_unlock(&cd->queue_lock);
@@ -876,9 +873,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,

	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
		err = -EAGAIN;
		spin_lock(&cd->queue_lock);
		list_move(&rp->q.list, &rq->q.list);
		spin_unlock(&cd->queue_lock);
		rp->next_seqno = rq->seqno + 1;
	} else {
		if (rp->offset + count > rq->len)
			count = rq->len - rp->offset;
@@ -888,9 +883,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
		rp->offset += count;
		if (rp->offset >= rq->len) {
			rp->offset = 0;
			spin_lock(&cd->queue_lock);
			list_move(&rp->q.list, &rq->q.list);
			spin_unlock(&cd->queue_lock);
			rp->next_seqno = rq->seqno + 1;
		}
		err = 0;
	}
@@ -901,7 +894,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
		rq->readers--;
		if (rq->readers == 0 &&
		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
			list_del(&rq->q.list);
			list_del(&rq->list);
			spin_unlock(&cd->queue_lock);
			cache_put(rq->item, cd);
			kfree(rq->buf);
@@ -976,7 +969,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
{
	__poll_t mask;
	struct cache_reader *rp = filp->private_data;
	struct cache_queue *cq;

	poll_wait(filp, &cd->queue_wait, wait);

@@ -988,12 +980,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,

	spin_lock(&cd->queue_lock);

	for (cq= &rp->q; &cq->list != &cd->queue;
	     cq = list_entry(cq->list.next, struct cache_queue, list))
		if (!cq->reader) {
	if (cache_next_request(cd, rp->next_seqno))
		mask |= EPOLLIN | EPOLLRDNORM;
			break;
		}
	spin_unlock(&cd->queue_lock);
	return mask;
}
@@ -1004,7 +992,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
{
	int len = 0;
	struct cache_reader *rp = filp->private_data;
	struct cache_queue *cq;
	struct cache_request *rq;

	if (cmd != FIONREAD || !rp)
		return -EINVAL;
@@ -1014,14 +1002,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
	/* only find the length remaining in current request,
	 * or the length of the next request
	 */
	for (cq= &rp->q; &cq->list != &cd->queue;
	     cq = list_entry(cq->list.next, struct cache_queue, list))
		if (!cq->reader) {
			struct cache_request *cr =
				container_of(cq, struct cache_request, q);
			len = cr->len - rp->offset;
			break;
		}
	rq = cache_next_request(cd, rp->next_seqno);
	if (rq)
		len = rq->len - rp->offset;
	spin_unlock(&cd->queue_lock);

	return put_user(len, (int __user *)arg);
@@ -1042,10 +1025,10 @@ static int cache_open(struct inode *inode, struct file *filp,
			return -ENOMEM;
		}
		rp->offset = 0;
		rp->q.reader = 1;
		rp->next_seqno = 0;

		spin_lock(&cd->queue_lock);
		list_add(&rp->q.list, &cd->queue);
		list_add(&rp->list, &cd->readers);
		spin_unlock(&cd->queue_lock);
	}
	if (filp->f_mode & FMODE_WRITE)
@@ -1064,26 +1047,21 @@ static int cache_release(struct inode *inode, struct file *filp,

		spin_lock(&cd->queue_lock);
		if (rp->offset) {
			struct cache_queue *cq;
			for (cq = &rp->q; &cq->list != &cd->queue;
			     cq = list_entry(cq->list.next,
					     struct cache_queue, list))
				if (!cq->reader) {
					struct cache_request *cr =
						container_of(cq,
						struct cache_request, q);
			struct cache_request *cr;

			cr = cache_next_request(cd, rp->next_seqno);
			if (cr) {
				cr->readers--;
				if (cr->readers == 0 &&
				    !test_bit(CACHE_PENDING,
					      &cr->item->flags)) {
						list_del(&cr->q.list);
					list_del(&cr->list);
					rq = cr;
				}
					break;
			}
			rp->offset = 0;
		}
		list_del(&rp->q.list);
		list_del(&rp->list);
		spin_unlock(&cd->queue_lock);

		if (rq) {
@@ -1107,14 +1085,11 @@ static int cache_release(struct inode *inode, struct file *filp,

static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
{
	struct cache_queue *cq, *tmp;
	struct cache_request *cr;
	struct cache_request *cr, *tmp;
	LIST_HEAD(dequeued);

	spin_lock(&detail->queue_lock);
	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
		if (!cq->reader) {
			cr = container_of(cq, struct cache_request, q);
	list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
		if (cr->item != ch)
			continue;
		if (test_bit(CACHE_PENDING, &ch->flags))
@@ -1122,12 +1097,12 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
			break;
		if (cr->readers != 0)
			continue;
			list_move(&cr->q.list, &dequeued);
		list_move(&cr->list, &dequeued);
	}
	spin_unlock(&detail->queue_lock);
	while (!list_empty(&dequeued)) {
		cr = list_entry(dequeued.next, struct cache_request, q.list);
		list_del(&cr->q.list);
		cr = list_entry(dequeued.next, struct cache_request, list);
		list_del(&cr->list);
		cache_put(cr->item, detail);
		kfree(cr->buf);
		kfree(cr);
@@ -1245,14 +1220,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
		return -EAGAIN;
	}

	crq->q.reader = 0;
	crq->buf = buf;
	crq->len = 0;
	crq->readers = 0;
	spin_lock(&detail->queue_lock);
	if (test_bit(CACHE_PENDING, &h->flags)) {
		crq->item = cache_get(h);
		list_add_tail(&crq->q.list, &detail->queue);
		crq->seqno = detail->next_seqno++;
		list_add_tail(&crq->list, &detail->requests);
		trace_cache_entry_upcall(detail, h);
	} else
		/* Lost a race, no longer PENDING, so don't enqueue */