kernel rwlocks vs the scheduler
Thanks David!
On 13/11/24(Wed) 23:41, David Gwynne wrote:
> it's become obvious that heavily contended rwlocks put a lot of
> pressure on the scheduler, and we have enough contended locks that
> there's a benefit to changing rwlocks to try and mitigate against
> that pressure.
Indeed, some numbers below.
> when a thread is waiting on an rwlock, it sets a bit in the rwlock
> to indicate that when the current owner of the rwlock leaves the
> critical section, it should wake up the waiting thread to try and
> take the lock. if there's no waiting thread, the owner can skip the
> wakeup.
>
> the problem is that rwlocks can't tell the difference between one
> waiting thread and more than one waiting thread. so when the "there's
> a thread waiting" bit is cleared, all the waiting threads are woken
> up. one of these woken threads will take ownership of the lock, but
> also importantly, the other threads will end up setting the "im
> waiting" bit again, which is necessary for them to be woken up by
> the 2nd thread that won the race to become the owner of the lock.
>
> this is compounded by pending writers and readers waiting on the
> same wait channel. an rwlock may have one pending writer trying to
> take the lock, but many readers waiting for it too. it would make
> sense to wake up only the writer so it can take the lock next, but
> we end up waking the readers at the same time.
>
> the result of this is that contended rwlocks wake up a lot of
> threads, which puts a lot of pressure on the scheduler. this is
> noticeable as a lot of contention on the scheduler lock, which
> increases the system time used by the system. this is a pretty
> classic thundering herd problem.
>
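To make the above concrete, this is essentially the rw_do_exit() that the
diff removes (slightly trimmed, comment mine):

        void
        rw_do_exit(struct rwlock *rwl, unsigned long wrlock)
        {
                unsigned long owner, set;

                do {
                        owner = rwl->rwl_owner;
                        if (wrlock)
                                set = 0;
                        else
                                set = (owner - RWLOCK_READ_INCR) &
                                    ~(RWLOCK_WAIT|RWLOCK_WRWANT);
                } while (__predict_false(rw_cas(&rwl->rwl_owner, owner, set)));

                /*
                 * One bit, one wchan: readers and writers all sleep on
                 * `rwl', so the only option is to wake every one of them
                 * and let everybody except the winner set RWLOCK_WAIT
                 * again before going back to sleep.
                 */
                if (owner & RWLOCK_WAIT)
                        wakeup(rwl);
        }
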
> the diff below mitigates against these wakeups by adding counters
> to rwlocks for the number of threads waiting to take write and read
> locks instead of relying on bits. when a thread needs to wait for
> an rwlock it increments the relevant counter before sleeping. after
> it is woken up and takes the lock it decrements that counter. this
> means rwlocks know how many threads are waiting at all times without
> having to wake everything up to rebuild state every time a thread
> releases the lock.
>
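In other words the write slow path in rw_do_enter_write() below becomes
roughly this (spinning, RW_INTR and RW_SLEEPFAIL handling left out):

        rw_inc(&rwl->rwl_waiters);              /* advertise ourselves */
        membar_producer();
        do {
                sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name);
                membar_consumer();
                owner = atomic_load_long(&rwl->rwl_owner);
                /* only go to sleep if the lock is still owned */
                error = sleep_finish(RW_SLEEP_TMO, owner != 0);

                owner = rw_cas(&rwl->rwl_owner, 0, self);
        } while (owner != 0);
        rw_dec(&rwl->rwl_waiters);              /* we are the owner now */

The counter is only touched by the waiter itself, so the lock always knows
how many threads are asleep without having to wake them up to find out.
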
> pending writers and readers also wait on separate wchans. this
> allows us to prioritise writers and to wake them up one at a time.
> once there are no pending writers all pending readers can be woken
> up in one go so they can share the lock as soon as possible.
>
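The whole wake-up policy then fits in the new rw_exited() (quoted from the
kern_rwlock.c hunk below, comments mine):

        static void
        rw_exited(struct rwlock *rwl)
        {
                membar_consumer();
                if (atomic_load_int(&rwl->rwl_waiters) > 0)
                        wakeup_one(&rwl->rwl_waiters);  /* next writer only */
                else if (atomic_load_int(&rwl->rwl_readers) > 0)
                        wakeup(&rwl->rwl_readers);      /* all readers at once */
        }

Writers are woken one at a time and always ahead of readers; once no writer
is pending, all readers are released in a single wakeup().
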
> if you are suffering a contended rwlock, this should reduce the
> amount of time spent spinning on the sched lock, which in turn may
> also reduce the wall clock time doing that work.
>
> the only downside to this diff in my opinion is that it grows struct
> rwlock by 8 bytes.
IMHO that's a non-issue. The current rwlock implementation and its
original goal of staying small are an example of premature optimization
that proved to do more harm than good.
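For the record, the 8 bytes are just the two new counters in struct rwlock,
as the sys/rwlock.h hunk below shows (comments mine):

        struct rwlock {
                volatile unsigned long rwl_owner;
                volatile unsigned int rwl_waiters;      /* pending writers */
                volatile unsigned int rwl_readers;      /* pending readers */
                const char *rwl_name;
                /* ... */
        };
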
> i've been hitting this diff hard, but would appreciate more tests.
> rwlocks are a very fundamental part of the kernel so they need to
> work. benchmarks and witness tests are also welcome.
I love this diff. I love how simple the implementation became and how
much easier it will be to extend. I already suggested playing the
`spc_spinning' dance. I also like the fact that we can now unify the
various rw_enter_*() functions.
I tested this diff on a 24-CPU amd64 machine and didn't see any notable
difference.
However, on an 80-CPU arm64 machine it decreases the contention *a lot*
for workloads using more than 32 CPUs. Here are the numbers:
kernel -j64 with -current
=========================
6m45.56s real 11m42.21s user 162m11.60s system (cold) (~50% spin)
2m02.54s real 11m38.43s user 19m04.36s system (hot)
1m57.80s real 11m41.16s user 16m55.80s system (hot)
kernel -j64 with -current+rwlock
================================
4m05.35s real 11m49.56s user 61m52.88s system (cold) (<30% spin)
1m23.47s real 11m45.28s user 7m23.24s system (hot)
1m23.73s real 11m47.70s user 7m25.20s system (hot)
For reference, with this diff, the numbers are the same as with 'make -j32':
1m23.02s real 11m29.40s user 6m59.53s system
I haven't tried a WITNESS kernel; I'd be glad if someone could report on
that.
> Index: sys/rwlock.h
> ===================================================================
> RCS file: /cvs/src/sys/sys/rwlock.h,v
> diff -u -p -r1.28 rwlock.h
> --- sys/rwlock.h 11 Jan 2021 18:49:38 -0000 1.28
> +++ sys/rwlock.h 24 Oct 2024 11:18:04 -0000
> @@ -60,6 +60,8 @@ struct proc;
>
> struct rwlock {
> volatile unsigned long rwl_owner;
> + volatile unsigned int rwl_waiters;
> + volatile unsigned int rwl_readers;
> const char *rwl_name;
> #ifdef WITNESS
> struct lock_object rwl_lock_obj;
> @@ -91,14 +93,12 @@ struct rwlock {
>
> #ifdef WITNESS
> #define RWLOCK_INITIALIZER(name) \
> - { 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
> + { 0, 0, 0, name, .rwl_lock_obj = RWLOCK_LO_INITIALIZER(name, 0) }
> #else
> #define RWLOCK_INITIALIZER(name) \
> - { 0, name }
> + { 0, 0, 0, name }
> #endif
>
> -#define RWLOCK_WAIT 0x01UL
> -#define RWLOCK_WRWANT 0x02UL
> #define RWLOCK_WRLOCK 0x04UL
> #define RWLOCK_MASK 0x07UL
>
> Index: kern/kern_rwlock.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/kern_rwlock.c,v
> diff -u -p -r1.50 kern_rwlock.c
> --- kern/kern_rwlock.c 14 Jul 2023 07:07:08 -0000 1.50
> +++ kern/kern_rwlock.c 24 Oct 2024 11:18:05 -0000
> @@ -26,10 +26,12 @@
> #include <sys/atomic.h>
> #include <sys/witness.h>
>
> -void rw_do_exit(struct rwlock *, unsigned long);
> -
> -/* XXX - temporary measure until proc0 is properly aligned */
> -#define RW_PROC(p) (((long)p) & ~RWLOCK_MASK)
> +#ifdef RWDIAG
> +#include <sys/kernel.h> /* for hz */
> +#define RW_SLEEP_TMO 10 * hz
> +#else
> +#define RW_SLEEP_TMO 0
> +#endif
>
> /*
> * Other OSes implement more sophisticated mechanism to determine how long the
> @@ -40,166 +42,131 @@ void rw_do_exit(struct rwlock *, unsigne
> #define RW_SPINS 1000
>
> #ifdef MULTIPROCESSOR
> -#define rw_cas(p, o, n) (atomic_cas_ulong(p, o, n) != o)
> +#define rw_cas(p, e, n) atomic_cas_ulong(p, e, n)
> +#define rw_inc(p) atomic_inc_int(p)
> +#define rw_dec(p) atomic_dec_int(p)
> #else
> -static inline int
> -rw_cas(volatile unsigned long *p, unsigned long o, unsigned long n)
> +static inline unsigned long
> +rw_cas(volatile unsigned long *p, unsigned long e, unsigned long n)
> {
> - if (*p != o)
> - return (1);
> - *p = n;
> + unsigned long o = *p;
>
> - return (0);
> + if (o == e)
> + *p = n;
> +
> + return (o);
> +}
> +
> +static inline void
> +rw_inc(volatile unsigned int *p)
> +{
> + ++(*p);
> +}
> +
> +static inline void
> +rw_dec(volatile unsigned int *p)
> +{
> + (*p)--;
> }
> #endif
>
> -/*
> - * Magic wand for lock operations. Every operation checks if certain
> - * flags are set and if they aren't, it increments the lock with some
> - * value (that might need some computing in a few cases). If the operation
> - * fails, we need to set certain flags while waiting for the lock.
> - *
> - * RW_WRITE The lock must be completely empty. We increment it with
> - * RWLOCK_WRLOCK and the proc pointer of the holder.
> - * Sets RWLOCK_WAIT|RWLOCK_WRWANT while waiting.
> - * RW_READ RWLOCK_WRLOCK|RWLOCK_WRWANT may not be set. We increment
> - * with RWLOCK_READ_INCR. RWLOCK_WAIT while waiting.
> - */
> -static const struct rwlock_op {
> - unsigned long inc;
> - unsigned long check;
> - unsigned long wait_set;
> - long proc_mult;
> - int wait_prio;
> -} rw_ops[] = {
> - { /* RW_WRITE */
> - RWLOCK_WRLOCK,
> - ULONG_MAX,
> - RWLOCK_WAIT | RWLOCK_WRWANT,
> - 1,
> - PLOCK - 4
> - },
> - { /* RW_READ */
> - RWLOCK_READ_INCR,
> - RWLOCK_WRLOCK | RWLOCK_WRWANT,
> - RWLOCK_WAIT,
> - 0,
> - PLOCK
> - },
> - { /* Sparse Entry. */
> - 0,
> - },
> - { /* RW_DOWNGRADE */
> - RWLOCK_READ_INCR - RWLOCK_WRLOCK,
> - 0,
> - 0,
> - -1,
> - PLOCK
> - },
> -};
> +static int rw_do_enter_read(struct rwlock *, int);
> +static void rw_do_exit_read(struct rwlock *, unsigned long);
> +static int rw_do_enter_write(struct rwlock *, int);
> +static int rw_downgrade(struct rwlock *, int);
> +
> +static void rw_exited(struct rwlock *);
> +
> +static unsigned long
> +rw_self(void)
> +{
> + unsigned long self = (unsigned long)curproc;
> +
> + CLR(self, RWLOCK_MASK);
> + SET(self, RWLOCK_WRLOCK);
> +
> + return (self);
> +}
>
> void
> rw_enter_read(struct rwlock *rwl)
> {
> - unsigned long owner = rwl->rwl_owner;
> -
> - if (__predict_false((owner & (RWLOCK_WRLOCK | RWLOCK_WRWANT)) ||
> - rw_cas(&rwl->rwl_owner, owner, owner + RWLOCK_READ_INCR)))
> - rw_enter(rwl, RW_READ);
> - else {
> - membar_enter_after_atomic();
> - WITNESS_CHECKORDER(&rwl->rwl_lock_obj, LOP_NEWORDER, NULL);
> - WITNESS_LOCK(&rwl->rwl_lock_obj, 0);
> - }
> + rw_do_enter_read(rwl, 0);
> }
>
> void
> rw_enter_write(struct rwlock *rwl)
> {
> - struct proc *p = curproc;
> -
> - if (__predict_false(rw_cas(&rwl->rwl_owner, 0,
> - RW_PROC(p) | RWLOCK_WRLOCK)))
> - rw_enter(rwl, RW_WRITE);
> - else {
> - membar_enter_after_atomic();
> - WITNESS_CHECKORDER(&rwl->rwl_lock_obj,
> - LOP_EXCLUSIVE | LOP_NEWORDER, NULL);
> - WITNESS_LOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
> - }
> + rw_do_enter_write(rwl, 0);
> }
>
> void
> rw_exit_read(struct rwlock *rwl)
> {
> - unsigned long owner;
> + /* maybe we're the last one? */
> + rw_do_exit_read(rwl, RWLOCK_READ_INCR);
> +}
> +
> +static void
> +rw_do_exit_read(struct rwlock *rwl, unsigned long owner)
> +{
> + unsigned long decr;
> + unsigned long nowner;
>
> - rw_assert_rdlock(rwl);
> WITNESS_UNLOCK(&rwl->rwl_lock_obj, 0);
>
> - membar_exit_before_atomic();
> - owner = rwl->rwl_owner;
> - if (__predict_false((owner & RWLOCK_WAIT) ||
> - rw_cas(&rwl->rwl_owner, owner, owner - RWLOCK_READ_INCR)))
> - rw_do_exit(rwl, 0);
> + for (;;) {
> + decr = owner - RWLOCK_READ_INCR;
> + nowner = rw_cas(&rwl->rwl_owner, owner, decr);
> + if (owner == nowner)
> + break;
> +
> + if (__predict_false(ISSET(nowner, RWLOCK_WRLOCK))) {
> + panic("%s rwlock %p: exit read on write locked lock"
> + " (owner 0x%lx)", rwl->rwl_name, rwl, nowner);
> + }
> + if (__predict_false(nowner == 0)) {
> + panic("%s rwlock %p: exit read on unlocked lock",
> + rwl->rwl_name, rwl);
> + }
> +
> + owner = nowner;
> + }
> +
> + /* read lock didn't change anything, so no barrier needed? */
> +
> + if (decr == 0) {
> + /* last one out */
> + rw_exited(rwl);
> + }
> }
>
> void
> rw_exit_write(struct rwlock *rwl)
> {
> + unsigned long self = rw_self();
> unsigned long owner;
>
> - rw_assert_wrlock(rwl);
> WITNESS_UNLOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
>
> membar_exit_before_atomic();
> - owner = rwl->rwl_owner;
> - if (__predict_false((owner & RWLOCK_WAIT) ||
> - rw_cas(&rwl->rwl_owner, owner, 0)))
> - rw_do_exit(rwl, RWLOCK_WRLOCK);
> -}
> -
> -#ifdef DIAGNOSTIC
> -/*
> - * Put the diagnostic functions here to keep the main code free
> - * from ifdef clutter.
> - */
> -static void
> -rw_enter_diag(struct rwlock *rwl, int flags)
> -{
> - switch (flags & RW_OPMASK) {
> - case RW_WRITE:
> - case RW_READ:
> - if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> - panic("rw_enter: %s locking against myself",
> - rwl->rwl_name);
> - break;
> - case RW_DOWNGRADE:
> - /*
> - * If we're downgrading, we must hold the write lock.
> - */
> - if ((rwl->rwl_owner & RWLOCK_WRLOCK) == 0)
> - panic("rw_enter: %s downgrade of non-write lock",
> - rwl->rwl_name);
> - if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> - panic("rw_enter: %s downgrade, not holder",
> - rwl->rwl_name);
> - break;
> -
> - default:
> - panic("rw_enter: unknown op 0x%x", flags);
> + owner = rw_cas(&rwl->rwl_owner, self, 0);
> + if (__predict_false(owner != self)) {
> + panic("%s rwlock %p: exit write when lock not held "
> + "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> + owner, self);
> }
> -}
>
> -#else
> -#define rw_enter_diag(r, f)
> -#endif
> + rw_exited(rwl);
> +}
>
> static void
> _rw_init_flags_witness(struct rwlock *rwl, const char *name, int lo_flags,
> const struct lock_type *type)
> {
> rwl->rwl_owner = 0;
> + rwl->rwl_waiters = 0;
> + rwl->rwl_readers = 0;
> rwl->rwl_name = name;
>
> #ifdef WITNESS
> @@ -223,144 +190,285 @@ _rw_init_flags(struct rwlock *rwl, const
> int
> rw_enter(struct rwlock *rwl, int flags)
> {
> - const struct rwlock_op *op;
> - unsigned long inc, o;
> -#ifdef MULTIPROCESSOR
> - /*
> - * If process holds the kernel lock, then we want to give up on CPU
> - * as soon as possible so other processes waiting for the kernel lock
> - * can progress. Hence no spinning if we hold the kernel lock.
> - */
> - unsigned int spin = (_kernel_lock_held()) ? 0 : RW_SPINS;
> -#endif
> - int error, prio;
> -#ifdef WITNESS
> - int lop_flags;
> + int op = flags & RW_OPMASK;
> + int error;
> +
> + switch (op) {
> + case RW_WRITE:
> + error = rw_do_enter_write(rwl, flags);
> + break;
> + case RW_READ:
> + error = rw_do_enter_read(rwl, flags);
> + break;
> + case RW_DOWNGRADE:
> + error = rw_downgrade(rwl, flags);
> + break;
> + default:
> + panic("%s rwlock %p: %s unexpected op 0x%x",
> + rwl->rwl_name, rwl, __func__, op);
> + /* NOTREACHED */
> + }
>
> - lop_flags = LOP_NEWORDER;
> - if (flags & RW_WRITE)
> - lop_flags |= LOP_EXCLUSIVE;
> - if (flags & RW_DUPOK)
> + return (error);
> +}
> +
> +static int
> +rw_do_enter_write(struct rwlock *rwl, int flags)
> +{
> + unsigned long self = rw_self();
> + unsigned long owner;
> + int prio;
> + int error;
> +
> +#ifdef WITNESS
> + int lop_flags = LOP_NEWORDER | LOP_EXCLUSIVE;
> + if (ISSET(flags, RW_DUPOK))
> lop_flags |= LOP_DUPOK;
> - if ((flags & RW_NOSLEEP) == 0 && (flags & RW_DOWNGRADE) == 0)
> +
> + if (!ISSET(flags, RW_NOSLEEP))
> WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> #endif
>
> - op = &rw_ops[(flags & RW_OPMASK) - 1];
> -
> - inc = op->inc + RW_PROC(curproc) * op->proc_mult;
> -retry:
> - while (__predict_false(((o = rwl->rwl_owner) & op->check) != 0)) {
> - unsigned long set = o | op->wait_set;
> - int do_sleep;
> -
> - /* Avoid deadlocks after panic or in DDB */
> - if (panicstr || db_active)
> - return (0);
> + owner = rw_cas(&rwl->rwl_owner, 0, self);
> + if (owner == 0) {
> + /* wow, we won. so easy */
> + goto locked;
> + }
> + if (__predict_false(owner == self)) {
> + panic("%s rwlock %p: enter write deadlock",
> + rwl->rwl_name, rwl);
> + }
>
> #ifdef MULTIPROCESSOR
> + /*
> + * If process holds the kernel lock, then we want to give up on CPU
> + * as soon as possible so other processes waiting for the kernel lock
> + * can progress. Hence no spinning if we hold the kernel lock.
> + */
> + if (!_kernel_lock_held()) {
> + int spins;
> +
> /*
> * It makes sense to try to spin just in case the lock
> * is acquired by writer.
> */
> - if ((o & RWLOCK_WRLOCK) && (spin != 0)) {
> - spin--;
> +
> + for (spins = 0; spins < RW_SPINS; spins++) {
> CPU_BUSY_CYCLE();
> - continue;
> + owner = atomic_load_long(&rwl->rwl_owner);
> + if (owner != 0)
> + continue;
> +
> + owner = rw_cas(&rwl->rwl_owner, 0, self);
> + if (owner == 0) {
> + /* ok, we won now. */
> + goto locked;
> + }
> }
> + }
> #endif
>
> - rw_enter_diag(rwl, flags);
> -
> - if (flags & RW_NOSLEEP)
> - return (EBUSY);
> + if (ISSET(flags, RW_NOSLEEP))
> + return (EBUSY);
>
> - prio = op->wait_prio;
> - if (flags & RW_INTR)
> - prio |= PCATCH;
> - sleep_setup(rwl, prio, rwl->rwl_name);
> + prio = PLOCK - 4;
> + if (ISSET(flags, RW_INTR))
> + prio |= PCATCH;
>
> - do_sleep = !rw_cas(&rwl->rwl_owner, o, set);
> -
> - error = sleep_finish(0, do_sleep);
> - if ((flags & RW_INTR) &&
> - (error != 0))
> + rw_inc(&rwl->rwl_waiters);
> + membar_producer();
> + do {
> + sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name);
> + membar_consumer();
> + owner = atomic_load_long(&rwl->rwl_owner);
> + error = sleep_finish(RW_SLEEP_TMO, owner != 0);
> +#ifdef RWDIAG
> + if (error == EWOULDBLOCK) {
> + printf("%s rwlock %p: %s timeout owner 0x%lx "
> + "(self 0x%lx)", rwl->rwl_name, rwl, __func__,
> + owner, self);
> + db_enter();
> + }
> +#endif
> + if (ISSET(flags, RW_INTR) && (error != 0)) {
> + rw_dec(&rwl->rwl_waiters);
> return (error);
> - if (flags & RW_SLEEPFAIL)
> + }
> + if (ISSET(flags, RW_SLEEPFAIL)) {
> + rw_dec(&rwl->rwl_waiters);
> + rw_exited(rwl);
> return (EAGAIN);
> - }
> + }
> +
> + owner = rw_cas(&rwl->rwl_owner, 0, self);
> + } while (owner != 0);
> + rw_dec(&rwl->rwl_waiters);
>
> - if (__predict_false(rw_cas(&rwl->rwl_owner, o, o + inc)))
> - goto retry;
> +locked:
> membar_enter_after_atomic();
> + WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
>
> - /*
> - * If old lock had RWLOCK_WAIT and RWLOCK_WRLOCK set, it means we
> - * downgraded a write lock and had possible read waiter, wake them
> - * to let them retry the lock.
> - */
> - if (__predict_false((o & (RWLOCK_WRLOCK|RWLOCK_WAIT)) ==
> - (RWLOCK_WRLOCK|RWLOCK_WAIT)))
> - wakeup(rwl);
> + return (0);
> +}
>
> - if (flags & RW_DOWNGRADE)
> - WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> - else
> - WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +static int
> +rw_read_incr(struct rwlock *rwl, unsigned long owner)
> +{
> + unsigned long incr;
> + unsigned long nowner;
> +
> + do {
> + incr = owner + RWLOCK_READ_INCR;
> + nowner = rw_cas(&rwl->rwl_owner, owner, incr);
> + if (nowner == owner)
> + return (1);
> +
> + owner = nowner;
> + } while (!ISSET(owner, RWLOCK_WRLOCK));
>
> return (0);
> }
>
> -void
> -rw_exit(struct rwlock *rwl)
> +static int
> +rw_do_enter_read(struct rwlock *rwl, int flags)
> {
> - unsigned long wrlock;
> + unsigned long owner;
> + int error;
> + int prio;
>
> - /* Avoid deadlocks after panic or in DDB */
> - if (panicstr || db_active)
> - return;
> +#ifdef WITNESS
> + int lop_flags = LOP_NEWORDER;
> + if (ISSET(flags, RW_DUPOK))
> + lop_flags |= LOP_DUPOK;
> + if (!ISSET(flags, RW_NOSLEEP))
> + WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> +#endif
>
> - wrlock = rwl->rwl_owner & RWLOCK_WRLOCK;
> - if (wrlock)
> - rw_assert_wrlock(rwl);
> - else
> - rw_assert_rdlock(rwl);
> - WITNESS_UNLOCK(&rwl->rwl_lock_obj, wrlock ? LOP_EXCLUSIVE : 0);
> + owner = rw_cas(&rwl->rwl_owner, 0, RWLOCK_READ_INCR);
> + if (owner == 0) {
> + /* ermagerd, we won! */
> + goto locked;
> + }
> +
> + if (ISSET(owner, RWLOCK_WRLOCK)) {
> + if (__predict_false(owner == rw_self())) {
> + panic("%s rwlock %p: enter read deadlock",
> + rwl->rwl_name, rwl);
> + }
> + } else if (atomic_load_int(&rwl->rwl_waiters) == 0) {
> + if (rw_read_incr(rwl, owner)) {
> + /* nailed it */
> + goto locked;
> + }
> + }
> +
> + if (ISSET(flags, RW_NOSLEEP))
> + return (EBUSY);
> +
> + prio = PLOCK;
> + if (ISSET(flags, RW_INTR))
> + prio |= PCATCH;
> +
> + rw_inc(&rwl->rwl_readers);
> + membar_producer();
> + do {
> + sleep_setup(&rwl->rwl_readers, prio, rwl->rwl_name);
> + membar_consumer();
> + error = sleep_finish(RW_SLEEP_TMO,
> + atomic_load_int(&rwl->rwl_waiters) > 0 ||
> + ISSET(atomic_load_long(&rwl->rwl_owner), RWLOCK_WRLOCK));
> +#ifdef RWDIAG
> + if (error == EWOULDBLOCK) {
> + printf("%s rwlock %p: %s timeout owner 0x%lx\n",
> + rwl->rwl_name, rwl, __func__, owner);
> + db_enter();
> + }
> +#endif
> + if (ISSET(flags, RW_INTR) && (error != 0))
> + goto fail;
> + if (ISSET(flags, RW_SLEEPFAIL)) {
> + error = EAGAIN;
> + goto fail;
> + }
> + } while (!rw_read_incr(rwl, 0));
> + rw_dec(&rwl->rwl_readers);
> +
> +locked:
> + membar_enter_after_atomic();
> + WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +
> + return (0);
> +fail:
> + rw_dec(&rwl->rwl_readers);
> + return (error);
> +}
> +
> +static int
> +rw_downgrade(struct rwlock *rwl, int flags)
> +{
> + unsigned long self = rw_self();
> + unsigned long owner;
>
> membar_exit_before_atomic();
> - rw_do_exit(rwl, wrlock);
> + owner = atomic_cas_ulong(&rwl->rwl_owner, self, RWLOCK_READ_INCR);
> + if (__predict_false(owner != self)) {
> + panic("%s rwlock %p: downgrade when lock not held "
> + "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> + owner, self);
> + }
> +
> +#ifdef WITNESS
> + {
> + int lop_flags = LOP_NEWORDER;
> + if (ISSET(flags, RW_DUPOK))
> + lop_flags |= LOP_DUPOK;
> + WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> + }
> +#endif
> +
> + membar_consumer();
> + if (atomic_load_int(&rwl->rwl_waiters) == 0 &&
> + atomic_load_int(&rwl->rwl_readers) > 0)
> + wakeup(&rwl->rwl_readers);
> +
> + return (0);
> }
>
> -/* membar_exit_before_atomic() has to precede call of this function. */
> void
> -rw_do_exit(struct rwlock *rwl, unsigned long wrlock)
> +rw_exit(struct rwlock *rwl)
> {
> - unsigned long owner, set;
> + unsigned long owner;
>
> - do {
> - owner = rwl->rwl_owner;
> - if (wrlock)
> - set = 0;
> - else
> - set = (owner - RWLOCK_READ_INCR) &
> - ~(RWLOCK_WAIT|RWLOCK_WRWANT);
> - /*
> - * Potential MP race here. If the owner had WRWANT set, we
> - * cleared it and a reader can sneak in before a writer.
> - */
> - } while (__predict_false(rw_cas(&rwl->rwl_owner, owner, set)));
> + owner = atomic_load_long(&rwl->rwl_owner);
> + if (__predict_false(owner == 0)) {
> + panic("%s rwlock %p: exit on unlocked lock",
> + rwl->rwl_name, rwl);
> + }
> +
> + if (ISSET(owner, RWLOCK_WRLOCK))
> + rw_exit_write(rwl);
> + else
> + rw_do_exit_read(rwl, owner);
> +}
>
> - if (owner & RWLOCK_WAIT)
> - wakeup(rwl);
> +static void
> +rw_exited(struct rwlock *rwl)
> +{
> + membar_consumer();
> + if (atomic_load_int(&rwl->rwl_waiters) > 0)
> + wakeup_one(&rwl->rwl_waiters);
> + else if (atomic_load_int(&rwl->rwl_readers) > 0)
> + wakeup(&rwl->rwl_readers);
> }
>
> int
> rw_status(struct rwlock *rwl)
> {
> - unsigned long owner = rwl->rwl_owner;
> + unsigned long owner;
>
> - if (owner & RWLOCK_WRLOCK) {
> - if (RW_PROC(curproc) == RW_PROC(owner))
> + owner = atomic_load_long(&rwl->rwl_owner);
> + if (ISSET(owner, RWLOCK_WRLOCK)) {
> + if (rw_self() == owner)
> return RW_WRITE;
> else
> return RW_WRITE_OTHER;
> @@ -380,11 +488,10 @@ rw_assert_wrlock(struct rwlock *rwl)
> #ifdef WITNESS
> witness_assert(&rwl->rwl_lock_obj, LA_XLOCKED);
> #else
> - if (!(rwl->rwl_owner & RWLOCK_WRLOCK))
> - panic("%s: lock not held", rwl->rwl_name);
> -
> - if (RW_PROC(curproc) != RW_PROC(rwl->rwl_owner))
> - panic("%s: lock not held by this process", rwl->rwl_name);
> + if (atomic_load_long(&rwl->rwl_owner) != rw_self()) {
> + panic("%s rwlock %p: lock not held by this process",
> + rwl->rwl_name, rwl);
> + }
> #endif
> }
>
> @@ -397,8 +504,8 @@ rw_assert_rdlock(struct rwlock *rwl)
> #ifdef WITNESS
> witness_assert(&rwl->rwl_lock_obj, LA_SLOCKED);
> #else
> - if (!RW_PROC(rwl->rwl_owner) || (rwl->rwl_owner & RWLOCK_WRLOCK))
> - panic("%s: lock not shared", rwl->rwl_name);
> + if (rw_status(rwl) != RW_READ)
> + panic("%s rwlock %p: lock not shared", rwl->rwl_name, rwl);
> #endif
> }
>
> @@ -413,9 +520,11 @@ rw_assert_anylock(struct rwlock *rwl)
> #else
> switch (rw_status(rwl)) {
> case RW_WRITE_OTHER:
> - panic("%s: lock held by different process", rwl->rwl_name);
> + panic("%s rwlock %p: lock held by different process "
> + "(self %lx, owner %lx)", rwl->rwl_name, rwl,
> + rw_self(), rwl->rwl_owner);
> case 0:
> - panic("%s: lock not held", rwl->rwl_name);
> + panic("%s rwlock %p: lock not held", rwl->rwl_name, rwl);
> }
> #endif
> }
> @@ -429,8 +538,8 @@ rw_assert_unlocked(struct rwlock *rwl)
> #ifdef WITNESS
> witness_assert(&rwl->rwl_lock_obj, LA_UNLOCKED);
> #else
> - if (RW_PROC(curproc) == RW_PROC(rwl->rwl_owner))
> - panic("%s: lock held", rwl->rwl_name);
> + if (atomic_load_long(&rwl->rwl_owner) == rw_self())
> + panic("%s rwlock %p: lock held", rwl->rwl_name, rwl);
> #endif
> }
> #endif
> @@ -450,7 +559,7 @@ rrw_enter(struct rrwlock *rrwl, int flag
> {
> int rv;
>
> - if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> + if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
> if (flags & RW_RECURSEFAIL)
> return (EDEADLK);
> else {
> @@ -472,7 +581,7 @@ void
> rrw_exit(struct rrwlock *rrwl)
> {
>
> - if (RW_PROC(rrwl->rrwl_lock.rwl_owner) == RW_PROC(curproc)) {
> + if (atomic_load_long(&rrwl->rrwl_lock.rwl_owner) == rw_self()) {
> KASSERT(rrwl->rrwl_wcnt > 0);
> rrwl->rrwl_wcnt--;
> if (rrwl->rrwl_wcnt != 0) {
>