Index | Thread | Search

From:
Christian Ludwig <cludwig@genua.de>
Subject:
Re: dt: Use one ringbuffer per CPU
To:
Martin Pieuchot <mpi@grenadille.net>
Cc:
<tech@openbsd.org>
Date:
Wed, 30 Oct 2024 21:17:36 +0100

Download raw body.

Thread
Hi,

please find an updated diff below. I have incorporated all comments.
I guard against recursion with a CPU-local variable now.


 - Christian

---
 sys/dev/dt/dt_dev.c | 203 +++++++++++++++++++++++++++++++++-------------------
 sys/dev/dt/dtvar.h  |   9 ---
 2 files changed, 130 insertions(+), 82 deletions(-)

diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c
index 05ce734abf44..a22f5997fd77 100644
--- a/sys/dev/dt/dt_dev.c
+++ b/sys/dev/dt/dt_dev.c
@@ -85,14 +85,35 @@
 
 #define DPRINTF(x...) /* nothing */
 
+/*
+ * Per-CPU Event States
+ *
+ *  Locks used to protect struct members:
+ *	r	owned by read(2) proc
+ *	c	owned by CPU
+ *	s	sliced ownership, based on read/write indexes
+ *	S	written by CPU, read by read(2) proc
+ */
+struct dt_cpubuf {
+	unsigned int		 dc_prod;	/* [r] read index */
+	unsigned int		 dc_cons;	/* [c] write index */
+	struct dt_evt		*dc_ring;	/* [s] ring of event states */
+	unsigned int	 	 dc_inevt;	/* [c] in event already? */
+
+	/* Counters */
+	unsigned int		 dc_dropevt;	/* [S] # of events dropped */
+	unsigned int		 dc_readevt;	/* [r] # of events read */
+};
+
 /*
  * Descriptor associated with each program opening /dev/dt.  It is used
  * to keep track of enabled PCBs.
  *
  *  Locks used to protect struct members in this file:
  *	a	atomic
- *	m	per-softc mutex
  *	K	kernel lock
+ *	r	owned by read(2) proc
+ *	I	invariant after initialization
  */
 struct dt_softc {
 	SLIST_ENTRY(dt_softc)	 ds_next;	/* [K] descriptor list */
@@ -100,15 +121,12 @@ struct dt_softc {
 	pid_t			 ds_pid;	/* [I] PID of tracing program */
 	void			*ds_si;		/* [I] to defer wakeup(9) */
 
-	struct mutex		 ds_mtx;
-
 	struct dt_pcb_list	 ds_pcbs;	/* [K] list of enabled PCBs */
 	int			 ds_recording;	/* [K] currently recording? */
-	int			 ds_evtcnt;	/* [m] # of readable evts */
+	unsigned int		 ds_evtcnt;	/* [a] # of readable evts */
 
-	/* Counters */
-	uint64_t		 ds_readevt;	/* [m] # of events read */
-	uint64_t		 ds_dropevt;	/* [m] # of events dropped */
+	struct dt_cpubuf	 ds_cpu[MAXCPUS]; /* [I] Per-cpu event states */
+	unsigned int		 ds_lastcpu;	/* [r] last CPU ring read(2). */
 };
 
 SLIST_HEAD(, dt_softc) dtdev_list;	/* [K] list of open /dev/dt nodes */
@@ -132,6 +150,8 @@ int	dtread(dev_t, struct uio *, int);
 int	dtioctl(dev_t, u_long, caddr_t, int, struct proc *);
 
 struct	dt_softc *dtlookup(int);
+struct	dt_softc *dtalloc(void);
+void	dtfree(struct dt_softc *);
 
 int	dt_ioctl_list_probes(struct dt_softc *, struct dtioc_probe *);
 int	dt_ioctl_get_args(struct dt_softc *, struct dtioc_arg *);
@@ -142,8 +162,7 @@ int	dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *);
 int	dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *);
 int	dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *);
 
-int	dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *,
-		uint64_t *);
+int	dt_ring_copy(struct dt_cpubuf *, struct uio *, size_t, size_t *);
 
 void	dt_wakeup(struct dt_softc *);
 void	dt_deferred_wakeup(void *);
@@ -172,26 +191,24 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p)
 	if (atomic_load_int(&allowdt) == 0)
 		return EPERM;
 
-	sc = malloc(sizeof(*sc), M_DEVBUF, M_WAITOK|M_CANFAIL|M_ZERO);
+	sc = dtalloc();
 	if (sc == NULL)
 		return ENOMEM;
 
 	/* no sleep after this point */
 	if (dtlookup(unit) != NULL) {
-		free(sc, M_DEVBUF, sizeof(*sc));
+		dtfree(sc);
 		return EBUSY;
 	}
 
 	sc->ds_unit = unit;
 	sc->ds_pid = p->p_p->ps_pid;
 	TAILQ_INIT(&sc->ds_pcbs);
-	mtx_init(&sc->ds_mtx, IPL_HIGH);
+	sc->ds_lastcpu = 0;
 	sc->ds_evtcnt = 0;
-	sc->ds_readevt = 0;
-	sc->ds_dropevt = 0;
 	sc->ds_si = softintr_establish(IPL_SOFTCLOCK, dt_deferred_wakeup, sc);
 	if (sc->ds_si == NULL) {
-		free(sc, M_DEVBUF, sizeof(*sc));
+		dtfree(sc);
 		return ENOMEM;
 	}
 
@@ -217,8 +234,7 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p)
 	dt_ioctl_record_stop(sc);
 	dt_pcb_purge(&sc->ds_pcbs);
 	softintr_disestablish(sc->ds_si);
-
-	free(sc, M_DEVBUF, sizeof(*sc));
+	dtfree(sc);
 
 	return 0;
 }
@@ -227,10 +243,9 @@ int
 dtread(dev_t dev, struct uio *uio, int flags)
 {
 	struct dt_softc *sc;
-	struct dt_pcb *dp;
-	int error = 0, unit = minor(dev);
+	struct dt_cpubuf *dc;
+	int i, error = 0, unit = minor(dev);
 	size_t count, max, read = 0;
-	uint64_t dropped = 0;
 
 	sc = dtlookup(unit);
 	KASSERT(sc != NULL);
@@ -239,9 +254,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
 	if (max < 1)
 		return (EMSGSIZE);
 
-	while (!sc->ds_evtcnt) {
+	while (!atomic_load_int(&sc->ds_evtcnt)) {
 		sleep_setup(sc, PWAIT | PCATCH, "dtread");
-		error = sleep_finish(0, !sc->ds_evtcnt);
+		error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt));
 		if (error == EINTR || error == ERESTART)
 			break;
 	}
@@ -249,9 +264,10 @@ dtread(dev_t dev, struct uio *uio, int flags)
 		return error;
 
 	KERNEL_ASSERT_LOCKED();
-	TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) {
+	for (i = 0; i < ncpusfound; i++) {
 		count = 0;
-		error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped);
+		dc = &sc->ds_cpu[(sc->ds_lastcpu + i) % ncpusfound];
+		error = dt_ring_copy(dc, uio, max, &count);
 		if (error && count == 0)
 			break;
 
@@ -260,12 +276,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
 		if (max == 0)
 			break;
 	}
+	sc->ds_lastcpu += i % ncpusfound;
 
-	mtx_enter(&sc->ds_mtx);
-	sc->ds_evtcnt -= read;
-	sc->ds_readevt += read;
-	sc->ds_dropevt += dropped;
-	mtx_leave(&sc->ds_mtx);
+	atomic_sub_int(&sc->ds_evtcnt, read);
 
 	return error;
 }
@@ -339,6 +352,46 @@ dtlookup(int unit)
 	return sc;
 }
 
+struct dt_softc *
+dtalloc(void)
+{
+	struct dt_softc *sc;
+	struct dt_evt *dtev;
+	int i;
+
+	sc = malloc(sizeof(*sc), M_DEVBUF, M_WAITOK|M_CANFAIL|M_ZERO);
+	if (sc == NULL)
+		return NULL;
+
+	for (i = 0; i < ncpusfound; i++) {
+		dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT,
+				   M_WAITOK|M_CANFAIL|M_ZERO);
+		if (dtev == NULL)
+			break;
+		sc->ds_cpu[i].dc_ring = dtev;
+	}
+	if (i < ncpusfound) {
+		dtfree(sc);
+		return NULL;
+	}
+
+	return sc;
+}
+
+void
+dtfree(struct dt_softc *sc)
+{
+	struct dt_evt *dtev;
+	int i;
+
+	for (i = 0; i < ncpusfound; i++) {
+		dtev = sc->ds_cpu[i].dc_ring;
+		if (dtev != NULL)
+			free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
+	}
+	free(sc, M_DEVBUF, sizeof(*sc));
+}
+
 int
 dt_ioctl_list_probes(struct dt_softc *sc, struct dtioc_probe *dtpr)
 {
@@ -434,11 +487,20 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar)
 int
 dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst)
 {
-	mtx_enter(&sc->ds_mtx);
-	dtst->dtst_readevt = sc->ds_readevt;
-	dtst->dtst_dropevt = sc->ds_dropevt;
-	mtx_leave(&sc->ds_mtx);
+	struct dt_cpubuf *dc;
+	uint64_t readevt = 0, dropevt = 0;
+	int i;
 
+	for (i = 0; i < ncpusfound; i++) {
+		dc = &sc->ds_cpu[i];
+
+		membar_consumer();
+		dropevt += dc->dc_dropevt;
+		readevt += dc->dc_readevt;
+	}
+
+	dtst->dtst_readevt = readevt;
+	dtst->dtst_dropevt = dropevt;
 	return 0;
 }
 
@@ -637,28 +699,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc)
 
 	dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO);
 	if (dp == NULL)
-		goto bad;
+		return NULL;
 
-	dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT,
-	    M_WAITOK|M_CANFAIL|M_ZERO);
-	if (dp->dp_ring == NULL)
-		goto bad;
-
-	mtx_init(&dp->dp_mtx, IPL_HIGH);
 	dp->dp_sc = sc;
 	dp->dp_dtp = dtp;
 	return dp;
-bad:
-	dt_pcb_free(dp);
-	return NULL;
 }
 
 void
 dt_pcb_free(struct dt_pcb *dp)
 {
-	if (dp == NULL)
-		return;
-	free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring));
 	free(dp, M_DT, sizeof(*dp));
 }
 
@@ -681,21 +731,31 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
 {
 	struct proc *p = curproc;
 	struct dt_evt *dtev;
-	int distance;
+	int prod, cons, distance;
+	struct dt_cpubuf *dc = &dp->dp_sc->ds_cpu[cpu_number()];
 
-	mtx_enter(&dp->dp_mtx);
-	distance = dp->dp_prod - dp->dp_cons;
+	if (dc->dc_inevt == 1)
+		return NULL;
+
+	dc->dc_inevt = 1;
+
+	membar_consumer();
+	prod = dc->dc_prod;
+	cons = dc->dc_cons;
+	distance = prod - cons;
 	if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) {
 		/* read(2) isn't finished */
-		dp->dp_dropevt++;
-		mtx_leave(&dp->dp_mtx);
+		dc->dc_dropevt++;
+		membar_producer();
+
+		dc->dc_inevt = 0;
 		return NULL;
 	}
 
 	/*
 	 * Save states in next free event slot.
 	 */
-	dtev = &dp->dp_ring[dp->dp_cons];
+	dtev = &dc->dc_ring[cons];
 	memset(dtev, 0, sizeof(*dtev));
 
 	dtev->dtev_pbn = dp->dp_dtp->dtp_pbn;
@@ -722,25 +782,25 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
 void
 dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev)
 {
-	MUTEX_ASSERT_LOCKED(&dp->dp_mtx);
-	KASSERT(dtev == &dp->dp_ring[dp->dp_cons]);
+	struct dt_cpubuf *dc = &dp->dp_sc->ds_cpu[cpu_number()];
 
-	dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE;
-	mtx_leave(&dp->dp_mtx);
+	KASSERT(dtev == &dc->dc_ring[dc->dc_cons]);
+
+	dc->dc_cons = (dc->dc_cons + 1) % DT_EVTRING_SIZE;
+	membar_producer();
+
+	atomic_inc_int(&dp->dp_sc->ds_evtcnt);
+	dc->dc_inevt = 0;
 
-	mtx_enter(&dp->dp_sc->ds_mtx);
-	dp->dp_sc->ds_evtcnt++;
-	mtx_leave(&dp->dp_sc->ds_mtx);
 	dt_wakeup(dp->dp_sc);
 }
 
 /*
- * Copy at most `max' events from `dp', producing the same amount
+ * Copy at most `max' events from `dc', producing the same amount
  * of free slots.
  */
 int
-dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
-		size_t *rcvd, uint64_t *dropped)
+dt_ring_copy(struct dt_cpubuf *dc, struct uio *uio, size_t max, size_t *rcvd)
 {
 	size_t count, copied = 0;
 	unsigned int cons, prod;
@@ -748,12 +808,9 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 
 	KASSERT(max > 0);
 
-	mtx_enter(&dp->dp_mtx);
-	cons = dp->dp_cons;
-	prod = dp->dp_prod;
-	*dropped += dp->dp_dropevt;
-	dp->dp_dropevt = 0;
-	mtx_leave(&dp->dp_mtx);
+	membar_consumer();
+	cons = dc->dc_cons;
+	prod = dc->dc_prod;
 
 	if (cons < prod)
 		count = DT_EVTRING_SIZE - prod;
@@ -764,7 +821,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 		return 0;
 
 	count = MIN(count, max);
-	error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio);
+	error = uiomove(&dc->dc_ring[prod], count * sizeof(struct dt_evt), uio);
 	if (error)
 		return error;
 	copied += count;
@@ -777,7 +834,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 		goto out;
 
 	count = MIN(cons, (max - copied));
-	error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio);
+	error = uiomove(&dc->dc_ring[0], count * sizeof(struct dt_evt), uio);
 	if (error)
 		goto out;
 
@@ -785,9 +842,9 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 	prod += count;
 
 out:
-	mtx_enter(&dp->dp_mtx);
-	dp->dp_prod = prod;
-	mtx_leave(&dp->dp_mtx);
+	dc->dc_readevt += copied;
+	dc->dc_prod = prod;
+	membar_producer();
 
 	*rcvd = copied;
 	return error;
diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h
index 5bb7ab0d7715..07e2b6c2f6df 100644
--- a/sys/dev/dt/dtvar.h
+++ b/sys/dev/dt/dtvar.h
@@ -163,12 +163,6 @@ struct dt_pcb {
 	SMR_SLIST_ENTRY(dt_pcb)	 dp_pnext;	/* [K,S] next PCB per probe */
 	TAILQ_ENTRY(dt_pcb)	 dp_snext;	/* [K] next PCB per softc */
 
-	/* Event states ring */
-	unsigned int		 dp_prod;	/* [m] read index */
-	unsigned int		 dp_cons;	/* [m] write index */
-	struct dt_evt		*dp_ring;	/* [m] ring of event states */
-	struct mutex		 dp_mtx;
-
 	struct dt_softc		*dp_sc;		/* [I] related softc */
 	struct dt_probe		*dp_dtp;	/* [I] related probe */
 	uint64_t		 dp_evtflags;	/* [I] event states to record */
@@ -177,9 +171,6 @@ struct dt_pcb {
 	struct clockintr	 dp_clockintr;	/* [D] profiling handle */
 	uint64_t		 dp_nsecs;	/* [I] profiling period */
 	struct cpu_info		*dp_cpu;	/* [I] on which CPU */
-
-	/* Counters */
-	uint64_t		 dp_dropevt;	/* [m] # dropped event */
 };
 
 TAILQ_HEAD(dt_pcb_list, dt_pcb);