Commit 58bb3b37 authored by Sage Weil
Browse files

ceph: support ceph_pagelist for message payload



The ceph_pagelist is a simple list of whole pages, strung together via
their lru list_head.  It facilitates encoding to a "buffer" of unknown
size.  Allow its use in place of the ceph_msg page vector.

This will be used to fix the huge buffer preallocation woes of MDS
reconnection.

Signed-off-by: Sage Weil <sage@newdream.net>
parent 04a419f9
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o

ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
	export.o caps.o snap.o xattr.o \
	messenger.o msgpool.o buffer.o \
	messenger.o msgpool.o buffer.o pagelist.o \
	mds_client.o mdsmap.o \
	mon_client.o \
	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
+20 −4
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
#include "super.h"
#include "messenger.h"
#include "decode.h"
#include "pagelist.h"

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
@@ -728,6 +729,11 @@ static int write_partial_msg_pages(struct ceph_connection *con)
			page = msg->pages[con->out_msg_pos.page];
			if (crc)
				kaddr = kmap(page);
		} else if (msg->pagelist) {
			page = list_first_entry(&msg->pagelist->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
		} else {
			page = con->msgr->zero_page;
			if (crc)
@@ -750,7 +756,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
				      MSG_DONTWAIT | MSG_NOSIGNAL |
				      MSG_MORE);

		if (crc && msg->pages)
		if (crc && (msg->pages || msg->pagelist))
			kunmap(page);

		if (ret <= 0)
@@ -762,6 +768,9 @@ static int write_partial_msg_pages(struct ceph_connection *con)
			con->out_msg_pos.page_pos = 0;
			con->out_msg_pos.page++;
			con->out_msg_pos.did_page_crc = 0;
			if (msg->pagelist)
				list_move_tail(&page->lru,
					       &msg->pagelist->head);
		}
	}

@@ -1051,13 +1060,13 @@ static int process_banner(struct ceph_connection *con)
				       &con->actual_peer_addr) &&
	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
		pr_err("wrong peer, want %s/%d, "
		       "got %s/%d, wtf\n",
		pr_warning("wrong peer, want %s/%d, "
		       "got %s/%d\n",
		       pr_addr(&con->peer_addr.in_addr),
		       con->peer_addr.nonce,
		       pr_addr(&con->actual_peer_addr.in_addr),
		       con->actual_peer_addr.nonce);
		con->error_msg = "protocol error, wrong peer";
		con->error_msg = "wrong peer at address";
		return -1;
	}

@@ -2096,6 +2105,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
	/* data */
	m->nr_pages = calc_pages_for(page_off, page_len);
	m->pages = pages;
	m->pagelist = NULL;

	dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
	     m->nr_pages);
@@ -2181,6 +2191,12 @@ void ceph_msg_last_put(struct kref *kref)
	m->nr_pages = 0;
	m->pages = NULL;

	if (m->pagelist) {
		ceph_pagelist_release(m->pagelist);
		kfree(m->pagelist);
		m->pagelist = NULL;
	}

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
+1 −0
Original line number Diff line number Diff line
@@ -85,6 +85,7 @@ struct ceph_msg {
	struct ceph_buffer *middle;
	struct page **pages;            /* data payload.  NOT OWNER. */
	unsigned nr_pages;              /* size of page array */
	struct ceph_pagelist *pagelist; /* instead of pages */
	struct list_head list_head;
	struct kref kref;
	bool front_is_vmalloc;

fs/ceph/pagelist.c

0 → 100644
+54 −0
Original line number Diff line number Diff line

#include <linux/pagemap.h>
#include <linux/highmem.h>

#include "pagelist.h"

int ceph_pagelist_release(struct ceph_pagelist *pl)
{
	if (pl->mapped_tail)
		kunmap(pl->mapped_tail);
	while (!list_empty(&pl->head)) {
		struct page *page = list_first_entry(&pl->head, struct page,
						     lru);
		list_del(&page->lru);
		__free_page(page);
	}
	return 0;
}

static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
{
	struct page *page = alloc_page(GFP_NOFS);
	if (!page)
		return -ENOMEM;
	pl->room += PAGE_SIZE;
	list_add_tail(&page->lru, &pl->head);
	if (pl->mapped_tail)
		kunmap(pl->mapped_tail);
	pl->mapped_tail = kmap(page);
	return 0;
}

/*
 * Append len bytes from buf to the end of the pagelist.
 *
 * While the data does not fit in the space left in the tail page, the
 * tail is topped up with as much as fits and a new page is added.  The
 * remainder is then copied into the (possibly new) tail page.
 *
 * Returns 0 on success, or the error from ceph_pagelist_addpage() if a
 * page could not be added; in that case the bytes copied so far remain
 * accounted for in pl->length.
 */
int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
{
	const char *src = buf;

	/* Fill and extend until what is left fits in the tail page. */
	while (len > pl->room) {
		size_t chunk = pl->room;
		size_t tail_off = pl->length & ~PAGE_CACHE_MASK;
		int err;

		memcpy(pl->mapped_tail + tail_off, src, chunk);
		pl->length += chunk;
		pl->room -= chunk;
		src += chunk;
		len -= chunk;

		err = ceph_pagelist_addpage(pl);
		if (err)
			return err;
	}

	/* Whatever remains now fits entirely in the current tail page. */
	memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK), src, len);
	pl->room -= len;
	pl->length += len;
	return 0;
}

fs/ceph/pagelist.h

0 → 100644
+54 −0
Original line number Diff line number Diff line
#ifndef __FS_CEPH_PAGELIST_H
#define __FS_CEPH_PAGELIST_H

#include <linux/list.h>

/*
 * A growable byte buffer made of whole pages.  Pages are chained on
 * 'head' through their page->lru list_head; data is appended at the
 * end via ceph_pagelist_append().
 */
struct ceph_pagelist {
	struct list_head head;	/* pages, linked through page->lru */
	void *mapped_tail;	/* kmap() address of the last page added, or NULL */
	size_t length;		/* total number of bytes appended so far */
	size_t room;		/* bytes still free in the allocated pages */
};

/*
 * Put a pagelist into its empty state: no pages, nothing mapped,
 * zero bytes stored and zero bytes of free space.
 */
static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
{
	pl->length = 0;
	pl->room = 0;
	pl->mapped_tail = NULL;
	INIT_LIST_HEAD(&pl->head);
}
/*
 * Unmap the tail page (if mapped) and free every page on the list.
 * Does not free the ceph_pagelist itself.  Always returns 0.
 */
extern int ceph_pagelist_release(struct ceph_pagelist *pl);

/*
 * Copy l bytes from d onto the end of the pagelist, allocating further
 * pages as needed.  Returns 0 on success or -ENOMEM if a page could not
 * be allocated.
 */
extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);

/*
 * Append v to the pagelist as a little-endian 64-bit value.
 * Returns the result of ceph_pagelist_append().
 */
static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
{
	__le64 wire;

	wire = cpu_to_le64(v);
	return ceph_pagelist_append(pl, &wire, sizeof(wire));
}
/*
 * Append v to the pagelist as a little-endian 32-bit value.
 * Returns the result of ceph_pagelist_append().
 */
static inline int ceph_pagelist_encode_32(struct ceph_pagelist *pl, u32 v)
{
	__le32 wire;

	wire = cpu_to_le32(v);
	return ceph_pagelist_append(pl, &wire, sizeof(wire));
}
/*
 * Append v to the pagelist as a little-endian 16-bit value.
 * Returns the result of ceph_pagelist_append().
 */
static inline int ceph_pagelist_encode_16(struct ceph_pagelist *pl, u16 v)
{
	__le16 wire;

	wire = cpu_to_le16(v);
	return ceph_pagelist_append(pl, &wire, sizeof(wire));
}
/*
 * Append the single byte v to the pagelist.
 * Returns the result of ceph_pagelist_append().
 */
static inline int ceph_pagelist_encode_8(struct ceph_pagelist *pl, u8 v)
{
	return ceph_pagelist_append(pl, &v, sizeof(v));
}
/*
 * Append a length-prefixed string: len as a little-endian 32-bit value,
 * followed by the len bytes at s.  Nothing is appended after the prefix
 * when len is zero.
 *
 * Returns 0 on success, or the first error reported while appending.
 */
static inline int ceph_pagelist_encode_string(struct ceph_pagelist *pl,
					      char *s, size_t len)
{
	int err;

	err = ceph_pagelist_encode_32(pl, len);
	if (err || !len)
		return err;

	return ceph_pagelist_append(pl, s, len);
}

#endif