From: Vitaliy Makkoveev <mvs@openbsd.org>
Subject: Unlock udp(4) somove()
To: Alexander Bluhm <bluhm@openbsd.org>, tech@openbsd.org
Date: Sat, 13 Jul 2024 06:02:27 +0300

udp(4) sockets are fully switched to fine-grained socket buffer locks.
Socket splicing belongs to the sockets layer, and all the magic happens
around socket buffers, so the corresponding locks should be used instead
of the exclusive solock(). This allows many somove() threads to run
simultaneously, with the shared solock() taken only around pru_send().
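
To illustrate, a minimal sketch of the locking pattern this diff aims
for in the datagram mover (simplified, error handling and the mbuf
surgery omitted; the real code is somove_dgram() in the diff below):

	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); /* serialize movers */
	mtx_enter(&so->so_rcv.sb_mtx);
	mtx_enter(&sosp->so_snd.sb_mtx);
	/* detach a record from `so_rcv' and check `so_snd' space */
	mtx_leave(&sosp->so_snd.sb_mtx);
	mtx_leave(&so->so_rcv.sb_mtx);
	solock_shared(sosp);		/* shared solock() only here */
	error = pru_send(sosp, m, NULL, NULL);
	sounlock_shared(sosp);
	sbunlock(&so->so_rcv);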

The problem lies in sorwakeup(). While the udp(4) protocol layer appends
a packet to the `so_rcv' buffer, sorwakeup() immediately calls somove().
This prevents us from using the shared solock(), which relies on the
per-socket `so_lock' rwlock(9), or sblock(), which is the per-buffer
`sb_lock' rwlock(9).
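
For reference, this is the datagram path removed from sorwakeup() by
the diff below; the mover runs inline, under the exclusive solock():

	if (so->so_rcv.sb_flags & SB_SPLICE) {
		if (so->so_proto->pr_flags & PR_WANTRCVD)	/* tcp(4) */
			task_add(sosplice_taskq, &so->so_splicetask);
		else						/* udp(4) */
			somove(so, M_DONTWAIT);
	}
	if (isspliced(so))
		return;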

I propose to always schedule the somove() thread, as we do in the tcp(4)
case. I understand that this adds delay to packet processing, but for
non-spliced udp sockets we always schedule soreceive() threads. So yes,
there is a delay, but it is comparable to the non-spliced case and
should not be that terrible. The benefit is obvious: many simultaneous
somove() threads, with the only shared solock() around pru_send(). Also,
bluhm@ can easily make udp(4) input parallel, because sorwakeup() no
longer touches the spliced peer.
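
Compressed, the scheduled path after this diff looks like this (the
full code is in the diff below):

	/* sorwakeup() only schedules the task... */
	if (so->so_rcv.sb_flags & SB_SPLICE)
		task_add(sosplice_taskq, &so->so_splicetask);

	/* ...and sotask() runs the mover under sblock(). */
	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
	if (so->so_rcv.sb_flags & SB_SPLICE)
		somove_dgram(so, M_DONTWAIT);
	sbunlock(&so->so_rcv);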

So the diff.

I intentionally introduced the copy-pasted somove_dgram(), with the
tcp(4) related code removed, to make the locking more obvious and to
avoid unnecessary re-locking of the `sb_mtx' mutexes. The same is true
for unsplicing within soclose(), where udp(4) and tcp(4) sockets follow
different paths. They could be unified, but not with this diff.

udp(4) sockets still share the single `sosplice_taskq' with tcp(4)
sockets, but nothing prevents us from introducing multiple taskqs later,
like we already did with the softnet tasks.

udp(4) socket unsplicing still uses the exclusive solock(). The shared
solock() takes the per-socket `so_lock' rwlock(9) together with the
shared netlock. When the spliced sockets are different, we need to lock
both of them, but the shared solock() doesn't support this yet.

A few words about locking. sblock() on `so_rcv' serializes sosplice(),
sounsplice(), somove_dgram() and soreceive(). The `sb_mtx' mutexes on
`so_rcv' and `so_snd' protect modification of `ssp_soback' and
`ssp_socket', as well as the corresponding SB_SPLICE bits.
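
In other words, the lock hierarchy for the datagram splicing path,
outermost first, is (my summary, not a comment from the tree):

	/*
	 * sblock(&so->so_rcv)	serializes sosplice(), sounsplice(),
	 *			somove_dgram() and soreceive()
	 * sb_mtx mutexes	protect `ssp_socket', `ssp_soback' and
	 *			the SB_SPLICE bits on `so_rcv'/`so_snd'
	 * solock_shared(sosp)	taken only around pru_send()
	 */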

soclose() drops solock() in the udp(4) sockets case, and somove_dgram()
takes the shared solock() only around pru_send(), so the connected peer
could be concurrently closed, but not unspliced and destroyed. A
corresponding `so_pcb' check was added to udp_send().

I successfully ran regress/sys/kern/sosplice many times, but I'm not a
sosplice() user, so I'm very interested in independent tests.

Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
diff -u -p -r1.337 uipc_socket.c
--- sys/kern/uipc_socket.c	12 Jul 2024 17:20:18 -0000	1.337
+++ sys/kern/uipc_socket.c	13 Jul 2024 01:53:23 -0000
@@ -65,6 +65,7 @@ void	sotask(void *);
 void	soreaper(void *);
 void	soput(void *);
 int	somove(struct socket *, int);
+int	somove_dgram(struct socket *, int);
 void	sorflush(struct socket *);
 
 void	filt_sordetach(struct knote *kn);
@@ -333,22 +334,6 @@ sofree(struct socket *so, int keep_lock)
 	sigio_free(&so->so_sigio);
 	klist_free(&so->so_rcv.sb_klist);
 	klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
-	if (issplicedback(so)) {
-		int freeing = SOSP_FREEING_WRITE;
-
-		if (so->so_sp->ssp_soback == so)
-			freeing |= SOSP_FREEING_READ;
-		sounsplice(so->so_sp->ssp_soback, so, freeing);
-	}
-	if (isspliced(so)) {
-		int freeing = SOSP_FREEING_READ;
-
-		if (so == so->so_sp->ssp_socket)
-			freeing |= SOSP_FREEING_WRITE;
-		sounsplice(so, so->so_sp->ssp_socket, freeing);
-	}
-#endif /* SOCKET_SPLICE */
 
 	mtx_enter(&so->so_snd.sb_mtx);
 	sbrelease(so, &so->so_snd);
@@ -458,6 +443,80 @@ discard:
 	if (so->so_state & SS_NOFDREF)
 		panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
 	so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+	if (so->so_sp) {
+		if (so->so_proto->pr_flags & PR_WANTRCVD) {
+			/*
+			/*
+			 * XXXSMP: Copy-paste, but we can't relock and
+			 * sleep in sofree() in the tcp(4) case.
+			 */
+
+			if (issplicedback(so)) {
+				int freeing = SOSP_FREEING_WRITE;
+
+				if (so->so_sp->ssp_soback == so)
+					freeing |= SOSP_FREEING_READ;
+				sounsplice(so->so_sp->ssp_soback, so, freeing);
+			}
+			if (isspliced(so)) {
+				int freeing = SOSP_FREEING_READ;
+
+				if (so == so->so_sp->ssp_socket)
+					freeing |= SOSP_FREEING_WRITE;
+				sounsplice(so, so->so_sp->ssp_socket, freeing);
+			}
+			goto free;
+		}
+
+		sounlock(so);
+
+		mtx_enter(&so->so_snd.sb_mtx);
+		/*
+		/*
+		 * Concurrent sounsplice() locks the `sb_mtx' mutexes on
+		 * both `so_snd' and `so_rcv' before unsplicing the sockets.
+		 */
+		if ((so2 = so->so_sp->ssp_soback) == NULL) {
+			mtx_leave(&so->so_snd.sb_mtx);
+			solock(so);
+			goto free;
+		}
+		soref(so2);
+		mtx_leave(&so->so_snd.sb_mtx);
+
+		sblock(&so2->so_rcv, SBL_WAIT | SBL_NOINTR);
+		/*
+		/*
+		 * sblock() is always taken on `so_rcv' before calling
+		 * sounsplice(). `so' is dying and can only be unspliced.
+		 */
+		if (issplicedback(so)) {
+			int freeing = SOSP_FREEING_WRITE;
+
+			if (so->so_sp->ssp_soback == so)
+				freeing |= SOSP_FREEING_READ;
+			solock(so2);
+			sounsplice(so->so_sp->ssp_soback, so, freeing);
+			sounlock(so2);
+		}
+		sbunlock(&so2->so_rcv);
+		sorele(so2);
+
+		sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+		if (isspliced(so)) {
+			int freeing = SOSP_FREEING_READ;
+
+			if (so == so->so_sp->ssp_socket)
+				freeing |= SOSP_FREEING_WRITE;
+			solock(so);
+			sounsplice(so, so->so_sp->ssp_socket, freeing);
+			sounlock(so);
+		}
+		sbunlock(&so->so_rcv);
+
+		solock(so);
+	}
+free:
+#endif /* SOCKET_SPLICE */
 	/* sofree() calls sounlock(). */
 	sofree(so, 0);
 	return (error);
@@ -1318,7 +1377,7 @@ sosplice(struct socket *so, int fd, off_
 	struct file	*fp;
 	struct socket	*sosp;
 	struct taskq	*tq;
-	int		 error = 0;
+	int		 error = 0, somoveret;
 
 	if ((so->so_proto->pr_flags & PR_SPLICE) == 0)
 		return (EPROTONOSUPPORT);
@@ -1428,11 +1487,15 @@ sosplice(struct socket *so, int fd, off_
 	timeout_set_proc(&so->so_idleto, soidle, so);
 	task_set(&so->so_splicetask, sotask, so);
 
-	/*
-	 * To prevent softnet interrupt from calling somove() while
-	 * we sleep, the socket buffers are not marked as spliced yet.
-	 */
-	if (somove(so, M_WAIT)) {
+	if (so->so_proto->pr_flags & PR_WANTRCVD)
+		somoveret = somove(so, M_WAIT);
+	else {
+		sounlock(so);
+		somoveret = somove_dgram(so, M_WAIT);
+		solock(so);
+	}
+
+	if (somoveret) {
 		mtx_enter(&so->so_rcv.sb_mtx);
 		mtx_enter(&sosp->so_snd.sb_mtx);
 		so->so_rcv.sb_flags |= SB_SPLICE;
@@ -1454,6 +1517,8 @@ sosplice(struct socket *so, int fd, off_
 void
 sounsplice(struct socket *so, struct socket *sosp, int freeing)
 {
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		sbassertlocked(&so->so_rcv);
 	soassertlocked(so);
 
 	task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1544,55 @@ soidle(void *arg)
 {
 	struct socket *so = arg;
 
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
 	solock(so);
+	/*
+	 * Depending on socket type, sblock(&so->so_rcv) or solock()
+	 * is always held while modifying SB_SPLICE and
+	 * so->so_sp->ssp_socket.
+	 */
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
 		so->so_error = ETIMEDOUT;
 		sounsplice(so, so->so_sp->ssp_socket, 0);
 	}
 	sounlock(so);
+	sbunlock(&so->so_rcv);
 }
 
 void
 sotask(void *arg)
 {
 	struct socket *so = arg;
+	int doyield = 0;
 
-	solock(so);
-	if (so->so_rcv.sb_flags & SB_SPLICE) {
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+	if (so->so_proto->pr_flags & PR_WANTRCVD) {
+		solock(so);
+		/*
+		 * solock() is always held while modifying SB_SPLICE
+		 * and so->so_sp->ssp_socket.
+		 */
+		if (so->so_rcv.sb_flags & SB_SPLICE) {
+			somove(so, M_DONTWAIT);
+			doyield = 1;
+		}
+		sounlock(so);
+	} else {
 		/*
-		 * We may not sleep here as sofree() and unsplice() may be
-		 * called from softnet interrupt context.  This would remove
-		 * the socket during somove().
+		 * sblock(&so->so_rcv) is always held while modifying
+		 * SB_SPLICE and so->so_sp->ssp_socket. UDP sockets
+		 * do not modify the `so_snd' buffer in the sending
+		 * path, so there is no need to lock it.
 		 */
-		somove(so, M_DONTWAIT);
+		if (so->so_rcv.sb_flags & SB_SPLICE)
+			somove_dgram(so, M_DONTWAIT);
 	}
-	sounlock(so);
+	sbunlock(&so->so_rcv);
 
-	/* Avoid user land starvation. */
-	yield();
+	if (doyield) {
+		/* Avoid user land starvation. */
+		yield();
+	}
 }
 
 /*
@@ -1830,6 +1918,226 @@ somove(struct socket *so, int wait)
 	return (1);
 }
 
+int
+somove_dgram(struct socket *so, int wait)
+{
+	struct socket	*sosp = so->so_sp->ssp_socket;
+	struct mbuf	*m, **mp, *nextrecord;
+	u_long		 len, off;
+	long		 space;
+	int		 error = 0, maxreached = 0, unsplice = 0;
+
+	sbassertlocked(&so->so_rcv);
+
+ nextpkt:
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+
+	if ((error = READ_ONCE(so->so_error)))
+		goto release;
+	if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
+		error = EPIPE;
+		goto release;
+	}
+
+	error = READ_ONCE(sosp->so_error);
+	if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP)
+		goto release;
+	if ((sosp->so_state & SS_ISCONNECTED) == 0)
+		goto release;
+
+	/* Calculate how many bytes can be copied now. */
+	len = so->so_rcv.sb_datacc;
+	if (so->so_splicemax) {
+		KASSERT(so->so_splicelen < so->so_splicemax);
+		if (so->so_splicemax <= so->so_splicelen + len) {
+			len = so->so_splicemax - so->so_splicelen;
+			maxreached = 1;
+		}
+	}
+	space = sbspace_locked(sosp, &sosp->so_snd);
+	if (space <= 0) {
+		maxreached = 0;
+		goto release;
+	}
+	if (space < len) {
+		maxreached = 0;
+		if (space < sosp->so_snd.sb_lowat) {
+			goto release;
+		}
+		len = space;
+	}
+
+	SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 1");
+	SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 1");
+	m = so->so_rcv.sb_mb;
+	if (m == NULL)
+		goto release;
+	nextrecord = m->m_nextpkt;
+
+	/* Drop address and control information not used with splicing. */
+#ifdef DIAGNOSTIC
+	if (m->m_type != MT_SONAME)
+		panic("somove_dgram soname: so %p, so_type %d, m %p, "
+		    "m_type %d", so, so->so_type, m, m->m_type);
+#endif
+	m = m->m_next;
+
+	while (m && m->m_type == MT_CONTROL)
+		m = m->m_next;
+	if (m == NULL) {
+		mtx_leave(&sosp->so_snd.sb_mtx);
+		sbdroprecord(so, &so->so_rcv);
+		mtx_leave(&so->so_rcv.sb_mtx);
+		goto nextpkt;
+	}
+
+	/*
+	 * By splicing sockets connected to localhost, userland might create a
+	 * loop.  Dissolve splicing with error if loop is detected by counter.
+	 *
+	 * If we deal with a looped broadcast/multicast packet, we bail
+	 * out with no error to suppress splice termination.
+	 */
+	if ((m->m_flags & M_PKTHDR) &&
+	    ((m->m_pkthdr.ph_loopcnt++ >= M_MAXLOOP) ||
+	    ((m->m_flags & M_LOOP) && (m->m_flags & (M_BCAST|M_MCAST))))) {
+		error = ELOOP;
+		goto release;
+	}
+
+	if ((m->m_flags & M_PKTHDR) == 0)
+		panic("somove_dgram !PKTHDR: so %p, so_type %d, m %p, "
+		    "m_type %d", so, so->so_type, m, m->m_type);
+	if (sosp->so_snd.sb_hiwat < m->m_pkthdr.len) {
+		error = EMSGSIZE;
+		goto release;
+	}
+	if (len < m->m_pkthdr.len)
+		goto release;
+	if (m->m_pkthdr.len < len) {
+		maxreached = 0;
+		len = m->m_pkthdr.len;
+	}
+	/*
+	 * Throw away the name mbuf after it has been assured
+	 * that the whole first record can be processed.
+	 */
+	m = so->so_rcv.sb_mb;
+	sbfree(so, &so->so_rcv, m);
+	so->so_rcv.sb_mb = m_free(m);
+	sbsync(&so->so_rcv, nextrecord);
+
+	/*
+	 * Throw away the control mbufs after it has been assured
+	 * that the whole first record can be processed.
+	 */
+	m = so->so_rcv.sb_mb;
+	while (m && m->m_type == MT_CONTROL) {
+		sbfree(so, &so->so_rcv, m);
+		so->so_rcv.sb_mb = m_free(m);
+		m = so->so_rcv.sb_mb;
+		sbsync(&so->so_rcv, nextrecord);
+	}
+
+	SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 2");
+	SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 2");
+
+	/* Take at most len mbufs out of receive buffer. */
+	for (off = 0, mp = &m; off <= len && *mp;
+	    off += (*mp)->m_len, mp = &(*mp)->m_next) {
+		u_long size = len - off;
+
+#ifdef DIAGNOSTIC
+		if ((*mp)->m_type != MT_DATA && (*mp)->m_type != MT_HEADER)
+			panic("somove_dgram type: so %p, so_type %d, m %p, "
+			    "m_type %d", so, so->so_type, *mp, (*mp)->m_type);
+#endif
+		if ((*mp)->m_len > size) {
+			/*
+			 * Move only a partial mbuf at maximum splice length or
+			 * if the drain buffer is too small for this large mbuf.
+			 */
+			if (!maxreached && so->so_snd.sb_datacc > 0) {
+				len -= size;
+				break;
+			}
+			*mp = m_copym(so->so_rcv.sb_mb, 0, size, wait);
+			if (*mp == NULL) {
+				len -= size;
+				break;
+			}
+			so->so_rcv.sb_mb->m_data += size;
+			so->so_rcv.sb_mb->m_len -= size;
+			so->so_rcv.sb_cc -= size;
+			so->so_rcv.sb_datacc -= size;
+		} else {
+			*mp = so->so_rcv.sb_mb;
+			sbfree(so, &so->so_rcv, *mp);
+			so->so_rcv.sb_mb = (*mp)->m_next;
+			sbsync(&so->so_rcv, nextrecord);
+		}
+	}
+	*mp = NULL;
+
+	SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 3");
+	SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 3");
+	SBCHECK(so, &so->so_rcv);
+	if (m == NULL)
+		goto release;
+	m->m_nextpkt = NULL;
+	if (m->m_flags & M_PKTHDR) {
+		m_resethdr(m);
+		m->m_pkthdr.len = len;
+	}
+
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+	solock_shared(sosp);
+	error = pru_send(sosp, m, NULL, NULL);
+	sounlock_shared(sosp);
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+
+	if (error) {
+		if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+			error = EPIPE;
+		goto release;
+	}
+	so->so_splicelen += len;
+
+	/* Move several packets if possible. */
+	if (!maxreached && nextrecord) {
+		mtx_leave(&sosp->so_snd.sb_mtx);
+		mtx_leave(&so->so_rcv.sb_mtx);
+		goto nextpkt;
+	}
+
+ release:
+	if (!error && maxreached && so->so_splicemax == so->so_splicelen)
+		error = EFBIG;
+	if (error)
+		WRITE_ONCE(so->so_error, error);
+
+	if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
+	    so->so_rcv.sb_cc == 0) ||
+	    (sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
+	    maxreached || error)
+		unsplice = 1;
+
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if (unsplice) {
+		solock(so);
+		sounsplice(so, sosp, 0);
+		sounlock(so);
+		return (0);
+	}
+	if (timerisset(&so->so_idletv))
+		timeout_add_tv(&so->so_idleto, &so->so_idletv);
+	return (1);
+}
 #endif /* SOCKET_SPLICE */
 
 void
@@ -1839,22 +2147,16 @@ sorwakeup(struct socket *so)
 		soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_rcv.sb_flags & SB_SPLICE) {
-		/*
-		 * TCP has a sendbuffer that can handle multiple packets
-		 * at once.  So queue the stream a bit to accumulate data.
-		 * The sosplice thread will call somove() later and send
-		 * the packets calling tcp_output() only once.
-		 * In the UDP case, send out the packets immediately.
-		 * Using a thread would make things slower.
-		 */
-		if (so->so_proto->pr_flags & PR_WANTRCVD)
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_rcv);
+		if (so->so_rcv.sb_flags & SB_SPLICE)
 			task_add(sosplice_taskq, &so->so_splicetask);
-		else
-			somove(so, M_DONTWAIT);
+		if (isspliced(so)) {
+			sb_mtx_unlock(&so->so_rcv);
+			return;
+		}
+		sb_mtx_unlock(&so->so_rcv);
 	}
-	if (isspliced(so))
-		return;
 #endif
 	sowakeup(so, &so->so_rcv);
 	if (so->so_upcall)
@@ -1868,10 +2170,17 @@ sowwakeup(struct socket *so)
 		soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_snd.sb_flags & SB_SPLICE)
-		task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
-	if (issplicedback(so))
-		return;
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_snd);
+		if (so->so_snd.sb_flags & SB_SPLICE)
+			task_add(sosplice_taskq,
+			    &so->so_sp->ssp_soback->so_splicetask);
+		if (issplicedback(so)) {
+			sb_mtx_unlock(&so->so_snd);
+			return;
+		}
+		sb_mtx_unlock(&so->so_snd);
+	}
 #endif
 	sowakeup(so, &so->so_snd);
 }
Index: sys/netinet/udp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
diff -u -p -r1.320 udp_usrreq.c
--- sys/netinet/udp_usrreq.c	17 Apr 2024 20:48:51 -0000	1.320
+++ sys/netinet/udp_usrreq.c	13 Jul 2024 01:53:23 -0000
@@ -1241,6 +1241,11 @@ udp_send(struct socket *so, struct mbuf 
 
 	soassertlocked(so);
 
+	if (inp == NULL) {
+		/* The PCB could be destroyed, but the socket is still spliced. */
+		return (EINVAL);
+	}
+
 #ifdef PIPEX
 	if (inp->inp_pipex) {
 		struct pipex_session *session;
Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
diff -u -p -r1.132 socketvar.h
--- sys/sys/socketvar.h	12 Jul 2024 17:20:18 -0000	1.132
+++ sys/sys/socketvar.h	13 Jul 2024 01:53:23 -0000
@@ -330,6 +330,12 @@ int sblock(struct sockbuf *, int);
 /* release lock on sockbuf sb */
 void sbunlock(struct sockbuf *);
 
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+	rw_assert_wrlock(&sb->sb_lock);
+}
+
 #define	SB_EMPTY_FIXUP(sb) do {						\
 	if ((sb)->sb_mb == NULL) {					\
 		(sb)->sb_mbtail = NULL;					\