Index | Thread | Search

From:
Vitaliy Makkoveev <mvs@openbsd.org>
Subject:
Re: Unlock udp(4) somove()
To:
Alexander Bluhm <bluhm@openbsd.org>
Cc:
tech@openbsd.org
Date:
Fri, 19 Jul 2024 21:47:34 +0300

Download raw body.

Thread
On Fri, Jul 19, 2024 at 08:32:23PM +0200, Alexander Bluhm wrote:
> Look at my proposal.  I did not change unlocking of pru_rcvd().  I
> just moved it directly around unlock/pru_rcvd/lock.  Only difference
> is that sbdroprecord() runs with sosp->so_snd.sb_mtx locked.  That
> does not matter.  In the UDP path we do not unlock/lock when we go
> to nextpkt.
> 

Understood. This one releases `sb_mtx' mutexes only around pru_rcvd(). 

Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
diff -u -p -r1.338 uipc_socket.c
--- sys/kern/uipc_socket.c	14 Jul 2024 15:42:23 -0000	1.338
+++ sys/kern/uipc_socket.c	19 Jul 2024 18:42:26 -0000
@@ -324,31 +324,22 @@ sofree(struct socket *so, int keep_lock)
 			sounlock(head);
 	}
 
-	if (persocket) {
+	switch (so->so_proto->pr_domain->dom_family) {
+	case AF_INET:
+	case AF_INET6:
+		if (so->so_proto->pr_type == SOCK_STREAM)
+			break;
+		/* FALLTHROUGH */
+	default:
 		sounlock(so);
 		refcnt_finalize(&so->so_refcnt, "sofinal");
 		solock(so);
+		break;
 	}
 
 	sigio_free(&so->so_sigio);
 	klist_free(&so->so_rcv.sb_klist);
 	klist_free(&so->so_snd.sb_klist);
-#ifdef SOCKET_SPLICE
-	if (issplicedback(so)) {
-		int freeing = SOSP_FREEING_WRITE;
-
-		if (so->so_sp->ssp_soback == so)
-			freeing |= SOSP_FREEING_READ;
-		sounsplice(so->so_sp->ssp_soback, so, freeing);
-	}
-	if (isspliced(so)) {
-		int freeing = SOSP_FREEING_READ;
-
-		if (so == so->so_sp->ssp_socket)
-			freeing |= SOSP_FREEING_WRITE;
-		sounsplice(so, so->so_sp->ssp_socket, freeing);
-	}
-#endif /* SOCKET_SPLICE */
 
 	mtx_enter(&so->so_snd.sb_mtx);
 	sbrelease(so, &so->so_snd);
@@ -458,6 +449,83 @@ discard:
 	if (so->so_state & SS_NOFDREF)
 		panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type);
 	so->so_state |= SS_NOFDREF;
+
+#ifdef SOCKET_SPLICE
+	if (so->so_sp) {
+		struct socket *soback;
+
+		if (so->so_proto->pr_flags & PR_WANTRCVD) {
+			/*
+			 * Copy-paste, but we can't relock and sleep in
+			 * sofree() in the tcp(4) case. That's why tcp(4)
+			 * still relies on solock() for splicing and
+			 * unsplicing.
+			 */
+
+			if (issplicedback(so)) {
+				int freeing = SOSP_FREEING_WRITE;
+
+				if (so->so_sp->ssp_soback == so)
+					freeing |= SOSP_FREEING_READ;
+				sounsplice(so->so_sp->ssp_soback, so, freeing);
+			}
+			if (isspliced(so)) {
+				int freeing = SOSP_FREEING_READ;
+
+				if (so == so->so_sp->ssp_socket)
+					freeing |= SOSP_FREEING_WRITE;
+				sounsplice(so, so->so_sp->ssp_socket, freeing);
+			}
+			goto free;
+		}
+
+		sounlock(so);
+		mtx_enter(&so->so_snd.sb_mtx);
+		/*
+		 * Concurrent sounsplice() locks the `sb_mtx' mutexes on
+		 * both `so_snd' and `so_rcv' before unsplicing sockets.
+		 */
+		if ((soback = so->so_sp->ssp_soback) == NULL) {
+			mtx_leave(&so->so_snd.sb_mtx);
+			goto notsplicedback;
+		}
+		soref(soback);
+		mtx_leave(&so->so_snd.sb_mtx);
+
+		sblock(&soback->so_rcv, SBL_WAIT | SBL_NOINTR);
+		/*
+		 * sblock() is always taken on `so_rcv' before calling
+		 * sounsplice(). `so' is dying and can only be unspliced.
+		 */
+		if (issplicedback(so)) {
+			int freeing = SOSP_FREEING_WRITE;
+
+			if (so->so_sp->ssp_soback == so)
+				freeing |= SOSP_FREEING_READ;
+			solock(soback);
+			sounsplice(so->so_sp->ssp_soback, so, freeing);
+			sounlock(soback);
+		}
+		sbunlock(&soback->so_rcv);
+		sorele(soback);
+
+notsplicedback:
+		sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+		if (isspliced(so)) {
+			int freeing = SOSP_FREEING_READ;
+
+			if (so == so->so_sp->ssp_socket)
+				freeing |= SOSP_FREEING_WRITE;
+			solock(so);
+			sounsplice(so, so->so_sp->ssp_socket, freeing);
+			sounlock(so);
+		}
+		sbunlock(&so->so_rcv);
+
+		solock(so);
+	}
+free:
+#endif /* SOCKET_SPLICE */
 	/* sofree() calls sounlock(). */
 	sofree(so, 0);
 	return (error);
@@ -1411,14 +1479,6 @@ sosplice(struct socket *so, int fd, off_
 		goto release;
 	}
 
-	/* Splice so and sosp together. */
-	mtx_enter(&so->so_rcv.sb_mtx);
-	mtx_enter(&sosp->so_snd.sb_mtx);
-	so->so_sp->ssp_socket = sosp;
-	sosp->so_sp->ssp_soback = so;
-	mtx_leave(&sosp->so_snd.sb_mtx);
-	mtx_leave(&so->so_rcv.sb_mtx);
-
 	so->so_splicelen = 0;
 	so->so_splicemax = max;
 	if (tv)
@@ -1429,9 +1489,20 @@ sosplice(struct socket *so, int fd, off_
 	task_set(&so->so_splicetask, sotask, so);
 
 	/*
-	 * To prevent softnet interrupt from calling somove() while
-	 * we sleep, the socket buffers are not marked as spliced yet.
+	 * To prevent sorwakeup() calling somove() before this somove()
+	 * has finished, the socket buffers are not marked as spliced yet.
 	 */
+
+	/* Splice so and sosp together. */
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+	so->so_sp->ssp_socket = sosp;
+	sosp->so_sp->ssp_soback = so;
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		sounlock(so);
 	if (somove(so, M_WAIT)) {
 		mtx_enter(&so->so_rcv.sb_mtx);
 		mtx_enter(&sosp->so_snd.sb_mtx);
@@ -1440,6 +1511,8 @@ sosplice(struct socket *so, int fd, off_
 		mtx_leave(&sosp->so_snd.sb_mtx);
 		mtx_leave(&so->so_rcv.sb_mtx);
 	}
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		solock(so);
 
  release:
 	sounlock(so);
@@ -1454,6 +1527,8 @@ sosplice(struct socket *so, int fd, off_
 void
 sounsplice(struct socket *so, struct socket *sosp, int freeing)
 {
+	if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0)
+		sbassertlocked(&so->so_rcv);
 	soassertlocked(so);
 
 	task_del(sosplice_taskq, &so->so_splicetask);
@@ -1479,32 +1554,51 @@ soidle(void *arg)
 {
 	struct socket *so = arg;
 
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
 	solock(so);
+	/*
+	 * Depending on socket type, sblock(&so->so_rcv) or solock()
+	 * is always held while modifying SB_SPLICE and
+	 * so->so_sp->ssp_socket.
+	 */
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
 		so->so_error = ETIMEDOUT;
 		sounsplice(so, so->so_sp->ssp_socket, 0);
 	}
 	sounlock(so);
+	sbunlock(&so->so_rcv);
 }
 
 void
 sotask(void *arg)
 {
 	struct socket *so = arg;
+	int doyield = 0;
+	int sockstream = (so->so_proto->pr_flags & PR_WANTRCVD);
+
+	/*
+	 * sblock() on `so_rcv' protects sockets from being unspliced
+	 * in the UDP case. TCP sockets still rely on solock().
+	 */
+
+	sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR);
+	if (sockstream)
+		solock(so);
 
-	solock(so);
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
-		/*
-		 * We may not sleep here as sofree() and unsplice() may be
-		 * called from softnet interrupt context.  This would remove
-		 * the socket during somove().
-		 */
+		if (sockstream)
+			doyield = 1;
 		somove(so, M_DONTWAIT);
 	}
-	sounlock(so);
 
-	/* Avoid user land starvation. */
-	yield();
+	if (sockstream)
+		sounlock(so);
+	sbunlock(&so->so_rcv);
+
+	if (doyield) {
+		/* Avoid user land starvation. */
+		yield();
+	}
 }
 
 /*
@@ -1546,25 +1640,30 @@ somove(struct socket *so, int wait)
 	struct mbuf	*m, **mp, *nextrecord;
 	u_long		 len, off, oobmark;
 	long		 space;
-	int		 error = 0, maxreached = 0;
+	int		 error = 0, maxreached = 0, unsplice = 0;
 	unsigned int	 rcvstate;
+	int		 sockdgram = ((so->so_proto->pr_flags &
+			     PR_WANTRCVD) == 0);
 
-	soassertlocked(so);
+	if (sockdgram)
+		sbassertlocked(&so->so_rcv);
+	else
+		soassertlocked(so);
+
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
 
  nextpkt:
-	if (so->so_error) {
-		error = so->so_error;
+	if ((error = READ_ONCE(so->so_error)))
 		goto release;
-	}
 	if (sosp->so_snd.sb_state & SS_CANTSENDMORE) {
 		error = EPIPE;
 		goto release;
 	}
-	if (sosp->so_error && sosp->so_error != ETIMEDOUT &&
-	    sosp->so_error != EFBIG && sosp->so_error != ELOOP) {
-		error = sosp->so_error;
+
+	error = READ_ONCE(sosp->so_error);
+	if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP)
 		goto release;
-	}
 	if ((sosp->so_state & SS_ISCONNECTED) == 0)
 		goto release;
 
@@ -1577,26 +1676,22 @@ somove(struct socket *so, int wait)
 			maxreached = 1;
 		}
 	}
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	space = sbspace_locked(sosp, &sosp->so_snd);
 	if (so->so_oobmark && so->so_oobmark < len &&
 	    so->so_oobmark < space + 1024)
 		space += 1024;
 	if (space <= 0) {
-		mtx_leave(&sosp->so_snd.sb_mtx);
 		maxreached = 0;
 		goto release;
 	}
 	if (space < len) {
 		maxreached = 0;
 		if (space < sosp->so_snd.sb_lowat) {
-			mtx_leave(&sosp->so_snd.sb_mtx);
 			goto release;
 		}
 		len = space;
 	}
 	sosp->so_snd.sb_state |= SS_ISSENDING;
-	mtx_leave(&sosp->so_snd.sb_mtx);
 
 	SBLASTRECORDCHK(&so->so_rcv, "somove 1");
 	SBLASTMBUFCHK(&so->so_rcv, "somove 1");
@@ -1618,8 +1713,13 @@ somove(struct socket *so, int wait)
 		m = m->m_next;
 	if (m == NULL) {
 		sbdroprecord(so, &so->so_rcv);
-		if (so->so_proto->pr_flags & PR_WANTRCVD)
+		if (so->so_proto->pr_flags & PR_WANTRCVD) {
+			mtx_leave(&sosp->so_snd.sb_mtx);
+			mtx_leave(&so->so_rcv.sb_mtx);
 			pru_rcvd(so);
+			mtx_enter(&so->so_rcv.sb_mtx);
+			mtx_enter(&sosp->so_snd.sb_mtx);
+		}
 		goto nextpkt;
 	}
 
@@ -1724,11 +1824,15 @@ somove(struct socket *so, int wait)
 	}
 
 	/* Send window update to source peer as receive buffer has changed. */
-	if (so->so_proto->pr_flags & PR_WANTRCVD)
+	if (so->so_proto->pr_flags & PR_WANTRCVD) {
+		mtx_leave(&sosp->so_snd.sb_mtx);
+		mtx_leave(&so->so_rcv.sb_mtx);
 		pru_rcvd(so);
+		mtx_enter(&so->so_rcv.sb_mtx);
+		mtx_enter(&sosp->so_snd.sb_mtx);
+	}
 
 	/* Receive buffer did shrink by len bytes, adjust oob. */
-	mtx_enter(&so->so_rcv.sb_mtx);
 	rcvstate = so->so_rcv.sb_state;
 	so->so_rcv.sb_state &= ~SS_RCVATMARK;
 	oobmark = so->so_oobmark;
@@ -1739,7 +1843,6 @@ somove(struct socket *so, int wait)
 		if (oobmark >= len)
 			oobmark = 0;
 	}
-	mtx_leave(&so->so_rcv.sb_mtx);
 
 	/*
 	 * Handle oob data.  If any malloc fails, ignore error.
@@ -1755,7 +1858,12 @@ somove(struct socket *so, int wait)
 		} else if (oobmark) {
 			o = m_split(m, oobmark, wait);
 			if (o) {
+				mtx_leave(&sosp->so_snd.sb_mtx);
+				mtx_leave(&so->so_rcv.sb_mtx);
 				error = pru_send(sosp, m, NULL, NULL);
+				mtx_enter(&so->so_rcv.sb_mtx);
+				mtx_enter(&sosp->so_snd.sb_mtx);
+
 				if (error) {
 					if (sosp->so_snd.sb_state &
 					    SS_CANTSENDMORE)
@@ -1773,7 +1881,13 @@ somove(struct socket *so, int wait)
 		if (o) {
 			o->m_len = 1;
 			*mtod(o, caddr_t) = *mtod(m, caddr_t);
+
+			mtx_leave(&sosp->so_snd.sb_mtx);
+			mtx_leave(&so->so_rcv.sb_mtx);
 			error = pru_sendoob(sosp, o, NULL, NULL);
+			mtx_enter(&so->so_rcv.sb_mtx);
+			mtx_enter(&sosp->so_snd.sb_mtx);
+
 			if (error) {
 				if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
 					error = EPIPE;
@@ -1791,15 +1905,24 @@ somove(struct socket *so, int wait)
 		}
 	}
 
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	/* Append all remaining data to drain socket. */
 	if (so->so_rcv.sb_cc == 0 || maxreached)
 		sosp->so_snd.sb_state &= ~SS_ISSENDING;
+
 	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
 
+	if (sockdgram)
+		solock_shared(sosp);
 	error = pru_send(sosp, m, NULL, NULL);
+	if (sockdgram)
+		sounlock_shared(sosp);
+
+	mtx_enter(&so->so_rcv.sb_mtx);
+	mtx_enter(&sosp->so_snd.sb_mtx);
+
 	if (error) {
-		if (sosp->so_snd.sb_state & SS_CANTSENDMORE)
+		if (sosp->so_snd.sb_state & SS_CANTSENDMORE) 
 			error = EPIPE;
 		goto release;
 	}
@@ -1810,26 +1933,35 @@ somove(struct socket *so, int wait)
 		goto nextpkt;
 
  release:
-	mtx_enter(&sosp->so_snd.sb_mtx);
 	sosp->so_snd.sb_state &= ~SS_ISSENDING;
-	mtx_leave(&sosp->so_snd.sb_mtx);
 
 	if (!error && maxreached && so->so_splicemax == so->so_splicelen)
 		error = EFBIG;
 	if (error)
-		so->so_error = error;
+		WRITE_ONCE(so->so_error, error);
+
 	if (((so->so_rcv.sb_state & SS_CANTRCVMORE) &&
 	    so->so_rcv.sb_cc == 0) ||
 	    (sosp->so_snd.sb_state & SS_CANTSENDMORE) ||
-	    maxreached || error) {
+	    maxreached || error)
+		unsplice = 1;
+
+	mtx_leave(&sosp->so_snd.sb_mtx);
+	mtx_leave(&so->so_rcv.sb_mtx);
+
+	if (unsplice) {
+		if (sockdgram)
+			solock(so);
 		sounsplice(so, sosp, 0);
+		if (sockdgram)
+			sounlock(so);
+
 		return (0);
 	}
 	if (timerisset(&so->so_idletv))
 		timeout_add_tv(&so->so_idleto, &so->so_idletv);
 	return (1);
 }
-
 #endif /* SOCKET_SPLICE */
 
 void
@@ -1839,22 +1971,16 @@ sorwakeup(struct socket *so)
 		soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_rcv.sb_flags & SB_SPLICE) {
-		/*
-		 * TCP has a sendbuffer that can handle multiple packets
-		 * at once.  So queue the stream a bit to accumulate data.
-		 * The sosplice thread will call somove() later and send
-		 * the packets calling tcp_output() only once.
-		 * In the UDP case, send out the packets immediately.
-		 * Using a thread would make things slower.
-		 */
-		if (so->so_proto->pr_flags & PR_WANTRCVD)
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_rcv);
+		if (so->so_rcv.sb_flags & SB_SPLICE)
 			task_add(sosplice_taskq, &so->so_splicetask);
-		else
-			somove(so, M_DONTWAIT);
+		if (isspliced(so)) {
+			sb_mtx_unlock(&so->so_rcv);
+			return;
+		}
+		sb_mtx_unlock(&so->so_rcv);
 	}
-	if (isspliced(so))
-		return;
 #endif
 	sowakeup(so, &so->so_rcv);
 	if (so->so_upcall)
@@ -1868,10 +1994,17 @@ sowwakeup(struct socket *so)
 		soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
-	if (so->so_snd.sb_flags & SB_SPLICE)
-		task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask);
-	if (issplicedback(so))
-		return;
+	if (so->so_proto->pr_flags & PR_SPLICE) {
+		sb_mtx_lock(&so->so_snd);
+		if (so->so_snd.sb_flags & SB_SPLICE)
+			task_add(sosplice_taskq,
+			    &so->so_sp->ssp_soback->so_splicetask);
+		if (issplicedback(so)) {
+			sb_mtx_unlock(&so->so_snd);
+			return;
+		}
+		sb_mtx_unlock(&so->so_snd);
+	}
 #endif
 	sowakeup(so, &so->so_snd);
 }
Index: sys/netinet/udp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/udp_usrreq.c,v
diff -u -p -r1.322 udp_usrreq.c
--- sys/netinet/udp_usrreq.c	19 Jul 2024 15:41:58 -0000	1.322
+++ sys/netinet/udp_usrreq.c	19 Jul 2024 18:42:26 -0000
@@ -1209,6 +1209,11 @@ udp_send(struct socket *so, struct mbuf 
 
 	soassertlocked_readonly(so);
 
+	if (inp == NULL) {
+		/* PCB could be destroyed, but socket still spliced. */
+		return (EINVAL);
+	}
+
 #ifdef PIPEX
 	if (inp->inp_pipex) {
 		struct pipex_session *session;
Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
diff -u -p -r1.132 socketvar.h
--- sys/sys/socketvar.h	12 Jul 2024 17:20:18 -0000	1.132
+++ sys/sys/socketvar.h	19 Jul 2024 18:42:26 -0000
@@ -330,6 +330,12 @@ int sblock(struct sockbuf *, int);
 /* release lock on sockbuf sb */
 void sbunlock(struct sockbuf *);
 
+static inline void
+sbassertlocked(struct sockbuf *sb)
+{
+	rw_assert_wrlock(&sb->sb_lock);
+}
+
 #define	SB_EMPTY_FIXUP(sb) do {						\
 	if ((sb)->sb_mb == NULL) {					\
 		(sb)->sb_mbtail = NULL;					\