Commit a2ea9a90 authored by David Howells's avatar David Howells Committed by Jakub Kicinski
Browse files

rxrpc: Use irq-disabling spinlocks between app and I/O thread



Where a spinlock is used by both the application thread and the I/O thread,
use irq-disabling locking so that an interrupt taken on the app thread
doesn't also slow down the I/O thread.

Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Signed-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parent 08d55d7c
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -408,9 +408,9 @@ void rxrpc_kernel_shutdown_call(struct socket *sock, struct rxrpc_call *call)

		/* Make sure we're not going to call back into a kernel service */
		if (call->notify_rx) {
			spin_lock(&call->notify_lock);
			spin_lock_irq(&call->notify_lock);
			call->notify_rx = rxrpc_dummy_notify_rx;
			spin_unlock(&call->notify_lock);
			spin_unlock_irq(&call->notify_lock);
		}
	}
	mutex_unlock(&call->user_mutex);
+0 −1
Original line number Diff line number Diff line
@@ -700,7 +700,6 @@ struct rxrpc_call {
	struct rxrpc_txqueue	*send_queue;	/* Queue that sendmsg is writing into */

	/* Transmitted data tracking. */
	spinlock_t		tx_lock;	/* Transmit queue lock */
	struct rxrpc_txqueue	*tx_queue;	/* Start of transmission buffers */
	struct rxrpc_txqueue	*tx_qtail;	/* End of transmission buffers */
	rxrpc_seq_t		tx_qbase;	/* First slot in tx_queue */
+10 −10
Original line number Diff line number Diff line
@@ -188,8 +188,8 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
	/* Make sure that there aren't any incoming calls in progress before we
	 * clear the preallocation buffers.
	 */
	spin_lock(&rx->incoming_lock);
	spin_unlock(&rx->incoming_lock);
	spin_lock_irq(&rx->incoming_lock);
	spin_unlock_irq(&rx->incoming_lock);

	head = b->peer_backlog_head;
	tail = b->peer_backlog_tail;
@@ -343,7 +343,7 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
	if (sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
		return rxrpc_protocol_error(skb, rxrpc_eproto_no_service_call);

	read_lock(&local->services_lock);
	read_lock_irq(&local->services_lock);

	/* Weed out packets to services we're not offering.  Packets that would
	 * begin a call are explicitly rejected and the rest are just
@@ -399,12 +399,12 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
	spin_unlock(&conn->state_lock);

	spin_unlock(&rx->incoming_lock);
	read_unlock(&local->services_lock);
	read_unlock_irq(&local->services_lock);

	if (hlist_unhashed(&call->error_link)) {
		spin_lock(&call->peer->lock);
		spin_lock_irq(&call->peer->lock);
		hlist_add_head(&call->error_link, &call->peer->error_targets);
		spin_unlock(&call->peer->lock);
		spin_unlock_irq(&call->peer->lock);
	}

	_leave(" = %p{%d}", call, call->debug_id);
@@ -413,20 +413,20 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
	return true;

unsupported_service:
	read_unlock(&local->services_lock);
	read_unlock_irq(&local->services_lock);
	return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
				  RX_INVALID_OPERATION, -EOPNOTSUPP);
unsupported_security:
	read_unlock(&local->services_lock);
	read_unlock_irq(&local->services_lock);
	return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
				  RX_INVALID_OPERATION, -EKEYREJECTED);
no_call:
	spin_unlock(&rx->incoming_lock);
	read_unlock(&local->services_lock);
	read_unlock_irq(&local->services_lock);
	_leave(" = f [%u]", skb->mark);
	return false;
discard:
	read_unlock(&local->services_lock);
	read_unlock_irq(&local->services_lock);
	return true;
}

+7 −8
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
	bool busy;

	if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
		spin_lock_bh(&local->lock);
		spin_lock_irq(&local->lock);
		busy = !list_empty(&call->attend_link);
		trace_rxrpc_poke_call(call, busy, what);
		if (!busy && !rxrpc_try_get_call(call, rxrpc_call_get_poke))
@@ -57,7 +57,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
		if (!busy) {
			list_add_tail(&call->attend_link, &local->call_attend_q);
		}
		spin_unlock_bh(&local->lock);
		spin_unlock_irq(&local->lock);
		if (!busy)
			rxrpc_wake_up_io_thread(local);
	}
@@ -151,7 +151,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	skb_queue_head_init(&call->rx_oos_queue);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->notify_lock);
	spin_lock_init(&call->tx_lock);
	refcount_set(&call->ref, 1);
	call->debug_id		= debug_id;
	call->tx_total_len	= -1;
@@ -302,9 +301,9 @@ static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)

	trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call);
	rxrpc_get_call(call, rxrpc_call_get_io_thread);
	spin_lock(&local->client_call_lock);
	spin_lock_irq(&local->client_call_lock);
	list_add_tail(&call->wait_link, &local->new_client_calls);
	spin_unlock(&local->client_call_lock);
	spin_unlock_irq(&local->client_call_lock);
	rxrpc_wake_up_io_thread(local);
	return 0;

@@ -434,7 +433,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,

/*
 * Set up an incoming call.  call->conn points to the connection.
 * This is called in BH context and isn't allowed to fail.
 * This is called with interrupts disabled and isn't allowed to fail.
 */
void rxrpc_incoming_call(struct rxrpc_sock *rx,
			 struct rxrpc_call *call,
@@ -576,7 +575,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
	rxrpc_put_call_slot(call);

	/* Make sure we don't get any more notifications */
	spin_lock(&rx->recvmsg_lock);
	spin_lock_irq(&rx->recvmsg_lock);

	if (!list_empty(&call->recvmsg_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
@@ -589,7 +588,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
	call->recvmsg_link.next = NULL;
	call->recvmsg_link.prev = NULL;

	spin_unlock(&rx->recvmsg_lock);
	spin_unlock_irq(&rx->recvmsg_lock);
	if (put)
		rxrpc_put_call(call, rxrpc_call_put_unnotify);

+6 −6
Original line number Diff line number Diff line
@@ -510,9 +510,9 @@ void rxrpc_connect_client_calls(struct rxrpc_local *local)
	struct rxrpc_call *call;
	LIST_HEAD(new_client_calls);

	spin_lock(&local->client_call_lock);
	spin_lock_irq(&local->client_call_lock);
	list_splice_tail_init(&local->new_client_calls, &new_client_calls);
	spin_unlock(&local->client_call_lock);
	spin_unlock_irq(&local->client_call_lock);

	while ((call = list_first_entry_or_null(&new_client_calls,
						struct rxrpc_call, wait_link))) {
@@ -547,9 +547,9 @@ void rxrpc_expose_client_call(struct rxrpc_call *call)
			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
		trace_rxrpc_client(conn, channel, rxrpc_client_exposed);

		spin_lock(&call->peer->lock);
		spin_lock_irq(&call->peer->lock);
		hlist_add_head(&call->error_link, &call->peer->error_targets);
		spin_unlock(&call->peer->lock);
		spin_unlock_irq(&call->peer->lock);
	}
}

@@ -590,9 +590,9 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
		ASSERTCMP(call->call_id, ==, 0);
		ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
		/* May still be on ->new_client_calls. */
		spin_lock(&local->client_call_lock);
		spin_lock_irq(&local->client_call_lock);
		list_del_init(&call->wait_link);
		spin_unlock(&local->client_call_lock);
		spin_unlock_irq(&local->client_call_lock);
		return;
	}

Loading