From: Alexander Bluhm Subject: Re: Unlock udp(4) somove() To: Vitaliy Makkoveev Cc: tech@openbsd.org Date: Sat, 20 Jul 2024 18:41:51 +0200 On Sat, Jul 20, 2024 at 06:35:23PM +0300, Vitaliy Makkoveev wrote: > I fixed typos and white spaces. Updated diff also contains locks > description for 'sosplice' structure, this is not functional change. You attached the diff twice in this mail. But both are identical. I cannot test right now as my machines are busy with other tests. There is no functional change to the version I have tested, so OK bluhm@ I would suggest to - you commit somove diff - wait a bit and watch for fallout - I commit parallel udp input - live with that for a while - try to call UDP somove without task from sorwakeup Currently we queue before udp_input. Parallel udp input moves the queueing before somove. This is an improvement for performance in splicing and non-splicing case. Calling somove directly can be done later. bluhm > Index: sys/kern/uipc_socket.c > =================================================================== > RCS file: /cvs/src/sys/kern/uipc_socket.c,v > diff -u -p -r1.338 uipc_socket.c > --- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338 > +++ sys/kern/uipc_socket.c 20 Jul 2024 15:32:12 -0000 > @@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock) > sounlock(head); > } > > - if (persocket) { > + switch (so->so_proto->pr_domain->dom_family) { > + case AF_INET: > + case AF_INET6: > + if (so->so_proto->pr_type == SOCK_STREAM) > + break; > + /* FALLTHROUGH */ > + default: > sounlock(so); > refcnt_finalize(&so->so_refcnt, "sofinal"); > solock(so); > + break; > } > > sigio_free(&so->so_sigio); > klist_free(&so->so_rcv.sb_klist); > klist_free(&so->so_snd.sb_klist); > -#ifdef SOCKET_SPLICE > - if (issplicedback(so)) { > - int freeing = SOSP_FREEING_WRITE; > - > - if (so->so_sp->ssp_soback == so) > - freeing |= SOSP_FREEING_READ; > - sounsplice(so->so_sp->ssp_soback, so, freeing); > - } > - if (isspliced(so)) { > - int 
freeing = SOSP_FREEING_READ; > - > - if (so == so->so_sp->ssp_socket) > - freeing |= SOSP_FREEING_WRITE; > - sounsplice(so, so->so_sp->ssp_socket, freeing); > - } > -#endif /* SOCKET_SPLICE */ > > mtx_enter(&so->so_snd.sb_mtx); > sbrelease(so, &so->so_snd); > @@ -458,6 +449,85 @@ discard: > if (so->so_state & SS_NOFDREF) > panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type); > so->so_state |= SS_NOFDREF; > + > +#ifdef SOCKET_SPLICE > + if (so->so_sp) { > + struct socket *soback; > + > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + /* > + * Copy - Paste, but can't relock and sleep in > + * sofree() in tcp(4) case. That's why tcp(4) > + * still rely on solock() for splicing and > + * unsplicing. > + */ > + > + if (issplicedback(so)) { > + int freeing = SOSP_FREEING_WRITE; > + > + if (so->so_sp->ssp_soback == so) > + freeing |= SOSP_FREEING_READ; > + sounsplice(so->so_sp->ssp_soback, so, freeing); > + } > + if (isspliced(so)) { > + int freeing = SOSP_FREEING_READ; > + > + if (so == so->so_sp->ssp_socket) > + freeing |= SOSP_FREEING_WRITE; > + sounsplice(so, so->so_sp->ssp_socket, freeing); > + } > + goto free; > + } > + > + sounlock(so); > + mtx_enter(&so->so_snd.sb_mtx); > + /* > + * Concurrent sounsplice() locks `sb_mtx' mutexes on > + * both `so_snd' and `so_rcv' before unsplice sockets. > + */ > + if ((soback = so->so_sp->ssp_soback) == NULL) { > + mtx_leave(&so->so_snd.sb_mtx); > + goto notsplicedback; > + } > + soref(soback); > + mtx_leave(&so->so_snd.sb_mtx); > + > + /* > + * `so' can be only unspliced, and never spliced again. > + * Thus if issplicedback(so) check is positive, socket is > + * still spliced and `ssp_soback' points to the same > + * socket that `soback'. 
> + */ > + sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (issplicedback(so)) { > + int freeing = SOSP_FREEING_WRITE; > + > + if (so->so_sp->ssp_soback == so) > + freeing |= SOSP_FREEING_READ; > + solock(soback); > + sounsplice(so->so_sp->ssp_soback, so, freeing); > + sounlock(soback); > + } > + sbunlock(&soback->so_rcv); > + sorele(soback); > + > +notsplicedback: > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (isspliced(so)) { > + int freeing = SOSP_FREEING_READ; > + > + if (so == so->so_sp->ssp_socket) > + freeing |= SOSP_FREEING_WRITE; > + solock(so); > + sounsplice(so, so->so_sp->ssp_socket, freeing); > + sounlock(so); > + } > + sbunlock(&so->so_rcv); > + > + solock(so); > + } > +free: > +#endif /* SOCKET_SPLICE */ > /* sofree() calls sounlock(). */ > sofree(so, 0); > return (error); > @@ -1411,14 +1481,6 @@ sosplice(struct socket *so, int fd, off_ > goto release; > } > > - /* Splice so and sosp together. */ > - mtx_enter(&so->so_rcv.sb_mtx); > - mtx_enter(&sosp->so_snd.sb_mtx); > - so->so_sp->ssp_socket = sosp; > - sosp->so_sp->ssp_soback = so; > - mtx_leave(&sosp->so_snd.sb_mtx); > - mtx_leave(&so->so_rcv.sb_mtx); > - > so->so_splicelen = 0; > so->so_splicemax = max; > if (tv) > @@ -1429,9 +1491,20 @@ sosplice(struct socket *so, int fd, off_ > task_set(&so->so_splicetask, sotask, so); > > /* > - * To prevent softnet interrupt from calling somove() while > - * we sleep, the socket buffers are not marked as spliced yet. > + * To prevent sorwakeup() calling somove() before this somove() > + * has finished, the socket buffers are not marked as spliced yet. > */ > + > + /* Splice so and sosp together. 
*/ > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + so->so_sp->ssp_socket = sosp; > + sosp->so_sp->ssp_soback = so; > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > + > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + sounlock(so); > if (somove(so, M_WAIT)) { > mtx_enter(&so->so_rcv.sb_mtx); > mtx_enter(&sosp->so_snd.sb_mtx); > @@ -1440,6 +1513,8 @@ sosplice(struct socket *so, int fd, off_ > mtx_leave(&sosp->so_snd.sb_mtx); > mtx_leave(&so->so_rcv.sb_mtx); > } > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + solock(so); > > release: > sounlock(so); > @@ -1454,6 +1529,8 @@ sosplice(struct socket *so, int fd, off_ > void > sounsplice(struct socket *so, struct socket *sosp, int freeing) > { > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + sbassertlocked(&so->so_rcv); > soassertlocked(so); > > task_del(sosplice_taskq, &so->so_splicetask); > @@ -1479,32 +1556,51 @@ soidle(void *arg) > { > struct socket *so = arg; > > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > solock(so); > + /* > + * Depending on socket type, sblock(&so->so_rcv) or solock() > + * is always held while modifying SB_SPLICE and > + * so->so_sp->ssp_socket. > + */ > if (so->so_rcv.sb_flags & SB_SPLICE) { > so->so_error = ETIMEDOUT; > sounsplice(so, so->so_sp->ssp_socket, 0); > } > sounlock(so); > + sbunlock(&so->so_rcv); > } > > void > sotask(void *arg) > { > struct socket *so = arg; > + int doyield = 0; > + int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD); > + > + /* > + * sblock() on `so_rcv' protects sockets from beind unspliced > + * for UDP case. TCP sockets still rely on solock(). > + */ > + > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (sockstream) > + solock(so); > > - solock(so); > if (so->so_rcv.sb_flags & SB_SPLICE) { > - /* > - * We may not sleep here as sofree() and unsplice() may be > - * called from softnet interrupt context. This would remove > - * the socket during somove(). 
> - */ > + if (sockstream) > + doyield = 1; > somove(so, M_DONTWAIT); > } > - sounlock(so); > > - /* Avoid user land starvation. */ > - yield(); > + if (sockstream) > + sounlock(so); > + sbunlock(&so->so_rcv); > + > + if (doyield) { > + /* Avoid user land starvation. */ > + yield(); > + } > } > > /* > @@ -1546,24 +1642,32 @@ somove(struct socket *so, int wait) > struct mbuf *m, **mp, *nextrecord; > u_long len, off, oobmark; > long space; > - int error = 0, maxreached = 0; > + int error = 0, maxreached = 0, unsplice = 0; > unsigned int rcvstate; > + int sockdgram = ((so->so_proto->pr_flags & > + PR_WANTRCVD) == 0); > > - soassertlocked(so); > + if (sockdgram) > + sbassertlocked(&so->so_rcv); > + else > + soassertlocked(so); > + > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > > nextpkt: > - if (so->so_error) { > - error = so->so_error; > + if ((error = READ_ONCE(so->so_error))) > goto release; > - } > if (sosp->so_snd.sb_state & SS_CANTSENDMORE) { > error = EPIPE; > goto release; > } > - if (sosp->so_error && sosp->so_error != ETIMEDOUT && > - sosp->so_error != EFBIG && sosp->so_error != ELOOP) { > - error = sosp->so_error; > - goto release; > + > + error = READ_ONCE(sosp->so_error); > + if (error) { > + if (error != ETIMEDOUT && error != EFBIG && error != ELOOP) > + goto release; > + error = 0; > } > if ((sosp->so_state & SS_ISCONNECTED) == 0) > goto release; > @@ -1577,26 +1681,21 @@ somove(struct socket *so, int wait) > maxreached = 1; > } > } > - mtx_enter(&sosp->so_snd.sb_mtx); > space = sbspace_locked(sosp, &sosp->so_snd); > if (so->so_oobmark && so->so_oobmark < len && > so->so_oobmark < space + 1024) > space += 1024; > if (space <= 0) { > - mtx_leave(&sosp->so_snd.sb_mtx); > maxreached = 0; > goto release; > } > if (space < len) { > maxreached = 0; > - if (space < sosp->so_snd.sb_lowat) { > - mtx_leave(&sosp->so_snd.sb_mtx); > + if (space < sosp->so_snd.sb_lowat) > goto release; > - } > len = space; > } > sosp->so_snd.sb_state |= 
SS_ISSENDING; > - mtx_leave(&sosp->so_snd.sb_mtx); > > SBLASTRECORDCHK(&so->so_rcv, "somove 1"); > SBLASTMBUFCHK(&so->so_rcv, "somove 1"); > @@ -1618,8 +1717,13 @@ somove(struct socket *so, int wait) > m = m->m_next; > if (m == NULL) { > sbdroprecord(so, &so->so_rcv); > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > pru_rcvd(so); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + } > goto nextpkt; > } > > @@ -1724,11 +1828,15 @@ somove(struct socket *so, int wait) > } > > /* Send window update to source peer as receive buffer has changed. */ > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > pru_rcvd(so); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + } > > /* Receive buffer did shrink by len bytes, adjust oob. */ > - mtx_enter(&so->so_rcv.sb_mtx); > rcvstate = so->so_rcv.sb_state; > so->so_rcv.sb_state &= ~SS_RCVATMARK; > oobmark = so->so_oobmark; > @@ -1739,7 +1847,6 @@ somove(struct socket *so, int wait) > if (oobmark >= len) > oobmark = 0; > } > - mtx_leave(&so->so_rcv.sb_mtx); > > /* > * Handle oob data. If any malloc fails, ignore error. 
> @@ -1755,7 +1862,12 @@ somove(struct socket *so, int wait) > } else if (oobmark) { > o = m_split(m, oobmark, wait); > if (o) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > error = pru_send(sosp, m, NULL, NULL); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > if (sosp->so_snd.sb_state & > SS_CANTSENDMORE) > @@ -1773,7 +1885,13 @@ somove(struct socket *so, int wait) > if (o) { > o->m_len = 1; > *mtod(o, caddr_t) = *mtod(m, caddr_t); > + > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > error = pru_sendoob(sosp, o, NULL, NULL); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > if (sosp->so_snd.sb_state & SS_CANTSENDMORE) > error = EPIPE; > @@ -1791,15 +1909,25 @@ somove(struct socket *so, int wait) > } > } > > - mtx_enter(&sosp->so_snd.sb_mtx); > /* Append all remaining data to drain socket. */ > if (so->so_rcv.sb_cc == 0 || maxreached) > sosp->so_snd.sb_state &= ~SS_ISSENDING; > + > mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > > + if (sockdgram) > + solock_shared(sosp); > error = pru_send(sosp, m, NULL, NULL); > + if (sockdgram) > + sounlock_shared(sosp); > + > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > - if (sosp->so_snd.sb_state & SS_CANTSENDMORE) > + if (sosp->so_snd.sb_state & SS_CANTSENDMORE || > + so->so_pcb == NULL) > error = EPIPE; > goto release; > } > @@ -1810,26 +1938,35 @@ somove(struct socket *so, int wait) > goto nextpkt; > > release: > - mtx_enter(&sosp->so_snd.sb_mtx); > sosp->so_snd.sb_state &= ~SS_ISSENDING; > - mtx_leave(&sosp->so_snd.sb_mtx); > > if (!error && maxreached && so->so_splicemax == so->so_splicelen) > error = EFBIG; > if (error) > - so->so_error = error; > + WRITE_ONCE(so->so_error, error); > + > if (((so->so_rcv.sb_state & SS_CANTRCVMORE) && > so->so_rcv.sb_cc == 0) || > (sosp->so_snd.sb_state & SS_CANTSENDMORE) || > - 
maxreached || error) { > + maxreached || error) > + unsplice = 1; > + > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > + > + if (unsplice) { > + if (sockdgram) > + solock(so); > sounsplice(so, sosp, 0); > + if (sockdgram) > + sounlock(so); > + > return (0); > } > if (timerisset(&so->so_idletv)) > timeout_add_tv(&so->so_idleto, &so->so_idletv); > return (1); > } > - > #endif /* SOCKET_SPLICE */ > > void > @@ -1839,22 +1976,16 @@ sorwakeup(struct socket *so) > soassertlocked_readonly(so); > > #ifdef SOCKET_SPLICE > - if (so->so_rcv.sb_flags & SB_SPLICE) { > - /* > - * TCP has a sendbuffer that can handle multiple packets > - * at once. So queue the stream a bit to accumulate data. > - * The sosplice thread will call somove() later and send > - * the packets calling tcp_output() only once. > - * In the UDP case, send out the packets immediately. > - * Using a thread would make things slower. > - */ > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_SPLICE) { > + sb_mtx_lock(&so->so_rcv); > + if (so->so_rcv.sb_flags & SB_SPLICE) > task_add(sosplice_taskq, &so->so_splicetask); > - else > - somove(so, M_DONTWAIT); > + if (isspliced(so)) { > + sb_mtx_unlock(&so->so_rcv); > + return; > + } > + sb_mtx_unlock(&so->so_rcv); > } > - if (isspliced(so)) > - return; > #endif > sowakeup(so, &so->so_rcv); > if (so->so_upcall) > @@ -1868,10 +1999,17 @@ sowwakeup(struct socket *so) > soassertlocked_readonly(so); > > #ifdef SOCKET_SPLICE > - if (so->so_snd.sb_flags & SB_SPLICE) > - task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask); > - if (issplicedback(so)) > - return; > + if (so->so_proto->pr_flags & PR_SPLICE) { > + sb_mtx_lock(&so->so_snd); > + if (so->so_snd.sb_flags & SB_SPLICE) > + task_add(sosplice_taskq, > + &so->so_sp->ssp_soback->so_splicetask); > + if (issplicedback(so)) { > + sb_mtx_unlock(&so->so_snd); > + return; > + } > + sb_mtx_unlock(&so->so_snd); > + } > #endif > sowakeup(so, &so->so_snd); > } > 
Index: sys/netinet/udp_usrreq.c > =================================================================== > RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v > diff -u -p -r1.322 udp_usrreq.c > --- sys/netinet/udp_usrreq.c 19 Jul 2024 15:41:58 -0000 1.322 > +++ sys/netinet/udp_usrreq.c 20 Jul 2024 15:32:12 -0000 > @@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf > > soassertlocked_readonly(so); > > + if (inp == NULL) { > + /* PCB could be destroyed, but socket still spliced. */ > + return (EINVAL); > + } > + > #ifdef PIPEX > if (inp->inp_pipex) { > struct pipex_session *session; > Index: sys/sys/socketvar.h > =================================================================== > RCS file: /cvs/src/sys/sys/socketvar.h,v > diff -u -p -r1.132 socketvar.h > --- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132 > +++ sys/sys/socketvar.h 20 Jul 2024 15:32:12 -0000 > @@ -52,6 +52,33 @@ typedef __socklen_t socklen_t; /* length > TAILQ_HEAD(soqhead, socket); > > /* > + * Locks used to protect global data and struct members: > + * I immutable after creation > + * mr sb_mxt of so_rcv buffer > + * ms sb_mtx of so_snd buffer > + * br sblock() of so_rcv buffer > + * bs sblock() od so_snd buffer > + * s solock() > + */ > + > +/* > + * XXXSMP: tcp(4) sockets rely on exclusive solock() for all the cases. > + */ > + > +/* > + * Variables for socket splicing, allocated only when needed. > + */ > +struct sosplice { > + struct socket *ssp_socket; /* [mr ms] send data to drain socket */ > + struct socket *ssp_soback; /* [ms ms] back ref to source socket */ > + off_t ssp_len; /* [mr] number of bytes spliced */ > + off_t ssp_max; /* [I] maximum number of bytes */ > + struct timeval ssp_idletv; /* [I] idle timeout */ > + struct timeout ssp_idleto; > + struct task ssp_task; /* task for somove */ > +}; > + > +/* > * Kernel structure per socket. 
> * Contains send and receive buffer queues, > * handle on protocol and pointer to protocol > @@ -89,18 +116,8 @@ struct socket { > short so_timeo; /* connection timeout */ > u_long so_oobmark; /* chars to oob mark */ > u_int so_error; /* error affecting connection */ > -/* > - * Variables for socket splicing, allocated only when needed. > - */ > - struct sosplice { > - struct socket *ssp_socket; /* send data to drain socket */ > - struct socket *ssp_soback; /* back ref to source socket */ > - off_t ssp_len; /* number of bytes spliced */ > - off_t ssp_max; /* maximum number of bytes */ > - struct timeval ssp_idletv; /* idle timeout */ > - struct timeout ssp_idleto; > - struct task ssp_task; /* task for somove */ > - } *so_sp; > + > + struct sosplice *so_sp; /* [s br] */ > /* > * Variables for socket buffering. > */ > @@ -329,6 +346,12 @@ int sblock(struct sockbuf *, int); > > /* release lock on sockbuf sb */ > void sbunlock(struct sockbuf *); > + > +static inline void > +sbassertlocked(struct sockbuf *sb) > +{ > + rw_assert_wrlock(&sb->sb_lock); > +} > > #define SB_EMPTY_FIXUP(sb) do { \ > if ((sb)->sb_mb == NULL) { \ > Index: sys/kern/uipc_socket.c > =================================================================== > RCS file: /cvs/src/sys/kern/uipc_socket.c,v > diff -u -p -r1.338 uipc_socket.c > --- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338 > +++ sys/kern/uipc_socket.c 20 Jul 2024 15:32:12 -0000 > @@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock) > sounlock(head); > } > > - if (persocket) { > + switch (so->so_proto->pr_domain->dom_family) { > + case AF_INET: > + case AF_INET6: > + if (so->so_proto->pr_type == SOCK_STREAM) > + break; > + /* FALLTHROUGH */ > + default: > sounlock(so); > refcnt_finalize(&so->so_refcnt, "sofinal"); > solock(so); > + break; > } > > sigio_free(&so->so_sigio); > klist_free(&so->so_rcv.sb_klist); > klist_free(&so->so_snd.sb_klist); > -#ifdef SOCKET_SPLICE > - if (issplicedback(so)) { > - int freeing = 
SOSP_FREEING_WRITE; > - > - if (so->so_sp->ssp_soback == so) > - freeing |= SOSP_FREEING_READ; > - sounsplice(so->so_sp->ssp_soback, so, freeing); > - } > - if (isspliced(so)) { > - int freeing = SOSP_FREEING_READ; > - > - if (so == so->so_sp->ssp_socket) > - freeing |= SOSP_FREEING_WRITE; > - sounsplice(so, so->so_sp->ssp_socket, freeing); > - } > -#endif /* SOCKET_SPLICE */ > > mtx_enter(&so->so_snd.sb_mtx); > sbrelease(so, &so->so_snd); > @@ -458,6 +449,85 @@ discard: > if (so->so_state & SS_NOFDREF) > panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type); > so->so_state |= SS_NOFDREF; > + > +#ifdef SOCKET_SPLICE > + if (so->so_sp) { > + struct socket *soback; > + > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + /* > + * Copy - Paste, but can't relock and sleep in > + * sofree() in tcp(4) case. That's why tcp(4) > + * still rely on solock() for splicing and > + * unsplicing. > + */ > + > + if (issplicedback(so)) { > + int freeing = SOSP_FREEING_WRITE; > + > + if (so->so_sp->ssp_soback == so) > + freeing |= SOSP_FREEING_READ; > + sounsplice(so->so_sp->ssp_soback, so, freeing); > + } > + if (isspliced(so)) { > + int freeing = SOSP_FREEING_READ; > + > + if (so == so->so_sp->ssp_socket) > + freeing |= SOSP_FREEING_WRITE; > + sounsplice(so, so->so_sp->ssp_socket, freeing); > + } > + goto free; > + } > + > + sounlock(so); > + mtx_enter(&so->so_snd.sb_mtx); > + /* > + * Concurrent sounsplice() locks `sb_mtx' mutexes on > + * both `so_snd' and `so_rcv' before unsplice sockets. > + */ > + if ((soback = so->so_sp->ssp_soback) == NULL) { > + mtx_leave(&so->so_snd.sb_mtx); > + goto notsplicedback; > + } > + soref(soback); > + mtx_leave(&so->so_snd.sb_mtx); > + > + /* > + * `so' can be only unspliced, and never spliced again. > + * Thus if issplicedback(so) check is positive, socket is > + * still spliced and `ssp_soback' points to the same > + * socket that `soback'. 
> + */ > + sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (issplicedback(so)) { > + int freeing = SOSP_FREEING_WRITE; > + > + if (so->so_sp->ssp_soback == so) > + freeing |= SOSP_FREEING_READ; > + solock(soback); > + sounsplice(so->so_sp->ssp_soback, so, freeing); > + sounlock(soback); > + } > + sbunlock(&soback->so_rcv); > + sorele(soback); > + > +notsplicedback: > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (isspliced(so)) { > + int freeing = SOSP_FREEING_READ; > + > + if (so == so->so_sp->ssp_socket) > + freeing |= SOSP_FREEING_WRITE; > + solock(so); > + sounsplice(so, so->so_sp->ssp_socket, freeing); > + sounlock(so); > + } > + sbunlock(&so->so_rcv); > + > + solock(so); > + } > +free: > +#endif /* SOCKET_SPLICE */ > /* sofree() calls sounlock(). */ > sofree(so, 0); > return (error); > @@ -1411,14 +1481,6 @@ sosplice(struct socket *so, int fd, off_ > goto release; > } > > - /* Splice so and sosp together. */ > - mtx_enter(&so->so_rcv.sb_mtx); > - mtx_enter(&sosp->so_snd.sb_mtx); > - so->so_sp->ssp_socket = sosp; > - sosp->so_sp->ssp_soback = so; > - mtx_leave(&sosp->so_snd.sb_mtx); > - mtx_leave(&so->so_rcv.sb_mtx); > - > so->so_splicelen = 0; > so->so_splicemax = max; > if (tv) > @@ -1429,9 +1491,20 @@ sosplice(struct socket *so, int fd, off_ > task_set(&so->so_splicetask, sotask, so); > > /* > - * To prevent softnet interrupt from calling somove() while > - * we sleep, the socket buffers are not marked as spliced yet. > + * To prevent sorwakeup() calling somove() before this somove() > + * has finished, the socket buffers are not marked as spliced yet. > */ > + > + /* Splice so and sosp together. 
*/ > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + so->so_sp->ssp_socket = sosp; > + sosp->so_sp->ssp_soback = so; > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > + > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + sounlock(so); > if (somove(so, M_WAIT)) { > mtx_enter(&so->so_rcv.sb_mtx); > mtx_enter(&sosp->so_snd.sb_mtx); > @@ -1440,6 +1513,8 @@ sosplice(struct socket *so, int fd, off_ > mtx_leave(&sosp->so_snd.sb_mtx); > mtx_leave(&so->so_rcv.sb_mtx); > } > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + solock(so); > > release: > sounlock(so); > @@ -1454,6 +1529,8 @@ sosplice(struct socket *so, int fd, off_ > void > sounsplice(struct socket *so, struct socket *sosp, int freeing) > { > + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) > + sbassertlocked(&so->so_rcv); > soassertlocked(so); > > task_del(sosplice_taskq, &so->so_splicetask); > @@ -1479,32 +1556,51 @@ soidle(void *arg) > { > struct socket *so = arg; > > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > solock(so); > + /* > + * Depending on socket type, sblock(&so->so_rcv) or solock() > + * is always held while modifying SB_SPLICE and > + * so->so_sp->ssp_socket. > + */ > if (so->so_rcv.sb_flags & SB_SPLICE) { > so->so_error = ETIMEDOUT; > sounsplice(so, so->so_sp->ssp_socket, 0); > } > sounlock(so); > + sbunlock(&so->so_rcv); > } > > void > sotask(void *arg) > { > struct socket *so = arg; > + int doyield = 0; > + int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD); > + > + /* > + * sblock() on `so_rcv' protects sockets from beind unspliced > + * for UDP case. TCP sockets still rely on solock(). > + */ > + > + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); > + if (sockstream) > + solock(so); > > - solock(so); > if (so->so_rcv.sb_flags & SB_SPLICE) { > - /* > - * We may not sleep here as sofree() and unsplice() may be > - * called from softnet interrupt context. This would remove > - * the socket during somove(). 
> - */ > + if (sockstream) > + doyield = 1; > somove(so, M_DONTWAIT); > } > - sounlock(so); > > - /* Avoid user land starvation. */ > - yield(); > + if (sockstream) > + sounlock(so); > + sbunlock(&so->so_rcv); > + > + if (doyield) { > + /* Avoid user land starvation. */ > + yield(); > + } > } > > /* > @@ -1546,24 +1642,32 @@ somove(struct socket *so, int wait) > struct mbuf *m, **mp, *nextrecord; > u_long len, off, oobmark; > long space; > - int error = 0, maxreached = 0; > + int error = 0, maxreached = 0, unsplice = 0; > unsigned int rcvstate; > + int sockdgram = ((so->so_proto->pr_flags & > + PR_WANTRCVD) == 0); > > - soassertlocked(so); > + if (sockdgram) > + sbassertlocked(&so->so_rcv); > + else > + soassertlocked(so); > + > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > > nextpkt: > - if (so->so_error) { > - error = so->so_error; > + if ((error = READ_ONCE(so->so_error))) > goto release; > - } > if (sosp->so_snd.sb_state & SS_CANTSENDMORE) { > error = EPIPE; > goto release; > } > - if (sosp->so_error && sosp->so_error != ETIMEDOUT && > - sosp->so_error != EFBIG && sosp->so_error != ELOOP) { > - error = sosp->so_error; > - goto release; > + > + error = READ_ONCE(sosp->so_error); > + if (error) { > + if (error != ETIMEDOUT && error != EFBIG && error != ELOOP) > + goto release; > + error = 0; > } > if ((sosp->so_state & SS_ISCONNECTED) == 0) > goto release; > @@ -1577,26 +1681,21 @@ somove(struct socket *so, int wait) > maxreached = 1; > } > } > - mtx_enter(&sosp->so_snd.sb_mtx); > space = sbspace_locked(sosp, &sosp->so_snd); > if (so->so_oobmark && so->so_oobmark < len && > so->so_oobmark < space + 1024) > space += 1024; > if (space <= 0) { > - mtx_leave(&sosp->so_snd.sb_mtx); > maxreached = 0; > goto release; > } > if (space < len) { > maxreached = 0; > - if (space < sosp->so_snd.sb_lowat) { > - mtx_leave(&sosp->so_snd.sb_mtx); > + if (space < sosp->so_snd.sb_lowat) > goto release; > - } > len = space; > } > sosp->so_snd.sb_state |= 
SS_ISSENDING; > - mtx_leave(&sosp->so_snd.sb_mtx); > > SBLASTRECORDCHK(&so->so_rcv, "somove 1"); > SBLASTMBUFCHK(&so->so_rcv, "somove 1"); > @@ -1618,8 +1717,13 @@ somove(struct socket *so, int wait) > m = m->m_next; > if (m == NULL) { > sbdroprecord(so, &so->so_rcv); > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > pru_rcvd(so); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + } > goto nextpkt; > } > > @@ -1724,11 +1828,15 @@ somove(struct socket *so, int wait) > } > > /* Send window update to source peer as receive buffer has changed. */ > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_WANTRCVD) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > pru_rcvd(so); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + } > > /* Receive buffer did shrink by len bytes, adjust oob. */ > - mtx_enter(&so->so_rcv.sb_mtx); > rcvstate = so->so_rcv.sb_state; > so->so_rcv.sb_state &= ~SS_RCVATMARK; > oobmark = so->so_oobmark; > @@ -1739,7 +1847,6 @@ somove(struct socket *so, int wait) > if (oobmark >= len) > oobmark = 0; > } > - mtx_leave(&so->so_rcv.sb_mtx); > > /* > * Handle oob data. If any malloc fails, ignore error. 
> @@ -1755,7 +1862,12 @@ somove(struct socket *so, int wait) > } else if (oobmark) { > o = m_split(m, oobmark, wait); > if (o) { > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > error = pru_send(sosp, m, NULL, NULL); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > if (sosp->so_snd.sb_state & > SS_CANTSENDMORE) > @@ -1773,7 +1885,13 @@ somove(struct socket *so, int wait) > if (o) { > o->m_len = 1; > *mtod(o, caddr_t) = *mtod(m, caddr_t); > + > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > error = pru_sendoob(sosp, o, NULL, NULL); > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > if (sosp->so_snd.sb_state & SS_CANTSENDMORE) > error = EPIPE; > @@ -1791,15 +1909,25 @@ somove(struct socket *so, int wait) > } > } > > - mtx_enter(&sosp->so_snd.sb_mtx); > /* Append all remaining data to drain socket. */ > if (so->so_rcv.sb_cc == 0 || maxreached) > sosp->so_snd.sb_state &= ~SS_ISSENDING; > + > mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > > + if (sockdgram) > + solock_shared(sosp); > error = pru_send(sosp, m, NULL, NULL); > + if (sockdgram) > + sounlock_shared(sosp); > + > + mtx_enter(&so->so_rcv.sb_mtx); > + mtx_enter(&sosp->so_snd.sb_mtx); > + > if (error) { > - if (sosp->so_snd.sb_state & SS_CANTSENDMORE) > + if (sosp->so_snd.sb_state & SS_CANTSENDMORE || > + so->so_pcb == NULL) > error = EPIPE; > goto release; > } > @@ -1810,26 +1938,35 @@ somove(struct socket *so, int wait) > goto nextpkt; > > release: > - mtx_enter(&sosp->so_snd.sb_mtx); > sosp->so_snd.sb_state &= ~SS_ISSENDING; > - mtx_leave(&sosp->so_snd.sb_mtx); > > if (!error && maxreached && so->so_splicemax == so->so_splicelen) > error = EFBIG; > if (error) > - so->so_error = error; > + WRITE_ONCE(so->so_error, error); > + > if (((so->so_rcv.sb_state & SS_CANTRCVMORE) && > so->so_rcv.sb_cc == 0) || > (sosp->so_snd.sb_state & SS_CANTSENDMORE) || > - 
maxreached || error) { > + maxreached || error) > + unsplice = 1; > + > + mtx_leave(&sosp->so_snd.sb_mtx); > + mtx_leave(&so->so_rcv.sb_mtx); > + > + if (unsplice) { > + if (sockdgram) > + solock(so); > sounsplice(so, sosp, 0); > + if (sockdgram) > + sounlock(so); > + > return (0); > } > if (timerisset(&so->so_idletv)) > timeout_add_tv(&so->so_idleto, &so->so_idletv); > return (1); > } > - > #endif /* SOCKET_SPLICE */ > > void > @@ -1839,22 +1976,16 @@ sorwakeup(struct socket *so) > soassertlocked_readonly(so); > > #ifdef SOCKET_SPLICE > - if (so->so_rcv.sb_flags & SB_SPLICE) { > - /* > - * TCP has a sendbuffer that can handle multiple packets > - * at once. So queue the stream a bit to accumulate data. > - * The sosplice thread will call somove() later and send > - * the packets calling tcp_output() only once. > - * In the UDP case, send out the packets immediately. > - * Using a thread would make things slower. > - */ > - if (so->so_proto->pr_flags & PR_WANTRCVD) > + if (so->so_proto->pr_flags & PR_SPLICE) { > + sb_mtx_lock(&so->so_rcv); > + if (so->so_rcv.sb_flags & SB_SPLICE) > task_add(sosplice_taskq, &so->so_splicetask); > - else > - somove(so, M_DONTWAIT); > + if (isspliced(so)) { > + sb_mtx_unlock(&so->so_rcv); > + return; > + } > + sb_mtx_unlock(&so->so_rcv); > } > - if (isspliced(so)) > - return; > #endif > sowakeup(so, &so->so_rcv); > if (so->so_upcall) > @@ -1868,10 +1999,17 @@ sowwakeup(struct socket *so) > soassertlocked_readonly(so); > > #ifdef SOCKET_SPLICE > - if (so->so_snd.sb_flags & SB_SPLICE) > - task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask); > - if (issplicedback(so)) > - return; > + if (so->so_proto->pr_flags & PR_SPLICE) { > + sb_mtx_lock(&so->so_snd); > + if (so->so_snd.sb_flags & SB_SPLICE) > + task_add(sosplice_taskq, > + &so->so_sp->ssp_soback->so_splicetask); > + if (issplicedback(so)) { > + sb_mtx_unlock(&so->so_snd); > + return; > + } > + sb_mtx_unlock(&so->so_snd); > + } > #endif > sowakeup(so, &so->so_snd); > } > 
Index: sys/netinet/udp_usrreq.c > =================================================================== > RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v > diff -u -p -r1.322 udp_usrreq.c > --- sys/netinet/udp_usrreq.c 19 Jul 2024 15:41:58 -0000 1.322 > +++ sys/netinet/udp_usrreq.c 20 Jul 2024 15:32:12 -0000 > @@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf > > soassertlocked_readonly(so); > > + if (inp == NULL) { > + /* PCB could be destroyed, but socket still spliced. */ > + return (EINVAL); > + } > + > #ifdef PIPEX > if (inp->inp_pipex) { > struct pipex_session *session; > Index: sys/sys/socketvar.h > =================================================================== > RCS file: /cvs/src/sys/sys/socketvar.h,v > diff -u -p -r1.132 socketvar.h > --- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132 > +++ sys/sys/socketvar.h 20 Jul 2024 15:32:12 -0000 > @@ -52,6 +52,33 @@ typedef __socklen_t socklen_t; /* length > TAILQ_HEAD(soqhead, socket); > > /* > + * Locks used to protect global data and struct members: > + * I immutable after creation > + * mr sb_mxt of so_rcv buffer > + * ms sb_mtx of so_snd buffer > + * br sblock() of so_rcv buffer > + * bs sblock() od so_snd buffer > + * s solock() > + */ > + > +/* > + * XXXSMP: tcp(4) sockets rely on exclusive solock() for all the cases. > + */ > + > +/* > + * Variables for socket splicing, allocated only when needed. > + */ > +struct sosplice { > + struct socket *ssp_socket; /* [mr ms] send data to drain socket */ > + struct socket *ssp_soback; /* [ms ms] back ref to source socket */ > + off_t ssp_len; /* [mr] number of bytes spliced */ > + off_t ssp_max; /* [I] maximum number of bytes */ > + struct timeval ssp_idletv; /* [I] idle timeout */ > + struct timeout ssp_idleto; > + struct task ssp_task; /* task for somove */ > +}; > + > +/* > * Kernel structure per socket. 
> * Contains send and receive buffer queues, > * handle on protocol and pointer to protocol > @@ -89,18 +116,8 @@ struct socket { > short so_timeo; /* connection timeout */ > u_long so_oobmark; /* chars to oob mark */ > u_int so_error; /* error affecting connection */ > -/* > - * Variables for socket splicing, allocated only when needed. > - */ > - struct sosplice { > - struct socket *ssp_socket; /* send data to drain socket */ > - struct socket *ssp_soback; /* back ref to source socket */ > - off_t ssp_len; /* number of bytes spliced */ > - off_t ssp_max; /* maximum number of bytes */ > - struct timeval ssp_idletv; /* idle timeout */ > - struct timeout ssp_idleto; > - struct task ssp_task; /* task for somove */ > - } *so_sp; > + > + struct sosplice *so_sp; /* [s br] */ > /* > * Variables for socket buffering. > */ > @@ -329,6 +346,12 @@ int sblock(struct sockbuf *, int); > > /* release lock on sockbuf sb */ > void sbunlock(struct sockbuf *); > + > +static inline void > +sbassertlocked(struct sockbuf *sb) > +{ > + rw_assert_wrlock(&sb->sb_lock); > +} > > #define SB_EMPTY_FIXUP(sb) do { \ > if ((sb)->sb_mb == NULL) { \