Commit db69e9b8 authored by Gerd Rausch's avatar Gerd Rausch Committed by Jakub Kicinski
Browse files

net/rds: rds_tcp_accept_one ought to not discard messages



RDS/TCP differs from RDS/RDMA in that message acknowledgment
is done based on TCP sequence numbers:
As soon as the last byte of a message has been acknowledged
by the TCP stack of a peer, "rds_tcp_write_space()" goes on
to discard prior messages from the send queue.

Which is fine, for as long as the receiver never throws any messages away.

Unfortunately, that is *not* the case since the introduction of MPRDS:
commit 1a0e100f "RDS: TCP: Enable multipath RDS for TCP"

A new function "rds_tcp_accept_one_path" was introduced,
which is entitled to return "NULL", if no connection path is currently
available.

Unfortunately, this happens after the "->accept()" call, and the new socket
often already contains messages, since the peer already transitioned
to "RDS_CONN_UP" on behalf of "TCP_ESTABLISHED".

That's also the case after this [1]:
commit 1a0e100f "RDS: TCP: Force every connection to be initiated by
numerically smaller IP address"

which tried to address the situation of pending data by only transitioning
connections from a smaller IP address to "RDS_CONN_UP".

But even in those cases, and in particular if the "RDS_EXTHDR_NPATHS"
handshake has not occurred yet, and therefore we're working with
"c_npaths <= 1", "c_conn[0]" may be in a state distinct from
"RDS_CONN_DOWN", and therefore all messages on the just accepted socket
will be tossed away.

This fix changes "rds_tcp_accept_one":

* If connected from a peer with a larger IP address, the new socket
  will continue to get closed right away.
  With commit [1] above, there should not be any messages
  in the socket receive buffer, since the peer never transitioned
  to "RDS_CONN_UP".
  Therefore it should be okay to not make any efforts to dispatch
  the socket receive buffer.

* If connected from a peer with a smaller IP address,
  we call "rds_tcp_accept_one_path" to find a free slot/"path".
  If found, business goes on as usual.
  If none was found, we save/stash the newly accepted socket
  into "rds_tcp_accepted_sock", in order to not lose any
  messages that may have arrived already.
  We then return from "rds_tcp_accept_one" with "-ENOBUFS".
  Later on, when a slot/"path" does become available again
  (e.g. state transitioned to "RDS_CONN_DOWN",
   or HS extension header was received with "c_npaths > 1")
  we call "rds_tcp_conn_slots_available" that simply re-issues
  a "rds_tcp_accept_one_path" worker-callback and picks
  up the new socket from "rds_tcp_accepted_sock", and thereby
  continuing where it left with "-ENOBUFS" last time.
  Since a new slot has become available, those messages
  won't be lost, since processing proceeds as if that slot
  had been available the first time around.

Signed-off-by: default avatarGerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: default avatarJack Vogel <jack.vogel@oracle.com>
Signed-off-by: default avatarAllison Henderson <allison.henderson@oracle.com>
Link: https://patch.msgid.link/20260122055213.83608-3-achender@kernel.org


Signed-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parent ad22d24b
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -449,6 +449,9 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
	} else {
		rcu_read_unlock();
	}

	if (conn->c_trans->conn_slots_available)
		conn->c_trans->conn_slots_available(conn);
}

/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
+39 −27
Original line number Diff line number Diff line
@@ -506,33 +506,6 @@ struct rds_notifier {
 */
#define	RDS_TRANS_LOOP	3

/**
 * struct rds_transport -  transport specific behavioural hooks
 *
 * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
 *        part of a message.  The caller serializes on the send_sem so this
 *        doesn't need to be reentrant for a given conn.  The header must be
 *        sent before the data payload.  .xmit must be prepared to send a
 *        message with no data payload.  .xmit should return the number of
 *        bytes that were sent down the connection, including header bytes.
 *        Returning 0 tells the caller that it doesn't need to perform any
 *        additional work now.  This is usually the case when the transport has
 *        filled the sending queue for its connection and will handle
 *        triggering the rds thread to continue the send when space becomes
 *        available.  Returning -EAGAIN tells the caller to retry the send
 *        immediately.  Returning -ENOMEM tells the caller to retry the send at
 *        some point in the future.
 *
 * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
 *                 it returns the connection can not call rds_recv_incoming().
 *                 This will only be called once after conn_connect returns
 *                 non-zero success and will The caller serializes this with
 *                 the send and connecting paths (xmit_* and conn_*).  The
 *                 transport is responsible for other serialization, including
 *                 rds_recv_incoming().  This is called in process context but
 *                 should try hard not to block.
 */

struct rds_transport {
	char			t_name[TRANSNAMSIZ];
	struct list_head	t_item;
@@ -545,10 +518,49 @@ struct rds_transport {
			   __u32 scope_id);
	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
	void (*conn_free)(void *data);

	/*
	 * conn_slots_available is invoked when a previously unavailable
	 * connection slot becomes available again. rds_tcp_accept_one_path may
	 * return -ENOBUFS if it cannot find an available slot, and then stashes
	 * the new socket in "rds_tcp_accepted_sock". This function re-issues
	 * `rds_tcp_accept_one_path`, which picks up the stashed socket and
	 * continuing where it left with "-ENOBUFS" last time.  This ensures
	 * messages received on the new socket are not discarded when no
	 * connection path was available at the time.
	 */
	void (*conn_slots_available)(struct rds_connection *conn);
	int (*conn_path_connect)(struct rds_conn_path *cp);

	/*
	 * conn_shutdown stops traffic on the given connection.  Once
	 * it returns the connection can not call rds_recv_incoming().
	 * This will only be called once after conn_connect returns
	 * non-zero success and will The caller serializes this with
	 * the send and connecting paths (xmit_* and conn_*).  The
	 * transport is responsible for other serialization, including
	 * rds_recv_incoming().  This is called in process context but
	 * should try hard not to block.
	 */
	void (*conn_path_shutdown)(struct rds_conn_path *conn);
	void (*xmit_path_prepare)(struct rds_conn_path *cp);
	void (*xmit_path_complete)(struct rds_conn_path *cp);

	/*
	 * .xmit is called by rds_send_xmit() to tell the transport to send
	 * part of a message.  The caller serializes on the send_sem so this
	 * doesn't need to be reentrant for a given conn.  The header must be
	 * sent before the data payload.  .xmit must be prepared to send a
	 * message with no data payload.  .xmit should return the number of
	 * bytes that were sent down the connection, including header bytes.
	 * Returning 0 tells the caller that it doesn't need to perform any
	 * additional work now.  This is usually the case when the transport has
	 * filled the sending queue for its connection and will handle
	 * triggering the rds thread to continue the send when space becomes
	 * available.  Returning -EAGAIN tells the caller to retry the send
	 * immediately.  Returning -ENOMEM tells the caller to retry the send at
	 * some point in the future.
	 */
	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
		    unsigned int hdr_off, unsigned int sg, unsigned int off);
	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
+4 −0
Original line number Diff line number Diff line
@@ -230,6 +230,10 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
	conn->c_npaths = max_t(int, conn->c_npaths, 1);
	conn->c_ping_triggered = 0;
	rds_conn_peer_gen_update(conn, new_peer_gen_num);

	if (conn->c_npaths > 1 &&
	    conn->c_trans->conn_slots_available)
		conn->c_trans->conn_slots_available(conn);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
+11 −16
Original line number Diff line number Diff line
@@ -213,6 +213,8 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
		sock->sk->sk_data_ready = sock->sk->sk_user_data;

	tc->t_sock = sock;
	if (!tc->t_rtn)
		tc->t_rtn = net_generic(sock_net(sock->sk), rds_tcp_netid);
	tc->t_cpath = cp;
	tc->t_orig_data_ready = sock->sk->sk_data_ready;
	tc->t_orig_write_space = sock->sk->sk_write_space;
@@ -378,6 +380,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
		}
		mutex_init(&tc->t_conn_path_lock);
		tc->t_sock = NULL;
		tc->t_rtn = NULL;
		tc->t_tinc = NULL;
		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
		tc->t_tinc_data_rem = 0;
@@ -458,6 +461,7 @@ struct rds_transport rds_tcp_transport = {
	.recv_path		= rds_tcp_recv_path,
	.conn_alloc		= rds_tcp_conn_alloc,
	.conn_free		= rds_tcp_conn_free,
	.conn_slots_available	= rds_tcp_conn_slots_available,
	.conn_path_connect	= rds_tcp_conn_path_connect,
	.conn_path_shutdown	= rds_tcp_conn_path_shutdown,
	.inc_copy_to_user	= rds_tcp_inc_copy_to_user,
@@ -473,17 +477,7 @@ struct rds_transport rds_tcp_transport = {
	.t_unloading		= rds_tcp_is_unloading,
};

static unsigned int rds_tcp_netid;

/* per-network namespace private data for this module */
struct rds_tcp_net {
	struct socket *rds_tcp_listen_sock;
	struct work_struct rds_tcp_accept_w;
	struct ctl_table_header *rds_tcp_sysctl;
	struct ctl_table *ctl_table;
	int sndbuf_size;
	int rcvbuf_size;
};
int rds_tcp_netid;

/* All module specific customizations to the RDS-TCP socket should be done in
 * rds_tcp_tune() and applied after socket creation.
@@ -526,15 +520,12 @@ static void rds_tcp_accept_worker(struct work_struct *work)
					       struct rds_tcp_net,
					       rds_tcp_accept_w);

	while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0)
	while (rds_tcp_accept_one(rtn) == 0)
		cond_resched();
}

void rds_tcp_accept_work(struct sock *sk)
void rds_tcp_accept_work(struct rds_tcp_net *rtn)
{
	struct net *net = sock_net(sk);
	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);

	queue_work(rds_wq, &rtn->rds_tcp_accept_w);
}

@@ -546,6 +537,8 @@ static __net_init int rds_tcp_init_net(struct net *net)

	memset(rtn, 0, sizeof(*rtn));

	mutex_init(&rtn->rds_tcp_accept_lock);

	/* {snd, rcv}buf_size default to 0, which implies we let the
	 * stack pick the value, and permit auto-tuning of buffer size.
	 */
@@ -609,6 +602,8 @@ static void rds_tcp_kill_sock(struct net *net)

	rtn->rds_tcp_listen_sock = NULL;
	rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
	if (rtn->rds_tcp_accepted_sock)
		sock_release(rtn->rds_tcp_accepted_sock);
	spin_lock_irq(&rds_tcp_conn_lock);
	list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
		struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
+20 −2
Original line number Diff line number Diff line
@@ -4,6 +4,21 @@

#define RDS_TCP_PORT	16385

/* per-network namespace private data for this module */
struct rds_tcp_net {
	/* serialize "rds_tcp_accept_one" with "rds_tcp_accept_lock"
	 * to protect "rds_tcp_accepted_sock"
	 */
	struct mutex		rds_tcp_accept_lock;
	struct socket		*rds_tcp_listen_sock;
	struct socket		*rds_tcp_accepted_sock;
	struct work_struct	rds_tcp_accept_w;
	struct ctl_table_header	*rds_tcp_sysctl;
	const struct ctl_table	*ctl_table;
	int			sndbuf_size;
	int			rcvbuf_size;
};

struct rds_tcp_incoming {
	struct rds_incoming	ti_inc;
	struct sk_buff_head	ti_skb_list;
@@ -19,6 +34,7 @@ struct rds_tcp_connection {
	 */
	struct mutex		t_conn_path_lock;
	struct socket		*t_sock;
	struct rds_tcp_net	*t_rtn;
	void			*t_orig_write_space;
	void			*t_orig_data_ready;
	void			*t_orig_state_change;
@@ -49,6 +65,7 @@ struct rds_tcp_statistics {
};

/* tcp.c */
extern int rds_tcp_netid;
bool rds_tcp_tune(struct socket *sock);
void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
@@ -57,7 +74,7 @@ void rds_tcp_restore_callbacks(struct socket *sock,
u32 rds_tcp_write_seq(struct rds_tcp_connection *tc);
u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
extern struct rds_transport rds_tcp_transport;
void rds_tcp_accept_work(struct sock *sk);
void rds_tcp_accept_work(struct rds_tcp_net *rtn);
int rds_tcp_laddr_check(struct net *net, const struct in6_addr *addr,
			__u32 scope_id);
/* tcp_connect.c */
@@ -69,7 +86,8 @@ void rds_tcp_state_change(struct sock *sk);
struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
void rds_tcp_listen_data_ready(struct sock *sk);
int rds_tcp_accept_one(struct socket *sock);
void rds_tcp_conn_slots_available(struct rds_connection *conn);
int rds_tcp_accept_one(struct rds_tcp_net *rtn);
void rds_tcp_keepalive(struct socket *sock);
void *rds_tcp_listen_sock_def_readable(struct net *net);

Loading