Unverified Commit 478dbf12 authored by Max Kellermann's avatar Max Kellermann Committed by Christian Brauner
Browse files

fs/pipe: use spinlock in pipe_read() only if there is a watch_queue



If there is no watch_queue, holding the pipe mutex is enough to
prevent concurrent writes, and we can avoid the spinlock.

O_NOTIFICATION_QUEUE is an exotic and rarely used feature, and of all
the pipes that exist at any given time, only very few actually have a
watch_queue, therefore it appears worthwile to optimize the common
case.

This patch does not optimize pipe_resize_ring() where the spinlocks
could be avoided as well; that does not seem like a worthwile
optimization because this function is not called often.

Related commits:

- commit 8df44129 ("pipe: Check for ring full inside of the
  spinlock in pipe_write()")
- commit b667b867 ("pipe: Advance tail pointer inside of wait
  spinlock in pipe_read()")
- commit 189b0ddc ("pipe: Fix missing lock in pipe_resize_ring()")

Signed-off-by: default avatarMax Kellermann <max.kellermann@ionos.com>
Message-Id: <20230921075755.1378787-4-max.kellermann@ionos.com>
Reviewed-by: default avatarDavid Howells <dhowells@redhat.com>
Signed-off-by: default avatarChristian Brauner <brauner@kernel.org>
parent dfaabf91
Loading
Loading
Loading
Loading
+32 −11
Original line number Diff line number Diff line
@@ -227,6 +227,36 @@ static inline bool pipe_readable(const struct pipe_inode_info *pipe)
	return !pipe_empty(head, tail) || !writers;
}

static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
					    struct pipe_buffer *buf,
					    unsigned int tail)
{
	pipe_buf_release(pipe, buf);

	/*
	 * If the pipe has a watch_queue, we need additional protection
	 * by the spinlock because notifications get posted with only
	 * this spinlock, no mutex
	 */
	if (pipe_has_watch_queue(pipe)) {
		spin_lock_irq(&pipe->rd_wait.lock);
#ifdef CONFIG_WATCH_QUEUE
		if (buf->flags & PIPE_BUF_FLAG_LOSS)
			pipe->note_loss = true;
#endif
		pipe->tail = ++tail;
		spin_unlock_irq(&pipe->rd_wait.lock);
		return tail;
	}

	/*
	 * Without a watch_queue, we can simply increment the tail
	 * without the spinlock - the mutex is enough.
	 */
	pipe->tail = ++tail;
	return tail;
}

static ssize_t
pipe_read(struct kiocb *iocb, struct iov_iter *to)
{
@@ -320,17 +350,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
				buf->len = 0;
			}

			if (!buf->len) {
				pipe_buf_release(pipe, buf);
				spin_lock_irq(&pipe->rd_wait.lock);
#ifdef CONFIG_WATCH_QUEUE
				if (buf->flags & PIPE_BUF_FLAG_LOSS)
					pipe->note_loss = true;
#endif
				tail++;
				pipe->tail = tail;
				spin_unlock_irq(&pipe->rd_wait.lock);
			}
			if (!buf->len)
				tail = pipe_update_tail(pipe, buf, tail);
			total_len -= chars;
			if (!total_len)
				break;	/* common path: read succeeded */