Download raw body.
Unlock udp(4) somove()
On Fri, Jul 19, 2024 at 08:32:23PM +0200, Alexander Bluhm wrote:
> Look at my proposal. I did not change unlocking of pru_rcvd(). I
> just moved it directly around unlock/pru_rcvd/lock. Only difference
> is that sbdroprecord() runs with sosp->so_snd.sb_mtx locked. That
> does not matter. In the UDP path we do not unlock/lock when we go
> to nextpkt.
>
Understood. This one releases `sb_mtx' mutexes only around pru_rcvd().
Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
diff -u -p -r1.338 uipc_socket.c
--- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338
+++ sys/kern/uipc_socket.c 19 Jul 2024 18:42:26 -0000
@@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
sounlock(head);
}
- if (persocket) {
+ switch (so->so_proto->pr_domain->dom_family) {
+ case AF_INET:
+ case AF_INET6:
+ if (so->so_proto->pr_type == SOCK_STREAM)
+ break;
+ /* FALLTHROUGH */
+ default:
sounlock(so);
refcnt_finalize(&so->so_refcnt, "sofinal");
solock(so);
+ break;
}
sigio_free(&so->so_sigio);
klist_free(&so->so_rcv.sb_klist);
klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
- if (issplicedback(so)) {
- int freeing = SOSP_FREEING_WRITE;
-
- if (so->so_sp->ssp_soback == so)
- freeing |= SOSP_FREEING_READ;
- sounsplice(so->so_sp->ssp_soback, so, freeing);
- }
- if (isspliced(so)) {
- int freeing = SOSP_FREEING_READ;
-
- if (so == so->so_sp->ssp_socket)
- freeing |= SOSP_FREEING_WRITE;
- sounsplice(so, so->so_sp->ssp_socket, freeing);
- }
-#endif /* SOCKET_SPLICE */
mtx_enter(&so->so_snd.sb_mtx);
sbrelease(so, &so->so_snd);
@@ -458,6 +449,83 @@ discard:
if (so->so_state & SS_NOFDREF)
panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+ if (so->so_sp) {
+ struct socket *soback;
+
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ /*
+ * Copy-paste, but we can't relock and sleep in
+ * sofree() in the tcp(4) case. That's why tcp(4)
+ * still relies on solock() for splicing and
+ * unsplicing.
+ */
+
+ if (issplicedback(so)) {
+ int freeing = SOSP_FREEING_WRITE;
+
+ if (so->so_sp->ssp_soback == so)
+ freeing |= SOSP_FREEING_READ;
+ sounsplice(so->so_sp->ssp_soback, so, freeing);
+ }
+ if (isspliced(so)) {
+ int freeing = SOSP_FREEING_READ;
+
+ if (so == so->so_sp->ssp_socket)
+ freeing |= SOSP_FREEING_WRITE;
+ sounsplice(so, so->so_sp->ssp_socket, freeing);
+ }
+ goto free;
+ }
+
+ sounlock(so);
+ mtx_enter(&so->so_snd.sb_mtx);
+ /*
+ * Concurrent sounsplice() locks `sb_mtx' mutexes on
+ * both `so_snd' and `so_rcv' before unsplicing sockets.
+ */
+ if ((soback = so->so_sp->ssp_soback) == NULL) {
+ mtx_leave(&so->so_snd.sb_mtx);
+ goto notsplicedback;
+ }
+ soref(soback);
+ mtx_leave(&so->so_snd.sb_mtx);
+
+ sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+ /*
+ * sblock() is always taken on `so_rcv' before calling
+ * sounsplice(). `so' is dying and can only be unspliced.
+ */
+ if (issplicedback(so)) {
+ int freeing = SOSP_FREEING_WRITE;
+
+ if (so->so_sp->ssp_soback == so)
+ freeing |= SOSP_FREEING_READ;
+ solock(soback);
+ sounsplice(so->so_sp->ssp_soback, so, freeing);
+ sounlock(soback);
+ }
+ sbunlock(&soback->so_rcv);
+ sorele(soback);
+
+notsplicedback:
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (isspliced(so)) {
+ int freeing = SOSP_FREEING_READ;
+
+ if (so == so->so_sp->ssp_socket)
+ freeing |= SOSP_FREEING_WRITE;
+ solock(so);
+ sounsplice(so, so->so_sp->ssp_socket, freeing);
+ sounlock(so);
+ }
+ sbunlock(&so->so_rcv);
+
+ solock(so);
+ }
+free:
+#endif /* SOCKET_SPLICE */
/* sofree() calls sounlock(). */
sofree(so, 0);
return (error);
@@ -1411,14 +1479,6 @@ sosplice(struct socket *so, int fd, off_
goto release;
}
- /* Splice so and sosp together. */
- mtx_enter(&so->so_rcv.sb_mtx);
- mtx_enter(&sosp->so_snd.sb_mtx);
- so->so_sp->ssp_socket = sosp;
- sosp->so_sp->ssp_soback = so;
- mtx_leave(&sosp->so_snd.sb_mtx);
- mtx_leave(&so->so_rcv.sb_mtx);
-
so->so_splicelen = 0;
so->so_splicemax = max;
if (tv)
@@ -1429,9 +1489,20 @@ sosplice(struct socket *so, int fd, off_
task_set(&so->so_splicetask, sotask, so);
/*
- * To prevent softnet interrupt from calling somove() while
- * we sleep, the socket buffers are not marked as spliced yet.
+ * To prevent sorwakeup() calling somove() before this somove()
+ * has finished, the socket buffers are not marked as spliced yet.
*/
+
+ /* Splice so and sosp together. */
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ so->so_sp->ssp_socket = sosp;
+ sosp->so_sp->ssp_soback = so;
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ sounlock(so);
if (somove(so, M_WAIT)) {
mtx_enter(&so->so_rcv.sb_mtx);
mtx_enter(&sosp->so_snd.sb_mtx);
@@ -1440,6 +1511,8 @@ sosplice(struct socket *so, int fd, off_
mtx_leave(&sosp->so_snd.sb_mtx);
mtx_leave(&so->so_rcv.sb_mtx);
}
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ solock(so);
release:
sounlock(so);
@@ -1454,6 +1527,8 @@ sosplice(struct socket *so, int fd, off_
void
sounsplice(struct socket *so, struct socket *sosp, int freeing)
{
+ if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+ sbassertlocked(&so->so_rcv);
soassertlocked(so);
task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1554,51 @@ soidle(void *arg)
{
struct socket *so = arg;
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
solock(so);
+ /*
+ * Depending on socket type, sblock(&so->so_rcv) or solock()
+ * is always held while modifying SB_SPLICE and
+ * so->so_sp->ssp_socket.
+ */
if (so->so_rcv.sb_flags & SB_SPLICE) {
so->so_error = ETIMEDOUT;
sounsplice(so, so->so_sp->ssp_socket, 0);
}
sounlock(so);
+ sbunlock(&so->so_rcv);
}
void
sotask(void *arg)
{
struct socket *so = arg;
+ int doyield = 0;
+ int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
+
+ /*
+ * sblock() on `so_rcv' protects sockets from being unspliced
+ * in the UDP case. TCP sockets still rely on solock().
+ */
+
+ sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+ if (sockstream)
+ solock(so);
- solock(so);
if (so->so_rcv.sb_flags & SB_SPLICE) {
- /*
- * We may not sleep here as sofree() and unsplice() may be
- * called from softnet interrupt context. This would remove
- * the socket during somove().
- */
+ if (sockstream)
+ doyield = 1;
somove(so, M_DONTWAIT);
}
- sounlock(so);
- /* Avoid user land starvation. */
- yield();
+ if (sockstream)
+ sounlock(so);
+ sbunlock(&so->so_rcv);
+
+ if (doyield) {
+ /* Avoid user land starvation. */
+ yield();
+ }
}
/*
@@ -1546,25 +1640,30 @@ somove(struct socket *so, int wait)
struct mbuf *m, **mp, *nextrecord;
u_long len, off, oobmark;
long space;
- int error = 0, maxreached = 0;
+ int error = 0, maxreached = 0, unsplice = 0;
unsigned int rcvstate;
+ int sockdgram = ((so->so_proto->pr_flags &
+ PR_WANTRCVD) == 0);
- soassertlocked(so);
+ if (sockdgram)
+ sbassertlocked(&so->so_rcv);
+ else
+ soassertlocked(so);
+
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
nextpkt:
- if (so->so_error) {
- error = so->so_error;
+ if ((error = READ_ONCE(so->so_error)))
goto release;
- }
if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
error = EPIPE;
goto release;
}
- if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
- sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
- error = sosp->so_error;
+
+ error = READ_ONCE(sosp->so_error);
+ if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP)
goto release;
- }
if ((sosp->so_state & SS_ISCONNECTED) == 0)
goto release;
@@ -1577,26 +1676,22 @@ somove(struct socket *so, int wait)
maxreached = 1;
}
}
- mtx_enter(&sosp->so_snd.sb_mtx);
space = sbspace_locked(sosp, &sosp->so_snd);
if (so->so_oobmark && so->so_oobmark < len &&
so->so_oobmark < space + 1024)
space += 1024;
if (space <= 0) {
- mtx_leave(&sosp->so_snd.sb_mtx);
maxreached = 0;
goto release;
}
if (space < len) {
maxreached = 0;
if (space < sosp->so_snd.sb_lowat) {
- mtx_leave(&sosp->so_snd.sb_mtx);
goto release;
}
len = space;
}
sosp->so_snd.sb_state |= SS_ISSENDING;
- mtx_leave(&sosp->so_snd.sb_mtx);
SBLASTRECORDCHK(&so->so_rcv, "somove 1");
SBLASTMBUFCHK(&so->so_rcv, "somove 1");
@@ -1618,8 +1713,13 @@ somove(struct socket *so, int wait)
m = m->m_next;
if (m == NULL) {
sbdroprecord(so, &so->so_rcv);
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
pru_rcvd(so);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ }
goto nextpkt;
}
@@ -1724,11 +1824,15 @@ somove(struct socket *so, int wait)
}
/* Send window update to source peer as receive buffer has changed. */
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_WANTRCVD) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
pru_rcvd(so);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+ }
/* Receive buffer did shrink by len bytes, adjust oob. */
- mtx_enter(&so->so_rcv.sb_mtx);
rcvstate = so->so_rcv.sb_state;
so->so_rcv.sb_state &= ~SS_RCVATMARK;
oobmark = so->so_oobmark;
@@ -1739,7 +1843,6 @@ somove(struct socket *so, int wait)
if (oobmark >= len)
oobmark = 0;
}
- mtx_leave(&so->so_rcv.sb_mtx);
/*
* Handle oob data. If any malloc fails, ignore error.
@@ -1755,7 +1858,12 @@ somove(struct socket *so, int wait)
} else if (oobmark) {
o = m_split(m, oobmark, wait);
if (o) {
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
error = pru_send(sosp, m, NULL, NULL);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
if (sosp->so_snd.sb_state &
SS_CANTSENDMORE)
@@ -1773,7 +1881,13 @@ somove(struct socket *so, int wait)
if (o) {
o->m_len = 1;
*mtod(o, caddr_t) = *mtod(m, caddr_t);
+
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
error = pru_sendoob(sosp, o, NULL, NULL);
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
error = EPIPE;
@@ -1791,15 +1905,24 @@ somove(struct socket *so, int wait)
}
}
- mtx_enter(&sosp->so_snd.sb_mtx);
/* Append all remaining data to drain socket. */
if (so->so_rcv.sb_cc == 0 || maxreached)
sosp->so_snd.sb_state &= ~SS_ISSENDING;
+
mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+ if (sockdgram)
+ solock_shared(sosp);
error = pru_send(sosp, m, NULL, NULL);
+ if (sockdgram)
+ sounlock_shared(sosp);
+
+ mtx_enter(&so->so_rcv.sb_mtx);
+ mtx_enter(&sosp->so_snd.sb_mtx);
+
if (error) {
- if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+ if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
error = EPIPE;
goto release;
}
@@ -1810,26 +1933,35 @@ somove(struct socket *so, int wait)
goto nextpkt;
release:
- mtx_enter(&sosp->so_snd.sb_mtx);
sosp->so_snd.sb_state &= ~SS_ISSENDING;
- mtx_leave(&sosp->so_snd.sb_mtx);
if (!error && maxreached && so->so_splicemax == so->so_splicelen)
error = EFBIG;
if (error)
- so->so_error = error;
+ WRITE_ONCE(so->so_error, error);
+
if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
so->so_rcv.sb_cc == 0) ||
(sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
- maxreached || error) {
+ maxreached || error)
+ unsplice = 1;
+
+ mtx_leave(&sosp->so_snd.sb_mtx);
+ mtx_leave(&so->so_rcv.sb_mtx);
+
+ if (unsplice) {
+ if (sockdgram)
+ solock(so);
sounsplice(so, sosp, 0);
+ if (sockdgram)
+ sounlock(so);
+
return (0);
}
if (timerisset(&so->so_idletv))
timeout_add_tv(&so->so_idleto, &so->so_idletv);
return (1);
}
-
#endif /* SOCKET_SPLICE */
void
@@ -1839,22 +1971,16 @@ sorwakeup(struct socket *so)
soassertlocked_readonly(so);
#ifdef SOCKET_SPLICE
- if (so->so_rcv.sb_flags & SB_SPLICE) {
- /*
- * TCP has a sendbuffer that can handle multiple packets
- * at once. So queue the stream a bit to accumulate data.
- * The sosplice thread will call somove() later and send
- * the packets calling tcp_output() only once.
- * In the UDP case, send out the packets immediately.
- * Using a thread would make things slower.
- */
- if (so->so_proto->pr_flags & PR_WANTRCVD)
+ if (so->so_proto->pr_flags & PR_SPLICE) {
+ sb_mtx_lock(&so->so_rcv);
+ if (so->so_rcv.sb_flags & SB_SPLICE)
task_add(sosplice_taskq, &so->so_splicetask);
- else
- somove(so, M_DONTWAIT);
+ if (isspliced(so)) {
+ sb_mtx_unlock(&so->so_rcv);
+ return;
+ }
+ sb_mtx_unlock(&so->so_rcv);
}
- if (isspliced(so))
- return;
#endif
sowakeup(so, &so->so_rcv);
if (so->so_upcall)
@@ -1868,10 +1994,17 @@ sowwakeup(struct socket *so)
soassertlocked_readonly(so);
#ifdef SOCKET_SPLICE
- if (so->so_snd.sb_flags & SB_SPLICE)
- task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
- if (issplicedback(so))
- return;
+ if (so->so_proto->pr_flags & PR_SPLICE) {
+ sb_mtx_lock(&so->so_snd);
+ if (so->so_snd.sb_flags & SB_SPLICE)
+ task_add(sosplice_taskq,
+ &so->so_sp->ssp_soback->so_splicetask);
+ if (issplicedback(so)) {
+ sb_mtx_unlock(&so->so_snd);
+ return;
+ }
+ sb_mtx_unlock(&so->so_snd);
+ }
#endif
sowakeup(so, &so->so_snd);
}
Index: sys/netinet/udp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
diff -u -p -r1.322 udp_usrreq.c
--- sys/netinet/udp_usrreq.c 19 Jul 2024 15:41:58 -0000 1.322
+++ sys/netinet/udp_usrreq.c 19 Jul 2024 18:42:26 -0000
@@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf
soassertlocked_readonly(so);
+ if (inp == NULL) {
+ /* PCB could be destroyed, but socket still spliced. */
+ return (EINVAL);
+ }
+
#ifdef PIPEX
if (inp->inp_pipex) {
struct pipex_session *session;
Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
diff -u -p -r1.132 socketvar.h
--- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132
+++ sys/sys/socketvar.h 19 Jul 2024 18:42:26 -0000
@@ -330,6 +330,12 @@ int sblock(struct sockbuf *, int);
/* release lock on sockbuf sb */
void sbunlock(struct sockbuf *);
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+ rw_assert_wrlock(&sb->sb_lock);
+}
+
#define SB_EMPTY_FIXUP(sb) do { \
if ((sb)->sb_mb == NULL) { \
(sb)->sb_mbtail = NULL; \
Unlock udp(4) somove()