From: Alexander Bluhm
Subject: parallel TCP input
To: tech@openbsd.org
Date: Wed, 5 Feb 2025 19:25:11 +0100

Hi,

To run TCP input in parallel, each packet needs the socket lock.  As
locking and unlocking per packet kills throughput for single-stream
TCP, the diff below keeps the socket locked while consecutive packets
belong to the same socket.

I keep a list of TCP packets in struct softnet, so each softnet thread
has its own list to accumulate packets.  After processing all packets
from an interface input queue, TCP input is called for each TCP packet.
The socket lock is only released and reacquired when the TCP socket
changes between packets.  This solves the performance issue.

To remember the softnet thread and the packet offset, I use the cookie
field in the mbuf header.  Hopefully nothing else uses the cookie in
this path.

Please test with various interfaces and pseudo devices.  Are there
setups where my idea does not work?  How much faster does the TCP
stack get?  Are there performance regressions?

Note that at most 4 CPUs are used for the network stack.  You might
want to increase NET_TASKQ.  You also need network interfaces that
support multiqueue: ix, ixl, igc, bnxt, vio, vmx, ...

bluhm

Index: net/if.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.c,v
diff -u -p -r1.726 if.c
--- net/if.c    3 Feb 2025 08:58:52 -0000      1.726
+++ net/if.c    5 Feb 2025 12:41:07 -0000
@@ -241,19 +241,10 @@ struct rwlock if_tmplist_lock = RWLOCK_I
 struct mutex if_hooks_mtx = MUTEX_INITIALIZER(IPL_NONE);
 void if_hooks_run(struct task_list *);
 
-int ifq_congestion;
-
-int netisr;
-
-struct softnet {
-        char sn_name[16];
-        struct taskq *sn_taskq;
-};
-
-#define NET_TASKQ 4
+int ifq_congestion;
+int netisr;
 struct softnet softnets[NET_TASKQ];
-
-struct task if_input_task_locked = TASK_INITIALIZER(if_netisr, NULL);
+struct task if_input_task_locked = TASK_INITIALIZER(if_netisr, NULL);
 
 /*
  * Serialize socket operations to ensure no new sleeping points
@@ -979,9 +970,10 @@ if_output_local(struct ifnet *ifp, struc
 }
 
 void
-if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
+if_input_process(struct ifnet *ifp, struct mbuf_list *ml, unsigned int idx)
 {
         struct mbuf *m;
+        struct softnet *sn;
 
         if (ml_empty(ml))
                 return;
@@ -996,9 +988,23 @@ if_input_process(struct ifnet *ifp, stru
          * read only or MP safe.  Usually they hold the exclusive net lock.
          */
+        sn = net_sn(idx);
+        ml_init(&sn->sn_tcp_ml);
+#ifdef INET6
+        ml_init(&sn->sn_tcp6_ml);
+#endif
+
         NET_LOCK_SHARED();
-        while ((m = ml_dequeue(ml)) != NULL)
+
+        while ((m = ml_dequeue(ml)) != NULL) {
+                m->m_pkthdr.ph_cookie = sn;
                 (*ifp->if_input)(ifp, m);
+        }
+        tcp_input_mlist(&sn->sn_tcp_ml, AF_INET);
+#ifdef INET6
+        tcp_input_mlist(&sn->sn_tcp6_ml, AF_INET6);
+#endif
+
         NET_UNLOCK_SHARED();
 }
@@ -3672,18 +3678,21 @@ unhandled_af(int af)
         panic("unhandled af %d", af);
 }
 
-struct taskq *
-net_tq(unsigned int ifindex)
+struct softnet *
+net_sn(unsigned int ifindex)
 {
-        struct softnet *sn;
         static int nettaskqs;
 
         if (nettaskqs == 0)
                 nettaskqs = min(NET_TASKQ, ncpus);
 
-        sn = &softnets[ifindex % nettaskqs];
+        return (&softnets[ifindex % nettaskqs]);
+}
 
-        return (sn->sn_taskq);
+struct taskq *
+net_tq(unsigned int ifindex)
+{
+        return (net_sn(ifindex)->sn_taskq);
 }
 
 void
Index: net/if.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.h,v
diff -u -p -r1.217 if.h
--- net/if.h    9 Jun 2024 16:25:28 -0000       1.217
+++ net/if.h    5 Feb 2025 12:41:07 -0000
@@ -560,7 +560,10 @@ void if_congestion(void);
 int if_congested(void);
 __dead void unhandled_af(int);
 int if_setlladdr(struct ifnet *, const uint8_t *);
-struct taskq * net_tq(unsigned int);
+struct softnet *
+        net_sn(unsigned int);
+struct taskq *
+        net_tq(unsigned int);
 void net_tq_barriers(const char *);
 
 #endif /* _KERNEL */
Index: net/if_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_var.h,v
diff -u -p -r1.135 if_var.h
--- net/if_var.h        24 Jan 2025 09:19:07 -0000      1.135
+++ net/if_var.h        5 Feb 2025 12:41:07 -0000
@@ -301,6 +301,15 @@ struct ifg_list {
 #define IF_WWAN_DEFAULT_PRIORITY 6
 #define IF_CARP_DEFAULT_PRIORITY 15
 
+struct softnet {
+        char sn_name[16];
+        struct taskq *sn_taskq;
+        struct mbuf_list sn_tcp_ml;
+        struct mbuf_list sn_tcp6_ml;
+};
+#define NET_TASKQ 4
+extern struct softnet softnets[NET_TASKQ];
+
 /*
  * Network stack input queues.
  */
@@ -331,7 +340,7 @@ int if_enqueue(struct ifnet *, struct mb
 int if_enqueue_ifq(struct ifnet *, struct mbuf *);
 void if_input(struct ifnet *, struct mbuf_list *);
 void if_vinput(struct ifnet *, struct mbuf *);
-void if_input_process(struct ifnet *, struct mbuf_list *);
+void if_input_process(struct ifnet *, struct mbuf_list *, unsigned int);
 int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
 int if_output_ml(struct ifnet *, struct mbuf_list *,
             struct sockaddr *, struct rtentry *);
Index: net/ifq.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/ifq.c,v
diff -u -p -r1.56 ifq.c
--- net/ifq.c   3 Feb 2025 08:58:52 -0000       1.56
+++ net/ifq.c   5 Feb 2025 12:41:07 -0000
@@ -862,7 +862,7 @@ ifiq_process(void *arg)
         ml_init(&ifiq->ifiq_ml);
         mtx_leave(&ifiq->ifiq_mtx);
 
-        if_input_process(ifiq->ifiq_if, &ml);
+        if_input_process(ifiq->ifiq_if, &ml, ifiq->ifiq_idx);
 }
 
 int
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c  5 Jan 2025 12:36:48 -0000       1.121
+++ netinet/in_proto.c  5 Feb 2025 12:40:30 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
   .pr_type      = SOCK_STREAM,
   .pr_domain    = &inetdomain,
   .pr_protocol  = IPPROTO_TCP,
-  .pr_flags     = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags     = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+                    PR_MPINPUT,
   .pr_input     = tcp_input,
   .pr_ctlinput  = tcp_ctlinput,
   .pr_ctloutput = tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.429 tcp_input.c
--- netinet/tcp_input.c 31 Jan 2025 11:48:18 -0000      1.429
+++ netinet/tcp_input.c 5 Feb 2025 12:41:07 -0000
@@ -100,9 +100,6 @@
 #include
 #endif
 
-int tcp_mss_adv(struct mbuf *, int);
-int tcp_flush_queue(struct tcpcb *);
-
 #ifdef INET6
 #include
 #include
@@ -177,6 +174,9 @@ do { \
         if_put(ifp); \
 } while (0)
 
+int tcp_input_solocked(struct mbuf **, int *, int, int, struct socket **);
+int tcp_mss_adv(struct mbuf *, int);
+int tcp_flush_queue(struct tcpcb *);
 void tcp_sack_partialack(struct tcpcb *, struct tcphdr *);
 void tcp_newreno_partialack(struct tcpcb *, struct tcphdr *);
 
@@ -347,12 +347,58 @@ tcp_flush_queue(struct tcpcb *tp)
         return (flags);
 }
+int
+tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+{
+        struct softnet *sn;
+
+        if ((*mp)->m_pkthdr.ph_cookie == NULL)
+                return tcp_input_solocked(mp, offp, proto, af, NULL);
+        sn = (*mp)->m_pkthdr.ph_cookie;
+        /* sanity check that noone else uses mbuf cookie */
+        KASSERT(sn >= softnets && sn < softnets + sizeof(softnets));
+        (*mp)->m_pkthdr.ph_cookie = (void *)(long)(*offp);
+        switch (af) {
+        case AF_INET:
+                ml_enqueue(&sn->sn_tcp_ml, *mp);
+                break;
+#ifdef INET6
+        case AF_INET6:
+                ml_enqueue(&sn->sn_tcp6_ml, *mp);
+                break;
+#endif
+        default:
+                m_freemp(mp);
+        }
+        *mp = NULL;
+        return IPPROTO_DONE;
+}
+
+void
+tcp_input_mlist(struct mbuf_list *ml, int af)
+{
+        struct socket *so = NULL;
+        struct mbuf *m;
+
+        while ((m = ml_dequeue(ml)) != NULL) {
+                int off, nxt;
+
+                off = (long)m->m_pkthdr.ph_cookie;
+                m->m_pkthdr.ph_cookie = NULL;
+                nxt = tcp_input_solocked(&m, &off, IPPROTO_TCP, af, &so);
+                KASSERT(nxt == IPPROTO_DONE);
+        }
+
+        in_pcbsounlock_rele(NULL, so);
+}
+
 /*
  * TCP input routine, follows pages 65-76 of the
  * protocol specification dated September, 1981 very
  * closely.
  */
 int
-tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+tcp_input_solocked(struct mbuf **mp, int *offp, int proto, int af,
+    struct socket ** solocked)
 {
         struct mbuf *m = *mp;
         int iphlen = *offp;
@@ -604,6 +650,25 @@ findpcb:
                 tcpstat_inc(tcps_noport);
                 goto dropwithreset_ratelim;
         }
+        /*
+         * Avoid needless lock and unlock operation when handling multiple
+         * TCP packets from the same stream consecutively.
+         */
+        if (solocked != NULL && *solocked != NULL &&
+            sotoinpcb(*solocked) == inp) {
+                so = *solocked;
+                *solocked = NULL;
+        } else {
+                if (solocked != NULL && *solocked != NULL) {
+                        in_pcbsounlock_rele(NULL, *solocked);
+                        *solocked = NULL;
+                }
+                so = in_pcbsolock_ref(inp);
+        }
+        if (so == NULL) {
+                tcpstat_inc(tcps_noport);
+                goto dropwithreset_ratelim;
+        }
 
         KASSERT(sotoinpcb(inp->inp_socket) == inp);
         KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -636,7 +701,6 @@ findpcb:
         else
                 tiwin = th->th_win;
 
-        so = inp->inp_socket;
         if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
                 union syn_cache_sa src;
                 union syn_cache_sa dst;
@@ -725,6 +789,7 @@ findpcb:
                                  * in use for the reply,
                                  * do not free it.
                                  */
+                                so = NULL;
                                 m = *mp = NULL;
                                 goto drop;
                         } else {
@@ -732,13 +797,11 @@ findpcb:
                                  * We have created a
                                  * full-blown connection.
                                  */
-                                tp = NULL;
                                 in_pcbunref(inp);
                                 inp = in_pcbref(sotoinpcb(so));
                                 tp = intotcpcb(inp);
                                 if (tp == NULL)
                                         goto badsyn;    /*XXX*/
-
                         }
                         break;
 
@@ -844,6 +907,10 @@ findpcb:
                                 tcpstat_inc(tcps_dropsyn);
                                 goto drop;
                         }
+                        if (solocked != NULL)
+                                *solocked = so;
+                        else
+                                in_pcbsounlock_rele(inp, so);
                         in_pcbunref(inp);
                         return IPPROTO_DONE;
                 }
@@ -1019,6 +1086,10 @@ findpcb:
                         if (so->so_snd.sb_cc ||
                             tp->t_flags & TF_NEEDOUTPUT)
                                 (void) tcp_output(tp);
+                        if (solocked != NULL)
+                                *solocked = so;
+                        else
+                                in_pcbsounlock_rele(inp, so);
                         in_pcbunref(inp);
                         return IPPROTO_DONE;
                 }
@@ -1069,6 +1140,10 @@ findpcb:
                         tp->t_flags &= ~TF_BLOCKOUTPUT;
                         if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
                                 (void) tcp_output(tp);
+                        if (solocked != NULL)
+                                *solocked = so;
+                        else
+                                in_pcbsounlock_rele(inp, so);
                         in_pcbunref(inp);
                         return IPPROTO_DONE;
                 }
@@ -1262,6 +1337,8 @@ trimthenstep6:
                             ((arc4random() & 0x7fffffff) | 0x8000);
                         reuse = &iss;
                         tp = tcp_close(tp);
+                        in_pcbsounlock_rele(inp, so);
+                        so = NULL;
                         in_pcbunref(inp);
                         inp = NULL;
                         goto findpcb;
@@ -2066,6 +2143,10 @@ dodata:                                                 /* XXX */
          */
         if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
                 (void) tcp_output(tp);
+        if (solocked != NULL)
+                *solocked = so;
+        else
+                in_pcbsounlock_rele(inp, so);
         in_pcbunref(inp);
         return IPPROTO_DONE;
 
@@ -2095,6 +2176,10 @@ dropafterack:
         m_freem(m);
         tp->t_flags |= TF_ACKNOW;
         (void) tcp_output(tp);
+        if (solocked != NULL)
+                *solocked = so;
+        else
+                in_pcbsounlock_rele(inp, so);
         in_pcbunref(inp);
         return IPPROTO_DONE;
 
@@ -2130,6 +2215,7 @@ dropwithreset:
                     (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
         }
         m_freem(m);
+        in_pcbsounlock_rele(inp, so);
         in_pcbunref(inp);
         return IPPROTO_DONE;
 
@@ -2141,6 +2227,7 @@ drop:
                 tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
 
         m_freem(m);
+        in_pcbsounlock_rele(inp, so);
         in_pcbunref(inp);
         return IPPROTO_DONE;
 }
@@ -3546,6 +3633,7 @@ syn_cache_get(struct sockaddr *src, stru
         sc = syn_cache_lookup(src, dst, &scp, inp->inp_rtableid);
         if (sc == NULL) {
                 mtx_leave(&syn_cache_mtx);
+                in_pcbsounlock_rele(inp, so);
                 return (NULL);
         }
 
@@ -3559,6 +3647,7 @@ syn_cache_get(struct sockaddr *src, stru
                 refcnt_take(&sc->sc_refcnt);
                 mtx_leave(&syn_cache_mtx);
                 (void) syn_cache_respond(sc, m, now, do_ecn);
+                in_pcbsounlock_rele(inp, so);
                 syn_cache_put(sc);
                 return ((struct socket *)(-1));
         }
@@ -3699,7 +3788,7 @@ syn_cache_get(struct sockaddr *src, stru
         tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
         tp->last_ack_sent = tp->rcv_nxt;
 
-        in_pcbsounlock_rele(inp, so);
+        in_pcbsounlock_rele(listeninp, listenso);
         tcpstat_inc(tcps_sc_completed);
         syn_cache_put(sc);
         return (so);
@@ -3712,6 +3801,7 @@ abort:
                 tp = tcp_drop(tp, ECONNABORTED);        /* destroys socket */
         m_freem(m);
         in_pcbsounlock_rele(inp, so);
+        in_pcbsounlock_rele(listeninp, listenso);
         syn_cache_put(sc);
         tcpstat_inc(tcps_sc_aborted);
         return ((struct socket *)(-1));
@@ -3815,7 +3905,7 @@ syn_cache_add(struct sockaddr *src, stru
         struct syn_cache_head *scp;
         struct mbuf *ipopts;
 
-        NET_ASSERT_LOCKED();
+        soassertlocked(so);
 
         tp = sototcpcb(so);
 
@@ -3973,9 +4063,8 @@ syn_cache_add(struct sockaddr *src, stru
         if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
                 mtx_enter(&syn_cache_mtx);
                 /*
-                 * XXXSMP Currently exclusive netlock prevents another insert
-                 * after our syn_cache_lookup() and before syn_cache_insert().
-                 * Double insert should be handled and not rely on netlock.
+                 * Socket lock prevents another insert after our
+                 * syn_cache_lookup() and before syn_cache_insert().
                  */
                 syn_cache_insert(sc, tp);
                 mtx_leave(&syn_cache_mtx);
Index: netinet/tcp_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_var.h,v
diff -u -p -r1.185 tcp_var.h
--- netinet/tcp_var.h   16 Jan 2025 11:59:20 -0000      1.185
+++ netinet/tcp_var.h   5 Feb 2025 12:41:07 -0000
@@ -718,6 +718,7 @@ int  tcp_dooptions(struct tcpcb *, u_cha
             struct mbuf *, int, struct tcp_opt_info *, u_int, uint64_t);
 void tcp_init(void);
 int tcp_input(struct mbuf **, int *, int, int);
+void tcp_input_mlist(struct mbuf_list *, int);
 int tcp_mss(struct tcpcb *, int);
 void tcp_mss_update(struct tcpcb *);
 u_int tcp_hdrsz(struct tcpcb *);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c        5 Jan 2025 12:36:48 -0000       1.124
+++ netinet6/in6_proto.c        5 Feb 2025 12:40:30 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
   .pr_type      = SOCK_STREAM,
   .pr_domain    = &inet6domain,
   .pr_protocol  = IPPROTO_TCP,
-  .pr_flags     = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+  .pr_flags     = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+                    PR_MPINPUT,
   .pr_input     = tcp_input,
   .pr_ctlinput  = tcp6_ctlinput,
   .pr_ctloutput = tcp_ctloutput,
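
[Editor's note: the following is an illustrative sketch, not part of the
diff above.  It is a small stand-alone user-space C program that models
the deferral step described in the mail: while an input batch is
dispatched, packets of the "expensive" protocol are parked on a list
owned by the processing thread, found through a cookie pointer stored
in the packet header (as the diff does with m_pkthdr.ph_cookie and
sn_tcp_ml), and that list is flushed in one pass after the batch.  All
names here (struct worker, struct pkt, dispatch, flush_deferred) are
invented for illustration and are not the kernel interfaces used in the
diff.]

/*
 * Model of per-thread deferral: dispatch() plays the role of
 * if_input_process(), proto_input() the role of tcp_input(), and
 * flush_deferred() the role of tcp_input_mlist().
 */
#include <stdio.h>
#include <stddef.h>

struct pkt {
        struct pkt      *p_next;
        void            *p_cookie;      /* models m_pkthdr.ph_cookie */
        int              p_proto;       /* 0 = handle now, 1 = defer */
        int              p_id;
};

struct worker {
        struct pkt      *w_deferred;    /* models sn_tcp_ml in the diff */
        struct pkt     **w_tail;
};

/* Protocol handler: park the packet on the worker found via the cookie. */
static void
proto_input(struct pkt *p)
{
        struct worker *w = p->p_cookie;

        p->p_next = NULL;
        *w->w_tail = p;
        w->w_tail = &p->p_next;
}

static void
dispatch(struct worker *w, struct pkt **batch, int n)
{
        int i;

        w->w_deferred = NULL;
        w->w_tail = &w->w_deferred;

        for (i = 0; i < n; i++) {
                struct pkt *p = batch[i];

                p->p_cookie = w;                /* remember owning worker */
                if (p->p_proto == 1)
                        proto_input(p);         /* defer for the flush */
                else
                        printf("immediate: packet %d\n", p->p_id);
        }
}

/* Flush all deferred packets of this worker in one pass. */
static void
flush_deferred(struct worker *w)
{
        struct pkt *p;

        for (p = w->w_deferred; p != NULL; p = p->p_next)
                printf("deferred: packet %d\n", p->p_id);
}

int
main(void)
{
        struct worker w;
        struct pkt a = { NULL, NULL, 1, 1 };
        struct pkt b = { NULL, NULL, 0, 2 };
        struct pkt c = { NULL, NULL, 1, 3 };
        struct pkt *batch[] = { &a, &b, &c };

        dispatch(&w, batch, 3);
        flush_deferred(&w);
        return 0;
}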
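
[Editor's note: a second illustrative sketch, also not part of the
diff.  It models the drain loop that keeps a per-connection lock held
while consecutive packets belong to the same connection and only
releases and re-acquires it when the connection changes, the way the
*solocked argument of tcp_input_solocked() is used above.  A plain
pthread mutex stands in for the socket lock; struct conn, struct pkt
and drain_batch are invented names.]

#include <pthread.h>
#include <stdio.h>
#include <stddef.h>

struct conn {
        pthread_mutex_t  c_lock;        /* stands in for the socket lock */
        int              c_id;
};

struct pkt {
        struct pkt      *p_next;
        struct conn     *p_conn;        /* connection of this packet */
        int              p_seq;
};

/* Process one packet; caller guarantees p->p_conn->c_lock is held. */
static void
deliver_locked(struct pkt *p)
{
        printf("conn %d: packet %d\n", p->p_conn->c_id, p->p_seq);
}

/*
 * Drain a batch.  The lock is kept across consecutive packets that hit
 * the same connection and is only switched when the connection changes.
 */
static void
drain_batch(struct pkt *head)
{
        struct conn *locked = NULL;
        struct pkt *p;

        for (p = head; p != NULL; p = p->p_next) {
                if (locked != p->p_conn) {
                        if (locked != NULL)
                                pthread_mutex_unlock(&locked->c_lock);
                        locked = p->p_conn;
                        pthread_mutex_lock(&locked->c_lock);
                }
                deliver_locked(p);
        }
        if (locked != NULL)
                pthread_mutex_unlock(&locked->c_lock);
}

int
main(void)
{
        struct conn a = { PTHREAD_MUTEX_INITIALIZER, 1 };
        struct conn b = { PTHREAD_MUTEX_INITIALIZER, 2 };
        struct pkt p3 = { NULL, &b, 3 };
        struct pkt p2 = { &p3, &a, 2 };  /* two packets in a row for conn 1 */
        struct pkt p1 = { &p2, &a, 1 };  /* -> only one lock/unlock pair */

        drain_batch(&p1);
        return 0;
}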