Commit e9131359 authored by Alexander Aring, committed by David Teigland
Browse files

dlm: use rwlock for rsb hash table



The conversion to rhashtable introduced a hash table lock per lockspace,
in place of per bucket locks.  To make this more scalable, switch to
using a rwlock for hash table access.  The common case fast path uses
it as a read lock.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
parent b1f2381c
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -413,7 +413,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
	else
		list = &ls->ls_keep;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	read_lock_bh(&ls->ls_rsbtbl_lock);
	return seq_list_start(list, *pos);
}

@@ -434,7 +434,7 @@ static void table_seq_stop(struct seq_file *seq, void *iter_ptr)
{
	struct dlm_ls *ls = seq->private;

	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
}

static const struct seq_operations format1_seq_ops = {
+2 −2
Original line number Diff line number Diff line
@@ -200,9 +200,9 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
	struct dlm_rsb *r;
	int rv;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	read_lock_bh(&ls->ls_rsbtbl_lock);
	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	if (!rv)
		return r;

+1 −1
Original line number Diff line number Diff line
@@ -585,7 +585,7 @@ struct dlm_ls {
	spinlock_t		ls_lkbidr_spin;

	struct rhashtable	ls_rsbtbl;
	spinlock_t		ls_rsbtbl_lock;
	rwlock_t		ls_rsbtbl_lock;

	struct list_head	ls_toss;
	struct list_head	ls_keep;
+194 −75
Original line number Diff line number Diff line
@@ -342,15 +342,15 @@ void dlm_hold_rsb(struct dlm_rsb *r)

/* TODO move this to lib/refcount.c */
static __must_check bool
dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
__cond_acquires(lock)
{
	if (refcount_dec_not_one(r))
		return false;

	spin_lock_bh(lock);
	write_lock_bh(lock);
	if (!refcount_dec_and_test(r)) {
		spin_unlock_bh(lock);
		write_unlock_bh(lock);
		return false;
	}

@@ -358,11 +358,11 @@ __cond_acquires(lock)
}

/* TODO move this to include/linux/kref.h */
static inline int dlm_kref_put_lock_bh(struct kref *kref,
static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
					     void (*release)(struct kref *kref),
				       spinlock_t *lock)
					     rwlock_t *lock)
{
	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
		release(kref);
		return 1;
	}
@@ -378,10 +378,10 @@ static void put_rsb(struct dlm_rsb *r)
	struct dlm_ls *ls = r->res_ls;
	int rv;

	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
	rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
					&ls->ls_rsbtbl_lock);
	if (rv)
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
@@ -603,7 +603,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
		 * a caching handling and the other holders might to put
		 * this rsb out of the toss state.
		 */
		rv = spin_trylock(&ls->ls_rsbtbl_lock);
		rv = write_trylock(&ls->ls_rsbtbl_lock);
		if (!rv) {
			spin_unlock(&ls->ls_toss_q_lock);
			/* rearm again try timer */
@@ -618,7 +618,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
		/* it is not necessary to hold the ls_rsbtbl_lock when
		 * calling send_remove()
		 */
		spin_unlock(&ls->ls_rsbtbl_lock);
		write_unlock(&ls->ls_rsbtbl_lock);

		/* remove the rsb from the toss queue, it's gone
		 * from DLM now
@@ -702,16 +702,8 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,

static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
	int rv;

	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
	return rhashtable_insert_fast(rhash, &rsb->res_node,
				      dlm_rhash_rsb_params);
	if (rv == -EEXIST) {
		log_print("%s match", __func__);
		dlm_dump_rsb(rsb);
	}

	return rv;
}

/*
@@ -806,24 +798,47 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
			goto out;
	}

	spin_lock_bh(&ls->ls_rsbtbl_lock);
 retry_lookup:

	/* check if the rsb is in keep state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
	if (error) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}
	
	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	if (rsb_flag(r, RSB_TOSS))
	if (rsb_flag(r, RSB_TOSS)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_toss;
	}

	kref_get(&r->res_ref);
	goto out_unlock;
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	goto out;


 do_toss:
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* retry lookup under write lock to see if it's still in toss state;
	 * if not, it's in keep state and we relookup - unlikely path.
	 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (!rsb_flag(r, RSB_TOSS)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			goto retry_lookup;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
@@ -837,8 +852,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -ENOTBLK;
		goto out_unlock;
		goto out;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
@@ -868,9 +884,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
	 */
	kref_init(&r->res_ref);
	rsb_delete_toss_timer(ls, r);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out_unlock;
	goto out;


 do_new:
@@ -879,15 +895,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
	 */

	if (error == -EBADR && !create)
		goto out_unlock;
		goto out;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
	if (error == -EAGAIN)
		goto retry;
	}
	if (error)
		goto out_unlock;
		goto out;

	r->res_hash = hash;
	r->res_dir_nodeid = dir_nodeid;
@@ -909,7 +923,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
		dlm_free_rsb(r);
		r = NULL;
		error = -ENOTBLK;
		goto out_unlock;
		goto out;
	}

	if (from_other) {
@@ -929,11 +943,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
	}

 out_add:

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (!error)
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry_lookup;
	} else if (!error) {
		list_add(&r->res_rsbs_list, &ls->ls_keep);
 out_unlock:
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	}
	write_unlock_bh(&ls->ls_rsbtbl_lock);
 out:
	*r_ret = r;
	return error;
@@ -957,24 +980,49 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
	if (error < 0)
		goto out;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
 retry_lookup:

	/* check if the rsb is in keep state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
	if (error) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	if (rsb_flag(r, RSB_TOSS))
	if (rsb_flag(r, RSB_TOSS)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_toss;
	}

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;
	read_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out;


 do_toss:
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* retry lookup under write lock to see if it's still in toss state;
	 * if not, it's in keep state and we relookup - unlikely path.
	 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (!rsb_flag(r, RSB_TOSS)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			goto retry_lookup;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}


	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
@@ -987,8 +1035,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -ENOTBLK;
		goto out_unlock;
		goto out;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
@@ -1010,9 +1059,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
	 */
	kref_init(&r->res_ref);
	rsb_delete_toss_timer(ls, r);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out_unlock;
	goto out;


 do_new:
@@ -1022,11 +1071,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		goto retry;
	}
	if (error)
		goto out_unlock;
		goto out;

	r->res_hash = hash;
	r->res_dir_nodeid = dir_nodeid;
@@ -1034,11 +1082,20 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (!error)
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry_lookup;
	} else if (!error) {
		list_add(&r->res_rsbs_list, &ls->ls_keep);
 out_unlock:
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	}
	write_unlock_bh(&ls->ls_rsbtbl_lock);

 out:
	*r_ret = r;
	return error;
@@ -1251,18 +1308,23 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
	if (error < 0)
		return error;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
 retry_lookup:

	/* check if the rsb is in keep state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (rsb_flag(r, RSB_TOSS))
		if (rsb_flag(r, RSB_TOSS)) {
			read_unlock_bh(&ls->ls_rsbtbl_lock);
			goto do_toss;
		}

		/* because the rsb is active, we need to lock_rsb before
		 * checking/changing re_master_nodeid
		 */

		hold_rsb(r);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		lock_rsb(r);

		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1274,10 +1336,31 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,

		return 0;
	} else {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

 do_toss:
	/* unlikely path - relookup under write */
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* rsb_mod_timer() requires ls_rsbtbl_lock to be held as write lock;
	 * check if the rsb is still in toss state, if not relookup
	 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (!rsb_flag(r, RSB_TOSS)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			/* something has changed, very unlikely but
			 * try again
			 */
			goto retry_lookup;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	/* because the rsb is inactive (on toss list), it's not refcounted
	 * and lock_rsb is not used, but is protected by the rsbtbl lock
	 */
@@ -1287,18 +1370,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,

	rsb_mod_timer(ls, r);
	/* the rsb was inactive (on toss list) */
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
	if (error == -EAGAIN)
		goto retry;
	}
	if (error)
		goto out_unlock;
		goto out;

	r->res_hash = hash;
	r->res_dir_nodeid = our_nodeid;
@@ -1307,22 +1388,30 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
	kref_init(&r->res_ref);
	rsb_set_flag(r, RSB_TOSS);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error) {
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry_lookup;
	} else if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		goto retry;
	}

	list_add(&r->res_rsbs_list, &ls->ls_toss);
	rsb_mod_timer(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out_unlock:
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
 out:
	return error;
}

@@ -1330,12 +1419,12 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct dlm_rsb *r;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	read_lock_bh(&ls->ls_rsbtbl_lock);
	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
		if (r->res_hash == hash)
			dlm_dump_rsb(r);
	}
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
}

void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1343,14 +1432,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
	struct dlm_rsb *r = NULL;
	int error;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error)
		goto out;

	dlm_dump_rsb(r);
 out:
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
}

static void toss_rsb(struct kref *kref)
@@ -1478,6 +1567,36 @@ static void kill_lkb(struct kref *kref)
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/*
 * Drop one reference on @r and, only if that drop takes the count to
 * zero, return with @lock held (acquired via spin_lock_bh()).
 *
 * Returns true when the final reference was dropped; @lock is then held
 * and the caller is responsible for releasing it.  Returns false in every
 * other case, with @lock not held.
 *
 * TODO move this to lib/refcount.c
 */
static __must_check bool
dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
__cond_acquires(lock)
{
	/* lockless path: the decrement was done without reaching the
	 * last reference, so there is nothing to serialize against
	 */
	if (refcount_dec_not_one(r))
		return false;

	/* possibly the last reference: redo the decrement under the lock */
	spin_lock_bh(lock);
	if (refcount_dec_and_test(r))
		return true;

	/* the count did not hit zero after all, do not keep the lock */
	spin_unlock_bh(lock);
	return false;
}

/*
 * Put a reference on @kref; when it was the last one, call @release
 * with @lock held.
 *
 * Returns 1 if @release was called.  @lock is not dropped here, so it is
 * still held on return and the caller has to unlock it.  Returns 0 if
 * the kref is still referenced; @lock is not held in that case.
 *
 * TODO move this to include/linux/kref.h
 */
static inline int dlm_kref_put_lock_bh(struct kref *kref,
				       void (*release)(struct kref *kref),
				       spinlock_t *lock)
{
	/* not the last reference: nothing to release, lock not taken */
	if (!dlm_refcount_dec_and_lock_bh(&kref->refcount, lock))
		return 0;

	release(kref);
	return 1;
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

@@ -4247,14 +4366,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
	memset(name, 0, sizeof(name));
	memcpy(name, ms->m_extra, len);

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	write_lock_bh(&ls->ls_rsbtbl_lock);

	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (rv) {
		/* should not happen */
		log_error(ls, "%s from %d not found %s", __func__,
			  from_nodeid, name);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return;
	}

@@ -4264,14 +4383,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
			log_error(ls, "receive_remove keep from %d master %d",
				  from_nodeid, r->res_master_nodeid);
			dlm_print_rsb(r);
			spin_unlock_bh(&ls->ls_rsbtbl_lock);
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			return;
		}

		log_debug(ls, "receive_remove from %d master %d first %x %s",
			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
			  name);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return;
	}

@@ -4279,14 +4398,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
		log_error(ls, "receive_remove toss from %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return;
	}

	list_del(&r->res_rsbs_list);
	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
			       dlm_rhash_rsb_params);
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	free_toss_rsb(r);
}
@@ -5354,7 +5473,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	spin_lock_bh(&ls->ls_rsbtbl_lock);
	read_lock_bh(&ls->ls_rsbtbl_lock);
	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
		if (!rsb_flag(r, RSB_RECOVER_GRANT))
			continue;
@@ -5363,10 +5482,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
			continue;
		}
		hold_rsb(r);
		spin_unlock_bh(&ls->ls_rsbtbl_lock);
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		return r;
	}
	spin_unlock_bh(&ls->ls_rsbtbl_lock);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	return NULL;
}

+1 −1
Original line number Diff line number Diff line
@@ -424,7 +424,7 @@ static int new_lockspace(const char *name, const char *cluster,

	INIT_LIST_HEAD(&ls->ls_toss);
	INIT_LIST_HEAD(&ls->ls_keep);
	spin_lock_init(&ls->ls_rsbtbl_lock);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
Loading