Commit eac026ff authored by Jakub Kicinski's avatar Jakub Kicinski
Browse files

Merge branch 'net-rds-rds-tcp-state-machine-and-message-loss-improvements'

Allison Henderson says:

====================
net/rds: RDS-TCP state machine and message loss improvements

This is subset 2 of the larger RDS-TCP patch series I posted last
Oct.  The greater series aims to correct multiple rds-tcp issues that
can cause dropped or out of sequence messages.  I've broken it down into
smaller sets to make reviews more manageable.

In this set, we correct a few RDS/TCP connection handling issues, and
message loss issues.
====================

Link: https://patch.msgid.link/20260122055213.83608-1-achender@kernel.org


Signed-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parents 6a9e8b60 db69e9b8
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -395,6 +395,8 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
		if (!rds_conn_path_transition(cp, RDS_CONN_UP,
					      RDS_CONN_DISCONNECTING) &&
		    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
					      RDS_CONN_DISCONNECTING) &&
		    !rds_conn_path_transition(cp, RDS_CONN_RESETTING,
					      RDS_CONN_DISCONNECTING)) {
			rds_conn_path_error(cp,
					    "shutdown called in state %d\n",
@@ -447,6 +449,9 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
	} else {
		rcu_read_unlock();
	}

	if (conn->c_trans->conn_slots_available)
		conn->c_trans->conn_slots_available(conn);
}

/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
+39 −27
Original line number Diff line number Diff line
@@ -506,33 +506,6 @@ struct rds_notifier {
 */
#define	RDS_TRANS_LOOP	3

/**
 * struct rds_transport -  transport specific behavioural hooks
 *
 * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
 *        part of a message.  The caller serializes on the send_sem so this
 *        doesn't need to be reentrant for a given conn.  The header must be
 *        sent before the data payload.  .xmit must be prepared to send a
 *        message with no data payload.  .xmit should return the number of
 *        bytes that were sent down the connection, including header bytes.
 *        Returning 0 tells the caller that it doesn't need to perform any
 *        additional work now.  This is usually the case when the transport has
 *        filled the sending queue for its connection and will handle
 *        triggering the rds thread to continue the send when space becomes
 *        available.  Returning -EAGAIN tells the caller to retry the send
 *        immediately.  Returning -ENOMEM tells the caller to retry the send at
 *        some point in the future.
 *
 * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
 *                 it returns the connection can not call rds_recv_incoming().
 *                 This will only be called once after conn_connect returns
 *                 non-zero success and will The caller serializes this with
 *                 the send and connecting paths (xmit_* and conn_*).  The
 *                 transport is responsible for other serialization, including
 *                 rds_recv_incoming().  This is called in process context but
 *                 should try hard not to block.
 */

struct rds_transport {
	char			t_name[TRANSNAMSIZ];
	struct list_head	t_item;
@@ -545,10 +518,49 @@ struct rds_transport {
			   __u32 scope_id);
	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
	void (*conn_free)(void *data);

	/*
	 * conn_slots_available is invoked when a previously unavailable
	 * connection slot becomes available again. rds_tcp_accept_one_path may
	 * return -ENOBUFS if it cannot find an available slot, and then stashes
	 * the new socket in "rds_tcp_accepted_sock". This function re-issues
	 * `rds_tcp_accept_one_path`, which picks up the stashed socket and
	 * continuing where it left with "-ENOBUFS" last time.  This ensures
	 * messages received on the new socket are not discarded when no
	 * connection path was available at the time.
	 */
	void (*conn_slots_available)(struct rds_connection *conn);
	int (*conn_path_connect)(struct rds_conn_path *cp);

	/*
	 * conn_shutdown stops traffic on the given connection.  Once
	 * it returns the connection can not call rds_recv_incoming().
	 * This will only be called once after conn_connect returns
	 * non-zero success and will The caller serializes this with
	 * the send and connecting paths (xmit_* and conn_*).  The
	 * transport is responsible for other serialization, including
	 * rds_recv_incoming().  This is called in process context but
	 * should try hard not to block.
	 */
	void (*conn_path_shutdown)(struct rds_conn_path *conn);
	void (*xmit_path_prepare)(struct rds_conn_path *cp);
	void (*xmit_path_complete)(struct rds_conn_path *cp);

	/*
	 * .xmit is called by rds_send_xmit() to tell the transport to send
	 * part of a message.  The caller serializes on the send_sem so this
	 * doesn't need to be reentrant for a given conn.  The header must be
	 * sent before the data payload.  .xmit must be prepared to send a
	 * message with no data payload.  .xmit should return the number of
	 * bytes that were sent down the connection, including header bytes.
	 * Returning 0 tells the caller that it doesn't need to perform any
	 * additional work now.  This is usually the case when the transport has
	 * filled the sending queue for its connection and will handle
	 * triggering the rds thread to continue the send when space becomes
	 * available.  Returning -EAGAIN tells the caller to retry the send
	 * immediately.  Returning -ENOMEM tells the caller to retry the send at
	 * some point in the future.
	 */
	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
		    unsigned int hdr_off, unsigned int sg, unsigned int off);
	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
+4 −0
Original line number Diff line number Diff line
@@ -230,6 +230,10 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
	conn->c_npaths = max_t(int, conn->c_npaths, 1);
	conn->c_ping_triggered = 0;
	rds_conn_peer_gen_update(conn, new_peer_gen_num);

	if (conn->c_npaths > 1 &&
	    conn->c_trans->conn_slots_available)
		conn->c_trans->conn_slots_available(conn);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
+11 −16
Original line number Diff line number Diff line
@@ -213,6 +213,8 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
		sock->sk->sk_data_ready = sock->sk->sk_user_data;

	tc->t_sock = sock;
	if (!tc->t_rtn)
		tc->t_rtn = net_generic(sock_net(sock->sk), rds_tcp_netid);
	tc->t_cpath = cp;
	tc->t_orig_data_ready = sock->sk->sk_data_ready;
	tc->t_orig_write_space = sock->sk->sk_write_space;
@@ -378,6 +380,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
		}
		mutex_init(&tc->t_conn_path_lock);
		tc->t_sock = NULL;
		tc->t_rtn = NULL;
		tc->t_tinc = NULL;
		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
		tc->t_tinc_data_rem = 0;
@@ -458,6 +461,7 @@ struct rds_transport rds_tcp_transport = {
	.recv_path		= rds_tcp_recv_path,
	.conn_alloc		= rds_tcp_conn_alloc,
	.conn_free		= rds_tcp_conn_free,
	.conn_slots_available	= rds_tcp_conn_slots_available,
	.conn_path_connect	= rds_tcp_conn_path_connect,
	.conn_path_shutdown	= rds_tcp_conn_path_shutdown,
	.inc_copy_to_user	= rds_tcp_inc_copy_to_user,
@@ -473,17 +477,7 @@ struct rds_transport rds_tcp_transport = {
	.t_unloading		= rds_tcp_is_unloading,
};

static unsigned int rds_tcp_netid;

/* per-network namespace private data for this module */
struct rds_tcp_net {
	struct socket *rds_tcp_listen_sock;
	struct work_struct rds_tcp_accept_w;
	struct ctl_table_header *rds_tcp_sysctl;
	struct ctl_table *ctl_table;
	int sndbuf_size;
	int rcvbuf_size;
};
int rds_tcp_netid;

/* All module specific customizations to the RDS-TCP socket should be done in
 * rds_tcp_tune() and applied after socket creation.
@@ -526,15 +520,12 @@ static void rds_tcp_accept_worker(struct work_struct *work)
					       struct rds_tcp_net,
					       rds_tcp_accept_w);

	while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0)
	while (rds_tcp_accept_one(rtn) == 0)
		cond_resched();
}

void rds_tcp_accept_work(struct sock *sk)
void rds_tcp_accept_work(struct rds_tcp_net *rtn)
{
	struct net *net = sock_net(sk);
	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);

	queue_work(rds_wq, &rtn->rds_tcp_accept_w);
}

@@ -546,6 +537,8 @@ static __net_init int rds_tcp_init_net(struct net *net)

	memset(rtn, 0, sizeof(*rtn));

	mutex_init(&rtn->rds_tcp_accept_lock);

	/* {snd, rcv}buf_size default to 0, which implies we let the
	 * stack pick the value, and permit auto-tuning of buffer size.
	 */
@@ -609,6 +602,8 @@ static void rds_tcp_kill_sock(struct net *net)

	rtn->rds_tcp_listen_sock = NULL;
	rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
	if (rtn->rds_tcp_accepted_sock)
		sock_release(rtn->rds_tcp_accepted_sock);
	spin_lock_irq(&rds_tcp_conn_lock);
	list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
		struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
+20 −2
Original line number Diff line number Diff line
@@ -4,6 +4,21 @@

#define RDS_TCP_PORT	16385

/* per-network namespace private data for this module */
struct rds_tcp_net {
	/* serialize "rds_tcp_accept_one" with "rds_tcp_accept_lock"
	 * to protect "rds_tcp_accepted_sock"
	 */
	struct mutex		rds_tcp_accept_lock;
	struct socket		*rds_tcp_listen_sock;
	struct socket		*rds_tcp_accepted_sock;
	struct work_struct	rds_tcp_accept_w;
	struct ctl_table_header	*rds_tcp_sysctl;
	const struct ctl_table	*ctl_table;
	int			sndbuf_size;
	int			rcvbuf_size;
};

struct rds_tcp_incoming {
	struct rds_incoming	ti_inc;
	struct sk_buff_head	ti_skb_list;
@@ -19,6 +34,7 @@ struct rds_tcp_connection {
	 */
	struct mutex		t_conn_path_lock;
	struct socket		*t_sock;
	struct rds_tcp_net	*t_rtn;
	void			*t_orig_write_space;
	void			*t_orig_data_ready;
	void			*t_orig_state_change;
@@ -49,6 +65,7 @@ struct rds_tcp_statistics {
};

/* tcp.c */
extern int rds_tcp_netid;
bool rds_tcp_tune(struct socket *sock);
void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
@@ -57,7 +74,7 @@ void rds_tcp_restore_callbacks(struct socket *sock,
u32 rds_tcp_write_seq(struct rds_tcp_connection *tc);
u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
extern struct rds_transport rds_tcp_transport;
void rds_tcp_accept_work(struct sock *sk);
void rds_tcp_accept_work(struct rds_tcp_net *rtn);
int rds_tcp_laddr_check(struct net *net, const struct in6_addr *addr,
			__u32 scope_id);
/* tcp_connect.c */
@@ -69,7 +86,8 @@ void rds_tcp_state_change(struct sock *sk);
struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
void rds_tcp_listen_data_ready(struct sock *sk);
int rds_tcp_accept_one(struct socket *sock);
void rds_tcp_conn_slots_available(struct rds_connection *conn);
int rds_tcp_accept_one(struct rds_tcp_net *rtn);
void rds_tcp_keepalive(struct socket *sock);
void *rds_tcp_listen_sock_def_readable(struct net *net);

Loading