Index | Thread | Search

From:
Dave Voutila <dv@sisu.io>
Subject:
Re: multithreading vmd's vionet
To:
Dave Voutila <dv@sisu.io>
Cc:
"tech@openbsd.org" <tech@openbsd.org>, Peter Hessler <phessler@openbsd.org>
Date:
Fri, 16 Feb 2024 14:10:03 -0500

Download raw body.

Thread
Still hoping for some test reports.

Dave Voutila <dv@sisu.io> writes:

> Dave Voutila <dv@sisu.io> writes:
>
>> Now that my rewrite of the virtio network device is in, I wanted to
>> share a new diff for testing.
>
> Updated diff below that applies to the latest tree and includes some
> changes/additions outlined below as well. So far have a positive report
> from phessler@, but since I've made some additions would appreciate more
> tests.
>
> Since I failed to mention last time, one of the problems this
> multithreading solves is starving the RX side of the device when under
> heavy TX demand. If you run iperf3 in bidirectional mode between the
> host and a guest, very rapidly the RX bandwidth drops to 0. By letting
> the RX side run independently, we let the spice flow.
>
>>
>> Still tweaking things and needs some pause/unpause/etc. lifecycle
>> testing, but the general idea is where I want it:
>>
>>  - splits rx, tx, and general event handling into 3 threads so send and
>>    receive processing can happen in parallel
>>  - consequently, extends some of the ipc functions to accept explicit
>>    libevent(3) event bases since each thread needs their own
>>  - uses unidirectional pipes as simple channels, similar to how the vm
>>    process does one-way communication between the vcpu thread and device
>>    threads.
>>
>   - adds pthread rwlocks to protect device configuration state during
>     config register reads/writes by the multiple threads
>
>> A variation of this diff has been tested by some folks already and the
>> general feedback has been it reduces average latency.
>>
>
> diff refs/heads/master refs/heads/vmd-vionet-threads
> commit - 634073900323bdd83ca3b5eb0732098812c0aa24
> commit + 2412c45704e90fb80d01ebc5b030cff1d3d9bc44
> blob - 380b35412d0d7f633bfc983aeed8bf7c4b37d8a0
> blob + c5f1a72f10a32af6c963d6f76dfa8dad459638e9
> --- usr.sbin/vmd/proc.c
> +++ usr.sbin/vmd/proc.c
> @@ -685,10 +685,15 @@ proc_dispatch_null(int fd, struct privsep_proc *p, str
>  /*
>   * imsg helper functions
>   */
> -
>  void
>  imsg_event_add(struct imsgev *iev)
>  {
> +	imsg_event_add2(iev, NULL);
> +}
> +
> +void
> +imsg_event_add2(struct imsgev *iev, struct event_base *ev_base)
> +{
>  	if (iev->handler == NULL) {
>  		imsg_flush(&iev->ibuf);
>  		return;
> @@ -700,6 +705,8 @@ imsg_event_add(struct imsgev *iev)
>
>  	event_del(&iev->ev);
>  	event_set(&iev->ev, iev->ibuf.fd, iev->events, iev->handler, iev->data);
> +	if (ev_base != NULL)
> +		event_base_set(ev_base, &iev->ev);
>  	event_add(&iev->ev, NULL);
>  }
>
> @@ -707,12 +714,20 @@ int
>  imsg_compose_event(struct imsgev *iev, uint16_t type, uint32_t peerid,
>      pid_t pid, int fd, void *data, uint16_t datalen)
>  {
> +	return imsg_compose_event2(iev, type, peerid, pid, fd, data, datalen,
> +	    NULL);
> +}
> +
> +int
> +imsg_compose_event2(struct imsgev *iev, uint16_t type, uint32_t peerid,
> +    pid_t pid, int fd, void *data, uint16_t datalen, struct event_base *ev_base)
> +{
>  	int	ret;
>
>  	if ((ret = imsg_compose(&iev->ibuf, type, peerid,
>  	    pid, fd, data, datalen)) == -1)
>  		return (ret);
> -	imsg_event_add(iev);
> +	imsg_event_add2(iev, ev_base);
>  	return (ret);
>  }
>
> blob - 5e000af95c69d08c93ebec70842314052eff58b9
> blob + 9b81237a1dba5e803dc1b6fbcd52c538b0956698
> --- usr.sbin/vmd/proc.h
> +++ usr.sbin/vmd/proc.h
> @@ -160,8 +160,11 @@ void	 proc_run(struct privsep *, struct privsep_proc *
>  	    struct privsep_proc *, unsigned int,
>  	    void (*)(struct privsep *, struct privsep_proc *, void *), void *);
>  void	 imsg_event_add(struct imsgev *);
> +void	 imsg_event_add2(struct imsgev *, struct event_base *);
>  int	 imsg_compose_event(struct imsgev *, uint16_t, uint32_t,
>  	    pid_t, int, void *, uint16_t);
> +int	 imsg_compose_event2(struct imsgev *, uint16_t, uint32_t,
> +    	    pid_t, int, void *, uint16_t, struct event_base *);
>  int	 imsg_composev_event(struct imsgev *, uint16_t, uint32_t,
>  	    pid_t, int, const struct iovec *, int);
>  int	 proc_compose_imsg(struct privsep *, enum privsep_procid, int,
> blob - d97e5cc113aa0e3f28991f148491c1786a24c661
> blob + d0a107a6061c303178f0fc0739fac344798ec437
> --- usr.sbin/vmd/vioblk.c
> +++ usr.sbin/vmd/vioblk.c
> @@ -167,7 +167,7 @@ vioblk_main(int fd, int fd_vmm)
>  	/* Wire up an async imsg channel. */
>  	log_debug("%s: wiring in async vm event handler (fd=%d)", __func__,
>  		dev.async_fd);
> -	if (vm_device_pipe(&dev, dev_dispatch_vm)) {
> +	if (vm_device_pipe(&dev, dev_dispatch_vm, NULL)) {
>  		ret = EIO;
>  		log_warnx("vm_device_pipe");
>  		goto fail;
> blob - 0de50784bb2ad2d66d486f7a04bc1bfeecd78861
> blob + 83a73cb6500fe8aa802ecb91a2b46c4ac227f8b0
> --- usr.sbin/vmd/vionet.c
> +++ usr.sbin/vmd/vionet.c
> @@ -29,6 +29,8 @@
>  #include <errno.h>
>  #include <event.h>
>  #include <fcntl.h>
> +#include <pthread.h>
> +#include <pthread_np.h>
>  #include <stdlib.h>
>  #include <string.h>
>  #include <unistd.h>
> @@ -49,6 +51,8 @@ struct packet {
>  	size_t	 len;
>  };
>
> +static void *rx_run_loop(void *);
> +static void *tx_run_loop(void *);
>  static int vionet_rx(struct vionet_dev *, int);
>  static ssize_t vionet_rx_copy(struct vionet_dev *, int, const struct iovec *,
>      int, size_t);
> @@ -57,22 +61,37 @@ static ssize_t vionet_rx_zerocopy(struct vionet_dev *,
>  static void vionet_rx_event(int, short, void *);
>  static uint32_t handle_io_read(struct viodev_msg *, struct virtio_dev *,
>      int8_t *);
> -static int handle_io_write(struct viodev_msg *, struct virtio_dev *);
> -static int vionet_notify_tx(struct virtio_dev *);
> -static int vionet_notifyq(struct virtio_dev *);
> -
> +static void handle_io_write(struct viodev_msg *, struct virtio_dev *);
> +static int vionet_tx(struct virtio_dev *);
> +static void vionet_notifyq(struct virtio_dev *);
>  static void dev_dispatch_vm(int, short, void *);
>  static void handle_sync_io(int, short, void *);
> +static void read_pipe_main(int, short, void *);
> +static void read_pipe_rx(int, short, void *);
> +static void read_pipe_tx(int, short, void *);
> +static void vionet_assert_pic_irq(struct virtio_dev *);
> +static void vionet_deassert_pic_irq(struct virtio_dev *);
>
>  /* Device Globals */
>  struct event ev_tap;
>  struct event ev_inject;
> +struct event_base *ev_base_main;
> +struct event_base *ev_base_rx;
> +struct event_base *ev_base_tx;
> +pthread_t rx_thread;
> +pthread_t tx_thread;
> +struct vm_dev_pipe pipe_main;
> +struct vm_dev_pipe pipe_rx;
> +struct vm_dev_pipe pipe_tx;
>  int pipe_inject[2];
>  #define READ	0
>  #define WRITE	1
>  struct iovec iov_rx[VIONET_QUEUE_SIZE];
>  struct iovec iov_tx[VIONET_QUEUE_SIZE];
> +pthread_rwlock_t lock = NULL;		/* Guards device config state. */
>
> +/* Transient reset state used by the main thread to coordinate device reset. */
> +int resetting = 0;
>
>  __dead void
>  vionet_main(int fd, int fd_vmm)
> @@ -176,30 +195,51 @@ vionet_main(int fd, int fd_vmm)
>  		goto fail;
>  	}
>
> +	/* Initialize inter-thread communication channels. */
> +	vm_pipe_init2(&pipe_main, read_pipe_main, &dev);
> +	vm_pipe_init2(&pipe_rx, read_pipe_rx, &dev);
> +	vm_pipe_init2(&pipe_tx, read_pipe_tx, &dev);
> +
> +	/* Initialize RX and TX threads. */
> +	ret = pthread_create(&rx_thread, NULL, rx_run_loop, &dev);
> +	if (ret) {
> +		errno = ret;
> +		log_warn("%s: failed to initialize rx thread", __func__);
> +		goto fail;
> +	}
> +	pthread_set_name_np(rx_thread, "rx");
> +	ret = pthread_create(&tx_thread, NULL, tx_run_loop, &dev);
> +	if (ret) {
> +		errno = ret;
> +		log_warn("%s: failed to initialize tx thread", __func__);
> +		goto fail;
> +	}
> +	pthread_set_name_np(tx_thread, "tx");
> +
> +	/* Initialize our rwlock for guarding shared device state. */
> +	ret = pthread_rwlock_init(&lock, NULL);
> +	if (ret) {
> +		errno = ret;
> +		log_warn("%s: failed to initialize rwlock", __func__);
> +		goto fail;
> +	}
> +
>  	/* Initialize libevent so we can start wiring event handlers. */
> -	event_init();
> +	ev_base_main = event_base_new();
>
> +	/* Add our handler for receiving messages from the RX/TX threads. */
> +	event_base_set(ev_base_main, &pipe_main.read_ev);
> +	event_add(&pipe_main.read_ev, NULL);
> +
>  	/* Wire up an async imsg channel. */
>  	log_debug("%s: wiring in async vm event handler (fd=%d)", __func__,
>  		dev.async_fd);
> -	if (vm_device_pipe(&dev, dev_dispatch_vm)) {
> +	if (vm_device_pipe(&dev, dev_dispatch_vm, ev_base_main)) {
>  		ret = EIO;
>  		log_warnx("vm_device_pipe");
>  		goto fail;
>  	}
>
> -	/* Wire up event handling for the tap fd. */
> -	log_debug("%s: wiring in tap fd handler (fd=%d)", __func__,
> -	    vionet->data_fd);
> -	event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST,
> -	    vionet_rx_event, &dev);
> -
> -	/* Add an event for injected packets. */
> -	log_debug("%s: wiring in packet injection handler (fd=%d)", __func__,
> -	    pipe_inject[READ]);
> -	event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST,
> -	    vionet_rx_event, &dev);
> -
>  	/* Configure our sync channel event handler. */
>  	log_debug("%s: wiring in sync channel handler (fd=%d)", __func__,
>  		dev.sync_fd);
> @@ -207,32 +247,46 @@ vionet_main(int fd, int fd_vmm)
>  	dev.sync_iev.handler = handle_sync_io;
>  	dev.sync_iev.data = &dev;
>  	dev.sync_iev.events = EV_READ;
> -	imsg_event_add(&dev.sync_iev);
> +	imsg_event_add2(&dev.sync_iev, ev_base_main);
>
>  	/* Send a ready message over the sync channel. */
>  	log_debug("%s: telling vm %s device is ready", __func__, vcp->vcp_name);
>  	memset(&msg, 0, sizeof(msg));
>  	msg.type = VIODEV_MSG_READY;
> -	imsg_compose_event(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> -	    sizeof(msg));
> +	imsg_compose_event2(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> +	    sizeof(msg), ev_base_main);
>
>  	/* Send a ready message over the async channel. */
>  	log_debug("%s: sending async ready message", __func__);
> -	ret = imsg_compose_event(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> -	    &msg, sizeof(msg));
> +	ret = imsg_compose_event2(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> +	    &msg, sizeof(msg), ev_base_main);
>  	if (ret == -1) {
>  		log_warnx("%s: failed to send async ready message!", __func__);
>  		goto fail;
>  	}
>
>  	/* Engage the event loop! */
> -	ret = event_dispatch();
> +	ret = event_base_dispatch(ev_base_main);
> +	event_base_free(ev_base_main);
>
> +	/* Try stopping the rx & tx threads cleanly by messaging them. */
> +	vm_pipe_send(&pipe_rx, VIRTIO_THREAD_STOP);
> +	vm_pipe_send(&pipe_tx, VIRTIO_THREAD_STOP);
> +
> +	/* Wait for threads to stop. */
> +	pthread_join(rx_thread, NULL);
> +	pthread_join(tx_thread, NULL);
> +	pthread_rwlock_destroy(&lock);
> +
>  	/* Cleanup */
>  	if (ret == 0) {
>  		close_fd(dev.sync_fd);
>  		close_fd(dev.async_fd);
>  		close_fd(vionet->data_fd);
> +		close_fd(pipe_main.read);
> +		close_fd(pipe_main.write);
> +		close_fd(pipe_rx.write);
> +		close_fd(pipe_tx.write);
>  		close_fd(pipe_inject[READ]);
>  		close_fd(pipe_inject[WRITE]);
>  		_exit(ret);
> @@ -253,7 +307,8 @@ fail:
>  	close_fd(pipe_inject[WRITE]);
>  	if (vionet != NULL)
>  		close_fd(vionet->data_fd);
> -
> +	if (lock != NULL)
> +		pthread_rwlock_destroy(&lock);
>  	_exit(ret);
>  }
>
> @@ -329,9 +384,10 @@ vionet_rx(struct vionet_dev *dev, int fd)
>  	struct iovec *iov;
>  	int notify = 0;
>  	ssize_t sz;
> +	uint8_t status = 0;
>
> -	if (!(dev->cfg.device_status
> -	    & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) {
> +	status = dev->cfg.device_status & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> +	if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
>  		log_warnx("%s: driver not ready", __func__);
>  		return (0);
>  	}
> @@ -453,16 +509,12 @@ vionet_rx(struct vionet_dev *dev, int fd)
>  	if (idx != vq_info->last_avail &&
>  	    !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
>  		notify = 1;
> -		dev->cfg.isr_status |= 1;
>  	}
>
>  	vq_info->last_avail = idx;
>  	return (notify);
>  reset:
> -	log_warnx("%s: requesting device reset", __func__);
> -	dev->cfg.device_status |= DEVICE_NEEDS_RESET;
> -	dev->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> -	return (1);
> +	return (-1);
>  }
>
>  /*
> @@ -603,27 +655,49 @@ vionet_rx_zerocopy(struct vionet_dev *dev, int fd, con
>  static void
>  vionet_rx_event(int fd, short event, void *arg)
>  {
> -	struct virtio_dev *dev = (struct virtio_dev *)arg;
> +	struct virtio_dev	*dev = (struct virtio_dev *)arg;
> +	struct vionet_dev	*vionet = &dev->vionet;
> +	int			 ret = 0;
>
>  	if (!(event & EV_READ))
>  		fatalx("%s: invalid event type", __func__);
>
> -	if (vionet_rx(&dev->vionet, fd) > 0)
> -		virtio_assert_pic_irq(dev, 0);
> +	pthread_rwlock_rdlock(&lock);
> +	ret = vionet_rx(vionet, fd);
> +	pthread_rwlock_unlock(&lock);
> +
> +	if (ret == 0) {
> +		/* Nothing to do. */
> +		return;
> +	}
> +
> +	pthread_rwlock_wrlock(&lock);
> +	if (ret == 1) {
> +		/* Notify the driver. */
> +		vionet->cfg.isr_status |= 1;
> +	} else {
> +		/* Need a reset. Something went wrong. */
> +		log_warnx("%s: requesting device reset", __func__);
> +		vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> +		vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> +	}
> +	pthread_rwlock_unlock(&lock);
> +
> +	vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ);
>  }
>
> -static int
> +static void
>  vionet_notifyq(struct virtio_dev *dev)
>  {
> -	struct vionet_dev *vionet = &dev->vionet;
> +	struct vionet_dev	*vionet = &dev->vionet;
>
>  	switch (vionet->cfg.queue_notify) {
>  	case RXQ:
> -		event_add(&ev_tap, NULL);
> -		event_add(&ev_inject, NULL);
> +		vm_pipe_send(&pipe_rx, VIRTIO_NOTIFY);
>  		break;
>  	case TXQ:
> -		return vionet_notify_tx(dev);
> +		vm_pipe_send(&pipe_tx, VIRTIO_NOTIFY);
> +		break;
>  	default:
>  		/*
>  		 * Catch the unimplemented queue ID 2 (control queue) as
> @@ -633,12 +707,10 @@ vionet_notifyq(struct virtio_dev *dev)
>  		    __func__, vionet->cfg.queue_notify);
>  		break;
>  	}
> -
> -	return (0);
>  }
>
>  static int
> -vionet_notify_tx(struct virtio_dev *dev)
> +vionet_tx(struct virtio_dev *dev)
>  {
>  	uint16_t idx, hdr_idx;
>  	size_t chain_len, iov_cnt;
> @@ -653,9 +725,11 @@ vionet_notify_tx(struct virtio_dev *dev)
>  	struct ether_header *eh;
>  	struct iovec *iov;
>  	struct packet pkt;
> +	uint8_t status = 0;
>
> -	if (!(vionet->cfg.device_status
> -	    & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) {
> +	status = vionet->cfg.device_status
> +	    & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> +	if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
>  		log_warnx("%s: driver not ready", __func__);
>  		return (0);
>  	}
> @@ -806,18 +880,14 @@ drop:
>  	}
>
>  	if (idx != vq_info->last_avail &&
> -	    !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
> +	    !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
>  		notify = 1;
> -		vionet->cfg.isr_status |= 1;
> -	}
>
> +
>  	vq_info->last_avail = idx;
>  	return (notify);
>  reset:
> -	log_warnx("%s: requesting device reset", __func__);
> -	vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> -	vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> -	return (1);
> +	return (-1);
>  }
>
>  static void
> @@ -830,6 +900,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
>  	struct imsg	 	 imsg;
>  	ssize_t			 n = 0;
>  	int			 verbose;
> +	uint8_t			 status = 0;
>
>  	if (dev == NULL)
>  		fatalx("%s: missing vionet pointer", __func__);
> @@ -841,7 +912,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
>  			/* this pipe is dead, so remove the event handler */
>  			log_debug("%s: pipe dead (EV_READ)", __func__);
>  			event_del(&iev->ev);
> -			event_loopexit(NULL);
> +			event_base_loopexit(ev_base_main, NULL);
>  			return;
>  		}
>  	}
> @@ -853,7 +924,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
>  			/* this pipe is dead, so remove the event handler */
>  			log_debug("%s: pipe dead (EV_WRITE)", __func__);
>  			event_del(&iev->ev);
> -			event_loopexit(NULL);
> +			event_base_loopexit(ev_base_main, NULL);
>  			return;
>  		}
>  	}
> @@ -873,16 +944,16 @@ dev_dispatch_vm(int fd, short event, void *arg)
>  			break;
>  		case IMSG_VMDOP_PAUSE_VM:
>  			log_debug("%s: pausing", __func__);
> -			event_del(&ev_tap);
> -			event_del(&ev_inject);
> +			vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE);
>  			break;
>  		case IMSG_VMDOP_UNPAUSE_VM:
>  			log_debug("%s: unpausing", __func__);
> -			if (vionet->cfg.device_status
> -			    & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
> -				event_add(&ev_tap, NULL);
> -				event_add(&ev_inject, NULL);
> -			}
> +			pthread_rwlock_rdlock(&lock);
> +			status = vionet->cfg.device_status &
> +			    VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> +			pthread_rwlock_unlock(&lock);
> +			if (status)
> +				vm_pipe_send(&pipe_rx, VIRTIO_THREAD_START);
>  			break;
>  		case IMSG_CTL_VERBOSE:
>  			IMSG_SIZE_CHECK(&imsg, &verbose);
> @@ -892,7 +963,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
>  		}
>  		imsg_free(&imsg);
>  	}
> -	imsg_event_add(iev);
> +	imsg_event_add2(iev, ev_base_main);
>  }
>
>  /*
> @@ -917,7 +988,7 @@ handle_sync_io(int fd, short event, void *arg)
>  			/* this pipe is dead, so remove the event handler */
>  			log_debug("%s: pipe dead (EV_READ)", __func__);
>  			event_del(&iev->ev);
> -			event_loopexit(NULL);
> +			event_base_loopexit(ev_base_main, NULL);
>  			return;
>  		}
>  	}
> @@ -929,7 +1000,7 @@ handle_sync_io(int fd, short event, void *arg)
>  			/* this pipe is dead, so remove the event handler */
>  			log_debug("%s: pipe dead (EV_WRITE)", __func__);
>  			event_del(&iev->ev);
> -			event_loopexit(NULL);
> +			event_base_loopexit(ev_base_main, NULL);
>  			return;
>  		}
>  	}
> @@ -959,34 +1030,33 @@ handle_sync_io(int fd, short event, void *arg)
>  			msg.data = handle_io_read(&msg, dev, &intr);
>  			msg.data_valid = 1;
>  			msg.state = intr;
> -			imsg_compose_event(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> -			    sizeof(msg));
> +			imsg_compose_event2(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> +			    sizeof(msg), ev_base_main);
>  			break;
>  		case VIODEV_MSG_IO_WRITE:
>  			/* Write IO: no reply needed */
> -			if (handle_io_write(&msg, dev) == 1)
> -				virtio_assert_pic_irq(dev, 0);
> +			handle_io_write(&msg, dev);
>  			break;
>  		case VIODEV_MSG_SHUTDOWN:
>  			event_del(&dev->sync_iev.ev);
> -			event_del(&ev_tap);
> -			event_del(&ev_inject);
> -			event_loopbreak();
> +			event_base_loopbreak(ev_base_main);
>  			return;
>  		default:
>  			fatalx("%s: invalid msg type %d", __func__, msg.type);
>  		}
>  	}
> -	imsg_event_add(iev);
> +	imsg_event_add2(iev, ev_base_main);
>  }
>
> -static int
> +static void
>  handle_io_write(struct viodev_msg *msg, struct virtio_dev *dev)
>  {
> -	struct vionet_dev *vionet = &dev->vionet;
> -	uint32_t data = msg->data;
> -	int intr = 0;
> +	struct vionet_dev	*vionet = &dev->vionet;
> +	uint32_t		 data = msg->data;
> +	int			 pause_devices = 0;
>
> +	pthread_rwlock_wrlock(&lock);
> +
>  	switch (msg->reg) {
>  	case VIRTIO_CONFIG_DEVICE_FEATURES:
>  	case VIRTIO_CONFIG_QUEUE_SIZE:
> @@ -1007,35 +1077,25 @@ handle_io_write(struct viodev_msg *msg, struct virtio_
>  		break;
>  	case VIRTIO_CONFIG_QUEUE_NOTIFY:
>  		vionet->cfg.queue_notify = data;
> -		if (vionet_notifyq(dev))
> -			intr = 1;
> +		vionet_notifyq(dev);
>  		break;
>  	case VIRTIO_CONFIG_DEVICE_STATUS:
> -		vionet->cfg.device_status = data;
> -		if (vionet->cfg.device_status == 0) {
> -			vionet->cfg.guest_feature = 0;
> -
> -			vionet->cfg.queue_pfn = 0;
> -			vionet_update_qa(vionet);
> -
> -			vionet->cfg.queue_size = 0;
> -			vionet_update_qs(vionet);
> -
> -			vionet->cfg.queue_select = 0;
> -			vionet->cfg.queue_notify = 0;
> -			vionet->cfg.isr_status = 0;
> -			vionet->vq[RXQ].last_avail = 0;
> -			vionet->vq[RXQ].notified_avail = 0;
> -			vionet->vq[TXQ].last_avail = 0;
> -			vionet->vq[TXQ].notified_avail = 0;
> -			virtio_deassert_pic_irq(dev, msg->vcpu);
> -
> -			event_del(&ev_tap);
> -			event_del(&ev_inject);
> +		if (data == 0) {
> +			resetting = 2;	/* Wait on two acks: rx & tx */
> +			pause_devices = 1;
> +		} else {
> +			// XXX is this correct?
> +			vionet->cfg.device_status = data;
>  		}
>  		break;
>  	}
> -	return (intr);
> +
> +	pthread_rwlock_unlock(&lock);
> +	if (pause_devices) {
> +		vionet_deassert_pic_irq(dev);
> +		vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE);
> +		vm_pipe_send(&pipe_tx, VIRTIO_THREAD_PAUSE);
> +	}
>  }
>
>  static uint32_t
> @@ -1044,6 +1104,8 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d
>  	struct vionet_dev *vionet = &dev->vionet;
>  	uint32_t data;
>
> +	pthread_rwlock_rdlock(&lock);
> +
>  	switch (msg->reg) {
>  	case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI:
>  	case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI + 1:
> @@ -1076,14 +1138,263 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d
>  		data = vionet->cfg.device_status;
>  		break;
>  	case VIRTIO_CONFIG_ISR_STATUS:
> +		pthread_rwlock_unlock(&lock);
> +		pthread_rwlock_wrlock(&lock);
>  		data = vionet->cfg.isr_status;
>  		vionet->cfg.isr_status = 0;
>  		if (intr != NULL)
>  			*intr = INTR_STATE_DEASSERT;
>  		break;
>  	default:
> -		return (0xFFFFFFFF);
> +		data = 0xFFFFFFFF;
>  	}
>
> +	pthread_rwlock_unlock(&lock);
>  	return (data);
>  }
> +
> +/*
> + * Handle the rx side processing, communicating to the main thread via pipe.
> + */
> +static void *
> +rx_run_loop(void *arg)
> +{
> +	struct virtio_dev	*dev = (struct virtio_dev *)arg;
> +	struct vionet_dev	*vionet = &dev->vionet;
> +	int			 ret;
> +
> +	ev_base_rx = event_base_new();
> +
> +	/* Wire up event handling for the tap fd. */
> +	event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST,
> +	    vionet_rx_event, dev);
> +	event_base_set(ev_base_rx, &ev_tap);
> +
> +	/* Wire up event handling for the packet injection pipe. */
> +	event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST,
> +	    vionet_rx_event, dev);
> +	event_base_set(ev_base_rx, &ev_inject);
> +
> +	/* Wire up event handling for our inter-thread communication channel. */
> +	event_base_set(ev_base_rx, &pipe_rx.read_ev);
> +	event_add(&pipe_rx.read_ev, NULL);
> +
> +	/* Begin our event loop with our channel event active. */
> +	ret = event_base_dispatch(ev_base_rx);
> +	event_base_free(ev_base_rx);
> +
> +	log_debug("%s: exiting (%d)", __func__, ret);
> +
> +	close_fd(pipe_rx.read);
> +	close_fd(pipe_inject[READ]);
> +
> +	return (NULL);
> +}
> +
> +/*
> + * Handle the tx side processing, communicating to the main thread via pipe.
> + */
> +static void *
> +tx_run_loop(void *arg)
> +{
> +	int			 ret;
> +
> +	ev_base_tx = event_base_new();
> +
> +	/* Wire up event handling for our inter-thread communication channel. */
> +	event_base_set(ev_base_tx, &pipe_tx.read_ev);
> +	event_add(&pipe_tx.read_ev, NULL);
> +
> +	/* Begin our event loop with our channel event active. */
> +	ret = event_base_dispatch(ev_base_tx);
> +	event_base_free(ev_base_tx);
> +
> +	log_debug("%s: exiting (%d)", __func__, ret);
> +
> +	close_fd(pipe_tx.read);
> +
> +	return (NULL);
> +}
> +
> +/*
> + * Read events sent by the main thread to the rx thread.
> + */
> +static void
> +read_pipe_rx(int fd, short event, void *arg)
> +{
> +	enum pipe_msg_type	msg;
> +
> +	if (!(event & EV_READ))
> +		fatalx("%s: invalid event type", __func__);
> +
> +	msg = vm_pipe_recv(&pipe_rx);
> +
> +	switch (msg) {
> +	case VIRTIO_NOTIFY:
> +	case VIRTIO_THREAD_START:
> +		event_add(&ev_tap, NULL);
> +		event_add(&ev_inject, NULL);
> +		break;
> +	case VIRTIO_THREAD_PAUSE:
> +		event_del(&ev_tap);
> +		event_del(&ev_inject);
> +		vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK);
> +		break;
> +	case VIRTIO_THREAD_STOP:
> +		event_del(&ev_tap);
> +		event_del(&ev_inject);
> +		event_base_loopexit(ev_base_rx, NULL);
> +		break;
> +	default:
> +		fatalx("%s: invalid channel message: %d", __func__, msg);
> +	}
> +}
> +
> +/*
> + * Read events sent by the main thread to the tx thread.
> + */
> +static void
> +read_pipe_tx(int fd, short event, void *arg)
> +{
> +	struct virtio_dev	*dev = (struct virtio_dev*)arg;
> +	struct vionet_dev	*vionet = &dev->vionet;
> +	enum pipe_msg_type	 msg;
> +	int			 ret = 0;
> +
> +	if (!(event & EV_READ))
> +		fatalx("%s: invalid event type", __func__);
> +
> +	msg = vm_pipe_recv(&pipe_tx);
> +
> +	switch (msg) {
> +	case VIRTIO_NOTIFY:
> +		pthread_rwlock_rdlock(&lock);
> +		ret = vionet_tx(dev);
> +		pthread_rwlock_unlock(&lock);
> +		break;
> +	case VIRTIO_THREAD_START:
> +		/* Ignore Start messages. */
> +		break;
> +	case VIRTIO_THREAD_PAUSE:
> +		/*
> +		 * Nothing to do when pausing on the tx side, but ACK so main
> +		 * thread knows we're not transmitting.
> +		 */
> +		vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK);
> +		break;
> +	case VIRTIO_THREAD_STOP:
> +		event_base_loopexit(ev_base_tx, NULL);
> +		break;
> +	default:
> +		fatalx("%s: invalid channel message: %d", __func__, msg);
> +	}
> +
> +	if (ret == 0) {
> +		/* No notification needed. Return early. */
> +		return;
> +	}
> +
> +	pthread_rwlock_wrlock(&lock);
> +	if (ret == 1) {
> +		/* Notify the driver. */
> +		vionet->cfg.isr_status |= 1;
> +	} else {
> +		/* Need a reset. Something went wrong. */
> +		log_warnx("%s: requesting device reset", __func__);
> +		vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> +		vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> +	}
> +	pthread_rwlock_unlock(&lock);
> +
> +	vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ);
> +}
> +
> +/*
> + * Read events sent by the rx/tx threads to the main thread.
> + */
> +static void
> +read_pipe_main(int fd, short event, void *arg)
> +{
> +	struct virtio_dev	*dev = (struct virtio_dev*)arg;
> +	struct vionet_dev	*vionet = &dev->vionet;
> +	enum pipe_msg_type	 msg;
> +
> +	if (!(event & EV_READ))
> +		fatalx("%s: invalid event type", __func__);
> +
> +	msg = vm_pipe_recv(&pipe_main);
> +	switch (msg) {
> +	case VIRTIO_RAISE_IRQ:
> +		vionet_assert_pic_irq(dev);
> +		break;
> +	case VIRTIO_THREAD_ACK:
> +		resetting--;
> +		if (resetting == 0) {
> +			log_debug("%s: resetting virtio network device %d",
> +			    __func__, vionet->idx);
> +
> +			pthread_rwlock_wrlock(&lock);
> +			vionet->cfg.device_status = 0;
> +			vionet->cfg.guest_feature = 0;
> +			vionet->cfg.queue_pfn = 0;
> +			vionet_update_qa(vionet);
> +			vionet->cfg.queue_size = 0;
> +			vionet_update_qs(vionet);
> +			vionet->cfg.queue_select = 0;
> +			vionet->cfg.queue_notify = 0;
> +			vionet->cfg.isr_status = 0;
> +			vionet->vq[RXQ].last_avail = 0;
> +			vionet->vq[RXQ].notified_avail = 0;
> +			vionet->vq[TXQ].last_avail = 0;
> +			vionet->vq[TXQ].notified_avail = 0;
> +			pthread_rwlock_unlock(&lock);
> +		}
> +		break;
> +	default:
> +		fatalx("%s: invalid channel msg: %d", __func__, msg);
> +	}
> +}
> +
> +/*
> + * Message the vm process asking to raise the irq. Must be called from the main
> + * thread.
> + */
> +static void
> +vionet_assert_pic_irq(struct virtio_dev *dev)
> +{
> +	struct viodev_msg	msg;
> +	int			ret;
> +
> +	memset(&msg, 0, sizeof(msg));
> +	msg.irq = dev->irq;
> +	msg.vcpu = 0; // XXX
> +	msg.type = VIODEV_MSG_KICK;
> +	msg.state = INTR_STATE_ASSERT;
> +
> +	ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> +	    &msg, sizeof(msg), ev_base_main);
> +	if (ret == -1)
> +		log_warnx("%s: failed to assert irq %d", __func__, dev->irq);
> +}
> +
> +/*
> + * Message the vm process asking to lower the irq. Must be called from the main
> + * thread.
> + */
> +static void
> +vionet_deassert_pic_irq(struct virtio_dev *dev)
> +{
> +	struct viodev_msg	msg;
> +	int			ret;
> +
> +	memset(&msg, 0, sizeof(msg));
> +	msg.irq = dev->irq;
> +	msg.vcpu = 0; // XXX
> +	msg.type = VIODEV_MSG_KICK;
> +	msg.state = INTR_STATE_DEASSERT;
> +
> +	ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> +	    &msg, sizeof(msg), ev_base_main);
> +	if (ret == -1)
> +		log_warnx("%s: failed to assert irq %d", __func__, dev->irq);
> +}
> blob - 4b69f05e8a10b4276ae103ac2e386fa1289a670e
> blob + 69ce382422267e8b4cc232756bec5166aad29bf6
> --- usr.sbin/vmd/virtio.c
> +++ usr.sbin/vmd/virtio.c
> @@ -1424,7 +1424,7 @@ virtio_dev_launch(struct vmd_vm *vm, struct virtio_dev
>  		 */
>  		dev->sync_fd = sync_fds[0];
>  		dev->async_fd = async_fds[0];
> -		vm_device_pipe(dev, virtio_dispatch_dev);
> +		vm_device_pipe(dev, virtio_dispatch_dev, NULL);
>  	} else {
>  		/* Child */
>  		close_fd(async_fds[0]);
> @@ -1502,7 +1502,8 @@ err:
>   * Initialize an async imsg channel for a virtio device.
>   */
>  int
> -vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *))
> +vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *),
> +    struct event_base *ev_base)
>  {
>  	struct imsgev *iev = &dev->async_iev;
>  	int fd = dev->async_fd;
> @@ -1514,7 +1515,7 @@ vm_device_pipe(struct virtio_dev *dev, void (*cb)(int,
>  	iev->handler = cb;
>  	iev->data = dev;
>  	iev->events = EV_READ;
> -	imsg_event_add(iev);
> +	imsg_event_add2(iev, ev_base);
>
>  	return (0);
>  }
> blob - 1e51bd731a049e59c6fc4dac5f2f9486a7359649
> blob + acd58494dd145ed17245af4026a94531f5cfeda9
> --- usr.sbin/vmd/virtio.h
> +++ usr.sbin/vmd/virtio.h
> @@ -343,7 +343,8 @@ int virtio_restore(int, struct vmd_vm *, int, int[][VM
>      int *);
>  const char *virtio_reg_name(uint8_t);
>  uint32_t vring_size(uint32_t);
> -int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *));
> +int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *),
> +    struct event_base *);
>  int virtio_pci_io(int, uint16_t, uint32_t *, uint8_t *, void *, uint8_t);
>  void virtio_assert_pic_irq(struct virtio_dev *, int);
>  void virtio_deassert_pic_irq(struct virtio_dev *, int);
> blob - 772d044604d742116ff31e8e7e9ae4db059f6bf4
> blob + 2176de21c84445f112da6999fe43764be7e76e23
> --- usr.sbin/vmd/vm.c
> +++ usr.sbin/vmd/vm.c
> @@ -2436,18 +2436,25 @@ translate_gva(struct vm_exit* exit, uint64_t va, uint6
>  	return (0);
>  }
>
> +void
> +vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *))
> +{
> +	vm_pipe_init2(p, cb, NULL);
> +}
> +
>  /*
> - * vm_pipe_init
> + * vm_pipe_init2
>   *
>   * Initialize a vm_dev_pipe, setting up its file descriptors and its
> - * event structure with the given callback.
> + * event structure with the given callback and argument.
>   *
>   * Parameters:
>   *  p: pointer to vm_dev_pipe struct to initizlize
>   *  cb: callback to use for READ events on the read end of the pipe
> + *  arg: pointer to pass to the callback on event trigger
>   */
>  void
> -vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *))
> +vm_pipe_init2(struct vm_dev_pipe *p, void (*cb)(int, short, void *), void *arg)
>  {
>  	int ret;
>  	int fds[2];
> @@ -2461,13 +2468,14 @@ vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, sh
>  	p->read = fds[0];
>  	p->write = fds[1];
>
> -	event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, NULL);
> +	event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, arg);
>  }
>
>  /*
>   * vm_pipe_send
>   *
> - * Send a message to an emulated device vie the provided vm_dev_pipe.
> + * Send a message to an emulated device via the provided vm_dev_pipe. This
> + * relies on the fact sizeof(msg) < PIPE_BUF to ensure atomic writes.
>   *
>   * Parameters:
>   *  p: pointer to initialized vm_dev_pipe
> @@ -2486,7 +2494,8 @@ vm_pipe_send(struct vm_dev_pipe *p, enum pipe_msg_type
>   * vm_pipe_recv
>   *
>   * Receive a message for an emulated device via the provided vm_dev_pipe.
> - * Returns the message value, otherwise will exit on failure.
> + * Returns the message value, otherwise will exit on failure. This relies on
> + * the fact sizeof(enum pipe_msg_type) < PIPE_BUF for atomic reads.
>   *
>   * Parameters:
>   *  p: pointer to initialized vm_dev_pipe
> blob - 19995e951c71db93cf3c794147bfdf3ea1bc6616
> blob + bd3b08973c86c3d799652569b1a052ac55d5dc83
> --- usr.sbin/vmd/vmd.h
> +++ usr.sbin/vmd/vmd.h
> @@ -149,6 +149,7 @@ enum imsg_type {
>  	/* Device Operation Messages */
>  	IMSG_DEVOP_HOSTMAC,
>  	IMSG_DEVOP_MSG,
> +	IMSG_DEVOP_VIONET_MSG,
>  };
>
>  struct vmop_result {
> @@ -410,7 +411,13 @@ enum pipe_msg_type {
>  	I8253_RESET_CHAN_2 = 2,
>  	NS8250_ZERO_READ,
>  	NS8250_RATELIMIT,
> -	MC146818_RESCHEDULE_PER
> +	MC146818_RESCHEDULE_PER,
> +	VIRTIO_NOTIFY,
> +	VIRTIO_RAISE_IRQ,
> +	VIRTIO_THREAD_START,
> +	VIRTIO_THREAD_PAUSE,
> +	VIRTIO_THREAD_STOP,
> +	VIRTIO_THREAD_ACK,
>  };
>
>  static inline struct sockaddr_in *
> @@ -495,6 +502,8 @@ int	 read_mem(paddr_t, void *buf, size_t);
>  int	 start_vm(struct vmd_vm *, int);
>  __dead void vm_shutdown(unsigned int);
>  void	 vm_pipe_init(struct vm_dev_pipe *, void (*)(int, short, void *));
> +void	 vm_pipe_init2(struct vm_dev_pipe *, void (*)(int, short, void *),
> +	    void *);
>  void	 vm_pipe_send(struct vm_dev_pipe *, enum pipe_msg_type);
>  enum pipe_msg_type vm_pipe_recv(struct vm_dev_pipe *);
>  int	 write_mem(paddr_t, const void *buf, size_t);