Commit 9b052c6b authored by David Howells, committed by Jakub Kicinski
Browse files

rxrpc: Use the new rxrpc_tx_queue struct to more efficiently process ACKs



With the change in the structure of the transmission buffer to store
buffers in bunches of 32 or 64 (BITS_PER_LONG) we can place sets of
per-buffer flags into the rxrpc_tx_queue struct rather than storing them in
rxrpc_tx_buf, thereby vastly increasing efficiency when assessing the SACK
table in an ACK packet.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-24-dhowells@redhat.com


Signed-off-by: Jakub Kicinski <kuba@kernel.org>
parent f7dd0dc9
Loading
Loading
Loading
Loading
+75 −11
Original line number Diff line number Diff line
@@ -132,7 +132,6 @@
	EM(rxrpc_skb_get_call_rx,		"GET call-rx  ") \
	EM(rxrpc_skb_get_conn_secured,		"GET conn-secd") \
	EM(rxrpc_skb_get_conn_work,		"GET conn-work") \
	EM(rxrpc_skb_get_last_nack,		"GET last-nack") \
	EM(rxrpc_skb_get_local_work,		"GET locl-work") \
	EM(rxrpc_skb_get_reject_work,		"GET rej-work ") \
	EM(rxrpc_skb_get_to_recvmsg,		"GET to-recv  ") \
@@ -147,7 +146,6 @@
	EM(rxrpc_skb_put_error_report,		"PUT error-rep") \
	EM(rxrpc_skb_put_input,			"PUT input    ") \
	EM(rxrpc_skb_put_jumbo_subpacket,	"PUT jumbo-sub") \
	EM(rxrpc_skb_put_last_nack,		"PUT last-nack") \
	EM(rxrpc_skb_put_purge,			"PUT purge    ") \
	EM(rxrpc_skb_put_rotate,		"PUT rotate   ") \
	EM(rxrpc_skb_put_unknown,		"PUT unknown  ") \
@@ -499,6 +497,11 @@
	EM(rxrpc_pmtud_reduce_icmp,		"Icmp ")	\
	E_(rxrpc_pmtud_reduce_route,		"Route")

/*
 * Values of enum rxrpc_rotate_trace paired with the label printed for each
 * by the rxrpc_rotate tracepoint.  The list is expanded through the EM()/E_()
 * helper macros, which are defined elsewhere in this header.
 */
#define rxrpc_rotate_traces \
	EM(rxrpc_rotate_trace_hack,		"hard-ack")	\
	EM(rxrpc_rotate_trace_sack,		"soft-ack")	\
	E_(rxrpc_rotate_trace_snak,		"soft-nack")

/*
 * Generate enums for tracing information.
 */
@@ -525,6 +528,7 @@ enum rxrpc_propose_ack_trace { rxrpc_propose_ack_traces } __mode(byte);
enum rxrpc_receive_trace	{ rxrpc_receive_traces } __mode(byte);
enum rxrpc_recvmsg_trace	{ rxrpc_recvmsg_traces } __mode(byte);
enum rxrpc_req_ack_trace	{ rxrpc_req_ack_traces } __mode(byte);
enum rxrpc_rotate_trace		{ rxrpc_rotate_traces } __mode(byte);
enum rxrpc_rtt_rx_trace		{ rxrpc_rtt_rx_traces } __mode(byte);
enum rxrpc_rtt_tx_trace		{ rxrpc_rtt_tx_traces } __mode(byte);
enum rxrpc_sack_trace		{ rxrpc_sack_traces } __mode(byte);
@@ -562,6 +566,7 @@ rxrpc_propose_ack_traces;
rxrpc_receive_traces;
rxrpc_recvmsg_traces;
rxrpc_req_ack_traces;
rxrpc_rotate_traces;
rxrpc_rtt_rx_traces;
rxrpc_rtt_tx_traces;
rxrpc_sack_traces;
@@ -1667,6 +1672,7 @@ TRACE_EVENT(rxrpc_retransmit,

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
		    __field(unsigned int,	qbase)
		    __field(rxrpc_seq_t,	seq)
		    __field(rxrpc_serial_t,	serial)
		    __field(ktime_t,		expiry)
@@ -1674,13 +1680,15 @@ TRACE_EVENT(rxrpc_retransmit,

	    TP_fast_assign(
		    __entry->call = call->debug_id;
		    __entry->qbase = req->tq->qbase;
		    __entry->seq = req->seq;
		    __entry->serial = txb->serial;
		    __entry->expiry = expiry;
			   ),

	    TP_printk("c=%08x q=%x r=%x xp=%lld",
	    TP_printk("c=%08x tq=%x q=%x r=%x xp=%lld",
		      __entry->call,
		      __entry->qbase,
		      __entry->seq,
		      __entry->serial,
		      ktime_to_us(__entry->expiry))
@@ -1724,7 +1732,7 @@ TRACE_EVENT(rxrpc_congest,
		    memcpy(&__entry->sum, summary, sizeof(__entry->sum));
			   ),

	    TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nA=%u,%u+%u,%u b=%u u=%u d=%u l=%x%s%s%s",
	    TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u A=%u+%u/%u+%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
		      __entry->call,
		      __entry->ack_serial,
		      __print_symbolic(__entry->sum.ack_reason, rxrpc_ack_names),
@@ -1732,9 +1740,9 @@ TRACE_EVENT(rxrpc_congest,
		      __print_symbolic(__entry->ca_state, rxrpc_ca_states),
		      __entry->cwnd,
		      __entry->ssthresh,
		      __entry->nr_sacks, __entry->sum.nr_retained_snacks,
		      __entry->sum.nr_new_sacks,
		      __entry->sum.nr_new_snacks,
		      __entry->nr_sacks, __entry->sum.nr_new_sacks,
		      __entry->nr_snacks, __entry->sum.nr_new_snacks,
		      __entry->sum.nr_new_hacks,
		      __entry->top - __entry->hard_ack,
		      __entry->cumul_acks,
		      __entry->dup_acks,
@@ -1850,10 +1858,36 @@ TRACE_EVENT(rxrpc_connect_call,
		      &__entry->srx.transport)
	    );

/*
 * Tracepoint that snapshots the ACK state held in one Tx queue segment.
 *
 * Recorded fields:
 *   call   - the call's debug_id
 *   qbase  - tq->qbase, printed as "tq="
 *   acks   - tq->segment_acked, the per-buffer ACK bitmask, printed as a
 *            16-digit hex value
 *   nr_rep - tq->nr_reported_acks, printed as "rep="
 */
TRACE_EVENT(rxrpc_apply_acks,
	    TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq),

	    TP_ARGS(call, tq),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
		    __field(unsigned int,	nr_rep)
		    __field(rxrpc_seq_t,	qbase)
		    __field(unsigned long,	acks)
			     ),

	    TP_fast_assign(
		    __entry->call = call->debug_id;
		    __entry->qbase = tq->qbase;
		    __entry->acks = tq->segment_acked;
		    __entry->nr_rep = tq->nr_reported_acks;
			   ),

	    TP_printk("c=%08x tq=%x acks=%016lx rep=%u",
		      __entry->call,
		      __entry->qbase,
		      __entry->acks,
		      __entry->nr_rep)
	    );

TRACE_EVENT(rxrpc_resend,
	    TP_PROTO(struct rxrpc_call *call, struct sk_buff *ack),
	    TP_PROTO(struct rxrpc_call *call, rxrpc_serial_t ack_serial),

	    TP_ARGS(call, ack),
	    TP_ARGS(call, ack_serial),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
@@ -1863,11 +1897,10 @@ TRACE_EVENT(rxrpc_resend,
			     ),

	    TP_fast_assign(
		    struct rxrpc_skb_priv *sp = ack ? rxrpc_skb(ack) : NULL;
		    __entry->call = call->debug_id;
		    __entry->seq = call->acks_hard_ack;
		    __entry->transmitted = call->tx_transmitted;
		    __entry->ack_serial = sp ? sp->hdr.serial : 0;
		    __entry->ack_serial = ack_serial;
			   ),

	    TP_printk("c=%08x r=%x q=%x tq=%x",
@@ -1877,6 +1910,37 @@ TRACE_EVENT(rxrpc_resend,
		      __entry->transmitted)
	    );

/*
 * Tracepoint that logs one sequence number together with the Tx queue
 * segment passed in and a reason code.
 *
 * Recorded fields:
 *   call   - the call's debug_id
 *   qbase  - tq->qbase, printed as "tq="
 *   seq    - the sequence number passed in, printed as "q="
 *   nr_rep - tq->nr_reported_acks, printed in hex as "nr="
 *   trace  - the reason, printed symbolically via rxrpc_rotate_traces
 *
 * The summary argument is accepted but not stored in the trace record.
 */
TRACE_EVENT(rxrpc_rotate,
	    TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
		     struct rxrpc_ack_summary *summary, rxrpc_seq_t seq,
		     enum rxrpc_rotate_trace trace),

	    TP_ARGS(call, tq, summary, seq, trace),

	    TP_STRUCT__entry(
		    __field(unsigned int,	call)
		    __field(rxrpc_seq_t,	qbase)
		    __field(rxrpc_seq_t,	seq)
		    __field(unsigned int,	nr_rep)
		    __field(enum rxrpc_rotate_trace, trace)
			     ),

	    TP_fast_assign(
		    __entry->call = call->debug_id;
		    __entry->qbase = tq->qbase;
		    __entry->seq = seq;
		    __entry->nr_rep = tq->nr_reported_acks;
		    __entry->trace = trace;
			   ),

	    TP_printk("c=%08x tq=%x q=%x nr=%x %s",
		      __entry->call,
		      __entry->qbase,
		      __entry->seq,
		      __entry->nr_rep,
		      __print_symbolic(__entry->trace, rxrpc_rotate_traces))
	    );

TRACE_EVENT(rxrpc_rx_icmp,
	    TP_PROTO(struct rxrpc_peer *peer, struct sock_extended_err *ee,
		     struct sockaddr_rxrpc *srx),
+17 −6
Original line number Diff line number Diff line
@@ -214,9 +214,8 @@ struct rxrpc_skb_priv {
			rxrpc_seq_t	first_ack;	/* First packet in acks table */
			rxrpc_seq_t	prev_ack;	/* Highest seq seen */
			rxrpc_serial_t	acked_serial;	/* Packet in response to (or 0) */
			u16		nr_acks;	/* Number of acks+nacks */
			u8		reason;		/* Reason for ack */
			u8		nr_acks;	/* Number of acks+nacks */
			u8		nr_nacks;	/* Number of nacks */
		} ack;
	};
	struct rxrpc_host_header hdr;	/* RxRPC packet header from this packet */
@@ -734,7 +733,6 @@ struct rxrpc_call {
	u16			cong_dup_acks;	/* Count of ACKs showing missing packets */
	u16			cong_cumul_acks; /* Cumulative ACK count */
	ktime_t			cong_tstamp;	/* Last time cwnd was changed */
	struct sk_buff		*cong_last_nack; /* Last ACK with nacks received */

	/* Receive-phase ACK management (ACKs we send). */
	u8			ackr_reason;	/* reason to ACK */
@@ -775,11 +773,10 @@ struct rxrpc_ack_summary {
	u16		nr_new_hacks;		/* Number of rotated new ACKs */
	u16		nr_new_sacks;		/* Number of new soft ACKs in packet */
	u16		nr_new_snacks;		/* Number of new soft nacks in packet */
	u16		nr_retained_snacks;	/* Number of nacks retained between ACKs */
	u8		ack_reason;
	bool		saw_snacks:1;		/* T if we saw a soft NACK */
	bool		new_low_snack:1;	/* T if new low soft NACK found */
	bool		retrans_timeo:1;	/* T if reTx due to timeout happened */
	bool		need_retransmit:1;	/* T if we need transmission */
	u8 /*enum rxrpc_congest_change*/ change;
};

@@ -858,6 +855,10 @@ struct rxrpc_txqueue {
	struct rxrpc_txqueue	*next;
	ktime_t			xmit_ts_base;
	rxrpc_seq_t		qbase;
	u8			nr_reported_acks; /* Number of segments explicitly acked/nacked */
	unsigned long		segment_acked;	/* Bit-per-buf: Set if ACK'd */
	unsigned long		segment_lost;	/* Bit-per-buf: Set if declared lost */
	unsigned long		segment_retransmitted; /* Bit-per-buf: Set if retransmitted */

	/* The arrays we want to pack into as few cache lines as possible. */
	struct {
@@ -935,7 +936,7 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
			enum rxrpc_propose_ack_trace why);
void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
			     enum rxrpc_propose_ack_trace);
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);
void rxrpc_resend(struct rxrpc_call *call, rxrpc_serial_t ack_serial, bool ping_response);

bool rxrpc_input_call_event(struct rxrpc_call *call);

@@ -1383,6 +1384,16 @@ static inline bool after_eq(u32 seq1, u32 seq2)
        return (s32)(seq1 - seq2) >= 0;
}

/*
 * Return whichever of two sequence numbers before() ranks first.  When seq1
 * is not before seq2 (including when they are equal), seq2 is returned.
 */
static inline u32 earliest(u32 seq1, u32 seq2)
{
	if (before(seq1, seq2))
		return seq1;
	return seq2;
}

/*
 * Return whichever of two sequence numbers after() ranks last.  When seq1
 * is not after seq2 (including when they are equal), seq2 is returned.
 */
static inline u32 latest(u32 seq1, u32 seq2)
{
	if (after(seq1, seq2))
		return seq1;
	return seq2;
}

static inline void rxrpc_queue_rx_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	rxrpc_get_skb(skb, rxrpc_skb_get_call_rx);
+87 −94
Original line number Diff line number Diff line
@@ -65,9 +65,9 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
/*
 * Retransmit one or more packets.
 */
static void rxrpc_retransmit_data(struct rxrpc_call *call,
static bool rxrpc_retransmit_data(struct rxrpc_call *call,
				  struct rxrpc_send_data_req *req,
				  ktime_t rto)
				  ktime_t rto, bool skip_too_young)
{
	struct rxrpc_txqueue *tq = req->tq;
	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
@@ -78,9 +78,11 @@ static void rxrpc_retransmit_data(struct rxrpc_call *call,

	xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]);
	resend_at = ktime_add(xmit_ts, rto);
	trace_rxrpc_retransmit(call, req, txb,
			       ktime_sub(resend_at, req->now));
	trace_rxrpc_retransmit(call, req, txb, ktime_sub(resend_at, req->now));
	if (skip_too_young && ktime_after(resend_at, req->now))
		return false;

	__set_bit(ix, &tq->segment_retransmitted);
	txb->flags |= RXRPC_TXBUF_RESENT;
	rxrpc_send_data_packet(call, req);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
@@ -89,128 +91,119 @@ static void rxrpc_retransmit_data(struct rxrpc_call *call,
	req->n		= 0;
	req->did_send	= true;
	req->now	= ktime_get_real();
	return true;
}

/*
 * Perform retransmission of NAK'd and unack'd packets.
 */
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
void rxrpc_resend(struct rxrpc_call *call, rxrpc_serial_t ack_serial, bool ping_response)
{
	struct rxrpc_send_data_req req = {
		.now	= ktime_get_real(),
	};
	struct rxrpc_ackpacket *ack = NULL;
	struct rxrpc_skb_priv *sp;
	struct rxrpc_txqueue *tq;
	struct rxrpc_txbuf *txb;
	rxrpc_seq_t transmitted = call->tx_transmitted, seq;
	ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
	ktime_t resend_at = KTIME_MAX, delay;
	bool unacked = false, did_send = false;
	unsigned int qix;
	struct rxrpc_txqueue *tq = call->tx_queue;
	ktime_t lowest_xmit_ts = KTIME_MAX, rto	= ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
	bool unacked = false;

	_enter("{%d,%d}", call->tx_bottom, call->tx_top);

	if (call->tx_bottom == call->tx_top)
		goto no_resend;
	if (call->tx_bottom == call->tx_top) {
		call->resend_at = KTIME_MAX;
		trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend);
		return;
	}

	trace_rxrpc_resend(call, ack_serial);

	trace_rxrpc_resend(call, ack_skb);
	tq = call->tx_queue;
	seq = call->tx_bottom;
	/* Scan the transmission queue, looking for explicitly NAK'd packets. */
	do {
		unsigned long naks = ~tq->segment_acked;
		rxrpc_seq_t tq_top = tq->qbase + RXRPC_NR_TXQUEUE - 1;

	/* Scan the soft ACK table and resend any explicitly NAK'd packets. */
	if (ack_skb) {
		sp = rxrpc_skb(ack_skb);
		ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);
		if (after(tq->qbase, call->tx_transmitted))
			break;

		for (int i = 0; i < sp->ack.nr_acks; i++) {
			rxrpc_seq_t aseq;
		if (tq->nr_reported_acks < RXRPC_NR_TXQUEUE)
			naks &= (1UL << tq->nr_reported_acks) - 1;

			if (ack->acks[i] & 1)
				continue;
			aseq = sp->ack.first_ack + i;
			while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
				tq = tq->next;
			seq = aseq;
			qix = seq - tq->qbase;
			txb = tq->bufs[qix];
			if (after(seq, transmitted))
				goto no_further_resend;

			resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
			resend_at = ktime_add(resend_at, rto);
			if (after(txb->serial, call->acks_highest_serial)) {
				if (ktime_after(resend_at, req.now) &&
				    ktime_before(resend_at, next_resend))
					next_resend = resend_at;
		_debug("retr %16lx %u c=%08x [%x]",
		       tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase);
		_debug("nack %16lx", naks);

		while (naks) {
			unsigned int ix = __ffs(naks);
			struct rxrpc_txbuf *txb = tq->bufs[ix];

			__clear_bit(ix, &naks);
			if (after(txb->serial, call->acks_highest_serial))
				continue; /* Ack point not yet reached */
			}

			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);

			req.tq  = tq;
			req.seq = seq;
			req.seq = tq->qbase + ix;
			req.n   = 1;
			rxrpc_retransmit_data(call, &req, rto);

			if (after_eq(seq, call->tx_top))
				goto no_further_resend;
		}
			rxrpc_retransmit_data(call, &req, rto, false);
		}

	/* Fast-forward through the Tx queue to the point the peer says it has
	 * seen.  Anything between the soft-ACK table and that point will get
	 * ACK'd or NACK'd in due course, so don't worry about it here; here we
	 * need to consider retransmitting anything beyond that point.
		/* Anything after the soft-ACK table up to and including
		 * ack.previousPacket will get ACK'd or NACK'd in due course,
		 * so don't worry about those here.  We do, however, need to
		 * consider retransmitting anything beyond that point.
		 */
	seq = call->acks_prev_seq;
	if (after_eq(seq, call->tx_transmitted))
		goto no_further_resend;
	seq++;

	while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
		tq = tq->next;

	while (before_eq(seq, call->tx_transmitted)) {
		qix = seq - tq->qbase;
		if (qix >= RXRPC_NR_TXQUEUE) {
			tq = tq->next;
			continue;
		if (tq->nr_reported_acks < RXRPC_NR_TXQUEUE &&
		    after(tq_top, call->acks_prev_seq)) {
			rxrpc_seq_t start = latest(call->acks_prev_seq,
						   tq->qbase + tq->nr_reported_acks);
			rxrpc_seq_t stop = earliest(tq_top, call->tx_transmitted);

			_debug("unrep %x-%x", start, stop);
			for (rxrpc_seq_t seq = start; before(seq, stop); seq++) {
				struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];

				if (ping_response &&
				    before(txb->serial, call->acks_highest_serial))
					break; /* Wasn't accounted for by a more recent ping. */
				req.tq  = tq;
				req.seq = seq;
				req.n   = 1;
				if (rxrpc_retransmit_data(call, &req, rto, true))
					unacked = true;
			}
		}
		txb = tq->bufs[qix];
		resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
		resend_at = ktime_add(resend_at, rto);

		if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
		    before(txb->serial, ntohl(ack->serial)))
			goto do_resend; /* Wasn't accounted for by a more recent ping. */
		/* Work out the next retransmission timeout. */
		if (ktime_before(tq->xmit_ts_base, lowest_xmit_ts)) {
			unsigned int lowest_us = UINT_MAX;

		if (ktime_after(resend_at, req.now)) {
			if (ktime_before(resend_at, next_resend))
				next_resend = resend_at;
			seq++;
			continue;
		}
			for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
				if (!test_bit(i, &tq->segment_acked) &&
				    tq->segment_xmit_ts[i] < lowest_us)
					lowest_us = tq->segment_xmit_ts[i];
			_debug("lowest[%x] %llx %u", tq->qbase, tq->xmit_ts_base, lowest_us);

	do_resend:
		unacked = true;
			if (lowest_us != UINT_MAX) {
				ktime_t lowest_ns = ktime_add_us(tq->xmit_ts_base, lowest_us);

		req.tq  = tq;
		req.seq = seq;
		req.n   = 1;
		rxrpc_retransmit_data(call, &req, rto);
		seq++;
				if (ktime_before(lowest_ns, lowest_xmit_ts))
					lowest_xmit_ts = lowest_ns;
			}
		}
	} while ((tq = tq->next));

no_further_resend:
no_resend:
	if (resend_at < KTIME_MAX) {
		delay = rxrpc_get_rto_backoff(call->peer, did_send);
		resend_at = ktime_add(resend_at, delay);
	if (lowest_xmit_ts < KTIME_MAX) {
		ktime_t delay = rxrpc_get_rto_backoff(call->peer, req.did_send);
		ktime_t resend_at = ktime_add(lowest_xmit_ts, delay);

		_debug("delay %llu %lld", delay, ktime_sub(resend_at, req.now));
		call->resend_at = resend_at;
		trace_rxrpc_timer_set(call, resend_at - req.now,
				      rxrpc_timer_trace_resend_reset);
	} else {
		call->resend_at = KTIME_MAX;
		trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend);
	}
	call->resend_at = resend_at;

	if (unacked)
		rxrpc_congestion_timeout(call);
@@ -494,7 +487,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call)
	if (resend &&
	    __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY &&
	    !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		rxrpc_resend(call, NULL);
		rxrpc_resend(call, 0, false);

	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
+0 −1
Original line number Diff line number Diff line
@@ -691,7 +691,6 @@ static void rxrpc_destroy_call(struct work_struct *work)

	del_timer_sync(&call->timer);

	rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
	rxrpc_cleanup_tx_buffers(call);
	rxrpc_cleanup_rx_buffers(call);
	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
+161 −91

File changed.

Preview size limit exceeded, changes collapsed.

Loading