Index | Thread | Search

From:
Alexander Bluhm <bluhm@openbsd.org>
Subject:
TCP parallel input preview
To:
tech@openbsd.org
Date:
Wed, 15 Jan 2025 16:49:37 +0100

Download raw body.

Thread
Hi,

This is my idea to unlock TCP input and run it in parallel.  On my
4 CPU test machine I gain 100% more TCP throughput (i.e. it doubles)
when running 10 TCP streams in parallel.  Unfortunately, single-stream
TCP performance drops by 25%.

Before it can be committed, these issues have to be resolved:
- sonewconn() must return a locked socket more consistently
- nd6_nud_hint() must be made MP safe
- single-stream performance must not be much slower than it is currently

In my flame graphs I see contention on the socket lock between
soreceive() and tcp_input().
http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-t10_-R-btrace-kstack.0.svg?s=rw_

With the current exclusive net lock, packets are queued before
tcp_input().  Holding this single lock, we process input in bulk,
which seems to perform better.  Taking the socket lock for each TCP
packet is slow.

Multiple streams on multiple CPUs compensate for the locking overhead.
http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-P10_-t10_-R-btrace-kstack.0.svg?s=rw_

bluhm

Index: kern/uipc_socket2.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
diff -u -p -r1.164 uipc_socket2.c
--- kern/uipc_socket2.c	5 Jan 2025 12:36:48 -0000	1.164
+++ kern/uipc_socket2.c	15 Jan 2025 15:23:05 -0000
@@ -220,6 +220,8 @@ sonewconn(struct socket *head, int conns
 	 */
 	if (persocket)
 		solock(so);
+	else
+		rw_enter_write(&so->so_lock);
 
 	/*
 	 * Inherit watermarks but those may get clamped in low mem situations.
@@ -260,6 +262,8 @@ sonewconn(struct socket *head, int conns
 fail:
 	if (persocket)
 		sounlock(so);
+	else
+		rw_exit_write(&so->so_lock);
 	sigio_free(&so->so_sigio);
 	klist_free(&so->so_rcv.sb_klist);
 	klist_free(&so->so_snd.sb_klist);
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c	5 Jan 2025 12:36:48 -0000	1.121
+++ netinet/in_proto.c	15 Jan 2025 15:23:05 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
   .pr_type	= SOCK_STREAM,
   .pr_domain	= &inetdomain,
   .pr_protocol	= IPPROTO_TCP,
-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+		    PR_MPINPUT,
   .pr_input	= tcp_input,
   .pr_ctlinput	= tcp_ctlinput,
   .pr_ctloutput	= tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.422 tcp_input.c
--- netinet/tcp_input.c	10 Jan 2025 20:19:03 -0000	1.422
+++ netinet/tcp_input.c	15 Jan 2025 15:26:07 -0000
@@ -605,6 +605,11 @@ findpcb:
 		tcpstat_inc(tcps_noport);
 		goto dropwithreset_ratelim;
 	}
+	so = in_pcbsolock_ref(inp);
+	if (so == NULL) {
+		tcpstat_inc(tcps_noport);
+		goto dropwithreset_ratelim;
+	}
 
 	KASSERT(sotoinpcb(inp->inp_socket) == inp);
 	KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -637,7 +642,6 @@ findpcb:
 	else
 		tiwin = th->th_win;
 
-	so = inp->inp_socket;
 	if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
 		union syn_cache_sa src;
 		union syn_cache_sa dst;
@@ -726,6 +730,7 @@ findpcb:
 					 * in use for the reply,
 					 * do not free it.
 					 */
+					so = NULL;
 					m = *mp = NULL;
 					goto drop;
 				} else {
@@ -733,13 +738,11 @@ findpcb:
 					 * We have created a
 					 * full-blown connection.
 					 */
-					tp = NULL;
 					in_pcbunref(inp);
 					inp = in_pcbref(sotoinpcb(so));
 					tp = intotcpcb(inp);
 					if (tp == NULL)
 						goto badsyn;	/*XXX*/
-
 				}
 				break;
 
@@ -845,6 +848,7 @@ findpcb:
 					tcpstat_inc(tcps_dropsyn);
 					goto drop;
 				}
+				in_pcbsounlock_rele(inp, so);
 				in_pcbunref(inp);
 				return IPPROTO_DONE;
 			}
@@ -1020,6 +1024,7 @@ findpcb:
 				if (so->so_snd.sb_cc ||
 				    tp->t_flags & TF_NEEDOUTPUT)
 					(void) tcp_output(tp);
+				in_pcbsounlock_rele(inp, so);
 				in_pcbunref(inp);
 				return IPPROTO_DONE;
 			}
@@ -1070,6 +1075,7 @@ findpcb:
 			tp->t_flags &= ~TF_BLOCKOUTPUT;
 			if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
 				(void) tcp_output(tp);
+			in_pcbsounlock_rele(inp, so);
 			in_pcbunref(inp);
 			return IPPROTO_DONE;
 		}
@@ -1262,6 +1268,8 @@ trimthenstep6:
 			    ((arc4random() & 0x7fffffff) | 0x8000);
 			reuse = &iss;
 			tp = tcp_close(tp);
+			in_pcbsounlock_rele(inp, so);
+			so = NULL;
 			in_pcbunref(inp);
 			inp = NULL;
 			goto findpcb;
@@ -2062,6 +2070,7 @@ dodata:							/* XXX */
 	 */
 	if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
 		(void) tcp_output(tp);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2091,6 +2100,7 @@ dropafterack:
 	m_freem(m);
 	tp->t_flags |= TF_ACKNOW;
 	(void) tcp_output(tp);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2126,6 +2136,7 @@ dropwithreset:
 		    (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
 	}
 	m_freem(m);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2137,6 +2148,7 @@ drop:
 		tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
 
 	m_freem(m);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 }
@@ -3531,15 +3543,18 @@ syn_cache_get(struct sockaddr *src, stru
 	struct inpcb *inp, *oldinp;
 	struct tcpcb *tp = NULL;
 	struct mbuf *am;
-	struct socket *oso;
+	struct socket *oldso;
 	u_int rtableid;
 
 	NET_ASSERT_LOCKED();
 
+	inp = sotoinpcb(so);
+
 	mtx_enter(&syn_cache_mtx);
 	sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid);
 	if (sc == NULL) {
 		mtx_leave(&syn_cache_mtx);
+		in_pcbsounlock_rele(inp, so);
 		return (NULL);
 	}
 
@@ -3553,6 +3568,7 @@ syn_cache_get(struct sockaddr *src, stru
 		refcnt_take(&sc->sc_refcnt);
 		mtx_leave(&syn_cache_mtx);
 		(void) syn_cache_respond(sc, m, now, do_ecn);
+		in_pcbsounlock_rele(inp, so);
 		syn_cache_put(sc);
 		return ((struct socket *)(-1));
 	}
@@ -3567,12 +3583,13 @@ syn_cache_get(struct sockaddr *src, stru
 	 * connection when the SYN arrived.  If we can't create
 	 * the connection, abort it.
 	 */
-	oso = so;
+	oldso = so;
+	oldinp = inp;
 	so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT);
 	if (so == NULL)
 		goto resetandabort;
-
-	oldinp = sotoinpcb(oso);
+	soassertlocked(so);
+	soref(so);
 	inp = sotoinpcb(so);
 
 #ifdef IPSEC
@@ -3633,7 +3650,7 @@ syn_cache_get(struct sockaddr *src, stru
 	(void) m_free(am);
 
 	tp = intotcpcb(inp);
-	tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY);
+	tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY);
 	if (sc->sc_request_r_scale != 15) {
 		tp->requested_s_scale = sc->sc_requested_s_scale;
 		tp->request_r_scale = sc->sc_request_r_scale;
@@ -3645,6 +3662,7 @@ syn_cache_get(struct sockaddr *src, stru
 	tp->t_template = tcp_template(tp);
 	if (tp->t_template == 0) {
 		tp = tcp_drop(tp, ENOBUFS);	/* destroys socket */
+		in_pcbsounlock_rele(inp, so);
 		so = NULL;
 		goto abort;
 	}
@@ -3696,8 +3714,9 @@ syn_cache_get(struct sockaddr *src, stru
 		tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
 	tp->last_ack_sent = tp->rcv_nxt;
 
-	tcpstat_inc(tcps_sc_completed);
+	in_pcbsounlock_rele(oldinp, oldso);
 	syn_cache_put(sc);
+	tcpstat_inc(tcps_sc_completed);
 	return (so);
 
 resetandabort:
@@ -3707,6 +3726,8 @@ abort:
 	m_freem(m);
 	if (so != NULL)
 		soabort(so);
+	in_pcbsounlock_rele(inp, so);
+	in_pcbsounlock_rele(oldinp, oldso);
 	syn_cache_put(sc);
 	tcpstat_inc(tcps_sc_aborted);
 	return ((struct socket *)(-1));
@@ -3740,8 +3761,8 @@ syn_cache_reset(struct sockaddr *src, st
 	}
 	syn_cache_rm(sc);
 	mtx_leave(&syn_cache_mtx);
-	tcpstat_inc(tcps_sc_reset);
 	syn_cache_put(sc);
+	tcpstat_inc(tcps_sc_reset);
 }
 
 void
@@ -3781,8 +3802,8 @@ syn_cache_unreach(const struct sockaddr 
 
 	syn_cache_rm(sc);
 	mtx_leave(&syn_cache_mtx);
-	tcpstat_inc(tcps_sc_unreach);
 	syn_cache_put(sc);
+	tcpstat_inc(tcps_sc_unreach);
 }
 
 /*
@@ -3810,7 +3831,7 @@ syn_cache_add(struct sockaddr *src, stru
 	struct syn_cache_head *scp;
 	struct mbuf *ipopts;
 
-	NET_ASSERT_LOCKED();
+	soassertlocked(so);
 
 	tp = sototcpcb(so);
 
@@ -3968,9 +3989,8 @@ syn_cache_add(struct sockaddr *src, stru
 	if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
 		mtx_enter(&syn_cache_mtx);
 		/*
-		 * XXXSMP Currently exclusive netlock prevents another insert
-		 * after our syn_cache_lookup() and before syn_cache_insert().
-		 * Double insert should be handled and not rely on netlock.
+		 * Socket lock prevents another insert after our
+		 * syn_cache_lookup() and before syn_cache_insert().
 		 */
 		syn_cache_insert(sc, tp);
 		mtx_leave(&syn_cache_mtx);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c	5 Jan 2025 12:36:48 -0000	1.124
+++ netinet6/in6_proto.c	15 Jan 2025 15:23:05 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
   .pr_type	= SOCK_STREAM,
   .pr_domain	= &inet6domain,
   .pr_protocol	= IPPROTO_TCP,
-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+		    PR_MPINPUT,
   .pr_input	= tcp_input,
   .pr_ctlinput	= tcp6_ctlinput,
   .pr_ctloutput	= tcp_ctloutput,
Index: netinet6/nd6.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v
diff -u -p -r1.283 nd6.c
--- netinet6/nd6.c	4 Sep 2024 07:54:52 -0000	1.283
+++ netinet6/nd6.c	15 Jan 2025 15:23:05 -0000
@@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt)
 	struct llinfo_nd6 *ln;
 	struct ifnet *ifp;
 
+	/* XXX
 	NET_ASSERT_LOCKED_EXCLUSIVE();
+	*/
 
 	ifp = if_get(rt->rt_ifidx);
 	if (ifp == NULL)