Download raw body.
dt: Use one ringbuffer per CPU
On 02/10/24(Wed) 15:37, Christian Ludwig wrote:
> Hi,
>
> here is an updated version. It incorporates the diff to continue reading
> events from where we left off in a previous read(2) (from [1]).
>
> [1] https://marc.info/?l=openbsd-tech&m=172642998727042
Great !
Some comments below.
> ---
> sys/dev/dt/dt_dev.c | 164 +++++++++++++++++++++++++++++++---------------------
> sys/dev/dt/dtvar.h | 9 ---
> 2 files changed, 98 insertions(+), 75 deletions(-)
>
> diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c
> index 05ce734abf44..c0357239b98e 100644
> --- a/sys/dev/dt/dt_dev.c
> +++ b/sys/dev/dt/dt_dev.c
> @@ -85,14 +85,34 @@
>
> #define DPRINTF(x...) /* nothing */
>
> +/*
> + * Event states ring
I'd name it "dt_cpu" or "dt_cpubuf" and call it "Per-CPU Event States"
to reflect that it includes more than the ring and to avoid confusion
with the actual per-CPU ring variable 'dr_ring'.
> + *
> + * Locks used to protect struct members:
> + * a atomic
> + * r owned by read(2) proc
> + * c owned by CPU
> + * s sliced ownership, based on read/write indexes
> + */
> +struct dt_ringbuf {
> + unsigned int dr_prod; /* [a,r] read index */
> + unsigned int dr_cons; /* [a,c] write index */
I'm not sure we use `a' for producer/consumer owned variables. To me
`a' would suggest the use of an atomic_*() helper. However here it's
more about ownership and visibility, right?
> + struct dt_evt *dr_ring; /* [s] ring of event states */
> + int dr_block; /* [c] block nested events */
Can we call it `dr_spl' or `dr_ipl'?
> +
> + /* Counters */
> + unsigned int dr_dropevt; /* [a] # of dropped events */
> +};
> +
> /*
> * Descriptor associated with each program opening /dev/dt. It is used
> * to keep track of enabled PCBs.
> *
> * Locks used to protect struct members in this file:
> * a atomic
> - * m per-softc mutex
> * K kernel lock
> + * r owned by read(2) proc
> + * I invariant after initialization
> */
> struct dt_softc {
> SLIST_ENTRY(dt_softc) ds_next; /* [K] descriptor list */
> @@ -100,15 +120,16 @@ struct dt_softc {
> pid_t ds_pid; /* [I] PID of tracing program */
> void *ds_si; /* [I] to defer wakeup(9) */
>
> - struct mutex ds_mtx;
Killing `ds_mtx' is a great move forward, it simplifies inspection of
locks which is being discussed off-list.
> -
> struct dt_pcb_list ds_pcbs; /* [K] list of enabled PCBs */
> int ds_recording; /* [K] currently recording? */
> - int ds_evtcnt; /* [m] # of readable evts */
> + unsigned int ds_evtcnt; /* [a] # of readable evts */
> +
> + struct dt_ringbuf ds_ring[MAXCPUS]; /* [I] event rings */
> + unsigned int ds_lastcpu; /* [r] last CPU ring read(2). */
>
> /* Counters */
> - uint64_t ds_readevt; /* [m] # of events read */
> - uint64_t ds_dropevt; /* [m] # of events dropped */
> + unsigned int ds_readevt; /* [a] # of events read */
> + unsigned int ds_dropevt; /* [a] # of events dropped */
Should we move those counters to the per-CPU struct and aggregate them
all in dt_ioctl_get_stats()?
> };
>
> SLIST_HEAD(, dt_softc) dtdev_list; /* [K] list of open /dev/dt nodes */
> @@ -142,7 +163,7 @@ int dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *);
> int dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *);
> int dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *);
>
> -int dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *,
> +int dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *,
> uint64_t *);
>
> void dt_wakeup(struct dt_softc *);
> @@ -167,7 +188,8 @@ int
> dtopen(dev_t dev, int flags, int mode, struct proc *p)
> {
> struct dt_softc *sc;
> - int unit = minor(dev);
> + struct dt_evt *dtev;
> + int i, unit = minor(dev);
>
> if (atomic_load_int(&allowdt) == 0)
> return EPERM;
> @@ -176,16 +198,31 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p)
> if (sc == NULL)
> return ENOMEM;
>
> - /* no sleep after this point */
This is not possible and introduce races. We should not sleep after
calling dtlookup() below which is a guard against multiple allocations.
> if (dtlookup(unit) != NULL) {
> free(sc, M_DEVBUF, sizeof(*sc));
> return EBUSY;
> }
>
> + for (i = 0; i < ncpusfound; i++) {
> + dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT,
> + M_WAITOK|M_CANFAIL|M_ZERO);
> + if (dtev == NULL)
> + break;
> + sc->ds_ring[i].dr_ring = dtev;
> + }
> + if (i < ncpusfound) {
> + for (i--; i >= 0; i--) {
> + dtev = sc->ds_ring[i].dr_ring;
> + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> + }
> + free(sc, M_DEVBUF, sizeof(*sc));
> + return ENOMEM;
> + }
Maybe add dt_alloc() which contains all the sleeping allocations in a
single place?
> +
> sc->ds_unit = unit;
> sc->ds_pid = p->p_p->ps_pid;
> TAILQ_INIT(&sc->ds_pcbs);
> - mtx_init(&sc->ds_mtx, IPL_HIGH);
> + sc->ds_lastcpu = 0;
> sc->ds_evtcnt = 0;
> sc->ds_readevt = 0;
> sc->ds_dropevt = 0;
> @@ -206,7 +243,8 @@ int
> dtclose(dev_t dev, int flags, int mode, struct proc *p)
> {
> struct dt_softc *sc;
> - int unit = minor(dev);
> + struct dt_evt *dtev;
> + int i, unit = minor(dev);
>
> sc = dtlookup(unit);
> KASSERT(sc != NULL);
> @@ -218,6 +256,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p)
> dt_pcb_purge(&sc->ds_pcbs);
> softintr_disestablish(sc->ds_si);
>
> + for (i = 0; i < ncpusfound; i++) {
> + dtev = sc->ds_ring[i].dr_ring;
> + free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> + }
> free(sc, M_DEVBUF, sizeof(*sc));
Same here, dt_free()?
>
> return 0;
> @@ -227,8 +269,8 @@ int
> dtread(dev_t dev, struct uio *uio, int flags)
> {
> struct dt_softc *sc;
> - struct dt_pcb *dp;
> - int error = 0, unit = minor(dev);
> + struct dt_ringbuf *dr;
> + int i, error = 0, unit = minor(dev);
> size_t count, max, read = 0;
> uint64_t dropped = 0;
>
> @@ -239,9 +281,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
> if (max < 1)
> return (EMSGSIZE);
>
> - while (!sc->ds_evtcnt) {
> + while (!atomic_load_int(&sc->ds_evtcnt)) {
> sleep_setup(sc, PWAIT | PCATCH, "dtread");
> - error = sleep_finish(0, !sc->ds_evtcnt);
> + error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt));
> if (error == EINTR || error == ERESTART)
> break;
> }
> @@ -249,9 +291,10 @@ dtread(dev_t dev, struct uio *uio, int flags)
> return error;
>
> KERNEL_ASSERT_LOCKED();
> - TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) {
> + for (i = 0; i < ncpusfound; i++) {
> count = 0;
> - error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped);
> + dr = &sc->ds_ring[(sc->ds_lastcpu + i) % ncpusfound];
> + error = dt_ring_copy(dr, uio, max, &count, &dropped);
> if (error && count == 0)
> break;
>
> @@ -260,12 +303,11 @@ dtread(dev_t dev, struct uio *uio, int flags)
> if (max == 0)
> break;
> }
> + sc->ds_lastcpu += i % ncpusfound;
>
> - mtx_enter(&sc->ds_mtx);
> - sc->ds_evtcnt -= read;
> - sc->ds_readevt += read;
> - sc->ds_dropevt += dropped;
> - mtx_leave(&sc->ds_mtx);
> + atomic_add_int(&sc->ds_readevt, read);
> + atomic_add_int(&sc->ds_dropevt, dropped);
> + atomic_sub_int(&sc->ds_evtcnt, read);
I'd suggest using per-CPU counters as mentioned above.
>
> return error;
> }
> @@ -434,10 +476,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar)
> int
> dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst)
> {
> - mtx_enter(&sc->ds_mtx);
> - dtst->dtst_readevt = sc->ds_readevt;
> - dtst->dtst_dropevt = sc->ds_dropevt;
> - mtx_leave(&sc->ds_mtx);
> + dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt);
> + dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt);
And here we look at all the per-CPU counters.
>
> return 0;
> }
> @@ -637,28 +677,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc)
>
> dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO);
> if (dp == NULL)
> - goto bad;
> + return NULL;
>
> - dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT,
> - M_WAITOK|M_CANFAIL|M_ZERO);
> - if (dp->dp_ring == NULL)
> - goto bad;
> -
> - mtx_init(&dp->dp_mtx, IPL_HIGH);
> dp->dp_sc = sc;
> dp->dp_dtp = dtp;
> return dp;
> -bad:
> - dt_pcb_free(dp);
> - return NULL;
> }
>
> void
> dt_pcb_free(struct dt_pcb *dp)
> {
> - if (dp == NULL)
> - return;
> - free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring));
> free(dp, M_DT, sizeof(*dp));
> }
>
> @@ -681,21 +709,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
> {
> struct proc *p = curproc;
> struct dt_evt *dtev;
> - int distance;
> + int prod, cons, distance;
> + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
>
> - mtx_enter(&dp->dp_mtx);
> - distance = dp->dp_prod - dp->dp_cons;
> + dr->dr_block = splhigh();
I understand we want to avoid recursion while we are manipulating an
event. Could we use a per thread flag similar to P_INKTR to achieve
this goal? It should be easy to check for this flag in DT_ENTER() & co.
My intention is to rely on as few subsystems as possible inside dt(4)
to be able to inspect them.
> + membar_consumer();
> + prod = dr->dr_prod;
> + cons = dr->dr_cons;
> + distance = prod - cons;
> if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) {
> + splx(dr->dr_block);
> +
> /* read(2) isn't finished */
> - dp->dp_dropevt++;
> - mtx_leave(&dp->dp_mtx);
> + atomic_inc_int(&dr->dr_dropevt);
> return NULL;
> }
>
> /*
> * Save states in next free event slot.
> */
> - dtev = &dp->dp_ring[dp->dp_cons];
> + dtev = &dr->dr_ring[cons];
> memset(dtev, 0, sizeof(*dtev));
>
> dtev->dtev_pbn = dp->dp_dtp->dtp_pbn;
> @@ -722,25 +755,27 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
> void
> dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev)
> {
> - MUTEX_ASSERT_LOCKED(&dp->dp_mtx);
> - KASSERT(dtev == &dp->dp_ring[dp->dp_cons]);
> + struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
>
> - dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE;
> - mtx_leave(&dp->dp_mtx);
> + KASSERT(dtev == &dr->dr_ring[dr->dr_cons]);
> +
> + dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE;
> + membar_producer();
> +
> + splx(dr->dr_block);
> +
> + atomic_inc_int(&dp->dp_sc->ds_evtcnt);
>
> - mtx_enter(&dp->dp_sc->ds_mtx);
> - dp->dp_sc->ds_evtcnt++;
> - mtx_leave(&dp->dp_sc->ds_mtx);
> dt_wakeup(dp->dp_sc);
> }
>
> /*
> - * Copy at most `max' events from `dp', producing the same amount
> + * Copy at most `max' events from `dr', producing the same amount
> * of free slots.
> */
> int
> -dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> - size_t *rcvd, uint64_t *dropped)
> +dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max,
> + size_t *rcvd, uint64_t *dropped)
> {
> size_t count, copied = 0;
> unsigned int cons, prod;
> @@ -748,12 +783,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
>
> KASSERT(max > 0);
>
> - mtx_enter(&dp->dp_mtx);
> - cons = dp->dp_cons;
> - prod = dp->dp_prod;
> - *dropped += dp->dp_dropevt;
> - dp->dp_dropevt = 0;
> - mtx_leave(&dp->dp_mtx);
> + membar_consumer();
> + cons = dr->dr_cons;
> + prod = dr->dr_prod;
> + *dropped += atomic_swap_uint(&dr->dr_dropevt, 0);
>
> if (cons < prod)
> count = DT_EVTRING_SIZE - prod;
> @@ -764,7 +797,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> return 0;
>
> count = MIN(count, max);
> - error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio);
> + error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio);
> if (error)
> return error;
> copied += count;
> @@ -777,7 +810,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> goto out;
>
> count = MIN(cons, (max - copied));
> - error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio);
> + error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio);
> if (error)
> goto out;
>
> @@ -785,9 +818,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> prod += count;
>
> out:
> - mtx_enter(&dp->dp_mtx);
> - dp->dp_prod = prod;
> - mtx_leave(&dp->dp_mtx);
> + dr->dr_prod = prod;
> + membar_producer();
>
> *rcvd = copied;
> return error;
> diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h
> index 5bb7ab0d7715..07e2b6c2f6df 100644
> --- a/sys/dev/dt/dtvar.h
> +++ b/sys/dev/dt/dtvar.h
> @@ -163,12 +163,6 @@ struct dt_pcb {
> SMR_SLIST_ENTRY(dt_pcb) dp_pnext; /* [K,S] next PCB per probe */
> TAILQ_ENTRY(dt_pcb) dp_snext; /* [K] next PCB per softc */
>
> - /* Event states ring */
> - unsigned int dp_prod; /* [m] read index */
> - unsigned int dp_cons; /* [m] write index */
> - struct dt_evt *dp_ring; /* [m] ring of event states */
> - struct mutex dp_mtx;
> -
> struct dt_softc *dp_sc; /* [I] related softc */
> struct dt_probe *dp_dtp; /* [I] related probe */
> uint64_t dp_evtflags; /* [I] event states to record */
> @@ -177,9 +171,6 @@ struct dt_pcb {
> struct clockintr dp_clockintr; /* [D] profiling handle */
> uint64_t dp_nsecs; /* [I] profiling period */
> struct cpu_info *dp_cpu; /* [I] on which CPU */
> -
> - /* Counters */
> - uint64_t dp_dropevt; /* [m] # dropped event */
> };
>
> TAILQ_HEAD(dt_pcb_list, dt_pcb);
dt: Use one ringbuffer per CPU