Commit b237e1f7 authored by Petr Pavlu's avatar Petr Pavlu Committed by Steven Rostedt (Google)
Browse files

ring-buffer: Limit time with disabled interrupts in rb_check_pages()

The function rb_check_pages() validates the integrity of a specified
per-CPU tracing ring buffer. It does so by traversing the underlying
linked list and checking its next and prev links.

To guarantee that the list isn't modified during the check, a caller
typically needs to take cpu_buffer->reader_lock. This prevents the check
from running concurrently, for example, with a potential reader which
can make the list temporarily inconsistent when swapping its old reader
page into the buffer.

A problem with this approach is that the time when interrupts are
disabled is non-deterministic, dependent on the ring buffer size. This
particularly affects PREEMPT_RT because the reader_lock is a raw
spinlock which doesn't become sleepable on PREEMPT_RT kernels.

Modify the check so it still attempts to traverse the entire list, but
gives up the reader_lock between checking individual pages. Introduce
for this purpose a new variable ring_buffer_per_cpu.cnt which is bumped
any time the list is modified. The value is used by rb_check_pages() to
detect such a change and restart the check.

Cc: Masami Hiramatsu <mhiramat@kernel.org>
Cc: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Link: https://lore.kernel.org/20241015112810.27203-1-petr.pavlu@suse.com


Signed-off-by: default avatarPetr Pavlu <petr.pavlu@suse.com>
Signed-off-by: default avatarSteven Rostedt (Google) <rostedt@goodmis.org>
parent 09661f75
Loading
Loading
Loading
Loading
+72 −26
Original line number Diff line number Diff line
@@ -482,6 +482,8 @@ struct ring_buffer_per_cpu {
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
@@ -1475,40 +1477,87 @@ static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 *
 * Callers of this function need to guarantee that the list of pages doesn't get
 * modified during the check. In particular, if it's possible that the function
 * is invoked with concurrent readers which can swap in a new reader page then
 * the caller should take cpu_buffer->reader_lock.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;
	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
@@ -2535,6 +2584,7 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;
	cpu_buffer->cnt++;

	/* update head page */
	if (head_bit)
@@ -2641,6 +2691,7 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			cpu_buffer->cnt++;
			success = true;
			break;
		}
@@ -2876,12 +2927,8 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
		 */
		synchronize_rcu();
		for_each_buffer_cpu(buffer, cpu) {
			unsigned long flags;

			cpu_buffer = buffer->buffers[cpu];
			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			rb_check_pages(cpu_buffer);
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
		}
		atomic_dec(&buffer->record_disabled);
	}
@@ -5299,6 +5346,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
	rb_inc_page(&cpu_buffer->head_page);

	cpu_buffer->cnt++;
	local_inc(&cpu_buffer->pages_read);

	/* Finally update the reader page to the new head */
@@ -5838,12 +5886,9 @@ void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	/* Use this opportunity to check the integrity of the ring buffer. */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_check_pages(cpu_buffer);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	atomic_dec(&cpu_buffer->resize_disabled);
	kfree(iter->event);
@@ -6760,6 +6805,7 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
		/* Install the new pages, remove the head from the list */
		cpu_buffer->pages = cpu_buffer->new_pages.next;
		list_del_init(&cpu_buffer->new_pages);
		cpu_buffer->cnt++;

		cpu_buffer->head_page
			= list_entry(cpu_buffer->pages, struct buffer_page, list);