Index | Thread | Search

From:
Vitaliy Makkoveev <mvs@openbsd.org>
Subject:
unix(4) sockets: use `sb_mtx' to protect `so_rcv' of receiving socket
To:
Alexander Bluhm <bluhm@openbsd.org>, tech@openbsd.org
Date:
Mon, 12 Feb 2024 02:55:46 +0300

Download raw body.

Thread
  • Vitaliy Makkoveev:

    unix(4) sockets: use `sb_mtx' to protect `so_rcv' of receiving socket

We need to lock both sockets to connect or disconnect them, so it's
enough to hold only one lock to perform read-only access to `unp_conn'
where connected socket is linked.

Use `sb_mtx' mutex(9) to protect the sockbuf of the receiving socket and
the corresponding sbappend*() calls in the uipc_*_send() paths. The peer
is left unlocked, so socket relocking is gone from the unix(4) socket
transmission paths, except in the sendto(2) case.

Since the peer socket is left unlocked, transmission from s1 to s2 and
from s2 to s1 can now happen simultaneously.

Index: sys/kern/sys_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/sys_socket.c,v
retrieving revision 1.61
diff -u -p -r1.61 sys_socket.c
--- sys/kern/sys_socket.c	15 Apr 2023 13:18:28 -0000	1.61
+++ sys/kern/sys_socket.c	11 Feb 2024 22:38:46 -0000
@@ -144,9 +144,11 @@ soo_stat(struct file *fp, struct stat *u
 	memset(ub, 0, sizeof (*ub));
 	ub->st_mode = S_IFSOCK;
 	solock(so);
+	sb_mtx_lock(&so->so_rcv);
 	if ((so->so_rcv.sb_state & SS_CANTRCVMORE) == 0 ||
 	    so->so_rcv.sb_cc != 0)
 		ub->st_mode |= S_IRUSR | S_IRGRP | S_IROTH;
+	sb_mtx_unlock(&so->so_rcv);
 	if ((so->so_snd.sb_state & SS_CANTSENDMORE) == 0)
 		ub->st_mode |= S_IWUSR | S_IWGRP | S_IWOTH;
 	ub->st_uid = so->so_euid;
Index: sys/kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.319
diff -u -p -r1.319 uipc_socket.c
--- sys/kern/uipc_socket.c	11 Feb 2024 21:36:49 -0000	1.319
+++ sys/kern/uipc_socket.c	11 Feb 2024 22:38:46 -0000
@@ -65,7 +65,6 @@ void	sotask(void *);
 void	soreaper(void *);
 void	soput(void *);
 int	somove(struct socket *, int);
-void	sorflush(struct socket *);
 
 void	filt_sordetach(struct knote *kn);
 int	filt_soread(struct knote *kn, long hint);
@@ -160,6 +159,9 @@ soalloc(const struct domain *dp, int wai
 			break;
 		}
 		break;
+	case AF_UNIX:
+		so->so_rcv.sb_flags |= SB_MTXLOCK;
+		break;
 	}
 
 	return (so);
@@ -987,8 +989,13 @@ dontblock:
 				 * Dispose of any SCM_RIGHTS message that went
 				 * through the read path rather than recv.
 				 */
-				if (pr->pr_domain->dom_dispose)
+				if (pr->pr_domain->dom_dispose) {
+					sb_mtx_unlock(&so->so_rcv);
+					sounlock_shared(so);
 					pr->pr_domain->dom_dispose(cm);
+					solock_shared(so);
+					sb_mtx_lock(&so->so_rcv);
+				}
 				m_free(cm);
 			}
 		}
@@ -1173,8 +1180,11 @@ dontblock:
 		}
 		SBLASTRECORDCHK(&so->so_rcv, "soreceive 4");
 		SBLASTMBUFCHK(&so->so_rcv, "soreceive 4");
-		if (pr->pr_flags & PR_WANTRCVD)
+		if (pr->pr_flags & PR_WANTRCVD) {
+			sb_mtx_unlock(&so->so_rcv);
 			pru_rcvd(so);
+			sb_mtx_lock(&so->so_rcv);
+		}
 	}
 	if (orig_resid == uio->uio_resid && orig_resid &&
 	    (flags & MSG_EOR) == 0 &&
@@ -1233,10 +1243,10 @@ sorflush(struct socket *so)
 	/* with SBL_WAIT and SLB_NOINTR sblock() must not fail */
 	KASSERT(error == 0);
 	socantrcvmore(so);
+	mtx_enter(&sb->sb_mtx);
 	m = sb->sb_mb;
 	memset(&sb->sb_startzero, 0,
 	     (caddr_t)&sb->sb_endzero - (caddr_t)&sb->sb_startzero);
-	mtx_enter(&sb->sb_mtx);
 	sb->sb_timeo_nsecs = INFSLP;
 	mtx_leave(&sb->sb_mtx);
 	sbunlock(so, sb);
@@ -1757,7 +1767,8 @@ somove(struct socket *so, int wait)
 void
 sorwakeup(struct socket *so)
 {
-	soassertlocked_readonly(so);
+	if ((so->so_rcv.sb_flags & SB_MTXLOCK) == 0)
+		soassertlocked_readonly(so);
 
 #ifdef SOCKET_SPLICE
 	if (so->so_rcv.sb_flags & SB_SPLICE) {
@@ -1877,6 +1888,7 @@ sosetopt(struct socket *so, int level, i
 				cnt = 1;
 
 			solock(so);
+			sb_mtx_lock(sb);
 			switch (optname) {
 			case SO_SNDBUF:
 			case SO_RCVBUF:
@@ -1898,6 +1910,7 @@ sosetopt(struct socket *so, int level, i
 				    sb->sb_hiwat : cnt;
 				break;
 			}
+			sb_mtx_unlock(sb);
 			sounlock(so);
 			break;
 		    }
@@ -2218,7 +2231,7 @@ filt_soread(struct knote *kn, long hint)
 	struct socket *so = kn->kn_fp->f_data;
 	int rv = 0;
 
-	sofilt_assert_locked(so, &so->so_rcv);
+	sbmtxassertlocked_readonly(so, &so->so_rcv);
 
 	if (so->so_options & SO_ACCEPTCONN) {
 		kn->kn_data = so->so_qlen;
Index: sys/kern/uipc_socket2.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket2.c,v
retrieving revision 1.143
diff -u -p -r1.143 uipc_socket2.c
--- sys/kern/uipc_socket2.c	11 Feb 2024 18:14:26 -0000	1.143
+++ sys/kern/uipc_socket2.c	11 Feb 2024 22:38:46 -0000
@@ -142,7 +142,9 @@ soisdisconnecting(struct socket *so)
 	soassertlocked(so);
 	so->so_state &= ~SS_ISCONNECTING;
 	so->so_state |= SS_ISDISCONNECTING;
+	mtx_enter(&so->so_rcv.sb_mtx);
 	so->so_rcv.sb_state |= SS_CANTRCVMORE;
+	mtx_leave(&so->so_rcv.sb_mtx);
 	so->so_snd.sb_state |= SS_CANTSENDMORE;
 	wakeup(&so->so_timeo);
 	sowwakeup(so);
@@ -155,7 +157,9 @@ soisdisconnected(struct socket *so)
 	soassertlocked(so);
 	so->so_state &= ~(SS_ISCONNECTING|SS_ISCONNECTED|SS_ISDISCONNECTING);
 	so->so_state |= SS_ISDISCONNECTED;
+	mtx_enter(&so->so_rcv.sb_mtx);
 	so->so_rcv.sb_state |= SS_CANTRCVMORE;
+	mtx_leave(&so->so_rcv.sb_mtx);
 	so->so_snd.sb_state |= SS_CANTSENDMORE;
 	wakeup(&so->so_timeo);
 	sowwakeup(so);
@@ -219,9 +223,10 @@ sonewconn(struct socket *head, int conns
 	mtx_enter(&head->so_snd.sb_mtx);
 	so->so_snd.sb_timeo_nsecs = head->so_snd.sb_timeo_nsecs;
 	mtx_leave(&head->so_snd.sb_mtx);
+
+	mtx_enter(&head->so_rcv.sb_mtx);
 	so->so_rcv.sb_wat = head->so_rcv.sb_wat;
 	so->so_rcv.sb_lowat = head->so_rcv.sb_lowat;
-	mtx_enter(&head->so_rcv.sb_mtx);
 	so->so_rcv.sb_timeo_nsecs = head->so_rcv.sb_timeo_nsecs;
 	mtx_leave(&head->so_rcv.sb_mtx);
 
@@ -345,8 +350,9 @@ socantsendmore(struct socket *so)
 void
 socantrcvmore(struct socket *so)
 {
-	soassertlocked(so);
+	mtx_enter(&so->so_rcv.sb_mtx);
 	so->so_rcv.sb_state |= SS_CANTRCVMORE;
+	mtx_leave(&so->so_rcv.sb_mtx);
 	sorwakeup(so);
 }
 
@@ -514,6 +520,16 @@ sbmtxassertlocked(struct socket *so, str
 		soassertlocked(so);
 }
 
+void
+sbmtxassertlocked_readonly(struct socket *so, struct sockbuf *sb)
+{
+	if (sb->sb_flags & SB_MTXLOCK) {
+		if (splassert_ctl > 0 && mtx_owned(&sb->sb_mtx) == 0)
+			splassert_fail(0, RW_WRITE, __func__);
+	} else
+		soassertlocked_readonly(so);
+}
+
 /*
  * Wait for data to arrive at/drain from a socket buffer.
  */
@@ -651,16 +667,22 @@ soreserve(struct socket *so, u_long sndc
 
 	if (sbreserve(so, &so->so_snd, sndcc))
 		goto bad;
-	if (sbreserve(so, &so->so_rcv, rcvcc))
-		goto bad2;
 	so->so_snd.sb_wat = sndcc;
-	so->so_rcv.sb_wat = rcvcc;
-	if (so->so_rcv.sb_lowat == 0)
-		so->so_rcv.sb_lowat = 1;
 	if (so->so_snd.sb_lowat == 0)
 		so->so_snd.sb_lowat = MCLBYTES;
 	if (so->so_snd.sb_lowat > so->so_snd.sb_hiwat)
 		so->so_snd.sb_lowat = so->so_snd.sb_hiwat;
+
+	mtx_enter(&so->so_rcv.sb_mtx);
+	if (sbreserve(so, &so->so_rcv, rcvcc)) {
+		mtx_leave(&so->so_rcv.sb_mtx);
+		goto bad2;
+	}
+	so->so_rcv.sb_wat = rcvcc;
+	if (so->so_rcv.sb_lowat == 0)
+		so->so_rcv.sb_lowat = 1;
+	mtx_leave(&so->so_rcv.sb_mtx);
+
 	return (0);
 bad2:
 	sbrelease(so, &so->so_snd);
@@ -676,8 +698,7 @@ bad:
 int
 sbreserve(struct socket *so, struct sockbuf *sb, u_long cc)
 {
-	KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
-	soassertlocked(so);
+	sbmtxassertlocked(so, sb);
 
 	if (cc == 0 || cc > sb_max)
 		return (1);
@@ -818,7 +839,8 @@ sbappend(struct socket *so, struct sockb
 	if (m == NULL)
 		return;
 
-	soassertlocked(so);
+	sbmtxassertlocked(so, sb);
+
 	SBLASTRECORDCHK(sb, "sbappend 1");
 
 	if ((n = sb->sb_lastrecord) != NULL) {
@@ -899,8 +921,7 @@ sbappendrecord(struct socket *so, struct
 {
 	struct mbuf *m;
 
-	KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
-	soassertlocked(so);
+	sbmtxassertlocked(so, sb);
 
 	if (m0 == NULL)
 		return;
@@ -983,6 +1004,8 @@ sbappendcontrol(struct socket *so, struc
 {
 	struct mbuf *m, *mlast, *n;
 	int eor = 0, space = 0;
+
+	sbmtxassertlocked(so, sb);
 
 	if (control == NULL)
 		panic("sbappendcontrol");
Index: sys/kern/uipc_syscalls.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_syscalls.c,v
retrieving revision 1.217
diff -u -p -r1.217 uipc_syscalls.c
--- sys/kern/uipc_syscalls.c	3 Feb 2024 22:50:09 -0000	1.217
+++ sys/kern/uipc_syscalls.c	11 Feb 2024 22:38:46 -0000
@@ -288,14 +288,26 @@ doaccept(struct proc *p, int sock, struc
 		goto out_unlock;
 	}
 	if ((headfp->f_flag & FNONBLOCK) && head->so_qlen == 0) {
-		if (head->so_rcv.sb_state & SS_CANTRCVMORE)
+		int cantrcvmore;
+
+		mtx_enter(&head->so_rcv.sb_mtx);
+		cantrcvmore = head->so_rcv.sb_state & SS_CANTRCVMORE;
+		mtx_leave(&head->so_rcv.sb_mtx);
+
+		if (cantrcvmore)
 			error = ECONNABORTED;
 		else
 			error = EWOULDBLOCK;
 		goto out_unlock;
 	}
 	while (head->so_qlen == 0 && head->so_error == 0) {
-		if (head->so_rcv.sb_state & SS_CANTRCVMORE) {
+		int cantrcvmore;
+
+		mtx_enter(&head->so_rcv.sb_mtx);
+		cantrcvmore = head->so_rcv.sb_state & SS_CANTRCVMORE;
+		mtx_leave(&head->so_rcv.sb_mtx);
+
+		if (cantrcvmore) {
 			head->so_error = ECONNABORTED;
 			break;
 		}
Index: sys/kern/uipc_usrreq.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_usrreq.c,v
retrieving revision 1.200
diff -u -p -r1.200 uipc_usrreq.c
--- sys/kern/uipc_usrreq.c	28 Nov 2023 09:29:20 -0000	1.200
+++ sys/kern/uipc_usrreq.c	11 Feb 2024 22:38:46 -0000
@@ -487,8 +487,11 @@ uipc_rcvd(struct socket *so)
 	 * Adjust backpressure on sender
 	 * and wakeup any waiting to write.
 	 */
+	mtx_enter(&so->so_rcv.sb_mtx);
 	so2->so_snd.sb_mbcnt = so->so_rcv.sb_mbcnt;
 	so2->so_snd.sb_cc = so->so_rcv.sb_cc;
+	mtx_leave(&so->so_rcv.sb_mtx);
+
 	sowwakeup(so2);
 	sounlock(so2);
 }
@@ -497,8 +500,9 @@ int
 uipc_send(struct socket *so, struct mbuf *m, struct mbuf *nam,
     struct mbuf *control)
 {
+	struct unpcb *unp = sotounpcb(so);
 	struct socket *so2;
-	int error = 0;
+	int error = 0, dowakeup = 0;
 
 	if (control) {
 		sounlock(so);
@@ -512,21 +516,25 @@ uipc_send(struct socket *so, struct mbuf
 		error = EPIPE;
 		goto dispose;
 	}
-	if ((so2 = unp_solock_peer(so)) == NULL) {
+
+	if (unp->unp_conn == NULL) {
 		error = ENOTCONN;
 		goto dispose;
 	}
 
+	so2 = unp->unp_conn->unp_socket;
+
 	/*
 	 * Send to paired receive port, and then raise
 	 * send buffer counts to maintain backpressure.
 	 * Wake up readers.
 	 */
+	mtx_enter(&so2->so_rcv.sb_mtx);
 	if (control) {
 		if (sbappendcontrol(so2, &so2->so_rcv, m, control)) {
 			control = NULL;
 		} else {
-			sounlock(so2);
+			mtx_leave(&so2->so_rcv.sb_mtx);
 			error = ENOBUFS;
 			goto dispose;
 		}
@@ -537,9 +545,12 @@ uipc_send(struct socket *so, struct mbuf
 	so->so_snd.sb_mbcnt = so2->so_rcv.sb_mbcnt;
 	so->so_snd.sb_cc = so2->so_rcv.sb_cc;
 	if (so2->so_rcv.sb_cc > 0)
+		dowakeup = 1;
+	mtx_leave(&so2->so_rcv.sb_mtx);
+
+	if (dowakeup)
 		sorwakeup(so2);
 
-	sounlock(so2);
 	m = NULL;
 
 dispose:
@@ -561,7 +572,7 @@ uipc_dgram_send(struct socket *so, struc
 	struct unpcb *unp = sotounpcb(so);
 	struct socket *so2;
 	const struct sockaddr *from;
-	int error = 0;
+	int error = 0, ret;
 
 	if (control) {
 		sounlock(so);
@@ -581,7 +592,7 @@ uipc_dgram_send(struct socket *so, struc
 			goto dispose;
 	}
 
-	if ((so2 = unp_solock_peer(so)) == NULL) {
+	if (unp->unp_conn == NULL) {
 		if (nam != NULL)
 			error = ECONNREFUSED;
 		else
@@ -589,20 +600,24 @@ uipc_dgram_send(struct socket *so, struc
 		goto dispose;
 	}
 
+	so2 = unp->unp_conn->unp_socket;
+
 	if (unp->unp_addr)
 		from = mtod(unp->unp_addr, struct sockaddr *);
 	else
 		from = &sun_noname;
-	if (sbappendaddr(so2, &so2->so_rcv, from, m, control)) {
+
+	mtx_enter(&so2->so_rcv.sb_mtx);
+	ret = sbappendaddr(so2, &so2->so_rcv, from, m, control);
+	mtx_leave(&so2->so_rcv.sb_mtx);
+
+	if (ret > 0) {
 		sorwakeup(so2);
 		m = NULL;
 		control = NULL;
 	} else
 		error = ENOBUFS;
 
-	if (so2 != so)
-		sounlock(so2);
-
 	if (nam)
 		unp_disconnect(unp);
 
@@ -1388,9 +1403,9 @@ unp_gc(void *arg __unused)
 		if ((unp->unp_gcflags & UNP_GCDEAD) == 0)
 			continue;
 		so = unp->unp_socket;
-		solock(so);
+		mtx_enter(&so->so_rcv.sb_mtx);
 		unp_scan(so->so_rcv.sb_mb, unp_remove_gcrefs);
-		sounlock(so);
+		mtx_leave(&so->so_rcv.sb_mtx);
 	}
 
 	/*
@@ -1412,9 +1427,9 @@ unp_gc(void *arg __unused)
 			unp->unp_gcflags &= ~UNP_GCDEAD;
 
 			so = unp->unp_socket;
-			solock(so);
+			mtx_enter(&so->so_rcv.sb_mtx);
 			unp_scan(so->so_rcv.sb_mb, unp_restore_gcrefs);
-			sounlock(so);
+			mtx_leave(&so->so_rcv.sb_mtx);
 
 			KASSERT(nunref > 0);
 			nunref--;
@@ -1436,7 +1451,7 @@ unp_gc(void *arg __unused)
 				 */
 				so = unp->unp_socket;
 				solock(so);
-				unp_scan(so->so_rcv.sb_mb, unp_discard);
+				sorflush(so);
 				sounlock(so);
 			}
 		}
Index: sys/miscfs/fifofs/fifo_vnops.c
===================================================================
RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
retrieving revision 1.103
diff -u -p -r1.103 fifo_vnops.c
--- sys/miscfs/fifofs/fifo_vnops.c	3 Feb 2024 22:50:09 -0000	1.103
+++ sys/miscfs/fifofs/fifo_vnops.c	11 Feb 2024 22:38:46 -0000
@@ -201,7 +201,9 @@ fifo_open(void *v)
 		if (fip->fi_writers == 1) {
 			solock(rso);
 			rso->so_state &= ~SS_ISDISCONNECTED;
+			mtx_enter(&rso->so_rcv.sb_mtx);
 			rso->so_rcv.sb_state &= ~SS_CANTRCVMORE;
+			mtx_leave(&rso->so_rcv.sb_mtx);
 			sounlock(rso);
 			if (fip->fi_readers > 0)
 				wakeup(&fip->fi_readers);
@@ -518,7 +520,6 @@ filt_fiforead(struct knote *kn, long hin
 	struct socket *so = kn->kn_hook;
 	int rv;
 
-	soassertlocked(so);
 	MUTEX_ASSERT_LOCKED(&so->so_rcv.sb_mtx);
 
 	kn->kn_data = so->so_rcv.sb_cc;
Index: sys/sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
retrieving revision 1.123
diff -u -p -r1.123 socketvar.h
--- sys/sys/socketvar.h	11 Feb 2024 18:14:27 -0000	1.123
+++ sys/sys/socketvar.h	11 Feb 2024 22:38:47 -0000
@@ -213,6 +213,7 @@ sb_mtx_unlock(struct sockbuf *sb)
 }
 
 void	sbmtxassertlocked(struct socket *so, struct sockbuf *);
+void	sbmtxassertlocked_readonly(struct socket *so, struct sockbuf *);
 
 /*
  * Do we need to notify the other side when I/O is possible?
@@ -242,7 +243,7 @@ sb_notify(struct socket *so, struct sock
 static inline long
 sbspace(struct socket *so, struct sockbuf *sb)
 {
-	soassertlocked_readonly(so);
+	sbmtxassertlocked_readonly(so, sb);
 
 	return lmin(sb->sb_hiwat - sb->sb_cc, sb->sb_mbmax - sb->sb_mbcnt);
 }
@@ -394,6 +395,7 @@ int	sosend(struct socket *, struct mbuf 
 	    struct mbuf *, struct mbuf *, int);
 int	sosetopt(struct socket *, int, int, struct mbuf *);
 int	soshutdown(struct socket *, int);
+void	sorflush(struct socket *so);
 void	sowakeup(struct socket *, struct sockbuf *);
 void	sorwakeup(struct socket *);
 void	sowwakeup(struct socket *);