Download raw body.
multithreading vmd's vionet
Still hoping for some test reports.
Dave Voutila <dv@sisu.io> writes:
> Dave Voutila <dv@sisu.io> writes:
>
>> Now that my rewrite of the virtio network device is in, I wanted to
>> share a new diff for testing.
>
> Updated diff below that applies to the latest tree and includes some
> changes/additions outlined below as well. So far have a positive report
> from phessler@, but since I've made some additions would appreciate more
> tests.
>
> Since I failed to mention it last time, one of the problems this
> multithreading solves is starving the RX side of the device when under
> heavy TX demand. If you run iperf3 in bidirectional mode between the
> host and a guest, very rapidly the RX bandwidth drops to 0. By letting
> the RX side run independently, we let the spice flow.
>
>>
>> Still tweaking things and needs some pause/unpause/etc. lifecycle
>> testing, but the general idea is where I want it:
>>
>> - splits rx, tx, and general event handling into 3 threads so send and
>> receive processing can happen in parallel
>> - consequently, extends some of the ipc functions to accept explicit
>> libevent(3) event bases since each thread needs its own
>> - uses unidirectional pipes as simple channels, similar to how the vm
>> process does one-way communication between the vcpu thread and device
>> threads.
>>
> - adds pthread rwlocks to protect device configuration state during
> config register reads/writes by the multiple threads
>
>> A variation of this diff has been tested by some folks already and the
>> general feedback has been it reduces average latency.
>>
>
> diff refs/heads/master refs/heads/vmd-vionet-threads
> commit - 634073900323bdd83ca3b5eb0732098812c0aa24
> commit + 2412c45704e90fb80d01ebc5b030cff1d3d9bc44
> blob - 380b35412d0d7f633bfc983aeed8bf7c4b37d8a0
> blob + c5f1a72f10a32af6c963d6f76dfa8dad459638e9
> --- usr.sbin/vmd/proc.c
> +++ usr.sbin/vmd/proc.c
> @@ -685,10 +685,15 @@ proc_dispatch_null(int fd, struct privsep_proc *p, str
> /*
> * imsg helper functions
> */
> -
> void
> imsg_event_add(struct imsgev *iev)
> {
> + imsg_event_add2(iev, NULL);
> +}
> +
> +void
> +imsg_event_add2(struct imsgev *iev, struct event_base *ev_base)
> +{
> if (iev->handler == NULL) {
> imsg_flush(&iev->ibuf);
> return;
> @@ -700,6 +705,8 @@ imsg_event_add(struct imsgev *iev)
>
> event_del(&iev->ev);
> event_set(&iev->ev, iev->ibuf.fd, iev->events, iev->handler, iev->data);
> + if (ev_base != NULL)
> + event_base_set(ev_base, &iev->ev);
> event_add(&iev->ev, NULL);
> }
>
> @@ -707,12 +714,20 @@ int
> imsg_compose_event(struct imsgev *iev, uint16_t type, uint32_t peerid,
> pid_t pid, int fd, void *data, uint16_t datalen)
> {
> + return imsg_compose_event2(iev, type, peerid, pid, fd, data, datalen,
> + NULL);
> +}
> +
> +int
> +imsg_compose_event2(struct imsgev *iev, uint16_t type, uint32_t peerid,
> + pid_t pid, int fd, void *data, uint16_t datalen, struct event_base *ev_base)
> +{
> int ret;
>
> if ((ret = imsg_compose(&iev->ibuf, type, peerid,
> pid, fd, data, datalen)) == -1)
> return (ret);
> - imsg_event_add(iev);
> + imsg_event_add2(iev, ev_base);
> return (ret);
> }
>
> blob - 5e000af95c69d08c93ebec70842314052eff58b9
> blob + 9b81237a1dba5e803dc1b6fbcd52c538b0956698
> --- usr.sbin/vmd/proc.h
> +++ usr.sbin/vmd/proc.h
> @@ -160,8 +160,11 @@ void proc_run(struct privsep *, struct privsep_proc *
> struct privsep_proc *, unsigned int,
> void (*)(struct privsep *, struct privsep_proc *, void *), void *);
> void imsg_event_add(struct imsgev *);
> +void imsg_event_add2(struct imsgev *, struct event_base *);
> int imsg_compose_event(struct imsgev *, uint16_t, uint32_t,
> pid_t, int, void *, uint16_t);
> +int imsg_compose_event2(struct imsgev *, uint16_t, uint32_t,
> + pid_t, int, void *, uint16_t, struct event_base *);
> int imsg_composev_event(struct imsgev *, uint16_t, uint32_t,
> pid_t, int, const struct iovec *, int);
> int proc_compose_imsg(struct privsep *, enum privsep_procid, int,
> blob - d97e5cc113aa0e3f28991f148491c1786a24c661
> blob + d0a107a6061c303178f0fc0739fac344798ec437
> --- usr.sbin/vmd/vioblk.c
> +++ usr.sbin/vmd/vioblk.c
> @@ -167,7 +167,7 @@ vioblk_main(int fd, int fd_vmm)
> /* Wire up an async imsg channel. */
> log_debug("%s: wiring in async vm event handler (fd=%d)", __func__,
> dev.async_fd);
> - if (vm_device_pipe(&dev, dev_dispatch_vm)) {
> + if (vm_device_pipe(&dev, dev_dispatch_vm, NULL)) {
> ret = EIO;
> log_warnx("vm_device_pipe");
> goto fail;
> blob - 0de50784bb2ad2d66d486f7a04bc1bfeecd78861
> blob + 83a73cb6500fe8aa802ecb91a2b46c4ac227f8b0
> --- usr.sbin/vmd/vionet.c
> +++ usr.sbin/vmd/vionet.c
> @@ -29,6 +29,8 @@
> #include <errno.h>
> #include <event.h>
> #include <fcntl.h>
> +#include <pthread.h>
> +#include <pthread_np.h>
> #include <stdlib.h>
> #include <string.h>
> #include <unistd.h>
> @@ -49,6 +51,8 @@ struct packet {
> size_t len;
> };
>
> +static void *rx_run_loop(void *);
> +static void *tx_run_loop(void *);
> static int vionet_rx(struct vionet_dev *, int);
> static ssize_t vionet_rx_copy(struct vionet_dev *, int, const struct iovec *,
> int, size_t);
> @@ -57,22 +61,37 @@ static ssize_t vionet_rx_zerocopy(struct vionet_dev *,
> static void vionet_rx_event(int, short, void *);
> static uint32_t handle_io_read(struct viodev_msg *, struct virtio_dev *,
> int8_t *);
> -static int handle_io_write(struct viodev_msg *, struct virtio_dev *);
> -static int vionet_notify_tx(struct virtio_dev *);
> -static int vionet_notifyq(struct virtio_dev *);
> -
> +static void handle_io_write(struct viodev_msg *, struct virtio_dev *);
> +static int vionet_tx(struct virtio_dev *);
> +static void vionet_notifyq(struct virtio_dev *);
> static void dev_dispatch_vm(int, short, void *);
> static void handle_sync_io(int, short, void *);
> +static void read_pipe_main(int, short, void *);
> +static void read_pipe_rx(int, short, void *);
> +static void read_pipe_tx(int, short, void *);
> +static void vionet_assert_pic_irq(struct virtio_dev *);
> +static void vionet_deassert_pic_irq(struct virtio_dev *);
>
> /* Device Globals */
> struct event ev_tap;
> struct event ev_inject;
> +struct event_base *ev_base_main;
> +struct event_base *ev_base_rx;
> +struct event_base *ev_base_tx;
> +pthread_t rx_thread;
> +pthread_t tx_thread;
> +struct vm_dev_pipe pipe_main;
> +struct vm_dev_pipe pipe_rx;
> +struct vm_dev_pipe pipe_tx;
> int pipe_inject[2];
> #define READ 0
> #define WRITE 1
> struct iovec iov_rx[VIONET_QUEUE_SIZE];
> struct iovec iov_tx[VIONET_QUEUE_SIZE];
> +pthread_rwlock_t lock = NULL; /* Guards device config state. */
>
> +/* Transient reset state used by the main thread to coordinate device reset. */
> +int resetting = 0;
>
> __dead void
> vionet_main(int fd, int fd_vmm)
> @@ -176,30 +195,51 @@ vionet_main(int fd, int fd_vmm)
> goto fail;
> }
>
> + /* Initialize inter-thread communication channels. */
> + vm_pipe_init2(&pipe_main, read_pipe_main, &dev);
> + vm_pipe_init2(&pipe_rx, read_pipe_rx, &dev);
> + vm_pipe_init2(&pipe_tx, read_pipe_tx, &dev);
> +
> + /* Initialize RX and TX threads . */
> + ret = pthread_create(&rx_thread, NULL, rx_run_loop, &dev);
> + if (ret) {
> + errno = ret;
> + log_warn("%s: failed to initialize rx thread", __func__);
> + goto fail;
> + }
> + pthread_set_name_np(rx_thread, "rx");
> + ret = pthread_create(&tx_thread, NULL, tx_run_loop, &dev);
> + if (ret) {
> + errno = ret;
> + log_warn("%s: failed to initialize tx thread", __func__);
> + goto fail;
> + }
> + pthread_set_name_np(tx_thread, "tx");
> +
> + /* Initialize our rwlock for guarding shared device state. */
> + ret = pthread_rwlock_init(&lock, NULL);
> + if (ret) {
> + errno = ret;
> + log_warn("%s: failed to initialize rwlock", __func__);
> + goto fail;
> + }
> +
> /* Initialize libevent so we can start wiring event handlers. */
> - event_init();
> + ev_base_main = event_base_new();
>
> + /* Add our handler for receiving messages from the RX/TX threads. */
> + event_base_set(ev_base_main, &pipe_main.read_ev);
> + event_add(&pipe_main.read_ev, NULL);
> +
> /* Wire up an async imsg channel. */
> log_debug("%s: wiring in async vm event handler (fd=%d)", __func__,
> dev.async_fd);
> - if (vm_device_pipe(&dev, dev_dispatch_vm)) {
> + if (vm_device_pipe(&dev, dev_dispatch_vm, ev_base_main)) {
> ret = EIO;
> log_warnx("vm_device_pipe");
> goto fail;
> }
>
> - /* Wire up event handling for the tap fd. */
> - log_debug("%s: wiring in tap fd handler (fd=%d)", __func__,
> - vionet->data_fd);
> - event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST,
> - vionet_rx_event, &dev);
> -
> - /* Add an event for injected packets. */
> - log_debug("%s: wiring in packet injection handler (fd=%d)", __func__,
> - pipe_inject[READ]);
> - event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST,
> - vionet_rx_event, &dev);
> -
> /* Configure our sync channel event handler. */
> log_debug("%s: wiring in sync channel handler (fd=%d)", __func__,
> dev.sync_fd);
> @@ -207,32 +247,46 @@ vionet_main(int fd, int fd_vmm)
> dev.sync_iev.handler = handle_sync_io;
> dev.sync_iev.data = &dev;
> dev.sync_iev.events = EV_READ;
> - imsg_event_add(&dev.sync_iev);
> + imsg_event_add2(&dev.sync_iev, ev_base_main);
>
> /* Send a ready message over the sync channel. */
> log_debug("%s: telling vm %s device is ready", __func__, vcp->vcp_name);
> memset(&msg, 0, sizeof(msg));
> msg.type = VIODEV_MSG_READY;
> - imsg_compose_event(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> - sizeof(msg));
> + imsg_compose_event2(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> + sizeof(msg), ev_base_main);
>
> /* Send a ready message over the async channel. */
> log_debug("%s: sending async ready message", __func__);
> - ret = imsg_compose_event(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> - &msg, sizeof(msg));
> + ret = imsg_compose_event2(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> + &msg, sizeof(msg), ev_base_main);
> if (ret == -1) {
> log_warnx("%s: failed to send async ready message!", __func__);
> goto fail;
> }
>
> /* Engage the event loop! */
> - ret = event_dispatch();
> + ret = event_base_dispatch(ev_base_main);
> + event_base_free(ev_base_main);
>
> + /* Try stopping the rx & tx threads cleanly by messaging them. */
> + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_STOP);
> + vm_pipe_send(&pipe_tx, VIRTIO_THREAD_STOP);
> +
> + /* Wait for threads to stop. */
> + pthread_join(rx_thread, NULL);
> + pthread_join(tx_thread, NULL);
> + pthread_rwlock_destroy(&lock);
> +
> /* Cleanup */
> if (ret == 0) {
> close_fd(dev.sync_fd);
> close_fd(dev.async_fd);
> close_fd(vionet->data_fd);
> + close_fd(pipe_main.read);
> + close_fd(pipe_main.write);
> + close_fd(pipe_rx.write);
> + close_fd(pipe_tx.write);
> close_fd(pipe_inject[READ]);
> close_fd(pipe_inject[WRITE]);
> _exit(ret);
> @@ -253,7 +307,8 @@ fail:
> close_fd(pipe_inject[WRITE]);
> if (vionet != NULL)
> close_fd(vionet->data_fd);
> -
> + if (lock != NULL)
> + pthread_rwlock_destroy(&lock);
> _exit(ret);
> }
>
> @@ -329,9 +384,10 @@ vionet_rx(struct vionet_dev *dev, int fd)
> struct iovec *iov;
> int notify = 0;
> ssize_t sz;
> + uint8_t status = 0;
>
> - if (!(dev->cfg.device_status
> - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) {
> + status = dev->cfg.device_status & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> + if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
> log_warnx("%s: driver not ready", __func__);
> return (0);
> }
> @@ -453,16 +509,12 @@ vionet_rx(struct vionet_dev *dev, int fd)
> if (idx != vq_info->last_avail &&
> !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
> notify = 1;
> - dev->cfg.isr_status |= 1;
> }
>
> vq_info->last_avail = idx;
> return (notify);
> reset:
> - log_warnx("%s: requesting device reset", __func__);
> - dev->cfg.device_status |= DEVICE_NEEDS_RESET;
> - dev->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> - return (1);
> + return (-1);
> }
>
> /*
> @@ -603,27 +655,49 @@ vionet_rx_zerocopy(struct vionet_dev *dev, int fd, con
> static void
> vionet_rx_event(int fd, short event, void *arg)
> {
> - struct virtio_dev *dev = (struct virtio_dev *)arg;
> + struct virtio_dev *dev = (struct virtio_dev *)arg;
> + struct vionet_dev *vionet = &dev->vionet;
> + int ret = 0;
>
> if (!(event & EV_READ))
> fatalx("%s: invalid event type", __func__);
>
> - if (vionet_rx(&dev->vionet, fd) > 0)
> - virtio_assert_pic_irq(dev, 0);
> + pthread_rwlock_rdlock(&lock);
> + ret = vionet_rx(vionet, fd);
> + pthread_rwlock_unlock(&lock);
> +
> + if (ret == 0) {
> + /* Nothing to do. */
> + return;
> + }
> +
> + pthread_rwlock_wrlock(&lock);
> + if (ret == 1) {
> + /* Notify the driver. */
> + vionet->cfg.isr_status |= 1;
> + } else {
> + /* Need a reset. Something went wrong. */
> + log_warnx("%s: requesting device reset", __func__);
> + vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> + vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> + }
> + pthread_rwlock_unlock(&lock);
> +
> + vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ);
> }
>
> -static int
> +static void
> vionet_notifyq(struct virtio_dev *dev)
> {
> - struct vionet_dev *vionet = &dev->vionet;
> + struct vionet_dev *vionet = &dev->vionet;
>
> switch (vionet->cfg.queue_notify) {
> case RXQ:
> - event_add(&ev_tap, NULL);
> - event_add(&ev_inject, NULL);
> + vm_pipe_send(&pipe_rx, VIRTIO_NOTIFY);
> break;
> case TXQ:
> - return vionet_notify_tx(dev);
> + vm_pipe_send(&pipe_tx, VIRTIO_NOTIFY);
> + break;
> default:
> /*
> * Catch the unimplemented queue ID 2 (control queue) as
> @@ -633,12 +707,10 @@ vionet_notifyq(struct virtio_dev *dev)
> __func__, vionet->cfg.queue_notify);
> break;
> }
> -
> - return (0);
> }
>
> static int
> -vionet_notify_tx(struct virtio_dev *dev)
> +vionet_tx(struct virtio_dev *dev)
> {
> uint16_t idx, hdr_idx;
> size_t chain_len, iov_cnt;
> @@ -653,9 +725,11 @@ vionet_notify_tx(struct virtio_dev *dev)
> struct ether_header *eh;
> struct iovec *iov;
> struct packet pkt;
> + uint8_t status = 0;
>
> - if (!(vionet->cfg.device_status
> - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) {
> + status = vionet->cfg.device_status
> + & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> + if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
> log_warnx("%s: driver not ready", __func__);
> return (0);
> }
> @@ -806,18 +880,14 @@ drop:
> }
>
> if (idx != vq_info->last_avail &&
> - !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
> + !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
> notify = 1;
> - vionet->cfg.isr_status |= 1;
> - }
>
> +
> vq_info->last_avail = idx;
> return (notify);
> reset:
> - log_warnx("%s: requesting device reset", __func__);
> - vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> - vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> - return (1);
> + return (-1);
> }
>
> static void
> @@ -830,6 +900,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
> struct imsg imsg;
> ssize_t n = 0;
> int verbose;
> + uint8_t status = 0;
>
> if (dev == NULL)
> fatalx("%s: missing vionet pointer", __func__);
> @@ -841,7 +912,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
> /* this pipe is dead, so remove the event handler */
> log_debug("%s: pipe dead (EV_READ)", __func__);
> event_del(&iev->ev);
> - event_loopexit(NULL);
> + event_base_loopexit(ev_base_main, NULL);
> return;
> }
> }
> @@ -853,7 +924,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
> /* this pipe is dead, so remove the event handler */
> log_debug("%s: pipe dead (EV_WRITE)", __func__);
> event_del(&iev->ev);
> - event_loopexit(NULL);
> + event_base_loopexit(ev_base_main, NULL);
> return;
> }
> }
> @@ -873,16 +944,16 @@ dev_dispatch_vm(int fd, short event, void *arg)
> break;
> case IMSG_VMDOP_PAUSE_VM:
> log_debug("%s: pausing", __func__);
> - event_del(&ev_tap);
> - event_del(&ev_inject);
> + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE);
> break;
> case IMSG_VMDOP_UNPAUSE_VM:
> log_debug("%s: unpausing", __func__);
> - if (vionet->cfg.device_status
> - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) {
> - event_add(&ev_tap, NULL);
> - event_add(&ev_inject, NULL);
> - }
> + pthread_rwlock_rdlock(&lock);
> + status = vionet->cfg.device_status &
> + VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK;
> + pthread_rwlock_unlock(&lock);
> + if (status)
> + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_START);
> break;
> case IMSG_CTL_VERBOSE:
> IMSG_SIZE_CHECK(&imsg, &verbose);
> @@ -892,7 +963,7 @@ dev_dispatch_vm(int fd, short event, void *arg)
> }
> imsg_free(&imsg);
> }
> - imsg_event_add(iev);
> + imsg_event_add2(iev, ev_base_main);
> }
>
> /*
> @@ -917,7 +988,7 @@ handle_sync_io(int fd, short event, void *arg)
> /* this pipe is dead, so remove the event handler */
> log_debug("%s: pipe dead (EV_READ)", __func__);
> event_del(&iev->ev);
> - event_loopexit(NULL);
> + event_base_loopexit(ev_base_main, NULL);
> return;
> }
> }
> @@ -929,7 +1000,7 @@ handle_sync_io(int fd, short event, void *arg)
> /* this pipe is dead, so remove the event handler */
> log_debug("%s: pipe dead (EV_WRITE)", __func__);
> event_del(&iev->ev);
> - event_loopexit(NULL);
> + event_base_loopexit(ev_base_main, NULL);
> return;
> }
> }
> @@ -959,34 +1030,33 @@ handle_sync_io(int fd, short event, void *arg)
> msg.data = handle_io_read(&msg, dev, &intr);
> msg.data_valid = 1;
> msg.state = intr;
> - imsg_compose_event(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> - sizeof(msg));
> + imsg_compose_event2(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg,
> + sizeof(msg), ev_base_main);
> break;
> case VIODEV_MSG_IO_WRITE:
> /* Write IO: no reply needed */
> - if (handle_io_write(&msg, dev) == 1)
> - virtio_assert_pic_irq(dev, 0);
> + handle_io_write(&msg, dev);
> break;
> case VIODEV_MSG_SHUTDOWN:
> event_del(&dev->sync_iev.ev);
> - event_del(&ev_tap);
> - event_del(&ev_inject);
> - event_loopbreak();
> + event_base_loopbreak(ev_base_main);
> return;
> default:
> fatalx("%s: invalid msg type %d", __func__, msg.type);
> }
> }
> - imsg_event_add(iev);
> + imsg_event_add2(iev, ev_base_main);
> }
>
> -static int
> +static void
> handle_io_write(struct viodev_msg *msg, struct virtio_dev *dev)
> {
> - struct vionet_dev *vionet = &dev->vionet;
> - uint32_t data = msg->data;
> - int intr = 0;
> + struct vionet_dev *vionet = &dev->vionet;
> + uint32_t data = msg->data;
> + int pause_devices = 0;
>
> + pthread_rwlock_wrlock(&lock);
> +
> switch (msg->reg) {
> case VIRTIO_CONFIG_DEVICE_FEATURES:
> case VIRTIO_CONFIG_QUEUE_SIZE:
> @@ -1007,35 +1077,25 @@ handle_io_write(struct viodev_msg *msg, struct virtio_
> break;
> case VIRTIO_CONFIG_QUEUE_NOTIFY:
> vionet->cfg.queue_notify = data;
> - if (vionet_notifyq(dev))
> - intr = 1;
> + vionet_notifyq(dev);
> break;
> case VIRTIO_CONFIG_DEVICE_STATUS:
> - vionet->cfg.device_status = data;
> - if (vionet->cfg.device_status == 0) {
> - vionet->cfg.guest_feature = 0;
> -
> - vionet->cfg.queue_pfn = 0;
> - vionet_update_qa(vionet);
> -
> - vionet->cfg.queue_size = 0;
> - vionet_update_qs(vionet);
> -
> - vionet->cfg.queue_select = 0;
> - vionet->cfg.queue_notify = 0;
> - vionet->cfg.isr_status = 0;
> - vionet->vq[RXQ].last_avail = 0;
> - vionet->vq[RXQ].notified_avail = 0;
> - vionet->vq[TXQ].last_avail = 0;
> - vionet->vq[TXQ].notified_avail = 0;
> - virtio_deassert_pic_irq(dev, msg->vcpu);
> -
> - event_del(&ev_tap);
> - event_del(&ev_inject);
> + if (data == 0) {
> + resetting = 2; /* Wait on two acks: rx & tx */
> + pause_devices = 1;
> + } else {
> + // XXX is this correct?
> + vionet->cfg.device_status = data;
> }
> break;
> }
> - return (intr);
> +
> + pthread_rwlock_unlock(&lock);
> + if (pause_devices) {
> + vionet_deassert_pic_irq(dev);
> + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE);
> + vm_pipe_send(&pipe_tx, VIRTIO_THREAD_PAUSE);
> + }
> }
>
> static uint32_t
> @@ -1044,6 +1104,8 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d
> struct vionet_dev *vionet = &dev->vionet;
> uint32_t data;
>
> + pthread_rwlock_rdlock(&lock);
> +
> switch (msg->reg) {
> case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI:
> case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI + 1:
> @@ -1076,14 +1138,263 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d
> data = vionet->cfg.device_status;
> break;
> case VIRTIO_CONFIG_ISR_STATUS:
> + pthread_rwlock_unlock(&lock);
> + pthread_rwlock_wrlock(&lock);
> data = vionet->cfg.isr_status;
> vionet->cfg.isr_status = 0;
> if (intr != NULL)
> *intr = INTR_STATE_DEASSERT;
> break;
> default:
> - return (0xFFFFFFFF);
> + data = 0xFFFFFFFF;
> }
>
> + pthread_rwlock_unlock(&lock);
> return (data);
> }
> +
> +/*
> + * Handle the rx side processing, communicating to the main thread via pipe.
> + */
> +static void *
> +rx_run_loop(void *arg)
> +{
> + struct virtio_dev *dev = (struct virtio_dev *)arg;
> + struct vionet_dev *vionet = &dev->vionet;
> + int ret;
> +
> + ev_base_rx = event_base_new();
> +
> + /* Wire up event handling for the tap fd. */
> + event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST,
> + vionet_rx_event, dev);
> + event_base_set(ev_base_rx, &ev_tap);
> +
> + /* Wire up event handling for the packet injection pipe. */
> + event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST,
> + vionet_rx_event, dev);
> + event_base_set(ev_base_rx, &ev_inject);
> +
> + /* Wire up event handling for our inter-thread communication channel. */
> + event_base_set(ev_base_rx, &pipe_rx.read_ev);
> + event_add(&pipe_rx.read_ev, NULL);
> +
> + /* Begin our event loop with our channel event active. */
> + ret = event_base_dispatch(ev_base_rx);
> + event_base_free(ev_base_rx);
> +
> + log_debug("%s: exiting (%d)", __func__, ret);
> +
> + close_fd(pipe_rx.read);
> + close_fd(pipe_inject[READ]);
> +
> + return (NULL);
> +}
> +
> +/*
> + * Handle the tx side processing, communicating to the main thread via pipe.
> + */
> +static void *
> +tx_run_loop(void *arg)
> +{
> + int ret;
> +
> + ev_base_tx = event_base_new();
> +
> + /* Wire up event handling for our inter-thread communication channel. */
> + event_base_set(ev_base_tx, &pipe_tx.read_ev);
> + event_add(&pipe_tx.read_ev, NULL);
> +
> + /* Begin our event loop with our channel event active. */
> + ret = event_base_dispatch(ev_base_tx);
> + event_base_free(ev_base_tx);
> +
> + log_debug("%s: exiting (%d)", __func__, ret);
> +
> + close_fd(pipe_tx.read);
> +
> + return (NULL);
> +}
> +
> +/*
> + * Read events sent by the main thread to the rx thread.
> + */
> +static void
> +read_pipe_rx(int fd, short event, void *arg)
> +{
> + enum pipe_msg_type msg;
> +
> + if (!(event & EV_READ))
> + fatalx("%s: invalid event type", __func__);
> +
> + msg = vm_pipe_recv(&pipe_rx);
> +
> + switch (msg) {
> + case VIRTIO_NOTIFY:
> + case VIRTIO_THREAD_START:
> + event_add(&ev_tap, NULL);
> + event_add(&ev_inject, NULL);
> + break;
> + case VIRTIO_THREAD_PAUSE:
> + event_del(&ev_tap);
> + event_del(&ev_inject);
> + vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK);
> + break;
> + case VIRTIO_THREAD_STOP:
> + event_del(&ev_tap);
> + event_del(&ev_inject);
> + event_base_loopexit(ev_base_rx, NULL);
> + break;
> + default:
> + fatalx("%s: invalid channel message: %d", __func__, msg);
> + }
> +}
> +
> +/*
> + * Read events sent by the main thread to the tx thread.
> + */
> +static void
> +read_pipe_tx(int fd, short event, void *arg)
> +{
> + struct virtio_dev *dev = (struct virtio_dev*)arg;
> + struct vionet_dev *vionet = &dev->vionet;
> + enum pipe_msg_type msg;
> + int ret = 0;
> +
> + if (!(event & EV_READ))
> + fatalx("%s: invalid event type", __func__);
> +
> + msg = vm_pipe_recv(&pipe_tx);
> +
> + switch (msg) {
> + case VIRTIO_NOTIFY:
> + pthread_rwlock_rdlock(&lock);
> + ret = vionet_tx(dev);
> + pthread_rwlock_unlock(&lock);
> + break;
> + case VIRTIO_THREAD_START:
> + /* Ignore Start messages. */
> + break;
> + case VIRTIO_THREAD_PAUSE:
> + /*
> + * Nothing to do when pausing on the tx side, but ACK so main
> + * thread knows we're not transmitting.
> + */
> + vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK);
> + break;
> + case VIRTIO_THREAD_STOP:
> + event_base_loopexit(ev_base_tx, NULL);
> + break;
> + default:
> + fatalx("%s: invalid channel message: %d", __func__, msg);
> + }
> +
> + if (ret == 0) {
> + /* No notification needed. Return early. */
> + return;
> + }
> +
> + pthread_rwlock_wrlock(&lock);
> + if (ret == 1) {
> + /* Notify the driver. */
> + vionet->cfg.isr_status |= 1;
> + } else {
> + /* Need a reset. Something went wrong. */
> + log_warnx("%s: requesting device reset", __func__);
> + vionet->cfg.device_status |= DEVICE_NEEDS_RESET;
> + vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE;
> + }
> + pthread_rwlock_unlock(&lock);
> +
> + vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ);
> +}
> +
> +/*
> + * Read events sent by the rx/tx threads to the main thread.
> + */
> +static void
> +read_pipe_main(int fd, short event, void *arg)
> +{
> + struct virtio_dev *dev = (struct virtio_dev*)arg;
> + struct vionet_dev *vionet = &dev->vionet;
> + enum pipe_msg_type msg;
> +
> + if (!(event & EV_READ))
> + fatalx("%s: invalid event type", __func__);
> +
> + msg = vm_pipe_recv(&pipe_main);
> + switch (msg) {
> + case VIRTIO_RAISE_IRQ:
> + vionet_assert_pic_irq(dev);
> + break;
> + case VIRTIO_THREAD_ACK:
> + resetting--;
> + if (resetting == 0) {
> + log_debug("%s: resetting virtio network device %d",
> + __func__, vionet->idx);
> +
> + pthread_rwlock_wrlock(&lock);
> + vionet->cfg.device_status = 0;
> + vionet->cfg.guest_feature = 0;
> + vionet->cfg.queue_pfn = 0;
> + vionet_update_qa(vionet);
> + vionet->cfg.queue_size = 0;
> + vionet_update_qs(vionet);
> + vionet->cfg.queue_select = 0;
> + vionet->cfg.queue_notify = 0;
> + vionet->cfg.isr_status = 0;
> + vionet->vq[RXQ].last_avail = 0;
> + vionet->vq[RXQ].notified_avail = 0;
> + vionet->vq[TXQ].last_avail = 0;
> + vionet->vq[TXQ].notified_avail = 0;
> + pthread_rwlock_unlock(&lock);
> + }
> + break;
> + default:
> + fatalx("%s: invalid channel msg: %d", __func__, msg);
> + }
> +}
> +
> +/*
> + * Message the vm process asking to raise the irq. Must be called from the main
> + * thread.
> + */
> +static void
> +vionet_assert_pic_irq(struct virtio_dev *dev)
> +{
> + struct viodev_msg msg;
> + int ret;
> +
> + memset(&msg, 0, sizeof(msg));
> + msg.irq = dev->irq;
> + msg.vcpu = 0; // XXX
> + msg.type = VIODEV_MSG_KICK;
> + msg.state = INTR_STATE_ASSERT;
> +
> + ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> + &msg, sizeof(msg), ev_base_main);
> + if (ret == -1)
> + log_warnx("%s: failed to assert irq %d", __func__, dev->irq);
> +}
> +
> +/*
> + * Message the vm process asking to lower the irq. Must be called from the main
> + * thread.
> + */
> +static void
> +vionet_deassert_pic_irq(struct virtio_dev *dev)
> +{
> + struct viodev_msg msg;
> + int ret;
> +
> + memset(&msg, 0, sizeof(msg));
> + msg.irq = dev->irq;
> + msg.vcpu = 0; // XXX
> + msg.type = VIODEV_MSG_KICK;
> + msg.state = INTR_STATE_DEASSERT;
> +
> + ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1,
> + &msg, sizeof(msg), ev_base_main);
> + if (ret == -1)
> + log_warnx("%s: failed to assert irq %d", __func__, dev->irq);
> +}
> blob - 4b69f05e8a10b4276ae103ac2e386fa1289a670e
> blob + 69ce382422267e8b4cc232756bec5166aad29bf6
> --- usr.sbin/vmd/virtio.c
> +++ usr.sbin/vmd/virtio.c
> @@ -1424,7 +1424,7 @@ virtio_dev_launch(struct vmd_vm *vm, struct virtio_dev
> */
> dev->sync_fd = sync_fds[0];
> dev->async_fd = async_fds[0];
> - vm_device_pipe(dev, virtio_dispatch_dev);
> + vm_device_pipe(dev, virtio_dispatch_dev, NULL);
> } else {
> /* Child */
> close_fd(async_fds[0]);
> @@ -1502,7 +1502,8 @@ err:
> * Initialize an async imsg channel for a virtio device.
> */
> int
> -vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *))
> +vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *),
> + struct event_base *ev_base)
> {
> struct imsgev *iev = &dev->async_iev;
> int fd = dev->async_fd;
> @@ -1514,7 +1515,7 @@ vm_device_pipe(struct virtio_dev *dev, void (*cb)(int,
> iev->handler = cb;
> iev->data = dev;
> iev->events = EV_READ;
> - imsg_event_add(iev);
> + imsg_event_add2(iev, ev_base);
>
> return (0);
> }
> blob - 1e51bd731a049e59c6fc4dac5f2f9486a7359649
> blob + acd58494dd145ed17245af4026a94531f5cfeda9
> --- usr.sbin/vmd/virtio.h
> +++ usr.sbin/vmd/virtio.h
> @@ -343,7 +343,8 @@ int virtio_restore(int, struct vmd_vm *, int, int[][VM
> int *);
> const char *virtio_reg_name(uint8_t);
> uint32_t vring_size(uint32_t);
> -int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *));
> +int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *),
> + struct event_base *);
> int virtio_pci_io(int, uint16_t, uint32_t *, uint8_t *, void *, uint8_t);
> void virtio_assert_pic_irq(struct virtio_dev *, int);
> void virtio_deassert_pic_irq(struct virtio_dev *, int);
> blob - 772d044604d742116ff31e8e7e9ae4db059f6bf4
> blob + 2176de21c84445f112da6999fe43764be7e76e23
> --- usr.sbin/vmd/vm.c
> +++ usr.sbin/vmd/vm.c
> @@ -2436,18 +2436,25 @@ translate_gva(struct vm_exit* exit, uint64_t va, uint6
> return (0);
> }
>
> +void
> +vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *))
> +{
> + vm_pipe_init2(p, cb, NULL);
> +}
> +
> /*
> - * vm_pipe_init
> + * vm_pipe_init2
> *
> * Initialize a vm_dev_pipe, setting up its file descriptors and its
> - * event structure with the given callback.
> + * event structure with the given callback and argument.
> *
> * Parameters:
> * p: pointer to vm_dev_pipe struct to initizlize
> * cb: callback to use for READ events on the read end of the pipe
> + * arg: pointer to pass to the callback on event trigger
> */
> void
> -vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *))
> +vm_pipe_init2(struct vm_dev_pipe *p, void (*cb)(int, short, void *), void *arg)
> {
> int ret;
> int fds[2];
> @@ -2461,13 +2468,14 @@ vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, sh
> p->read = fds[0];
> p->write = fds[1];
>
> - event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, NULL);
> + event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, arg);
> }
>
> /*
> * vm_pipe_send
> *
> - * Send a message to an emulated device vie the provided vm_dev_pipe.
> + * Send a message to an emulated device vie the provided vm_dev_pipe. This
> + * relies on the fact sizeof(msg) < PIPE_BUF to ensure atomic writes.
> *
> * Parameters:
> * p: pointer to initialized vm_dev_pipe
> @@ -2486,7 +2494,8 @@ vm_pipe_send(struct vm_dev_pipe *p, enum pipe_msg_type
> * vm_pipe_recv
> *
> * Receive a message for an emulated device via the provided vm_dev_pipe.
> - * Returns the message value, otherwise will exit on failure.
> + * Returns the message value, otherwise will exit on failure. This relies on
> + * the fact sizeof(enum pipe_msg_type) < PIPE_BUF for atomic reads.
> *
> * Parameters:
> * p: pointer to initialized vm_dev_pipe
> blob - 19995e951c71db93cf3c794147bfdf3ea1bc6616
> blob + bd3b08973c86c3d799652569b1a052ac55d5dc83
> --- usr.sbin/vmd/vmd.h
> +++ usr.sbin/vmd/vmd.h
> @@ -149,6 +149,7 @@ enum imsg_type {
> /* Device Operation Messages */
> IMSG_DEVOP_HOSTMAC,
> IMSG_DEVOP_MSG,
> + IMSG_DEVOP_VIONET_MSG,
> };
>
> struct vmop_result {
> @@ -410,7 +411,13 @@ enum pipe_msg_type {
> I8253_RESET_CHAN_2 = 2,
> NS8250_ZERO_READ,
> NS8250_RATELIMIT,
> - MC146818_RESCHEDULE_PER
> + MC146818_RESCHEDULE_PER,
> + VIRTIO_NOTIFY,
> + VIRTIO_RAISE_IRQ,
> + VIRTIO_THREAD_START,
> + VIRTIO_THREAD_PAUSE,
> + VIRTIO_THREAD_STOP,
> + VIRTIO_THREAD_ACK,
> };
>
> static inline struct sockaddr_in *
> @@ -495,6 +502,8 @@ int read_mem(paddr_t, void *buf, size_t);
> int start_vm(struct vmd_vm *, int);
> __dead void vm_shutdown(unsigned int);
> void vm_pipe_init(struct vm_dev_pipe *, void (*)(int, short, void *));
> +void vm_pipe_init2(struct vm_dev_pipe *, void (*)(int, short, void *),
> + void *);
> void vm_pipe_send(struct vm_dev_pipe *, enum pipe_msg_type);
> enum pipe_msg_type vm_pipe_recv(struct vm_dev_pipe *);
> int write_mem(paddr_t, const void *buf, size_t);
multithreading vmd's vionet