Commit ad7c7f4b authored by Sebastian Andrzej Siewior's avatar Sebastian Andrzej Siewior Committed by Tejun Heo
Browse files

workqueue: Provide a handshake for canceling BH workers



While a BH work item is canceled, the core code spins until it
determines that the item completed. On PREEMPT_RT the spinning relies on
a lock in local_bh_disable() to avoid a live lock if the canceling
thread has higher priority than the BH-worker and preempts it. This lock
ensures that the BH-worker makes progress by PI-boosting it.

This lock in local_bh_disable() is a central per-CPU BKL and about to be
removed.

To provide the required synchronisation add a per pool lock. The lock is
acquired by the bh_worker at the begin while the individual callbacks
are invoked. To enforce progress in case of interruption, __flush_work()
needs to acquire the lock.
This will flush all BH-work items assigned to that pool.

Signed-off-by: default avatarSebastian Andrzej Siewior <bigeasy@linutronix.de>
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
parent cda2b2d6
Loading
Loading
Loading
Loading
+41 −9
Original line number Diff line number Diff line
@@ -222,7 +222,9 @@ struct worker_pool {
	struct workqueue_attrs	*attrs;		/* I: worker attributes */
	struct hlist_node	hash_node;	/* PL: unbound_pool_hash node */
	int			refcnt;		/* PL: refcnt for unbound pools */

#ifdef CONFIG_PREEMPT_RT
	spinlock_t		cb_lock;	/* BH worker cancel lock */
#endif
	/*
	 * Destruction of pool is RCU protected to allow dereferences
	 * from get_work_pool().
@@ -3078,6 +3080,31 @@ __acquires(&pool->lock)
		goto restart;
}

#ifdef CONFIG_PREEMPT_RT
static void worker_lock_callback(struct worker_pool *pool)
{
	spin_lock(&pool->cb_lock);
}

static void worker_unlock_callback(struct worker_pool *pool)
{
	spin_unlock(&pool->cb_lock);
}

static void workqueue_callback_cancel_wait_running(struct worker_pool *pool)
{
	spin_lock(&pool->cb_lock);
	spin_unlock(&pool->cb_lock);
}

#else

static void worker_lock_callback(struct worker_pool *pool) { }
static void worker_unlock_callback(struct worker_pool *pool) { }
static void workqueue_callback_cancel_wait_running(struct worker_pool *pool) { }

#endif

/**
 * manage_workers - manage worker pool
 * @worker: self
@@ -3557,6 +3584,7 @@ static void bh_worker(struct worker *worker)
	int nr_restarts = BH_WORKER_RESTARTS;
	unsigned long end = jiffies + BH_WORKER_JIFFIES;

	worker_lock_callback(pool);
	raw_spin_lock_irq(&pool->lock);
	worker_leave_idle(worker);

@@ -3585,6 +3613,7 @@ static void bh_worker(struct worker *worker)
	worker_enter_idle(worker);
	kick_pool(pool);
	raw_spin_unlock_irq(&pool->lock);
	worker_unlock_callback(pool);
}

/*
@@ -4222,17 +4251,17 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
		    (data & WORK_OFFQ_BH)) {
			/*
			 * On RT, prevent a live lock when %current preempted
			 * soft interrupt processing or prevents ksoftirqd from
			 * running by keeping flipping BH. If the BH work item
			 * runs on a different CPU then this has no effect other
			 * than doing the BH disable/enable dance for nothing.
			 * This is copied from
			 * kernel/softirq.c::tasklet_unlock_spin_wait().
			 * soft interrupt processing by blocking on lock which
			 * is owned by the thread invoking the callback.
			 */
			while (!try_wait_for_completion(&barr.done)) {
				if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
					local_bh_disable();
					local_bh_enable();
					struct worker_pool *pool;

					guard(rcu)();
					pool = get_work_pool(work);
					if (pool)
						workqueue_callback_cancel_wait_running(pool);
				} else {
					cpu_relax();
				}
@@ -4782,6 +4811,9 @@ static int init_worker_pool(struct worker_pool *pool)
	ida_init(&pool->worker_ida);
	INIT_HLIST_NODE(&pool->hash_node);
	pool->refcnt = 1;
#ifdef CONFIG_PREEMPT_RT
	spin_lock_init(&pool->cb_lock);
#endif

	/* shouldn't fail above this point */
	pool->attrs = alloc_workqueue_attrs();