From: Dave Voutila Subject: Re: multithreading vmd's vionet To: Dave Voutila Cc: "tech@openbsd.org" , Peter Hessler Date: Fri, 16 Feb 2024 14:10:03 -0500 Still hoping for some test reports. Dave Voutila writes: > Dave Voutila writes: > >> Now that my rewrite of the virtio network device is in, I wanted to >> share a new diff for testing. > > Updated diff below that applies to the latest tree and includes some > changes/additions outlined below as well. So far I have a positive report > from phessler@, but since I've made some additions I would appreciate more > tests. > > Since I failed to mention last time, one of the problems this > multithreading solves is starving the RX side of the device when under > heavy TX demand. If you run iperf3 in bidirectional mode between the > host and a guest, very rapidly the RX bandwidth drops to 0. By letting > the RX side run independently, we let the spice flow. > >> >> Still tweaking things and needs some pause/unpause/etc. lifecycle >> testing, but the general idea is where I want it: >> >> - splits rx, tx, and general event handling into 3 threads so send and >> receive processing can happen in parallel >> - consequently, extends some of the ipc functions to accept explicit >> libevent(3) event bases since each thread needs their own >> - uses unidirectional pipes as simple channels, similar to how the vm >> process does one-way communication between the vcpu thread and device >> threads. >> > - adds pthread rwlocks to protect device configuration state during > config register reads/writes by the multiple threads > >> A variation of this diff has been tested by some folks already and the >> general feedback has been that it reduces average latency. 
>> > > diff refs/heads/master refs/heads/vmd-vionet-threads > commit - 634073900323bdd83ca3b5eb0732098812c0aa24 > commit + 2412c45704e90fb80d01ebc5b030cff1d3d9bc44 > blob - 380b35412d0d7f633bfc983aeed8bf7c4b37d8a0 > blob + c5f1a72f10a32af6c963d6f76dfa8dad459638e9 > --- usr.sbin/vmd/proc.c > +++ usr.sbin/vmd/proc.c > @@ -685,10 +685,15 @@ proc_dispatch_null(int fd, struct privsep_proc *p, str > /* > * imsg helper functions > */ > - > void > imsg_event_add(struct imsgev *iev) > { > + imsg_event_add2(iev, NULL); > +} > + > +void > +imsg_event_add2(struct imsgev *iev, struct event_base *ev_base) > +{ > if (iev->handler == NULL) { > imsg_flush(&iev->ibuf); > return; > @@ -700,6 +705,8 @@ imsg_event_add(struct imsgev *iev) > > event_del(&iev->ev); > event_set(&iev->ev, iev->ibuf.fd, iev->events, iev->handler, iev->data); > + if (ev_base != NULL) > + event_base_set(ev_base, &iev->ev); > event_add(&iev->ev, NULL); > } > > @@ -707,12 +714,20 @@ int > imsg_compose_event(struct imsgev *iev, uint16_t type, uint32_t peerid, > pid_t pid, int fd, void *data, uint16_t datalen) > { > + return imsg_compose_event2(iev, type, peerid, pid, fd, data, datalen, > + NULL); > +} > + > +int > +imsg_compose_event2(struct imsgev *iev, uint16_t type, uint32_t peerid, > + pid_t pid, int fd, void *data, uint16_t datalen, struct event_base *ev_base) > +{ > int ret; > > if ((ret = imsg_compose(&iev->ibuf, type, peerid, > pid, fd, data, datalen)) == -1) > return (ret); > - imsg_event_add(iev); > + imsg_event_add2(iev, ev_base); > return (ret); > } > > blob - 5e000af95c69d08c93ebec70842314052eff58b9 > blob + 9b81237a1dba5e803dc1b6fbcd52c538b0956698 > --- usr.sbin/vmd/proc.h > +++ usr.sbin/vmd/proc.h > @@ -160,8 +160,11 @@ void proc_run(struct privsep *, struct privsep_proc * > struct privsep_proc *, unsigned int, > void (*)(struct privsep *, struct privsep_proc *, void *), void *); > void imsg_event_add(struct imsgev *); > +void imsg_event_add2(struct imsgev *, struct event_base *); > int 
imsg_compose_event(struct imsgev *, uint16_t, uint32_t, > pid_t, int, void *, uint16_t); > +int imsg_compose_event2(struct imsgev *, uint16_t, uint32_t, > + pid_t, int, void *, uint16_t, struct event_base *); > int imsg_composev_event(struct imsgev *, uint16_t, uint32_t, > pid_t, int, const struct iovec *, int); > int proc_compose_imsg(struct privsep *, enum privsep_procid, int, > blob - d97e5cc113aa0e3f28991f148491c1786a24c661 > blob + d0a107a6061c303178f0fc0739fac344798ec437 > --- usr.sbin/vmd/vioblk.c > +++ usr.sbin/vmd/vioblk.c > @@ -167,7 +167,7 @@ vioblk_main(int fd, int fd_vmm) > /* Wire up an async imsg channel. */ > log_debug("%s: wiring in async vm event handler (fd=%d)", __func__, > dev.async_fd); > - if (vm_device_pipe(&dev, dev_dispatch_vm)) { > + if (vm_device_pipe(&dev, dev_dispatch_vm, NULL)) { > ret = EIO; > log_warnx("vm_device_pipe"); > goto fail; > blob - 0de50784bb2ad2d66d486f7a04bc1bfeecd78861 > blob + 83a73cb6500fe8aa802ecb91a2b46c4ac227f8b0 > --- usr.sbin/vmd/vionet.c > +++ usr.sbin/vmd/vionet.c > @@ -29,6 +29,8 @@ > #include > #include > #include > +#include > +#include > #include > #include > #include > @@ -49,6 +51,8 @@ struct packet { > size_t len; > }; > > +static void *rx_run_loop(void *); > +static void *tx_run_loop(void *); > static int vionet_rx(struct vionet_dev *, int); > static ssize_t vionet_rx_copy(struct vionet_dev *, int, const struct iovec *, > int, size_t); > @@ -57,22 +61,37 @@ static ssize_t vionet_rx_zerocopy(struct vionet_dev *, > static void vionet_rx_event(int, short, void *); > static uint32_t handle_io_read(struct viodev_msg *, struct virtio_dev *, > int8_t *); > -static int handle_io_write(struct viodev_msg *, struct virtio_dev *); > -static int vionet_notify_tx(struct virtio_dev *); > -static int vionet_notifyq(struct virtio_dev *); > - > +static void handle_io_write(struct viodev_msg *, struct virtio_dev *); > +static int vionet_tx(struct virtio_dev *); > +static void vionet_notifyq(struct virtio_dev *); > static 
void dev_dispatch_vm(int, short, void *); > static void handle_sync_io(int, short, void *); > +static void read_pipe_main(int, short, void *); > +static void read_pipe_rx(int, short, void *); > +static void read_pipe_tx(int, short, void *); > +static void vionet_assert_pic_irq(struct virtio_dev *); > +static void vionet_deassert_pic_irq(struct virtio_dev *); > > /* Device Globals */ > struct event ev_tap; > struct event ev_inject; > +struct event_base *ev_base_main; > +struct event_base *ev_base_rx; > +struct event_base *ev_base_tx; > +pthread_t rx_thread; > +pthread_t tx_thread; > +struct vm_dev_pipe pipe_main; > +struct vm_dev_pipe pipe_rx; > +struct vm_dev_pipe pipe_tx; > int pipe_inject[2]; > #define READ 0 > #define WRITE 1 > struct iovec iov_rx[VIONET_QUEUE_SIZE]; > struct iovec iov_tx[VIONET_QUEUE_SIZE]; > +pthread_rwlock_t lock = NULL; /* Guards device config state. */ > > +/* Transient reset state used by the main thread to coordinate device reset. */ > +int resetting = 0; > > __dead void > vionet_main(int fd, int fd_vmm) > @@ -176,30 +195,51 @@ vionet_main(int fd, int fd_vmm) > goto fail; > } > > + /* Initialize inter-thread communication channels. */ > + vm_pipe_init2(&pipe_main, read_pipe_main, &dev); > + vm_pipe_init2(&pipe_rx, read_pipe_rx, &dev); > + vm_pipe_init2(&pipe_tx, read_pipe_tx, &dev); > + > + /* Initialize RX and TX threads . */ > + ret = pthread_create(&rx_thread, NULL, rx_run_loop, &dev); > + if (ret) { > + errno = ret; > + log_warn("%s: failed to initialize rx thread", __func__); > + goto fail; > + } > + pthread_set_name_np(rx_thread, "rx"); > + ret = pthread_create(&tx_thread, NULL, tx_run_loop, &dev); > + if (ret) { > + errno = ret; > + log_warn("%s: failed to initialize tx thread", __func__); > + goto fail; > + } > + pthread_set_name_np(tx_thread, "tx"); > + > + /* Initialize our rwlock for guarding shared device state. 
*/ > + ret = pthread_rwlock_init(&lock, NULL); > + if (ret) { > + errno = ret; > + log_warn("%s: failed to initialize rwlock", __func__); > + goto fail; > + } > + > /* Initialize libevent so we can start wiring event handlers. */ > - event_init(); > + ev_base_main = event_base_new(); > > + /* Add our handler for receiving messages from the RX/TX threads. */ > + event_base_set(ev_base_main, &pipe_main.read_ev); > + event_add(&pipe_main.read_ev, NULL); > + > /* Wire up an async imsg channel. */ > log_debug("%s: wiring in async vm event handler (fd=%d)", __func__, > dev.async_fd); > - if (vm_device_pipe(&dev, dev_dispatch_vm)) { > + if (vm_device_pipe(&dev, dev_dispatch_vm, ev_base_main)) { > ret = EIO; > log_warnx("vm_device_pipe"); > goto fail; > } > > - /* Wire up event handling for the tap fd. */ > - log_debug("%s: wiring in tap fd handler (fd=%d)", __func__, > - vionet->data_fd); > - event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST, > - vionet_rx_event, &dev); > - > - /* Add an event for injected packets. */ > - log_debug("%s: wiring in packet injection handler (fd=%d)", __func__, > - pipe_inject[READ]); > - event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST, > - vionet_rx_event, &dev); > - > /* Configure our sync channel event handler. */ > log_debug("%s: wiring in sync channel handler (fd=%d)", __func__, > dev.sync_fd); > @@ -207,32 +247,46 @@ vionet_main(int fd, int fd_vmm) > dev.sync_iev.handler = handle_sync_io; > dev.sync_iev.data = &dev; > dev.sync_iev.events = EV_READ; > - imsg_event_add(&dev.sync_iev); > + imsg_event_add2(&dev.sync_iev, ev_base_main); > > /* Send a ready message over the sync channel. 
*/ > log_debug("%s: telling vm %s device is ready", __func__, vcp->vcp_name); > memset(&msg, 0, sizeof(msg)); > msg.type = VIODEV_MSG_READY; > - imsg_compose_event(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg, > - sizeof(msg)); > + imsg_compose_event2(&dev.sync_iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg, > + sizeof(msg), ev_base_main); > > /* Send a ready message over the async channel. */ > log_debug("%s: sending async ready message", __func__); > - ret = imsg_compose_event(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1, > - &msg, sizeof(msg)); > + ret = imsg_compose_event2(&dev.async_iev, IMSG_DEVOP_MSG, 0, 0, -1, > + &msg, sizeof(msg), ev_base_main); > if (ret == -1) { > log_warnx("%s: failed to send async ready message!", __func__); > goto fail; > } > > /* Engage the event loop! */ > - ret = event_dispatch(); > + ret = event_base_dispatch(ev_base_main); > + event_base_free(ev_base_main); > > + /* Try stopping the rx & tx threads cleanly by messaging them. */ > + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_STOP); > + vm_pipe_send(&pipe_tx, VIRTIO_THREAD_STOP); > + > + /* Wait for threads to stop. 
*/ > + pthread_join(rx_thread, NULL); > + pthread_join(tx_thread, NULL); > + pthread_rwlock_destroy(&lock); > + > /* Cleanup */ > if (ret == 0) { > close_fd(dev.sync_fd); > close_fd(dev.async_fd); > close_fd(vionet->data_fd); > + close_fd(pipe_main.read); > + close_fd(pipe_main.write); > + close_fd(pipe_rx.write); > + close_fd(pipe_tx.write); > close_fd(pipe_inject[READ]); > close_fd(pipe_inject[WRITE]); > _exit(ret); > @@ -253,7 +307,8 @@ fail: > close_fd(pipe_inject[WRITE]); > if (vionet != NULL) > close_fd(vionet->data_fd); > - > + if (lock != NULL) > + pthread_rwlock_destroy(&lock); > _exit(ret); > } > > @@ -329,9 +384,10 @@ vionet_rx(struct vionet_dev *dev, int fd) > struct iovec *iov; > int notify = 0; > ssize_t sz; > + uint8_t status = 0; > > - if (!(dev->cfg.device_status > - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) { > + status = dev->cfg.device_status & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK; > + if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) { > log_warnx("%s: driver not ready", __func__); > return (0); > } > @@ -453,16 +509,12 @@ vionet_rx(struct vionet_dev *dev, int fd) > if (idx != vq_info->last_avail && > !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { > notify = 1; > - dev->cfg.isr_status |= 1; > } > > vq_info->last_avail = idx; > return (notify); > reset: > - log_warnx("%s: requesting device reset", __func__); > - dev->cfg.device_status |= DEVICE_NEEDS_RESET; > - dev->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE; > - return (1); > + return (-1); > } > > /* > @@ -603,27 +655,49 @@ vionet_rx_zerocopy(struct vionet_dev *dev, int fd, con > static void > vionet_rx_event(int fd, short event, void *arg) > { > - struct virtio_dev *dev = (struct virtio_dev *)arg; > + struct virtio_dev *dev = (struct virtio_dev *)arg; > + struct vionet_dev *vionet = &dev->vionet; > + int ret = 0; > > if (!(event & EV_READ)) > fatalx("%s: invalid event type", __func__); > > - if (vionet_rx(&dev->vionet, fd) > 0) > - virtio_assert_pic_irq(dev, 0); > + 
pthread_rwlock_rdlock(&lock); > + ret = vionet_rx(vionet, fd); > + pthread_rwlock_unlock(&lock); > + > + if (ret == 0) { > + /* Nothing to do. */ > + return; > + } > + > + pthread_rwlock_wrlock(&lock); > + if (ret == 1) { > + /* Notify the driver. */ > + vionet->cfg.isr_status |= 1; > + } else { > + /* Need a reset. Something went wrong. */ > + log_warnx("%s: requesting device reset", __func__); > + vionet->cfg.device_status |= DEVICE_NEEDS_RESET; > + vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE; > + } > + pthread_rwlock_unlock(&lock); > + > + vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ); > } > > -static int > +static void > vionet_notifyq(struct virtio_dev *dev) > { > - struct vionet_dev *vionet = &dev->vionet; > + struct vionet_dev *vionet = &dev->vionet; > > switch (vionet->cfg.queue_notify) { > case RXQ: > - event_add(&ev_tap, NULL); > - event_add(&ev_inject, NULL); > + vm_pipe_send(&pipe_rx, VIRTIO_NOTIFY); > break; > case TXQ: > - return vionet_notify_tx(dev); > + vm_pipe_send(&pipe_tx, VIRTIO_NOTIFY); > + break; > default: > /* > * Catch the unimplemented queue ID 2 (control queue) as > @@ -633,12 +707,10 @@ vionet_notifyq(struct virtio_dev *dev) > __func__, vionet->cfg.queue_notify); > break; > } > - > - return (0); > } > > static int > -vionet_notify_tx(struct virtio_dev *dev) > +vionet_tx(struct virtio_dev *dev) > { > uint16_t idx, hdr_idx; > size_t chain_len, iov_cnt; > @@ -653,9 +725,11 @@ vionet_notify_tx(struct virtio_dev *dev) > struct ether_header *eh; > struct iovec *iov; > struct packet pkt; > + uint8_t status = 0; > > - if (!(vionet->cfg.device_status > - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK)) { > + status = vionet->cfg.device_status > + & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK; > + if (status != VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) { > log_warnx("%s: driver not ready", __func__); > return (0); > } > @@ -806,18 +880,14 @@ drop: > } > > if (idx != vq_info->last_avail && > - !(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { > + 
!(avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) > notify = 1; > - vionet->cfg.isr_status |= 1; > - } > > + > vq_info->last_avail = idx; > return (notify); > reset: > - log_warnx("%s: requesting device reset", __func__); > - vionet->cfg.device_status |= DEVICE_NEEDS_RESET; > - vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE; > - return (1); > + return (-1); > } > > static void > @@ -830,6 +900,7 @@ dev_dispatch_vm(int fd, short event, void *arg) > struct imsg imsg; > ssize_t n = 0; > int verbose; > + uint8_t status = 0; > > if (dev == NULL) > fatalx("%s: missing vionet pointer", __func__); > @@ -841,7 +912,7 @@ dev_dispatch_vm(int fd, short event, void *arg) > /* this pipe is dead, so remove the event handler */ > log_debug("%s: pipe dead (EV_READ)", __func__); > event_del(&iev->ev); > - event_loopexit(NULL); > + event_base_loopexit(ev_base_main, NULL); > return; > } > } > @@ -853,7 +924,7 @@ dev_dispatch_vm(int fd, short event, void *arg) > /* this pipe is dead, so remove the event handler */ > log_debug("%s: pipe dead (EV_WRITE)", __func__); > event_del(&iev->ev); > - event_loopexit(NULL); > + event_base_loopexit(ev_base_main, NULL); > return; > } > } > @@ -873,16 +944,16 @@ dev_dispatch_vm(int fd, short event, void *arg) > break; > case IMSG_VMDOP_PAUSE_VM: > log_debug("%s: pausing", __func__); > - event_del(&ev_tap); > - event_del(&ev_inject); > + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE); > break; > case IMSG_VMDOP_UNPAUSE_VM: > log_debug("%s: unpausing", __func__); > - if (vionet->cfg.device_status > - & VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK) { > - event_add(&ev_tap, NULL); > - event_add(&ev_inject, NULL); > - } > + pthread_rwlock_rdlock(&lock); > + status = vionet->cfg.device_status & > + VIRTIO_CONFIG_DEVICE_STATUS_DRIVER_OK; > + pthread_rwlock_unlock(&lock); > + if (status) > + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_START); > break; > case IMSG_CTL_VERBOSE: > IMSG_SIZE_CHECK(&imsg, &verbose); > @@ -892,7 +963,7 @@ dev_dispatch_vm(int fd, short 
event, void *arg) > } > imsg_free(&imsg); > } > - imsg_event_add(iev); > + imsg_event_add2(iev, ev_base_main); > } > > /* > @@ -917,7 +988,7 @@ handle_sync_io(int fd, short event, void *arg) > /* this pipe is dead, so remove the event handler */ > log_debug("%s: pipe dead (EV_READ)", __func__); > event_del(&iev->ev); > - event_loopexit(NULL); > + event_base_loopexit(ev_base_main, NULL); > return; > } > } > @@ -929,7 +1000,7 @@ handle_sync_io(int fd, short event, void *arg) > /* this pipe is dead, so remove the event handler */ > log_debug("%s: pipe dead (EV_WRITE)", __func__); > event_del(&iev->ev); > - event_loopexit(NULL); > + event_base_loopexit(ev_base_main, NULL); > return; > } > } > @@ -959,34 +1030,33 @@ handle_sync_io(int fd, short event, void *arg) > msg.data = handle_io_read(&msg, dev, &intr); > msg.data_valid = 1; > msg.state = intr; > - imsg_compose_event(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg, > - sizeof(msg)); > + imsg_compose_event2(iev, IMSG_DEVOP_MSG, 0, 0, -1, &msg, > + sizeof(msg), ev_base_main); > break; > case VIODEV_MSG_IO_WRITE: > /* Write IO: no reply needed */ > - if (handle_io_write(&msg, dev) == 1) > - virtio_assert_pic_irq(dev, 0); > + handle_io_write(&msg, dev); > break; > case VIODEV_MSG_SHUTDOWN: > event_del(&dev->sync_iev.ev); > - event_del(&ev_tap); > - event_del(&ev_inject); > - event_loopbreak(); > + event_base_loopbreak(ev_base_main); > return; > default: > fatalx("%s: invalid msg type %d", __func__, msg.type); > } > } > - imsg_event_add(iev); > + imsg_event_add2(iev, ev_base_main); > } > > -static int > +static void > handle_io_write(struct viodev_msg *msg, struct virtio_dev *dev) > { > - struct vionet_dev *vionet = &dev->vionet; > - uint32_t data = msg->data; > - int intr = 0; > + struct vionet_dev *vionet = &dev->vionet; > + uint32_t data = msg->data; > + int pause_devices = 0; > > + pthread_rwlock_wrlock(&lock); > + > switch (msg->reg) { > case VIRTIO_CONFIG_DEVICE_FEATURES: > case VIRTIO_CONFIG_QUEUE_SIZE: > @@ -1007,35 
+1077,25 @@ handle_io_write(struct viodev_msg *msg, struct virtio_ > break; > case VIRTIO_CONFIG_QUEUE_NOTIFY: > vionet->cfg.queue_notify = data; > - if (vionet_notifyq(dev)) > - intr = 1; > + vionet_notifyq(dev); > break; > case VIRTIO_CONFIG_DEVICE_STATUS: > - vionet->cfg.device_status = data; > - if (vionet->cfg.device_status == 0) { > - vionet->cfg.guest_feature = 0; > - > - vionet->cfg.queue_pfn = 0; > - vionet_update_qa(vionet); > - > - vionet->cfg.queue_size = 0; > - vionet_update_qs(vionet); > - > - vionet->cfg.queue_select = 0; > - vionet->cfg.queue_notify = 0; > - vionet->cfg.isr_status = 0; > - vionet->vq[RXQ].last_avail = 0; > - vionet->vq[RXQ].notified_avail = 0; > - vionet->vq[TXQ].last_avail = 0; > - vionet->vq[TXQ].notified_avail = 0; > - virtio_deassert_pic_irq(dev, msg->vcpu); > - > - event_del(&ev_tap); > - event_del(&ev_inject); > + if (data == 0) { > + resetting = 2; /* Wait on two acks: rx & tx */ > + pause_devices = 1; > + } else { > + // XXX is this correct? > + vionet->cfg.device_status = data; > } > break; > } > - return (intr); > + > + pthread_rwlock_unlock(&lock); > + if (pause_devices) { > + vionet_deassert_pic_irq(dev); > + vm_pipe_send(&pipe_rx, VIRTIO_THREAD_PAUSE); > + vm_pipe_send(&pipe_tx, VIRTIO_THREAD_PAUSE); > + } > } > > static uint32_t > @@ -1044,6 +1104,8 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d > struct vionet_dev *vionet = &dev->vionet; > uint32_t data; > > + pthread_rwlock_rdlock(&lock); > + > switch (msg->reg) { > case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI: > case VIRTIO_CONFIG_DEVICE_CONFIG_NOMSI + 1: > @@ -1076,14 +1138,263 @@ handle_io_read(struct viodev_msg *msg, struct virtio_d > data = vionet->cfg.device_status; > break; > case VIRTIO_CONFIG_ISR_STATUS: > + pthread_rwlock_unlock(&lock); > + pthread_rwlock_wrlock(&lock); > data = vionet->cfg.isr_status; > vionet->cfg.isr_status = 0; > if (intr != NULL) > *intr = INTR_STATE_DEASSERT; > break; > default: > - return (0xFFFFFFFF); > + data = 0xFFFFFFFF; > } 
> > + pthread_rwlock_unlock(&lock); > return (data); > } > + > +/* > + * Handle the rx side processing, communicating to the main thread via pipe. > + */ > +static void * > +rx_run_loop(void *arg) > +{ > + struct virtio_dev *dev = (struct virtio_dev *)arg; > + struct vionet_dev *vionet = &dev->vionet; > + int ret; > + > + ev_base_rx = event_base_new(); > + > + /* Wire up event handling for the tap fd. */ > + event_set(&ev_tap, vionet->data_fd, EV_READ | EV_PERSIST, > + vionet_rx_event, dev); > + event_base_set(ev_base_rx, &ev_tap); > + > + /* Wire up event handling for the packet injection pipe. */ > + event_set(&ev_inject, pipe_inject[READ], EV_READ | EV_PERSIST, > + vionet_rx_event, dev); > + event_base_set(ev_base_rx, &ev_inject); > + > + /* Wire up event handling for our inter-thread communication channel. */ > + event_base_set(ev_base_rx, &pipe_rx.read_ev); > + event_add(&pipe_rx.read_ev, NULL); > + > + /* Begin our event loop with our channel event active. */ > + ret = event_base_dispatch(ev_base_rx); > + event_base_free(ev_base_rx); > + > + log_debug("%s: exiting (%d)", __func__, ret); > + > + close_fd(pipe_rx.read); > + close_fd(pipe_inject[READ]); > + > + return (NULL); > +} > + > +/* > + * Handle the tx side processing, communicating to the main thread via pipe. > + */ > +static void * > +tx_run_loop(void *arg) > +{ > + int ret; > + > + ev_base_tx = event_base_new(); > + > + /* Wire up event handling for our inter-thread communication channel. */ > + event_base_set(ev_base_tx, &pipe_tx.read_ev); > + event_add(&pipe_tx.read_ev, NULL); > + > + /* Begin our event loop with our channel event active. */ > + ret = event_base_dispatch(ev_base_tx); > + event_base_free(ev_base_tx); > + > + log_debug("%s: exiting (%d)", __func__, ret); > + > + close_fd(pipe_tx.read); > + > + return (NULL); > +} > + > +/* > + * Read events sent by the main thread to the rx thread. 
> + */ > +static void > +read_pipe_rx(int fd, short event, void *arg) > +{ > + enum pipe_msg_type msg; > + > + if (!(event & EV_READ)) > + fatalx("%s: invalid event type", __func__); > + > + msg = vm_pipe_recv(&pipe_rx); > + > + switch (msg) { > + case VIRTIO_NOTIFY: > + case VIRTIO_THREAD_START: > + event_add(&ev_tap, NULL); > + event_add(&ev_inject, NULL); > + break; > + case VIRTIO_THREAD_PAUSE: > + event_del(&ev_tap); > + event_del(&ev_inject); > + vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK); > + break; > + case VIRTIO_THREAD_STOP: > + event_del(&ev_tap); > + event_del(&ev_inject); > + event_base_loopexit(ev_base_rx, NULL); > + break; > + default: > + fatalx("%s: invalid channel message: %d", __func__, msg); > + } > +} > + > +/* > + * Read events sent by the main thread to the tx thread. > + */ > +static void > +read_pipe_tx(int fd, short event, void *arg) > +{ > + struct virtio_dev *dev = (struct virtio_dev*)arg; > + struct vionet_dev *vionet = &dev->vionet; > + enum pipe_msg_type msg; > + int ret = 0; > + > + if (!(event & EV_READ)) > + fatalx("%s: invalid event type", __func__); > + > + msg = vm_pipe_recv(&pipe_tx); > + > + switch (msg) { > + case VIRTIO_NOTIFY: > + pthread_rwlock_rdlock(&lock); > + ret = vionet_tx(dev); > + pthread_rwlock_unlock(&lock); > + break; > + case VIRTIO_THREAD_START: > + /* Ignore Start messages. */ > + break; > + case VIRTIO_THREAD_PAUSE: > + /* > + * Nothing to do when pausing on the tx side, but ACK so main > + * thread knows we're not transmitting. > + */ > + vm_pipe_send(&pipe_main, VIRTIO_THREAD_ACK); > + break; > + case VIRTIO_THREAD_STOP: > + event_base_loopexit(ev_base_tx, NULL); > + break; > + default: > + fatalx("%s: invalid channel message: %d", __func__, msg); > + } > + > + if (ret == 0) { > + /* No notification needed. Return early. */ > + return; > + } > + > + pthread_rwlock_wrlock(&lock); > + if (ret == 1) { > + /* Notify the driver. */ > + vionet->cfg.isr_status |= 1; > + } else { > + /* Need a reset. 
Something went wrong. */ > + log_warnx("%s: requesting device reset", __func__); > + vionet->cfg.device_status |= DEVICE_NEEDS_RESET; > + vionet->cfg.isr_status |= VIRTIO_CONFIG_ISR_CONFIG_CHANGE; > + } > + pthread_rwlock_unlock(&lock); > + > + vm_pipe_send(&pipe_main, VIRTIO_RAISE_IRQ); > +} > + > +/* > + * Read events sent by the rx/tx threads to the main thread. > + */ > +static void > +read_pipe_main(int fd, short event, void *arg) > +{ > + struct virtio_dev *dev = (struct virtio_dev*)arg; > + struct vionet_dev *vionet = &dev->vionet; > + enum pipe_msg_type msg; > + > + if (!(event & EV_READ)) > + fatalx("%s: invalid event type", __func__); > + > + msg = vm_pipe_recv(&pipe_main); > + switch (msg) { > + case VIRTIO_RAISE_IRQ: > + vionet_assert_pic_irq(dev); > + break; > + case VIRTIO_THREAD_ACK: > + resetting--; > + if (resetting == 0) { > + log_debug("%s: resetting virtio network device %d", > + __func__, vionet->idx); > + > + pthread_rwlock_wrlock(&lock); > + vionet->cfg.device_status = 0; > + vionet->cfg.guest_feature = 0; > + vionet->cfg.queue_pfn = 0; > + vionet_update_qa(vionet); > + vionet->cfg.queue_size = 0; > + vionet_update_qs(vionet); > + vionet->cfg.queue_select = 0; > + vionet->cfg.queue_notify = 0; > + vionet->cfg.isr_status = 0; > + vionet->vq[RXQ].last_avail = 0; > + vionet->vq[RXQ].notified_avail = 0; > + vionet->vq[TXQ].last_avail = 0; > + vionet->vq[TXQ].notified_avail = 0; > + pthread_rwlock_unlock(&lock); > + } > + break; > + default: > + fatalx("%s: invalid channel msg: %d", __func__, msg); > + } > +} > + > +/* > + * Message the vm process asking to raise the irq. Must be called from the main > + * thread. 
> + */ > +static void > +vionet_assert_pic_irq(struct virtio_dev *dev) > +{ > + struct viodev_msg msg; > + int ret; > + > + memset(&msg, 0, sizeof(msg)); > + msg.irq = dev->irq; > + msg.vcpu = 0; // XXX > + msg.type = VIODEV_MSG_KICK; > + msg.state = INTR_STATE_ASSERT; > + > + ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1, > + &msg, sizeof(msg), ev_base_main); > + if (ret == -1) > + log_warnx("%s: failed to assert irq %d", __func__, dev->irq); > +} > + > +/* > + * Message the vm process asking to lower the irq. Must be called from the main > + * thread. > + */ > +static void > +vionet_deassert_pic_irq(struct virtio_dev *dev) > +{ > + struct viodev_msg msg; > + int ret; > + > + memset(&msg, 0, sizeof(msg)); > + msg.irq = dev->irq; > + msg.vcpu = 0; // XXX > + msg.type = VIODEV_MSG_KICK; > + msg.state = INTR_STATE_DEASSERT; > + > + ret = imsg_compose_event2(&dev->async_iev, IMSG_DEVOP_MSG, 0, 0, -1, > + &msg, sizeof(msg), ev_base_main); > + if (ret == -1) > + log_warnx("%s: failed to assert irq %d", __func__, dev->irq); > +} > blob - 4b69f05e8a10b4276ae103ac2e386fa1289a670e > blob + 69ce382422267e8b4cc232756bec5166aad29bf6 > --- usr.sbin/vmd/virtio.c > +++ usr.sbin/vmd/virtio.c > @@ -1424,7 +1424,7 @@ virtio_dev_launch(struct vmd_vm *vm, struct virtio_dev > */ > dev->sync_fd = sync_fds[0]; > dev->async_fd = async_fds[0]; > - vm_device_pipe(dev, virtio_dispatch_dev); > + vm_device_pipe(dev, virtio_dispatch_dev, NULL); > } else { > /* Child */ > close_fd(async_fds[0]); > @@ -1502,7 +1502,8 @@ err: > * Initialize an async imsg channel for a virtio device. 
> */ > int > -vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *)) > +vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, short, void *), > + struct event_base *ev_base) > { > struct imsgev *iev = &dev->async_iev; > int fd = dev->async_fd; > @@ -1514,7 +1515,7 @@ vm_device_pipe(struct virtio_dev *dev, void (*cb)(int, > iev->handler = cb; > iev->data = dev; > iev->events = EV_READ; > - imsg_event_add(iev); > + imsg_event_add2(iev, ev_base); > > return (0); > } > blob - 1e51bd731a049e59c6fc4dac5f2f9486a7359649 > blob + acd58494dd145ed17245af4026a94531f5cfeda9 > --- usr.sbin/vmd/virtio.h > +++ usr.sbin/vmd/virtio.h > @@ -343,7 +343,8 @@ int virtio_restore(int, struct vmd_vm *, int, int[][VM > int *); > const char *virtio_reg_name(uint8_t); > uint32_t vring_size(uint32_t); > -int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *)); > +int vm_device_pipe(struct virtio_dev *, void (*)(int, short, void *), > + struct event_base *); > int virtio_pci_io(int, uint16_t, uint32_t *, uint8_t *, void *, uint8_t); > void virtio_assert_pic_irq(struct virtio_dev *, int); > void virtio_deassert_pic_irq(struct virtio_dev *, int); > blob - 772d044604d742116ff31e8e7e9ae4db059f6bf4 > blob + 2176de21c84445f112da6999fe43764be7e76e23 > --- usr.sbin/vmd/vm.c > +++ usr.sbin/vmd/vm.c > @@ -2436,18 +2436,25 @@ translate_gva(struct vm_exit* exit, uint64_t va, uint6 > return (0); > } > > +void > +vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *)) > +{ > + vm_pipe_init2(p, cb, NULL); > +} > + > /* > - * vm_pipe_init > + * vm_pipe_init2 > * > * Initialize a vm_dev_pipe, setting up its file descriptors and its > - * event structure with the given callback. > + * event structure with the given callback and argument. 
> * > * Parameters: > * p: pointer to vm_dev_pipe struct to initizlize > * cb: callback to use for READ events on the read end of the pipe > + * arg: pointer to pass to the callback on event trigger > */ > void > -vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, short, void *)) > +vm_pipe_init2(struct vm_dev_pipe *p, void (*cb)(int, short, void *), void *arg) > { > int ret; > int fds[2]; > @@ -2461,13 +2468,14 @@ vm_pipe_init(struct vm_dev_pipe *p, void (*cb)(int, sh > p->read = fds[0]; > p->write = fds[1]; > > - event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, NULL); > + event_set(&p->read_ev, p->read, EV_READ | EV_PERSIST, cb, arg); > } > > /* > * vm_pipe_send > * > - * Send a message to an emulated device vie the provided vm_dev_pipe. > + * Send a message to an emulated device vie the provided vm_dev_pipe. This > + * relies on the fact sizeof(msg) < PIPE_BUF to ensure atomic writes. > * > * Parameters: > * p: pointer to initialized vm_dev_pipe > @@ -2486,7 +2494,8 @@ vm_pipe_send(struct vm_dev_pipe *p, enum pipe_msg_type > * vm_pipe_recv > * > * Receive a message for an emulated device via the provided vm_dev_pipe. > - * Returns the message value, otherwise will exit on failure. > + * Returns the message value, otherwise will exit on failure. This relies on > + * the fact sizeof(enum pipe_msg_type) < PIPE_BUF for atomic reads. 
> * > * Parameters: > * p: pointer to initialized vm_dev_pipe > blob - 19995e951c71db93cf3c794147bfdf3ea1bc6616 > blob + bd3b08973c86c3d799652569b1a052ac55d5dc83 > --- usr.sbin/vmd/vmd.h > +++ usr.sbin/vmd/vmd.h > @@ -149,6 +149,7 @@ enum imsg_type { > /* Device Operation Messages */ > IMSG_DEVOP_HOSTMAC, > IMSG_DEVOP_MSG, > + IMSG_DEVOP_VIONET_MSG, > }; > > struct vmop_result { > @@ -410,7 +411,13 @@ enum pipe_msg_type { > I8253_RESET_CHAN_2 = 2, > NS8250_ZERO_READ, > NS8250_RATELIMIT, > - MC146818_RESCHEDULE_PER > + MC146818_RESCHEDULE_PER, > + VIRTIO_NOTIFY, > + VIRTIO_RAISE_IRQ, > + VIRTIO_THREAD_START, > + VIRTIO_THREAD_PAUSE, > + VIRTIO_THREAD_STOP, > + VIRTIO_THREAD_ACK, > }; > > static inline struct sockaddr_in * > @@ -495,6 +502,8 @@ int read_mem(paddr_t, void *buf, size_t); > int start_vm(struct vmd_vm *, int); > __dead void vm_shutdown(unsigned int); > void vm_pipe_init(struct vm_dev_pipe *, void (*)(int, short, void *)); > +void vm_pipe_init2(struct vm_dev_pipe *, void (*)(int, short, void *), > + void *); > void vm_pipe_send(struct vm_dev_pipe *, enum pipe_msg_type); > enum pipe_msg_type vm_pipe_recv(struct vm_dev_pipe *); > int write_mem(paddr_t, const void *buf, size_t);