From: Martin Pieuchot Subject: Re: dt: Use one ringbuffer per CPU To: Christian Ludwig Cc: tech@openbsd.org Date: Sun, 27 Oct 2024 18:52:50 +0100 On 02/10/24(Wed) 15:37, Christian Ludwig wrote: > Hi, > > here is an updated version. It incorporates the diff to continue reading > events from where we left off in a previous read(2) (from [1]). > > [1] https://marc.info/?l=openbsd-tech&m=172642998727042 Great ! Some comments below. > --- > sys/dev/dt/dt_dev.c | 164 +++++++++++++++++++++++++++++++--------------------- > sys/dev/dt/dtvar.h | 9 --- > 2 files changed, 98 insertions(+), 75 deletions(-) > > diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c > index 05ce734abf44..c0357239b98e 100644 > --- a/sys/dev/dt/dt_dev.c > +++ b/sys/dev/dt/dt_dev.c > @@ -85,14 +85,34 @@ > > #define DPRINTF(x...) /* nothing */ > > +/* > + * Event states ring I'd name it "dt_cpu" or "dt_cpubuf" and call it "Per-CPU Event States" to reflect that it includes more than the ring and to avoid confusion with the actual per-CPU ring variable 'dr_ring'. > + * > + * Locks used to protect struct members: > + * a atomic > + * r owned by read(2) proc > + * c owned by CPU > + * s sliced ownership, based on read/write indexes > + */ > +struct dt_ringbuf { > + unsigned int dr_prod; /* [a,r] read index */ > + unsigned int dr_cons; /* [a,c] write index */ I'm not sure we use `a' for producer/consumer owned variables. To me `a' would suggest the use of an atomic_*() helper. However here it's more about ownership and visibility, right? > + struct dt_evt *dr_ring; /* [s] ring of event states */ > + int dr_block; /* [c] block nested events */ Can we call it `dr_spl' or `dr_ipl'? > + > + /* Counters */ > + unsigned int dr_dropevt; /* [a] # of dropped events */ > +}; > + > /* > * Descriptor associated with each program opening /dev/dt. It is used > * to keep track of enabled PCBs. 
> * > * Locks used to protect struct members in this file: > * a atomic > - * m per-softc mutex > * K kernel lock > + * r owned by read(2) proc > + * I invariant after initialization > */ > struct dt_softc { > SLIST_ENTRY(dt_softc) ds_next; /* [K] descriptor list */ > @@ -100,15 +120,16 @@ struct dt_softc { > pid_t ds_pid; /* [I] PID of tracing program */ > void *ds_si; /* [I] to defer wakeup(9) */ > > - struct mutex ds_mtx; Killing `ds_mtx' is a great move forward, it simplifies inspection of locks which is being discussed off-list. > - > struct dt_pcb_list ds_pcbs; /* [K] list of enabled PCBs */ > int ds_recording; /* [K] currently recording? */ > - int ds_evtcnt; /* [m] # of readable evts */ > + unsigned int ds_evtcnt; /* [a] # of readable evts */ > + > + struct dt_ringbuf ds_ring[MAXCPUS]; /* [I] event rings */ > + unsigned int ds_lastcpu; /* [r] last CPU ring read(2). */ > > /* Counters */ > - uint64_t ds_readevt; /* [m] # of events read */ > - uint64_t ds_dropevt; /* [m] # of events dropped */ > + unsigned int ds_readevt; /* [a] # of events read */ > + unsigned int ds_dropevt; /* [a] # of events dropped */ Should we move those counters to the per-CPU struct and aggregate them all in dt_ioctl_get_stats()? 
> }; > > SLIST_HEAD(, dt_softc) dtdev_list; /* [K] list of open /dev/dt nodes */ > @@ -142,7 +163,7 @@ int dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *); > int dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *); > int dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *); > > -int dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *, > +int dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *, > uint64_t *); > > void dt_wakeup(struct dt_softc *); > @@ -167,7 +188,8 @@ int > dtopen(dev_t dev, int flags, int mode, struct proc *p) > { > struct dt_softc *sc; > - int unit = minor(dev); > + struct dt_evt *dtev; > + int i, unit = minor(dev); > > if (atomic_load_int(&allowdt) == 0) > return EPERM; > @@ -176,16 +198,31 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p) > if (sc == NULL) > return ENOMEM; > > - /* no sleep after this point */ This is not possible and introduces races. We should not sleep after calling dtlookup() below, which is a guard against multiple allocations. > if (dtlookup(unit) != NULL) { > free(sc, M_DEVBUF, sizeof(*sc)); > return EBUSY; > } > > + for (i = 0; i < ncpusfound; i++) { > + dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT, > + M_WAITOK|M_CANFAIL|M_ZERO); > + if (dtev == NULL) > + break; > + sc->ds_ring[i].dr_ring = dtev; > + } > + if (i < ncpusfound) { > + for (i--; i >= 0; i--) { > + dtev = sc->ds_ring[i].dr_ring; > + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev)); > + } > + free(sc, M_DEVBUF, sizeof(*sc)); > + return ENOMEM; > + } Maybe add dt_alloc() which contains all the sleeping allocations in a single place? 
> + > sc->ds_unit = unit; > sc->ds_pid = p->p_p->ps_pid; > TAILQ_INIT(&sc->ds_pcbs); > - mtx_init(&sc->ds_mtx, IPL_HIGH); > + sc->ds_lastcpu = 0; > sc->ds_evtcnt = 0; > sc->ds_readevt = 0; > sc->ds_dropevt = 0; > @@ -206,7 +243,8 @@ int > dtclose(dev_t dev, int flags, int mode, struct proc *p) > { > struct dt_softc *sc; > - int unit = minor(dev); > + struct dt_evt *dtev; > + int i, unit = minor(dev); > > sc = dtlookup(unit); > KASSERT(sc != NULL); > @@ -218,6 +256,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p) > dt_pcb_purge(&sc->ds_pcbs); > softintr_disestablish(sc->ds_si); > > + for (i = 0; i < ncpusfound; i++) { > + dtev = sc->ds_ring[i].dr_ring; > + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev)); > + } > free(sc, M_DEVBUF, sizeof(*sc)); Same here, dt_free()? > > return 0; > @@ -227,8 +269,8 @@ int > dtread(dev_t dev, struct uio *uio, int flags) > { > struct dt_softc *sc; > - struct dt_pcb *dp; > - int error = 0, unit = minor(dev); > + struct dt_ringbuf *dr; > + int i, error = 0, unit = minor(dev); > size_t count, max, read = 0; > uint64_t dropped = 0; > > @@ -239,9 +281,9 @@ dtread(dev_t dev, struct uio *uio, int flags) > if (max < 1) > return (EMSGSIZE); > > - while (!sc->ds_evtcnt) { > + while (!atomic_load_int(&sc->ds_evtcnt)) { > sleep_setup(sc, PWAIT | PCATCH, "dtread"); > - error = sleep_finish(0, !sc->ds_evtcnt); > + error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt)); > if (error == EINTR || error == ERESTART) > break; > } > @@ -249,9 +291,10 @@ dtread(dev_t dev, struct uio *uio, int flags) > return error; > > KERNEL_ASSERT_LOCKED(); > - TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) { > + for (i = 0; i < ncpusfound; i++) { > count = 0; > - error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped); > + dr = &sc->ds_ring[(sc->ds_lastcpu + i) % ncpusfound]; > + error = dt_ring_copy(dr, uio, max, &count, &dropped); > if (error && count == 0) > break; > > @@ -260,12 +303,11 @@ dtread(dev_t dev, struct uio *uio, int flags) > if (max == 0) > 
break; > } > + sc->ds_lastcpu += i % ncpusfound; > > - mtx_enter(&sc->ds_mtx); > - sc->ds_evtcnt -= read; > - sc->ds_readevt += read; > - sc->ds_dropevt += dropped; > - mtx_leave(&sc->ds_mtx); > + atomic_add_int(&sc->ds_readevt, read); > + atomic_add_int(&sc->ds_dropevt, dropped); > + atomic_sub_int(&sc->ds_evtcnt, read); I'd suggest using per-CPU counters as mentioned above. > > return error; > } > @@ -434,10 +476,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar) > int > dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst) > { > - mtx_enter(&sc->ds_mtx); > - dtst->dtst_readevt = sc->ds_readevt; > - dtst->dtst_dropevt = sc->ds_dropevt; > - mtx_leave(&sc->ds_mtx); > + dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt); > + dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt); And here we look at all the per-CPU counters. > > return 0; > } > @@ -637,28 +677,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc) > > dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO); > if (dp == NULL) > - goto bad; > + return NULL; > > - dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT, > - M_WAITOK|M_CANFAIL|M_ZERO); > - if (dp->dp_ring == NULL) > - goto bad; > - > - mtx_init(&dp->dp_mtx, IPL_HIGH); > dp->dp_sc = sc; > dp->dp_dtp = dtp; > return dp; > -bad: > - dt_pcb_free(dp); > - return NULL; > } > > void > dt_pcb_free(struct dt_pcb *dp) > { > - if (dp == NULL) > - return; > - free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring)); > free(dp, M_DT, sizeof(*dp)); > } > > @@ -681,21 +709,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling) > { > struct proc *p = curproc; > struct dt_evt *dtev; > - int distance; > + int prod, cons, distance; > + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()]; > > - mtx_enter(&dp->dp_mtx); > - distance = dp->dp_prod - dp->dp_cons; > + dr->dr_block = splhigh(); I understand we want to avoid recursion while we are manipulating an event. 
Could we use a per thread flag similar to P_INKTR to achieve this goal? It should be easy to check for this flag in DT_ENTER() & co. My intention is to rely on as few subsystems as possible inside dt(4) to be able to inspect them. > + membar_consumer(); > + prod = dr->dr_prod; > + cons = dr->dr_cons; > + distance = prod - cons; > if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) { > + splx(dr->dr_block); > + > /* read(2) isn't finished */ > - dp->dp_dropevt++; > - mtx_leave(&dp->dp_mtx); > + atomic_inc_int(&dr->dr_dropevt); > return NULL; > } > > /* > * Save states in next free event slot. > */ > - dtev = &dp->dp_ring[dp->dp_cons]; > + dtev = &dr->dr_ring[cons]; > memset(dtev, 0, sizeof(*dtev)); > > dtev->dtev_pbn = dp->dp_dtp->dtp_pbn; > @@ -722,25 +755,27 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling) > void > dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev) > { > - MUTEX_ASSERT_LOCKED(&dp->dp_mtx); > - KASSERT(dtev == &dp->dp_ring[dp->dp_cons]); > + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()]; > > - dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE; > - mtx_leave(&dp->dp_mtx); > + KASSERT(dtev == &dr->dr_ring[dr->dr_cons]); > + > + dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE; > + membar_producer(); > + > + splx(dr->dr_block); > + > + atomic_inc_int(&dp->dp_sc->ds_evtcnt); > > - mtx_enter(&dp->dp_sc->ds_mtx); > - dp->dp_sc->ds_evtcnt++; > - mtx_leave(&dp->dp_sc->ds_mtx); > dt_wakeup(dp->dp_sc); > } > > /* > - * Copy at most `max' events from `dp', producing the same amount > + * Copy at most `max' events from `dr', producing the same amount > * of free slots. 
> */ > int > -dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, > - size_t *rcvd, uint64_t *dropped) > +dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max, > + size_t *rcvd, uint64_t *dropped) > { > size_t count, copied = 0; > unsigned int cons, prod; > @@ -748,12 +783,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, > > KASSERT(max > 0); > > - mtx_enter(&dp->dp_mtx); > - cons = dp->dp_cons; > - prod = dp->dp_prod; > - *dropped += dp->dp_dropevt; > - dp->dp_dropevt = 0; > - mtx_leave(&dp->dp_mtx); > + membar_consumer(); > + cons = dr->dr_cons; > + prod = dr->dr_prod; > + *dropped += atomic_swap_uint(&dr->dr_dropevt, 0); > > if (cons < prod) > count = DT_EVTRING_SIZE - prod; > @@ -764,7 +797,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, > return 0; > > count = MIN(count, max); > - error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio); > + error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio); > if (error) > return error; > copied += count; > @@ -777,7 +810,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, > goto out; > > count = MIN(cons, (max - copied)); > - error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio); > + error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio); > if (error) > goto out; > > @@ -785,9 +818,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, > prod += count; > > out: > - mtx_enter(&dp->dp_mtx); > - dp->dp_prod = prod; > - mtx_leave(&dp->dp_mtx); > + dr->dr_prod = prod; > + membar_producer(); > > *rcvd = copied; > return error; > diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h > index 5bb7ab0d7715..07e2b6c2f6df 100644 > --- a/sys/dev/dt/dtvar.h > +++ b/sys/dev/dt/dtvar.h > @@ -163,12 +163,6 @@ struct dt_pcb { > SMR_SLIST_ENTRY(dt_pcb) dp_pnext; /* [K,S] next PCB per probe */ > TAILQ_ENTRY(dt_pcb) dp_snext; /* [K] next PCB per softc */ > > - /* Event states ring */ 
> - unsigned int dp_prod; /* [m] read index */ > - unsigned int dp_cons; /* [m] write index */ > - struct dt_evt *dp_ring; /* [m] ring of event states */ > - struct mutex dp_mtx; > - > struct dt_softc *dp_sc; /* [I] related softc */ > struct dt_probe *dp_dtp; /* [I] related probe */ > uint64_t dp_evtflags; /* [I] event states to record */ > @@ -177,9 +171,6 @@ struct dt_pcb { > struct clockintr dp_clockintr; /* [D] profiling handle */ > uint64_t dp_nsecs; /* [I] profiling period */ > struct cpu_info *dp_cpu; /* [I] on which CPU */ > - > - /* Counters */ > - uint64_t dp_dropevt; /* [m] # dropped event */ > }; > > TAILQ_HEAD(dt_pcb_list, dt_pcb);