
From: Martin Pieuchot <mpi@grenadille.net>
Subject: Re: kernel rwlocks vs the scheduler
To: David Gwynne <david@gwynne.id.au>, tech@openbsd.org
Date: Mon, 18 Nov 2024 07:28:05 +0100

On 14/11/24(Thu) 13:50, Martin Pieuchot wrote:
> [...] 
> I haven't tried a WITNESS kernel; I'd be glad if someone could report
> on that.

I did more tests, including with a WITNESS kernel, over the weekend.  I
observed that on my 24-CPU machine the contention that was previously on
SCHED_LOCK() shifted to the KERNEL_LOCK() and the global UVM mutexes,
which explains why I don't see as many improvements as on the 80-CPU
machine.

ok mpi@

> > Index: sys/rwlock.h
> > ===================================================================
> > RCS file: /cvs/src/sys/sys/rwlock.h,v
> > diff -u -p -r1.28 rwlock.h
> > --- sys/rwlock.h	11 Jan 2021 18:49:38 -0000	1.28
> > +++ sys/rwlock.h	24 Oct 2024 11:18:04 -0000
> > @@ -60,6 +60,8 @@ struct proc;
> >  
> >  struct rwlock {
> >  	volatile unsigned long	 rwl_owner;
> > +	volatile unsigned int	 rwl_waiters;
> > +	volatile unsigned int	 rwl_readers;
> >  	const char		*rwl_name;
> >  #ifdef WITNESS
> >  	struct lock_object	 rwl_lock_obj;
> > @@ -91,14 +93,12 @@ struct rwlock {
> >  
> >  #ifdef WITNESS
> >  #define RWLOCK_INITIALIZER(name) \
> > -	{ 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
> > +	{ 0, 0, 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
> >  #else
> >  #define RWLOCK_INITIALIZER(name) \
> > -	{ 0, name }
> > +	{ 0, 0, 0, name }
> >  #endif
> >  
> > -#define RWLOCK_WAIT		0x01UL
> > -#define RWLOCK_WRWANT		0x02UL
> >  #define RWLOCK_WRLOCK		0x04UL
> >  #define RWLOCK_MASK		0x07UL
> >  
> > Index: kern/kern_rwlock.c
> > ===================================================================
> > RCS file: /cvs/src/sys/kern/kern_rwlock.c,v
> > diff -u -p -r1.50 kern_rwlock.c
> > --- kern/kern_rwlock.c	14 Jul 2023 07:07:08 -0000	1.50
> > +++ kern/kern_rwlock.c	24 Oct 2024 11:18:05 -0000
> > @@ -26,10 +26,12 @@
> >  #include <sys/atomic.h>
> >  #include <sys/witness.h>
> >  
> > -void	rw_do_exit(struct rwlock *, unsigned long);
> > -
> > -/* XXX - temporary measure until proc0 is properly aligned */
> > -#define RW_PROC(p) (((long)p) & ~RWLOCK_MASK)
> > +#ifdef RWDIAG
> > +#include <sys/kernel.h> /* for hz */
> > +#define RW_SLEEP_TMO	10 * hz
> > +#else
> > +#define RW_SLEEP_TMO	0
> > +#endif
> >  
> >  /*
> >   * Other OSes implement more sophisticated mechanism to determine how long the
> > @@ -40,166 +42,131 @@ void	rw_do_exit(struct rwlock *, unsigne
> >  #define RW_SPINS	1000
> >  
> >  #ifdef MULTIPROCESSOR
> > -#define rw_cas(p, o, n)	(atomic_cas_ulong(p, o, n) != o)
> > +#define rw_cas(p, e, n)	atomic_cas_ulong(p, e, n)
> > +#define rw_inc(p)	atomic_inc_int(p)
> > +#define rw_dec(p)	atomic_dec_int(p)
> >  #else
> > -static inline int
> > -rw_cas(volatile unsigned long *p, unsigned long o, unsigned long n)
> > +static inline unsigned long
> > +rw_cas(volatile unsigned long *p, unsigned long e, unsigned long n)
> >  {
> > -	if (*p != o)
> > -		return (1);
> > -	*p = n;
> > +	unsigned long o = *p;
> >  
> > -	return (0);
> > +	if (o == e)
> > +		*p = n;
> > +
> > +	return (o);
> > +}
> > +
> > +static inline void
> > +rw_inc(volatile unsigned int *p)
> > +{
> > +	++(*p);
> > +}
> > +
> > +static inline void
> > +rw_dec(volatile unsigned int *p)
> > +{
> > +	(*p)--;
> >  }
> >  #endif
> >  
> > -/*
> > - * Magic wand for lock operations. Every operation checks if certain
> > - * flags are set and if they aren't, it increments the lock with some
> > - * value (that might need some computing in a few cases). If the operation
> > - * fails, we need to set certain flags while waiting for the lock.
> > - *
> > - * RW_WRITE	The lock must be completely empty. We increment it with
> > - *		RWLOCK_WRLOCK and the proc pointer of the holder.
> > - *		Sets RWLOCK_WAIT|RWLOCK_WRWANT while waiting.
> > - * RW_READ	RWLOCK_WRLOCK|RWLOCK_WRWANT may not be set. We increment
> > - *		with RWLOCK_READ_INCR. RWLOCK_WAIT while waiting.
> > - */
> > -static const struct rwlock_op {
> > -	unsigned long inc;
> > -	unsigned long check;
> > -	unsigned long wait_set;
> > -	long proc_mult;
> > -	int wait_prio;
> > -} rw_ops[] = {
> > -	{	/* RW_WRITE */
> > -		RWLOCK_WRLOCK,
> > -		ULONG_MAX,
> > -		RWLOCK_WAIT | RWLOCK_WRWANT,
> > -		1,
> > -		PLOCK - 4
> > -	},
> > -	{	/* RW_READ */
> > -		RWLOCK_READ_INCR,
> > -		RWLOCK_WRLOCK | RWLOCK_WRWANT,
> > -		RWLOCK_WAIT,
> > -		0,
> > -		PLOCK
> > -	},
> > -	{	/* Sparse Entry. */
> > -		0,
> > -	},
> > -	{	/* RW_DOWNGRADE */
> > -		RWLOCK_READ_INCR - RWLOCK_WRLOCK,
> > -		0,
> > -		0,
> > -		-1,
> > -		PLOCK
> > -	},
> > -};
> > +static int	rw_do_enter_read(struct rwlock *, int);
> > +static void	rw_do_exit_read(struct rwlock *, unsigned long);
> > +static int	rw_do_enter_write(struct rwlock *, int);
> > +static int	rw_downgrade(struct rwlock *, int);
> > +
> > +static void	rw_exited(struct rwlock *);
> > +
> > +static unsigned long
> > +rw_self(void)
> > +{
> > +	unsigned long self = (unsigned long)curproc;
> > +
> > +	CLR(self, RWLOCK_MASK);
> > +	SET(self, RWLOCK_WRLOCK);
> > +
> > +	return (self);
> > +}
> >  
> >  void
> >  rw_enter_read(struct rwlock *rwl)
> >  {
> > -	unsigned long owner = rwl->rwl_owner;
> > -
> > -	if (__predict_false((owner & (RWLOCK_WRLOCK | RWLOCK_WRWANT)) ||
> > -	    rw_cas(&rwl->rwl_owner, owner, owner + RWLOCK_READ_INCR)))
> > -		rw_enter(rwl, RW_READ);
> > -	else {
> > -		membar_enter_after_atomic();
> > -		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, LOP_NEWORDER, NULL);
> > -		WITNESS_LOCK(&rwl->rwl_lock_obj, 0);
> > -	}
> > +	rw_do_enter_read(rwl, 0);
> >  }
> >  
> >  void
> >  rw_enter_write(struct rwlock *rwl)
> >  {
> > -	struct proc *p = curproc;
> > -
> > -	if (__predict_false(rw_cas(&rwl->rwl_owner, 0,
> > -	    RW_PROC(p) | RWLOCK_WRLOCK)))
> > -		rw_enter(rwl, RW_WRITE);
> > -	else {
> > -		membar_enter_after_atomic();
> > -		WITNESS_CHECKORDER(&rwl->rwl_lock_obj,
> > -		    LOP_EXCLUSIVE | LOP_NEWORDER, NULL);
> > -		WITNESS_LOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
> > -	}
> > +	rw_do_enter_write(rwl, 0);
> >  }
> >  
> >  void
> >  rw_exit_read(struct rwlock *rwl)
> >  {
> > -	unsigned long owner;
> > +	/* maybe we're the last one? */
> > +	rw_do_exit_read(rwl, RWLOCK_READ_INCR);
> > +}
> > +
> > +static void
> > +rw_do_exit_read(struct rwlock *rwl, unsigned long owner)
> > +{
> > +	unsigned long decr;
> > +	unsigned long nowner;
> >  
> > -	rw_assert_rdlock(rwl);
> >  	WITNESS_UNLOCK(&rwl->rwl_lock_obj, 0);
> >  
> > -	membar_exit_before_atomic();
> > -	owner = rwl->rwl_owner;
> > -	if (__predict_false((owner & RWLOCK_WAIT) ||
> > -	    rw_cas(&rwl->rwl_owner, owner, owner - RWLOCK_READ_INCR)))
> > -		rw_do_exit(rwl, 0);
> > +	for (;;) {
> > +		decr = owner - RWLOCK_READ_INCR;
> > +		nowner = rw_cas(&rwl->rwl_owner, owner, decr);
> > +		if (owner == nowner)
> > +			break;
> > +
> > +		if (__predict_false(ISSET(nowner, RWLOCK_WRLOCK))) {
> > +			panic("%s rwlock %p: exit read on write locked lock"
> > +			    " (owner 0x%lx)", rwl->rwl_name, rwl, nowner);
> > +		}
> > +		if (__predict_false(nowner == 0)) {
> > +			panic("%s rwlock %p: exit read on unlocked lock",
> > +			    rwl->rwl_name, rwl);
> > +		}
> > +
> > +		owner = nowner;
> > +	}
> > +
> > +	/* read lock didn't change anything, so no barrier needed? */
> > +
> > +	if (decr == 0) {
> > +		/* last one out */
> > +		rw_exited(rwl);
> > +	}
> >  }
> >  
> >  void
> >  rw_exit_write(struct rwlock *rwl)
> >  {
> > +	unsigned long self = rw_self();
> >  	unsigned long owner;
> >  
> > -	rw_assert_wrlock(rwl);
> >  	WITNESS_UNLOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
> >  
> >  	membar_exit_before_atomic();
> > -	owner = rwl->rwl_owner;
> > -	if (__predict_false((owner & RWLOCK_WAIT) ||
> > -	    rw_cas(&rwl->rwl_owner, owner, 0)))
> > -		rw_do_exit(rwl, RWLOCK_WRLOCK);
> > -}
> > -
> > -#ifdef DIAGNOSTIC
> > -/*
> > - * Put the diagnostic functions here to keep the main code free
> > - * from ifdef clutter.
> > - */
> > -static void
> > -rw_enter_diag(struct rwlock *rwl, int flags)
> > -{
> > -	switch (flags & RW_OPMASK) {
> > -	case RW_WRITE:
> > -	case RW_READ:
> > -		if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> > -			panic("rw_enter: %s locking against myself",
> > -			    rwl->rwl_name);
> > -		break;
> > -	case RW_DOWNGRADE:
> > -		/*
> > -		 * If we're downgrading, we must hold the write lock.
> > -		 */
> > -		if ((rwl->rwl_owner & RWLOCK_WRLOCK) == 0)
> > -			panic("rw_enter: %s downgrade of non-write lock",
> > -			    rwl->rwl_name);
> > -		if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> > -			panic("rw_enter: %s downgrade, not holder",
> > -			    rwl->rwl_name);
> > -		break;
> > -
> > -	default:
> > -		panic("rw_enter: unknown op 0x%x", flags);
> > +	owner = rw_cas(&rwl->rwl_owner, self, 0);
> > +	if (__predict_false(owner != self)) {
> > +		panic("%s rwlock %p: exit write when lock not held "
> > +		    "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> > +		    owner, self);
> >  	}
> > -}
> >  
> > -#else
> > -#define rw_enter_diag(r, f)
> > -#endif
> > +	rw_exited(rwl);
> > +}
> >  
> >  static void
> >  _rw_init_flags_witness(struct rwlock *rwl, const char *name, int lo_flags,
> >      const struct lock_type *type)
> >  {
> >  	rwl->rwl_owner = 0;
> > +	rwl->rwl_waiters = 0;
> > +	rwl->rwl_readers = 0;
> >  	rwl->rwl_name = name;
> >  
> >  #ifdef WITNESS
> > @@ -223,144 +190,285 @@ _rw_init_flags(struct rwlock *rwl, const
> >  int
> >  rw_enter(struct rwlock *rwl, int flags)
> >  {
> > -	const struct rwlock_op *op;
> > -	unsigned long inc, o;
> > -#ifdef MULTIPROCESSOR
> > -	/*
> > -	 * If process holds the kernel lock, then we want to give up on CPU
> > -	 * as soon as possible so other processes waiting for the kernel lock
> > -	 * can progress. Hence no spinning if we hold the kernel lock.
> > -	 */
> > -	unsigned int spin = (_kernel_lock_held()) ? 0 : RW_SPINS;
> > -#endif
> > -	int error, prio;
> > -#ifdef WITNESS
> > -	int lop_flags;
> > +	int op = flags & RW_OPMASK;
> > +	int error;
> > +
> > +	switch (op) {
> > +	case RW_WRITE:
> > +		error = rw_do_enter_write(rwl, flags);
> > +		break;
> > +	case RW_READ:
> > +		error = rw_do_enter_read(rwl, flags);
> > +		break;
> > +	case RW_DOWNGRADE:
> > +		error = rw_downgrade(rwl, flags);
> > +		break;
> > +	default:
> > +		panic("%s rwlock %p: %s unexpected op 0x%x",
> > +		    rwl->rwl_name, rwl, __func__, op);
> > +		/* NOTREACHED */
> > +	}
> >  
> > -	lop_flags = LOP_NEWORDER;
> > -	if (flags & RW_WRITE)
> > -		lop_flags |= LOP_EXCLUSIVE;
> > -	if (flags & RW_DUPOK)
> > +	return (error);
> > +}
> > +
> > +static int
> > +rw_do_enter_write(struct rwlock *rwl, int flags)
> > +{
> > +	unsigned long self = rw_self();
> > +	unsigned long owner;
> > +	int prio;
> > +	int error;
> > +
> > +#ifdef WITNESS
> > +	int lop_flags = LOP_NEWORDER | LOP_EXCLUSIVE;
> > +	if (ISSET(flags, RW_DUPOK))
> >  		lop_flags |= LOP_DUPOK;
> > -	if ((flags & RW_NOSLEEP) == 0 && (flags & RW_DOWNGRADE) == 0)
> > +
> > +	if (!ISSET(flags, RW_NOSLEEP))
> >  		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> >  #endif
> >  
> > -	op = &rw_ops[(flags & RW_OPMASK) - 1];
> > -
> > -	inc = op->inc + RW_PROC(curproc) * op->proc_mult;
> > -retry:
> > -	while (__predict_false(((o = rwl->rwl_owner) & op->check) != 0)) {
> > -		unsigned long set = o | op->wait_set;
> > -		int do_sleep;
> > -
> > -		/* Avoid deadlocks after panic or in DDB */
> > -		if (panicstr || db_active)
> > -			return (0);
> > +	owner = rw_cas(&rwl->rwl_owner, 0, self);
> > +	if (owner == 0) {
> > +		/* wow, we won. so easy */
> > +		goto locked;
> > +	}
> > +	if (__predict_false(owner == self)) {
> > +		panic("%s rwlock %p: enter write deadlock",
> > +		    rwl->rwl_name, rwl);
> > +	}
> >  
> >  #ifdef MULTIPROCESSOR
> > +	/*
> > +	 * If process holds the kernel lock, then we want to give up on CPU
> > +	 * as soon as possible so other processes waiting for the kernel lock
> > +	 * can progress. Hence no spinning if we hold the kernel lock.
> > +	 */
> > +	if (!_kernel_lock_held()) {
> > +		int spins;
> > +
> >  		/*
> >  		 * It makes sense to try to spin just in case the lock
> >  		 * is acquired by writer.
> >  		 */
> > -		if ((o & RWLOCK_WRLOCK) && (spin != 0)) {
> > -			spin--;
> > +
> > +		for (spins = 0; spins < RW_SPINS; spins++) {
> >  			CPU_BUSY_CYCLE();
> > -			continue;
> > +			owner = atomic_load_long(&rwl->rwl_owner);
> > +			if (owner != 0)
> > +				continue;
> > +
> > +			owner = rw_cas(&rwl->rwl_owner, 0, self);
> > +			if (owner == 0) {
> > +				/* ok, we won now. */
> > +				goto locked;
> > +			}
> >  		}
> > +	}
> >  #endif
> >  
> > -		rw_enter_diag(rwl, flags);
> > -
> > -		if (flags & RW_NOSLEEP)
> > -			return (EBUSY);
> > +	if (ISSET(flags, RW_NOSLEEP))
> > +		return (EBUSY);
> >  
> > -		prio = op->wait_prio;
> > -		if (flags & RW_INTR)
> > -			prio |= PCATCH;
> > -		sleep_setup(rwl, prio, rwl->rwl_name);
> > +	prio = PLOCK - 4;
> > +	if (ISSET(flags, RW_INTR))
> > +		prio |= PCATCH;
> >  
> > -		do_sleep = !rw_cas(&rwl->rwl_owner, o, set);
> > -
> > -		error = sleep_finish(0, do_sleep);
> > -		if ((flags & RW_INTR) &&
> > -		    (error != 0))
> > +	rw_inc(&rwl->rwl_waiters);
> > +	membar_producer();
> > +	do {
> > +		sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name);
> > +		membar_consumer();
> > +		owner = atomic_load_long(&rwl->rwl_owner);
> > +		error = sleep_finish(RW_SLEEP_TMO, owner != 0);
> > +#ifdef RWDIAG
> > +		if (error == EWOULDBLOCK) {
> > +			printf("%s rwlock %p: %s timeout owner 0x%lx "
> > +			    "(self 0x%lx)", rwl->rwl_name, rwl, __func__,
> > +			    owner, self);
> > +			db_enter();
> > +		}
> > +#endif
> > +		if (ISSET(flags, RW_INTR) && (error != 0)) {
> > +			rw_dec(&rwl->rwl_waiters);
> >  			return (error);
> > -		if (flags & RW_SLEEPFAIL)
> > +		}
> > +		if (ISSET(flags, RW_SLEEPFAIL)) {
> > +			rw_dec(&rwl->rwl_waiters);
> > +			rw_exited(rwl);
> >  			return (EAGAIN);
> > -	}
> > +		}
> > +
> > +		owner = rw_cas(&rwl->rwl_owner, 0, self);
> > +	} while (owner != 0);
> > +	rw_dec(&rwl->rwl_waiters);
> >  
> > -	if (__predict_false(rw_cas(&rwl->rwl_owner, o, o + inc)))
> > -		goto retry;
> > +locked:
> >  	membar_enter_after_atomic();
> > +	WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> >  
> > -	/*
> > -	 * If old lock had RWLOCK_WAIT and RWLOCK_WRLOCK set, it means we
> > -	 * downgraded a write lock and had possible read waiter, wake them
> > -	 * to let them retry the lock.
> > -	 */
> > -	if (__predict_false((o & (RWLOCK_WRLOCK|RWLOCK_WAIT)) ==
> > -	    (RWLOCK_WRLOCK|RWLOCK_WAIT)))
> > -		wakeup(rwl);
> > +	return (0);
> > +}
> >  
> > -	if (flags & RW_DOWNGRADE)
> > -		WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> > -	else
> > -		WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> > +static int
> > +rw_read_incr(struct rwlock *rwl, unsigned long owner)
> > +{
> > +	unsigned long incr;
> > +	unsigned long nowner;
> > +
> > +	do {
> > +		incr = owner + RWLOCK_READ_INCR;
> > +		nowner = rw_cas(&rwl->rwl_owner, owner, incr);
> > +		if (nowner == owner)
> > +			return (1);
> > +
> > +		owner = nowner;
> > +	} while (!ISSET(owner, RWLOCK_WRLOCK));
> >  
> >  	return (0);
> >  }
> >  
> > -void
> > -rw_exit(struct rwlock *rwl)
> > +static int
> > +rw_do_enter_read(struct rwlock *rwl, int flags)
> >  {
> > -	unsigned long wrlock;
> > +	unsigned long owner;
> > +	int error;
> > +	int prio;
> >  
> > -	/* Avoid deadlocks after panic or in DDB */
> > -	if (panicstr || db_active)
> > -		return;
> > +#ifdef WITNESS
> > +	int lop_flags = LOP_NEWORDER;
> > +	if (ISSET(flags, RW_DUPOK))
> > +		lop_flags |= LOP_DUPOK;
> > +	if (!ISSET(flags, RW_NOSLEEP))
> > +		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> > +#endif
> >  
> > -	wrlock = rwl->rwl_owner & RWLOCK_WRLOCK;
> > -	if (wrlock)
> > -		rw_assert_wrlock(rwl);
> > -	else
> > -		rw_assert_rdlock(rwl);
> > -	WITNESS_UNLOCK(&rwl->rwl_lock_obj, wrlock ? LOP_EXCLUSIVE : 0);
> > +	owner = rw_cas(&rwl->rwl_owner, 0, RWLOCK_READ_INCR);
> > +	if (owner == 0) {
> > +		/* ermagerd, we won! */
> > +		goto locked;
> > +	}
> > +
> > +	if (ISSET(owner, RWLOCK_WRLOCK)) {
> > +		if (__predict_false(owner == rw_self())) {
> > +			panic("%s rwlock %p: enter read deadlock",
> > +			    rwl->rwl_name, rwl);
> > +		}
> > +	} else if (atomic_load_int(&rwl->rwl_waiters) == 0) {
> > +		if (rw_read_incr(rwl, owner)) {
> > +			/* nailed it */
> > +			goto locked;
> > +		}
> > +	}
> > +
> > +	if (ISSET(flags, RW_NOSLEEP))
> > +		return (EBUSY);
> > +
> > +	prio = PLOCK;
> > +	if (ISSET(flags, RW_INTR))
> > +		prio |= PCATCH;
> > +
> > +	rw_inc(&rwl->rwl_readers);
> > +	membar_producer();
> > +	do {
> > +		sleep_setup(&rwl->rwl_readers, prio, rwl->rwl_name);
> > +		membar_consumer();
> > +		error = sleep_finish(RW_SLEEP_TMO,
> > +		    atomic_load_int(&rwl->rwl_waiters) > 0 ||
> > +		    ISSET(atomic_load_long(&rwl->rwl_owner), RWLOCK_WRLOCK));
> > +#ifdef RWDIAG
> > +		if (error == EWOULDBLOCK) {
> > +			printf("%s rwlock %p: %s timeout owner 0x%lx\n",
> > +			    rwl->rwl_name, rwl, __func__, owner);
> > +			db_enter();
> > +		}
> > +#endif
> > +		if (ISSET(flags, RW_INTR) && (error != 0))
> > +			goto fail;
> > +		if (ISSET(flags, RW_SLEEPFAIL)) {
> > +			error = EAGAIN;
> > +			goto fail;
> > +		}
> > +	} while (!rw_read_incr(rwl, 0));
> > +	rw_dec(&rwl->rwl_readers);
> > +
> > +locked:
> > +	membar_enter_after_atomic();
> > +	WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> > +
> > +	return (0);
> > +fail:
> > +	rw_dec(&rwl->rwl_readers);
> > +	return (error);
> > +}
> > +
> > +static int
> > +rw_downgrade(struct rwlock *rwl, int flags)
> > +{
> > +	unsigned long self = rw_self();
> > +	unsigned long owner;
> >  
> >  	membar_exit_before_atomic();
> > -	rw_do_exit(rwl, wrlock);
> > +	owner = atomic_cas_ulong(&rwl->rwl_owner, self, RWLOCK_READ_INCR);
> > +	if (__predict_false(owner != self)) {
> > +		panic("%s rwlock %p: downgrade when lock not held "
> > +		    "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> > +		    owner, self);
> > +	}
> > +
> > +#ifdef WITNESS
> > +	{
> > +		int lop_flags = LOP_NEWORDER;
> > +		if (ISSET(flags, RW_DUPOK))
> > +			lop_flags |= LOP_DUPOK;
> > +		WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> > +	}
> > +#endif
> > +
> > +	membar_consumer();
> > +	if (atomic_load_int(&rwl->rwl_waiters) == 0 &&
> > +	    atomic_load_int(&rwl->rwl_readers) > 0)
> > +		wakeup(&rwl->rwl_readers);
> > +
> > +	return (0);
> >  }
> >  
> > -/* membar_exit_before_atomic() has to precede call of this function. */
> >  void
> > -rw_do_exit(struct rwlock *rwl, unsigned long wrlock)
> > +rw_exit(struct rwlock *rwl)
> >  {
> > -	unsigned long owner, set;
> > +	unsigned long owner;
> >  
> > -	do {
> > -		owner = rwl->rwl_owner;
> > -		if (wrlock)
> > -			set = 0;
> > -		else
> > -			set = (owner - RWLOCK_READ_INCR) &
> > -				~(RWLOCK_WAIT|RWLOCK_WRWANT);
> > -		/*
> > -		 * Potential MP race here.  If the owner had WRWANT set, we
> > -		 * cleared it and a reader can sneak in before a writer.
> > -		 */
> > -	} while (__predict_false(rw_cas(&rwl->rwl_owner, owner, set)));
> > +	owner = atomic_load_long(&rwl->rwl_owner);
> > +	if (__predict_false(owner == 0)) {
> > +		panic("%s rwlock %p: exit on unlocked lock",
> > +		    rwl->rwl_name, rwl);
> > +	}
> > +
> > +	if (ISSET(owner, RWLOCK_WRLOCK))
> > +		rw_exit_write(rwl);
> > +	else
> > +		rw_do_exit_read(rwl, owner);
> > +}
> >  
> > -	if (owner & RWLOCK_WAIT)
> > -		wakeup(rwl);
> > +static void
> > +rw_exited(struct rwlock *rwl)
> > +{
> > +	membar_consumer();
> > +	if (atomic_load_int(&rwl->rwl_waiters) > 0)
> > +		wakeup_one(&rwl->rwl_waiters);
> > +	else if (atomic_load_int(&rwl->rwl_readers) > 0)
> > +		wakeup(&rwl->rwl_readers);
> >  }
> >  
> >  int
> >  rw_status(struct rwlock *rwl)
> >  {
> > -	unsigned long owner = rwl->rwl_owner;
> > +	unsigned long owner;
> >  
> > -	if (owner & RWLOCK_WRLOCK) {
> > -		if (RW_PROC(curproc) == RW_PROC(owner))
> > +	owner = atomic_load_long(&rwl->rwl_owner);
> > +	if (ISSET(owner, RWLOCK_WRLOCK)) {
> > +		if (rw_self() == owner)
> >  			return RW_WRITE;
> >  		else
> >  			return RW_WRITE_OTHER;
> > @@ -380,11 +488,10 @@ rw_assert_wrlock(struct rwlock *rwl)
> >  #ifdef WITNESS
> >  	witness_assert(&rwl->rwl_lock_obj, LA_XLOCKED);
> >  #else
> > -	if (!(rwl->rwl_owner & RWLOCK_WRLOCK))
> > -		panic("%s: lock not held", rwl->rwl_name);
> > -
> > -	if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> > -		panic("%s: lock not held by this process", rwl->rwl_name);
> > +	if (atomic_load_long(&rwl->rwl_owner) != rw_self()) {
> > +		panic("%s rwlock %p: lock not held by this process",
> > +		    rwl->rwl_name, rwl);
> > +	}
> >  #endif
> >  }
> >  
> > @@ -397,8 +504,8 @@ rw_assert_rdlock(struct rwlock *rwl)
> >  #ifdef WITNESS
> >  	witness_assert(&rwl->rwl_lock_obj, LA_SLOCKED);
> >  #else
> > -	if (!RW_PROC(rwl->rwl_owner) || (rwl->rwl_owner & RWLOCK_WRLOCK))
> > -		panic("%s: lock not shared", rwl->rwl_name);
> > +	if (rw_status(rwl) != RW_READ)
> > +		panic("%s rwlock %p: lock not shared", rwl->rwl_name, rwl);
> >  #endif
> >  }
> >  
> > @@ -413,9 +520,11 @@ rw_assert_anylock(struct rwlock *rwl)
> >  #else
> >  	switch (rw_status(rwl)) {
> >  	case RW_WRITE_OTHER:
> > -		panic("%s: lock held by different process", rwl->rwl_name);
> > +		panic("%s rwlock %p: lock held by different process "
> > +		    "(self %lx, owner %lx)", rwl->rwl_name, rwl,
> > +		    rw_self(), rwl->rwl_owner);
> >  	case 0:
> > -		panic("%s: lock not held", rwl->rwl_name);
> > +		panic("%s rwlock %p: lock not held", rwl->rwl_name, rwl);
> >  	}
> >  #endif
> >  }
> > @@ -429,8 +538,8 @@ rw_assert_unlocked(struct rwlock *rwl)
> >  #ifdef WITNESS
> >  	witness_assert(&rwl->rwl_lock_obj, LA_UNLOCKED);
> >  #else
> > -	if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> > -		panic("%s: lock held", rwl->rwl_name);
> > +	if (atomic_load_long(&rwl->rwl_owner) == rw_self())
> > +		panic("%s rwlock %p: lock held", rwl->rwl_name, rwl);
> >  #endif
> >  }
> >  #endif
> > @@ -450,7 +559,7 @@ rrw_enter(struct rrwlock *rrwl, int flag
> >  {
> >  	int	rv;
> >  
> > -	if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> > +	if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
> >  		if (flags & RW_RECURSEFAIL)
> >  			return (EDEADLK);
> >  		else {
> > @@ -472,7 +581,7 @@ void
> >  rrw_exit(struct rrwlock *rrwl)
> >  {
> >  
> > -	if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> > +	if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
> >  		KASSERT(rrwl->rrwl_wcnt > 0);
> >  		rrwl->rrwl_wcnt--;
> >  		if (rrwl->rrwl_wcnt != 0) {
> > 
>
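
For anyone reading along without the tree handy, below is a minimal
userspace model of the new fast paths.  This is a sketch only: it
assumes C11 <stdatomic.h>, fakes curproc with an aligned thread-local
token, and leaves out sleeping, spinning and WITNESS entirely.  The
names (rw_cas, rw_self, RWLOCK_WRLOCK, RWLOCK_READ_INCR) mirror the
diff; the 0x08UL value for RWLOCK_READ_INCR is my assumption of what
the header defines.

	#include <stdatomic.h>
	#include <stdio.h>

	#define RWLOCK_WRLOCK		0x04UL
	#define RWLOCK_MASK		0x07UL
	#define RWLOCK_READ_INCR	0x08UL	/* assumed: first bit above the flags */

	struct rwlock_model {
		_Atomic unsigned long	 rwl_owner;
		_Atomic unsigned int	 rwl_waiters;	/* blocked writers */
		_Atomic unsigned int	 rwl_readers;	/* blocked readers */
	};

	/* stand-in for curproc: a per-thread, suitably aligned address */
	static _Thread_local _Alignas(8) char curproc_token;

	/*
	 * rw_cas() now returns the old value instead of a won/lost flag,
	 * so callers can look at who beat them.
	 */
	static unsigned long
	rw_cas(_Atomic unsigned long *p, unsigned long e, unsigned long n)
	{
		atomic_compare_exchange_strong(p, &e, n);
		return (e);	/* on failure, e holds the observed value */
	}

	static unsigned long
	rw_self(void)
	{
		unsigned long self = (unsigned long)&curproc_token;

		self &= ~RWLOCK_MASK;
		self |= RWLOCK_WRLOCK;
		return (self);
	}

	/* write fast path: only a completely idle lock can be taken in one step */
	static int
	enter_write_fast(struct rwlock_model *rwl)
	{
		return (rw_cas(&rwl->rwl_owner, 0, rw_self()) == 0);
	}

	/*
	 * read fast path: bump the reader count, but only while no writer
	 * holds the lock and no writer is queued on rwl_waiters.
	 */
	static int
	enter_read_fast(struct rwlock_model *rwl)
	{
		unsigned long owner = atomic_load(&rwl->rwl_owner);

		if (owner & RWLOCK_WRLOCK)
			return (0);
		if (atomic_load(&rwl->rwl_waiters) > 0)
			return (0);	/* defer to queued writers */
		return (rw_cas(&rwl->rwl_owner, owner,
		    owner + RWLOCK_READ_INCR) == owner);
	}

	int
	main(void)
	{
		struct rwlock_model rwl = { 0, 0, 0 };

		printf("write: %d\n", enter_write_fast(&rwl));	/* 1: we won */
		printf("read: %d\n", enter_read_fast(&rwl));	/* 0: write locked */
		return (0);
	}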
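
The other half of the design is the wakeup policy in rw_exited():
writers queued on &rwl->rwl_waiters are preferred and woken one at a
time with wakeup_one(), while readers on &rwl->rwl_readers are all
woken together, since they can share the lock.  Here's how I'd model
the two channels with pthreads, pthread_cond_signal() standing in for
wakeup_one() and pthread_cond_broadcast() for wakeup().  Again, a
sketch of the policy, not the kernel code.

	#include <pthread.h>
	#include <stdio.h>

	/*
	 * Model of the two wait channels used by the diff:
	 * &rwl->rwl_waiters for blocked writers,
	 * &rwl->rwl_readers for blocked readers.
	 */
	struct chan_model {
		pthread_mutex_t	 mtx;
		pthread_cond_t	 writer_chan;	/* cf. &rwl->rwl_waiters */
		pthread_cond_t	 reader_chan;	/* cf. &rwl->rwl_readers */
		unsigned int	 nwaiters;	/* cf. rwl_waiters */
		unsigned int	 nreaders;	/* cf. rwl_readers */
	};

	/* mirrors rw_exited(): one writer first, else every reader */
	static void
	exited_model(struct chan_model *c)
	{
		pthread_mutex_lock(&c->mtx);
		if (c->nwaiters > 0) {
			/* wakeup_one(): hand off to a single writer and
			 * avoid a thundering herd that mostly loses the
			 * CAS anyway */
			pthread_cond_signal(&c->writer_chan);
		} else if (c->nreaders > 0) {
			/* wakeup(): readers can all hold the lock at
			 * once, so let every one of them retry */
			pthread_cond_broadcast(&c->reader_chan);
		}
		pthread_mutex_unlock(&c->mtx);
	}

	int
	main(void)
	{
		struct chan_model c = {
			PTHREAD_MUTEX_INITIALIZER,
			PTHREAD_COND_INITIALIZER,
			PTHREAD_COND_INITIALIZER,
			0, 0
		};

		/* nobody is asleep, so this is a no-op, like
		 * rw_exited() on an uncontended lock */
		exited_model(&c);
		printf("no sleepers, nothing woken\n");
		return (0);
	}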