From: Christian Ludwig
Subject: Re: dt: Use one ringbuffer per CPU
To: Claudio Jeker
Cc:
Date: Thu, 12 Sep 2024 18:17:59 +0200

On Thu, Sep 12, 2024 at 02:30:50PM +0200, Claudio Jeker wrote:
> On Wed, Sep 11, 2024 at 05:15:38PM +0200, Christian Ludwig wrote:
> > Hi,
> > 
> > The current dt(4) implementation uses one ringbuffer per PCB. That
> > means the number of ringbuffers scales with the number of probes. It
> > also means that each CPU needs to lock a ringbuffer to fill an event
> > slot.
> > 
> > Move to using one ringbuffer per CPU instead. Now the number of
> > buffers scales with the number of online CPUs. And each CPU does not
> > need to synchronize against other CPUs anymore. They still have to
> > synchronize against their own nested events, though. Synchronization
> > with the reader happens via atomic operations. That gets rid of the
> > PCB mutex.
> > 
> > Also update counters with atomic operations. That gets rid of the
> > softc mutex.
> > 
> > In my tests, btrace processes more events now during the same
> > timeframe. Please test and review. Thank you.
> > 
> 
> I think this is a step in the right direction but I see one major
> issue with this. The dt events are no longer in order. So if a probe
> fires on cpu63 and shortly after the response fires on cpu0, btrace
> will see the response before the action. This will mess up some
> btrace features like maps and histograms.

In the general case, events are not guaranteed to be globally ordered
in -current either. Suppose the response is in a different probe than
the trigger event, which is very common. Now dtread() loops over all
PCBs and reads the trigger PCB. After that, another trigger/response
pair happens. When dtread() advances to the response PCB, it reads the
response, but it has not captured the matching trigger event yet. That
one only gets read in the next round:

+--------------+
| trigger PCB  | (1) read
+--------------+
      |
+--------------+
| another PCB  | (2) read    (3) CPU63 fires trigger
+--------------+             (4) CPU00 fires response
      |
+--------------+
| response PCB | (5) read response without trigger
+--------------+

> I think other tools solve this by sorting the buffers by timestamp.
> Which can and should be done in userland.

Agreed. A rough sketch of such a userland merge is at the end of this
mail.

> Also your read function always starts with cpu0 which could result in
> constant starvation of the last few CPUs if the read buffer is not big
> enough.

That problem exists in -current with a large number of PCBs, too: we
starve the last probes. With this diff, we can extend the read function
to continue where it left off the next time we enter. In fact, I have a
diff for that in my queue already (a second sketch is at the end of
this mail). I wasn't quite sure how to solve this in -current, because
PCBs are allowed to vanish in between reads. This diff keeps the
buffers available from open to close.

> > - Christian
> > 
> > ---
> >  sys/dev/dt/dt_dev.c | 160 ++++++++++++++++++++++++++------------------
> >  sys/dev/dt/dtvar.h  |   9 ---
> >  2 files changed, 94 insertions(+), 75 deletions(-)
> > 
> > diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c
> > index 05ce734abf44..52ba947401e5 100644
> > --- a/sys/dev/dt/dt_dev.c
> > +++ b/sys/dev/dt/dt_dev.c
> > @@ -85,14 +85,33 @@
> > 
> >  #define DPRINTF(x...)	/* nothing */
> > +/*
> > + * Event states ring
> > + *
> > + * Locks used to protect struct members:
> > + *	a	atomic
> > + *	r	owned by read(2) proc
> > + *	c	owned by CPU
> > + *	s	sliced ownership, based on read/write indexes
> > + */
> > +struct dt_ringbuf {
> > +	unsigned int	dr_prod;	/* [a,r] read index */
> > +	unsigned int	dr_cons;	/* [a,c] write index */
> > +	struct dt_evt	*dr_ring;	/* [s] ring of event states */
> > +	int		dr_block;	/* [c] block nested events */
> > +
> > +	/* Counters */
> > +	unsigned int	dr_dropevt;	/* [a] # of dropped events */
> > +};
> > +
> >  /*
> >   * Descriptor associated with each program opening /dev/dt. It is used
> >   * to keep track of enabled PCBs.
> >   *
> >   * Locks used to protect struct members in this file:
> >   *	a	atomic
> > - *	m	per-softc mutex
> >   *	K	kernel lock
> > + *	I	invariant after initialization
> >   */
> >  struct dt_softc {
> >  	SLIST_ENTRY(dt_softc)	ds_next;	/* [K] descriptor list */
> > @@ -100,15 +119,15 @@ struct dt_softc {
> >  	pid_t			ds_pid;		/* [I] PID of tracing program */
> >  	void			*ds_si;		/* [I] to defer wakeup(9) */
> > 
> > -	struct mutex		ds_mtx;
> > -
> >  	struct dt_pcb_list	ds_pcbs;	/* [K] list of enabled PCBs */
> >  	int			ds_recording;	/* [K] currently recording? */
> > -	int			ds_evtcnt;	/* [m] # of readable evts */
> > +	unsigned int		ds_evtcnt;	/* [a] # of readable evts */
> > +
> > +	struct dt_ringbuf	ds_ring[MAXCPUS]; /* [I] event rings */
> > 
> >  	/* Counters */
> > -	uint64_t		ds_readevt;	/* [m] # of events read */
> > -	uint64_t		ds_dropevt;	/* [m] # of events dropped */
> > +	unsigned int		ds_readevt;	/* [a] # of events read */
> > +	unsigned int		ds_dropevt;	/* [a] # of events dropped */
> >  };
> > 
> >  SLIST_HEAD(, dt_softc) dtdev_list;	/* [K] list of open /dev/dt nodes */
> > @@ -142,7 +161,7 @@ int	dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *);
> >  int	dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *);
> >  int	dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *);
> > 
> > -int	dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *,
> > -	    uint64_t *);
> > +int	dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *,
> > +	    uint64_t *);
> > 
> >  void	dt_wakeup(struct dt_softc *);
> > @@ -167,7 +186,8 @@ int
> >  dtopen(dev_t dev, int flags, int mode, struct proc *p)
> >  {
> >  	struct dt_softc *sc;
> > -	int unit = minor(dev);
> > +	struct dt_evt *dtev;
> > +	int i, unit = minor(dev);
> > 
> >  	if (atomic_load_int(&allowdt) == 0)
> >  		return EPERM;
> > @@ -176,16 +196,30 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p)
> >  	if (sc == NULL)
> >  		return ENOMEM;
> > 
> > -	/* no sleep after this point */
> >  	if (dtlookup(unit) != NULL) {
> >  		free(sc, M_DEVBUF, sizeof(*sc));
> >  		return EBUSY;
> >  	}
> > 
> > +	for (i = 0; i < ncpusfound; i++) {
> > +		dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT,
> > +		    M_WAITOK|M_CANFAIL|M_ZERO);
> > +		if (dtev == NULL)
> > +			break;
> > +		sc->ds_ring[i].dr_ring = dtev;
> > +	}
> > +	if (i < ncpusfound) {
> > +		for (i--; i >= 0; i--) {
> > +			dtev = sc->ds_ring[i].dr_ring;
> > +			free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> > +		}
> > +		free(sc, M_DEVBUF, sizeof(*sc));
> > +		return ENOMEM;
> > +	}
> > +
> >  	sc->ds_unit = unit;
> >  	sc->ds_pid = p->p_p->ps_pid;
> >  	TAILQ_INIT(&sc->ds_pcbs);
> > -	mtx_init(&sc->ds_mtx, IPL_HIGH);
> >  	sc->ds_evtcnt = 0;
> >  	sc->ds_readevt = 0;
> >  	sc->ds_dropevt = 0;
> > @@ -206,7 +240,8 @@ int
> >  dtclose(dev_t dev, int flags, int mode, struct proc *p)
> >  {
> >  	struct dt_softc *sc;
> > -	int unit = minor(dev);
> > +	struct dt_evt *dtev;
> > +	int i, unit = minor(dev);
> > 
> >  	sc = dtlookup(unit);
> >  	KASSERT(sc != NULL);
> > @@ -218,6 +253,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p)
> >  	dt_pcb_purge(&sc->ds_pcbs);
> >  	softintr_disestablish(sc->ds_si);
> > 
> > +	for (i = 0; i < ncpusfound; i++) {
> > +		dtev = sc->ds_ring[i].dr_ring;
> > +		free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> > +	}
> >  	free(sc, M_DEVBUF, sizeof(*sc));
> > 
> >  	return 0;
> > @@ -227,8 +266,8 @@ int
> >  dtread(dev_t dev, struct uio *uio, int flags)
> >  {
> >  	struct dt_softc *sc;
> > -	struct dt_pcb *dp;
> > -	int error = 0, unit = minor(dev);
> > +	struct dt_ringbuf *dr;
> > +	int i, error = 0, unit = minor(dev);
> >  	size_t count, max, read = 0;
> >  	uint64_t dropped = 0;
> > 
> > @@ -239,9 +278,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
> >  	if (max < 1)
> >  		return (EMSGSIZE);
> > 
> > -	while (!sc->ds_evtcnt) {
> > +	while (!atomic_load_int(&sc->ds_evtcnt)) {
> >  		sleep_setup(sc, PWAIT | PCATCH, "dtread");
> > -		error = sleep_finish(0, !sc->ds_evtcnt);
> > +		error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt));
> >  		if (error == EINTR || error == ERESTART)
> >  			break;
> >  	}
> > @@ -249,9 +288,10 @@
> >  		return error;
> > 
> >  	KERNEL_ASSERT_LOCKED();
> > -	TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) {
> > +	for (i = 0; i < ncpusfound; i++) {
> >  		count = 0;
> > -		error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped);
> > +		dr = &sc->ds_ring[i];
> > +		error = dt_ring_copy(dr, uio, max, &count, &dropped);
> >  		if (error && count == 0)
> >  			break;
> > 
> > @@ -261,11 +301,9 @@
> >  			break;
> >  	}
> > 
> > -	mtx_enter(&sc->ds_mtx);
> > -	sc->ds_evtcnt -= read;
> > -	sc->ds_readevt += read;
> > -	sc->ds_dropevt += dropped;
> > -	mtx_leave(&sc->ds_mtx);
> > +	atomic_add_int(&sc->ds_readevt, read);
> > +	atomic_add_int(&sc->ds_dropevt, dropped);
> > +	atomic_sub_int(&sc->ds_evtcnt, read);
> > 
> >  	return error;
> >  }
> > @@ -434,10 +472,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar)
> >  int
> >  dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst)
> >  {
> > -	mtx_enter(&sc->ds_mtx);
> > -	dtst->dtst_readevt = sc->ds_readevt;
> > -	dtst->dtst_dropevt = sc->ds_dropevt;
> > -	mtx_leave(&sc->ds_mtx);
> > +	dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt);
> > +	dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt);
> > 
> >  	return 0;
> >  }
> > @@ -637,28 +673,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc)
> > 
> >  	dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO);
> >  	if (dp == NULL)
> > -		goto bad;
> > +		return NULL;
> > 
> > -	dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT,
> > -	    M_WAITOK|M_CANFAIL|M_ZERO);
> > -	if (dp->dp_ring == NULL)
> > -		goto bad;
> > -
> > -	mtx_init(&dp->dp_mtx, IPL_HIGH);
> >  	dp->dp_sc = sc;
> >  	dp->dp_dtp = dtp;
> >  	return dp;
> > -bad:
> > -	dt_pcb_free(dp);
> > -	return NULL;
> >  }
> > 
> >  void
> >  dt_pcb_free(struct dt_pcb *dp)
> >  {
> > -	if (dp == NULL)
> > -		return;
> > -	free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring));
> >  	free(dp, M_DT, sizeof(*dp));
> >  }
> > 
> > @@ -681,21 +705,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
> >  {
> >  	struct proc *p = curproc;
> >  	struct dt_evt *dtev;
> > -	int distance;
> > +	int prod, cons, distance;
> > +	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
> > 
> > -	mtx_enter(&dp->dp_mtx);
> > -	distance = dp->dp_prod - dp->dp_cons;
> > +	dr->dr_block = splhigh();
> > +	membar_consumer();
> > +	prod = dr->dr_prod;
> > +	cons = dr->dr_cons;
> > +	distance = prod - cons;
> >  	if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) {
> > +		splx(dr->dr_block);
> > +
> >  		/* read(2) isn't finished */
> > -		dp->dp_dropevt++;
> > -		mtx_leave(&dp->dp_mtx);
> > +		atomic_inc_int(&dr->dr_dropevt);
> >  		return NULL;
> >  	}
> > 
> >  	/*
> >  	 * Save states in next free event slot.
> >  	 */
> > -	dtev = &dp->dp_ring[dp->dp_cons];
> > +	dtev = &dr->dr_ring[cons];
> >  	memset(dtev, 0, sizeof(*dtev));
> > 
> >  	dtev->dtev_pbn = dp->dp_dtp->dtp_pbn;
> > @@ -722,25 +751,27 @@
> >  void
> >  dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev)
> >  {
> > -	MUTEX_ASSERT_LOCKED(&dp->dp_mtx);
> > -	KASSERT(dtev == &dp->dp_ring[dp->dp_cons]);
> > +	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
> > 
> > -	dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE;
> > -	mtx_leave(&dp->dp_mtx);
> > +	KASSERT(dtev == &dr->dr_ring[dr->dr_cons]);
> > +
> > +	dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE;
> > +	membar_producer();
> > +
> > +	splx(dr->dr_block);
> > +
> > +	atomic_inc_int(&dp->dp_sc->ds_evtcnt);
> > 
> > -	mtx_enter(&dp->dp_sc->ds_mtx);
> > -	dp->dp_sc->ds_evtcnt++;
> > -	mtx_leave(&dp->dp_sc->ds_mtx);
> >  	dt_wakeup(dp->dp_sc);
> >  }
> > 
> >  /*
> > - * Copy at most `max' events from `dp', producing the same amount
> > + * Copy at most `max' events from `dr', producing the same amount
> >   * of free slots.
> >   */
> >  int
> > -dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> > -    size_t *rcvd, uint64_t *dropped)
> > +dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max,
> > +    size_t *rcvd, uint64_t *dropped)
> >  {
> >  	size_t count, copied = 0;
> >  	unsigned int cons, prod;
> > @@ -748,12 +779,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> > 
> >  	KASSERT(max > 0);
> > 
> > -	mtx_enter(&dp->dp_mtx);
> > -	cons = dp->dp_cons;
> > -	prod = dp->dp_prod;
> > -	*dropped += dp->dp_dropevt;
> > -	dp->dp_dropevt = 0;
> > -	mtx_leave(&dp->dp_mtx);
> > +	membar_consumer();
> > +	cons = dr->dr_cons;
> > +	prod = dr->dr_prod;
> > +	*dropped += atomic_swap_uint(&dr->dr_dropevt, 0);
> > 
> >  	if (cons < prod)
> >  		count = DT_EVTRING_SIZE - prod;
> > @@ -764,7 +793,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  		return 0;
> > 
> >  	count = MIN(count, max);
> > -	error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio);
> > +	error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio);
> >  	if (error)
> >  		return error;
> >  	copied += count;
> > @@ -777,7 +806,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  		goto out;
> > 
> >  	count = MIN(cons, (max - copied));
> > -	error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio);
> > +	error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio);
> >  	if (error)
> >  		goto out;
> > 
> > @@ -785,9 +814,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  	prod += count;
> > 
> >  out:
> > -	mtx_enter(&dp->dp_mtx);
> > -	dp->dp_prod = prod;
> > -	mtx_leave(&dp->dp_mtx);
> > +	dr->dr_prod = prod;
> > +	membar_producer();
> > 
> >  	*rcvd = copied;
> >  	return error;
> > diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h
> > index 5bb7ab0d7715..07e2b6c2f6df 100644
> > --- a/sys/dev/dt/dtvar.h
> > +++ b/sys/dev/dt/dtvar.h
> > @@ -163,12 +163,6 @@ struct dt_pcb {
> >  	SMR_SLIST_ENTRY(dt_pcb)	dp_pnext;	/* [K,S] next PCB per probe */
> >  	TAILQ_ENTRY(dt_pcb)	dp_snext;	/* [K] next PCB per softc */
> > 
> > -	/* Event states ring */
> > -	unsigned int		dp_prod;	/* [m] read index */
> > -	unsigned int		dp_cons;	/* [m] write index */
> > -	struct dt_evt		*dp_ring;	/* [m] ring of event states */
> > -	struct mutex		dp_mtx;
> > -
> >  	struct dt_softc		*dp_sc;		/* [I] related softc */
> >  	struct dt_probe		*dp_dtp;	/* [I] related probe */
> >  	uint64_t		dp_evtflags;	/* [I] event states to record */
> > @@ -177,9 +171,6 @@ struct dt_pcb {
> >  	struct clockintr	dp_clockintr;	/* [D] profiling handle */
> >  	uint64_t		dp_nsecs;	/* [I] profiling period */
> >  	struct cpu_info		*dp_cpu;	/* [I] on which CPU */
> > -
> > -	/* Counters */
> > -	uint64_t		dp_dropevt;	/* [m] # dropped event */
> >  };
> > 
> >  TAILQ_HEAD(dt_pcb_list, dt_pcb);
> > -- 
> > 2.34.1
> > 
> > 
> 
> -- 
> :wq Claudio
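
P.S. Here is the first sketch mentioned above: merging the per-CPU
buffers by timestamp in userland. Rough and untested; it assumes the
consumer keeps the events it read for each CPU in a separate buffer,
and that struct dt_evt carries its timestamp in dtev_tsp (going from
memory here, double-check dtvar.h). The evt_buf struct and evt_oldest()
are made up for this mail:

	#include <sys/time.h>		/* timespeccmp() */
	#include <stddef.h>

	#include <dev/dt/dtvar.h>	/* struct dt_evt */

	struct evt_buf {
		struct dt_evt	*eb_evts;	/* events read for one CPU */
		size_t		 eb_len;	/* number of events */
		size_t		 eb_idx;	/* next unconsumed event */
	};

	/*
	 * Return the buffer whose next event is oldest, or NULL once
	 * all buffers are drained.
	 */
	static struct evt_buf *
	evt_oldest(struct evt_buf *bufs, int nbufs)
	{
		struct evt_buf	*best = NULL;
		int		 i;

		for (i = 0; i < nbufs; i++) {
			struct evt_buf *eb = &bufs[i];

			if (eb->eb_idx == eb->eb_len)
				continue;	/* drained */
			if (best == NULL ||
			    timespeccmp(&eb->eb_evts[eb->eb_idx].dtev_tsp,
			    &best->eb_evts[best->eb_idx].dtev_tsp, <))
				best = eb;
		}
		return best;
	}

The consumer then loops while ((eb = evt_oldest(bufs, n)) != NULL) and
handles eb->eb_evts[eb->eb_idx++], which yields the events in global
timestamp order.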
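
And the second sketch: letting dtread() continue where the previous
read(2) stopped, so the scan does not always restart at cpu0. This is
only the idea, not the diff from my queue; ds_readcpu is an invented
name for a new softc member, and since only the reader touches it, no
atomics are needed:

	/* In struct dt_softc: */
	unsigned int		ds_readcpu;	/* [r] CPU to scan first */

	/* In dtread(), instead of always scanning 0..ncpusfound-1: */
	unsigned int n = 0;

	for (i = 0; i < ncpusfound; i++) {
		/* Rotate the scan so every CPU gets served eventually. */
		n = (sc->ds_readcpu + i) % ncpusfound;
		dr = &sc->ds_ring[n];
		count = 0;
		error = dt_ring_copy(dr, uio, max, &count, &dropped);
		if (error && count == 0)
			break;
		read += count;
		max -= count;
		if (max == 0)
			break;
	}
	/* Start the next read(2) after the last CPU we served. */
	sc->ds_readcpu = (n + 1) % ncpusfound;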