Unlock udp(4) somove()
On Sat, Jul 20, 2024 at 06:35:23PM +0300, Vitaliy Makkoveev wrote:
> I fixed typos and white space. The updated diff also contains a lock
> description for the 'sosplice' structure; this is not a functional change.
You attached the diff twice in this mail, but both copies are identical.
I cannot test right now as my machines are busy with other tests.
There is no functional change compared to the version I have tested, so
OK bluhm@
I would suggest the following:
- you commit somove diff
- wait a bit and watch for fallout
- I commit parallel udp input
- live with that for a while
- try to call UDP somove without task from sorwakeup
Currently we queue before udp_input. Parallel udp input moves the
queueing before somove. This improves performance in both the
splicing and the non-splicing case. Calling somove directly can be
done later.
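
To make the ordering concrete, this is how I read the spliced UDP
receive path (my summary, not something stated in the diff):

    now:       input queue -> udp_input() -> sorwakeup()
                   -> task_add() -> sotask() -> somove()
    parallel:  udp_input() runs on several softnet threads; the
               serialization point becomes the splice task, i.e.
               right before somove()
    later:     sorwakeup() -> somove() directly, without the task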
bluhm
> Index: sys/kern/uipc_socket.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/uipc_socket.c,v
> diff -u -p -r1.338 uipc_socket.c
> --- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338
> +++ sys/kern/uipc_socket.c 20 Jul 2024 15:32:12 -0000
> @@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
> sounlock(head);
> }
>
> - if (persocket) {
> + switch (so->so_proto->pr_domain->dom_family) {
> + case AF_INET:
> + case AF_INET6:
> + if (so->so_proto->pr_type == SOCK_STREAM)
> + break;
> + /* FALLTHROUGH */
> + default:
> sounlock(so);
> refcnt_finalize(&so->so_refcnt, "sofinal");
> solock(so);
> + break;
> }
>
> sigio_free(&so->so_sigio);
> klist_free(&so->so_rcv.sb_klist);
> klist_free(&so->so_snd.sb_klist);
> -#ifdef SOCKET_SPLICE
> - if (issplicedback(so)) {
> - int freeing = SOSP_FREEING_WRITE;
> -
> - if (so->so_sp->ssp_soback == so)
> - freeing |= SOSP_FREEING_READ;
> - sounsplice(so->so_sp->ssp_soback, so, freeing);
> - }
> - if (isspliced(so)) {
> - int freeing = SOSP_FREEING_READ;
> -
> - if (so == so->so_sp->ssp_socket)
> - freeing |= SOSP_FREEING_WRITE;
> - sounsplice(so, so->so_sp->ssp_socket, freeing);
> - }
> -#endif /* SOCKET_SPLICE */
>
> mtx_enter(&so->so_snd.sb_mtx);
> sbrelease(so, &so->so_snd);
> @@ -458,6 +449,85 @@ discard:
> if (so->so_state & SS_NOFDREF)
> panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
> so->so_state |= SS_NOFDREF;
> +
> +#ifdef SOCKET_SPLICE
> + if (so->so_sp) {
> + struct socket *soback;
> +
> + if (so->so_proto->pr_flags & PR_WANTRCVD) {
> + /*
> + * Copied and pasted, but we can't relock and sleep in
> + * sofree() in the tcp(4) case. That's why tcp(4)
> + * still relies on solock() for splicing and
> + * unsplicing.
> + */
> +
> + if (issplicedback(so)) {
> + int freeing = SOSP_FREEING_WRITE;
> +
> + if (so->so_sp->ssp_soback == so)
> + freeing |= SOSP_FREEING_READ;
> + sounsplice(so->so_sp->ssp_soback, so, freeing);
> + }
> + if (isspliced(so)) {
> + int freeing = SOSP_FREEING_READ;
> +
> + if (so == so->so_sp->ssp_socket)
> + freeing |= SOSP_FREEING_WRITE;
> + sounsplice(so, so->so_sp->ssp_socket, freeing);
> + }
> + goto free;
> + }
> +
> + sounlock(so);
> + mtx_enter(&so->so_snd.sb_mtx);
> + /*
> + * Concurrent sounsplice() locks the `sb_mtx' mutexes on
> + * both `so_snd' and `so_rcv' before unsplicing the sockets.
> + */
> + if ((soback = so->so_sp->ssp_soback) == NULL) {
> + mtx_leave(&so->so_snd.sb_mtx);
> + goto notsplicedback;
> + }
> + soref(soback);
> + mtx_leave(&so->so_snd.sb_mtx);
> +
> + /*
> + * `so' can only be unspliced, and never spliced again.
> + * Thus if the issplicedback(so) check is positive, the
> + * socket is still spliced and `ssp_soback' points to the
> + * same socket as `soback'.
> + */
> + sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
> + if (issplicedback(so)) {
> + int freeing = SOSP_FREEING_WRITE;
> +
> + if (so->so_sp->ssp_soback == so)
> + freeing |= SOSP_FREEING_READ;
> + solock(soback);
> + sounsplice(so->so_sp->ssp_soback, so, freeing);
> + sounlock(soback);
> + }
> + sbunlock(&soback->so_rcv);
> + sorele(soback);
> +
> +notsplicedback:
> + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
> + if (isspliced(so)) {
> + int freeing = SOSP_FREEING_READ;
> +
> + if (so == so->so_sp->ssp_socket)
> + freeing |= SOSP_FREEING_WRITE;
> + solock(so);
> + sounsplice(so, so->so_sp->ssp_socket, freeing);
> + sounlock(so);
> + }
> + sbunlock(&so->so_rcv);
> +
> + solock(so);
> + }
> +free:
> +#endif /* SOCKET_SPLICE */
> /* sofree() calls sounlock(). */
> sofree(so, 0);
> return (error);
> @@ -1411,14 +1481,6 @@ sosplice(struct socket *so, int fd, off_
> goto release;
> }
>
> - /* Splice so and sosp together. */
> - mtx_enter(&so->so_rcv.sb_mtx);
> - mtx_enter(&sosp->so_snd.sb_mtx);
> - so->so_sp->ssp_socket = sosp;
> - sosp->so_sp->ssp_soback = so;
> - mtx_leave(&sosp->so_snd.sb_mtx);
> - mtx_leave(&so->so_rcv.sb_mtx);
> -
> so->so_splicelen = 0;
> so->so_splicemax = max;
> if (tv)
> @@ -1429,9 +1491,20 @@ sosplice(struct socket *so, int fd, off_
> task_set(&so->so_splicetask, sotask, so);
>
> /*
> - * To prevent softnet interrupt from calling somove() while
> - * we sleep, the socket buffers are not marked as spliced yet.
> + * To prevent sorwakeup() from calling somove() before this somove()
> + * has finished, the socket buffers are not marked as spliced yet.
> */
> +
> + /* Splice so and sosp together. */
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> + so->so_sp->ssp_socket = sosp;
> + sosp->so_sp->ssp_soback = so;
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> +
> + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
> + sounlock(so);
> if (somove(so, M_WAIT)) {
> mtx_enter(&so->so_rcv.sb_mtx);
> mtx_enter(&sosp->so_snd.sb_mtx);
> @@ -1440,6 +1513,8 @@ sosplice(struct socket *so, int fd, off_
> mtx_leave(&sosp->so_snd.sb_mtx);
> mtx_leave(&so->so_rcv.sb_mtx);
> }
> + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
> + solock(so);
>
> release:
> sounlock(so);
> @@ -1454,6 +1529,8 @@ sosplice(struct socket *so, int fd, off_
> void
> sounsplice(struct socket *so, struct socket *sosp, int freeing)
> {
> + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
> + sbassertlocked(&so->so_rcv);
> soassertlocked(so);
>
> task_del(sosplice_taskq, &so->so_splicetask);
> @@ -1479,32 +1556,51 @@ soidle(void *arg)
> {
> struct socket *so = arg;
>
> + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
> solock(so);
> + /*
> + * Depending on socket type, sblock(&so->so_rcv) or solock()
> + * is always held while modifying SB_SPLICE and
> + * so->so_sp->ssp_socket.
> + */
> if (so->so_rcv.sb_flags & SB_SPLICE) {
> so->so_error = ETIMEDOUT;
> sounsplice(so, so->so_sp->ssp_socket, 0);
> }
> sounlock(so);
> + sbunlock(&so->so_rcv);
> }
>
> void
> sotask(void *arg)
> {
> struct socket *so = arg;
> + int doyield = 0;
> + int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
> +
> + /*
> + * sblock() on `so_rcv' protects sockets from being unspliced
> + * in the UDP case. TCP sockets still rely on solock().
> + */
> +
> + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
> + if (sockstream)
> + solock(so);
>
> - solock(so);
> if (so->so_rcv.sb_flags & SB_SPLICE) {
> - /*
> - * We may not sleep here as sofree() and unsplice() may be
> - * called from softnet interrupt context. This would remove
> - * the socket during somove().
> - */
> + if (sockstream)
> + doyield = 1;
> somove(so, M_DONTWAIT);
> }
> - sounlock(so);
>
> - /* Avoid user land starvation. */
> - yield();
> + if (sockstream)
> + sounlock(so);
> + sbunlock(&so->so_rcv);
> +
> + if (doyield) {
> + /* Avoid user land starvation. */
> + yield();
> + }
> }
>
> /*
> @@ -1546,24 +1642,32 @@ somove(struct socket *so, int wait)
> struct mbuf *m, **mp, *nextrecord;
> u_long len, off, oobmark;
> long space;
> - int error = 0, maxreached = 0;
> + int error = 0, maxreached = 0, unsplice = 0;
> unsigned int rcvstate;
> + int sockdgram = ((so->so_proto->pr_flags &
> + PR_WANTRCVD) == 0);
>
> - soassertlocked(so);
> + if (sockdgram)
> + sbassertlocked(&so->so_rcv);
> + else
> + soassertlocked(so);
> +
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
>
> nextpkt:
> - if (so->so_error) {
> - error = so->so_error;
> + if ((error = READ_ONCE(so->so_error)))
> goto release;
> - }
> if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
> error = EPIPE;
> goto release;
> }
> - if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
> - sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
> - error = sosp->so_error;
> - goto release;
> +
> + error = READ_ONCE(sosp->so_error);
> + if (error) {
> + if (error != ETIMEDOUT && error != EFBIG && error != ELOOP)
> + goto release;
> + error = 0;
> }
> if ((sosp->so_state & SS_ISCONNECTED) == 0)
> goto release;
> @@ -1577,26 +1681,21 @@ somove(struct socket *so, int wait)
> maxreached = 1;
> }
> }
> - mtx_enter(&sosp->so_snd.sb_mtx);
> space = sbspace_locked(sosp, &sosp->so_snd);
> if (so->so_oobmark && so->so_oobmark < len &&
> so->so_oobmark < space + 1024)
> space += 1024;
> if (space <= 0) {
> - mtx_leave(&sosp->so_snd.sb_mtx);
> maxreached = 0;
> goto release;
> }
> if (space < len) {
> maxreached = 0;
> - if (space < sosp->so_snd.sb_lowat) {
> - mtx_leave(&sosp->so_snd.sb_mtx);
> + if (space < sosp->so_snd.sb_lowat)
> goto release;
> - }
> len = space;
> }
> sosp->so_snd.sb_state |= SS_ISSENDING;
> - mtx_leave(&sosp->so_snd.sb_mtx);
>
> SBLASTRECORDCHK(&so->so_rcv, "somove 1");
> SBLASTMBUFCHK(&so->so_rcv, "somove 1");
> @@ -1618,8 +1717,13 @@ somove(struct socket *so, int wait)
> m = m->m_next;
> if (m == NULL) {
> sbdroprecord(so, &so->so_rcv);
> - if (so->so_proto->pr_flags & PR_WANTRCVD)
> + if (so->so_proto->pr_flags & PR_WANTRCVD) {
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> pru_rcvd(so);
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> + }
> goto nextpkt;
> }
>
> @@ -1724,11 +1828,15 @@ somove(struct socket *so, int wait)
> }
>
> /* Send window update to source peer as receive buffer has changed. */
> - if (so->so_proto->pr_flags & PR_WANTRCVD)
> + if (so->so_proto->pr_flags & PR_WANTRCVD) {
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> pru_rcvd(so);
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> + }
>
> /* Receive buffer did shrink by len bytes, adjust oob. */
> - mtx_enter(&so->so_rcv.sb_mtx);
> rcvstate = so->so_rcv.sb_state;
> so->so_rcv.sb_state &= ~SS_RCVATMARK;
> oobmark = so->so_oobmark;
> @@ -1739,7 +1847,6 @@ somove(struct socket *so, int wait)
> if (oobmark >= len)
> oobmark = 0;
> }
> - mtx_leave(&so->so_rcv.sb_mtx);
>
> /*
> * Handle oob data. If any malloc fails, ignore error.
> @@ -1755,7 +1862,12 @@ somove(struct socket *so, int wait)
> } else if (oobmark) {
> o = m_split(m, oobmark, wait);
> if (o) {
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> error = pru_send(sosp, m, NULL, NULL);
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> +
> if (error) {
> if (sosp->so_snd.sb_state &
> SS_CANTSENDMORE)
> @@ -1773,7 +1885,13 @@ somove(struct socket *so, int wait)
> if (o) {
> o->m_len = 1;
> *mtod(o, caddr_t) = *mtod(m, caddr_t);
> +
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> error = pru_sendoob(sosp, o, NULL, NULL);
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> +
> if (error) {
> if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
> error = EPIPE;
> @@ -1791,15 +1909,25 @@ somove(struct socket *so, int wait)
> }
> }
>
> - mtx_enter(&sosp->so_snd.sb_mtx);
> /* Append all remaining data to drain socket. */
> if (so->so_rcv.sb_cc == 0 || maxreached)
> sosp->so_snd.sb_state &= ~SS_ISSENDING;
> +
> mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
>
> + if (sockdgram)
> + solock_shared(sosp);
> error = pru_send(sosp, m, NULL, NULL);
> + if (sockdgram)
> + sounlock_shared(sosp);
> +
> + mtx_enter(&so->so_rcv.sb_mtx);
> + mtx_enter(&sosp->so_snd.sb_mtx);
> +
> if (error) {
> - if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
> + if (sosp->so_snd.sb_state & SS_CANTSENDMORE ||
> + so->so_pcb == NULL)
> error = EPIPE;
> goto release;
> }
> @@ -1810,26 +1938,35 @@ somove(struct socket *so, int wait)
> goto nextpkt;
>
> release:
> - mtx_enter(&sosp->so_snd.sb_mtx);
> sosp->so_snd.sb_state &= ~SS_ISSENDING;
> - mtx_leave(&sosp->so_snd.sb_mtx);
>
> if (!error && maxreached && so->so_splicemax == so->so_splicelen)
> error = EFBIG;
> if (error)
> - so->so_error = error;
> + WRITE_ONCE(so->so_error, error);
> +
> if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
> so->so_rcv.sb_cc == 0) ||
> (sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
> - maxreached || error) {
> + maxreached || error)
> + unsplice = 1;
> +
> + mtx_leave(&sosp->so_snd.sb_mtx);
> + mtx_leave(&so->so_rcv.sb_mtx);
> +
> + if (unsplice) {
> + if (sockdgram)
> + solock(so);
> sounsplice(so, sosp, 0);
> + if (sockdgram)
> + sounlock(so);
> +
> return (0);
> }
> if (timerisset(&so->so_idletv))
> timeout_add_tv(&so->so_idleto, &so->so_idletv);
> return (1);
> }
> -
> #endif /* SOCKET_SPLICE */
>
> void
> @@ -1839,22 +1976,16 @@ sorwakeup(struct socket *so)
> soassertlocked_readonly(so);
>
> #ifdef SOCKET_SPLICE
> - if (so->so_rcv.sb_flags & SB_SPLICE) {
> - /*
> - * TCP has a sendbuffer that can handle multiple packets
> - * at once. So queue the stream a bit to accumulate data.
> - * The sosplice thread will call somove() later and send
> - * the packets calling tcp_output() only once.
> - * In the UDP case, send out the packets immediately.
> - * Using a thread would make things slower.
> - */
> - if (so->so_proto->pr_flags & PR_WANTRCVD)
> + if (so->so_proto->pr_flags & PR_SPLICE) {
> + sb_mtx_lock(&so->so_rcv);
> + if (so->so_rcv.sb_flags & SB_SPLICE)
> task_add(sosplice_taskq, &so->so_splicetask);
> - else
> - somove(so, M_DONTWAIT);
> + if (isspliced(so)) {
> + sb_mtx_unlock(&so->so_rcv);
> + return;
> + }
> + sb_mtx_unlock(&so->so_rcv);
> }
> - if (isspliced(so))
> - return;
> #endif
> sowakeup(so, &so->so_rcv);
> if (so->so_upcall)
> @@ -1868,10 +1999,17 @@ sowwakeup(struct socket *so)
> soassertlocked_readonly(so);
>
> #ifdef SOCKET_SPLICE
> - if (so->so_snd.sb_flags & SB_SPLICE)
> - task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
> - if (issplicedback(so))
> - return;
> + if (so->so_proto->pr_flags & PR_SPLICE) {
> + sb_mtx_lock(&so->so_snd);
> + if (so->so_snd.sb_flags & SB_SPLICE)
> + task_add(sosplice_taskq,
> + &so->so_sp->ssp_soback->so_splicetask);
> + if (issplicedback(so)) {
> + sb_mtx_unlock(&so->so_snd);
> + return;
> + }
> + sb_mtx_unlock(&so->so_snd);
> + }
> #endif
> sowakeup(so, &so->so_snd);
> }
> Index: sys/netinet/udp_usrreq.c
> ===================================================================
> RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
> diff -u -p -r1.322 udp_usrreq.c
> --- sys/netinet/udp_usrreq.c 19 Jul 2024 15:41:58 -0000 1.322
> +++ sys/netinet/udp_usrreq.c 20 Jul 2024 15:32:12 -0000
> @@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf
>
> soassertlocked_readonly(so);
>
> + if (inp == NULL) {
> + /* PCB could be destroyed, but socket still spliced. */
> + return (EINVAL);
> + }
> +
> #ifdef PIPEX
> if (inp->inp_pipex) {
> struct pipex_session *session;
> Index: sys/sys/socketvar.h
> ===================================================================
> RCS file: /cvs/src/sys/sys/socketvar.h,v
> diff -u -p -r1.132 socketvar.h
> --- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132
> +++ sys/sys/socketvar.h 20 Jul 2024 15:32:12 -0000
> @@ -52,6 +52,33 @@ typedef __socklen_t socklen_t; /* length
> TAILQ_HEAD(soqhead, socket);
>
> /*
> + * Locks used to protect global data and struct members:
> + * I immutable after creation
> + * mr sb_mtx of so_rcv buffer
> + * ms sb_mtx of so_snd buffer
> + * br sblock() of so_rcv buffer
> + * bs sblock() of so_snd buffer
> + * s solock()
> + */
> +
> +/*
> + * XXXSMP: tcp(4) sockets rely on exclusive solock() in all cases.
> + */
> +
> +/*
> + * Variables for socket splicing, allocated only when needed.
> + */
> +struct sosplice {
> + struct socket *ssp_socket; /* [mr ms] send data to drain socket */
> + struct socket *ssp_soback; /* [ms] back ref to source socket */
> + off_t ssp_len; /* [mr] number of bytes spliced */
> + off_t ssp_max; /* [I] maximum number of bytes */
> + struct timeval ssp_idletv; /* [I] idle timeout */
> + struct timeout ssp_idleto;
> + struct task ssp_task; /* task for somove */
> +};
> +
> +/*
> * Kernel structure per socket.
> * Contains send and receive buffer queues,
> * handle on protocol and pointer to protocol
> @@ -89,18 +116,8 @@ struct socket {
> short so_timeo; /* connection timeout */
> u_long so_oobmark; /* chars to oob mark */
> u_int so_error; /* error affecting connection */
> -/*
> - * Variables for socket splicing, allocated only when needed.
> - */
> - struct sosplice {
> - struct socket *ssp_socket; /* send data to drain socket */
> - struct socket *ssp_soback; /* back ref to source socket */
> - off_t ssp_len; /* number of bytes spliced */
> - off_t ssp_max; /* maximum number of bytes */
> - struct timeval ssp_idletv; /* idle timeout */
> - struct timeout ssp_idleto;
> - struct task ssp_task; /* task for somove */
> - } *so_sp;
> +
> + struct sosplice *so_sp; /* [s br] */
> /*
> * Variables for socket buffering.
> */
> @@ -329,6 +346,12 @@ int sblock(struct sockbuf *, int);
>
> /* release lock on sockbuf sb */
> void sbunlock(struct sockbuf *);
> +
> +static inline void
> +sbassertlocked(struct sockbuf *sb)
> +{
> + rw_assert_wrlock(&sb->sb_lock);
> +}
>
> #define SB_EMPTY_FIXUP(sb) do { \
> if ((sb)->sb_mb == NULL) { \
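
As an aside, below is a minimal userland model of the lock ordering
this diff sets up for the UDP splice path, for anyone who wants to
poke at the discipline outside the kernel: sblock() is modeled as a
write-locked rwlock held across the whole task, sb_mtx as plain
mutexes that are dropped around the sleeping send. All names are
invented for the illustration; this is a sketch of the ordering, not
kernel code. Compiles with cc -pthread.

/*
 * Userland model: rcv_sblock plays the role of sblock(&so->so_rcv),
 * the two mutexes play the roles of so_rcv.sb_mtx and so_snd.sb_mtx.
 */
#include <pthread.h>
#include <stdio.h>

static pthread_rwlock_t rcv_sblock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_mutex_t rcv_sb_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t snd_sb_mtx = PTHREAD_MUTEX_INITIALIZER;

/* Models somove() for a datagram socket: caller holds rcv_sblock. */
static void
model_somove(void)
{
	/* Always enter the receive side before the send side. */
	pthread_mutex_lock(&rcv_sb_mtx);
	pthread_mutex_lock(&snd_sb_mtx);

	/* ... inspect buffers, dequeue data ... */

	/* Drop both mutexes around the sleeping send, then retake. */
	pthread_mutex_unlock(&snd_sb_mtx);
	pthread_mutex_unlock(&rcv_sb_mtx);
	printf("send step runs without the buffer mutexes held\n");
	pthread_mutex_lock(&rcv_sb_mtx);
	pthread_mutex_lock(&snd_sb_mtx);

	/* ... check errors, decide whether to unsplice ... */
	pthread_mutex_unlock(&snd_sb_mtx);
	pthread_mutex_unlock(&rcv_sb_mtx);
}

/* Models sotask(): the rwlock keeps a concurrent unsplice away. */
static void
model_sotask(void)
{
	pthread_rwlock_wrlock(&rcv_sblock);
	model_somove();
	pthread_rwlock_unlock(&rcv_sblock);
}

int
main(void)
{
	model_sotask();
	return (0);
}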