From: "Lorenz (xha)" Subject: Re: kernel rwlocks vs the scheduler To: David Gwynne Cc: tech@openbsd.org Date: Sun, 24 Nov 2024 09:57:12 +0100 hi David, i was mostly just curious about this diff, and when going through it i found a couple of small things. maby it helps :) On Wed, Nov 13, 2024 at 11:41:10PM +1000, David Gwynne wrote: > Index: kern/kern_rwlock.c > =================================================================== > RCS file: /cvs/src/sys/kern/kern_rwlock.c,v > diff -u -p -r1.50 kern_rwlock.c > --- kern/kern_rwlock.c 14 Jul 2023 07:07:08 -0000 1.50 > +++ kern/kern_rwlock.c 24 Oct 2024 11:18:05 -0000 > @@ -40,166 +42,131 @@ void rw_do_exit(struct rwlock *, unsigne > #define RW_SPINS 1000 > > #ifdef MULTIPROCESSOR > -#define rw_cas(p, o, n) (atomic_cas_ulong(p, o, n) != o) > +#define rw_cas(p, e, n) atomic_cas_ulong(p, e, n) > +#define rw_inc(p) atomic_inc_int(p) > +#define rw_dec(p) atomic_dec_int(p) > #else > -static inline int > -rw_cas(volatile unsigned long *p, unsigned long o, unsigned long n) > +static inline unsigned long > +rw_cas(volatile unsigned long *p, unsigned long e, unsigned long n) > { > - if (*p != o) > - return (1); > - *p = n; > + unsigned long o = *p; > > - return (0); > + if (o == e) > + *p = n; > + > + return (o); > +} > + > +static inline void > +rw_inc(volatile unsigned int *p) > +{ > + ++(*p); > +} > + > +static inline void > +rw_dec(volatile unsigned int *p) > +{ > + (*p)--; > } > #endif > > -/* > - * Magic wand for lock operations. Every operation checks if certain > - * flags are set and if they aren't, it increments the lock with some > - * value (that might need some computing in a few cases). If the operation > - * fails, we need to set certain flags while waiting for the lock. > - * > - * RW_WRITE The lock must be completely empty. We increment it with > - * RWLOCK_WRLOCK and the proc pointer of the holder. > - * Sets RWLOCK_WAIT|RWLOCK_WRWANT while waiting. > - * RW_READ RWLOCK_WRLOCK|RWLOCK_WRWANT may not be set. We increment > - * with RWLOCK_READ_INCR. RWLOCK_WAIT while waiting. > - */ > -static const struct rwlock_op { > - unsigned long inc; > - unsigned long check; > - unsigned long wait_set; > - long proc_mult; > - int wait_prio; > -} rw_ops[] = { > - { /* RW_WRITE */ > - RWLOCK_WRLOCK, > - ULONG_MAX, > - RWLOCK_WAIT | RWLOCK_WRWANT, > - 1, > - PLOCK - 4 > - }, > - { /* RW_READ */ > - RWLOCK_READ_INCR, > - RWLOCK_WRLOCK | RWLOCK_WRWANT, > - RWLOCK_WAIT, > - 0, > - PLOCK > - }, > - { /* Sparse Entry. 
> -		0,
> -	},
> -	{	/* RW_DOWNGRADE */
> -		RWLOCK_READ_INCR - RWLOCK_WRLOCK,
> -		0,
> -		0,
> -		-1,
> -		PLOCK
> -	},
> -};
> +static int	rw_do_enter_read(struct rwlock *, int);
> +static void	rw_do_exit_read(struct rwlock *, unsigned long);
> +static int	rw_do_enter_write(struct rwlock *, int);
> +static int	rw_downgrade(struct rwlock *, int);
> +
> +static void	rw_exited(struct rwlock *);
> +
> +static unsigned long
> +rw_self(void)
> +{
> +	unsigned long self = (unsigned long)curproc;
> +
> +	CLR(self, RWLOCK_MASK);
> +	SET(self, RWLOCK_WRLOCK);
> +
> +	return (self);
> +}
> 
>  void
>  rw_enter_read(struct rwlock *rwl)
>  {
> -	unsigned long owner = rwl->rwl_owner;
> -
> -	if (__predict_false((owner & (RWLOCK_WRLOCK | RWLOCK_WRWANT)) ||
> -	    rw_cas(&rwl->rwl_owner, owner, owner + RWLOCK_READ_INCR)))
> -		rw_enter(rwl, RW_READ);
> -	else {
> -		membar_enter_after_atomic();
> -		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, LOP_NEWORDER, NULL);
> -		WITNESS_LOCK(&rwl->rwl_lock_obj, 0);
> -	}
> +	rw_do_enter_read(rwl, 0);
>  }
> 
>  void
>  rw_enter_write(struct rwlock *rwl)
>  {
> -	struct proc *p = curproc;
> -
> -	if (__predict_false(rw_cas(&rwl->rwl_owner, 0,
> -	    RW_PROC(p) | RWLOCK_WRLOCK)))
> -		rw_enter(rwl, RW_WRITE);
> -	else {
> -		membar_enter_after_atomic();
> -		WITNESS_CHECKORDER(&rwl->rwl_lock_obj,
> -		    LOP_EXCLUSIVE | LOP_NEWORDER, NULL);
> -		WITNESS_LOCK(&rwl->rwl_lock_obj, LOP_EXCLUSIVE);
> -	}
> +	rw_do_enter_write(rwl, 0);
>  }
> 
>  void
>  rw_exit_read(struct rwlock *rwl)
>  {
> -	unsigned long owner;
> +	/* maybe we're the last one? */
> +	rw_do_exit_read(rwl, RWLOCK_READ_INCR);
> +}
> +
> +static void
> +rw_do_exit_read(struct rwlock *rwl, unsigned long owner)
> +{
> +	unsigned long decr;
> +	unsigned long nowner;
> 
> -	rw_assert_rdlock(rwl);
>  	WITNESS_UNLOCK(&rwl->rwl_lock_obj, 0);
> 
> -	membar_exit_before_atomic();
> -	owner = rwl->rwl_owner;
> -	if (__predict_false((owner & RWLOCK_WAIT) ||
> -	    rw_cas(&rwl->rwl_owner, owner, owner - RWLOCK_READ_INCR)))
> -		rw_do_exit(rwl, 0);
> +	for (;;) {
> +		decr = owner - RWLOCK_READ_INCR;
> +		nowner = rw_cas(&rwl->rwl_owner, owner, decr);
> +		if (owner == nowner)
> +			break;
> +
> +		if (__predict_false(ISSET(nowner, RWLOCK_WRLOCK))) {
> +			panic("%s rwlock %p: exit read on write locked lock"
> +			    " (owner 0x%lx)", rwl->rwl_name, rwl, nowner);
> +		}
> +		if (__predict_false(nowner == 0)) {
> +			panic("%s rwlock %p: exit read on unlocked lock",
> +			    rwl->rwl_name, rwl);
> +		}
> +
> +		owner = nowner;
> +	}
> +
> +	/* read lock didn't change anything, so no barrier needed? */

since rw_exited() calls membar_consumer(), and the data that the lock
is protecting should not change while the lock is read-locked, i also
think this is fine.
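
to spell out what i mean with a made-up reader (foo_lock, foo_list,
key and foo_lookup() are just placeholders, not something from the
tree):

	struct foo *f;

	rw_enter_read(&foo_lock);	/* membar_enter_after_atomic() */
	f = foo_lookup(&foo_list, key);	/* the critical section only
					 * loads the protected data */
	rw_exit_read(&foo_lock);	/* nothing was stored, so there is
					 * nothing for the old
					 * membar_exit_before_atomic() to
					 * order before the CAS */

so as far as i can see, dropping the release barrier on the read-exit
path only relies on readers really behaving as readers.
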
> +
> +	if (decr == 0) {
> +		/* last one out */
> +		rw_exited(rwl);
> +	}
>  }
> @@ -223,144 +190,285 @@ _rw_init_flags(struct rwlock *rwl, const
>  int
>  rw_enter(struct rwlock *rwl, int flags)
>  {
> -	const struct rwlock_op *op;
> -	unsigned long inc, o;
> -#ifdef MULTIPROCESSOR
> -	/*
> -	 * If process holds the kernel lock, then we want to give up on CPU
> -	 * as soon as possible so other processes waiting for the kernel lock
> -	 * can progress. Hence no spinning if we hold the kernel lock.
> -	 */
> -	unsigned int spin = (_kernel_lock_held()) ? 0 : RW_SPINS;
> -#endif
> -	int error, prio;
> -#ifdef WITNESS
> -	int lop_flags;
> +	int op = flags & RW_OPMASK;
> +	int error;
> +
> +	switch (op) {
> +	case RW_WRITE:
> +		error = rw_do_enter_write(rwl, flags);
> +		break;
> +	case RW_READ:
> +		error = rw_do_enter_read(rwl, flags);
> +		break;
> +	case RW_DOWNGRADE:
> +		error = rw_downgrade(rwl, flags);
> +		break;
> +	default:
> +		panic("%s rwlock %p: %s unexpected op 0x%x",
> +		    rwl->rwl_name, rwl, __func__, op);
> +		/* NOTREACHED */
> +	}
> 
> -	lop_flags = LOP_NEWORDER;
> -	if (flags & RW_WRITE)
> -		lop_flags |= LOP_EXCLUSIVE;
> -	if (flags & RW_DUPOK)
> +	return (error);
> +}
> +
> +static int
> +rw_do_enter_write(struct rwlock *rwl, int flags)
> +{
> +	unsigned long self = rw_self();
> +	unsigned long owner;
> +	int prio;
> +	int error;
> +
> +#ifdef WITNESS
> +	int lop_flags = LOP_NEWORDER | LOP_EXCLUSIVE;
> +	if (ISSET(flags, RW_DUPOK))
>  		lop_flags |= LOP_DUPOK;
> -	if ((flags & RW_NOSLEEP) == 0 && (flags & RW_DOWNGRADE) == 0)
> +
> +	if (!ISSET(flags, RW_NOSLEEP))
>  		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
>  #endif
> 
> -	op = &rw_ops[(flags & RW_OPMASK) - 1];
> -
> -	inc = op->inc + RW_PROC(curproc) * op->proc_mult;
> -retry:
> -	while (__predict_false(((o = rwl->rwl_owner) & op->check) != 0)) {
> -		unsigned long set = o | op->wait_set;
> -		int do_sleep;
> -
> -		/* Avoid deadlocks after panic or in DDB */
> -		if (panicstr || db_active)
> -			return (0);
> +	owner = rw_cas(&rwl->rwl_owner, 0, self);
> +	if (owner == 0) {
> +		/* wow, we won. so easy */
> +		goto locked;
> +	}
> +	if (__predict_false(owner == self)) {
> +		panic("%s rwlock %p: enter write deadlock",
> +		    rwl->rwl_name, rwl);
> +	}
> 
>  #ifdef MULTIPROCESSOR
> +	/*
> +	 * If process holds the kernel lock, then we want to give up on CPU
> +	 * as soon as possible so other processes waiting for the kernel lock
> +	 * can progress. Hence no spinning if we hold the kernel lock.
> +	 */
> +	if (!_kernel_lock_held()) {
> +		int spins;
> +
>  		/*
>  		 * It makes sense to try to spin just in case the lock
>  		 * is acquired by writer.
>  		 */

i think the above comment can be removed?

> -		if ((o & RWLOCK_WRLOCK) && (spin != 0)) {
> -			spin--;
> +
> +		for (spins = 0; spins < RW_SPINS; spins++) {
>  			CPU_BUSY_CYCLE();
> -			continue;
> +			owner = atomic_load_long(&rwl->rwl_owner);
> +			if (owner != 0)
> +				continue;
> +
> +			owner = rw_cas(&rwl->rwl_owner, 0, self);
> +			if (owner == 0) {
> +				/* ok, we won now. */
> +				goto locked;
> +			}
>  		}
> +	}
>  #endif
> 
> -	rw_enter_diag(rwl, flags);
> -
> -	if (flags & RW_NOSLEEP)
> -		return (EBUSY);
> +	if (ISSET(flags, RW_NOSLEEP))
> +		return (EBUSY);
> 
> -	prio = op->wait_prio;
> -	if (flags & RW_INTR)
> -		prio |= PCATCH;
> -	sleep_setup(rwl, prio, rwl->rwl_name);
> +	prio = PLOCK - 4;
> +	if (ISSET(flags, RW_INTR))
> +		prio |= PCATCH;
> 
> -	do_sleep = !rw_cas(&rwl->rwl_owner, o, set);
> -
> -	error = sleep_finish(0, do_sleep);
> -	if ((flags & RW_INTR) &&
> -	    (error != 0))
> +	rw_inc(&rwl->rwl_waiters);
> +	membar_producer();
> +	do {
> +		sleep_setup(&rwl->rwl_waiters, prio, rwl->rwl_name);
> +		membar_consumer();
> +		owner = atomic_load_long(&rwl->rwl_owner);
> +		error = sleep_finish(RW_SLEEP_TMO, owner != 0);
> +#ifdef RWDIAG
> +		if (error == EWOULDBLOCK) {
> +			printf("%s rwlock %p: %s timeout owner 0x%lx "
> +			    "(self 0x%lx)", rwl->rwl_name, rwl, __func__,
> +			    owner, self);
> +			db_enter();
> +		}
> +#endif
> +		if (ISSET(flags, RW_INTR) && (error != 0)) {
> +			rw_dec(&rwl->rwl_waiters);
>  			return (error);
> -		if (flags & RW_SLEEPFAIL)
> +		}
> +		if (ISSET(flags, RW_SLEEPFAIL)) {
> +			rw_dec(&rwl->rwl_waiters);
> +			rw_exited(rwl);
>  			return (EAGAIN);
> -	}
> +		}
> +
> +		owner = rw_cas(&rwl->rwl_owner, 0, self);
> +	} while (owner != 0);
> +	rw_dec(&rwl->rwl_waiters);
> 
> -	if (__predict_false(rw_cas(&rwl->rwl_owner, o, o + inc)))
> -		goto retry;
> +locked:
>  	membar_enter_after_atomic();
> +	WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> 
> -	/*
> -	 * If old lock had RWLOCK_WAIT and RWLOCK_WRLOCK set, it means we
> -	 * downgraded a write lock and had possible read waiter, wake them
> -	 * to let them retry the lock.
> -	 */
> -	if (__predict_false((o & (RWLOCK_WRLOCK|RWLOCK_WAIT)) ==
> -	    (RWLOCK_WRLOCK|RWLOCK_WAIT)))
> -		wakeup(rwl);
> +	return (0);
> +}
> 
> -	if (flags & RW_DOWNGRADE)
> -		WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> -	else
> -		WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +static int
> +rw_read_incr(struct rwlock *rwl, unsigned long owner)
> +{
> +	unsigned long incr;
> +	unsigned long nowner;
> +
> +	do {
> +		incr = owner + RWLOCK_READ_INCR;
> +		nowner = rw_cas(&rwl->rwl_owner, owner, incr);
> +		if (nowner == owner)
> +			return (1);
> +
> +		owner = nowner;
> +	} while (!ISSET(owner, RWLOCK_WRLOCK));
> 
>  	return (0);
>  }
> 
> -void
> -rw_exit(struct rwlock *rwl)
> +static int
> +rw_do_enter_read(struct rwlock *rwl, int flags)
>  {
> -	unsigned long wrlock;
> +	unsigned long owner;
> +	int error;
> +	int prio;
> 
> -	/* Avoid deadlocks after panic or in DDB */
> -	if (panicstr || db_active)
> -		return;
> +#ifdef WITNESS
> +	int lop_flags = LOP_NEWORDER;
> +	if (ISSET(flags, RW_DUPOK))
> +		lop_flags |= LOP_DUPOK;
> +	if (!ISSET(flags, RW_NOSLEEP))
> +		WITNESS_CHECKORDER(&rwl->rwl_lock_obj, lop_flags, NULL);
> +#endif
> 
> -	wrlock = rwl->rwl_owner & RWLOCK_WRLOCK;
> -	if (wrlock)
> -		rw_assert_wrlock(rwl);
> -	else
> -		rw_assert_rdlock(rwl);
> -	WITNESS_UNLOCK(&rwl->rwl_lock_obj, wrlock ? LOP_EXCLUSIVE : 0);
> +	owner = rw_cas(&rwl->rwl_owner, 0, RWLOCK_READ_INCR);
> +	if (owner == 0) {
> +		/* ermagerd, we won! */
> +		goto locked;
> +	}
> +
> +	if (ISSET(owner, RWLOCK_WRLOCK)) {
> +		if (__predict_false(owner == rw_self())) {
> +			panic("%s rwlock %p: enter read deadlock",
> +			    rwl->rwl_name, rwl);
> +		}
> +	} else if (atomic_load_int(&rwl->rwl_waiters) == 0) {
> +		if (rw_read_incr(rwl, owner)) {
> +			/* nailed it */
> +			goto locked;
> +		}
> +	}
> +
> +	if (ISSET(flags, RW_NOSLEEP))
> +		return (EBUSY);
> +
> +	prio = PLOCK;
> +	if (ISSET(flags, RW_INTR))
> +		prio |= PCATCH;
> +
> +	rw_inc(&rwl->rwl_readers);
> +	membar_producer();
> +	do {
> +		sleep_setup(&rwl->rwl_readers, prio, rwl->rwl_name);
> +		membar_consumer();
> +		error = sleep_finish(RW_SLEEP_TMO,
> +		    atomic_load_int(&rwl->rwl_waiters) > 0 ||
> +		    ISSET(atomic_load_long(&rwl->rwl_owner), RWLOCK_WRLOCK));
> +#ifdef RWDIAG
> +		if (error == EWOULDBLOCK) {
> +			printf("%s rwlock %p: %s timeout owner 0x%lx\n",
> +			    rwl->rwl_name, rwl, __func__, owner);
> +			db_enter();
> +		}
> +#endif
> +		if (ISSET(flags, RW_INTR) && (error != 0))
> +			goto fail;
> +		if (ISSET(flags, RW_SLEEPFAIL)) {
> +			error = EAGAIN;
> +			goto fail;
> +		}
> +	} while (!rw_read_incr(rwl, 0));
> +	rw_dec(&rwl->rwl_readers);
> +
> +locked:
> +	membar_enter_after_atomic();
> +	WITNESS_LOCK(&rwl->rwl_lock_obj, lop_flags);
> +
> +	return (0);
> +fail:
> +	rw_dec(&rwl->rwl_readers);
> +	return (error);
> +}
> +
> +static int
> +rw_downgrade(struct rwlock *rwl, int flags)
> +{
> +	unsigned long self = rw_self();
> +	unsigned long owner;
> 
>  	membar_exit_before_atomic();
> -	rw_do_exit(rwl, wrlock);
> +	owner = atomic_cas_ulong(&rwl->rwl_owner, self, RWLOCK_READ_INCR);
> +	if (__predict_false(owner != self)) {
> +		panic("%s rwlock %p: downgrade when lock not held "
> +		    "(owner 0x%lx, self 0x%lx)", rwl->rwl_name, rwl,
> +		    owner, self);
> +	}
> +
> +#ifdef WITNESS
> +	{
> +		int lop_flags = LOP_NEWORDER;
> +		if (ISSET(flags, RW_DUPOK))
> +			lop_flags |= LOP_DUPOK;
> +		WITNESS_DOWNGRADE(&rwl->rwl_lock_obj, lop_flags);
> +	}
> +#endif
> +
> +	membar_consumer();

hmm, why is this memory barrier here? shouldn't it be part of the
WITNESS block? and isn't it the wrong type of barrier then? i am not
sure, i'm probably missing something here.

> +	if (atomic_load_int(&rwl->rwl_waiters) == 0 &&
> +	    atomic_load_int(&rwl->rwl_readers) > 0)
> +		wakeup(&rwl->rwl_readers);

why is this only waking up the other readers when there are no
writers? i suppose it is to ensure that the writer gets the lock as
soon as possible? in that case a comment would maybe make sense? on
the other hand, if i am downgrading a lock, i think the only reason
would be to allow others to also start reading, and if that is not
happening, i don't know... i'd be very interested in the rationale
here (small sketch of what i mean below).

> +
> +	return (0);
> }
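
to make that last question concrete, this is roughly what i would
naively have expected at the end of rw_downgrade() (untested sketch,
ignoring the writer priority angle):

	/* we still hold the lock as a reader here, so any sleeping
	 * readers could take it straight away */
	if (atomic_load_int(&rwl->rwl_readers) > 0)
		wakeup(&rwl->rwl_readers);

i.e. without the rwl_waiters check. if the check is there so that a
queued writer doesn't get starved by readers piling in after the
downgrade, then a short comment saying so would already help a lot.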