Unlock udp(4) somove()
On Sat, Jul 20, 2024 at 01:27:37PM +0200, Alexander Bluhm wrote:
> On Fri, Jul 19, 2024 at 09:47:34PM +0300, Vitaliy Makkoveev wrote:
> > Understood. This one releases `sb_mtx' mutexes only around pru_rcvd().
>
> Some comments
>
> > + if ((soback = so->so_sp->ssp_soback) == NULL) {
> > + mtx_leave(&so->so_snd.sb_mtx);
> > + goto notsplicedback;
> > + }
> > + soref(soback);
> > + mtx_leave(&so->so_snd.sb_mtx);
> > +
> > + sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
> > + /*
> > + * sblock() is always taken on `so_rcv' before call
> > + * sounsplice(). `so' is dying and can be only unspliced.
> > + */
> > + if (issplicedback(so)) {
> > + int freeing = SOSP_FREEING_WRITE;
> > +
> > + if (so->so_sp->ssp_soback == so)
> > + freeing |= SOSP_FREEING_READ;
> > + solock(soback);
> > + sounsplice(so->so_sp->ssp_soback, so, freeing);
> > + sounlock(soback);
> > + }
> > + sbunlock(&soback->so_rcv);
> > + sorele(soback);
>
> Here you mix soback and so->so_sp->ssp_soback. Are they the same?
> Between soback = so->so_sp->ssp_soback and unsplicing so->so_sp->ssp_soback
> there is sounlock(so), mtx_leave(&so->so_snd.sb_mtx), and
> sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR) may sleep.
>
They are the same. `so' is not accessible from userland, so a
concurrent thread could only unsplice it. Thus, if the issplicedback(so)
check is positive, the socket is still spliced and `soback' and
`ssp_soback' point to the same socket. However, I need to lock the
`so_rcv' buffer of the spliced peer before re-checking `ssp_soback' with
issplicedback(). That's why I use `soback' instead of
so->so_sp->ssp_soback. After unsplicing, so->so_sp->ssp_soback is NULL,
so I use `soback' to unlock `so_rcv' and release the reference on the
former spliced peer.
I reworked the comment before this block:
+ /*
+ * `so' can only be unspliced, and never spliced again.
+ * Thus, if the issplicedback(so) check is positive, the
+ * socket is still spliced and `ssp_soback' points to the
+ * same socket as `soback'.
+ */
+ sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (issplicedback(so)) {
+ int freeing = SOSP_FREEING_WRITE;
+
+ if (so->so_sp->ssp_soback == so)
+ freeing |= SOSP_FREEING_READ;
+ solock(soback);
+ sounsplice(so->so_sp->ssp_soback, so, freeing);
+ sounlock(soback);
+ }
+ sbunlock(&soback->so_rcv);
+ sorele(soback);
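As an aside, the take-reference / drop-mutex / sleep-lock / re-check
dance above can be illustrated with a minimal, self-contained userland
sketch, using pthreads in place of the kernel primitives. All names
here are hypothetical and only mirror the roles of `sb_mtx', sblock(),
soref() and sorele(); it is not the kernel code itself:

/*
 * Take a reference on the peer under the mutex that protects the
 * pointer, drop the mutex, take the sleepable lock, then re-check
 * the predicate.
 */
#include <pthread.h>
#include <stdlib.h>

struct peer {
	pthread_rwlock_t rcv_lock;	/* sleepable, plays the sblock() role */
	pthread_mutex_t	 refcnt_mtx;
	int		 refcnt;	/* protected by refcnt_mtx */
};

struct sock {
	pthread_mutex_t	 snd_mtx;	/* plays the `sb_mtx' role */
	struct peer	*back;		/* like `ssp_soback', NULL once unspliced */
	int		 spliced_back;	/* like issplicedback(); only cleared
					   while the unsplicer holds rcv_lock */
};

static void
peer_ref(struct peer *p)
{
	pthread_mutex_lock(&p->refcnt_mtx);
	p->refcnt++;
	pthread_mutex_unlock(&p->refcnt_mtx);
}

static void
peer_rele(struct peer *p)
{
	pthread_mutex_lock(&p->refcnt_mtx);
	if (--p->refcnt > 0) {
		pthread_mutex_unlock(&p->refcnt_mtx);
		return;
	}
	pthread_mutex_unlock(&p->refcnt_mtx);
	free(p);
}

static void
unsplice_on_close(struct sock *so)
{
	struct peer *back;

	pthread_mutex_lock(&so->snd_mtx);
	if ((back = so->back) == NULL) {
		/* A concurrent unsplicer already won; nothing to do. */
		pthread_mutex_unlock(&so->snd_mtx);
		return;
	}
	peer_ref(back);			/* keep `back' alive across the sleep */
	pthread_mutex_unlock(&so->snd_mtx);

	pthread_rwlock_wrlock(&back->rcv_lock);	/* may sleep */
	if (so->spliced_back) {
		/*
		 * Still spliced: `so->back' could not have been cleared
		 * while we slept, because any unsplicer holds rcv_lock,
		 * so `so->back' == `back' here.
		 */
		/* ... unsplice, as sounsplice() does ... */
	}
	pthread_rwlock_unlock(&back->rcv_lock);
	peer_rele(back);		/* `so->back' may be NULL by now */
}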
> > +
> > + error = READ_ONCE(sosp->so_error);
> > + if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP)
> > goto release;
> > - }
>
> error may be != 0 here. There are some goto release statements below
> that do not set error. You should set error = 0 here.
>
Fixed.
> > + if (so->so_proto->pr_flags & PR_SPLICE) {
> > + sb_mtx_lock(&so->so_snd);
> > + if (so->so_snd.sb_flags & SB_SPLICE)
> > + task_add(sosplice_taskq,
> > + &so->so_sp->ssp_soback->so_splicetask);
> > + if (issplicedback(so)) {
> > + sb_mtx_unlock(&so->so_snd);
> > + return;
> > + }
>
> I think the issplicedback(so) was outside of the if (so->so_proto->pr_flags
> & PR_SPLICE) deliberately.
>
> If somove() during soplice() sleeps, issplicedback(so) is set, but
> (so->so_proto->pr_flags & PR_SPLICE) is not. We do not schedule a
> task as somove() will do its job after sleep. But we do not want
> to wakeup the socket for writing.
You have mixed up the protocol-related PR_SPLICE bit in `pr_flags' with
the socket-buffer-related SB_SPLICE bit in `sb_flags'.
sorwakeup(struct socket *so)
{
/* ... */
#ifdef SOCKET_SPLICE
if (so->so_proto->pr_flags & PR_SPLICE) {
sb_mtx_lock(&so->so_rcv);
if (so->so_rcv.sb_flags & SB_SPLICE)
task_add(sosplice_taskq, &so->so_splicetask);
if (isspliced(so)) {
SB_SPLICE and `ssp_socket' are mutable, so we need to take the `sb_mtx'
mutex which protects them. Therefore I first check the protocol's
splicing ability on `pr_flags', and only if it supports splicing do I
take the `sb_mtx' mutex and check whether the socket is spliced. This
is intentional, to avoid useless locking on sockets that are neither
TCP nor UDP.
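The shape of that check order, as a small self-contained userland
sketch with hypothetical names, pthreads standing in for `sb_mtx', and
plain booleans standing in for the flag bits: the immutable protocol
flag is tested without any lock, and the mutex is paid for only when
the protocol can splice at all.

#include <pthread.h>
#include <stdbool.h>

struct rbuf {
	const bool	can_splice;	/* immutable, plays the PR_SPLICE role */
	bool		spliced;	/* mutable, plays the SB_SPLICE role */
	pthread_mutex_t	mtx;		/* plays the `sb_mtx' role */
};

/* Return true if the wakeup must be suppressed, as in sorwakeup(). */
static bool
rwakeup_suppressed(struct rbuf *rb)
{
	bool spliced = false;

	/* Immutable flag: safe to test without any lock. */
	if (rb->can_splice) {
		/* Mutable state: only read under the mutex. */
		pthread_mutex_lock(&rb->mtx);
		spliced = rb->spliced;
		pthread_mutex_unlock(&rb->mtx);
	}
	return spliced;
}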
I fixed typos and whitespace. The updated diff also contains a lock
description for the 'sosplice' structure; this is not a functional
change.
Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
diff -u -p -r1.338 uipc_socket.c
--- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338
+++ sys/kern/uipc_socket.c 20 Jul 2024 15:32:12 -0000
@@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
sounlock(head);
}
- if (persocket) {
+ switch (so->so_proto->pr_domain->dom_family) {
+ case AF_INET:
+ case AF_INET6:
+ if (so->so_proto->pr_type == SOCK_STREAM)
+ break;
+ /* FALLTHROUGH */
+ default:
sounlock(so);
refcnt_finalize(&so->so_refcnt, "sofinal");
solock(so);
+ break;
}
sigio_free(&so->so_sigio);
klist_free(&so->so_rcv.sb_klist);
klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
- if (issplicedback(so)) {
- int freeing = SOSP_FREEING_WRITE;
-
- if (so->so_sp->ssp_soback == so)
- freeing |= SOSP_FREEING_READ;
- sounsplice(so->so_sp->ssp_soback, so, freeing);
- }
- if (isspliced(so)) {
- int freeing = SOSP_FREEING_READ;
-
- if (so == so->so_sp->ssp_socket)
- freeing |= SOSP_FREEING_WRITE;
- sounsplice(so, so->so_sp->ssp_socket, freeing);
- }
-#endif /* SOCKET_SPLICE */
mtx_enter(&so->so_snd.sb_mtx);
sbrelease(so, &so->so_snd);
@@ -458,6 +449,85 @@ discard:
if (so->so_state & SS_NOFDREF)
panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+ if (so->so_sp) {
+ struct socket *soback;
+
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ /*
+ * Copy-paste, but we can't relock and sleep in
+ * sofree() in the tcp(4) case. That's why tcp(4)
+ * still relies on solock() for splicing and
+ * unsplicing.
+ */
+
+ if (issplicedback(so)) {
+ int freeing = SOSP_FREEING_WRITE;
+
+ if (so->so_sp->ssp_soback == so)
+ freeing |= SOSP_FREEING_READ;
+ sounsplice(so->so_sp->ssp_soback, so, freeing);
+ }
+ if (isspliced(so)) {
+ int freeing = SOSP_FREEING_READ;
+
+ if (so == so->so_sp->ssp_socket)
+ freeing |= SOSP_FREEING_WRITE;
+ sounsplice(so, so->so_sp->ssp_socket, freeing);
+ }
+ goto free;
+ }
+
+ sounlock(so);
+ mtx_enter(&so->so_snd.sb_mtx);
+ /*
+ * Concurrent sounsplice() locks `sb_mtx' mutexes on
+ * both `so_snd' and `so_rcv' before unsplicing sockets.
+ */
+ if ((soback = so->so_sp->ssp_soback) == NULL) {
+ mtx_leave(&so->so_snd.sb_mtx);
+ goto notsplicedback;
+ }
+ soref(soback);
+ mtx_leave(&so->so_snd.sb_mtx);
+
+ /*
+ * `so' can only be unspliced, and never spliced again.
+ * Thus, if the issplicedback(so) check is positive, the
+ * socket is still spliced and `ssp_soback' points to the
+ * same socket as `soback'.
+ */
+ sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (issplicedback(so)) {
+ int freeing = SOSP_FREEING_WRITE;
+
+ if (so->so_sp->ssp_soback == so)
+ freeing |= SOSP_FREEING_READ;
+ solock(soback);
+ sounsplice(so->so_sp->ssp_soback, so, freeing);
+ sounlock(soback);
+ }
+ sbunlock(&soback->so_rcv);
+ sorele(soback);
+
+notsplicedback:
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (isspliced(so)) {
+ int freeing = SOSP_FREEING_READ;
+
+ if (so == so->so_sp->ssp_socket)
+ freeing |= SOSP_FREEING_WRITE;
+ solock(so);
+ sounsplice(so, so->so_sp->ssp_socket, freeing);
+ sounlock(so);
+ }
+ sbunlock(&so->so_rcv);
+
+ solock(so);
+ }
+free:
+#endif /* SOCKET_SPLICE */
/* sofree() calls sounlock(). */
sofree(so, 0);
return (error);
@@ -1411,14 +1481,6 @@ sosplice(struct socket *so, int fd, off_
goto release;
}
- /* Splice so and sosp together. */
- mtx_enter(&so->so_rcv.sb_mtx);
- mtx_enter(&sosp->so_snd.sb_mtx);
- so->so_sp->ssp_socket = sosp;
- sosp->so_sp->ssp_soback = so;
- mtx_leave(&sosp->so_snd.sb_mtx);
- mtx_leave(&so->so_rcv.sb_mtx);
-
so->so_splicelen = 0;
so->so_splicemax = max;
if (tv)
@@ -1429,9 +1491,20 @@ sosplice(struct socket *so, int fd, off_
task_set(&so->so_splicetask, sotask, so);
/*
- * To prevent softnet interrupt from calling somove() while
- * we sleep, the socket buffers are not marked as spliced yet.
+ * To prevent sorwakeup() calling somove() before this somove()
+ * has finished, the socket buffers are not marked as spliced yet.
*/
+
+ /* Splice so and sosp together. */
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ so->so_sp->ssp_socket = sosp;
+ sosp->so_sp->ssp_soback = so;
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ sounlock(so);
if (somove(so, M_WAIT)) {
mtx_enter(&so->so_rcv.sb_mtx);
mtx_enter(&sosp->so_snd.sb_mtx);
@@ -1440,6 +1513,8 @@ sosplice(struct socket *so, int fd, off_
mtx_leave(&sosp->so_snd.sb_mtx);
mtx_leave(&so->so_rcv.sb_mtx);
}
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ solock(so);
release:
sounlock(so);
@@ -1454,6 +1529,8 @@ sosplice(struct socket *so, int fd, off_
void
sounsplice(struct socket *so, struct socket *sosp, int freeing)
{
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ sbassertlocked(&so->so_rcv);
soassertlocked(so);
task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1556,51 @@ soidle(void *arg)
{
struct socket *so = arg;
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
solock(so);
+ /*
+ * Depending on socket type, sblock(&so->so_rcv) or solock()
+ * is always held while modifying SB_SPLICE and
+ * so->so_sp->ssp_socket.
+ */
if (so->so_rcv.sb_flags & SB_SPLICE) {
so->so_error = ETIMEDOUT;
sounsplice(so, so->so_sp->ssp_socket, 0);
}
sounlock(so);
+ sbunlock(&so->so_rcv);
}
void
sotask(void *arg)
{
struct socket *so = arg;
+ int doyield = 0;
+ int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
+
+ /*
+ * sblock() on `so_rcv' protects sockets from being unspliced
+ * in the UDP case. TCP sockets still rely on solock().
+ */
+
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (sockstream)
+ solock(so);
- solock(so);
if (so->so_rcv.sb_flags & SB_SPLICE) {
- /*
- * We may not sleep here as sofree() and unsplice() may be
- * called from softnet interrupt context. This would remove
- * the socket during somove().
- */
+ if (sockstream)
+ doyield = 1;
somove(so, M_DONTWAIT);
}
- sounlock(so);
- /* Avoid user land starvation. */
- yield();
+ if (sockstream)
+ sounlock(so);
+ sbunlock(&so->so_rcv);
+
+ if (doyield) {
+ /* Avoid user land starvation. */
+ yield();
+ }
}
/*
@@ -1546,24 +1642,32 @@ somove(struct socket *so, int wait)
struct mbuf *m, **mp, *nextrecord;
u_long len, off, oobmark;
long space;
- int error = 0, maxreached = 0;
+ int error = 0, maxreached = 0, unsplice = 0;
unsigned int rcvstate;
+ int sockdgram = ((so->so_proto->pr_flags &
+ PR_WANTRCVD) == 0);
- soassertlocked(so);
+ if (sockdgram)
+ sbassertlocked(&so->so_rcv);
+ else
+ soassertlocked(so);
+
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
nextpkt:
- if (so->so_error) {
- error = so->so_error;
+ if ((error = READ_ONCE(so->so_error)))
goto release;
- }
if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
error = EPIPE;
goto release;
}
- if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
- sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
- error = sosp->so_error;
- goto release;
+
+ error = READ_ONCE(sosp->so_error);
+ if (error) {
+ if (error != ETIMEDOUT && error != EFBIG && error != ELOOP)
+ goto release;
+ error = 0;
}
if ((sosp->so_state & SS_ISCONNECTED) == 0)
goto release;
@@ -1577,26 +1681,21 @@ somove(struct socket *so, int wait)
maxreached = 1;
}
}
- mtx_enter(&sosp->so_snd.sb_mtx);
space = sbspace_locked(sosp, &sosp->so_snd);
if (so->so_oobmark && so->so_oobmark < len &&
so->so_oobmark < space + 1024)
space += 1024;
if (space <= 0) {
- mtx_leave(&sosp->so_snd.sb_mtx);
maxreached = 0;
goto release;
}
if (space < len) {
maxreached = 0;
- if (space < sosp->so_snd.sb_lowat) {
- mtx_leave(&sosp->so_snd.sb_mtx);
+ if (space < sosp->so_snd.sb_lowat)
goto release;
- }
len = space;
}
sosp->so_snd.sb_state |= SS_ISSENDING;
- mtx_leave(&sosp->so_snd.sb_mtx);
SBLASTRECORDCHK(&so->so_rcv, "somove 1");
SBLASTMBUFCHK(&so->so_rcv, "somove 1");
@@ -1618,8 +1717,13 @@ somove(struct socket *so, int wait)
m = m->m_next;
if (m == NULL) {
sbdroprecord(so, &so->so_rcv);
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
pru_rcvd(so);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ }
goto nextpkt;
}
@@ -1724,11 +1828,15 @@ somove(struct socket *so, int wait)
}
/* Send window update to source peer as receive buffer has changed. */
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
pru_rcvd(so);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ }
/* Receive buffer did shrink by len bytes, adjust oob. */
- mtx_enter(&so->so_rcv.sb_mtx);
rcvstate = so->so_rcv.sb_state;
so->so_rcv.sb_state &= ~SS_RCVATMARK;
oobmark = so->so_oobmark;
@@ -1739,7 +1847,6 @@ somove(struct socket *so, int wait)
if (oobmark >= len)
oobmark = 0;
}
- mtx_leave(&so->so_rcv.sb_mtx);
/*
* Handle oob data. If any malloc fails, ignore error.
@@ -1755,7 +1862,12 @@ somove(struct socket *so, int wait)
} else if (oobmark) {
o = m_split(m, oobmark, wait);
if (o) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
error = pru_send(sosp, m, NULL, NULL);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
if (sosp->so_snd.sb_state &
SS_CANTSENDMORE)
@@ -1773,7 +1885,13 @@ somove(struct socket *so, int wait)
if (o) {
o->m_len = 1;
*mtod(o, caddr_t) = *mtod(m, caddr_t);
+
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
error = pru_sendoob(sosp, o, NULL, NULL);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
error = EPIPE;
@@ -1791,15 +1909,25 @@ somove(struct socket *so, int wait)
}
}
- mtx_enter(&sosp->so_snd.sb_mtx);
/* Append all remaining data to drain socket. */
if (so->so_rcv.sb_cc == 0 || maxreached)
sosp->so_snd.sb_state &= ~SS_ISSENDING;
+
mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+ if (sockdgram)
+ solock_shared(sosp);
error = pru_send(sosp, m, NULL, NULL);
+ if (sockdgram)
+ sounlock_shared(sosp);
+
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
- if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+ if (sosp->so_snd.sb_state & SS_CANTSENDMORE ||
+ so->so_pcb == NULL)
error = EPIPE;
goto release;
}
@@ -1810,26 +1938,35 @@ somove(struct socket *so, int wait)
goto nextpkt;
release:
- mtx_enter(&sosp->so_snd.sb_mtx);
sosp->so_snd.sb_state &= ~SS_ISSENDING;
- mtx_leave(&sosp->so_snd.sb_mtx);
if (!error && maxreached && so->so_splicemax == so->so_splicelen)
error = EFBIG;
if (error)
- so->so_error = error;
+ WRITE_ONCE(so->so_error, error);
+
if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
so->so_rcv.sb_cc == 0) ||
(sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
- maxreached || error) {
+ maxreached || error)
+ unsplice = 1;
+
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+
+ if (unsplice) {
+ if (sockdgram)
+ solock(so);
sounsplice(so, sosp, 0);
+ if (sockdgram)
+ sounlock(so);
+
return (0);
}
if (timerisset(&so->so_idletv))
timeout_add_tv(&so->so_idleto, &so->so_idletv);
return (1);
}
-
#endif /* SOCKET_SPLICE */
void
@@ -1839,22 +1976,16 @@ sorwakeup(struct socket *so)
soassertlocked_readonly(so);
#ifdef SOCKET_SPLICE
- if (so->so_rcv.sb_flags & SB_SPLICE) {
- /*
- * TCP has a sendbuffer that can handle multiple packets
- * at once. So queue the stream a bit to accumulate data.
- * The sosplice thread will call somove() later and send
- * the packets calling tcp_output() only once.
- * In the UDP case, send out the packets immediately.
- * Using a thread would make things slower.
- */
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_SPLICE) {
+ sb_mtx_lock(&so->so_rcv);
+ if (so->so_rcv.sb_flags & SB_SPLICE)
task_add(sosplice_taskq, &so->so_splicetask);
- else
- somove(so, M_DONTWAIT);
+ if (isspliced(so)) {
+ sb_mtx_unlock(&so->so_rcv);
+ return;
+ }
+ sb_mtx_unlock(&so->so_rcv);
}
- if (isspliced(so))
- return;
#endif
sowakeup(so, &so->so_rcv);
if (so->so_upcall)
@@ -1868,10 +1999,17 @@ sowwakeup(struct socket *so)
soassertlocked_readonly(so);
#ifdef SOCKET_SPLICE
- if (so->so_snd.sb_flags & SB_SPLICE)
- task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
- if (issplicedback(so))
- return;
+ if (so->so_proto->pr_flags & PR_SPLICE) {
+ sb_mtx_lock(&so->so_snd);
+ if (so->so_snd.sb_flags & SB_SPLICE)
+ task_add(sosplice_taskq,
+ &so->so_sp->ssp_soback->so_splicetask);
+ if (issplicedback(so)) {
+ sb_mtx_unlock(&so->so_snd);
+ return;
+ }
+ sb_mtx_unlock(&so->so_snd);
+ }
#endif
sowakeup(so, &so->so_snd);
}
Index: sys/netinet/udp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
diff -u -p -r1.322 udp_usrreq.c
--- sys/netinet/udp_usrreq.c 19 Jul 2024 15:41:58 -0000 1.322
+++ sys/netinet/udp_usrreq.c 20 Jul 2024 15:32:12 -0000
@@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf
soassertlocked_readonly(so);
+ if (inp == NULL) {
+ /* PCB could be destroyed, but socket still spliced. */
+ return (EINVAL);
+ }
+
#ifdef PIPEX
if (inp->inp_pipex) {
struct pipex_session *session;
Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
diff -u -p -r1.132 socketvar.h
--- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132
+++ sys/sys/socketvar.h 20 Jul 2024 15:32:12 -0000
@@ -52,6 +52,33 @@ typedef __socklen_t socklen_t; /* length
TAILQ_HEAD(soqhead, socket);
/*
+ * Locks used to protect global data and struct members:
+ * I immutable after creation
+ * mr sb_mtx of so_rcv buffer
+ * ms sb_mtx of so_snd buffer
+ * br sblock() of so_rcv buffer
+ * bs sblock() of so_snd buffer
+ * s solock()
+ */
+
+/*
+ * XXXSMP: tcp(4) sockets rely on exclusive solock() for all the cases.
+ */
+
+/*
+ * Variables for socket splicing, allocated only when needed.
+ */
+struct sosplice {
+ struct socket *ssp_socket; /* [mr ms] send data to drain socket */
+ struct socket *ssp_soback; /* [ms ms] back ref to source socket */
+ off_t ssp_len; /* [mr] number of bytes spliced */
+ off_t ssp_max; /* [I] maximum number of bytes */
+ struct timeval ssp_idletv; /* [I] idle timeout */
+ struct timeout ssp_idleto;
+ struct task ssp_task; /* task for somove */
+};
+
+/*
* Kernel structure per socket.
* Contains send and receive buffer queues,
* handle on protocol and pointer to protocol
@@ -89,18 +116,8 @@ struct socket {
short so_timeo; /* connection timeout */
u_long so_oobmark; /* chars to oob mark */
u_int so_error; /* error affecting connection */
-/*
- * Variables for socket splicing, allocated only when needed.
- */
- struct sosplice {
- struct socket *ssp_socket; /* send data to drain socket */
- struct socket *ssp_soback; /* back ref to source socket */
- off_t ssp_len; /* number of bytes spliced */
- off_t ssp_max; /* maximum number of bytes */
- struct timeval ssp_idletv; /* idle timeout */
- struct timeout ssp_idleto;
- struct task ssp_task; /* task for somove */
- } *so_sp;
+
+ struct sosplice *so_sp; /* [s br] */
/*
* Variables for socket buffering.
*/
@@ -329,6 +346,12 @@ int sblock(struct sockbuf *, int);
/* release lock on sockbuf sb */
void sbunlock(struct sockbuf *);
+
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+ rw_assert_wrlock(&sb->sb_lock);
+}
#define SB_EMPTY_FIXUP(sb) do { \
if ((sb)->sb_mb == NULL) { \