From: Vitaliy Makkoveev
Subject: Re: Unlock udp(4) somove()
To: Alexander Bluhm
Cc: tech@openbsd.org
Date: Fri, 19 Jul 2024 19:21:40 +0300

On Fri, Jul 19, 2024 at 03:15:34PM +0200, Alexander Bluhm wrote:
> On Sun, Jul 14, 2024 at 07:09:19PM +0300, Vitaliy Makkoveev wrote:
> > > When I introduced socket splicing, my experience was that a single
> > > task makes TCP faster, and direct somove makes UDP faster.
> > >
> > > The idea is that TCP data accumulates in the socket buffer.  The
> > > task calls yield() to slow things down, and moves huge chunks of
> > > data with little overhead.
> > >
> > > For UDP on the other hand best performance is achieved when every
> > > packet is sent immediately.  An additional task and queueing is
> > > just overhead.
> >
> > This is obvious, but as I understand, the main goal was to avoid
> > double crossing of the kernel boundary and copying from and to
> > userland.  This diff still avoids that, but makes the behaviour
> > similar to non-spliced UDP delivery.  The question is what the
> > performance impact will be and whether it is acceptable in
> > real-life applications.  This diff makes it easy to do parallel
> > UDP input, so it is interesting to see both diffs together.
>
> I have tested somove dgram.
> http://bluhm.genua.de/perform/results/2024-07-17T19:31:54Z/perform.html
> Most relevant is this graph:
> http://bluhm.genua.de/perform/results/2024-07-17T19:31:54Z/gnuplot/splice.html
>
> Columns are:
> 1. current
> 2. your somove dgram diff
> 3. my udp input parallel diff
> 4. your somove combined with my udp input
>
> Note that 3 is not MP safe, but it does not crash and is suitable
> for performance comparison.
>
> In most cases 2 is not worse than 1, and 4 is not worse than 3.
> So I think we can try to commit 2 and then 4 as a first step.
> In the long term I would like to get back to calling somove()
> directly from sorwakeup() for udp.  For very short packets this
> is faster.
>
> If you look at the btrace kstack flamegraph you see the overhead
> of the somove task.
> Diff 3:
> http://bluhm.genua.de/perform/results/2024-07-17T19:31:54Z/patch-sys-udp-parallel-input.0/btrace/netbench.pl_-v_-B1000000000_-b1000000_-d1_-f0_-i3_-N10_-cperform%40lt13_-sperform%40lt16_-A10.3.46.40_-a10.3.46.60_-t10_udpsplice-btrace-kstack.0.svg
> Diff 4:
> http://bluhm.genua.de/perform/results/2024-07-17T19:31:54Z/patch-sys-udp-parallel-somove.0/btrace/netbench.pl_-v_-B1000000000_-b1000000_-d1_-f0_-i3_-N10_-cperform%40lt13_-sperform%40lt16_-A10.3.46.40_-a10.3.46.60_-t10_udpsplice-btrace-kstack.0.svg

Thanks for testing this.  Nice to hear that the performance is
comparable.

> Some comments inline...
>
> > Index: sys/kern/uipc_socket.c
> > ===================================================================
> > RCS file: /cvs/src/sys/kern/uipc_socket.c,v
> > diff -u -p -r1.337 uipc_socket.c
> > --- sys/kern/uipc_socket.c	12 Jul 2024 17:20:18 -0000	1.337
> > +++ sys/kern/uipc_socket.c	14 Jul 2024 15:54:08 -0000
> > @@ -65,6 +65,7 @@ void	sotask(void *);
> >  void	soreaper(void *);
> >  void	soput(void *);
> >  int	somove(struct socket *, int);
> > +int	somove_dgram(struct socket *, int);
>
> somove_dgram() looks very much like somove().  I would prefer to
> keep one function and save some mutex enter and leave.

I combined them.  I intentionally used direct mtx_enter(9) instead of
the sb_mtx_lock() wrapper.  mtx_enter(9) was already used there, so
there is no reason to mix the two.

> Could you rename so2 to soback?  so2 is too generic, soback would
> be clear to me.

No problem.
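For reference, the task-versus-direct dispatch discussed at the top of
this mail is the branch that the diff below removes from sorwakeup().
A simplified sketch of that old behaviour, not part of the diff:

	/* Old sorwakeup() splice dispatch (simplified sketch). */
	if (so->so_rcv.sb_flags & SB_SPLICE) {
		if (so->so_proto->pr_flags & PR_WANTRCVD) {
			/* TCP: queue and let the task batch the data. */
			task_add(sosplice_taskq, &so->so_splicetask);
		} else {
			/* UDP: move every packet immediately. */
			somove(so, M_DONTWAIT);
		}
	}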
> As you write in the XXXSMP comment, sblock() does not seem the
> correct lock for this purpose.  As I understand, sblock() should
> be used to separate userland read and write access to a socket.  I
> think it is wrong in sofree().  But I don't have a good alternative.
> Maybe XXXSMP is not correct here.

Splicing belongs to socket buffers, so sblock() and the `sb_mtx'
mutexes should be used as protection.  But this requires solock()
re-locking in soclose(), because sblock() should be taken first.  UDP
sockets rely on reference counters, but that is not appropriate for
TCP, because sofree() can't release solock() while called from the
stack.  That's why I copy-pasted this block.  In the future I want to
rework the sorele() logic to avoid refcnt_finalize() in sofree() and
make unsplicing common for all sockets.  So, I reworked this comment
as:

+	if (so->so_proto->pr_flags & PR_WANTRCVD) {
+		/*
+		 * Copy-paste, but we can't relock and sleep in
+		 * sofree() in the tcp(4) case.  That's why tcp(4)
+		 * still relies on solock() for splicing and
+		 * unsplicing.
+		 */
+

> Although the comment with the softnet interrupt is not correct
> anymore, I would like to keep the motivation for setting SB_SPLICE
> later.  Something like this?
>
> 	/*
> 	 * To prevent sorwakeup() calling somove() before this somove()
> 	 * has finished, the socket buffers are not marked as spliced yet.
> 	 */

Done.

> > Index: sys/netinet/udp_usrreq.c
> > ===================================================================
> > RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
> > diff -u -p -r1.320 udp_usrreq.c
> > --- sys/netinet/udp_usrreq.c	17 Apr 2024 20:48:51 -0000	1.320
> > +++ sys/netinet/udp_usrreq.c	14 Jul 2024 15:54:08 -0000
> > @@ -1241,6 +1241,11 @@ udp_send(struct socket *so, struct mbuf
> >
> >  	soassertlocked(so);
> >
> > +	if (inp == NULL) {
> > +		/* PCB could be destroyed, but socket still spliced. */
> > +		return (EINVAL);
> > +	}
>
> The EINVAL seems wrong.  It is for invalid userland input, or when
> the developer has no idea.  Would EPIPE be the correct one?  Or
> would it generate an additional SIGPIPE?  But userland has to cope
> with that anyway, as multiple places set it when splicing breaks
> unexpectedly.  It is also documented in somove(9).
>
> tcp_sogetpcb() also uses EINVAL.  But somove() converts
> SS_CANTSENDMORE to EPIPE.  I think we run into this path.

Before we split the *_usrreq() handlers into many per-request
handlers, EINVAL was the error value for the case where the PCB is
not set, so I just returned it back.

udp_usrreq(struct socket *so, int req, struct mbuf *m, struct mbuf *addr,
    struct mbuf *control, struct proc *p)
{
	struct inpcb *inp;
	int error = 0;

	if (req == PRU_CONTROL) {
	}

	soassertlocked(so);

	inp = sotoinpcb(so);
	if (inp == NULL) {
		error = EINVAL;
		goto release;
	}

I don't know if EPIPE is correct here.  What about checking `so_pcb'
and setting the error to EPIPE in somove()?

	if (error) {
		if (sosp->so_snd.sb_state & SS_CANTSENDMORE ||
		    sosp->so_pcb == NULL)
			error = EPIPE;
		goto release;
	}
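For completeness, this is how userland would observe a splice torn
down by such an error.  A hypothetical sketch against the documented
SO_SPLICE and SO_ERROR socket options, not part of the diff; `s' and
`sp' stand for already connected UDP sockets:

	#include <sys/types.h>
	#include <sys/socket.h>

	#include <err.h>
	#include <errno.h>

	void
	splice_and_check(int s, int sp)
	{
		int error;
		socklen_t len = sizeof(error);

		/* Splice s onto sp; the kernel moves data via somove(). */
		if (setsockopt(s, SOL_SOCKET, SO_SPLICE, &sp,
		    sizeof(sp)) == -1)
			err(1, "SO_SPLICE");

		/* ... later, once the splice has ended ... */

		if (getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &len) == -1)
			err(1, "SO_ERROR");
		if (error == EPIPE)
			warnx("splicing broke unexpectedly");
	}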
Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
diff -u -p -r1.338 uipc_socket.c
--- sys/kern/uipc_socket.c	14 Jul 2024 15:42:23 -0000	1.338
+++ sys/kern/uipc_socket.c	19 Jul 2024 16:18:02 -0000
@@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
 		sounlock(head);
 	}
 
-	if (persocket) {
+	switch (so->so_proto->pr_domain->dom_family) {
+	case AF_INET:
+	case AF_INET6:
+		if (so->so_proto->pr_type == SOCK_STREAM)
+			break;
+		/* FALLTHROUGH */
+	default:
 		sounlock(so);
 		refcnt_finalize(&so->so_refcnt, "sofinal");
 		solock(so);
+		break;
 	}
 
 	sigio_free(&so->so_sigio);
 	klist_free(&so->so_rcv.sb_klist);
 	klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
-	if (issplicedback(so)) {
-		int freeing = SOSP_FREEING_WRITE;
-
-		if (so->so_sp->ssp_soback == so)
-			freeing |= SOSP_FREEING_READ;
-		sounsplice(so->so_sp->ssp_soback, so, freeing);
-	}
-	if (isspliced(so)) {
-		int freeing = SOSP_FREEING_READ;
-
-		if (so == so->so_sp->ssp_socket)
-			freeing |= SOSP_FREEING_WRITE;
-		sounsplice(so, so->so_sp->ssp_socket, freeing);
-	}
-#endif /* SOCKET_SPLICE */
 
 	mtx_enter(&so->so_snd.sb_mtx);
 	sbrelease(so, &so->so_snd);
@@ -458,6 +449,83 @@ discard:
 	if (so->so_state & SS_NOFDREF)
 		panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
 	so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+	if (so->so_sp) {
+		struct socket *soback;
+
+		if (so->so_proto->pr_flags & PR_WANTRCVD) {
+			/*
+			 * Copy-paste, but we can't relock and sleep in
+			 * sofree() in the tcp(4) case.  That's why tcp(4)
+			 * still relies on solock() for splicing and
+			 * unsplicing.
+			 */
+
+			if (issplicedback(so)) {
+				int freeing = SOSP_FREEING_WRITE;
+
+				if (so->so_sp->ssp_soback == so)
+					freeing |= SOSP_FREEING_READ;
+				sounsplice(so->so_sp->ssp_soback, so, freeing);
+			}
+			if (isspliced(so)) {
+				int freeing = SOSP_FREEING_READ;
+
+				if (so == so->so_sp->ssp_socket)
+					freeing |= SOSP_FREEING_WRITE;
+				sounsplice(so, so->so_sp->ssp_socket, freeing);
+			}
+			goto free;
+		}
+
+		sounlock(so);
+		mtx_enter(&so->so_snd.sb_mtx);
+		/*
+		 * Concurrent sounsplice() locks `sb_mtx' mutexes on
+		 * both `so_snd' and `so_rcv' before unsplicing sockets.
+		 */
+		if ((soback = so->so_sp->ssp_soback) == NULL) {
+			mtx_leave(&so->so_snd.sb_mtx);
+			goto notsplicedback;
+		}
+		soref(soback);
+		mtx_leave(&so->so_snd.sb_mtx);
+
+		sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+		/*
+		 * sblock() is always taken on `so_rcv' before calling
+		 * sounsplice().  `so' is dying and can only be unspliced.
+		 */
+		if (issplicedback(so)) {
+			int freeing = SOSP_FREEING_WRITE;
+
+			if (so->so_sp->ssp_soback == so)
+				freeing |= SOSP_FREEING_READ;
+			solock(soback);
+			sounsplice(so->so_sp->ssp_soback, so, freeing);
+			sounlock(soback);
+		}
+		sbunlock(&soback->so_rcv);
+		sorele(soback);
+
+notsplicedback:
+		sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+		if (isspliced(so)) {
+			int freeing = SOSP_FREEING_READ;
+
+			if (so == so->so_sp->ssp_socket)
+				freeing |= SOSP_FREEING_WRITE;
+			solock(so);
+			sounsplice(so, so->so_sp->ssp_socket, freeing);
+			sounlock(so);
+		}
+		sbunlock(&so->so_rcv);
+
+		solock(so);
+	}
+free:
+#endif /* SOCKET_SPLICE */
 	/* sofree() calls sounlock(). */
 	sofree(so, 0);
 	return (error);
@@ -1411,14 +1479,6 @@ sosplice(struct socket *so, int fd, off_
 		goto release;
 	}
 
-	/* Splice so and sosp together. */
-	mtx_enter(&so->so_rcv.sb_mtx);
-	mtx_enter(&sosp->so_snd.sb_mtx);
-	so->so_sp->ssp_socket = sosp;
-	sosp->so_sp->ssp_soback = so;
-	mtx_leave(&sosp->so_snd.sb_mtx);
-	mtx_leave(&so->so_rcv.sb_mtx);
-
 	so->so_splicelen = 0;
 	so->so_splicemax = max;
 	if (tv)
@@ -1429,9 +1489,20 @@ sosplice(struct socket *so, int fd, off_
 	task_set(&so->so_splicetask, sotask, so);
 
 	/*
-	 * To prevent softnet interrupt from calling somove() while
-	 * we sleep, the socket buffers are not marked as spliced yet.
+	 * To prevent sorwakeup() calling somove() before this somove()
+	 * has finished, the socket buffers are not marked as spliced yet.
 	 */
+
+	/* Splice so and sosp together. */
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+	so->so_sp->ssp_socket = sosp;
+	sosp->so_sp->ssp_soback = so;
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		sounlock(so);
 	if (somove(so, M_WAIT)) {
 		mtx_enter(&so->so_rcv.sb_mtx);
 		mtx_enter(&sosp->so_snd.sb_mtx);
@@ -1440,6 +1511,8 @@ sosplice(struct socket *so, int fd, off_
 		mtx_leave(&sosp->so_snd.sb_mtx);
 		mtx_leave(&so->so_rcv.sb_mtx);
 	}
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		solock(so);
 
  release:
 	sounlock(so);
@@ -1454,6 +1527,8 @@ sosplice(struct socket *so, int fd, off_
 void
 sounsplice(struct socket *so, struct socket *sosp, int freeing)
 {
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		sbassertlocked(&so->so_rcv);
 	soassertlocked(so);
 
 	task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1554,51 @@ soidle(void *arg)
 {
 	struct socket *so = arg;
 
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
 	solock(so);
+	/*
+	 * Depending on socket type, sblock(&so->so_rcv) or solock()
+	 * is always held while modifying SB_SPLICE and
+	 * so->so_sp->ssp_socket.
+	 */
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
 		so->so_error = ETIMEDOUT;
 		sounsplice(so, so->so_sp->ssp_socket, 0);
 	}
 	sounlock(so);
+	sbunlock(&so->so_rcv);
 }
 
 void
 sotask(void *arg)
 {
 	struct socket *so = arg;
+	int doyield = 0;
+	int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
+
+	/*
+	 * sblock() on `so_rcv' protects sockets from being unspliced
+	 * in the UDP case.  TCP sockets still rely on solock().
+	 */
+
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+	if (sockstream)
+		solock(so);
 
-	solock(so);
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
-		/*
-		 * We may not sleep here as sofree() and unsplice() may be
-		 * called from softnet interrupt context.  This would remove
-		 * the socket during somove().
-		 */
+		if (sockstream)
+			doyield = 1;
 		somove(so, M_DONTWAIT);
 	}
-	sounlock(so);
 
-	/* Avoid user land starvation. */
-	yield();
+	sbunlock(&so->so_rcv);
+	if (sockstream)
+		sounlock(so);
+
+	if (doyield) {
+		/* Avoid user land starvation. */
+		yield();
+	}
 }
 
 /*
@@ -1546,25 +1640,30 @@ somove(struct socket *so, int wait)
 	struct mbuf	*m, **mp, *nextrecord;
 	u_long		 len, off, oobmark;
 	long		 space;
-	int		 error = 0, maxreached = 0;
+	int		 error = 0, maxreached = 0, unsplice = 0;
 	unsigned int	 rcvstate;
+	int		 sockdgram = ((so->so_proto->pr_flags &
+			     PR_WANTRCVD) == 0);
 
-	soassertlocked(so);
+	if (sockdgram)
+		sbassertlocked(&so->so_rcv);
+	else
+		soassertlocked(so);
 
 nextpkt:
-	if (so->so_error) {
-		error = so->so_error;
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+
+	if ((error = READ_ONCE(so->so_error)))
 		goto release;
-	}
 	if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
 		error = EPIPE;
 		goto release;
 	}
-	if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
-	    sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
-		error = sosp->so_error;
+
+	error = READ_ONCE(sosp->so_error);
+	if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP)
 		goto release;
-	}
 	if ((sosp->so_state & SS_ISCONNECTED) == 0)
 		goto release;
 
@@ -1577,26 +1676,22 @@ somove(struct socket *so, int wait)
 			maxreached = 1;
 		}
 	}
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	space = sbspace_locked(sosp, &sosp->so_snd);
 	if (so->so_oobmark && so->so_oobmark < len &&
 	    so->so_oobmark < space + 1024)
 		space += 1024;
 	if (space <= 0) {
-		mtx_leave(&sosp->so_snd.sb_mtx);
 		maxreached = 0;
 		goto release;
 	}
 	if (space < len) {
 		maxreached = 0;
 		if (space < sosp->so_snd.sb_lowat) {
-			mtx_leave(&sosp->so_snd.sb_mtx);
 			goto release;
 		}
 		len = space;
 	}
 	sosp->so_snd.sb_state |= SS_ISSENDING;
-	mtx_leave(&sosp->so_snd.sb_mtx);
 
 	SBLASTRECORDCHK(&so->so_rcv, "somove 1");
 	SBLASTMBUFCHK(&so->so_rcv, "somove 1");
@@ -1617,7 +1712,9 @@ somove(struct socket *so, int wait)
 		while (m && m->m_type == MT_CONTROL)
 			m = m->m_next;
 		if (m == NULL) {
+			mtx_leave(&sosp->so_snd.sb_mtx);
 			sbdroprecord(so, &so->so_rcv);
+			mtx_leave(&so->so_rcv.sb_mtx);
 			if (so->so_proto->pr_flags & PR_WANTRCVD)
 				pru_rcvd(so);
 			goto nextpkt;
@@ -1724,11 +1821,15 @@ somove(struct socket *so, int wait)
 	}
 
 	/* Send window update to source peer as receive buffer has changed. */
-	if (so->so_proto->pr_flags & PR_WANTRCVD)
+	if (so->so_proto->pr_flags & PR_WANTRCVD) {
+		mtx_leave(&sosp->so_snd.sb_mtx);
+		mtx_leave(&so->so_rcv.sb_mtx);
 		pru_rcvd(so);
+		mtx_enter(&so->so_rcv.sb_mtx);
+		mtx_enter(&sosp->so_snd.sb_mtx);
+	}
 
 	/* Receive buffer did shrink by len bytes, adjust oob. */
-	mtx_enter(&so->so_rcv.sb_mtx);
 	rcvstate = so->so_rcv.sb_state;
 	so->so_rcv.sb_state &= ~SS_RCVATMARK;
 	oobmark = so->so_oobmark;
@@ -1739,7 +1840,6 @@ somove(struct socket *so, int wait)
 		if (oobmark >= len)
 			oobmark = 0;
 	}
-	mtx_leave(&so->so_rcv.sb_mtx);
 
 	/*
 	 * Handle oob data.  If any malloc fails, ignore error.
@@ -1755,7 +1855,12 @@ somove(struct socket *so, int wait)
 	} else if (oobmark) {
 		o = m_split(m, oobmark, wait);
 		if (o) {
+			mtx_leave(&sosp->so_snd.sb_mtx);
+			mtx_leave(&so->so_rcv.sb_mtx);
 			error = pru_send(sosp, m, NULL, NULL);
+			mtx_enter(&so->so_rcv.sb_mtx);
+			mtx_enter(&sosp->so_snd.sb_mtx);
+
 			if (error) {
 				if (sosp->so_snd.sb_state &
 				    SS_CANTSENDMORE)
@@ -1773,7 +1878,13 @@ somove(struct socket *so, int wait)
 		if (o) {
 			o->m_len = 1;
 			*mtod(o, caddr_t) = *mtod(m, caddr_t);
+
+			mtx_leave(&sosp->so_snd.sb_mtx);
+			mtx_leave(&so->so_rcv.sb_mtx);
 			error = pru_sendoob(sosp, o, NULL, NULL);
+			mtx_enter(&so->so_rcv.sb_mtx);
+			mtx_enter(&sosp->so_snd.sb_mtx);
+
 			if (error) {
 				if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
 					error = EPIPE;
@@ -1791,45 +1902,67 @@ somove(struct socket *so, int wait)
 		}
 	}
 
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	/* Append all remaining data to drain socket. */
 	if (so->so_rcv.sb_cc == 0 || maxreached)
 		sosp->so_snd.sb_state &= ~SS_ISSENDING;
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if (sockdgram)
+		solock_shared(sosp);
 	error = pru_send(sosp, m, NULL, NULL);
+	if (sockdgram)
+		sounlock_shared(sosp);
+
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+
 	if (error) {
-		if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+		if (sosp->so_snd.sb_state & SS_CANTSENDMORE ||
+		    sosp->so_pcb == NULL)
 			error = EPIPE;
 		goto release;
 	}
 	so->so_splicelen += len;
 
 	/* Move several packets if possible. */
-	if (!maxreached && nextrecord)
+	if (!maxreached && nextrecord) {
+		mtx_leave(&sosp->so_snd.sb_mtx);
+		mtx_leave(&so->so_rcv.sb_mtx);
 		goto nextpkt;
+	}
 
 release:
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	sosp->so_snd.sb_state &= ~SS_ISSENDING;
-	mtx_leave(&sosp->so_snd.sb_mtx);
 	if (!error && maxreached && so->so_splicemax == so->so_splicelen)
 		error = EFBIG;
 	if (error)
-		so->so_error = error;
+		WRITE_ONCE(so->so_error, error);
+
 	if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
 	    so->so_rcv.sb_cc == 0) ||
 	    (sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
-	    maxreached || error) {
+	    maxreached || error)
+		unsplice = 1;
+
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if (unsplice) {
+		if (sockdgram)
+			solock(so);
 		sounsplice(so, sosp, 0);
+		if (sockdgram)
+			sounlock(so);
+		return (0);
 	}
 	if (timerisset(&so->so_idletv))
 		timeout_add_tv(&so->so_idleto, &so->so_idletv);
 	return (1);
 }
-
 #endif /* SOCKET_SPLICE */
 
 void
@@ -1839,22 +1972,16 @@ sorwakeup(struct socket *so)
 	soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_rcv.sb_flags & SB_SPLICE) {
-		/*
-		 * TCP has a sendbuffer that can handle multiple packets
-		 * at once.  So queue the stream a bit to accumulate data.
-		 * The sosplice thread will call somove() later and send
-		 * the packets calling tcp_output() only once.
-		 * In the UDP case, send out the packets immediately.
-		 * Using a thread would make things slower.
-		 */
-		if (so->so_proto->pr_flags & PR_WANTRCVD)
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_rcv);
+		if (so->so_rcv.sb_flags & SB_SPLICE)
 			task_add(sosplice_taskq, &so->so_splicetask);
-		else
-			somove(so, M_DONTWAIT);
+		if (isspliced(so)) {
+			sb_mtx_unlock(&so->so_rcv);
+			return;
+		}
+		sb_mtx_unlock(&so->so_rcv);
 	}
-	if (isspliced(so))
-		return;
 #endif
 	sowakeup(so, &so->so_rcv);
 	if (so->so_upcall)
@@ -1868,10 +1995,17 @@ sowwakeup(struct socket *so)
 	soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_snd.sb_flags & SB_SPLICE)
-		task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
-	if (issplicedback(so))
-		return;
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_snd);
+		if (so->so_snd.sb_flags & SB_SPLICE)
+			task_add(sosplice_taskq,
+			    &so->so_sp->ssp_soback->so_splicetask);
+		if (issplicedback(so)) {
+			sb_mtx_unlock(&so->so_snd);
+			return;
+		}
+		sb_mtx_unlock(&so->so_snd);
+	}
 #endif
 	sowakeup(so, &so->so_snd);
 }

Index: sys/netinet/udp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
diff -u -p -r1.321 udp_usrreq.c
--- sys/netinet/udp_usrreq.c	12 Jul 2024 19:50:35 -0000	1.321
+++ sys/netinet/udp_usrreq.c	19 Jul 2024 16:18:02 -0000
@@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf
 
 	soassertlocked(so);
 
+	if (inp == NULL) {
+		/* PCB could be destroyed, but socket still spliced. */
+		return (EINVAL);
+	}
+
 #ifdef PIPEX
 	if (inp->inp_pipex) {
 		struct pipex_session *session;

Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
diff -u -p -r1.132 socketvar.h
--- sys/sys/socketvar.h	12 Jul 2024 17:20:18 -0000	1.132
+++ sys/sys/socketvar.h	19 Jul 2024 16:18:02 -0000
@@ -330,6 +330,12 @@ int	sblock(struct sockbuf *, int);
 /* release lock on sockbuf sb */
 void	sbunlock(struct sockbuf *);
 
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+	rw_assert_wrlock(&sb->sb_lock);
+}
+
 #define	SB_EMPTY_FIXUP(sb) do {						\
 	if ((sb)->sb_mb == NULL) {					\
 		(sb)->sb_mbtail = NULL;					\