From: Martin Pieuchot
Subject: Re: kernel rwlocks vs the scheduler
To: David Gwynne
Cc: tech@openbsd.org
Date: Thu, 14 Nov 2024 13:50:40 +0100

Thanks David!

On 13/11/24(Wed) 23:41, David Gwynne wrote:
> it's become obvious that heavily contended rwlocks put a lot of
> pressure on the scheduler, and we have enough contended locks that
> there's a benefit to changing rwlocks to try and mitigate that
> pressure.

Indeed, some numbers below.

> when a thread is waiting on an rwlock, it sets a bit in the rwlock
> to indicate that when the current owner of the rwlock leaves the
> critical section, it should wake up the waiting thread to try and
> take the lock. if there's no waiting thread, the owner can skip the
> wakeup.
>
> the problem is that rwlocks can't tell the difference between one
> waiting thread and more than one waiting thread. so when the "there's
> a thread waiting" bit is cleared, all the waiting threads are woken
> up. one of these woken threads will take ownership of the lock, but,
> just as importantly, the other threads will end up setting the "i'm
> waiting" bit again, which is necessary for them to be woken up by
> the 2nd thread that won the race to become the owner of the lock.
>
> this is compounded by pending writers and readers waiting on the
> same wait channel. an rwlock may have one pending writer trying to
> take the lock, but many readers waiting for it too. it would make
> sense to wake up only the writer so it can take the lock next, but
> we end up waking the readers at the same time.
>
> the result of this is that contended rwlocks wake up a lot of
> threads, which puts a lot of pressure on the scheduler. this is
> noticeable as a lot of contention on the scheduler lock, which
> increases the system time used by the system. this is a pretty
> classic thundering herd problem.
>
> the diff below mitigates these wakeups by adding counters to
> rwlocks for the number of threads waiting to take write and read
> locks, instead of relying on bits. when a thread needs to wait for
> an rwlock it increments the relevant counter before sleeping. after
> it is woken up and takes the lock it decrements that counter. this
> means rwlocks know how many threads are waiting at all times without
> having to wake everything up to rebuild state every time a thread
> releases the lock.
>
> pending writers and readers also wait on separate wchans. this
> allows us to prioritise writers and to wake them up one at a time.
> once there are no pending writers, all pending readers can be woken
> up in one go so they can share the lock as soon as possible.
>
> if you are suffering a contended rwlock, this should reduce the
> amount of time spent spinning on the sched lock, which in turn may
> also reduce the wall clock time doing that work.
>
> the only downside to this diff in my opinion is that it grows struct
> rwlock by 8 bytes.

IMHO that's a non-issue.  The current rwlock implementation and its
original goal of being small are an example of premature optimization
that did more harm than good.

> i've been hitting this diff hard, but would appreciate more tests.
> rwlocks are a very fundamental part of the kernel so they need to
> work. benchmarks and witness tests are also welcome.

I love this diff.  I love how simple the implementation became and how
much easier it will be to extend.  I already suggested playing the
`spc_spinning' dance.  I also like the fact that we can now unify the
various rw_enter_*() functions.
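For anyone reading along without the diff in front of them, here is a
rough userland analogue of the scheme, not the kernel code: a pthread
mutex and two condition variables stand in for the atomics and the two
wchans, and the xrw_* names are made up for illustration.  The point is
the wakeup policy: pending writers are counted and woken one at a time,
and readers are only woken, all together, once no writer is waiting.

#include <pthread.h>

struct xrwlock {
        pthread_mutex_t mtx;
        pthread_cond_t  wrq;            /* analogue of the writer wchan */
        pthread_cond_t  rdq;            /* analogue of the reader wchan */
        int             writer;         /* a writer holds the lock */
        int             readers;        /* number of active readers */
        int             wrwait;         /* analogue of rwl_waiters */
        int             rdwait;         /* analogue of rwl_readers */
};

#define XRWLOCK_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, \
        PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0, 0 }

void
xrw_enter_write(struct xrwlock *l)
{
        pthread_mutex_lock(&l->mtx);
        l->wrwait++;
        while (l->writer || l->readers > 0)
                pthread_cond_wait(&l->wrq, &l->mtx);
        l->wrwait--;
        l->writer = 1;
        pthread_mutex_unlock(&l->mtx);
}

void
xrw_enter_read(struct xrwlock *l)
{
        pthread_mutex_lock(&l->mtx);
        l->rdwait++;
        /* pending writers take priority over new readers */
        while (l->writer || l->wrwait > 0)
                pthread_cond_wait(&l->rdq, &l->mtx);
        l->rdwait--;
        l->readers++;
        pthread_mutex_unlock(&l->mtx);
}

static void
xrw_exited(struct xrwlock *l)
{
        /* wake pending writers one at a time, pending readers all at once */
        if (l->wrwait > 0)
                pthread_cond_signal(&l->wrq);
        else if (l->rdwait > 0)
                pthread_cond_broadcast(&l->rdq);
}

void
xrw_exit_write(struct xrwlock *l)
{
        pthread_mutex_lock(&l->mtx);
        l->writer = 0;
        xrw_exited(l);
        pthread_mutex_unlock(&l->mtx);
}

void
xrw_exit_read(struct xrwlock *l)
{
        pthread_mutex_lock(&l->mtx);
        if (--l->readers == 0)
                xrw_exited(l);
        pthread_mutex_unlock(&l->mtx);
}

The kernel diff gets the same policy lock-free, with rw_cas() on
rwl_owner plus the rwl_waiters/rwl_readers counters, but the wakeup
decision in rw_exited() has the same shape as xrw_exited() above.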
I tested this diff on a 24-CPU amd64 machine and couldn't notice any
meaningful difference.  However, on an 80-CPU arm64 machine it decreases
the contention *a lot* for workloads using more than 32 CPUs.  Here are
the numbers:

kernel -j64 with -current
=========================
 6m45.56s real    11m42.21s user   162m11.60s system  (cold) (~50% spin)
 2m02.54s real    11m38.43s user    19m04.36s system  (hot)
 1m57.80s real    11m41.16s user    16m55.80s system  (hot)

kernel -j64 with -current+rwlock
================================
 4m05.35s real    11m49.56s user    61m52.88s system  (cold) (<30% spin)
 1m23.47s real    11m45.28s user     7m23.24s system  (hot)
 1m23.73s real    11m47.70s user     7m25.20s system  (hot)

For reference, with this diff, we get the same numbers as with
'make -j32':

 1m23.02s real    11m29.40s user     6m59.53s system

I haven't tried a WITNESS kernel, I'd be glad if someone could report
on that.

> Index: sys/rwlock.h
> ===================================================================
> RCS file: /cvs/src/sys/sys/rwlock.h,v
> diff -u -p -r1.28 rwlock.h
> --- sys/rwlock.h        11 Jan 2021 18:49:38 -0000      1.28
> +++ sys/rwlock.h        24 Oct 2024 11:18:04 -0000
> @@ -60,6 +60,8 @@ struct proc;
>
>  struct rwlock {
>          volatile unsigned long   rwl_owner;
> +        volatile unsigned int    rwl_waiters;
> +        volatile unsigned int    rwl_readers;
>          const char              *rwl_name;
>  #ifdef WITNESS
>          struct lock_object       rwl_lock_obj;
> @@ -91,14 +93,12 @@ struct rwlock {
>
>  #ifdef WITNESS
>  #define RWLOCK_INITIALIZER(name) \
> -        { 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
> +        { 0, 0, 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
>  #else
>  #define RWLOCK_INITIALIZER(name) \
> -        { 0, name }
> +        { 0, 0, 0, name }
>  #endif
>
> -#define RWLOCK_WAIT     0x01UL
> -#define RWLOCK_WRWANT   0x02UL
>  #define RWLOCK_WRLOCK   0x04UL
>  #define RWLOCK_MASK     0x07UL
>
> Index: kern/kern_rwlock.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/kern_rwlock.c,v
> diff -u -p -r1.50 kern_rwlock.c
> --- kern/kern_rwlock.c  14 Jul 2023 07:07:08 -0000      1.50
> +++ kern/kern_rwlock.c  24 Oct 2024 11:18:05 -0000
> @@ -26,10 +26,12 @@
>  #include
>  #include
>
> -void rw_do_exit(struct rwlock *, unsigned long);
> -
> -/* XXX - temporary measure until proc0 is properly aligned */
> -#define RW_PROC(p) (((long)p) & ~RWLOCK_MASK)
> +#ifdef RWDIAG
> +#include       /* for hz */
> +#define RW_SLEEP_TMO    10 * hz
> +#else
> +#define RW_SLEEP_TMO    0
> +#endif
>
>  /*
>   * Other OSes implement more sophisticated mechanism to determine how long the
> @@ -40,166 +42,131 @@ void rw_do_exit(struct rwlock *, unsigne
>  #define RW_SPINS        1000
>
>  #ifdef MULTIPROCESSOR
> -#define rw_cas(p, o, n) (atomic_cas_ulong(p, o, n) != o)
> +#define rw_cas(p, e, n) atomic_cas_ulong(p, e, n)
> +#define rw_inc(p)       atomic_inc_int(p)
> +#define rw_dec(p)       atomic_dec_int(p)
>  #else
> -static inline int
> -rw_cas(volatile unsigned long *p, unsigned long o, unsigned long n)
> +static inline unsigned long
> +rw_cas(volatile unsigned long *p, unsigned long e, unsigned long n)
>  {
> -        if (*p != o)
> -                return (1);
> -        *p = n;
> +        unsigned long o = *p;
>
> -        return (0);
> +        if (o == e)
> +                *p = n;
> +
> +        return (o);
> +}
> +
> +static inline void
> +rw_inc(volatile unsigned int *p)
> +{
> +        ++(*p);
> +}
> +
> +static inline void
> +rw_dec(volatile unsigned int *p)
> +{
> +        (*p)--;
>  }
>  #endif
>
> -/*
> - * Magic wand for lock operations.  Every operation checks if certain
> - * flags are set and if they aren't, it increments the lock with some
> - * value (that might need some computing in a few cases). If the operation
> - * fails, we need to set certain flags while waiting for the lock.
> - *
> - * RW_WRITE     The lock must be completely empty. We increment it with
> - *              RWLOCK_WRLOCK and the proc pointer of the holder.
> - *              Sets RWLOCK_WAIT|RWLOCK_WRWANT while waiting.
> - * RW_READ      RWLOCK_WRLOCK|RWLOCK_WRWANT may not be set. We increment
> - *              with RWLOCK_READ_INCR. RWLOCK_WAIT while waiting.
> - */
> -static const struct rwlock_op {
> -        unsigned long inc;
> -        unsigned long check;
> -        unsigned long wait_set;
> -        long proc_mult;
> -        int wait_prio;
> -} rw_ops[] = {
> -        {       /* RW_WRITE */
> -                RWLOCK_WRLOCK,
> -                ULONG_MAX,
> -                RWLOCK_WAIT | RWLOCK_WRWANT,
> -                1,
> -                PLOCK - 4
> -        },
> -        {       /* RW_READ */
> -                RWLOCK_READ_INCR,
> -                RWLOCK_WRLOCK | RWLOCK_WRWANT,
> -                RWLOCK_WAIT,
> -                0,
> -                PLOCK
> -        },
> -        {       /* Sparse Entry. */
> -                0,
> -        },
> -        {       /* RW_DOWNGRADE */
> -                RWLOCK_READ_INCR - RWLOCK_WRLOCK,
> -                0,
> -                0,
> -                -1,
> -                PLOCK
> -        },
> -};
> +static int      rw_do_enter_read(struct rwlock *, int);
> +static void     rw_do_exit_read(struct rwlock *, unsigned long);
> +static int      rw_do_enter_write(struct rwlock *, int);
> +static int      rw_downgrade(struct rwlock *, int);
> +
> +static void     rw_exited(struct rwlock *);
> +
> +static unsigned long
> +rw_self(void)
> +{
> +        unsigned long self = (unsigned long)curproc;
> +
> +        CLR(self, RWLOCK_MASK);
> +        SET(self, RWLOCK_WRLOCK);
> +
> +        return (self);
> +}
>
>  void
>  rw_enter_read(struct rwlock *rwl)
>  {
> -        unsigned long owner = rwl->rwl_owner;
> -
> -        if (__predict_false((owner & (RWLOCK_WRLOCK | RWLOCK_WRWANT)) ||
> -            rw_cas(&rwl->rwl_owner, owner, owner + RWLOCK_READ_INCR)))
> -                rw_enter(rwl, RW_READ);
> -        else {
> -                membar_enter_after_atomic();
> -                WITNESS_CHECKORDER(&rwl->rwl_lock_obj, LOP_NEWORDER, NULL);
> -                WITNESS_LOCK(&rwl->rwl_lock_obj, 0);
> -        }
> +        rw_do_enter_read(rwl, 0);
>  }
>
>  void
>  rw_enter_write(struct rwlock *rwl)
>  {
> -        struct proc *p = curproc;
> -
> -        if (__predict_false(rw_cas(&rwl->rwl_owner, 0,
> -            RW_PROC(p) | RWLOCK_WRLOCK)))
> -                rw_enter(rwl, RW_WRITE);
> -        else {
> -                membar_enter_after_atomic();
> -                WITNESS_CHECKORDER(&rwl->rwl_lock_obj,
> -                    LOP_EXCLUSIVE | LOP_NEWORDER, NULL);
> -                WITNESS_LOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
> -        }
> +        rw_do_enter_write(rwl, 0);
>  }
>
>  void
>  rw_exit_read(struct rwlock *rwl)
>  {
> -        unsigned long owner;
> +        /* maybe we're the last one? */
> +        rw_do_exit_read(rwl, RWLOCK_READ_INCR);
> +}
> +
> +static void
> +rw_do_exit_read(struct rwlock *rwl, unsigned long owner)
> +{
> +        unsigned long decr;
> +        unsigned long nowner;
>
> -        rw_assert_rdlock(rwl);
>          WITNESS_UNLOCK(&rwl->rwl_lock_obj, 0);
>
> -        membar_exit_before_atomic();
> -        owner = rwl->rwl_owner;
> -        if (__predict_false((owner & RWLOCK_WAIT) ||
> -            rw_cas(&rwl->rwl_owner, owner, owner - RWLOCK_READ_INCR)))
> -                rw_do_exit(rwl, 0);
> +        for (;;) {
> +                decr = owner - RWLOCK_READ_INCR;
> +                nowner = rw_cas(&rwl->rwl_owner, owner, decr);
> +                if (owner == nowner)
> +                        break;
> +
> +                if (__predict_false(ISSET(nowner, RWLOCK_WRLOCK))) {
> +                        panic("%s rwlock %p: exit read on write locked lock"
> +                            " (owner 0x%lx)", rwl->rwl_name, rwl, nowner);
> +                }
> +                if (__predict_false(nowner == 0)) {
> +                        panic("%s rwlock %p: exit read on unlocked lock",
> +                            rwl->rwl_name, rwl);
> +                }
> +
> +                owner = nowner;
> +        }
> +
> +        /* read lock didn't change anything, so no barrier needed? */
> +
> +        if (decr == 0) {
> +                /* last one out */
> +                rw_exited(rwl);
> +        }
>  }
>
>  void
>  rw_exit_write(struct rwlock *rwl)
>  {
> +        unsigned long self = rw_self();
>          unsigned long owner;
>
> -        rw_assert_wrlock(rwl);
>          WITNESS_UNLOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
>
>          membar_exit_before_atomic();
> -        owner = rwl->rwl_owner;
> -        if (__predict_false((owner & RWLOCK_WAIT) ||
> -            rw_cas(&rwl->rwl_owner, owner, 0)))
> -                rw_do_exit(rwl, RWLOCK_WRLOCK);
> -}
> -
> -#ifdef DIAGNOSTIC
> -/*
> - * Put the diagnostic functions here to keep the main code free
> - * from ifdef clutter.
> - */
> -static void
> -rw_enter_diag(struct rwlock *rwl, int flags)
> -{
> -        switch (flags & RW_OPMASK) {
> -        case RW_WRITE:
> -        case RW_READ:
> -                if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> -                        panic("rw_enter: %s locking against myself",
> -                            rwl->rwl_name);
> -                break;
> -        case RW_DOWNGRADE:
> -                /*
> -                 * If we're downgrading, we must hold the write lock.
> -                 */
> -                if ((rwl->rwl_owner & RWLOCK_WRLOCK) == 0)
> -                        panic("rw_enter: %s downgrade of non-write lock",
> -                            rwl->rwl_name);
> -                if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> -                        panic("rw_enter: %s downgrade, not holder",
> -                            rwl->rwl_name);
> -                break;
> -
> -        default:
> -                panic("rw_enter: unknown op 0x%x", flags);
> +        owner = rw_cas(&rwl->rwl_owner, self, 0);
> +        if (__predict_false(owner != self)) {
> +                panic("%s rwlock %p: exit write when lock not held "
> +                    "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> +                    owner, self);
>          }
> -}
>
> -#else
> -#define rw_enter_diag(r, f)
> -#endif
> +        rw_exited(rwl);
> +}
>
>  static void
>  _rw_init_flags_witness(struct rwlock *rwl, const char *name, int lo_flags,
>      const struct lock_type *type)
>  {
>          rwl->rwl_owner = 0;
> +        rwl->rwl_waiters = 0;
> +        rwl->rwl_readers = 0;
>          rwl->rwl_name = name;
>
>  #ifdef WITNESS
> @@ -223,144 +190,285 @@ _rw_init_flags(struct rwlock *rwl, const
>  int
>  rw_enter(struct rwlock *rwl, int flags)
>  {
> -        const struct rwlock_op *op;
> -        unsigned long inc, o;
> -#ifdef MULTIPROCESSOR
> -        /*
> -         * If process holds the kernel lock, then we want to give up on CPU
> -         * as soon as possible so other processes waiting for the kernel lock
> -         * can progress. Hence no spinning if we hold the kernel lock.
> -         */
> -        unsigned int spin = (_kernel_lock_held()) ? 0 : RW_SPINS;
> -#endif
> -        int error, prio;
> -#ifdef WITNESS
> -        int lop_flags;
> +        int op = flags & RW_OPMASK;
> +        int error;
> +
> +        switch (op) {
> +        case RW_WRITE:
> +                error = rw_do_enter_write(rwl, flags);
> +                break;
> +        case RW_READ:
> +                error = rw_do_enter_read(rwl, flags);
> +                break;
> +        case RW_DOWNGRADE:
> +                error = rw_downgrade(rwl, flags);
> +                break;
> +        default:
> +                panic("%s rwlock %p: %s unexpected op 0x%x",
> +                    rwl->rwl_name, rwl, __func__, op);
> +                /* NOTREACHED */
> +        }
>
> -        lop_flags = LOP_NEWORDER;
> -        if (flags & RW_WRITE)
> -                lop_flags |= LOP_EXCLUSIVE;
> -        if (flags & RW_DUPOK)
> +        return (error);
> +}
> +
> +static int
> +rw_do_enter_write(struct rwlock *rwl, int flags)
> +{
> +        unsigned long self = rw_self();
> +        unsigned long owner;
> +        int prio;
> +        int error;
> +
> +#ifdef WITNESS
> +        int lop_flags = LOP_NEWORDER | LOP_EXCLUSIVE;
> +        if (ISSET(flags, RW_DUPOK))
>                  lop_flags |= LOP_DUPOK;
> -        if ((flags & RW_NOSLEEP) == 0 && (flags & RW_DOWNGRADE) == 0)
> +
> +        if (!ISSET(flags, RW_NOSLEEP))
>                  WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
>  #endif
>
> -        op = &rw_ops[(flags & RW_OPMASK) - 1];
> -
> -        inc = op->inc + RW_PROC(curproc) * op->proc_mult;
> -retry:
> -        while (__predict_false(((o = rwl->rwl_owner) & op->check) != 0)) {
> -                unsigned long set = o | op->wait_set;
> -                int do_sleep;
> -
> -                /* Avoid deadlocks after panic or in DDB */
> -                if (panicstr || db_active)
> -                        return (0);
> +        owner = rw_cas(&rwl->rwl_owner, 0, self);
> +        if (owner == 0) {
> +                /* wow, we won. so easy */
> +                goto locked;
> +        }
> +        if (__predict_false(owner == self)) {
> +                panic("%s rwlock %p: enter write deadlock",
> +                    rwl->rwl_name, rwl);
> +        }
>
>  #ifdef MULTIPROCESSOR
> +        /*
> +         * If process holds the kernel lock, then we want to give up on CPU
> +         * as soon as possible so other processes waiting for the kernel lock
> +         * can progress. Hence no spinning if we hold the kernel lock.
> +         */
> +        if (!_kernel_lock_held()) {
> +                int spins;
> +
>                  /*
>                   * It makes sense to try to spin just in case the lock
>                   * is acquired by writer.
>                   */
> -                if ((o & RWLOCK_WRLOCK) && (spin != 0)) {
> -                        spin--;
> +
> +                for (spins = 0; spins < RW_SPINS; spins++) {
>                          CPU_BUSY_CYCLE();
> -                        continue;
> +                        owner = atomic_load_long(&rwl->rwl_owner);
> +                        if (owner != 0)
> +                                continue;
> +
> +                        owner = rw_cas(&rwl->rwl_owner, 0, self);
> +                        if (owner == 0) {
> +                                /* ok, we won now. */
> +                                goto locked;
> +                        }
>                  }
> +        }
>  #endif
>
> -                rw_enter_diag(rwl, flags);
> -
> -                if (flags & RW_NOSLEEP)
> -                        return (EBUSY);
> +        if (ISSET(flags, RW_NOSLEEP))
> +                return (EBUSY);
>
> -                prio = op->wait_prio;
> -                if (flags & RW_INTR)
> -                        prio |= PCATCH;
> -                sleep_setup(rwl, prio, rwl->rwl_name);
> +        prio = PLOCK - 4;
> +        if (ISSET(flags, RW_INTR))
> +                prio |= PCATCH;
>
> -                do_sleep = !rw_cas(&rwl->rwl_owner, o, set);
> -
> -                error = sleep_finish(0, do_sleep);
> -                if ((flags & RW_INTR) &&
> -                    (error != 0))
> +        rw_inc(&rwl->rwl_waiters);
> +        membar_producer();
> +        do {
> +                sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name);
> +                membar_consumer();
> +                owner = atomic_load_long(&rwl->rwl_owner);
> +                error = sleep_finish(RW_SLEEP_TMO, owner != 0);
> +#ifdef RWDIAG
> +                if (error == EWOULDBLOCK) {
> +                        printf("%s rwlock %p: %s timeout owner 0x%lx "
> +                            "(self 0x%lx)", rwl->rwl_name, rwl, __func__,
> +                            owner, self);
> +                        db_enter();
> +                }
> +#endif
> +                if (ISSET(flags, RW_INTR) && (error != 0)) {
> +                        rw_dec(&rwl->rwl_waiters);
>                          return (error);
> -                if (flags & RW_SLEEPFAIL)
> +                }
> +                if (ISSET(flags, RW_SLEEPFAIL)) {
> +                        rw_dec(&rwl->rwl_waiters);
> +                        rw_exited(rwl);
>                          return (EAGAIN);
> -        }
> +                }
> +
> +                owner = rw_cas(&rwl->rwl_owner, 0, self);
> +        } while (owner != 0);
> +        rw_dec(&rwl->rwl_waiters);
>
> -        if (__predict_false(rw_cas(&rwl->rwl_owner, o, o + inc)))
> -                goto retry;
> +locked:
>          membar_enter_after_atomic();
> +        WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
>
> -        /*
> -         * If old lock had RWLOCK_WAIT and RWLOCK_WRLOCK set, it means we
> -         * downgraded a write lock and had possible read waiter, wake them
> -         * to let them retry the lock.
> -         */
> -        if (__predict_false((o & (RWLOCK_WRLOCK|RWLOCK_WAIT)) ==
> -            (RWLOCK_WRLOCK|RWLOCK_WAIT)))
> -                wakeup(rwl);
> +        return (0);
> +}
>
> -        if (flags & RW_DOWNGRADE)
> -                WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> -        else
> -                WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +static int
> +rw_read_incr(struct rwlock *rwl, unsigned long owner)
> +{
> +        unsigned long incr;
> +        unsigned long nowner;
> +
> +        do {
> +                incr = owner + RWLOCK_READ_INCR;
> +                nowner = rw_cas(&rwl->rwl_owner, owner, incr);
> +                if (nowner == owner)
> +                        return (1);
> +
> +                owner = nowner;
> +        } while (!ISSET(owner, RWLOCK_WRLOCK));
>
>          return (0);
>  }
>
> -void
> -rw_exit(struct rwlock *rwl)
> +static int
> +rw_do_enter_read(struct rwlock *rwl, int flags)
>  {
> -        unsigned long wrlock;
> +        unsigned long owner;
> +        int error;
> +        int prio;
>
> -        /* Avoid deadlocks after panic or in DDB */
> -        if (panicstr || db_active)
> -                return;
> +#ifdef WITNESS
> +        int lop_flags = LOP_NEWORDER;
> +        if (ISSET(flags, RW_DUPOK))
> +                lop_flags |= LOP_DUPOK;
> +        if (!ISSET(flags, RW_NOSLEEP))
> +                WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> +#endif
>
> -        wrlock = rwl->rwl_owner & RWLOCK_WRLOCK;
> -        if (wrlock)
> -                rw_assert_wrlock(rwl);
> -        else
> -                rw_assert_rdlock(rwl);
> -        WITNESS_UNLOCK(&rwl->rwl_lock_obj, wrlock ? LOP_EXCLUSIVE : 0);
> +        owner = rw_cas(&rwl->rwl_owner, 0, RWLOCK_READ_INCR);
> +        if (owner == 0) {
> +                /* ermagerd, we won! */
> +                goto locked;
> +        }
> +
> +        if (ISSET(owner, RWLOCK_WRLOCK)) {
> +                if (__predict_false(owner == rw_self())) {
> +                        panic("%s rwlock %p: enter read deadlock",
> +                            rwl->rwl_name, rwl);
> +                }
> +        } else if (atomic_load_int(&rwl->rwl_waiters) == 0) {
> +                if (rw_read_incr(rwl, owner)) {
> +                        /* nailed it */
> +                        goto locked;
> +                }
> +        }
> +
> +        if (ISSET(flags, RW_NOSLEEP))
> +                return (EBUSY);
> +
> +        prio = PLOCK;
> +        if (ISSET(flags, RW_INTR))
> +                prio |= PCATCH;
> +
> +        rw_inc(&rwl->rwl_readers);
> +        membar_producer();
> +        do {
> +                sleep_setup(&rwl->rwl_readers, prio, rwl->rwl_name);
> +                membar_consumer();
> +                error = sleep_finish(RW_SLEEP_TMO,
> +                    atomic_load_int(&rwl->rwl_waiters) > 0 ||
> +                    ISSET(atomic_load_long(&rwl->rwl_owner), RWLOCK_WRLOCK));
> +#ifdef RWDIAG
> +                if (error == EWOULDBLOCK) {
> +                        printf("%s rwlock %p: %s timeout owner 0x%lx\n",
> +                            rwl->rwl_name, rwl, __func__, owner);
> +                        db_enter();
> +                }
> +#endif
> +                if (ISSET(flags, RW_INTR) && (error != 0))
> +                        goto fail;
> +                if (ISSET(flags, RW_SLEEPFAIL)) {
> +                        error = EAGAIN;
> +                        goto fail;
> +                }
> +        } while (!rw_read_incr(rwl, 0));
> +        rw_dec(&rwl->rwl_readers);
> +
> +locked:
> +        membar_enter_after_atomic();
> +        WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +
> +        return (0);
> +fail:
> +        rw_dec(&rwl->rwl_readers);
> +        return (error);
> +}
> +
> +static int
> +rw_downgrade(struct rwlock *rwl, int flags)
> +{
> +        unsigned long self = rw_self();
> +        unsigned long owner;
>
>          membar_exit_before_atomic();
> -        rw_do_exit(rwl, wrlock);
> +        owner = atomic_cas_ulong(&rwl->rwl_owner, self, RWLOCK_READ_INCR);
> +        if (__predict_false(owner != self)) {
> +                panic("%s rwlock %p: downgrade when lock not held "
> +                    "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> +                    owner, self);
> +        }
> +
> +#ifdef WITNESS
> +        {
> +                int lop_flags = LOP_NEWORDER;
> +                if (ISSET(flags, RW_DUPOK))
> +                        lop_flags |= LOP_DUPOK;
> +                WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> +        }
> +#endif
> +
> +        membar_consumer();
> +        if (atomic_load_int(&rwl->rwl_waiters) == 0 &&
> +            atomic_load_int(&rwl->rwl_readers) > 0)
> +                wakeup(&rwl->rwl_readers);
> +
> +        return (0);
>  }
>
> -/* membar_exit_before_atomic() has to precede call of this function. */
>  void
> -rw_do_exit(struct rwlock *rwl, unsigned long wrlock)
> +rw_exit(struct rwlock *rwl)
>  {
> -        unsigned long owner, set;
> +        unsigned long owner;
>
> -        do {
> -                owner = rwl->rwl_owner;
> -                if (wrlock)
> -                        set = 0;
> -                else
> -                        set = (owner - RWLOCK_READ_INCR) &
> -                            ~(RWLOCK_WAIT|RWLOCK_WRWANT);
> -                /*
> -                 * Potential MP race here. If the owner had WRWANT set, we
> -                 * cleared it and a reader can sneak in before a writer.
> -                 */
> -        } while (__predict_false(rw_cas(&rwl->rwl_owner, owner, set)));
> +        owner = atomic_load_long(&rwl->rwl_owner);
> +        if (__predict_false(owner == 0)) {
> +                panic("%s rwlock %p: exit on unlocked lock",
> +                    rwl->rwl_name, rwl);
> +        }
> +
> +        if (ISSET(owner, RWLOCK_WRLOCK))
> +                rw_exit_write(rwl);
> +        else
> +                rw_do_exit_read(rwl, owner);
> +}
>
> -        if (owner & RWLOCK_WAIT)
> -                wakeup(rwl);
> +static void
> +rw_exited(struct rwlock *rwl)
> +{
> +        membar_consumer();
> +        if (atomic_load_int(&rwl->rwl_waiters) > 0)
> +                wakeup_one(&rwl->rwl_waiters);
> +        else if (atomic_load_int(&rwl->rwl_readers) > 0)
> +                wakeup(&rwl->rwl_readers);
>  }
>
>  int
>  rw_status(struct rwlock *rwl)
>  {
> -        unsigned long owner = rwl->rwl_owner;
> +        unsigned long owner;
>
> -        if (owner & RWLOCK_WRLOCK) {
> -                if (RW_PROC(curproc) == RW_PROC(owner))
> +        owner = atomic_load_long(&rwl->rwl_owner);
> +        if (ISSET(owner, RWLOCK_WRLOCK)) {
> +                if (rw_self() == owner)
>                          return RW_WRITE;
>                  else
>                          return RW_WRITE_OTHER;
> @@ -380,11 +488,10 @@ rw_assert_wrlock(struct rwlock *rwl)
>  #ifdef WITNESS
>          witness_assert(&rwl->rwl_lock_obj, LA_XLOCKED);
>  #else
> -        if (!(rwl->rwl_owner & RWLOCK_WRLOCK))
> -                panic("%s: lock not held", rwl->rwl_name);
> -
> -        if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> -                panic("%s: lock not held by this process", rwl->rwl_name);
> +        if (atomic_load_long(&rwl->rwl_owner) != rw_self()) {
> +                panic("%s rwlock %p: lock not held by this process",
> +                    rwl->rwl_name, rwl);
> +        }
>  #endif
>  }
>
> @@ -397,8 +504,8 @@ rw_assert_rdlock(struct rwlock *rwl)
>  #ifdef WITNESS
>          witness_assert(&rwl->rwl_lock_obj, LA_SLOCKED);
>  #else
> -        if (!RW_PROC(rwl->rwl_owner) || (rwl->rwl_owner & RWLOCK_WRLOCK))
> -                panic("%s: lock not shared", rwl->rwl_name);
> +        if (rw_status(rwl) != RW_READ)
> +                panic("%s rwlock %p: lock not shared", rwl->rwl_name, rwl);
>  #endif
>  }
>
> @@ -413,9 +520,11 @@ rw_assert_anylock(struct rwlock *rwl)
>  #else
>          switch (rw_status(rwl)) {
>          case RW_WRITE_OTHER:
> -                panic("%s: lock held by different process", rwl->rwl_name);
> +                panic("%s rwlock %p: lock held by different process "
> +                    "(self %lx, owner %lx)", rwl->rwl_name, rwl,
> +                    rw_self(), rwl->rwl_owner);
>          case 0:
> -                panic("%s: lock not held", rwl->rwl_name);
> +                panic("%s rwlock %p: lock not held", rwl->rwl_name, rwl);
>          }
>  #endif
>  }
> @@ -429,8 +538,8 @@ rw_assert_unlocked(struct rwlock *rwl)
>  #ifdef WITNESS
>          witness_assert(&rwl->rwl_lock_obj, LA_UNLOCKED);
>  #else
> -        if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> -                panic("%s: lock held", rwl->rwl_name);
> +        if (atomic_load_long(&rwl->rwl_owner) == rw_self())
> +                panic("%s rwlock %p: lock held", rwl->rwl_name, rwl);
>  #endif
>  }
>  #endif
> @@ -450,7 +559,7 @@ rrw_enter(struct rrwlock *rrwl, int flag
>  {
>          int     rv;
>
> -        if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> +        if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
>                  if (flags & RW_RECURSEFAIL)
>                          return (EDEADLK);
>                  else {
> @@ -472,7 +581,7 @@ void
>  rrw_exit(struct rrwlock *rrwl)
>  {
>
> -        if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> +        if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
>                  KASSERT(rrwl->rrwl_wcnt > 0);
>                  rrwl->rrwl_wcnt--;
>                  if (rrwl->rrwl_wcnt != 0) {
>