Commit 9d7f0f13 authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil
Browse files

ceph: refactor messages data section allocation

parent 2450418c
Loading
Loading
Loading
Loading
+39 −28
Original line number Diff line number Diff line
@@ -1315,7 +1315,7 @@ static int read_partial_message(struct ceph_connection *con)
	struct ceph_msg *m = con->in_msg;
	void *p;
	int ret;
	int to, want, left;
	int to, left;
	unsigned front_len, middle_len, data_len, data_off;
	int datacrc = con->msgr->nocrc;
	int skip;
@@ -1351,6 +1351,7 @@ static int read_partial_message(struct ceph_connection *con)
	data_len = le32_to_cpu(con->in_hdr.data_len);
	if (data_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;
	data_off = le16_to_cpu(con->in_hdr.data_off);

	/* allocate message? */
	if (!con->in_msg) {
@@ -1375,7 +1376,10 @@ static int read_partial_message(struct ceph_connection *con)
		m->front.iov_len = 0;    /* haven't read it yet */
		if (m->middle)
			m->middle->vec.iov_len = 0;
		memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));

		con->in_msg_pos.page = 0;
		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
		con->in_msg_pos.data_pos = 0;
	}

	/* front */
@@ -1393,31 +1397,6 @@ static int read_partial_message(struct ceph_connection *con)
	}

	/* (page) data */
	data_off = le16_to_cpu(m->hdr.data_off);
	if (data_len == 0)
		goto no_data;

	if (m->nr_pages == 0) {
		con->in_msg_pos.page = 0;
		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
		con->in_msg_pos.data_pos = 0;
		/* find pages for data payload */
		want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
		ret = -1;
		mutex_unlock(&con->mutex);
		if (con->ops->prepare_pages)
			ret = con->ops->prepare_pages(con, m, want);
		mutex_lock(&con->mutex);
		if (ret < 0) {
			dout("%p prepare_pages failed, skipping payload\n", m);
			con->in_base_pos = -data_len - sizeof(m->footer);
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
			con->in_tag = CEPH_MSGR_TAG_READY;
			return 0;
		}
		BUG_ON(m->nr_pages < want);
	}
	while (con->in_msg_pos.data_pos < data_len) {
		left = min((int)(data_len - con->in_msg_pos.data_pos),
			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
@@ -1440,7 +1419,6 @@ static int read_partial_message(struct ceph_connection *con)
		}
	}

no_data:
	/* footer */
	to = sizeof(m->hdr) + sizeof(m->footer);
	while (con->in_base_pos < to) {
@@ -2136,6 +2114,25 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
	return 0;
}

static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
{
	int ret;
	int want;
	int data_len = le32_to_cpu(msg->hdr.data_len);
	unsigned data_off = le16_to_cpu(msg->hdr.data_off);

	want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
	ret = -1;
	mutex_unlock(&con->mutex);
	if (con->ops->prepare_pages)
		ret = con->ops->prepare_pages(con, msg, want);
	mutex_lock(&con->mutex);

	BUG_ON(msg->nr_pages < want);

	return ret;
}

/*
 * Generic message allocator, for incoming messages.
 */
@@ -2146,6 +2143,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	int middle_len = le32_to_cpu(hdr->middle_len);
	int data_len = le32_to_cpu(hdr->data_len);
	struct ceph_msg *msg = NULL;
	int ret;

@@ -2166,6 +2164,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
			return ERR_PTR(-ENOMEM);
		}
	}
	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	if (middle_len) {
		ret = ceph_alloc_middle(con, msg);
@@ -2175,6 +2174,18 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
			return msg;
		}
	}

	if (data_len) {
		ret = ceph_alloc_data_section(con, msg);

		if (ret < 0) {
			*skip = 1;
			ceph_msg_put(msg);
			return NULL;
		}
	}


	return msg;
}