Commit 1fcb932c authored by Frederic Weisbecker's avatar Frederic Weisbecker Committed by Neeraj Upadhyay
Browse files

rcu/nocb: Simplify (de-)offloading state machine



Now that the (de-)offloading process can only apply to offline CPUs,
there is no more concurrency between rcu_core and nocb kthreads. Also
the mutation now happens on empty queues.

Therefore the state machine can be reduced to a single bit called
SEGCBLIST_OFFLOADED. Simplify the transition as follows:

* Upon offloading: queue the rdp to be added to the rcuog list and
  wait for the rcuog kthread to set the SEGCBLIST_OFFLOADED bit. Unpark
  rcuo kthread.

* Upon de-offloading: Park rcuo kthread. Queue the rdp to be removed
  from the rcuog list and wait for the rcuog kthread to clear the
  SEGCBLIST_OFFLOADED bit.

Signed-off-by: default avatarFrederic Weisbecker <frederic@kernel.org>
Signed-off-by: default avatarPaul E. McKenney <paulmck@kernel.org>
Reviewed-by: default avatarPaul E. McKenney <paulmck@kernel.org>
Signed-off-by: default avatarNeeraj Upadhyay <neeraj.upadhyay@kernel.org>
parent 91e43b90
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -185,9 +185,7 @@ struct rcu_cblist {
 *  ----------------------------------------------------------------------------
 */
#define SEGCBLIST_ENABLED	BIT(0)
#define SEGCBLIST_LOCKING	BIT(1)
#define SEGCBLIST_KTHREAD_GP	BIT(2)
#define SEGCBLIST_OFFLOADED	BIT(3)
#define SEGCBLIST_OFFLOADED	BIT(1)

struct rcu_segcblist {
	struct rcu_head *head;
+0 −11
Original line number Diff line number Diff line
@@ -260,17 +260,6 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
	rcu_segcblist_clear_flags(rsclp, SEGCBLIST_ENABLED);
}

/*
 * Mark the specified rcu_segcblist structure as offloaded (or not)
 */
void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
{
	if (offload)
		rcu_segcblist_set_flags(rsclp, SEGCBLIST_LOCKING | SEGCBLIST_OFFLOADED);
	else
		rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
}

/*
 * Does the specified rcu_segcblist structure contain callbacks that
 * are ready to be invoked?
+1 −1
Original line number Diff line number Diff line
@@ -89,7 +89,7 @@ static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp)
{
	if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_LOCKING))
	    rcu_segcblist_test_flags(rsclp, SEGCBLIST_OFFLOADED))
		return true;

	return false;
+66 −72
Original line number Diff line number Diff line
@@ -604,37 +604,33 @@ static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
	}
}

static int nocb_gp_toggle_rdp(struct rcu_data *rdp)
static void nocb_gp_toggle_rdp(struct rcu_data *rdp_gp, struct rcu_data *rdp)
{
	struct rcu_segcblist *cblist = &rdp->cblist;
	unsigned long flags;
	int ret;

	rcu_nocb_lock_irqsave(rdp, flags);
	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
	    !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
	/*
	 * Locking orders future de-offloaded callbacks enqueue against previous
	 * handling of this rdp. Ie: Make sure rcuog is done with this rdp before
	 * deoffloaded callbacks can be enqueued.
	 */
	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
		/*
		 * Offloading. Set our flag and notify the offload worker.
		 * We will handle this rdp until it ever gets de-offloaded.
		 */
		rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
		ret = 1;
	} else if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
		   rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
		list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp);
		rcu_segcblist_set_flags(cblist, SEGCBLIST_OFFLOADED);
	} else {
		/*
		 * De-offloading. Clear our flag and notify the de-offload worker.
		 * We will ignore this rdp until it ever gets re-offloaded.
		 */
		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
		ret = 0;
	} else {
		WARN_ON_ONCE(1);
		ret = -1;
		list_del(&rdp->nocb_entry_rdp);
		rcu_segcblist_clear_flags(cblist, SEGCBLIST_OFFLOADED);
	}

	rcu_nocb_unlock_irqrestore(rdp, flags);

	return ret;
	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}

static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
@@ -841,14 +837,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
	}

	if (rdp_toggling) {
		int ret;

		ret = nocb_gp_toggle_rdp(rdp_toggling);
		if (ret == 1)
			list_add_tail(&rdp_toggling->nocb_entry_rdp, &my_rdp->nocb_head_rdp);
		else if (ret == 0)
			list_del(&rdp_toggling->nocb_entry_rdp);

		nocb_gp_toggle_rdp(my_rdp, rdp_toggling);
		swake_up_one(&rdp_toggling->nocb_state_wq);
	}

@@ -1018,16 +1007,11 @@ void rcu_nocb_flush_deferred_wakeup(void)
}
EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);

static int rdp_offload_toggle(struct rcu_data *rdp,
			       bool offload, unsigned long flags)
	__releases(rdp->nocb_lock)
static int rcu_nocb_queue_toggle_rdp(struct rcu_data *rdp)
{
	struct rcu_segcblist *cblist = &rdp->cblist;
	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
	bool wake_gp = false;

	rcu_segcblist_offload(cblist, offload);
	rcu_nocb_unlock_irqrestore(rdp, flags);
	unsigned long flags;

	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
	// Queue this rdp for add/del to/from the list to iterate on rcuog
@@ -1041,9 +1025,25 @@ static int rdp_offload_toggle(struct rcu_data *rdp,
	return wake_gp;
}

static bool rcu_nocb_rdp_deoffload_wait_cond(struct rcu_data *rdp)
{
	unsigned long flags;
	bool ret;

	/*
	 * Locking makes sure rcuog is done handling this rdp before deoffloaded
	 * enqueue can happen. Also it keeps the SEGCBLIST_OFFLOADED flag stable
	 * while the ->nocb_lock is held.
	 */
	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
	ret = !rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);

	return ret;
}

static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
{
	struct rcu_segcblist *cblist = &rdp->cblist;
	unsigned long flags;
	int wake_gp;
	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
@@ -1056,51 +1056,42 @@ static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
	/* Flush all callbacks from segcblist and bypass */
	rcu_barrier();

	/*
	 * Make sure the rcuoc kthread isn't in the middle of a nocb locked
	 * sequence while offloading is deactivated, along with nocb locking.
	 */
	if (rdp->nocb_cb_kthread)
		kthread_park(rdp->nocb_cb_kthread);

	rcu_nocb_lock_irqsave(rdp, flags);
	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));
	rcu_nocb_unlock_irqrestore(rdp, flags);

	wake_gp = rdp_offload_toggle(rdp, false, flags);
	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);

	mutex_lock(&rdp_gp->nocb_gp_kthread_mutex);

	if (rdp_gp->nocb_gp_kthread) {
		if (wake_gp)
			wake_up_process(rdp_gp->nocb_gp_kthread);

		swait_event_exclusive(rdp->nocb_state_wq,
				      !rcu_segcblist_test_flags(cblist,
								SEGCBLIST_KTHREAD_GP));
		if (rdp->nocb_cb_kthread)
			kthread_park(rdp->nocb_cb_kthread);
				      rcu_nocb_rdp_deoffload_wait_cond(rdp));
	} else {
		/*
		 * No kthread to clear the flags for us or remove the rdp from the nocb list
		 * to iterate. Do it here instead. Locking doesn't look stricly necessary
		 * but we stick to paranoia in this rare path.
		 */
		rcu_nocb_lock_irqsave(rdp, flags);
		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
		rcu_nocb_unlock_irqrestore(rdp, flags);
		raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);

		list_del(&rdp->nocb_entry_rdp);
	}
	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);

	/*
	 * Lock one last time to acquire latest callback updates from kthreads
	 * so we can later handle callbacks locally without locking.
	 */
	rcu_nocb_lock_irqsave(rdp, flags);
	/*
	 * Theoretically we could clear SEGCBLIST_LOCKING after the nocb
	 * lock is released but how about being paranoid for once?
	 */
	rcu_segcblist_clear_flags(cblist, SEGCBLIST_LOCKING);
	/*
	 * Without SEGCBLIST_LOCKING, we can't use
	 * rcu_nocb_unlock_irqrestore() anymore.
	 */
	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);

	return 0;
}
@@ -1129,10 +1120,20 @@ int rcu_nocb_cpu_deoffload(int cpu)
}
EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);

static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
static bool rcu_nocb_rdp_offload_wait_cond(struct rcu_data *rdp)
{
	struct rcu_segcblist *cblist = &rdp->cblist;
	unsigned long flags;
	bool ret;

	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
	ret = rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);

	return ret;
}

static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
{
	int wake_gp;
	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;

@@ -1152,20 +1153,14 @@ static int rcu_nocb_rdp_offload(struct rcu_data *rdp)
	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
	WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist));

	/*
	 * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
	 * is set.
	 */
	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);

	wake_gp = rdp_offload_toggle(rdp, true, flags);
	wake_gp = rcu_nocb_queue_toggle_rdp(rdp);
	if (wake_gp)
		wake_up_process(rdp_gp->nocb_gp_kthread);

	kthread_unpark(rdp->nocb_cb_kthread);

	swait_event_exclusive(rdp->nocb_state_wq,
			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
			      rcu_nocb_rdp_offload_wait_cond(rdp));

	kthread_unpark(rdp->nocb_cb_kthread);

	return 0;
}
@@ -1340,8 +1335,7 @@ void __init rcu_init_nohz(void)
		rdp = per_cpu_ptr(&rcu_data, cpu);
		if (rcu_segcblist_empty(&rdp->cblist))
			rcu_segcblist_init(&rdp->cblist);
		rcu_segcblist_offload(&rdp->cblist, true);
		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_OFFLOADED);
	}
	rcu_organize_nocb_kthreads();
}