Commit 49489bb0 authored by David Howells
Browse files

rxrpc: Do zerocopy using MSG_SPLICE_PAGES and page frags



Switch from keeping the transmission buffers in the rxrpc_txbuf struct,
allocated from the slab, to allocating them using page fragment allocators
(which use raw pages), thereby allowing them to be passed using
MSG_SPLICE_PAGES and avoiding copying into the UDP buffers.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: "David S. Miller" <davem@davemloft.net>
cc: Eric Dumazet <edumazet@google.com>
cc: Jakub Kicinski <kuba@kernel.org>
cc: Paolo Abeni <pabeni@redhat.com>
cc: linux-afs@lists.infradead.org
cc: netdev@vger.kernel.org
parent 8985f2b0
Loading
Loading
Loading
Loading
+10 −22
Original line number Diff line number Diff line
@@ -248,10 +248,9 @@ struct rxrpc_security {
					struct rxrpc_key_token *);

	/* Work out how much data we can store in a packet, given an estimate
	 * of the amount of data remaining.
	 * of the amount of data remaining and allocate a data buffer.
	 */
	int (*how_much_data)(struct rxrpc_call *, size_t,
			     size_t *, size_t *, size_t *);
	struct rxrpc_txbuf *(*alloc_txbuf)(struct rxrpc_call *call, size_t remaining, gfp_t gfp);

	/* impose security on a packet */
	int (*secure_packet)(struct rxrpc_call *, struct rxrpc_txbuf *);
@@ -292,6 +291,7 @@ struct rxrpc_local {
	struct socket		*socket;	/* my UDP socket */
	struct task_struct	*io_thread;
	struct completion	io_thread_ready; /* Indication that the I/O thread started */
	struct page_frag_cache	tx_alloc;	/* Tx control packet allocation (I/O thread only) */
	struct rxrpc_sock	*service;	/* Service(s) listening on this endpoint */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
	struct sk_buff_head	rx_delay_queue;	/* Delay injection queue */
@@ -500,6 +500,8 @@ struct rxrpc_connection {
	struct list_head	proc_link;	/* link in procfs list */
	struct list_head	link;		/* link in master connection list */
	struct sk_buff_head	rx_queue;	/* received conn-level packets */
	struct page_frag_cache	tx_data_alloc;	/* Tx DATA packet allocation */
	struct mutex		tx_data_alloc_lock;

	struct mutex		security_lock;	/* Lock for security management */
	const struct rxrpc_security *security;	/* applied security module */
@@ -788,7 +790,6 @@ struct rxrpc_send_params {
 * Buffer of data to be output as a packet.
 */
struct rxrpc_txbuf {
	struct rcu_head		rcu;
	struct list_head	call_link;	/* Link in call->tx_sendmsg/tx_buffer */
	struct list_head	tx_link;	/* Link in live Enc queue or Tx queue */
	ktime_t			last_sent;	/* Time at which last transmitted */
@@ -806,22 +807,8 @@ struct rxrpc_txbuf {
	__be16			cksum;		/* Checksum to go in header */
	unsigned short		ack_rwind;	/* ACK receive window */
	u8 /*enum rxrpc_propose_ack_trace*/ ack_why;	/* If ack, why */
	u8			nr_kvec;
	struct kvec		kvec[1];
	struct {
		/* The packet for encrypting and DMA'ing.  We align it such
		 * that data[] aligns correctly for any crypto blocksize.
		 */
		u8		pad[64 - sizeof(struct rxrpc_wire_header)];
		struct rxrpc_wire_header _wire;	/* Network-ready header */
		union {
			u8	data[RXRPC_JUMBO_DATALEN]; /* Data packet */
			struct {
				struct rxrpc_ackpacket _ack;
				DECLARE_FLEX_ARRAY(u8, acks);
			};
		};
	} __aligned(64);
	u8			nr_kvec;	/* Amount of kvec[] used */
	struct kvec		kvec[3];
};

static inline bool rxrpc_sending_to_server(const struct rxrpc_txbuf *txb)
@@ -1299,8 +1286,9 @@ static inline void rxrpc_sysctl_exit(void) {}
 * txbuf.c
 */
extern atomic_t rxrpc_nr_txbuf;
struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
				      gfp_t gfp);
struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
					   size_t data_align, gfp_t gfp);
struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size);
void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
+4 −0
Original line number Diff line number Diff line
@@ -68,6 +68,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
		INIT_LIST_HEAD(&conn->proc_link);
		INIT_LIST_HEAD(&conn->link);
		mutex_init(&conn->security_lock);
		mutex_init(&conn->tx_data_alloc_lock);
		skb_queue_head_init(&conn->rx_queue);
		conn->rxnet = rxnet;
		conn->security = &rxrpc_no_security;
@@ -341,6 +342,9 @@ static void rxrpc_clean_up_connection(struct work_struct *work)
	 */
	rxrpc_purge_queue(&conn->rx_queue);

	if (conn->tx_data_alloc.va)
		__page_frag_cache_drain(virt_to_page(conn->tx_data_alloc.va),
					conn->tx_data_alloc.pagecnt_bias);
	call_rcu(&conn->rcu, rxrpc_rcu_free_connection);
}

+4 −7
Original line number Diff line number Diff line
@@ -15,14 +15,11 @@ static int none_init_connection_security(struct rxrpc_connection *conn,
}

/*
 * Work out how much data we can put in an unsecured packet.
 * Allocate an appropriately sized buffer for the amount of data remaining.
 */
static int none_how_much_data(struct rxrpc_call *call, size_t remain,
			       size_t *_buf_size, size_t *_data_size, size_t *_offset)
static struct rxrpc_txbuf *none_alloc_txbuf(struct rxrpc_call *call, size_t remain, gfp_t gfp)
{
	*_buf_size = *_data_size = min_t(size_t, remain, RXRPC_JUMBO_DATALEN);
	*_offset = 0;
	return 0;
	return rxrpc_alloc_data_txbuf(call, min_t(size_t, remain, RXRPC_JUMBO_DATALEN), 0, gfp);
}

static int none_secure_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
@@ -79,7 +76,7 @@ const struct rxrpc_security rxrpc_no_security = {
	.exit				= none_exit,
	.init_connection_security	= none_init_connection_security,
	.free_call_crypto		= none_free_call_crypto,
	.how_much_data			= none_how_much_data,
	.alloc_txbuf			= none_alloc_txbuf,
	.secure_packet			= none_secure_packet,
	.verify_packet			= none_verify_packet,
	.respond_to_challenge		= none_respond_to_challenge,
+3 −0
Original line number Diff line number Diff line
@@ -452,6 +452,9 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
#endif
	rxrpc_purge_queue(&local->rx_queue);
	rxrpc_purge_client_connections(local);
	if (local->tx_alloc.va)
		__page_frag_cache_drain(virt_to_page(local->tx_alloc.va),
					local->tx_alloc.pagecnt_bias);
}

/*
+31 −34
Original line number Diff line number Diff line
@@ -83,18 +83,16 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
			       rxrpc_serial_t serial)
{
	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
	struct rxrpc_acktrailer *trailer = txb->kvec[2].iov_base + 3;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	struct rxrpc_acktrailer trailer;
	unsigned int qsize, sack, wrap, to;
	rxrpc_seq_t window, wtop;
	int rsize;
	u32 mtu, jmax;
	u8 *ackp = txb->acks;
	u8 *filler = txb->kvec[2].iov_base;
	u8 *sackp = txb->kvec[1].iov_base;

	call->ackr_nr_unacked = 0;
	atomic_set(&call->ackr_nr_consumed, 0);
	rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

	window = call->ackr_window;
	wtop   = call->ackr_wtop;
@@ -110,20 +108,27 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
	ack->serial		= htonl(serial);
	ack->reason		= ack_reason;
	ack->nAcks		= wtop - window;
	filler[0]		= 0;
	filler[1]		= 0;
	filler[2]		= 0;

	if (ack_reason == RXRPC_ACK_PING)
		txb->flags |= RXRPC_REQUEST_ACK;

	if (after(wtop, window)) {
		txb->len += ack->nAcks;
		txb->kvec[1].iov_base = sackp;
		txb->kvec[1].iov_len = ack->nAcks;

		wrap = RXRPC_SACK_SIZE - sack;
		to = min_t(unsigned int, ack->nAcks, RXRPC_SACK_SIZE);

		if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
			memcpy(txb->acks, call->ackr_sack_table + sack, ack->nAcks);
			memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
		} else {
			memcpy(txb->acks, call->ackr_sack_table + sack, wrap);
			memcpy(txb->acks + wrap, call->ackr_sack_table,
			       to - wrap);
			memcpy(sackp, call->ackr_sack_table + sack, wrap);
			memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
		}

		ackp += to;
	} else if (before(wtop, window)) {
		pr_warn("ack window backward %x %x", window, wtop);
	} else if (ack->reason == RXRPC_ACK_DELAY) {
@@ -136,17 +141,10 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
	qsize = (window - 1) - call->rx_consumed;
	rsize = max_t(int, call->rx_winsize - qsize, 0);
	txb->ack_rwind = rsize;
	trailer.maxMTU		= htonl(rxrpc_rx_mtu);
	trailer.ifMTU		= htonl(mtu);
	trailer.rwind		= htonl(rsize);
	trailer.jumbo_max	= htonl(jmax);

	*ackp++ = 0;
	*ackp++ = 0;
	*ackp++ = 0;
	memcpy(ackp, &trailer, sizeof(trailer));
	txb->kvec[0].iov_len += sizeof(*ack) + ack->nAcks + 3 + sizeof(trailer);
	txb->len = txb->kvec[0].iov_len;
	trailer->maxMTU		= htonl(rxrpc_rx_mtu);
	trailer->ifMTU		= htonl(mtu);
	trailer->rwind		= htonl(rsize);
	trailer->jumbo_max	= htonl(jmax);
}

/*
@@ -195,7 +193,7 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
/*
 * Transmit an ACK packet.
 */
static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
static void rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
{
	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
	struct rxrpc_connection *conn;
@@ -204,7 +202,7 @@ static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *tx
	int ret, rtt_slot = -1;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return -ECONNRESET;
		return;

	conn = call->conn;

@@ -212,10 +210,8 @@ static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *tx
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	if (ack->reason == RXRPC_ACK_PING)
		txb->flags |= RXRPC_REQUEST_ACK;
	whdr->flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;

	txb->serial = rxrpc_get_next_serial(conn);
@@ -250,8 +246,6 @@ static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *tx
			rxrpc_cancel_rtt_probe(call, txb->serial, rtt_slot);
		rxrpc_set_keepalive(call);
	}

	return ret;
}

/*
@@ -267,16 +261,19 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,

	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

	txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
				rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
	txb = rxrpc_alloc_ack_txbuf(call, call->ackr_wtop - call->ackr_window);
	if (!txb) {
		kleave(" = -ENOMEM");
		return;
	}

	txb->ack_why = why;

	rxrpc_fill_out_ack(call, txb, ack_reason, serial);
	call->ackr_nr_unacked = 0;
	atomic_set(&call->ackr_nr_consumed, 0);
	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

	txb->ack_why = why;
	trace_rxrpc_send_ack(call, why, ack_reason, serial);
	rxrpc_send_ack_packet(call, txb);
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
@@ -465,7 +462,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	/* Track what we've attempted to transmit at least once so that the
	 * retransmission algorithm doesn't try to resend what we haven't sent
Loading