Commit 53683e40 authored by Linus Torvalds

Merge tag 'trace-ringbuffer-v6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace

Pull tracing ring buffer updates from Steven Rostedt:
 "Add ring_buffer memory mappings.

  The tracing ring buffer was created based on being mostly used with
  the splice system call. It is broken up into page ordered sub-buffers
  and the reader swaps a new sub-buffer with an existing sub-buffer
  that's part of the write buffer. It then has total access to the
  swapped out sub-buffer and can do copyless movements of the memory
  into other mediums (file system, network, etc).

  The buffer is great for passing around the ring buffer contents in the
  kernel, but is not so good for when the consumer is the user space
  task itself.

  A new interface is added that allows user space to memory map the ring
  buffer. It will get all the write sub-buffers as well as reader
  sub-buffer (that is not written to). It can send an ioctl to change
  which sub-buffer is the new reader sub-buffer.

  The ring buffer is read only to user space. It only needs to call the
  ioctl when it is finished with a sub-buffer and needs a new sub-buffer
  that the writer will not write over.

  A self test program was also created for testing and can be used as an
  example for the interface to user space. The libtracefs (external to
  the kernel) also has code that interacts with this, although it is
  disabled until the interface is in an official release. It can be
  enabled by compiling the library with a special flag. This was used
  for testing applications that perform better with the buffer being
  mapped.

  Memory mapped buffers have limitations. The main one is that they
  cannot be used with the snapshot logic. If the buffer is mapped,
  snapshots will be disabled. If any logic is set to trigger snapshots
  on a buffer, that buffer will not be allowed to be mapped"

* tag 'trace-ringbuffer-v6.10' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace:
  ring-buffer: Add cast to unsigned long addr passed to virt_to_page()
  ring-buffer: Have mmapped ring buffer keep track of missed events
  ring-buffer/selftest: Add ring-buffer mapping test
  Documentation: tracing: Add ring-buffer mapping
  tracing: Allow user-space mapping of the ring-buffer
  ring-buffer: Introducing ring-buffer mapping functions
  ring-buffer: Allocate sub-buffers with __GFP_COMP
parents 594d2815 b9c6820f
+1 −0
@@ -29,6 +29,7 @@ Linux Tracing Technologies
   timerlat-tracer
   intel_th
   ring-buffer-design
   ring-buffer-map
   stm
   sys-t
   coresight/index
+106 −0
.. SPDX-License-Identifier: GPL-2.0

==================================
Tracefs ring-buffer memory mapping
==================================

:Author: Vincent Donnefort <vdonnefort@google.com>

Overview
========
Tracefs ring-buffer memory mapping provides an efficient way to stream data,
as no memory copy is necessary. The application mapping the ring-buffer then
becomes a consumer for that ring-buffer, in a similar fashion to trace_pipe.

Memory mapping setup
====================
The mapping works with a mmap() of the trace_pipe_raw interface.

The first system page of the mapping contains ring-buffer statistics and
description. It is referred to as the meta-page. One of the most important
fields of the meta-page is the reader. It contains the sub-buffer ID which can
be safely read by the mapper (see ring-buffer-design.rst).

The meta-page is followed by all the sub-buffers, ordered by ascending ID. It is
therefore effortless to know where the reader starts in the mapping:

.. code-block:: c

        reader_id = meta->reader.id;
        reader_offset = meta->meta_page_size + reader_id * meta->subbuf_size;

When the application is done with the current reader, it can get a new one using
the trace_pipe_raw ioctl() TRACE_MMAP_IOCTL_GET_READER. This ioctl also updates
the meta-page fields.
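
The offset arithmetic above can be wrapped in a small bounds-checked helper. The following is a minimal sketch, not part of the kernel interface: the function name ``subbuf_offset`` is hypothetical, and it assumes the mapping starts at the meta-page (mmap offset 0), so the returned value is a byte offset from the start of that mapping. The arguments correspond to the ``meta_page_size``, ``subbuf_size``, ``nr_subbufs`` and ``reader.id`` fields of the meta-page.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper (not kernel UAPI): byte offset of sub-buffer @id from
 * the start of a mapping that begins at the meta-page.
 *
 * Returns (size_t)-1 when @id is outside the range [0 : nr_subbufs - 1].
 */
static size_t subbuf_offset(uint32_t meta_page_size, uint32_t subbuf_size,
			    uint32_t nr_subbufs, uint32_t id)
{
	if (id >= nr_subbufs)
		return (size_t)-1;

	/* The meta-page comes first, then the sub-buffers in ascending ID order */
	return (size_t)meta_page_size + (size_t)id * subbuf_size;
}
```

After each TRACE_MMAP_IOCTL_GET_READER call, an application would typically re-read ``meta->reader.id`` and call the helper again, since the reader sub-buffer may have changed.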

Limitations
===========
When a mapping is in place on a Tracefs ring-buffer, it is not possible to
resize it (either by increasing the entire size of the ring-buffer or the size
of each subbuf). It is also not possible to use snapshot, and splice will copy
the ring buffer data instead of using the copyless swap from the ring buffer.

Concurrent readers (either another application mapping that ring-buffer or the
kernel with trace_pipe) are allowed but not recommended. They will compete for
the ring-buffer and the output is unpredictable, just like concurrent readers on
trace_pipe would be.

Example
=======

.. code-block:: c

        #include <fcntl.h>
        #include <stdio.h>
        #include <stdlib.h>
        #include <unistd.h>

        #include <linux/trace_mmap.h>

        #include <sys/mman.h>
        #include <sys/ioctl.h>

        #define TRACE_PIPE_RAW "/sys/kernel/tracing/per_cpu/cpu0/trace_pipe_raw"

        int main(void)
        {
                int page_size = getpagesize(), fd, reader_id;
                unsigned long meta_len, data_len;
                struct trace_buffer_meta *meta;
                void *map, *reader, *data;

                fd = open(TRACE_PIPE_RAW, O_RDONLY | O_NONBLOCK);
                if (fd < 0)
                        exit(EXIT_FAILURE);

                map = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
                if (map == MAP_FAILED)
                        exit(EXIT_FAILURE);

                meta = (struct trace_buffer_meta *)map;
                meta_len = meta->meta_page_size;

                printf("entries:        %llu\n", meta->entries);
                printf("overrun:        %llu\n", meta->overrun);
                printf("read:           %llu\n", meta->read);
                printf("nr_subbufs:     %u\n", meta->nr_subbufs);

                data_len = meta->subbuf_size * meta->nr_subbufs;
                data = mmap(NULL, data_len, PROT_READ, MAP_SHARED, fd, meta_len);
                if (data == MAP_FAILED)
                        exit(EXIT_FAILURE);

                if (ioctl(fd, TRACE_MMAP_IOCTL_GET_READER) < 0)
                        exit(EXIT_FAILURE);

                reader_id = meta->reader.id;
                reader = data + meta->subbuf_size * reader_id;

                printf("Current reader address: %p\n", reader);

                munmap(data, data_len);
                munmap(meta, meta_len);
                close(fd);

                return 0;
        }
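
Once the reader sub-buffer is located, the commit field in its header holds the data size together with two flag bits that the kernel sets when events were missed. The sketch below shows one way to split that value. It rests on assumptions: the ``SKETCH_*`` constants mirror the kernel-internal ``RB_MISSED_*`` flags of the ring-buffer implementation, which are not exported through ``<linux/trace_mmap.h>``, and the bit 31 value is inferred from ``RB_MISSED_STORED`` (1 << 30) and ``RB_MISSED_MASK`` (3 << 30). The names ``struct subbuf_commit`` and ``decode_commit`` are hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed mirrors of kernel-internal flags; these are not part of the UAPI */
#define SKETCH_MISSED_EVENTS	(UINT64_C(1) << 31)
#define SKETCH_MISSED_STORED	(UINT64_C(1) << 30)
#define SKETCH_MISSED_MASK	(SKETCH_MISSED_EVENTS | SKETCH_MISSED_STORED)

/* Hypothetical decoded view of a sub-buffer commit value */
struct subbuf_commit {
	uint64_t size;		/* bytes of event data in the sub-buffer */
	bool missed_events;	/* events were lost before this sub-buffer */
	bool missed_stored;	/* the lost count was stored after the data */
};

static struct subbuf_commit decode_commit(uint64_t commit)
{
	struct subbuf_commit c = {
		/* Clear both flag bits to recover the data size */
		.size = commit & ~SKETCH_MISSED_MASK,
		.missed_events = (commit & SKETCH_MISSED_EVENTS) != 0,
		.missed_stored = (commit & SKETCH_MISSED_STORED) != 0,
	};

	return c;
}
```

The in-memory width of the commit field depends on the architecture, so a real consumer would read the header layout from the tracefs ``header_page`` format file rather than hard-code it.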
+6 −0
@@ -6,6 +6,8 @@
#include <linux/seq_file.h>
#include <linux/poll.h>

#include <uapi/linux/trace_mmap.h>

struct trace_buffer;
struct ring_buffer_iter;

@@ -223,4 +225,8 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare	NULL
#endif

int ring_buffer_map(struct trace_buffer *buffer, int cpu,
		    struct vm_area_struct *vma);
int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */
+48 −0
/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
#ifndef _TRACE_MMAP_H_
#define _TRACE_MMAP_H_

#include <linux/types.h>

/**
 * struct trace_buffer_meta - Ring-buffer Meta-page description
 * @meta_page_size:	Size of this meta-page.
 * @meta_struct_len:	Size of this structure.
 * @subbuf_size:	Size of each sub-buffer.
 * @nr_subbufs:		Number of subbufs in the ring-buffer, including the reader.
 * @reader.lost_events:	Number of events lost at the time of the reader swap.
 * @reader.id:		subbuf ID of the current reader. ID range [0 : @nr_subbufs - 1]
 * @reader.read:	Number of bytes read on the reader subbuf.
 * @flags:		Placeholder for now, 0 until new features are supported.
 * @entries:		Number of entries in the ring-buffer.
 * @overrun:		Number of entries lost in the ring-buffer.
 * @read:		Number of entries that have been read.
 * @Reserved1:		Internal use only.
 * @Reserved2:		Internal use only.
 */
struct trace_buffer_meta {
	__u32		meta_page_size;
	__u32		meta_struct_len;

	__u32		subbuf_size;
	__u32		nr_subbufs;

	struct {
		__u64	lost_events;
		__u32	id;
		__u32	read;
	} reader;

	__u64	flags;

	__u64	entries;
	__u64	overrun;
	__u64	read;

	__u64	Reserved1;
	__u64	Reserved2;
};

#define TRACE_MMAP_IOCTL_GET_READER		_IO('T', 0x1)

#endif /* _TRACE_MMAP_H_ */
+460 −11
@@ -9,6 +9,7 @@
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
@@ -26,6 +27,7 @@
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
@@ -312,6 +314,8 @@ static u64 rb_event_time_stamp(struct ring_buffer_event *event)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
@@ -338,6 +342,7 @@ struct buffer_page {
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id;		/* ID for external mapping */
	struct buffer_data_page *page;	/* Actual data page */
};

@@ -484,6 +489,12 @@ struct ring_buffer_per_cpu {
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
@@ -1524,7 +1535,7 @@ static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
					mflags | __GFP_ZERO,
					mflags | __GFP_COMP | __GFP_ZERO,
					cpu_buffer->buffer->subbuf_order);
		if (!page)
			goto free_pages;
@@ -1599,6 +1610,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
	mutex_init(&cpu_buffer->mapping_lock);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
@@ -1609,7 +1621,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)

	cpu_buffer->reader_page = bpage;

	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_ZERO,
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
				cpu_buffer->buffer->subbuf_order);
	if (!page)
		goto fail_free_reader;
@@ -1789,8 +1801,6 @@ bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
@@ -2318,7 +2328,7 @@ rb_iter_head_event(struct ring_buffer_iter *iter)
/* Size is determined by what has been committed */
static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
	return rb_page_commit(bpage) & ~RB_MISSED_MASK;
}

static __always_inline unsigned
@@ -3945,7 +3955,7 @@ static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
		return true;

	/* Reader should exhaust content in reader page */
	if (reader->read != rb_page_commit(reader))
	if (reader->read != rb_page_size(reader))
		return false;

	/*
@@ -4416,7 +4426,7 @@ int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
	return ((iter->head_page == commit_page && iter->head >= commit) ||
		(iter->head_page == reader && commit_page == head_page &&
		 head_page->read == commit &&
		 iter->head == rb_page_commit(cpu_buffer->reader_page)));
		 iter->head == rb_page_size(cpu_buffer->reader_page)));
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

@@ -5211,6 +5221,22 @@ static void rb_clear_buffer_page(struct buffer_page *page)
	page->read = 0;
}

static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct trace_buffer_meta *meta = cpu_buffer->meta_page;

	meta->reader.read = cpu_buffer->reader_page->read;
	meta->reader.id = cpu_buffer->reader_page->id;
	meta->reader.lost_events = cpu_buffer->lost_events;

	meta->entries = local_read(&cpu_buffer->entries);
	meta->overrun = local_read(&cpu_buffer->overrun);
	meta->read = cpu_buffer->read;

	/* Some archs do not have data cache coherency between kernel and user-space */
	flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page));
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
@@ -5255,6 +5281,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
	cpu_buffer->lost_events = 0;
	cpu_buffer->last_overrun = 0;

	if (cpu_buffer->mapped)
		rb_update_meta_page(cpu_buffer);

	rb_head_page_activate(cpu_buffer);
	cpu_buffer->pages_removed = 0;
}
@@ -5469,6 +5498,12 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/* It's up to the callers to not try to swap mapped buffers */
	if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) {
		ret = -EBUSY;
		goto out;
	}

	/* At least make sure the two buffers are somewhat the same */
	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
		goto out;
@@ -5579,7 +5614,7 @@ ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
		goto out;

	page = alloc_pages_node(cpu_to_node(cpu),
				GFP_KERNEL | __GFP_NORETRY | __GFP_ZERO,
				GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO,
				cpu_buffer->buffer->subbuf_order);
	if (!page) {
		kfree(bpage);
@@ -5720,7 +5755,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_commit(reader);
	commit = rb_page_size(reader);

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;
@@ -5733,7 +5768,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
	    cpu_buffer->mapped) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
@@ -5796,7 +5832,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += rb_page_commit(reader);
		cpu_buffer->read_bytes += rb_page_size(reader);

		/* swap the pages */
		rb_init_page(bpage);
@@ -5956,6 +5992,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)

		cpu_buffer = buffer->buffers[cpu];

		if (cpu_buffer->mapped) {
			err = -EBUSY;
			goto error;
		}

		/* Update the number of pages to match the new size */
		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
@@ -6057,6 +6098,414 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);

static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct page *page;

	if (cpu_buffer->meta_page)
		return 0;

	page = alloc_page(GFP_USER | __GFP_ZERO);
	if (!page)
		return -ENOMEM;

	cpu_buffer->meta_page = page_to_virt(page);

	return 0;
}

static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long addr = (unsigned long)cpu_buffer->meta_page;

	free_page(addr);
	cpu_buffer->meta_page = NULL;
}

static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
				   unsigned long *subbuf_ids)
{
	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
	unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
	struct buffer_page *first_subbuf, *subbuf;
	int id = 0;

	subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
	cpu_buffer->reader_page->id = id++;

	first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
	do {
		if (WARN_ON(id >= nr_subbufs))
			break;

		subbuf_ids[id] = (unsigned long)subbuf->page;
		subbuf->id = id;

		rb_inc_page(&subbuf);
		id++;
	} while (subbuf != first_subbuf);

	/* install subbuf ID to kern VA translation */
	cpu_buffer->subbuf_ids = subbuf_ids;

	meta->meta_page_size = PAGE_SIZE;
	meta->meta_struct_len = sizeof(*meta);
	meta->nr_subbufs = nr_subbufs;
	meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;

	rb_update_meta_page(cpu_buffer);
}

static struct ring_buffer_per_cpu *
rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return ERR_PTR(-EINVAL);

	cpu_buffer = buffer->buffers[cpu];

	mutex_lock(&cpu_buffer->mapping_lock);

	if (!cpu_buffer->mapped) {
		mutex_unlock(&cpu_buffer->mapping_lock);
		return ERR_PTR(-ENODEV);
	}

	return cpu_buffer;
}

static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	mutex_unlock(&cpu_buffer->mapping_lock);
}

/*
 * Fast-path for ring_buffer_(un)map(). Called whenever the meta-page doesn't need
 * to be set-up or torn-down.
 */
static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer,
			       bool inc)
{
	unsigned long flags;

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	if (inc && cpu_buffer->mapped == UINT_MAX)
		return -EBUSY;

	if (WARN_ON(!inc && cpu_buffer->mapped == 0))
		return -EINVAL;

	mutex_lock(&cpu_buffer->buffer->mutex);
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	if (inc)
		cpu_buffer->mapped++;
	else
		cpu_buffer->mapped--;

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	mutex_unlock(&cpu_buffer->buffer->mutex);

	return 0;
}

/*
 *   +--------------+  pgoff == 0
 *   |   meta page  |
 *   +--------------+  pgoff == 1
 *   | subbuffer 0  |
 *   |              |
 *   +--------------+  pgoff == (1 + (1 << subbuf_order))
 *   | subbuffer 1  |
 *   |              |
 *         ...
 */
#ifdef CONFIG_MMU
static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
			struct vm_area_struct *vma)
{
	unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff;
	unsigned int subbuf_pages, subbuf_order;
	struct page **pages;
	int p = 0, s = 0;
	int err;

	/* Refuse MAP_PRIVATE, writable or executable mappings */
	if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC ||
	    !(vma->vm_flags & VM_MAYSHARE))
		return -EPERM;

	/*
	 * Make sure the mapping cannot become writable later. Also tell the VM
	 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND).
	 */
	vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP,
		     VM_MAYWRITE);

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	subbuf_order = cpu_buffer->buffer->subbuf_order;
	subbuf_pages = 1 << subbuf_order;

	nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */
	nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */

	vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT;
	if (!vma_pages || vma_pages > nr_pages)
		return -EINVAL;

	nr_pages = vma_pages;

	pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
	if (!pages)
		return -ENOMEM;

	if (!pgoff) {
		pages[p++] = virt_to_page(cpu_buffer->meta_page);

		/*
		 * TODO: Align sub-buffers on their size, once
		 * vm_insert_pages() supports the zero-page.
		 */
	} else {
		/* Skip the meta-page */
		pgoff--;

		if (pgoff % subbuf_pages) {
			err = -EINVAL;
			goto out;
		}

		s += pgoff / subbuf_pages;
	}

	while (p < nr_pages) {
		struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]);
		int off = 0;

		if (WARN_ON_ONCE(s >= nr_subbufs)) {
			err = -EINVAL;
			goto out;
		}

		for (; off < (1 << (subbuf_order)); off++, page++) {
			if (p >= nr_pages)
				break;

			pages[p++] = page;
		}
		s++;
	}

	err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages);

out:
	kfree(pages);

	return err;
}
#else
static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
			struct vm_area_struct *vma)
{
	return -EOPNOTSUPP;
}
#endif

int ring_buffer_map(struct trace_buffer *buffer, int cpu,
		    struct vm_area_struct *vma)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags, *subbuf_ids;
	int err = 0;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return -EINVAL;

	cpu_buffer = buffer->buffers[cpu];

	mutex_lock(&cpu_buffer->mapping_lock);

	if (cpu_buffer->mapped) {
		err = __rb_map_vma(cpu_buffer, vma);
		if (!err)
			err = __rb_inc_dec_mapped(cpu_buffer, true);
		mutex_unlock(&cpu_buffer->mapping_lock);
		return err;
	}

	/* prevent another thread from changing buffer/sub-buffer sizes */
	mutex_lock(&buffer->mutex);

	err = rb_alloc_meta_page(cpu_buffer);
	if (err)
		goto unlock;

	/* subbuf_ids include the reader while nr_pages does not */
	subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL);
	if (!subbuf_ids) {
		rb_free_meta_page(cpu_buffer);
		err = -ENOMEM;
		goto unlock;
	}

	atomic_inc(&cpu_buffer->resize_disabled);

	/*
	 * Lock all readers to block any subbuf swap until the subbuf IDs are
	 * assigned.
	 */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	err = __rb_map_vma(cpu_buffer, vma);
	if (!err) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		cpu_buffer->mapped = 1;
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	} else {
		kfree(cpu_buffer->subbuf_ids);
		cpu_buffer->subbuf_ids = NULL;
		rb_free_meta_page(cpu_buffer);
	}

unlock:
	mutex_unlock(&buffer->mutex);
	mutex_unlock(&cpu_buffer->mapping_lock);

	return err;
}

int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int err = 0;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return -EINVAL;

	cpu_buffer = buffer->buffers[cpu];

	mutex_lock(&cpu_buffer->mapping_lock);

	if (!cpu_buffer->mapped) {
		err = -ENODEV;
		goto out;
	} else if (cpu_buffer->mapped > 1) {
		__rb_inc_dec_mapped(cpu_buffer, false);
		goto out;
	}

	mutex_lock(&buffer->mutex);
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	cpu_buffer->mapped = 0;

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	kfree(cpu_buffer->subbuf_ids);
	cpu_buffer->subbuf_ids = NULL;
	rb_free_meta_page(cpu_buffer);
	atomic_dec(&cpu_buffer->resize_disabled);

	mutex_unlock(&buffer->mutex);

out:
	mutex_unlock(&cpu_buffer->mapping_lock);

	return err;
}

int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *reader;
	unsigned long missed_events;
	unsigned long reader_size;
	unsigned long flags;

	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
	if (IS_ERR(cpu_buffer))
		return (int)PTR_ERR(cpu_buffer);

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

consume:
	if (rb_per_cpu_empty(cpu_buffer))
		goto out;

	reader_size = rb_page_size(cpu_buffer->reader_page);

	/*
	 * There are data to be read on the current reader page, we can
	 * return to the caller. But before that, we assume the latter will read
	 * everything. Let's update the kernel reader accordingly.
	 */
	if (cpu_buffer->reader_page->read < reader_size) {
		while (cpu_buffer->reader_page->read < reader_size)
			rb_advance_reader(cpu_buffer);
		goto out;
	}

	reader = rb_get_reader_page(cpu_buffer);
	if (WARN_ON(!reader))
		goto out;

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;

	if (cpu_buffer->reader_page != cpu_buffer->commit_page) {
		if (missed_events) {
			struct buffer_data_page *bpage = reader->page;
			unsigned int commit;
			/*
			 * Use the real_end for the data size,
			 * This gives us a chance to store the lost events
			 * on the page.
			 */
			if (reader->real_end)
				local_set(&bpage->commit, reader->real_end);
			/*
			 * If there is room at the end of the page to save the
			 * missed events, then record it there.
			 */
			commit = rb_page_size(reader);
			if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
				memcpy(&bpage->data[commit], &missed_events,
				       sizeof(missed_events));
				local_add(RB_MISSED_STORED, &bpage->commit);
			}
			local_add(RB_MISSED_EVENTS, &bpage->commit);
		}
	} else {
		/*
		 * There really shouldn't be any missed events if the commit
		 * is on the reader page.
		 */
		WARN_ON_ONCE(missed_events);
	}

	cpu_buffer->lost_events = 0;

	goto consume;

out:
	/* Some archs do not have data cache coherency between kernel and user-space */
	flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page));

	rb_update_meta_page(cpu_buffer);

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	rb_put_mapped_buffer(cpu_buffer);

	return 0;
}

/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in