Index | Thread | Search

From:
Christian Ludwig <cludwig@genua.de>
Subject:
dt: Use one ringbuffer per CPU
To:
<tech@openbsd.org>
Date:
Wed, 11 Sep 2024 17:15:38 +0200

Download raw body.

Thread
Hi,

The current dt(4) implementation uses one ringbuffer per PCB. That means
the number of ringbuffers scales with the number of probes. It also
means that each CPU needs to lock a ringbuffer to fill an event slot.

Move to using one ringbuffer per CPU instead. Now the number of buffers
scales with the number of online CPUs. And each CPU does not need to
synchronize against other CPUs anymore. They still have to synchronize
against their own nested events, though. Synchronization with the reader
happens via atomic operations. That gets rid of the PCB mutex.

Also update counters with atomic operations. That gets rid of the softc
mutex.

In my tests, btrace processes more events now during the same timeframe.
Please test and review. Thank you.


 - Christian

---
 sys/dev/dt/dt_dev.c | 160 ++++++++++++++++++++++++++------------------
 sys/dev/dt/dtvar.h  |   9 ---
 2 files changed, 94 insertions(+), 75 deletions(-)

diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c
index 05ce734abf44..52ba947401e5 100644
--- a/sys/dev/dt/dt_dev.c
+++ b/sys/dev/dt/dt_dev.c
@@ -85,14 +85,33 @@
 
 #define DPRINTF(x...) /* nothing */
 
+/*
+ * Event states ring
+ *
+ *  Locks used to protect struct members:
+ *	a	atomic
+ *	r	owned by read(2) proc
+ *	c	owned by CPU
+ *	s	sliced ownership, based on read/write indexes
+ */
+struct dt_ringbuf {
+	unsigned int		 dr_prod;	/* [a,r] read index */
+	unsigned int		 dr_cons;	/* [a,c] write index */
+	struct dt_evt		*dr_ring;	/* [s] ring of event states */
+	int			 dr_block;	/* [c] block nested events */
+
+	/* Counters */
+	unsigned int		 dr_dropevt;	/* [a] # of dropped events */
+};
+
 /*
  * Descriptor associated with each program opening /dev/dt.  It is used
  * to keep track of enabled PCBs.
  *
  *  Locks used to protect struct members in this file:
  *	a	atomic
- *	m	per-softc mutex
  *	K	kernel lock
+ *	I	invariant after initialization
  */
 struct dt_softc {
 	SLIST_ENTRY(dt_softc)	 ds_next;	/* [K] descriptor list */
@@ -100,15 +119,15 @@ struct dt_softc {
 	pid_t			 ds_pid;	/* [I] PID of tracing program */
 	void			*ds_si;		/* [I] to defer wakeup(9) */
 
-	struct mutex		 ds_mtx;
-
 	struct dt_pcb_list	 ds_pcbs;	/* [K] list of enabled PCBs */
 	int			 ds_recording;	/* [K] currently recording? */
-	int			 ds_evtcnt;	/* [m] # of readable evts */
+	unsigned int		 ds_evtcnt;	/* [a] # of readable evts */
+
+	struct dt_ringbuf	 ds_ring[MAXCPUS]; /* [I] event rings */
 
 	/* Counters */
-	uint64_t		 ds_readevt;	/* [m] # of events read */
-	uint64_t		 ds_dropevt;	/* [m] # of events dropped */
+	unsigned int		 ds_readevt;	/* [a] # of events read */
+	unsigned int		 ds_dropevt;	/* [a] # of events dropped */
 };
 
 SLIST_HEAD(, dt_softc) dtdev_list;	/* [K] list of open /dev/dt nodes */
@@ -142,7 +161,7 @@ int	dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *);
 int	dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *);
 int	dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *);
 
-int	dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *,
+int	dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *,
 		uint64_t *);
 
 void	dt_wakeup(struct dt_softc *);
@@ -167,7 +186,8 @@ int
 dtopen(dev_t dev, int flags, int mode, struct proc *p)
 {
 	struct dt_softc *sc;
-	int unit = minor(dev);
+	struct dt_evt *dtev;
+	int i, unit = minor(dev);
 
 	if (atomic_load_int(&allowdt) == 0)
 		return EPERM;
@@ -176,16 +196,30 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p)
 	if (sc == NULL)
 		return ENOMEM;
 
-	/* no sleep after this point */
 	if (dtlookup(unit) != NULL) {
 		free(sc, M_DEVBUF, sizeof(*sc));
 		return EBUSY;
 	}
 
+	for (i = 0; i < ncpusfound; i++) {
+		dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT,
+				   M_WAITOK|M_CANFAIL|M_ZERO);
+		if (dtev == NULL)
+			break;
+		sc->ds_ring[i].dr_ring = dtev;
+	}
+	if (i < ncpusfound) {
+		for (i--; i >= 0; i--) {
+			dtev = sc->ds_ring[i].dr_ring;
+			free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
+		}
+		free(sc, M_DEVBUF, sizeof(*sc));
+		return ENOMEM;
+	}
+
 	sc->ds_unit = unit;
 	sc->ds_pid = p->p_p->ps_pid;
 	TAILQ_INIT(&sc->ds_pcbs);
-	mtx_init(&sc->ds_mtx, IPL_HIGH);
 	sc->ds_evtcnt = 0;
 	sc->ds_readevt = 0;
 	sc->ds_dropevt = 0;
@@ -206,7 +240,8 @@ int
 dtclose(dev_t dev, int flags, int mode, struct proc *p)
 {
 	struct dt_softc *sc;
-	int unit = minor(dev);
+	struct dt_evt *dtev;
+	int i, unit = minor(dev);
 
 	sc = dtlookup(unit);
 	KASSERT(sc != NULL);
@@ -218,6 +253,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p)
 	dt_pcb_purge(&sc->ds_pcbs);
 	softintr_disestablish(sc->ds_si);
 
+	for (i = 0; i < ncpusfound; i++) {
+		dtev = sc->ds_ring[i].dr_ring;
+		free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
+	}
 	free(sc, M_DEVBUF, sizeof(*sc));
 
 	return 0;
@@ -227,8 +266,8 @@ int
 dtread(dev_t dev, struct uio *uio, int flags)
 {
 	struct dt_softc *sc;
-	struct dt_pcb *dp;
-	int error = 0, unit = minor(dev);
+	struct dt_ringbuf *dr;
+	int i, error = 0, unit = minor(dev);
 	size_t count, max, read = 0;
 	uint64_t dropped = 0;
 
@@ -239,9 +278,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
 	if (max < 1)
 		return (EMSGSIZE);
 
-	while (!sc->ds_evtcnt) {
+	while (!atomic_load_int(&sc->ds_evtcnt)) {
 		sleep_setup(sc, PWAIT | PCATCH, "dtread");
-		error = sleep_finish(0, !sc->ds_evtcnt);
+		error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt));
 		if (error == EINTR || error == ERESTART)
 			break;
 	}
@@ -249,9 +288,10 @@ dtread(dev_t dev, struct uio *uio, int flags)
 		return error;
 
 	KERNEL_ASSERT_LOCKED();
-	TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) {
+	for (i = 0; i < ncpusfound; i++) {
 		count = 0;
-		error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped);
+		dr = &sc->ds_ring[i];
+		error = dt_ring_copy(dr, uio, max, &count, &dropped);
 		if (error && count == 0)
 			break;
 
@@ -261,11 +301,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
 			break;
 	}
 
-	mtx_enter(&sc->ds_mtx);
-	sc->ds_evtcnt -= read;
-	sc->ds_readevt += read;
-	sc->ds_dropevt += dropped;
-	mtx_leave(&sc->ds_mtx);
+	atomic_add_int(&sc->ds_readevt, read);
+	atomic_add_int(&sc->ds_dropevt, dropped);
+	atomic_sub_int(&sc->ds_evtcnt, read);
 
 	return error;
 }
@@ -434,10 +472,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar)
 int
 dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst)
 {
-	mtx_enter(&sc->ds_mtx);
-	dtst->dtst_readevt = sc->ds_readevt;
-	dtst->dtst_dropevt = sc->ds_dropevt;
-	mtx_leave(&sc->ds_mtx);
+	dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt);
+	dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt);
 
 	return 0;
 }
@@ -637,28 +673,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc)
 
 	dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO);
 	if (dp == NULL)
-		goto bad;
+		return NULL;
 
-	dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT,
-	    M_WAITOK|M_CANFAIL|M_ZERO);
-	if (dp->dp_ring == NULL)
-		goto bad;
-
-	mtx_init(&dp->dp_mtx, IPL_HIGH);
 	dp->dp_sc = sc;
 	dp->dp_dtp = dtp;
 	return dp;
-bad:
-	dt_pcb_free(dp);
-	return NULL;
 }
 
 void
 dt_pcb_free(struct dt_pcb *dp)
 {
-	if (dp == NULL)
-		return;
-	free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring));
 	free(dp, M_DT, sizeof(*dp));
 }
 
@@ -681,21 +705,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
 {
 	struct proc *p = curproc;
 	struct dt_evt *dtev;
-	int distance;
+	int prod, cons, distance;
+	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
 
-	mtx_enter(&dp->dp_mtx);
-	distance = dp->dp_prod - dp->dp_cons;
+	dr->dr_block = splhigh();
+	membar_consumer();
+	prod = dr->dr_prod;
+	cons = dr->dr_cons;
+	distance = prod - cons;
 	if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) {
+		splx(dr->dr_block);
+
 		/* read(2) isn't finished */
-		dp->dp_dropevt++;
-		mtx_leave(&dp->dp_mtx);
+		atomic_inc_int(&dr->dr_dropevt);
 		return NULL;
 	}
 
 	/*
 	 * Save states in next free event slot.
 	 */
-	dtev = &dp->dp_ring[dp->dp_cons];
+	dtev = &dr->dr_ring[cons];
 	memset(dtev, 0, sizeof(*dtev));
 
 	dtev->dtev_pbn = dp->dp_dtp->dtp_pbn;
@@ -722,25 +751,27 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
 void
 dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev)
 {
-	MUTEX_ASSERT_LOCKED(&dp->dp_mtx);
-	KASSERT(dtev == &dp->dp_ring[dp->dp_cons]);
+	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
 
-	dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE;
-	mtx_leave(&dp->dp_mtx);
+	KASSERT(dtev == &dr->dr_ring[dr->dr_cons]);
+
+	dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE;
+	membar_producer();
+
+	splx(dr->dr_block);
+
+	atomic_inc_int(&dp->dp_sc->ds_evtcnt);
 
-	mtx_enter(&dp->dp_sc->ds_mtx);
-	dp->dp_sc->ds_evtcnt++;
-	mtx_leave(&dp->dp_sc->ds_mtx);
 	dt_wakeup(dp->dp_sc);
 }
 
 /*
- * Copy at most `max' events from `dp', producing the same amount
+ * Copy at most `max' events from `dr', producing the same amount
  * of free slots.
  */
 int
-dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
-		size_t *rcvd, uint64_t *dropped)
+dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max,
+	     size_t *rcvd, uint64_t *dropped)
 {
 	size_t count, copied = 0;
 	unsigned int cons, prod;
@@ -748,12 +779,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 
 	KASSERT(max > 0);
 
-	mtx_enter(&dp->dp_mtx);
-	cons = dp->dp_cons;
-	prod = dp->dp_prod;
-	*dropped += dp->dp_dropevt;
-	dp->dp_dropevt = 0;
-	mtx_leave(&dp->dp_mtx);
+	membar_consumer();
+	cons = dr->dr_cons;
+	prod = dr->dr_prod;
+	*dropped += atomic_swap_uint(&dr->dr_dropevt, 0);
 
 	if (cons < prod)
 		count = DT_EVTRING_SIZE - prod;
@@ -764,7 +793,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 		return 0;
 
 	count = MIN(count, max);
-	error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio);
+	error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio);
 	if (error)
 		return error;
 	copied += count;
@@ -777,7 +806,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 		goto out;
 
 	count = MIN(cons, (max - copied));
-	error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio);
+	error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio);
 	if (error)
 		goto out;
 
@@ -785,9 +814,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
 	prod += count;
 
 out:
-	mtx_enter(&dp->dp_mtx);
-	dp->dp_prod = prod;
-	mtx_leave(&dp->dp_mtx);
+	dr->dr_prod = prod;
+	membar_producer();
 
 	*rcvd = copied;
 	return error;
diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h
index 5bb7ab0d7715..07e2b6c2f6df 100644
--- a/sys/dev/dt/dtvar.h
+++ b/sys/dev/dt/dtvar.h
@@ -163,12 +163,6 @@ struct dt_pcb {
 	SMR_SLIST_ENTRY(dt_pcb)	 dp_pnext;	/* [K,S] next PCB per probe */
 	TAILQ_ENTRY(dt_pcb)	 dp_snext;	/* [K] next PCB per softc */
 
-	/* Event states ring */
-	unsigned int		 dp_prod;	/* [m] read index */
-	unsigned int		 dp_cons;	/* [m] write index */
-	struct dt_evt		*dp_ring;	/* [m] ring of event states */
-	struct mutex		 dp_mtx;
-
 	struct dt_softc		*dp_sc;		/* [I] related softc */
 	struct dt_probe		*dp_dtp;	/* [I] related probe */
 	uint64_t		 dp_evtflags;	/* [I] event states to record */
@@ -177,9 +171,6 @@ struct dt_pcb {
 	struct clockintr	 dp_clockintr;	/* [D] profiling handle */
 	uint64_t		 dp_nsecs;	/* [I] profiling period */
 	struct cpu_info		*dp_cpu;	/* [I] on which CPU */
-
-	/* Counters */
-	uint64_t		 dp_dropevt;	/* [m] # dropped event */
 };
 
 TAILQ_HEAD(dt_pcb_list, dt_pcb);
-- 
2.34.1