Index | Thread | Search

From:
Mark Patruck <mark@wrapped.cx>
Subject:
Re: parallel TCP input
To:
Alexander Bluhm <bluhm@openbsd.org>
Cc:
tech@openbsd.org
Date:
Wed, 12 Mar 2025 10:25:52 +0100

Download raw body.

Thread
On 03.03.2025 01:19, Alexander Bluhm wrote:
>On Wed, Feb 05, 2025 at 07:25:11PM +0100, Alexander Bluhm wrote:
>> To remember the softnet thread and packet offset, I use the cookie
>> field in the mbuf header.  Hopefully nothing else uses cookies in
>> this path.
>
>We have struct netstack as a softnet thread local storage now.  That
>means we can temporarily store the TCP input mbuf list there.  We
>do not risk queueing the mbuf with softnet ph_cookie somewhere else.
>Note that I still need the cookie to store the TCP header offset.
>But no other subsystem will reuse the cookie while the mbuf is in
>the TCP input mbuf list.
>
>Mark Patruck has reported problems with wireguard and the previous
>diff.  Maybe they were related to the mbuf cookie.

Running -current + this diff, I can confirm that my wireguard
tunnels work now; also, I didn't spot any other issues so far.

Thanks,

	-Mark

>updated diff below
>
>bluhm
>
>Index: net/if.c
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.c,v
>diff -u -p -r1.728 if.c
>--- net/if.c	2 Mar 2025 21:28:31 -0000	1.728
>+++ net/if.c	2 Mar 2025 22:28:06 -0000
>@@ -1001,10 +1001,21 @@ if_input_process(struct ifnet *ifp, stru
> 	 */
>
> 	sn = net_sn(idx);
>+	ml_init(&sn->sn_netstack.ns_tcp_ml);
>+#ifdef INET6
>+	ml_init(&sn->sn_netstack.ns_tcp6_ml);
>+#endif
>
> 	NET_LOCK_SHARED();
>+
> 	while ((m = ml_dequeue(ml)) != NULL)
> 		(*ifp->if_input)(ifp, m, &sn->sn_netstack);
>+
>+	tcp_input_mlist(&sn->sn_netstack.ns_tcp_ml, AF_INET);
>+#ifdef INET6
>+	tcp_input_mlist(&sn->sn_netstack.ns_tcp6_ml, AF_INET6);
>+#endif
>+
> 	NET_UNLOCK_SHARED();
> }
>
>Index: net/if_var.h
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_var.h,v
>diff -u -p -r1.136 if_var.h
>--- net/if_var.h	2 Mar 2025 21:28:32 -0000	1.136
>+++ net/if_var.h	2 Mar 2025 22:28:06 -0000
>@@ -92,7 +92,9 @@ struct task;
> struct cpumem;
>
> struct netstack {
>-	struct route	ns_route;
>+	struct route		ns_route;
>+	struct mbuf_list	ns_tcp_ml;
>+	struct mbuf_list	ns_tcp6_ml;
> };
>
> /*
>Index: netinet/in_proto.c
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
>diff -u -p -r1.121 in_proto.c
>--- netinet/in_proto.c	5 Jan 2025 12:36:48 -0000	1.121
>+++ netinet/in_proto.c	2 Mar 2025 22:28:06 -0000
>@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
>   .pr_type	= SOCK_STREAM,
>   .pr_domain	= &inetdomain,
>   .pr_protocol	= IPPROTO_TCP,
>-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
>+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
>+		    PR_MPINPUT,
>   .pr_input	= tcp_input,
>   .pr_ctlinput	= tcp_ctlinput,
>   .pr_ctloutput	= tcp_ctloutput,
>Index: netinet/tcp_input.c
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
>diff -u -p -r1.433 tcp_input.c
>--- netinet/tcp_input.c	2 Mar 2025 21:28:32 -0000	1.433
>+++ netinet/tcp_input.c	2 Mar 2025 22:28:06 -0000
>@@ -100,9 +100,6 @@
> #include <net/pfvar.h>
> #endif
>
>-int tcp_mss_adv(struct rtentry *, int);
>-int tcp_flush_queue(struct tcpcb *);
>-
> #ifdef INET6
> #include <netinet6/in6_var.h>
> #include <netinet6/nd6.h>
>@@ -177,6 +174,9 @@ do { \
> 	if_put(ifp); \
> } while (0)
>
>+int	 tcp_input_solocked(struct mbuf **, int *, int, int, struct socket **);
>+int	 tcp_mss_adv(struct rtentry *, int);
>+int	 tcp_flush_queue(struct tcpcb *);
> void	 tcp_sack_partialack(struct tcpcb *, struct tcphdr *);
> void	 tcp_newreno_partialack(struct tcpcb *, struct tcphdr *);
>
>@@ -347,12 +347,53 @@ tcp_flush_queue(struct tcpcb *tp)
> 	return (flags);
> }
>
>+int
>+tcp_input(struct mbuf **mp, int *offp, int proto, int af, struct netstack *ns)
>+{
>+	if (ns == NULL)
>+		return tcp_input_solocked(mp, offp, proto, af, NULL);
>+	(*mp)->m_pkthdr.ph_cookie = (void *)(long)(*offp);
>+	switch (af) {
>+	case AF_INET:
>+		ml_enqueue(&ns->ns_tcp_ml, *mp);
>+		break;
>+#ifdef INET6
>+	case AF_INET6:
>+		ml_enqueue(&ns->ns_tcp6_ml, *mp);
>+		break;
>+#endif
>+	default:
>+		m_freemp(mp);
>+	}
>+	*mp = NULL;
>+	return IPPROTO_DONE;
>+}
>+
>+void
>+tcp_input_mlist(struct mbuf_list *ml, int af)
>+{
>+	struct socket *so = NULL;
>+	struct mbuf *m;
>+
>+	while ((m = ml_dequeue(ml)) != NULL) {
>+		int off, nxt;
>+
>+		off = (long)m->m_pkthdr.ph_cookie;
>+		m->m_pkthdr.ph_cookie = NULL;
>+		nxt = tcp_input_solocked(&m, &off, IPPROTO_TCP, af, &so);
>+		KASSERT(nxt == IPPROTO_DONE);
>+	}
>+
>+	in_pcbsounlock_rele(NULL, so);
>+}
>+
> /*
>  * TCP input routine, follows pages 65-76 of the
>  * protocol specification dated September, 1981 very closely.
>  */
> int
>-tcp_input(struct mbuf **mp, int *offp, int proto, int af, struct netstack *ns)
>+tcp_input_solocked(struct mbuf **mp, int *offp, int proto, int af,
>+    struct socket ** solocked)
> {
> 	struct mbuf *m = *mp;
> 	int iphlen = *offp;
>@@ -603,6 +644,25 @@ findpcb:
> 		tcpstat_inc(tcps_noport);
> 		goto dropwithreset_ratelim;
> 	}
>+	/*
>+	 * Avoid needless lock and unlock operation when handling multiple
>+	 * TCP packets from the same stream consecutively.
>+	 */
>+	if (solocked != NULL && *solocked != NULL &&
>+	    sotoinpcb(*solocked) == inp) {
>+		so = *solocked;
>+		*solocked = NULL;
>+	} else {
>+		if (solocked != NULL && *solocked != NULL) {
>+			in_pcbsounlock_rele(NULL, *solocked);
>+			*solocked = NULL;
>+		}
>+		so = in_pcbsolock_ref(inp);
>+	}
>+	if (so == NULL) {
>+		tcpstat_inc(tcps_noport);
>+		goto dropwithreset_ratelim;
>+	}
>
> 	KASSERT(sotoinpcb(inp->inp_socket) == inp);
> 	KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
>@@ -635,7 +695,6 @@ findpcb:
> 	else
> 		tiwin = th->th_win;
>
>-	so = inp->inp_socket;
> 	if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
> 		union syn_cache_sa src;
> 		union syn_cache_sa dst;
>@@ -724,6 +783,7 @@ findpcb:
> 					 * in use for the reply,
> 					 * do not free it.
> 					 */
>+					so = NULL;
> 					m = *mp = NULL;
> 					goto drop;
> 				} else {
>@@ -731,13 +791,11 @@ findpcb:
> 					 * We have created a
> 					 * full-blown connection.
> 					 */
>-					tp = NULL;
> 					in_pcbunref(inp);
> 					inp = in_pcbref(sotoinpcb(so));
> 					tp = intotcpcb(inp);
> 					if (tp == NULL)
> 						goto badsyn;	/*XXX*/
>-
> 				}
> 				break;
>
>@@ -843,6 +901,10 @@ findpcb:
> 					tcpstat_inc(tcps_dropsyn);
> 					goto drop;
> 				}
>+				if (solocked != NULL)
>+					*solocked = so;
>+				else
>+					in_pcbsounlock_rele(inp, so);
> 				in_pcbunref(inp);
> 				return IPPROTO_DONE;
> 			}
>@@ -1018,6 +1080,10 @@ findpcb:
> 				if (so->so_snd.sb_cc ||
> 				    tp->t_flags & TF_NEEDOUTPUT)
> 					(void) tcp_output(tp);
>+				if (solocked != NULL)
>+					*solocked = so;
>+				else
>+					in_pcbsounlock_rele(inp, so);
> 				in_pcbunref(inp);
> 				return IPPROTO_DONE;
> 			}
>@@ -1068,6 +1134,10 @@ findpcb:
> 			tp->t_flags &= ~TF_BLOCKOUTPUT;
> 			if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> 				(void) tcp_output(tp);
>+			if (solocked != NULL)
>+				*solocked = so;
>+			else
>+				in_pcbsounlock_rele(inp, so);
> 			in_pcbunref(inp);
> 			return IPPROTO_DONE;
> 		}
>@@ -1261,6 +1331,8 @@ trimthenstep6:
> 			    ((arc4random() & 0x7fffffff) | 0x8000);
> 			reuse = &iss;
> 			tp = tcp_close(tp);
>+			in_pcbsounlock_rele(inp, so);
>+			so = NULL;
> 			in_pcbunref(inp);
> 			inp = NULL;
> 			goto findpcb;
>@@ -2065,6 +2137,10 @@ dodata:							/* XXX */
> 	 */
> 	if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> 		(void) tcp_output(tp);
>+	if (solocked != NULL)
>+		*solocked = so;
>+	else
>+		in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
>
>@@ -2094,6 +2170,10 @@ dropafterack:
> 	m_freem(m);
> 	tp->t_flags |= TF_ACKNOW;
> 	(void) tcp_output(tp);
>+	if (solocked != NULL)
>+		*solocked = so;
>+	else
>+		in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
>
>@@ -2129,6 +2209,7 @@ dropwithreset:
> 		    (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
> 	}
> 	m_freem(m);
>+	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
>
>@@ -2140,6 +2221,7 @@ drop:
> 		tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
>
> 	m_freem(m);
>+	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
> }
>@@ -3543,6 +3625,7 @@ syn_cache_get(struct sockaddr *src, stru
> 	sc = syn_cache_lookup(src, dst, &scp, inp->inp_rtableid);
> 	if (sc == NULL) {
> 		mtx_leave(&syn_cache_mtx);
>+		in_pcbsounlock_rele(inp, so);
> 		return (NULL);
> 	}
>
>@@ -3556,6 +3639,7 @@ syn_cache_get(struct sockaddr *src, stru
> 		refcnt_take(&sc->sc_refcnt);
> 		mtx_leave(&syn_cache_mtx);
> 		(void) syn_cache_respond(sc, m, now, do_ecn);
>+		in_pcbsounlock_rele(inp, so);
> 		syn_cache_put(sc);
> 		return ((struct socket *)(-1));
> 	}
>@@ -3696,7 +3780,7 @@ syn_cache_get(struct sockaddr *src, stru
> 		tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
> 	tp->last_ack_sent = tp->rcv_nxt;
>
>-	in_pcbsounlock_rele(inp, so);
>+	in_pcbsounlock_rele(listeninp, listenso);
> 	tcpstat_inc(tcps_sc_completed);
> 	syn_cache_put(sc);
> 	return (so);
>@@ -3709,6 +3793,7 @@ abort:
> 		tp = tcp_drop(tp, ECONNABORTED);	/* destroys socket */
> 	m_freem(m);
> 	in_pcbsounlock_rele(inp, so);
>+	in_pcbsounlock_rele(listeninp, listenso);
> 	syn_cache_put(sc);
> 	tcpstat_inc(tcps_sc_aborted);
> 	return ((struct socket *)(-1));
>@@ -3813,7 +3898,7 @@ syn_cache_add(struct sockaddr *src, stru
> 	struct mbuf *ipopts;
> 	struct rtentry *rt = NULL;
>
>-	NET_ASSERT_LOCKED();
>+	soassertlocked(so);
>
> 	tp = sototcpcb(so);
>
>@@ -3989,9 +4074,8 @@ syn_cache_add(struct sockaddr *src, stru
> 	if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
> 		mtx_enter(&syn_cache_mtx);
> 		/*
>-		 * XXXSMP Currently exclusive netlock prevents another insert
>-		 * after our syn_cache_lookup() and before syn_cache_insert().
>-		 * Double insert should be handled and not rely on netlock.
>+		 * Socket lock prevents another insert after our
>+		 * syn_cache_lookup() and before syn_cache_insert().
> 		 */
> 		syn_cache_insert(sc, tp);
> 		mtx_leave(&syn_cache_mtx);
>Index: netinet/tcp_var.h
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_var.h,v
>diff -u -p -r1.186 tcp_var.h
>--- netinet/tcp_var.h	2 Mar 2025 21:28:32 -0000	1.186
>+++ netinet/tcp_var.h	2 Mar 2025 22:28:06 -0000
>@@ -718,6 +718,7 @@ int	 tcp_dooptions(struct tcpcb *, u_cha
> 		struct mbuf *, int, struct tcp_opt_info *, u_int, uint64_t);
> void	 tcp_init(void);
> int	 tcp_input(struct mbuf **, int *, int, int, struct netstack *);
>+void	 tcp_input_mlist(struct mbuf_list *, int);
> int	 tcp_mss(struct tcpcb *, int);
> void	 tcp_mss_update(struct tcpcb *);
> u_int	 tcp_hdrsz(struct tcpcb *);
>Index: netinet6/in6_proto.c
>===================================================================
>RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
>diff -u -p -r1.124 in6_proto.c
>--- netinet6/in6_proto.c	5 Jan 2025 12:36:48 -0000	1.124
>+++ netinet6/in6_proto.c	2 Mar 2025 22:28:06 -0000
>@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
>   .pr_type	= SOCK_STREAM,
>   .pr_domain	= &inet6domain,
>   .pr_protocol	= IPPROTO_TCP,
>-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
>+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
>+		    PR_MPINPUT,
>   .pr_input	= tcp_input,
>   .pr_ctlinput	= tcp6_ctlinput,
>   .pr_ctloutput	= tcp_ctloutput,
>

--
Mark Patruck ( mark at wrapped.cx )
GPG key 0xF2865E51 / 187F F6D3 EE04 1DCE 1C74  F644 0D3C F66F F286 5E51
  
https://www.wrapped.cx