Index | Thread | Search

From:
Alexander Bluhm <bluhm@openbsd.org>
Subject:
parallel TCP input
To:
tech@openbsd.org
Date:
Wed, 5 Feb 2025 19:25:11 +0100

Download raw body.

Thread
Hi,

To run TCP input in parallel, each packet needs the socket lock.
As locking and unlocking per packet kills throughput for single-stream
TCP, the diff below keeps the socket locked when consecutive packets
belong to the same socket.

I keep a list of TCP packets in struct softnet.  So each softnet
thread has its own list to accumulate packets.  After processing
all packets from an interface input queue, TCP input is called for
each TCP packet.  The socket lock is only unlocked and locked if
the TCP socket changes between the packets.  This solves the
performance issue.

To remember the softnet thread and packet offset, I use the cookie
field in the mbuf header.  Hopefully nothing else uses cookies in
this path.

Please test with various interfaces and pseudo devices.
Are there setups where my idea does not work?
How much faster does the TCP stack get?
Are there performance regressions?

Note that at most 4 CPUs are used for networking.  You might want to
increase NET_TASKQ.  Also you need network interfaces that support
multiqueue, e.g. ix, ixl, igc, bnxt, vio, vmx, ...

bluhm

Index: net/if.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.c,v
diff -u -p -r1.726 if.c
--- net/if.c	3 Feb 2025 08:58:52 -0000	1.726
+++ net/if.c	5 Feb 2025 12:41:07 -0000
@@ -241,19 +241,10 @@ struct rwlock if_tmplist_lock = RWLOCK_I
 struct mutex if_hooks_mtx = MUTEX_INITIALIZER(IPL_NONE);
 void	if_hooks_run(struct task_list *);
 
-int	ifq_congestion;
-
-int		 netisr;
-
-struct softnet {
-	char		 sn_name[16];
-	struct taskq	*sn_taskq;
-};
-
-#define	NET_TASKQ	4
+int		ifq_congestion;
+int		netisr;
 struct softnet	softnets[NET_TASKQ];
-
-struct task if_input_task_locked = TASK_INITIALIZER(if_netisr, NULL);
+struct task	if_input_task_locked = TASK_INITIALIZER(if_netisr, NULL);
 
 /*
  * Serialize socket operations to ensure no new sleeping points
@@ -979,9 +970,10 @@ if_output_local(struct ifnet *ifp, struc
 }
 
 void
-if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
+if_input_process(struct ifnet *ifp, struct mbuf_list *ml, unsigned int idx)
 {
 	struct mbuf *m;
+	struct softnet *sn;
 
 	if (ml_empty(ml))
 		return;
@@ -996,9 +988,23 @@ if_input_process(struct ifnet *ifp, stru
 	 * read only or MP safe.  Usually they hold the exclusive net lock.
 	 */
 
+	sn = net_sn(idx);
+	ml_init(&sn->sn_tcp_ml);
+#ifdef INET6
+	ml_init(&sn->sn_tcp6_ml);
+#endif
+
 	NET_LOCK_SHARED();
-	while ((m = ml_dequeue(ml)) != NULL)
+
+	while ((m = ml_dequeue(ml)) != NULL) {
+		m->m_pkthdr.ph_cookie = sn;
 		(*ifp->if_input)(ifp, m);
+	}
+	tcp_input_mlist(&sn->sn_tcp_ml, AF_INET);
+#ifdef INET6
+	tcp_input_mlist(&sn->sn_tcp6_ml, AF_INET6);
+#endif
+
 	NET_UNLOCK_SHARED();
 }
 
@@ -3672,18 +3678,21 @@ unhandled_af(int af)
 	panic("unhandled af %d", af);
 }
 
-struct taskq *
-net_tq(unsigned int ifindex)
+struct softnet *
+net_sn(unsigned int ifindex)
 {
-	struct softnet *sn;
 	static int nettaskqs;
 
 	if (nettaskqs == 0)
 		nettaskqs = min(NET_TASKQ, ncpus);
 
-	sn = &softnets[ifindex % nettaskqs];
+	return (&softnets[ifindex % nettaskqs]);
+}
 
-	return (sn->sn_taskq);
+struct taskq *
+net_tq(unsigned int ifindex)
+{
+	return (net_sn(ifindex)->sn_taskq);
 }
 
 void
Index: net/if.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.h,v
diff -u -p -r1.217 if.h
--- net/if.h	9 Jun 2024 16:25:28 -0000	1.217
+++ net/if.h	5 Feb 2025 12:41:07 -0000
@@ -560,7 +560,10 @@ void	if_congestion(void);
 int	if_congested(void);
 __dead void	unhandled_af(int);
 int	if_setlladdr(struct ifnet *, const uint8_t *);
-struct taskq * net_tq(unsigned int);
+struct softnet *
+	net_sn(unsigned int);
+struct taskq *
+	net_tq(unsigned int);
 void	net_tq_barriers(const char *);
 
 #endif /* _KERNEL */
Index: net/if_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_var.h,v
diff -u -p -r1.135 if_var.h
--- net/if_var.h	24 Jan 2025 09:19:07 -0000	1.135
+++ net/if_var.h	5 Feb 2025 12:41:07 -0000
@@ -301,6 +301,15 @@ struct ifg_list {
 #define IF_WWAN_DEFAULT_PRIORITY	6
 #define IF_CARP_DEFAULT_PRIORITY	15
 
+struct softnet {
+	char			 sn_name[16];
+	struct taskq		*sn_taskq;
+	struct mbuf_list	 sn_tcp_ml;	/* queued IPv4 TCP input */
+	struct mbuf_list	 sn_tcp6_ml;	/* queued IPv6 TCP input */
+};
+#define NET_TASKQ	4
+extern struct softnet	softnets[NET_TASKQ];
+
 /*
  * Network stack input queues.
  */
@@ -331,7 +340,7 @@ int	if_enqueue(struct ifnet *, struct mb
 int	if_enqueue_ifq(struct ifnet *, struct mbuf *);
 void	if_input(struct ifnet *, struct mbuf_list *);
 void	if_vinput(struct ifnet *, struct mbuf *);
-void	if_input_process(struct ifnet *, struct mbuf_list *);
+void	if_input_process(struct ifnet *, struct mbuf_list *, unsigned int);
 int	if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
 int	if_output_ml(struct ifnet *, struct mbuf_list *,
 	    struct sockaddr *, struct rtentry *);
Index: net/ifq.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/ifq.c,v
diff -u -p -r1.56 ifq.c
--- net/ifq.c	3 Feb 2025 08:58:52 -0000	1.56
+++ net/ifq.c	5 Feb 2025 12:41:07 -0000
@@ -862,7 +862,7 @@ ifiq_process(void *arg)
 	ml_init(&ifiq->ifiq_ml);
 	mtx_leave(&ifiq->ifiq_mtx);
 
-	if_input_process(ifiq->ifiq_if, &ml);
+	if_input_process(ifiq->ifiq_if, &ml, ifiq->ifiq_idx);
 }
 
 int
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c	5 Jan 2025 12:36:48 -0000	1.121
+++ netinet/in_proto.c	5 Feb 2025 12:40:30 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
   .pr_type	= SOCK_STREAM,
   .pr_domain	= &inetdomain,
   .pr_protocol	= IPPROTO_TCP,
-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+		    PR_MPINPUT,
   .pr_input	= tcp_input,
   .pr_ctlinput	= tcp_ctlinput,
   .pr_ctloutput	= tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.429 tcp_input.c
--- netinet/tcp_input.c	31 Jan 2025 11:48:18 -0000	1.429
+++ netinet/tcp_input.c	5 Feb 2025 12:41:07 -0000
@@ -100,9 +100,6 @@
 #include <net/pfvar.h>
 #endif
 
-int tcp_mss_adv(struct mbuf *, int);
-int tcp_flush_queue(struct tcpcb *);
-
 #ifdef INET6
 #include <netinet6/in6_var.h>
 #include <netinet6/nd6.h>
@@ -177,6 +174,9 @@ do { \
 	if_put(ifp); \
 } while (0)
 
+int	 tcp_input_solocked(struct mbuf **, int *, int, int, struct socket **);
+int	 tcp_mss_adv(struct mbuf *, int);
+int	 tcp_flush_queue(struct tcpcb *);
 void	 tcp_sack_partialack(struct tcpcb *, struct tcphdr *);
 void	 tcp_newreno_partialack(struct tcpcb *, struct tcphdr *);
 
@@ -347,12 +347,58 @@ tcp_flush_queue(struct tcpcb *tp)
 	return (flags);
 }
 
+int
+tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+{
+	struct softnet *sn;
+
+	if ((*mp)->m_pkthdr.ph_cookie == NULL)
+		return tcp_input_solocked(mp, offp, proto, af, NULL);
+	sn = (*mp)->m_pkthdr.ph_cookie;
+	/* sanity check that no one else uses the mbuf cookie */
+	KASSERT(sn >= softnets && sn < softnets + NET_TASKQ);
+	(*mp)->m_pkthdr.ph_cookie = (void *)(long)(*offp);
+	switch (af) {
+	case AF_INET:
+		ml_enqueue(&sn->sn_tcp_ml, *mp);
+		break;
+#ifdef INET6
+	case AF_INET6:
+		ml_enqueue(&sn->sn_tcp6_ml, *mp);
+		break;
+#endif
+	default:
+		m_freemp(mp);
+	}
+	*mp = NULL;
+	return IPPROTO_DONE;
+}
+
+void
+tcp_input_mlist(struct mbuf_list *ml, int af)
+{
+	struct socket *so = NULL;	/* socket kept locked between packets */
+	struct mbuf *m;
+
+	while ((m = ml_dequeue(ml)) != NULL) {
+		int off, nxt;
+
+		off = (long)m->m_pkthdr.ph_cookie; /* set by tcp_input() */
+		m->m_pkthdr.ph_cookie = NULL;
+		nxt = tcp_input_solocked(&m, &off, IPPROTO_TCP, af, &so);
+		KASSERT(nxt == IPPROTO_DONE);
+	}
+
+	in_pcbsounlock_rele(NULL, so);	/* release socket of last packet */
+}
+
 /*
  * TCP input routine, follows pages 65-76 of the
  * protocol specification dated September, 1981 very closely.
  */
 int
-tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+tcp_input_solocked(struct mbuf **mp, int *offp, int proto, int af,
+    struct socket ** solocked)
 {
 	struct mbuf *m = *mp;
 	int iphlen = *offp;
@@ -604,6 +650,25 @@ findpcb:
 		tcpstat_inc(tcps_noport);
 		goto dropwithreset_ratelim;
 	}
+	/*
+	 * Avoid needless lock and unlock operation when handling multiple
+	 * TCP packets from the same stream consecutively.
+	 */
+	if (solocked != NULL && *solocked != NULL &&
+	    sotoinpcb(*solocked) == inp) {
+		so = *solocked;
+		*solocked = NULL;
+	} else {
+		if (solocked != NULL && *solocked != NULL) {
+			in_pcbsounlock_rele(NULL, *solocked);
+			*solocked = NULL;
+		}
+		so = in_pcbsolock_ref(inp);
+	}
+	if (so == NULL) {
+		tcpstat_inc(tcps_noport);
+		goto dropwithreset_ratelim;
+	}
 
 	KASSERT(sotoinpcb(inp->inp_socket) == inp);
 	KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -636,7 +701,6 @@ findpcb:
 	else
 		tiwin = th->th_win;
 
-	so = inp->inp_socket;
 	if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
 		union syn_cache_sa src;
 		union syn_cache_sa dst;
@@ -725,6 +789,7 @@ findpcb:
 					 * in use for the reply,
 					 * do not free it.
 					 */
+					so = NULL;
 					m = *mp = NULL;
 					goto drop;
 				} else {
@@ -732,13 +797,11 @@ findpcb:
 					 * We have created a
 					 * full-blown connection.
 					 */
-					tp = NULL;
 					in_pcbunref(inp);
 					inp = in_pcbref(sotoinpcb(so));
 					tp = intotcpcb(inp);
 					if (tp == NULL)
 						goto badsyn;	/*XXX*/
-
 				}
 				break;
 
@@ -844,6 +907,10 @@ findpcb:
 					tcpstat_inc(tcps_dropsyn);
 					goto drop;
 				}
+				if (solocked != NULL)
+					*solocked = so;
+				else
+					in_pcbsounlock_rele(inp, so);
 				in_pcbunref(inp);
 				return IPPROTO_DONE;
 			}
@@ -1019,6 +1086,10 @@ findpcb:
 				if (so->so_snd.sb_cc ||
 				    tp->t_flags & TF_NEEDOUTPUT)
 					(void) tcp_output(tp);
+				if (solocked != NULL)
+					*solocked = so;
+				else
+					in_pcbsounlock_rele(inp, so);
 				in_pcbunref(inp);
 				return IPPROTO_DONE;
 			}
@@ -1069,6 +1140,10 @@ findpcb:
 			tp->t_flags &= ~TF_BLOCKOUTPUT;
 			if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
 				(void) tcp_output(tp);
+			if (solocked != NULL)
+				*solocked = so;
+			else
+				in_pcbsounlock_rele(inp, so);
 			in_pcbunref(inp);
 			return IPPROTO_DONE;
 		}
@@ -1262,6 +1337,8 @@ trimthenstep6:
 			    ((arc4random() & 0x7fffffff) | 0x8000);
 			reuse = &iss;
 			tp = tcp_close(tp);
+			in_pcbsounlock_rele(inp, so);
+			so = NULL;
 			in_pcbunref(inp);
 			inp = NULL;
 			goto findpcb;
@@ -2066,6 +2143,10 @@ dodata:							/* XXX */
 	 */
 	if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
 		(void) tcp_output(tp);
+	if (solocked != NULL)
+		*solocked = so;
+	else
+		in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2095,6 +2176,10 @@ dropafterack:
 	m_freem(m);
 	tp->t_flags |= TF_ACKNOW;
 	(void) tcp_output(tp);
+	if (solocked != NULL)
+		*solocked = so;
+	else
+		in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2130,6 +2215,7 @@ dropwithreset:
 		    (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
 	}
 	m_freem(m);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 
@@ -2141,6 +2227,7 @@ drop:
 		tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
 
 	m_freem(m);
+	in_pcbsounlock_rele(inp, so);
 	in_pcbunref(inp);
 	return IPPROTO_DONE;
 }
@@ -3546,6 +3633,7 @@ syn_cache_get(struct sockaddr *src, stru
 	sc = syn_cache_lookup(src, dst, &scp, inp->inp_rtableid);
 	if (sc == NULL) {
 		mtx_leave(&syn_cache_mtx);
+		in_pcbsounlock_rele(inp, so);
 		return (NULL);
 	}
 
@@ -3559,6 +3647,7 @@ syn_cache_get(struct sockaddr *src, stru
 		refcnt_take(&sc->sc_refcnt);
 		mtx_leave(&syn_cache_mtx);
 		(void) syn_cache_respond(sc, m, now, do_ecn);
+		in_pcbsounlock_rele(inp, so);
 		syn_cache_put(sc);
 		return ((struct socket *)(-1));
 	}
@@ -3699,7 +3788,7 @@ syn_cache_get(struct sockaddr *src, stru
 		tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
 	tp->last_ack_sent = tp->rcv_nxt;
 
-	in_pcbsounlock_rele(inp, so);
+	in_pcbsounlock_rele(listeninp, listenso);
 	tcpstat_inc(tcps_sc_completed);
 	syn_cache_put(sc);
 	return (so);
@@ -3712,6 +3801,7 @@ abort:
 		tp = tcp_drop(tp, ECONNABORTED);	/* destroys socket */
 	m_freem(m);
 	in_pcbsounlock_rele(inp, so);
+	in_pcbsounlock_rele(listeninp, listenso);
 	syn_cache_put(sc);
 	tcpstat_inc(tcps_sc_aborted);
 	return ((struct socket *)(-1));
@@ -3815,7 +3905,7 @@ syn_cache_add(struct sockaddr *src, stru
 	struct syn_cache_head *scp;
 	struct mbuf *ipopts;
 
-	NET_ASSERT_LOCKED();
+	soassertlocked(so);
 
 	tp = sototcpcb(so);
 
@@ -3973,9 +4063,8 @@ syn_cache_add(struct sockaddr *src, stru
 	if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
 		mtx_enter(&syn_cache_mtx);
 		/*
-		 * XXXSMP Currently exclusive netlock prevents another insert
-		 * after our syn_cache_lookup() and before syn_cache_insert().
-		 * Double insert should be handled and not rely on netlock.
+		 * Socket lock prevents another insert after our
+		 * syn_cache_lookup() and before syn_cache_insert().
 		 */
 		syn_cache_insert(sc, tp);
 		mtx_leave(&syn_cache_mtx);
Index: netinet/tcp_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_var.h,v
diff -u -p -r1.185 tcp_var.h
--- netinet/tcp_var.h	16 Jan 2025 11:59:20 -0000	1.185
+++ netinet/tcp_var.h	5 Feb 2025 12:41:07 -0000
@@ -718,6 +718,7 @@ int	 tcp_dooptions(struct tcpcb *, u_cha
 		struct mbuf *, int, struct tcp_opt_info *, u_int, uint64_t);
 void	 tcp_init(void);
 int	 tcp_input(struct mbuf **, int *, int, int);
+void	 tcp_input_mlist(struct mbuf_list *, int);
 int	 tcp_mss(struct tcpcb *, int);
 void	 tcp_mss_update(struct tcpcb *);
 u_int	 tcp_hdrsz(struct tcpcb *);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c	5 Jan 2025 12:36:48 -0000	1.124
+++ netinet6/in6_proto.c	5 Feb 2025 12:40:30 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
   .pr_type	= SOCK_STREAM,
   .pr_domain	= &inet6domain,
   .pr_protocol	= IPPROTO_TCP,
-  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+		    PR_MPINPUT,
   .pr_input	= tcp_input,
   .pr_ctlinput	= tcp6_ctlinput,
   .pr_ctloutput	= tcp_ctloutput,