Commit 687a9aa5 authored by Tejun Heo's avatar Tejun Heo
Browse files

workqueue: Make per-cpu pool_workqueues allocated and released like unbound ones



Currently, all per-cpu pwq's (pool_workqueue's) are allocated directly
through a per-cpu allocation and thus, unlike unbound workqueues, not
reference counted. This difference in lifetime management between the two
types is a bit confusing.

Unbound workqueues are currently accessed through wq->numa_pwq_tbl[] which
isn't suitiable for the planned CPU locality related improvements. The plan
is to unify pwq handling across per-cpu and unbound workqueues so that
they're always accessed through wq->cpu_pwq.

In preparation, this patch makes per-cpu pwq's to be allocated, reference
counted and released the same way as unbound pwq's. wq->cpu_pwq now holds
pointers to pwq's instead of containing them directly.

pwq_unbound_release_workfn() is renamed to pwq_release_workfn() as it's now
also used for per-cpu work items.

Signed-off-by: default avatarTejun Heo <tj@kernel.org>
parent 967b494e
Loading
Loading
Loading
Loading
+40 −34
Original line number Diff line number Diff line
@@ -258,11 +258,11 @@ struct pool_workqueue {

	/*
	 * Release of unbound pwq is punted to a kthread_worker. See put_pwq()
	 * and pwq_unbound_release_workfn() for details. pool_workqueue itself
	 * is also RCU protected so that the first pwq can be determined without
	 * and pwq_release_workfn() for details. pool_workqueue itself is also
	 * RCU protected so that the first pwq can be determined without
	 * grabbing wq->mutex.
	 */
	struct kthread_work	unbound_release_work;
	struct kthread_work	release_work;
	struct rcu_head		rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);

@@ -321,7 +321,7 @@ struct workqueue_struct {

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwq; /* I: per-cpu pwqs */
	struct pool_workqueue __percpu **cpu_pwq; /* I: per-cpu pwqs */
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
};

@@ -1370,13 +1370,11 @@ static void put_pwq(struct pool_workqueue *pwq)
	lockdep_assert_held(&pwq->pool->lock);
	if (likely(--pwq->refcnt))
		return;
	if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
		return;
	/*
	 * @pwq can't be released under pool->lock, bounce to a dedicated
	 * kthread_worker to avoid A-A deadlocks.
	 */
	kthread_queue_work(pwq_release_worker, &pwq->unbound_release_work);
	kthread_queue_work(pwq_release_worker, &pwq->release_work);
}

/**
@@ -1685,7 +1683,7 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
	} else {
		if (req_cpu == WORK_CPU_UNBOUND)
			cpu = raw_smp_processor_id();
		pwq = per_cpu_ptr(wq->cpu_pwq, cpu);
		pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
	}

	pool = pwq->pool;
@@ -4004,31 +4002,30 @@ static void rcu_free_pwq(struct rcu_head *rcu)
 * Scheduled on pwq_release_worker by put_pwq() when an unbound pwq hits zero
 * refcnt and needs to be destroyed.
 */
static void pwq_unbound_release_workfn(struct kthread_work *work)
static void pwq_release_workfn(struct kthread_work *work)
{
	struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
						  unbound_release_work);
						  release_work);
	struct workqueue_struct *wq = pwq->wq;
	struct worker_pool *pool = pwq->pool;
	bool is_last = false;

	/*
	 * when @pwq is not linked, it doesn't hold any reference to the
	 * When @pwq is not linked, it doesn't hold any reference to the
	 * @wq, and @wq is invalid to access.
	 */
	if (!list_empty(&pwq->pwqs_node)) {
		if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
			return;

		mutex_lock(&wq->mutex);
		list_del_rcu(&pwq->pwqs_node);
		is_last = list_empty(&wq->pwqs);
		mutex_unlock(&wq->mutex);
	}

	if (wq->flags & WQ_UNBOUND) {
		mutex_lock(&wq_pool_mutex);
		put_unbound_pool(pool);
		mutex_unlock(&wq_pool_mutex);
	}

	call_rcu(&pwq->rcu, rcu_free_pwq);

@@ -4112,8 +4109,7 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
	INIT_LIST_HEAD(&pwq->inactive_works);
	INIT_LIST_HEAD(&pwq->pwqs_node);
	INIT_LIST_HEAD(&pwq->mayday_node);
	kthread_init_work(&pwq->unbound_release_work,
			  pwq_unbound_release_workfn);
	kthread_init_work(&pwq->release_work, pwq_release_workfn);
}

/* sync @pwq with the current state of its associated wq and link it */
@@ -4514,20 +4510,25 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
	int cpu, ret;

	if (!(wq->flags & WQ_UNBOUND)) {
		wq->cpu_pwq = alloc_percpu(struct pool_workqueue);
		wq->cpu_pwq = alloc_percpu(struct pool_workqueue *);
		if (!wq->cpu_pwq)
			return -ENOMEM;
			goto enomem;

		for_each_possible_cpu(cpu) {
			struct pool_workqueue *pwq =
			struct pool_workqueue **pwq_p =
				per_cpu_ptr(wq->cpu_pwq, cpu);
			struct worker_pool *cpu_pools =
				per_cpu(cpu_worker_pools, cpu);
			struct worker_pool *pool =
				&(per_cpu_ptr(cpu_worker_pools, cpu)[highpri]);

			*pwq_p = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL,
						       pool->node);
			if (!*pwq_p)
				goto enomem;

			init_pwq(pwq, wq, &cpu_pools[highpri]);
			init_pwq(*pwq_p, wq, pool);

			mutex_lock(&wq->mutex);
			link_pwq(pwq);
			link_pwq(*pwq_p);
			mutex_unlock(&wq->mutex);
		}
		return 0;
@@ -4546,6 +4547,15 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
	cpus_read_unlock();

	return ret;

enomem:
	if (wq->cpu_pwq) {
		for_each_possible_cpu(cpu)
			kfree(*per_cpu_ptr(wq->cpu_pwq, cpu));
		free_percpu(wq->cpu_pwq);
		wq->cpu_pwq = NULL;
	}
	return -ENOMEM;
}

static int wq_clamp_max_active(int max_active, unsigned int flags,
@@ -4719,7 +4729,7 @@ static bool pwq_busy(struct pool_workqueue *pwq)
void destroy_workqueue(struct workqueue_struct *wq)
{
	struct pool_workqueue *pwq;
	int node;
	int cpu, node;

	/*
	 * Remove it from sysfs first so that sanity check failure doesn't
@@ -4779,12 +4789,8 @@ void destroy_workqueue(struct workqueue_struct *wq)
	mutex_unlock(&wq_pool_mutex);

	if (!(wq->flags & WQ_UNBOUND)) {
		wq_unregister_lockdep(wq);
		/*
		 * The base ref is never dropped on per-cpu pwqs.  Directly
		 * schedule RCU free.
		 */
		call_rcu(&wq->rcu, rcu_free_wq);
		for_each_possible_cpu(cpu)
			put_pwq_unlocked(*per_cpu_ptr(wq->cpu_pwq, cpu));
	} else {
		/*
		 * We're the sole accessor of @wq at this point.  Directly
@@ -4901,7 +4907,7 @@ bool workqueue_congested(int cpu, struct workqueue_struct *wq)
		cpu = smp_processor_id();

	if (!(wq->flags & WQ_UNBOUND))
		pwq = per_cpu_ptr(wq->cpu_pwq, cpu);
		pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
	else
		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));