From: Christian Ludwig Subject: dt: Use one ringbuffer per CPU To: Date: Wed, 11 Sep 2024 17:15:38 +0200 Hi, The current dt(4) implentation uses one ringbuffer per PCB. That means the number of ringbuffers scales with the number of probes. It also means that each CPU needs to lock a ringbuffer to fill an event slot. Move to using one ringbuffer per CPU instead. Now the number of buffers scales with the number of online CPUs. And each CPU does not need to synchronize against other CPUs anymore. They still have to synchronize against their own nested events, though. Synchronization with the reader happens via atomic operations. That gets us rid of the PCB mutex. Also update counters with atomic operations. That gets rid of the softc mutex. In my tests, btrace processes more events now during the same timeframe. Please test and review. Thank you. - Christian --- sys/dev/dt/dt_dev.c | 160 ++++++++++++++++++++++++++------------------ sys/dev/dt/dtvar.h | 9 --- 2 files changed, 94 insertions(+), 75 deletions(-) diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c index 05ce734abf44..52ba947401e5 100644 --- a/sys/dev/dt/dt_dev.c +++ b/sys/dev/dt/dt_dev.c @@ -85,14 +85,33 @@ #define DPRINTF(x...) /* nothing */ +/* + * Event states ring + * + * Locks used to protect struct members: + * a atomic + * r owned by read(2) proc + * c owned by CPU + * s sliced ownership, based on read/write indexes + */ +struct dt_ringbuf { + unsigned int dr_prod; /* [a,r] read index */ + unsigned int dr_cons; /* [a,c] write index */ + struct dt_evt *dr_ring; /* [s] ring of event states */ + int dr_block; /* [c] block nested events */ + + /* Counters */ + unsigned int dr_dropevt; /* [a] # of dropped events */ +}; + /* * Descriptor associated with each program opening /dev/dt. It is used * to keep track of enabled PCBs. * * Locks used to protect struct members in this file: * a atomic - * m per-softc mutex * K kernel lock + * I invariant after initialization */ struct dt_softc { SLIST_ENTRY(dt_softc) ds_next; /* [K] descriptor list */ @@ -100,15 +119,15 @@ struct dt_softc { pid_t ds_pid; /* [I] PID of tracing program */ void *ds_si; /* [I] to defer wakeup(9) */ - struct mutex ds_mtx; - struct dt_pcb_list ds_pcbs; /* [K] list of enabled PCBs */ int ds_recording; /* [K] currently recording? */ - int ds_evtcnt; /* [m] # of readable evts */ + unsigned int ds_evtcnt; /* [a] # of readable evts */ + + struct dt_ringbuf ds_ring[MAXCPUS]; /* [I] event rings */ /* Counters */ - uint64_t ds_readevt; /* [m] # of events read */ - uint64_t ds_dropevt; /* [m] # of events dropped */ + unsigned int ds_readevt; /* [a] # of events read */ + unsigned int ds_dropevt; /* [a] # of events dropped */ }; SLIST_HEAD(, dt_softc) dtdev_list; /* [K] list of open /dev/dt nodes */ @@ -142,7 +161,7 @@ int dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *); int dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *); int dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *); -int dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *, +int dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *, uint64_t *); void dt_wakeup(struct dt_softc *); @@ -167,7 +186,8 @@ int dtopen(dev_t dev, int flags, int mode, struct proc *p) { struct dt_softc *sc; - int unit = minor(dev); + struct dt_evt *dtev; + int i, unit = minor(dev); if (atomic_load_int(&allowdt) == 0) return EPERM; @@ -176,16 +196,30 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p) if (sc == NULL) return ENOMEM; - /* no sleep after this point */ if (dtlookup(unit) != NULL) { free(sc, M_DEVBUF, sizeof(*sc)); return EBUSY; } + for (i = 0; i < ncpusfound; i++) { + dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT, + M_WAITOK|M_CANFAIL|M_ZERO); + if (dtev == NULL) + break; + sc->ds_ring[i].dr_ring = dtev; + } + if (i < ncpusfound) { + for (i--; i >= 0; i--) { + dtev = sc->ds_ring[i].dr_ring; + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev)); + } + free(sc, M_DEVBUF, sizeof(*sc)); + return ENOMEM; + } + sc->ds_unit = unit; sc->ds_pid = p->p_p->ps_pid; TAILQ_INIT(&sc->ds_pcbs); - mtx_init(&sc->ds_mtx, IPL_HIGH); sc->ds_evtcnt = 0; sc->ds_readevt = 0; sc->ds_dropevt = 0; @@ -206,7 +240,8 @@ int dtclose(dev_t dev, int flags, int mode, struct proc *p) { struct dt_softc *sc; - int unit = minor(dev); + struct dt_evt *dtev; + int i, unit = minor(dev); sc = dtlookup(unit); KASSERT(sc != NULL); @@ -218,6 +253,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p) dt_pcb_purge(&sc->ds_pcbs); softintr_disestablish(sc->ds_si); + for (i = 0; i < ncpusfound; i++) { + dtev = sc->ds_ring[i].dr_ring; + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev)); + } free(sc, M_DEVBUF, sizeof(*sc)); return 0; @@ -227,8 +266,8 @@ int dtread(dev_t dev, struct uio *uio, int flags) { struct dt_softc *sc; - struct dt_pcb *dp; - int error = 0, unit = minor(dev); + struct dt_ringbuf *dr; + int i, error = 0, unit = minor(dev); size_t count, max, read = 0; uint64_t dropped = 0; @@ -239,9 +278,9 @@ dtread(dev_t dev, struct uio *uio, int flags) if (max < 1) return (EMSGSIZE); - while (!sc->ds_evtcnt) { + while (!atomic_load_int(&sc->ds_evtcnt)) { sleep_setup(sc, PWAIT | PCATCH, "dtread"); - error = sleep_finish(0, !sc->ds_evtcnt); + error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt)); if (error == EINTR || error == ERESTART) break; } @@ -249,9 +288,10 @@ dtread(dev_t dev, struct uio *uio, int flags) return error; KERNEL_ASSERT_LOCKED(); - TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) { + for (i = 0; i < ncpusfound; i++) { count = 0; - error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped); + dr = &sc->ds_ring[i]; + error = dt_ring_copy(dr, uio, max, &count, &dropped); if (error && count == 0) break; @@ -261,11 +301,9 @@ dtread(dev_t dev, struct uio *uio, int flags) break; } - mtx_enter(&sc->ds_mtx); - sc->ds_evtcnt -= read; - sc->ds_readevt += read; - sc->ds_dropevt += dropped; - mtx_leave(&sc->ds_mtx); + atomic_add_int(&sc->ds_readevt, read); + atomic_add_int(&sc->ds_dropevt, dropped); + atomic_sub_int(&sc->ds_evtcnt, read); return error; } @@ -434,10 +472,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar) int dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst) { - mtx_enter(&sc->ds_mtx); - dtst->dtst_readevt = sc->ds_readevt; - dtst->dtst_dropevt = sc->ds_dropevt; - mtx_leave(&sc->ds_mtx); + dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt); + dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt); return 0; } @@ -637,28 +673,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc) dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO); if (dp == NULL) - goto bad; + return NULL; - dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT, - M_WAITOK|M_CANFAIL|M_ZERO); - if (dp->dp_ring == NULL) - goto bad; - - mtx_init(&dp->dp_mtx, IPL_HIGH); dp->dp_sc = sc; dp->dp_dtp = dtp; return dp; -bad: - dt_pcb_free(dp); - return NULL; } void dt_pcb_free(struct dt_pcb *dp) { - if (dp == NULL) - return; - free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring)); free(dp, M_DT, sizeof(*dp)); } @@ -681,21 +705,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling) { struct proc *p = curproc; struct dt_evt *dtev; - int distance; + int prod, cons, distance; + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()]; - mtx_enter(&dp->dp_mtx); - distance = dp->dp_prod - dp->dp_cons; + dr->dr_block = splhigh(); + membar_consumer(); + prod = dr->dr_prod; + cons = dr->dr_cons; + distance = prod - cons; if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) { + splx(dr->dr_block); + /* read(2) isn't finished */ - dp->dp_dropevt++; - mtx_leave(&dp->dp_mtx); + atomic_inc_int(&dr->dr_dropevt); return NULL; } /* * Save states in next free event slot. */ - dtev = &dp->dp_ring[dp->dp_cons]; + dtev = &dr->dr_ring[cons]; memset(dtev, 0, sizeof(*dtev)); dtev->dtev_pbn = dp->dp_dtp->dtp_pbn; @@ -722,25 +751,27 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling) void dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev) { - MUTEX_ASSERT_LOCKED(&dp->dp_mtx); - KASSERT(dtev == &dp->dp_ring[dp->dp_cons]); + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()]; - dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE; - mtx_leave(&dp->dp_mtx); + KASSERT(dtev == &dr->dr_ring[dr->dr_cons]); + + dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE; + membar_producer(); + + splx(dr->dr_block); + + atomic_inc_int(&dp->dp_sc->ds_evtcnt); - mtx_enter(&dp->dp_sc->ds_mtx); - dp->dp_sc->ds_evtcnt++; - mtx_leave(&dp->dp_sc->ds_mtx); dt_wakeup(dp->dp_sc); } /* - * Copy at most `max' events from `dp', producing the same amount + * Copy at most `max' events from `dr', producing the same amount * of free slots. */ int -dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, - size_t *rcvd, uint64_t *dropped) +dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max, + size_t *rcvd, uint64_t *dropped) { size_t count, copied = 0; unsigned int cons, prod; @@ -748,12 +779,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, KASSERT(max > 0); - mtx_enter(&dp->dp_mtx); - cons = dp->dp_cons; - prod = dp->dp_prod; - *dropped += dp->dp_dropevt; - dp->dp_dropevt = 0; - mtx_leave(&dp->dp_mtx); + membar_consumer(); + cons = dr->dr_cons; + prod = dr->dr_prod; + *dropped += atomic_swap_uint(&dr->dr_dropevt, 0); if (cons < prod) count = DT_EVTRING_SIZE - prod; @@ -764,7 +793,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, return 0; count = MIN(count, max); - error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio); + error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio); if (error) return error; copied += count; @@ -777,7 +806,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, goto out; count = MIN(cons, (max - copied)); - error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio); + error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio); if (error) goto out; @@ -785,9 +814,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max, prod += count; out: - mtx_enter(&dp->dp_mtx); - dp->dp_prod = prod; - mtx_leave(&dp->dp_mtx); + dr->dr_prod = prod; + membar_producer(); *rcvd = copied; return error; diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h index 5bb7ab0d7715..07e2b6c2f6df 100644 --- a/sys/dev/dt/dtvar.h +++ b/sys/dev/dt/dtvar.h @@ -163,12 +163,6 @@ struct dt_pcb { SMR_SLIST_ENTRY(dt_pcb) dp_pnext; /* [K,S] next PCB per probe */ TAILQ_ENTRY(dt_pcb) dp_snext; /* [K] next PCB per softc */ - /* Event states ring */ - unsigned int dp_prod; /* [m] read index */ - unsigned int dp_cons; /* [m] write index */ - struct dt_evt *dp_ring; /* [m] ring of event states */ - struct mutex dp_mtx; - struct dt_softc *dp_sc; /* [I] related softc */ struct dt_probe *dp_dtp; /* [I] related probe */ uint64_t dp_evtflags; /* [I] event states to record */ @@ -177,9 +171,6 @@ struct dt_pcb { struct clockintr dp_clockintr; /* [D] profiling handle */ uint64_t dp_nsecs; /* [I] profiling period */ struct cpu_info *dp_cpu; /* [I] on which CPU */ - - /* Counters */ - uint64_t dp_dropevt; /* [m] # dropped event */ }; TAILQ_HEAD(dt_pcb_list, dt_pcb); -- 2.34.1