Commit d396f89d authored by Jeff Layton's avatar Jeff Layton Committed by Ilya Dryomov
Browse files

libceph: add sparse read support to msgr1



Add 2 new fields to ceph_connection_v1_info to track the necessary info
in sparse reads. Skip initializing the cursor for a sparse read.

Break out read_partial_message_section into a wrapper around a new
read_partial_message_chunk function that doesn't zero out the crc first.

Add new helper functions to drive receiving into the destinations
provided by the sparse_read state machine.

Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Reviewed-by: default avatarXiubo Li <xiubli@redhat.com>
Reviewed-and-tested-by: default avatarLuís Henriques <lhenriques@suse.de>
Reviewed-by: default avatarMilind Changire <mchangir@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent f36217e3
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -336,6 +336,10 @@ struct ceph_connection_v1_info {

	int in_base_pos;     /* bytes read */

	/* sparse reads */
	struct kvec in_sr_kvec; /* current location to receive into */
	u64 in_sr_len;		/* amount of data in this extent */

	/* message in temps */
	u8 in_tag;           /* protocol control byte */
	struct ceph_msg_header in_hdr;
+90 −8
Original line number Diff line number Diff line
@@ -159,8 +159,8 @@ static size_t sizeof_footer(struct ceph_connection *con)

static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
	/* Initialize data cursor */

	/* Initialize data cursor if it's not a sparse read */
	if (!msg->sparse_read)
		ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
}

@@ -960,7 +960,7 @@ static void process_ack(struct ceph_connection *con)
	prepare_read_tag(con);
}

static int read_partial_message_section(struct ceph_connection *con,
static int read_partial_message_chunk(struct ceph_connection *con,
				      struct kvec *section,
				      unsigned int sec_len, u32 *crc)
{
@@ -978,11 +978,91 @@ static int read_partial_message_section(struct ceph_connection *con,
		section->iov_len += ret;
	}
	if (section->iov_len == sec_len)
		*crc = crc32c(0, section->iov_base, section->iov_len);
		*crc = crc32c(*crc, section->iov_base, section->iov_len);

	return 1;
}

static inline int read_partial_message_section(struct ceph_connection *con,
					       struct kvec *section,
					       unsigned int sec_len, u32 *crc)
{
	*crc = 0;
	return read_partial_message_chunk(con, section, sec_len, crc);
}

static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
	bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);

	if (do_bounce && unlikely(!con->bounce_page)) {
		con->bounce_page = alloc_page(GFP_NOIO);
		if (!con->bounce_page) {
			pr_err("failed to allocate bounce page\n");
			return -ENOMEM;
		}
	}

	while (cursor->sr_resid > 0) {
		struct page *page, *rpage;
		size_t off, len;
		int ret;

		page = ceph_msg_data_next(cursor, &off, &len);
		rpage = do_bounce ? con->bounce_page : page;

		/* clamp to what remains in extent */
		len = min_t(int, len, cursor->sr_resid);
		ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
		if (ret <= 0)
			return ret;
		*crc = ceph_crc32c_page(*crc, rpage, off, ret);
		ceph_msg_data_advance(cursor, (size_t)ret);
		cursor->sr_resid -= ret;
		if (do_bounce)
			memcpy_page(page, off, rpage, off, ret);
	}
	return 1;
}

static int read_sparse_msg_data(struct ceph_connection *con)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
	u32 crc = 0;
	int ret = 1;

	if (do_datacrc)
		crc = con->in_data_crc;

	do {
		if (con->v1.in_sr_kvec.iov_base)
			ret = read_partial_message_chunk(con,
							 &con->v1.in_sr_kvec,
							 con->v1.in_sr_len,
							 &crc);
		else if (cursor->sr_resid > 0)
			ret = read_sparse_msg_extent(con, &crc);

		if (ret <= 0) {
			if (do_datacrc)
				con->in_data_crc = crc;
			return ret;
		}

		memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
		ret = con->ops->sparse_read(con, cursor,
				(char **)&con->v1.in_sr_kvec.iov_base);
		con->v1.in_sr_len = ret;
	} while (ret > 0);

	if (do_datacrc)
		con->in_data_crc = crc;

	return ret < 0 ? ret : 1;  /* must return > 0 to indicate success */
}

static int read_partial_msg_data(struct ceph_connection *con)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
@@ -1173,7 +1253,9 @@ static int read_partial_message(struct ceph_connection *con)
		if (!m->num_data_items)
			return -EIO;

		if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
		if (m->sparse_read)
			ret = read_sparse_msg_data(con);
		else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
			ret = read_partial_msg_data_bounce(con);
		else
			ret = read_partial_msg_data(con);