
From: Christian Ludwig <cludwig@genua.de>
Subject: Re: dt: Use one ringbuffer per CPU
To: Claudio Jeker <cjeker@diehard.n-r-g.com>
Cc: <tech@openbsd.org>
Date: Thu, 12 Sep 2024 18:17:59 +0200

On Thu, Sep 12, 2024 at 02:30:50PM +0200 Claudio Jeker wrote:
> On Wed, Sep 11, 2024 at 05:15:38PM +0200, Christian Ludwig wrote:
> > Hi,
> > 
> > The current dt(4) implementation uses one ringbuffer per PCB. That means
> > the number of ringbuffers scales with the number of probes. It also
> > means that each CPU needs to lock a ringbuffer to fill an event slot.
> > 
> > Move to using one ringbuffer per CPU instead. Now the number of buffers
> > scales with the number of online CPUs. And each CPU does not need to
> > synchronize against other CPUs anymore. They still have to synchronize
> > against their own nested events, though. Synchronization with the reader
> > happens via atomic operations. That gets rid of the PCB mutex.
> > 
> > Also update counters with atomic operations. That gets rid of the softc
> > mutex.
> > 
> > In my tests, btrace now processes more events in the same timeframe.
> > Please test and review. Thank you.
> > 
> 
> I think this is a step in the right direction but I see one major issue
> with this. The dt events are no longer in order. So if a probe fires
> on cpu63 and shortly after the response fires on cpu0, btrace will see
> the response before the trigger. This will mess up some btrace features
> like maps and histograms.

In the general case, events are not guaranteed to be globally ordered
in -current either. Suppose the response is in a different probe than
the trigger event, which is very common. Now dtread() loops over all
PCBs and reads the trigger PCB. After that, another trigger/response
pair happens. dtread() then advances to the response PCB and reads the
response, but has not captured the matching trigger event yet. That one
gets read in the next round:

 +--------------+
 | trigger PCB  | (1) read
 +--------------+
         |
 +--------------+
 | another PCB  | (2) read   (3) CPU63 fires trigger
 +--------------+            (4) CPU00 fires response
         |
 +--------------+
 | response PCB | (5) read response without trigger
 +--------------+

> I think other tools solve this by sorting the buffers by timestamp,
> which can and should be done in userland.

Agreed.
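
For illustration, userland could sort each batch of events by
timestamp right after read(2) returns. A minimal sketch, assuming
struct dt_evt carries its timestamp in dtev_tsp as declared in
dtvar.h; evt_cmp() and evt_sort() are made-up helper names, not
existing btrace code:

	#include <sys/types.h>
	#include <sys/time.h>
	#include <dev/dt/dtvar.h>
	#include <stdlib.h>

	/* Order two events by their kernel timestamp. */
	static int
	evt_cmp(const void *a, const void *b)
	{
		const struct dt_evt *ea = a, *eb = b;

		if (ea->dtev_tsp.tv_sec != eb->dtev_tsp.tv_sec)
			return ea->dtev_tsp.tv_sec < eb->dtev_tsp.tv_sec ?
			    -1 : 1;
		if (ea->dtev_tsp.tv_nsec != eb->dtev_tsp.tv_nsec)
			return ea->dtev_tsp.tv_nsec < eb->dtev_tsp.tv_nsec ?
			    -1 : 1;
		return 0;
	}

	/* Sort the `nevt' events of one read(2) batch in place. */
	static void
	evt_sort(struct dt_evt *evts, size_t nevt)
	{
		qsort(evts, nevt, sizeof(*evts), evt_cmp);
	}

Note that sorting per batch only restores order within a single
read(2); events that straddle two batches would still need a small
reorder window in the consumer.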

> Also your read function always starts with cpu0, which could result in
> constant starvation of the last few CPUs if the read buffer is not big
> enough.

That problem exists in -current, too, once there is a large number of
PCBs: we starve the last probes. With this diff, we can extend the read
function to continue where it left off the next time we enter it. In
fact, I already have a diff for that in my queue. I was not quite sure
how to solve this in -current, because PCBs are allowed to vanish
between reads. This diff keeps the buffers available from open to
close.
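
For completeness, the continuation could look like the sketch below:
remember the ring after the one the previous read(2) stopped at and
start the next scan there. ds_readcpu would be a hypothetical new [K]
member of struct dt_softc and `n' a new local in dtread(); neither is
part of the diff quoted here:

	/*
	 * Resume the scan at the ring following the one the previous
	 * read(2) stopped at, so rings of high-numbered CPUs are not
	 * starved when the read buffer is small.
	 */
	for (n = 0; n < ncpusfound; n++) {
		i = (sc->ds_readcpu + n) % ncpusfound;
		count = 0;
		dr = &sc->ds_ring[i];
		error = dt_ring_copy(dr, uio, max, &count, &dropped);
		if (error && count == 0)
			break;

		read += count;
		max -= count;
		if (max == 0)
			break;
	}
	sc->ds_readcpu = (i + 1) % ncpusfound;	/* [K] next start */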

> >  - Christian
> > 
> > ---
> >  sys/dev/dt/dt_dev.c | 160 ++++++++++++++++++++++++++------------------
> >  sys/dev/dt/dtvar.h  |   9 ---
> >  2 files changed, 94 insertions(+), 75 deletions(-)
> > 
> > diff --git a/sys/dev/dt/dt_dev.c b/sys/dev/dt/dt_dev.c
> > index 05ce734abf44..52ba947401e5 100644
> > --- a/sys/dev/dt/dt_dev.c
> > +++ b/sys/dev/dt/dt_dev.c
> > @@ -85,14 +85,33 @@
> >  
> >  #define DPRINTF(x...) /* nothing */
> >  
> > +/*
> > + * Event states ring
> > + *
> > + *  Locks used to protect struct members:
> > + *	a	atomic
> > + *	r	owned by read(2) proc
> > + *	c	owned by CPU
> > + *	s	sliced ownership, based on read/write indexes
> > + */
> > +struct dt_ringbuf {
> > +	unsigned int		 dr_prod;	/* [a,r] read index */
> > +	unsigned int		 dr_cons;	/* [a,c] write index */
> > +	struct dt_evt		*dr_ring;	/* [s] ring of event states */
> > +	int			 dr_block;	/* [c] block nested events */
> > +
> > +	/* Counters */
> > +	unsigned int		 dr_dropevt;	/* [a] # of dropped events */
> > +};
> > +
> >  /*
> >   * Descriptor associated with each program opening /dev/dt.  It is used
> >   * to keep track of enabled PCBs.
> >   *
> >   *  Locks used to protect struct members in this file:
> >   *	a	atomic
> > - *	m	per-softc mutex
> >   *	K	kernel lock
> > + *	I	invariant after initialization
> >   */
> >  struct dt_softc {
> >  	SLIST_ENTRY(dt_softc)	 ds_next;	/* [K] descriptor list */
> > @@ -100,15 +119,15 @@ struct dt_softc {
> >  	pid_t			 ds_pid;	/* [I] PID of tracing program */
> >  	void			*ds_si;		/* [I] to defer wakeup(9) */
> >  
> > -	struct mutex		 ds_mtx;
> > -
> >  	struct dt_pcb_list	 ds_pcbs;	/* [K] list of enabled PCBs */
> >  	int			 ds_recording;	/* [K] currently recording? */
> > -	int			 ds_evtcnt;	/* [m] # of readable evts */
> > +	unsigned int		 ds_evtcnt;	/* [a] # of readable evts */
> > +
> > +	struct dt_ringbuf	 ds_ring[MAXCPUS]; /* [I] event rings */
> >  
> >  	/* Counters */
> > -	uint64_t		 ds_readevt;	/* [m] # of events read */
> > -	uint64_t		 ds_dropevt;	/* [m] # of events dropped */
> > +	unsigned int		 ds_readevt;	/* [a] # of events read */
> > +	unsigned int		 ds_dropevt;	/* [a] # of events dropped */
> >  };
> >  
> >  SLIST_HEAD(, dt_softc) dtdev_list;	/* [K] list of open /dev/dt nodes */
> > @@ -142,7 +161,7 @@ int	dt_ioctl_probe_enable(struct dt_softc *, struct dtioc_req *);
> >  int	dt_ioctl_probe_disable(struct dt_softc *, struct dtioc_req *);
> >  int	dt_ioctl_get_auxbase(struct dt_softc *, struct dtioc_getaux *);
> >  
> > -int	dt_pcb_ring_copy(struct dt_pcb *, struct uio *, size_t, size_t *,
> > +int	dt_ring_copy(struct dt_ringbuf *, struct uio *, size_t, size_t *,
> >  		uint64_t *);
> >  
> >  void	dt_wakeup(struct dt_softc *);
> > @@ -167,7 +186,8 @@ int
> >  dtopen(dev_t dev, int flags, int mode, struct proc *p)
> >  {
> >  	struct dt_softc *sc;
> > -	int unit = minor(dev);
> > +	struct dt_evt *dtev;
> > +	int i, unit = minor(dev);
> >  
> >  	if (atomic_load_int(&allowdt) == 0)
> >  		return EPERM;
> > @@ -176,16 +196,30 @@ dtopen(dev_t dev, int flags, int mode, struct proc *p)
> >  	if (sc == NULL)
> >  		return ENOMEM;
> >  
> > -	/* no sleep after this point */
> >  	if (dtlookup(unit) != NULL) {
> >  		free(sc, M_DEVBUF, sizeof(*sc));
> >  		return EBUSY;
> >  	}
> >  
> > +	for (i = 0; i < ncpusfound; i++) {
> > +		dtev = mallocarray(DT_EVTRING_SIZE, sizeof(*dtev), M_DT,
> > +				   M_WAITOK|M_CANFAIL|M_ZERO);
> > +		if (dtev == NULL)
> > +			break;
> > +		sc->ds_ring[i].dr_ring = dtev;
> > +	}
> > +	if (i < ncpusfound) {
> > +		for (i--; i >= 0; i--) {
> > +			dtev = sc->ds_ring[i].dr_ring;
> > +			free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> > +		}
> > +		free(sc, M_DEVBUF, sizeof(*sc));
> > +		return ENOMEM;
> > +	}
> > +
> >  	sc->ds_unit = unit;
> >  	sc->ds_pid = p->p_p->ps_pid;
> >  	TAILQ_INIT(&sc->ds_pcbs);
> > -	mtx_init(&sc->ds_mtx, IPL_HIGH);
> >  	sc->ds_evtcnt = 0;
> >  	sc->ds_readevt = 0;
> >  	sc->ds_dropevt = 0;
> > @@ -206,7 +240,8 @@ int
> >  dtclose(dev_t dev, int flags, int mode, struct proc *p)
> >  {
> >  	struct dt_softc *sc;
> > -	int unit = minor(dev);
> > +	struct dt_evt *dtev;
> > +	int i, unit = minor(dev);
> >  
> >  	sc = dtlookup(unit);
> >  	KASSERT(sc != NULL);
> > @@ -218,6 +253,10 @@ dtclose(dev_t dev, int flags, int mode, struct proc *p)
> >  	dt_pcb_purge(&sc->ds_pcbs);
> >  	softintr_disestablish(sc->ds_si);
> >  
> > +	for (i = 0; i < ncpusfound; i++) {
> > +		dtev = sc->ds_ring[i].dr_ring;
> > +		free(dtev, M_DT, DT_EVTRING_SIZE * sizeof(*dtev));
> > +	}
> >  	free(sc, M_DEVBUF, sizeof(*sc));
> >  
> >  	return 0;
> > @@ -227,8 +266,8 @@ int
> >  dtread(dev_t dev, struct uio *uio, int flags)
> >  {
> >  	struct dt_softc *sc;
> > -	struct dt_pcb *dp;
> > -	int error = 0, unit = minor(dev);
> > +	struct dt_ringbuf *dr;
> > +	int i, error = 0, unit = minor(dev);
> >  	size_t count, max, read = 0;
> >  	uint64_t dropped = 0;
> >  
> > @@ -239,9 +278,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
> >  	if (max < 1)
> >  		return (EMSGSIZE);
> >  
> > -	while (!sc->ds_evtcnt) {
> > +	while (!atomic_load_int(&sc->ds_evtcnt)) {
> >  		sleep_setup(sc, PWAIT | PCATCH, "dtread");
> > -		error = sleep_finish(0, !sc->ds_evtcnt);
> > +		error = sleep_finish(0, !atomic_load_int(&sc->ds_evtcnt));
> >  		if (error == EINTR || error == ERESTART)
> >  			break;
> >  	}
> > @@ -249,9 +288,10 @@ dtread(dev_t dev, struct uio *uio, int flags)
> >  		return error;
> >  
> >  	KERNEL_ASSERT_LOCKED();
> > -	TAILQ_FOREACH(dp, &sc->ds_pcbs, dp_snext) {
> > +	for (i = 0; i < ncpusfound; i++) {
> >  		count = 0;
> > -		error = dt_pcb_ring_copy(dp, uio, max, &count, &dropped);
> > +		dr = &sc->ds_ring[i];
> > +		error = dt_ring_copy(dr, uio, max, &count, &dropped);
> >  		if (error && count == 0)
> >  			break;
> >  
> > @@ -261,11 +301,9 @@ dtread(dev_t dev, struct uio *uio, int flags)
> >  			break;
> >  	}
> >  
> > -	mtx_enter(&sc->ds_mtx);
> > -	sc->ds_evtcnt -= read;
> > -	sc->ds_readevt += read;
> > -	sc->ds_dropevt += dropped;
> > -	mtx_leave(&sc->ds_mtx);
> > +	atomic_add_int(&sc->ds_readevt, read);
> > +	atomic_add_int(&sc->ds_dropevt, dropped);
> > +	atomic_sub_int(&sc->ds_evtcnt, read);
> >  
> >  	return error;
> >  }
> > @@ -434,10 +472,8 @@ dt_ioctl_get_args(struct dt_softc *sc, struct dtioc_arg *dtar)
> >  int
> >  dt_ioctl_get_stats(struct dt_softc *sc, struct dtioc_stat *dtst)
> >  {
> > -	mtx_enter(&sc->ds_mtx);
> > -	dtst->dtst_readevt = sc->ds_readevt;
> > -	dtst->dtst_dropevt = sc->ds_dropevt;
> > -	mtx_leave(&sc->ds_mtx);
> > +	dtst->dtst_readevt = atomic_load_int(&sc->ds_readevt);
> > +	dtst->dtst_dropevt = atomic_load_int(&sc->ds_dropevt);
> >  
> >  	return 0;
> >  }
> > @@ -637,28 +673,16 @@ dt_pcb_alloc(struct dt_probe *dtp, struct dt_softc *sc)
> >  
> >  	dp = malloc(sizeof(*dp), M_DT, M_WAITOK|M_CANFAIL|M_ZERO);
> >  	if (dp == NULL)
> > -		goto bad;
> > +		return NULL;
> >  
> > -	dp->dp_ring = mallocarray(DT_EVTRING_SIZE, sizeof(*dp->dp_ring), M_DT,
> > -	    M_WAITOK|M_CANFAIL|M_ZERO);
> > -	if (dp->dp_ring == NULL)
> > -		goto bad;
> > -
> > -	mtx_init(&dp->dp_mtx, IPL_HIGH);
> >  	dp->dp_sc = sc;
> >  	dp->dp_dtp = dtp;
> >  	return dp;
> > -bad:
> > -	dt_pcb_free(dp);
> > -	return NULL;
> >  }
> >  
> >  void
> >  dt_pcb_free(struct dt_pcb *dp)
> >  {
> > -	if (dp == NULL)
> > -		return;
> > -	free(dp->dp_ring, M_DT, DT_EVTRING_SIZE * sizeof(*dp->dp_ring));
> >  	free(dp, M_DT, sizeof(*dp));
> >  }
> >  
> > @@ -681,21 +705,26 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
> >  {
> >  	struct proc *p = curproc;
> >  	struct dt_evt *dtev;
> > -	int distance;
> > +	int prod, cons, distance;
> > +	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
> >  
> > -	mtx_enter(&dp->dp_mtx);
> > -	distance = dp->dp_prod - dp->dp_cons;
> > +	dr->dr_block = splhigh();
> > +	membar_consumer();
> > +	prod = dr->dr_prod;
> > +	cons = dr->dr_cons;
> > +	distance = prod - cons;
> >  	if (distance == 1 || distance == (1 - DT_EVTRING_SIZE)) {
> > +		splx(dr->dr_block);
> > +
> >  		/* read(2) isn't finished */
> > -		dp->dp_dropevt++;
> > -		mtx_leave(&dp->dp_mtx);
> > +		atomic_inc_int(&dr->dr_dropevt);
> >  		return NULL;
> >  	}
> >  
> >  	/*
> >  	 * Save states in next free event slot.
> >  	 */
> > -	dtev = &dp->dp_ring[dp->dp_cons];
> > +	dtev = &dr->dr_ring[cons];
> >  	memset(dtev, 0, sizeof(*dtev));
> >  
> >  	dtev->dtev_pbn = dp->dp_dtp->dtp_pbn;
> > @@ -722,25 +751,27 @@ dt_pcb_ring_get(struct dt_pcb *dp, int profiling)
> >  void
> >  dt_pcb_ring_consume(struct dt_pcb *dp, struct dt_evt *dtev)
> >  {
> > -	MUTEX_ASSERT_LOCKED(&dp->dp_mtx);
> > -	KASSERT(dtev == &dp->dp_ring[dp->dp_cons]);
> > +	struct dt_ringbuf *dr = &dp->dp_sc->ds_ring[cpu_number()];
> >  
> > -	dp->dp_cons = (dp->dp_cons + 1) % DT_EVTRING_SIZE;
> > -	mtx_leave(&dp->dp_mtx);
> > +	KASSERT(dtev == &dr->dr_ring[dr->dr_cons]);
> > +
> > +	dr->dr_cons = (dr->dr_cons + 1) % DT_EVTRING_SIZE;
> > +	membar_producer();
> > +
> > +	splx(dr->dr_block);
> > +
> > +	atomic_inc_int(&dp->dp_sc->ds_evtcnt);
> >  
> > -	mtx_enter(&dp->dp_sc->ds_mtx);
> > -	dp->dp_sc->ds_evtcnt++;
> > -	mtx_leave(&dp->dp_sc->ds_mtx);
> >  	dt_wakeup(dp->dp_sc);
> >  }
> >  
> >  /*
> > - * Copy at most `max' events from `dp', producing the same amount
> > + * Copy at most `max' events from `dr', producing the same amount
> >   * of free slots.
> >   */
> >  int
> > -dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> > -		size_t *rcvd, uint64_t *dropped)
> > +dt_ring_copy(struct dt_ringbuf *dr, struct uio *uio, size_t max,
> > +	     size_t *rcvd, uint64_t *dropped)
> >  {
> >  	size_t count, copied = 0;
> >  	unsigned int cons, prod;
> > @@ -748,12 +779,10 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  
> >  	KASSERT(max > 0);
> >  
> > -	mtx_enter(&dp->dp_mtx);
> > -	cons = dp->dp_cons;
> > -	prod = dp->dp_prod;
> > -	*dropped += dp->dp_dropevt;
> > -	dp->dp_dropevt = 0;
> > -	mtx_leave(&dp->dp_mtx);
> > +	membar_consumer();
> > +	cons = dr->dr_cons;
> > +	prod = dr->dr_prod;
> > +	*dropped += atomic_swap_uint(&dr->dr_dropevt, 0);
> >  
> >  	if (cons < prod)
> >  		count = DT_EVTRING_SIZE - prod;
> > @@ -764,7 +793,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  		return 0;
> >  
> >  	count = MIN(count, max);
> > -	error = uiomove(&dp->dp_ring[prod], count * sizeof(struct dt_evt), uio);
> > +	error = uiomove(&dr->dr_ring[prod], count * sizeof(struct dt_evt), uio);
> >  	if (error)
> >  		return error;
> >  	copied += count;
> > @@ -777,7 +806,7 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  		goto out;
> >  
> >  	count = MIN(cons, (max - copied));
> > -	error = uiomove(&dp->dp_ring[0], count * sizeof(struct dt_evt), uio);
> > +	error = uiomove(&dr->dr_ring[0], count * sizeof(struct dt_evt), uio);
> >  	if (error)
> >  		goto out;
> >  
> > @@ -785,9 +814,8 @@ dt_pcb_ring_copy(struct dt_pcb *dp, struct uio *uio, size_t max,
> >  	prod += count;
> >  
> >  out:
> > -	mtx_enter(&dp->dp_mtx);
> > -	dp->dp_prod = prod;
> > -	mtx_leave(&dp->dp_mtx);
> > +	dr->dr_prod = prod;
> > +	membar_producer();
> >  
> >  	*rcvd = copied;
> >  	return error;
> > diff --git a/sys/dev/dt/dtvar.h b/sys/dev/dt/dtvar.h
> > index 5bb7ab0d7715..07e2b6c2f6df 100644
> > --- a/sys/dev/dt/dtvar.h
> > +++ b/sys/dev/dt/dtvar.h
> > @@ -163,12 +163,6 @@ struct dt_pcb {
> >  	SMR_SLIST_ENTRY(dt_pcb)	 dp_pnext;	/* [K,S] next PCB per probe */
> >  	TAILQ_ENTRY(dt_pcb)	 dp_snext;	/* [K] next PCB per softc */
> >  
> > -	/* Event states ring */
> > -	unsigned int		 dp_prod;	/* [m] read index */
> > -	unsigned int		 dp_cons;	/* [m] write index */
> > -	struct dt_evt		*dp_ring;	/* [m] ring of event states */
> > -	struct mutex		 dp_mtx;
> > -
> >  	struct dt_softc		*dp_sc;		/* [I] related softc */
> >  	struct dt_probe		*dp_dtp;	/* [I] related probe */
> >  	uint64_t		 dp_evtflags;	/* [I] event states to record */
> > @@ -177,9 +171,6 @@ struct dt_pcb {
> >  	struct clockintr	 dp_clockintr;	/* [D] profiling handle */
> >  	uint64_t		 dp_nsecs;	/* [I] profiling period */
> >  	struct cpu_info		*dp_cpu;	/* [I] on which CPU */
> > -
> > -	/* Counters */
> > -	uint64_t		 dp_dropevt;	/* [m] # dropped event */
> >  };
> >  
> >  TAILQ_HEAD(dt_pcb_list, dt_pcb);
> > -- 
> > 2.34.1
> > 
> 
> 
> 
> -- 
> :wq Claudio