Commit b341a026 authored by David Howells, committed by Jakub Kicinski
Browse files

rxrpc: Implement progressive transmission queue struct



We need to scan the buffers in the transmission queue occasionally when
processing ACKs, but the transmission queue is currently a linked list of
transmission buffers which, when we eventually expand the Tx window to 8192
packets, will be very slow to walk.

Instead, pull the fields we need to examine a lot (last sent time,
retransmitted flag) into a new struct rxrpc_txqueue and make each one hold
an array of 32 or 64 packets.

The transmission queue is then a list of these structs, each pointing to a
contiguous set of packets.  Scanning is then a lot faster as the flags and
timestamps are concentrated in the CPU dcache.

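The transmission timestamps are stored as a number of microseconds from a
base ktime to reduce memory requirements.  Each offset is held in an
unsigned int, which can represent roughly 71 minutes' worth of
microseconds, so this should be fine provided we manage to transmit an
entire buffer within an hour.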

This will make implementing RACK-TLP [RFC8985] easier as it will be less
costly to scan the transmission buffers.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-19-dhowells@redhat.com


Signed-off-by: Jakub Kicinski <kuba@kernel.org>
parent 6396b48a
Loading
Loading
Loading
Loading
+83 −15
Original line number Diff line number Diff line
@@ -297,7 +297,6 @@

#define rxrpc_txqueue_traces \
	EM(rxrpc_txqueue_await_reply,		"AWR") \
	EM(rxrpc_txqueue_dequeue,		"DEQ") \
	EM(rxrpc_txqueue_end,			"END") \
	EM(rxrpc_txqueue_queue,			"QUE") \
	EM(rxrpc_txqueue_queue_last,		"QLS") \
@@ -482,6 +481,19 @@
	EM(rxrpc_txbuf_see_send_more,		"SEE SEND+  ")	\
	E_(rxrpc_txbuf_see_unacked,		"SEE UNACKED")

/*
 * Symbol table for the rxrpc_tq tracepoint.  Each entry pairs a value of
 * enum rxrpc_tq_trace with the five-character label that is printed for it;
 * the labels are space-padded so that they all have the same width.  As in
 * the other trace tables in this file, the final entry uses E_() rather
 * than EM().
 */
#define rxrpc_tq_traces \
	EM(rxrpc_tq_alloc,			"ALLOC") \
	EM(rxrpc_tq_cleaned,			"CLEAN") \
	EM(rxrpc_tq_decant,			"DCNT ") \
	EM(rxrpc_tq_decant_advance,		"DCNT>") \
	EM(rxrpc_tq_queue,			"QUEUE") \
	EM(rxrpc_tq_queue_dup,			"QUE!!") \
	EM(rxrpc_tq_rotate,			"ROT  ") \
	EM(rxrpc_tq_rotate_and_free,		"ROT-F") \
	EM(rxrpc_tq_rotate_and_keep,		"ROT-K") \
	EM(rxrpc_tq_transmit,			"XMIT ") \
	E_(rxrpc_tq_transmit_advance,		"XMIT>")

#define rxrpc_pmtud_reduce_traces \
	EM(rxrpc_pmtud_reduce_ack,		"Ack  ")	\
	EM(rxrpc_pmtud_reduce_icmp,		"Icmp ")	\
@@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte);
enum rxrpc_sack_trace		{ rxrpc_sack_traces } __mode(byte);
enum rxrpc_skb_trace		{ rxrpc_skb_traces } __mode(byte);
enum rxrpc_timer_trace		{ rxrpc_timer_traces } __mode(byte);
enum rxrpc_tq_trace		{ rxrpc_tq_traces } __mode(byte);
enum rxrpc_tx_point		{ rxrpc_tx_points } __mode(byte);
enum rxrpc_txbuf_trace		{ rxrpc_txbuf_traces } __mode(byte);
enum rxrpc_txqueue_trace	{ rxrpc_txqueue_traces } __mode(byte);
@@ -554,6 +567,7 @@ rxrpc_rtt_tx_traces;
rxrpc_sack_traces;
rxrpc_skb_traces;
rxrpc_timer_traces;
rxrpc_tq_traces;
rxrpc_tx_points;
rxrpc_txbuf_traces;
rxrpc_txqueue_traces;
@@ -881,7 +895,7 @@ TRACE_EVENT(rxrpc_txqueue,
		    __field(rxrpc_seq_t,		acks_hard_ack)
		    __field(rxrpc_seq_t,		tx_bottom)
		    __field(rxrpc_seq_t,		tx_top)
		    __field(rxrpc_seq_t,		tx_prepared)
		    __field(rxrpc_seq_t,		send_top)
		    __field(int,			tx_winsize)
			     ),

@@ -891,7 +905,7 @@ TRACE_EVENT(rxrpc_txqueue,
		    __entry->acks_hard_ack = call->acks_hard_ack;
		    __entry->tx_bottom = call->tx_bottom;
		    __entry->tx_top = call->tx_top;
		    __entry->tx_prepared = call->tx_prepared;
		    __entry->send_top = call->send_top;
		    __entry->tx_winsize = call->tx_winsize;
			   ),

@@ -902,14 +916,14 @@ TRACE_EVENT(rxrpc_txqueue,
		      __entry->acks_hard_ack,
		      __entry->tx_top - __entry->tx_bottom,
		      __entry->tx_top - __entry->acks_hard_ack,
		      __entry->tx_prepared - __entry->tx_bottom,
		      __entry->send_top - __entry->tx_top,
		      __entry->tx_winsize)
	    );

TRACE_EVENT(rxrpc_transmit,
	    TP_PROTO(struct rxrpc_call *call, int space),
	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t send_top, int space),

	    TP_ARGS(call, space),
	    TP_ARGS(call, send_top, space),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
@@ -925,12 +939,12 @@ TRACE_EVENT(rxrpc_transmit,

	    TP_fast_assign(
		    __entry->call	= call->debug_id;
		    __entry->seq	= call->tx_bottom;
		    __entry->seq	= call->tx_top + 1;
		    __entry->space	= space;
		    __entry->tx_winsize	= call->tx_winsize;
		    __entry->cong_cwnd	= call->cong_cwnd;
		    __entry->cong_extra	= call->cong_extra;
		    __entry->prepared	= call->tx_prepared - call->tx_bottom;
		    __entry->prepared	= send_top - call->tx_bottom;
		    __entry->in_flight	= call->tx_top - call->acks_hard_ack;
		    __entry->pmtud_jumbo = call->peer->pmtud_jumbo;
			   ),
@@ -947,6 +961,32 @@ TRACE_EVENT(rxrpc_transmit,
		      __entry->pmtud_jumbo)
	    );

/*
 * Trace a rotation of the Tx window.  Records the call's debug ID, the
 * sequence number at which the rotation starts (seq), the sequence number
 * being rotated up to (to) and the call's tx_top at the time of the event.
 * The three sequence numbers are printed as "q=<seq>-<to>-<top>".
 */
TRACE_EVENT(rxrpc_tx_rotate,
	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, rxrpc_seq_t to),

	    TP_ARGS(call, seq, to),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
		    __field(rxrpc_seq_t,	seq)
		    __field(rxrpc_seq_t,	to)
		    __field(rxrpc_seq_t,	top)
			     ),

	    TP_fast_assign(
		    __entry->call	= call->debug_id;
		    __entry->seq	= seq;
		    __entry->to		= to;
		    __entry->top	= call->tx_top;
			   ),

	    TP_printk("c=%08x q=%08x-%08x-%08x",
		      __entry->call,
		      __entry->seq,
		      __entry->to,
		      __entry->top)
	    );

TRACE_EVENT(rxrpc_rx_data,
	    TP_PROTO(unsigned int call, rxrpc_seq_t seq,
		     rxrpc_serial_t serial, u8 flags),
@@ -1621,10 +1661,11 @@ TRACE_EVENT(rxrpc_drop_ack,
	    );

TRACE_EVENT(rxrpc_retransmit,
	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
		     rxrpc_serial_t serial, ktime_t expiry),
	    TP_PROTO(struct rxrpc_call *call,
		     struct rxrpc_send_data_req *req,
		     struct rxrpc_txbuf *txb, ktime_t expiry),

	    TP_ARGS(call, seq, serial, expiry),
	    TP_ARGS(call, req, txb, expiry),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
@@ -1635,8 +1676,8 @@ TRACE_EVENT(rxrpc_retransmit,

	    TP_fast_assign(
		    __entry->call = call->debug_id;
		    __entry->seq = seq;
		    __entry->serial = serial;
		    __entry->seq = req->seq;
		    __entry->serial = txb->serial;
		    __entry->expiry = expiry;
			   ),

@@ -1714,9 +1755,9 @@ TRACE_EVENT(rxrpc_reset_cwnd,
		    __entry->cwnd	= call->cong_cwnd;
		    __entry->extra	= call->cong_extra;
		    __entry->hard_ack	= call->acks_hard_ack;
		    __entry->prepared	= call->tx_prepared - call->tx_bottom;
		    __entry->prepared	= call->send_top - call->tx_bottom;
		    __entry->since_last_tx = ktime_sub(now, call->tx_last_sent);
		    __entry->has_data	= !list_empty(&call->tx_sendmsg);
		    __entry->has_data	= call->tx_bottom != call->tx_top;
			   ),

	    TP_printk("c=%08x q=%08x %s cw=%u+%u pr=%u tm=%llu d=%u",
@@ -2024,6 +2065,33 @@ TRACE_EVENT(rxrpc_txbuf,
		      __entry->ref)
	    );

/*
 * Trace an operation on a transmission queue segment.  Records the call's
 * debug ID, the base sequence number of the segment (printed as "bq="), the
 * sequence number the operation relates to (printed as "q=") and the
 * operation itself, rendered through the rxrpc_tq_traces symbol table.
 *
 * tq may be NULL, in which case the call's tx_qbase is recorded instead of
 * the segment's qbase.
 */
TRACE_EVENT(rxrpc_tq,
	    TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
		     rxrpc_seq_t seq, enum rxrpc_tq_trace trace),

	    TP_ARGS(call, tq, seq, trace),

	    TP_STRUCT__entry(
		    __field(unsigned int,		call_debug_id)
		    __field(rxrpc_seq_t,		qbase)
		    __field(rxrpc_seq_t,		seq)
		    __field(enum rxrpc_tq_trace,	trace)
			     ),

	    TP_fast_assign(
		    __entry->call_debug_id = call->debug_id;
		    __entry->qbase = tq ? tq->qbase : call->tx_qbase;
		    __entry->seq = seq;
		    __entry->trace = trace;
			   ),

	    TP_printk("c=%08x bq=%08x q=%08x %s",
		      __entry->call_debug_id,
		      __entry->qbase,
		      __entry->seq,
		      __print_symbolic(__entry->trace, rxrpc_tq_traces))
	    );

TRACE_EVENT(rxrpc_poke_call,
	    TP_PROTO(struct rxrpc_call *call, bool busy,
		     enum rxrpc_call_poke_trace what),
+39 −8
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ struct rxrpc_crypt {
struct key_preparsed_payload;
struct rxrpc_connection;
struct rxrpc_txbuf;
struct rxrpc_txqueue;

/*
 * Mark applied to socket buffers in skb->mark.  skb->priority is used
@@ -691,13 +692,17 @@ struct rxrpc_call {
	unsigned short		rx_pkt_offset;	/* Current recvmsg packet offset */
	unsigned short		rx_pkt_len;	/* Current recvmsg packet len */

	/* Sendmsg data tracking. */
	rxrpc_seq_t		send_top;	/* Highest Tx slot filled by sendmsg. */
	struct rxrpc_txqueue	*send_queue;	/* Queue that sendmsg is writing into */

	/* Transmitted data tracking. */
	spinlock_t		tx_lock;	/* Transmit queue lock */
	struct list_head	tx_sendmsg;	/* Sendmsg prepared packets */
	struct list_head	tx_buffer;	/* Buffer of transmissible packets */
	struct rxrpc_txqueue	*tx_queue;	/* Start of transmission buffers */
	struct rxrpc_txqueue	*tx_qtail;	/* End of transmission buffers */
	rxrpc_seq_t		tx_qbase;	/* First slot in tx_queue */
	rxrpc_seq_t		tx_bottom;	/* First packet in buffer */
	rxrpc_seq_t		tx_transmitted;	/* Highest packet transmitted */
	rxrpc_seq_t		tx_prepared;	/* Highest Tx slot prepared. */
	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
	u16			tx_backoff;	/* Delay to insert due to Tx failure (ms) */
	u8			tx_winsize;	/* Maximum size of Tx window */
@@ -815,9 +820,6 @@ struct rxrpc_send_params {
 * Buffer of data to be output as a packet.
 */
struct rxrpc_txbuf {
	struct list_head	call_link;	/* Link in call->tx_sendmsg/tx_buffer */
	struct list_head	tx_link;	/* Link in live Enc queue or Tx queue */
	ktime_t			last_sent;	/* Time at which last transmitted */
	refcount_t		ref;
	rxrpc_seq_t		seq;		/* Sequence number of this packet */
	rxrpc_serial_t		serial;		/* Last serial number transmitted with */
@@ -849,6 +851,36 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
	return !rxrpc_sending_to_server(txb);
}

/*
 * Transmit queue element, including RACK [RFC8985] per-segment metadata.  The
 * transmission timestamp is in usec from the base.
 *
 * Each element covers RXRPC_NR_TXQUEUE consecutive sequence numbers; a packet
 * with sequence number seq is found in slot (seq - qbase).  Elements are
 * chained through ->next to form a call's transmission queue.
 */
struct rxrpc_txqueue {
	/* Start with the members we want to prefetch. */
	struct rxrpc_txqueue	*next;		/* Next element in the queue */
	ktime_t			xmit_ts_base;	/* Base time for segment_xmit_ts[] */
	rxrpc_seq_t		qbase;		/* Sequence number of slot 0 */

	/* The arrays we want to pack into as few cache lines as possible. */
	struct {
/* Number of slots per element: one per bit in a long. */
#define RXRPC_NR_TXQUEUE BITS_PER_LONG
/* Mask to reduce a sequence number to a slot index. */
#define RXRPC_TXQ_MASK (RXRPC_NR_TXQUEUE - 1)
		/* Packet buffer occupying each slot. */
		struct rxrpc_txbuf *bufs[RXRPC_NR_TXQUEUE];
		/* Transmission time of each slot, in usec after xmit_ts_base. */
		unsigned int	segment_xmit_ts[RXRPC_NR_TXQUEUE];
	} ____cacheline_aligned;
};

/*
 * Data transmission request.  Describes a run of n DATA packets, beginning at
 * sequence number seq within queue segment tq, that is to be handed to
 * rxrpc_send_data_packet().
 */
struct rxrpc_send_data_req {
	ktime_t			now;		/* Current time */
	struct rxrpc_txqueue	*tq;		/* Tx queue segment holding first DATA */
	rxrpc_seq_t		seq;		/* Sequence of first data */
	int			n;		/* Number of DATA packets to glue into jumbo */
	bool			did_send;	/* True once something has actually been sent */
};

#include <trace/events/rxrpc.h>

/*
@@ -905,7 +937,6 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
			enum rxrpc_propose_ack_trace why);
void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
			     enum rxrpc_propose_ack_trace);
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);

bool rxrpc_input_call_event(struct rxrpc_call *call);
@@ -1191,10 +1222,10 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
		    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why);
void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call);
int rxrpc_send_abort_packet(struct rxrpc_call *);
void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req);
void rxrpc_send_conn_abort(struct rxrpc_connection *conn);
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
void rxrpc_send_keepalive(struct rxrpc_peer *);
void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n);

/*
 * peer_event.c
+128 −76
Original line number Diff line number Diff line
@@ -62,57 +62,85 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
	set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
}

/*
 * Resend the DATA packet(s) described by a transmission request.
 *
 * @call: The call the packets belong to.
 * @req:  The request; ->tq and ->seq identify the first packet to resend.
 *	  On return ->tq and ->n have been cleared, ->did_send is set and
 *	  ->now has been refreshed, ready for the caller to fill in the next
 *	  request.
 * @rto:  The retransmission timeout, used only to report how far past (or
 *	  short of) its resend time the packet was.
 */
static void rxrpc_retransmit_data(struct rxrpc_call *call,
				  struct rxrpc_send_data_req *req,
				  ktime_t rto)
{
	struct rxrpc_txqueue *tq = req->tq;
	unsigned int slot = req->seq & RXRPC_TXQ_MASK;
	struct rxrpc_txbuf *txb = tq->bufs[slot];
	ktime_t due;

	_enter("%x,%x,%x,%x", tq->qbase, req->seq, slot, txb->debug_id);

	/* The slot's transmit time is kept as a usec offset from the
	 * segment's base time; the packet fell due one RTO after that.
	 */
	due = ktime_add(ktime_add_us(tq->xmit_ts_base,
				     tq->segment_xmit_ts[slot]),
			rto);
	trace_rxrpc_retransmit(call, req, txb, ktime_sub(due, req->now));

	/* Mark the buffer as resent before it goes out again. */
	txb->flags |= RXRPC_TXBUF_RESENT;
	rxrpc_send_data_packet(call, req);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);

	/* Reset the request for reuse and note that we transmitted. */
	req->now	= ktime_get_real();
	req->did_send	= true;
	req->n		= 0;
	req->tq		= NULL;
}

/*
 * Perform retransmission of NAK'd and unack'd packets.
 */
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
{
	struct rxrpc_send_data_req req = {
		.now	= ktime_get_real(),
	};
	struct rxrpc_ackpacket *ack = NULL;
	struct rxrpc_skb_priv *sp;
	struct rxrpc_txqueue *tq;
	struct rxrpc_txbuf *txb;
	rxrpc_seq_t transmitted = call->tx_transmitted;
	rxrpc_seq_t transmitted = call->tx_transmitted, seq;
	ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
	ktime_t resend_at = KTIME_MAX, now, delay;
	ktime_t resend_at = KTIME_MAX, delay;
	bool unacked = false, did_send = false;
	unsigned int i;
	unsigned int qix;

	_enter("{%d,%d}", call->acks_hard_ack, call->tx_top);

	now = ktime_get_real();

	if (list_empty(&call->tx_buffer))
	if (call->tx_bottom == call->tx_top)
		goto no_resend;

	trace_rxrpc_resend(call, ack_skb);
	txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);
	tq = call->tx_queue;
	seq = call->tx_bottom;

	/* Scan the soft ACK table without dropping the lock and resend any
	 * explicitly NAK'd packets.
	 */
	/* Scan the soft ACK table and resend any explicitly NAK'd packets. */
	if (ack_skb) {
		sp = rxrpc_skb(ack_skb);
		ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);

		for (i = 0; i < sp->ack.nr_acks; i++) {
			rxrpc_seq_t seq;
		for (int i = 0; i < sp->ack.nr_acks; i++) {
			rxrpc_seq_t aseq;

			if (ack->acks[i] & 1)
				continue;
			seq = sp->ack.first_ack + i;
			if (after(txb->seq, transmitted))
				break;
			if (after(txb->seq, seq))
				continue; /* A new hard ACK probably came in */
			list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
				if (txb->seq == seq)
					goto found_txb;
			}
			aseq = sp->ack.first_ack + i;
			while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
				tq = tq->next;
			seq = aseq;
			qix = seq - tq->qbase;
			txb = tq->bufs[qix];
			if (after(seq, transmitted))
				goto no_further_resend;

		found_txb:
			resend_at = ktime_add(txb->last_sent, rto);
			resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
			resend_at = ktime_add(resend_at, rto);
			if (after(txb->serial, call->acks_highest_serial)) {
				if (ktime_after(resend_at, now) &&
				if (ktime_after(resend_at, req.now) &&
				    ktime_before(resend_at, next_resend))
					next_resend = resend_at;
				continue; /* Ack point not yet reached */
@@ -120,17 +148,13 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)

			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);

			trace_rxrpc_retransmit(call, txb->seq, txb->serial,
					       ktime_sub(resend_at, now));
			req.tq  = tq;
			req.seq = seq;
			req.n   = 1;
			rxrpc_retransmit_data(call, &req, rto);

			txb->flags |= RXRPC_TXBUF_RESENT;
			rxrpc_transmit_data(call, txb, 1);
			did_send = true;
			now = ktime_get_real();

			if (list_is_last(&txb->call_link, &call->tx_buffer))
			if (after_eq(seq, call->tx_top))
				goto no_further_resend;
			txb = list_next_entry(txb, call_link);
		}
	}

@@ -139,35 +163,43 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
	 * ACK'd or NACK'd in due course, so don't worry about it here; here we
	 * need to consider retransmitting anything beyond that point.
	 */
	if (after_eq(call->acks_prev_seq, call->tx_transmitted))
	seq = call->acks_prev_seq;
	if (after_eq(seq, call->tx_transmitted))
		goto no_further_resend;
	seq++;

	list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
		resend_at = ktime_add(txb->last_sent, rto);
	while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
		tq = tq->next;

		if (before_eq(txb->seq, call->acks_prev_seq))
	while (before_eq(seq, call->tx_transmitted)) {
		qix = seq - tq->qbase;
		if (qix >= RXRPC_NR_TXQUEUE) {
			tq = tq->next;
			continue;
		if (after(txb->seq, call->tx_transmitted))
			break; /* Not transmitted yet */
		}
		txb = tq->bufs[qix];
		resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
		resend_at = ktime_add(resend_at, rto);

		if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
		    before(txb->serial, ntohl(ack->serial)))
			goto do_resend; /* Wasn't accounted for by a more recent ping. */

		if (ktime_after(resend_at, now)) {
		if (ktime_after(resend_at, req.now)) {
			if (ktime_before(resend_at, next_resend))
				next_resend = resend_at;
			seq++;
			continue;
		}

	do_resend:
		unacked = true;

		txb->flags |= RXRPC_TXBUF_RESENT;
		rxrpc_transmit_data(call, txb, 1);
		did_send = true;
		rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
		now = ktime_get_real();
		req.tq  = tq;
		req.seq = seq;
		req.n   = 1;
		rxrpc_retransmit_data(call, &req, rto);
		seq++;
	}

no_further_resend:
@@ -175,7 +207,8 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
	if (resend_at < KTIME_MAX) {
		delay = rxrpc_get_rto_backoff(call->peer, did_send);
		resend_at = ktime_add(resend_at, delay);
		trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset);
		trace_rxrpc_timer_set(call, resend_at - req.now,
				      rxrpc_timer_trace_resend_reset);
	}
	call->resend_at = resend_at;

@@ -186,11 +219,11 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
	 * that an ACK got lost somewhere.  Send a ping to find out instead of
	 * retransmitting data.
	 */
	if (!did_send) {
	if (!req.did_send) {
		ktime_t next_ping = ktime_add_us(call->acks_latest_ts,
						 call->peer->srtt_us >> 3);

		if (ktime_sub(next_ping, now) <= 0)
		if (ktime_sub(next_ping, req.now) <= 0)
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_0_retrans);
	}
@@ -240,47 +273,68 @@ static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call)
}

/*
 * Decant some if the sendmsg prepared queue into the transmission buffer.
 * Transmit some as-yet untransmitted data.
 */
static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
static void rxrpc_transmit_fresh_data(struct rxrpc_call *call)
{
	int space = rxrpc_tx_window_space(call);

	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		if (list_empty(&call->tx_sendmsg))
		if (call->send_top == call->tx_top)
			return;
		rxrpc_expose_client_call(call);
	}

	while (space > 0) {
		struct rxrpc_txbuf *head = NULL, *txb;
		int count = 0, limit = min(space, 1);
		struct rxrpc_send_data_req req = {
			.now	= ktime_get_real(),
			.seq	= call->tx_transmitted + 1,
			.n	= 0,
		};
		struct rxrpc_txqueue *tq;
		struct rxrpc_txbuf *txb;
		rxrpc_seq_t send_top, seq;
		int limit = min(space, 1);

		if (list_empty(&call->tx_sendmsg))
		/* Order send_top before the contents of the new txbufs and
		 * txqueue pointers
		 */
		send_top = smp_load_acquire(&call->send_top);
		if (call->tx_top == send_top)
			break;

		trace_rxrpc_transmit(call, space);
		trace_rxrpc_transmit(call, send_top, space);

		tq = call->tx_qtail;
		seq = call->tx_top;
		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);

		spin_lock(&call->tx_lock);
		do {
			txb = list_first_entry(&call->tx_sendmsg,
					       struct rxrpc_txbuf, call_link);
			if (!head)
				head = txb;
			list_move_tail(&txb->call_link, &call->tx_buffer);
			count++;
			int ix;

			seq++;
			ix = seq & RXRPC_TXQ_MASK;
			if (!ix) {
				tq = tq->next;
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
			}
			if (!req.tq)
				req.tq = tq;
			txb = tq->bufs[ix];
			req.n++;
			if (!txb->jumboable)
				break;
		} while (count < limit && !list_empty(&call->tx_sendmsg));

		spin_unlock(&call->tx_lock);
		} while (req.n < limit && before(seq, send_top));

		call->tx_top = txb->seq;
		if (txb->flags & RXRPC_LAST_PACKET)
		if (txb->flags & RXRPC_LAST_PACKET) {
			rxrpc_close_tx_phase(call);
			tq = NULL;
		}
		call->tx_qtail = tq;
		call->tx_top = seq;

		space -= count;
		rxrpc_transmit_data(call, head, count);
		space -= req.n;
		rxrpc_send_data_packet(call, &req);
	}
}

@@ -288,7 +342,7 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		if (list_empty(&call->tx_sendmsg))
		if (call->tx_bottom == READ_ONCE(call->send_top))
			return;
		rxrpc_begin_service_reply(call);
		fallthrough;
@@ -297,11 +351,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		if (!rxrpc_tx_window_space(call))
			return;
		if (list_empty(&call->tx_sendmsg)) {
		if (call->tx_bottom == READ_ONCE(call->send_top)) {
			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
			return;
		}
		rxrpc_decant_prepared_tx(call);
		rxrpc_transmit_fresh_data(call);
		break;
	default:
		return;
@@ -503,8 +557,6 @@ bool rxrpc_input_call_event(struct rxrpc_call *call)
		    call->peer->pmtud_pending)
			rxrpc_send_probe_for_pmtud(call);
	}
	if (call->acks_hard_ack != call->tx_bottom)
		rxrpc_shrink_call_tx_buffer(call);
	_leave("");
	return true;

+21 −17
Original line number Diff line number Diff line
@@ -146,8 +146,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	INIT_LIST_HEAD(&call->recvmsg_link);
	INIT_LIST_HEAD(&call->sock_link);
	INIT_LIST_HEAD(&call->attend_link);
	INIT_LIST_HEAD(&call->tx_sendmsg);
	INIT_LIST_HEAD(&call->tx_buffer);
	skb_queue_head_init(&call->rx_queue);
	skb_queue_head_init(&call->recvmsg_queue);
	skb_queue_head_init(&call->rx_oos_queue);
@@ -532,9 +530,26 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
}

/*
 * Clean up the Rx skb ring.
 * Clean up the transmission buffers.
 */
static void rxrpc_cleanup_ring(struct rxrpc_call *call)
static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
{
	struct rxrpc_txqueue *tq = call->tx_queue;

	/* Walk the chain of queue segments, dropping the ref on every packet
	 * buffer still held in a slot and then freeing the segment itself.
	 */
	while (tq) {
		/* Pick up the link before the segment is freed. */
		struct rxrpc_txqueue *following = tq->next;
		int slot;

		for (slot = 0; slot < RXRPC_NR_TXQUEUE; slot++) {
			struct rxrpc_txbuf *txb = tq->bufs[slot];

			if (!txb)
				continue;
			rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
		}

		trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
		kfree(tq);
		tq = following;
	}
}

/*
 * Clean up the receive buffers.
 */
static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
{
	rxrpc_purge_queue(&call->recvmsg_queue);
	rxrpc_purge_queue(&call->rx_queue);
@@ -673,23 +688,12 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
static void rxrpc_destroy_call(struct work_struct *work)
{
	struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
	struct rxrpc_txbuf *txb;

	del_timer_sync(&call->timer);

	rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
	rxrpc_cleanup_ring(call);
	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
					       struct rxrpc_txbuf, call_link))) {
		list_del(&txb->call_link);
		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
	}
	while ((txb = list_first_entry_or_null(&call->tx_buffer,
					       struct rxrpc_txbuf, call_link))) {
		list_del(&txb->call_link);
		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
	}

	rxrpc_cleanup_tx_buffers(call);
	rxrpc_cleanup_rx_buffers(call);
	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
	rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
	rxrpc_deactivate_bundle(call->bundle);
+57 −15
Original line number Diff line number Diff line
@@ -214,24 +214,71 @@ void rxrpc_congestion_degrade(struct rxrpc_call *call)
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txbuf *txb;
	struct rxrpc_txqueue *tq = call->tx_queue;
	rxrpc_seq_t seq = call->tx_bottom + 1;
	bool rot_last = false;

	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
		if (before_eq(txb->seq, call->acks_hard_ack))
			continue;
		if (txb->flags & RXRPC_LAST_PACKET) {
	_enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);

	trace_rxrpc_tx_rotate(call, seq, to);
	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);

	/* We may have a left over fully-consumed buffer at the front that we
	 * couldn't drop before (rotate_and_keep below).
	 */
	if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
		call->tx_qbase += RXRPC_NR_TXQUEUE;
		call->tx_queue = tq->next;
		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
		kfree(tq);
		tq = call->tx_queue;
	}

	do {
		unsigned int ix = seq - call->tx_qbase;

		_debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
		if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		if (txb->seq == to)
		rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
		tq->bufs[ix] = NULL;

		WRITE_ONCE(call->tx_bottom, seq);
		WRITE_ONCE(call->acks_hard_ack, seq);
		trace_rxrpc_txqueue(call, (rot_last ?
					   rxrpc_txqueue_rotate_last :
					   rxrpc_txqueue_rotate));

		seq++;
		if (!(seq & RXRPC_TXQ_MASK)) {
			prefetch(tq->next);
			if (tq != call->tx_qtail) {
				call->tx_qbase += RXRPC_NR_TXQUEUE;
				call->tx_queue = tq->next;
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
				kfree(tq);
				tq = call->tx_queue;
			} else {
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
				tq = NULL;
				break;
			}
		}

	} while (before_eq(seq, to));

	if (rot_last)
	if (rot_last) {
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
		if (tq) {
			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
			kfree(tq);
			call->tx_queue = NULL;
		}
	}

	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
	_debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
@@ -240,11 +287,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
		call->acks_lowest_nak = to;
	}

	smp_store_release(&call->acks_hard_ack, to);

	trace_rxrpc_txqueue(call, (rot_last ?
				   rxrpc_txqueue_rotate_last :
				   rxrpc_txqueue_rotate));
	wake_up(&call->waitq);
	return rot_last;
}
Loading