From: David Gwynne Subject: kernel rwlocks vs the scheduler To: tech@openbsd.org Date: Wed, 13 Nov 2024 23:41:10 +1000 it's become obvious that heavily contended rwlocks put a lot of pressure on the scheduler, and we have enough contended locks that there's a benefit to changing rwlocks to try and mitigate against that pressure. when a thread is waiting on an rwlock, it sets a bit in the rwlock to indicate that when the current owner of the rwlock leaves the critical section, it should wake up the waiting thread to try and take the lock. if there's no waiting thread, the owner can skip the wakeup. the problem is that rwlocks can't tell the difference between one waiting thread and more than one waiting thread. so when the "there's a thread waiting" bit is cleared, all the waiting threads are woken up. one of these woken threads will take ownership of the lock, but also importantly, the other threads will end up setting the "im waiting" bit again, which is necessary for them to be woken up by the 2nd thread that won the race to become the owner of the lock. this is compounded by pending writers and readers waiting on the same wait channel. an rwlock may have one pending writer trying to take the lock, but many readers waiting for it too. it would make sense to wake up only the writer so it can take the lock next, but we end up waking the readers at the same time. the result of this is that contended rwlocks wake up a lot of threads, which puts a lot of pressure on the scheduler. this is noticeable as a lot of contention on the scheduler lock, which increases the system time used by the system. this is a pretty classic thundering herd problem. the diff below mitigates against these wakeups by adding counters to rwlocks for the number threads waiting to take write and read locks instead of relying on bits. when a thread needs to wait for a rwlock it increments the relevant counter before sleeping. after it is woken up and takes the lock it decrements that counter. this means rwlocks know how many threads are waiting at all times without having to wake everything up to rebuild state every time a thread releases the lock. pending writers and readers also wait on separate wchans. this allows us to prioritise writers and to wake them up one at a time. once there's no pending writers all pending readers can be woken up in one go so they can share the lock as soon as possible. if you are suffering a contended rwlock, this should reduce the amount of time spent spinning on the sched lock, which in turn may also reduce the wall clock time doing that work. the only downside to this diff in my opinion is that it grows struct rwlock by 8 bytes. i've been hitting this diff hard, but would appreciate more tests. rwlocks are a very fundamental part of the kernel so they need to work. benchmarks and witness tests are also welcome. Index: sys/rwlock.h =================================================================== RCS file: /cvs/src/sys/sys/rwlock.h,v diff -u -p -r1.28 rwlock.h --- sys/rwlock.h 11 Jan 2021 18:49:38 -0000 1.28 +++ sys/rwlock.h 24 Oct 2024 11:18:04 -0000 @@ -60,6 +60,8 @@ struct proc; struct rwlock { volatile unsigned long rwl_owner; + volatile unsigned int rwl_waiters; + volatile unsigned int rwl_readers; const char *rwl_name; #ifdef WITNESS struct lock_object rwl_lock_obj; @@ -91,14 +93,12 @@ struct rwlock { #ifdef WITNESS #define RWLOCK_INITIALIZER(name) \ - { 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) } + { 0, 0, 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) } #else #define RWLOCK_INITIALIZER(name) \ - { 0, name } + { 0, 0, 0, name } #endif -#define RWLOCK_WAIT 0x01UL -#define RWLOCK_WRWANT 0x02UL #define RWLOCK_WRLOCK 0x04UL #define RWLOCK_MASK 0x07UL Index: kern/kern_rwlock.c =================================================================== RCS file: /cvs/src/sys/kern/kern_rwlock.c,v diff -u -p -r1.50 kern_rwlock.c --- kern/kern_rwlock.c 14 Jul 2023 07:07:08 -0000 1.50 +++ kern/kern_rwlock.c 24 Oct 2024 11:18:05 -0000 @@ -26,10 +26,12 @@ #include #include -void rw_do_exit(struct rwlock *, unsigned long); - -/* XXX - temporary measure until proc0 is properly aligned */ -#define RW_PROC(p) (((long)p) & ~RWLOCK_MASK) +#ifdef RWDIAG +#include /* for hz */ +#define RW_SLEEP_TMO 10 * hz +#else +#define RW_SLEEP_TMO 0 +#endif /* * Other OSes implement more sophisticated mechanism to determine how long the @@ -40,166 +42,131 @@ void rw_do_exit(struct rwlock *, unsigne #define RW_SPINS 1000 #ifdef MULTIPROCESSOR -#define rw_cas(p, o, n) (atomic_cas_ulong(p, o, n) != o) +#define rw_cas(p, e, n) atomic_cas_ulong(p, e, n) +#define rw_inc(p) atomic_inc_int(p) +#define rw_dec(p) atomic_dec_int(p) #else -static inline int -rw_cas(volatile unsigned long *p, unsigned long o, unsigned long n) +static inline unsigned long +rw_cas(volatile unsigned long *p, unsigned long e, unsigned long n) { - if (*p != o) - return (1); - *p = n; + unsigned long o = *p; - return (0); + if (o == e) + *p = n; + + return (o); +} + +static inline void +rw_inc(volatile unsigned int *p) +{ + ++(*p); +} + +static inline void +rw_dec(volatile unsigned int *p) +{ + (*p)--; } #endif -/* - * Magic wand for lock operations. Every operation checks if certain - * flags are set and if they aren't, it increments the lock with some - * value (that might need some computing in a few cases). If the operation - * fails, we need to set certain flags while waiting for the lock. - * - * RW_WRITE The lock must be completely empty. We increment it with - * RWLOCK_WRLOCK and the proc pointer of the holder. - * Sets RWLOCK_WAIT|RWLOCK_WRWANT while waiting. - * RW_READ RWLOCK_WRLOCK|RWLOCK_WRWANT may not be set. We increment - * with RWLOCK_READ_INCR. RWLOCK_WAIT while waiting. - */ -static const struct rwlock_op { - unsigned long inc; - unsigned long check; - unsigned long wait_set; - long proc_mult; - int wait_prio; -} rw_ops[] = { - { /* RW_WRITE */ - RWLOCK_WRLOCK, - ULONG_MAX, - RWLOCK_WAIT | RWLOCK_WRWANT, - 1, - PLOCK - 4 - }, - { /* RW_READ */ - RWLOCK_READ_INCR, - RWLOCK_WRLOCK | RWLOCK_WRWANT, - RWLOCK_WAIT, - 0, - PLOCK - }, - { /* Sparse Entry. */ - 0, - }, - { /* RW_DOWNGRADE */ - RWLOCK_READ_INCR - RWLOCK_WRLOCK, - 0, - 0, - -1, - PLOCK - }, -}; +static int rw_do_enter_read(struct rwlock *, int); +static void rw_do_exit_read(struct rwlock *, unsigned long); +static int rw_do_enter_write(struct rwlock *, int); +static int rw_downgrade(struct rwlock *, int); + +static void rw_exited(struct rwlock *); + +static unsigned long +rw_self(void) +{ + unsigned long self = (unsigned long)curproc; + + CLR(self, RWLOCK_MASK); + SET(self, RWLOCK_WRLOCK); + + return (self); +} void rw_enter_read(struct rwlock *rwl) { - unsigned long owner = rwl->rwl_owner; - - if (__predict_false((owner & (RWLOCK_WRLOCK | RWLOCK_WRWANT)) || - rw_cas(&rwl->rwl_owner, owner, owner + RWLOCK_READ_INCR))) - rw_enter(rwl, RW_READ); - else { - membar_enter_after_atomic(); - WITNESS_CHECKORDER(&rwl->rwl_lock_obj, LOP_NEWORDER, NULL); - WITNESS_LOCK(&rwl->rwl_lock_obj, 0); - } + rw_do_enter_read(rwl, 0); } void rw_enter_write(struct rwlock *rwl) { - struct proc *p = curproc; - - if (__predict_false(rw_cas(&rwl->rwl_owner, 0, - RW_PROC(p) | RWLOCK_WRLOCK))) - rw_enter(rwl, RW_WRITE); - else { - membar_enter_after_atomic(); - WITNESS_CHECKORDER(&rwl->rwl_lock_obj, - LOP_EXCLUSIVE | LOP_NEWORDER, NULL); - WITNESS_LOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE); - } + rw_do_enter_write(rwl, 0); } void rw_exit_read(struct rwlock *rwl) { - unsigned long owner; + /* maybe we're the last one? */ + rw_do_exit_read(rwl, RWLOCK_READ_INCR); +} + +static void +rw_do_exit_read(struct rwlock *rwl, unsigned long owner) +{ + unsigned long decr; + unsigned long nowner; - rw_assert_rdlock(rwl); WITNESS_UNLOCK(&rwl->rwl_lock_obj, 0); - membar_exit_before_atomic(); - owner = rwl->rwl_owner; - if (__predict_false((owner & RWLOCK_WAIT) || - rw_cas(&rwl->rwl_owner, owner, owner - RWLOCK_READ_INCR))) - rw_do_exit(rwl, 0); + for (;;) { + decr = owner - RWLOCK_READ_INCR; + nowner = rw_cas(&rwl->rwl_owner, owner, decr); + if (owner == nowner) + break; + + if (__predict_false(ISSET(nowner, RWLOCK_WRLOCK))) { + panic("%s rwlock %p: exit read on write locked lock" + " (owner 0x%lx)", rwl->rwl_name, rwl, nowner); + } + if (__predict_false(nowner == 0)) { + panic("%s rwlock %p: exit read on unlocked lock", + rwl->rwl_name, rwl); + } + + owner = nowner; + } + + /* read lock didn't change anything, so no barrier needed? */ + + if (decr == 0) { + /* last one out */ + rw_exited(rwl); + } } void rw_exit_write(struct rwlock *rwl) { + unsigned long self = rw_self(); unsigned long owner; - rw_assert_wrlock(rwl); WITNESS_UNLOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE); membar_exit_before_atomic(); - owner = rwl->rwl_owner; - if (__predict_false((owner & RWLOCK_WAIT) || - rw_cas(&rwl->rwl_owner, owner, 0))) - rw_do_exit(rwl, RWLOCK_WRLOCK); -} - -#ifdef DIAGNOSTIC -/* - * Put the diagnostic functions here to keep the main code free - * from ifdef clutter. - */ -static void -rw_enter_diag(struct rwlock *rwl, int flags) -{ - switch (flags & RW_OPMASK) { - case RW_WRITE: - case RW_READ: - if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner)) - panic("rw_enter: %s locking against myself", - rwl->rwl_name); - break; - case RW_DOWNGRADE: - /* - * If we're downgrading, we must hold the write lock. - */ - if ((rwl->rwl_owner & RWLOCK_WRLOCK) == 0) - panic("rw_enter: %s downgrade of non-write lock", - rwl->rwl_name); - if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner)) - panic("rw_enter: %s downgrade, not holder", - rwl->rwl_name); - break; - - default: - panic("rw_enter: unknown op 0x%x", flags); + owner = rw_cas(&rwl->rwl_owner, self, 0); + if (__predict_false(owner != self)) { + panic("%s rwlock %p: exit write when lock not held " + "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl, + owner, self); } -} -#else -#define rw_enter_diag(r, f) -#endif + rw_exited(rwl); +} static void _rw_init_flags_witness(struct rwlock *rwl, const char *name, int lo_flags, const struct lock_type *type) { rwl->rwl_owner = 0; + rwl->rwl_waiters = 0; + rwl->rwl_readers = 0; rwl->rwl_name = name; #ifdef WITNESS @@ -223,144 +190,285 @@ _rw_init_flags(struct rwlock *rwl, const int rw_enter(struct rwlock *rwl, int flags) { - const struct rwlock_op *op; - unsigned long inc, o; -#ifdef MULTIPROCESSOR - /* - * If process holds the kernel lock, then we want to give up on CPU - * as soon as possible so other processes waiting for the kernel lock - * can progress. Hence no spinning if we hold the kernel lock. - */ - unsigned int spin = (_kernel_lock_held()) ? 0 : RW_SPINS; -#endif - int error, prio; -#ifdef WITNESS - int lop_flags; + int op = flags & RW_OPMASK; + int error; + + switch (op) { + case RW_WRITE: + error = rw_do_enter_write(rwl, flags); + break; + case RW_READ: + error = rw_do_enter_read(rwl, flags); + break; + case RW_DOWNGRADE: + error = rw_downgrade(rwl, flags); + break; + default: + panic("%s rwlock %p: %s unexpected op 0x%x", + rwl->rwl_name, rwl, __func__, op); + /* NOTREACHED */ + } - lop_flags = LOP_NEWORDER; - if (flags & RW_WRITE) - lop_flags |= LOP_EXCLUSIVE; - if (flags & RW_DUPOK) + return (error); +} + +static int +rw_do_enter_write(struct rwlock *rwl, int flags) +{ + unsigned long self = rw_self(); + unsigned long owner; + int prio; + int error; + +#ifdef WITNESS + int lop_flags = LOP_NEWORDER | LOP_EXCLUSIVE; + if (ISSET(flags, RW_DUPOK)) lop_flags |= LOP_DUPOK; - if ((flags & RW_NOSLEEP) == 0 && (flags & RW_DOWNGRADE) == 0) + + if (!ISSET(flags, RW_NOSLEEP)) WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL); #endif - op = &rw_ops[(flags & RW_OPMASK) - 1]; - - inc = op->inc + RW_PROC(curproc) * op->proc_mult; -retry: - while (__predict_false(((o = rwl->rwl_owner) & op->check) != 0)) { - unsigned long set = o | op->wait_set; - int do_sleep; - - /* Avoid deadlocks after panic or in DDB */ - if (panicstr || db_active) - return (0); + owner = rw_cas(&rwl->rwl_owner, 0, self); + if (owner == 0) { + /* wow, we won. so easy */ + goto locked; + } + if (__predict_false(owner == self)) { + panic("%s rwlock %p: enter write deadlock", + rwl->rwl_name, rwl); + } #ifdef MULTIPROCESSOR + /* + * If process holds the kernel lock, then we want to give up on CPU + * as soon as possible so other processes waiting for the kernel lock + * can progress. Hence no spinning if we hold the kernel lock. + */ + if (!_kernel_lock_held()) { + int spins; + /* * It makes sense to try to spin just in case the lock * is acquired by writer. */ - if ((o & RWLOCK_WRLOCK) && (spin != 0)) { - spin--; + + for (spins = 0; spins < RW_SPINS; spins++) { CPU_BUSY_CYCLE(); - continue; + owner = atomic_load_long(&rwl->rwl_owner); + if (owner != 0) + continue; + + owner = rw_cas(&rwl->rwl_owner, 0, self); + if (owner == 0) { + /* ok, we won now. */ + goto locked; + } } + } #endif - rw_enter_diag(rwl, flags); - - if (flags & RW_NOSLEEP) - return (EBUSY); + if (ISSET(flags, RW_NOSLEEP)) + return (EBUSY); - prio = op->wait_prio; - if (flags & RW_INTR) - prio |= PCATCH; - sleep_setup(rwl, prio, rwl->rwl_name); + prio = PLOCK - 4; + if (ISSET(flags, RW_INTR)) + prio |= PCATCH; - do_sleep = !rw_cas(&rwl->rwl_owner, o, set); - - error = sleep_finish(0, do_sleep); - if ((flags & RW_INTR) && - (error != 0)) + rw_inc(&rwl->rwl_waiters); + membar_producer(); + do { + sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name); + membar_consumer(); + owner = atomic_load_long(&rwl->rwl_owner); + error = sleep_finish(RW_SLEEP_TMO, owner != 0); +#ifdef RWDIAG + if (error == EWOULDBLOCK) { + printf("%s rwlock %p: %s timeout owner 0x%lx " + "(self 0x%lx)", rwl->rwl_name, rwl, __func__, + owner, self); + db_enter(); + } +#endif + if (ISSET(flags, RW_INTR) && (error != 0)) { + rw_dec(&rwl->rwl_waiters); return (error); - if (flags & RW_SLEEPFAIL) + } + if (ISSET(flags, RW_SLEEPFAIL)) { + rw_dec(&rwl->rwl_waiters); + rw_exited(rwl); return (EAGAIN); - } + } + + owner = rw_cas(&rwl->rwl_owner, 0, self); + } while (owner != 0); + rw_dec(&rwl->rwl_waiters); - if (__predict_false(rw_cas(&rwl->rwl_owner, o, o + inc))) - goto retry; +locked: membar_enter_after_atomic(); + WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags); - /* - * If old lock had RWLOCK_WAIT and RWLOCK_WRLOCK set, it means we - * downgraded a write lock and had possible read waiter, wake them - * to let them retry the lock. - */ - if (__predict_false((o & (RWLOCK_WRLOCK|RWLOCK_WAIT)) == - (RWLOCK_WRLOCK|RWLOCK_WAIT))) - wakeup(rwl); + return (0); +} - if (flags & RW_DOWNGRADE) - WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags); - else - WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags); +static int +rw_read_incr(struct rwlock *rwl, unsigned long owner) +{ + unsigned long incr; + unsigned long nowner; + + do { + incr = owner + RWLOCK_READ_INCR; + nowner = rw_cas(&rwl->rwl_owner, owner, incr); + if (nowner == owner) + return (1); + + owner = nowner; + } while (!ISSET(owner, RWLOCK_WRLOCK)); return (0); } -void -rw_exit(struct rwlock *rwl) +static int +rw_do_enter_read(struct rwlock *rwl, int flags) { - unsigned long wrlock; + unsigned long owner; + int error; + int prio; - /* Avoid deadlocks after panic or in DDB */ - if (panicstr || db_active) - return; +#ifdef WITNESS + int lop_flags = LOP_NEWORDER; + if (ISSET(flags, RW_DUPOK)) + lop_flags |= LOP_DUPOK; + if (!ISSET(flags, RW_NOSLEEP)) + WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL); +#endif - wrlock = rwl->rwl_owner & RWLOCK_WRLOCK; - if (wrlock) - rw_assert_wrlock(rwl); - else - rw_assert_rdlock(rwl); - WITNESS_UNLOCK(&rwl->rwl_lock_obj, wrlock ? LOP_EXCLUSIVE : 0); + owner = rw_cas(&rwl->rwl_owner, 0, RWLOCK_READ_INCR); + if (owner == 0) { + /* ermagerd, we won! */ + goto locked; + } + + if (ISSET(owner, RWLOCK_WRLOCK)) { + if (__predict_false(owner == rw_self())) { + panic("%s rwlock %p: enter read deadlock", + rwl->rwl_name, rwl); + } + } else if (atomic_load_int(&rwl->rwl_waiters) == 0) { + if (rw_read_incr(rwl, owner)) { + /* nailed it */ + goto locked; + } + } + + if (ISSET(flags, RW_NOSLEEP)) + return (EBUSY); + + prio = PLOCK; + if (ISSET(flags, RW_INTR)) + prio |= PCATCH; + + rw_inc(&rwl->rwl_readers); + membar_producer(); + do { + sleep_setup(&rwl->rwl_readers, prio, rwl->rwl_name); + membar_consumer(); + error = sleep_finish(RW_SLEEP_TMO, + atomic_load_int(&rwl->rwl_waiters) > 0 || + ISSET(atomic_load_long(&rwl->rwl_owner), RWLOCK_WRLOCK)); +#ifdef RWDIAG + if (error == EWOULDBLOCK) { + printf("%s rwlock %p: %s timeout owner 0x%lx\n", + rwl->rwl_name, rwl, __func__, owner); + db_enter(); + } +#endif + if (ISSET(flags, RW_INTR) && (error != 0)) + goto fail; + if (ISSET(flags, RW_SLEEPFAIL)) { + error = EAGAIN; + goto fail; + } + } while (!rw_read_incr(rwl, 0)); + rw_dec(&rwl->rwl_readers); + +locked: + membar_enter_after_atomic(); + WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags); + + return (0); +fail: + rw_dec(&rwl->rwl_readers); + return (error); +} + +static int +rw_downgrade(struct rwlock *rwl, int flags) +{ + unsigned long self = rw_self(); + unsigned long owner; membar_exit_before_atomic(); - rw_do_exit(rwl, wrlock); + owner = atomic_cas_ulong(&rwl->rwl_owner, self, RWLOCK_READ_INCR); + if (__predict_false(owner != self)) { + panic("%s rwlock %p: downgrade when lock not held " + "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl, + owner, self); + } + +#ifdef WITNESS + { + int lop_flags = LOP_NEWORDER; + if (ISSET(flags, RW_DUPOK)) + lop_flags |= LOP_DUPOK; + WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags); + } +#endif + + membar_consumer(); + if (atomic_load_int(&rwl->rwl_waiters) == 0 && + atomic_load_int(&rwl->rwl_readers) > 0) + wakeup(&rwl->rwl_readers); + + return (0); } -/* membar_exit_before_atomic() has to precede call of this function. */ void -rw_do_exit(struct rwlock *rwl, unsigned long wrlock) +rw_exit(struct rwlock *rwl) { - unsigned long owner, set; + unsigned long owner; - do { - owner = rwl->rwl_owner; - if (wrlock) - set = 0; - else - set = (owner - RWLOCK_READ_INCR) & - ~(RWLOCK_WAIT|RWLOCK_WRWANT); - /* - * Potential MP race here. If the owner had WRWANT set, we - * cleared it and a reader can sneak in before a writer. - */ - } while (__predict_false(rw_cas(&rwl->rwl_owner, owner, set))); + owner = atomic_load_long(&rwl->rwl_owner); + if (__predict_false(owner == 0)) { + panic("%s rwlock %p: exit on unlocked lock", + rwl->rwl_name, rwl); + } + + if (ISSET(owner, RWLOCK_WRLOCK)) + rw_exit_write(rwl); + else + rw_do_exit_read(rwl, owner); +} - if (owner & RWLOCK_WAIT) - wakeup(rwl); +static void +rw_exited(struct rwlock *rwl) +{ + membar_consumer(); + if (atomic_load_int(&rwl->rwl_waiters) > 0) + wakeup_one(&rwl->rwl_waiters); + else if (atomic_load_int(&rwl->rwl_readers) > 0) + wakeup(&rwl->rwl_readers); } int rw_status(struct rwlock *rwl) { - unsigned long owner = rwl->rwl_owner; + unsigned long owner; - if (owner & RWLOCK_WRLOCK) { - if (RW_PROC(curproc) == RW_PROC(owner)) + owner = atomic_load_long(&rwl->rwl_owner); + if (ISSET(owner, RWLOCK_WRLOCK)) { + if (rw_self() == owner) return RW_WRITE; else return RW_WRITE_OTHER; @@ -380,11 +488,10 @@ rw_assert_wrlock(struct rwlock *rwl) #ifdef WITNESS witness_assert(&rwl->rwl_lock_obj, LA_XLOCKED); #else - if (!(rwl->rwl_owner & RWLOCK_WRLOCK)) - panic("%s: lock not held", rwl->rwl_name); - - if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner)) - panic("%s: lock not held by this process", rwl->rwl_name); + if (atomic_load_long(&rwl->rwl_owner) != rw_self()) { + panic("%s rwlock %p: lock not held by this process", + rwl->rwl_name, rwl); + } #endif } @@ -397,8 +504,8 @@ rw_assert_rdlock(struct rwlock *rwl) #ifdef WITNESS witness_assert(&rwl->rwl_lock_obj, LA_SLOCKED); #else - if (!RW_PROC(rwl->rwl_owner) || (rwl->rwl_owner & RWLOCK_WRLOCK)) - panic("%s: lock not shared", rwl->rwl_name); + if (rw_status(rwl) != RW_READ) + panic("%s rwlock %p: lock not shared", rwl->rwl_name, rwl); #endif } @@ -413,9 +520,11 @@ rw_assert_anylock(struct rwlock *rwl) #else switch (rw_status(rwl)) { case RW_WRITE_OTHER: - panic("%s: lock held by different process", rwl->rwl_name); + panic("%s rwlock %p: lock held by different process " + "(self %lx, owner %lx)", rwl->rwl_name, rwl, + rw_self(), rwl->rwl_owner); case 0: - panic("%s: lock not held", rwl->rwl_name); + panic("%s rwlock %p: lock not held", rwl->rwl_name, rwl); } #endif } @@ -429,8 +538,8 @@ rw_assert_unlocked(struct rwlock *rwl) #ifdef WITNESS witness_assert(&rwl->rwl_lock_obj, LA_UNLOCKED); #else - if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner)) - panic("%s: lock held", rwl->rwl_name); + if (atomic_load_long(&rwl->rwl_owner) == rw_self()) + panic("%s rwlock %p: lock held", rwl->rwl_name, rwl); #endif } #endif @@ -450,7 +559,7 @@ rrw_enter(struct rrwlock *rrwl, int flag { int rv; - if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) { + if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) { if (flags & RW_RECURSEFAIL) return (EDEADLK); else { @@ -472,7 +581,7 @@ void rrw_exit(struct rrwlock *rrwl) { - if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) { + if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) { KASSERT(rrwl->rrwl_wcnt > 0); rrwl->rrwl_wcnt--; if (rrwl->rrwl_wcnt != 0) {