Download raw body.
TCP parallel input preview
On Wed, Jan 15, 2025 at 04:49:37PM +0100, Alexander Bluhm wrote:
> This is my idea to unlock TCP input and run it in parallel. On my
> 4 CPU test machine I gain 100% percent TCP throughput when running
> 10 TCP streams in parallel. Unfortunately single TCP performance
> drops by 25%.
I found a way to avoid degrading TCP single stream performance.
The idea is to have additional storage per softnet thread. There
ip_deliver() queues TCP packets. All TCP packets that if_input_process()
has dequeued together will be together in the TCP queue per softnet.
After IP deliver, if_input_process() runs TCP input for the mbuf
list. This already avoids a context switch between IP and TCP
processing. Additionally tcp_input_mlist() remembers the socket
that was locked. Only if the TCP stream changes, the old socket
is unlocked and the new one locked. For a single stream this unlock/lock
never happens, and for multistream traffic the distribution by the
multiqueue network hardware also keeps the unlock/lock rate low. Remember,
the TCP mbuf list is per softnet thread.
I had to rearrange the mbuf header to store the softnet index. Note
that the interface index is at most 16 bits wide, so we can get another
8 bit field without increasing the header size.
With this diff I see up to 9% improvement for single stream and 240%
for multi stream throughput on a 4 CPU machine.
bluhm
Index: kern/uipc_socket2.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
diff -u -p -r1.166 uipc_socket2.c
--- kern/uipc_socket2.c 18 Jan 2025 10:44:52 -0000 1.166
+++ kern/uipc_socket2.c 20 Jan 2025 10:44:24 -0000
@@ -180,6 +180,7 @@ struct socket *
sonewconn(struct socket *head, int connstatus, int wait)
{
struct socket *so;
+ int persocket = solock_persocket(head);
int soqueue = connstatus ? 1 : 0;
soassertlocked(head);
@@ -239,7 +240,8 @@ sonewconn(struct socket *head, int conns
wakeup(&head->so_timeo);
}
- sounlock_nonet(so);
+ if (persocket)
+ sounlock_nonet(so);
return (so);
Index: net/if.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.c,v
diff -u -p -r1.722 if.c
--- net/if.c 16 Jan 2025 17:20:23 -0000 1.722
+++ net/if.c 20 Jan 2025 10:44:24 -0000
@@ -244,11 +244,6 @@ int ifq_congestion;
int netisr;
-struct softnet {
- char sn_name[16];
- struct taskq *sn_taskq;
-};
-
#define NET_TASKQ 4
struct softnet softnets[NET_TASKQ];
@@ -978,9 +973,10 @@ if_output_local(struct ifnet *ifp, struc
}
void
-if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
+if_input_process(struct ifnet *ifp, struct mbuf_list *ml, u_int idx)
{
struct mbuf *m;
+ struct softnet *sn;
if (ml_empty(ml))
return;
@@ -995,9 +991,18 @@ if_input_process(struct ifnet *ifp, stru
* read only or MP safe. Usually they hold the exclusive net lock.
*/
+ sn = &softnets[idx];
+ ml_init(&sn->sn_tcp_ml);
+
NET_LOCK_SHARED();
- while ((m = ml_dequeue(ml)) != NULL)
+
+ while ((m = ml_dequeue(ml)) != NULL) {
+ /* add 1 to index, 0 means not set */
+ m->m_pkthdr.ph_softidx = idx + 1;
(*ifp->if_input)(ifp, m);
+ }
+ tcp_input_mlist(&sn->sn_tcp_ml);
+
NET_UNLOCK_SHARED();
}
Index: net/if_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_var.h,v
diff -u -p -r1.133 if_var.h
--- net/if_var.h 12 Oct 2024 23:18:10 -0000 1.133
+++ net/if_var.h 20 Jan 2025 10:44:00 -0000
@@ -292,6 +292,14 @@ struct ifg_list {
#define IF_WWAN_DEFAULT_PRIORITY 6
#define IF_CARP_DEFAULT_PRIORITY 15
+struct softnet {
+ char sn_name[16];
+ struct taskq *sn_taskq;
+ struct mbuf_list sn_tcp_ml;
+};
+
+extern struct softnet softnets[];
+
/*
* Network stack input queues.
*/
@@ -322,7 +330,7 @@ int if_enqueue(struct ifnet *, struct mb
int if_enqueue_ifq(struct ifnet *, struct mbuf *);
void if_input(struct ifnet *, struct mbuf_list *);
void if_vinput(struct ifnet *, struct mbuf *);
-void if_input_process(struct ifnet *, struct mbuf_list *);
+void if_input_process(struct ifnet *, struct mbuf_list *, u_int);
int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
int if_output_ml(struct ifnet *, struct mbuf_list *,
struct sockaddr *, struct rtentry *);
Index: net/ifq.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/ifq.c,v
diff -u -p -r1.55 ifq.c
--- net/ifq.c 20 Nov 2024 02:18:45 -0000 1.55
+++ net/ifq.c 20 Jan 2025 10:44:00 -0000
@@ -849,7 +849,7 @@ ifiq_process(void *arg)
ml_init(&ifiq->ifiq_ml);
mtx_leave(&ifiq->ifiq_mtx);
- if_input_process(ifiq->ifiq_if, &ml);
+ if_input_process(ifiq->ifiq_if, &ml, ifiq->ifiq_idx);
}
int
Index: netinet/in_pcb.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_pcb.c,v
diff -u -p -r1.310 in_pcb.c
--- netinet/in_pcb.c 9 Jan 2025 16:47:24 -0000 1.310
+++ netinet/in_pcb.c 20 Jan 2025 10:44:24 -0000
@@ -643,7 +643,8 @@ in_pcbsounlock_rele(struct inpcb *inp, s
{
if (so == NULL)
return;
- KASSERT(inp->inp_socket == NULL || inp->inp_socket == so);
+ KASSERT(inp == NULL || inp->inp_socket == NULL ||
+ inp->inp_socket == so);
rw_exit_write(&so->so_lock);
sorele(so);
}
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c 5 Jan 2025 12:36:48 -0000 1.121
+++ netinet/in_proto.c 20 Jan 2025 10:44:24 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inetdomain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp_ctlinput,
.pr_ctloutput = tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.423 tcp_input.c
--- netinet/tcp_input.c 16 Jan 2025 11:59:20 -0000 1.423
+++ netinet/tcp_input.c 20 Jan 2025 10:44:24 -0000
@@ -100,9 +100,6 @@
#include <net/pfvar.h>
#endif
-int tcp_mss_adv(struct mbuf *, int);
-int tcp_flush_queue(struct tcpcb *);
-
#ifdef INET6
#include <netinet6/in6_var.h>
#include <netinet6/nd6.h>
@@ -177,6 +174,9 @@ do { \
if_put(ifp); \
} while (0)
+int tcp_input_solocked(struct mbuf **, int *, int, int, struct socket **);
+int tcp_mss_adv(struct mbuf *, int);
+int tcp_flush_queue(struct tcpcb *);
void tcp_sack_partialack(struct tcpcb *, struct tcphdr *);
void tcp_newreno_partialack(struct tcpcb *, struct tcphdr *);
@@ -347,12 +347,49 @@ tcp_flush_queue(struct tcpcb *tp)
return (flags);
}
+int
+tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+{
+ struct softnet *sn;
+
+ if ((*mp)->m_pkthdr.ph_softidx == 0)
+ return tcp_input_solocked(mp, offp, proto, af, NULL);
+ sn = &softnets[(*mp)->m_pkthdr.ph_softidx - 1];
+ (*mp)->m_pkthdr.ph_softidx = 0;
+
+ (*mp)->m_pkthdr.ph_cookie = (void *)(long)(*offp);
+ (*mp)->m_pkthdr.ph_family = af;
+ ml_enqueue(&sn->sn_tcp_ml, *mp);
+ *mp = NULL;
+ return IPPROTO_DONE;
+}
+
+void
+tcp_input_mlist(struct mbuf_list *ml)
+{
+ struct socket *so = NULL;
+ struct mbuf *m;
+
+ while ((m = ml_dequeue(ml)) != NULL) {
+ int off, nxt, af;
+
+ off = (long)m->m_pkthdr.ph_cookie;
+ m->m_pkthdr.ph_cookie = NULL;
+ af = m->m_pkthdr.ph_family;
+ nxt = tcp_input_solocked(&m, &off, IPPROTO_TCP, af, &so);
+ KASSERT(nxt == IPPROTO_DONE);
+ }
+
+ in_pcbsounlock_rele(NULL, so);
+}
+
/*
* TCP input routine, follows pages 65-76 of the
* protocol specification dated September, 1981 very closely.
*/
int
-tcp_input(struct mbuf **mp, int *offp, int proto, int af)
+tcp_input_solocked(struct mbuf **mp, int *offp, int proto, int af,
+ struct socket ** solocked)
{
struct mbuf *m = *mp;
int iphlen = *offp;
@@ -604,6 +641,24 @@ findpcb:
tcpstat_inc(tcps_noport);
goto dropwithreset_ratelim;
}
+ /*
+ * Avoid needless lock and unlock operation when handling multiple
+ * TCP packets from the same stream consecutively.
+ */
+ if (solocked != NULL && inp->inp_socket == *solocked) {
+ so = *solocked;
+ *solocked = NULL;
+ } else {
+ if (solocked != NULL) {
+ in_pcbsounlock_rele(NULL, *solocked);
+ *solocked = NULL;
+ }
+ so = in_pcbsolock_ref(inp);
+ }
+ if (so == NULL) {
+ tcpstat_inc(tcps_noport);
+ goto dropwithreset_ratelim;
+ }
KASSERT(sotoinpcb(inp->inp_socket) == inp);
KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -636,7 +691,6 @@ findpcb:
else
tiwin = th->th_win;
- so = inp->inp_socket;
if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
union syn_cache_sa src;
union syn_cache_sa dst;
@@ -725,6 +779,7 @@ findpcb:
* in use for the reply,
* do not free it.
*/
+ so = NULL;
m = *mp = NULL;
goto drop;
} else {
@@ -732,13 +787,11 @@ findpcb:
* We have created a
* full-blown connection.
*/
- tp = NULL;
in_pcbunref(inp);
inp = in_pcbref(sotoinpcb(so));
tp = intotcpcb(inp);
if (tp == NULL)
goto badsyn; /*XXX*/
-
}
break;
@@ -844,6 +897,10 @@ findpcb:
tcpstat_inc(tcps_dropsyn);
goto drop;
}
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1019,6 +1076,10 @@ findpcb:
if (so->so_snd.sb_cc ||
tp->t_flags & TF_NEEDOUTPUT)
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1069,6 +1130,10 @@ findpcb:
tp->t_flags &= ~TF_BLOCKOUTPUT;
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1262,6 +1327,8 @@ trimthenstep6:
((arc4random() & 0x7fffffff) | 0x8000);
reuse = &iss;
tp = tcp_close(tp);
+ in_pcbsounlock_rele(inp, so);
+ so = NULL;
in_pcbunref(inp);
inp = NULL;
goto findpcb;
@@ -2066,6 +2133,10 @@ dodata: /* XXX */
*/
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2095,6 +2166,10 @@ dropafterack:
m_freem(m);
tp->t_flags |= TF_ACKNOW;
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2130,6 +2205,7 @@ dropwithreset:
(tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
}
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2141,6 +2217,7 @@ drop:
tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -3535,15 +3612,18 @@ syn_cache_get(struct sockaddr *src, stru
struct inpcb *inp, *oldinp;
struct tcpcb *tp = NULL;
struct mbuf *am;
- struct socket *oso;
+ struct socket *oldso;
u_int rtableid;
NET_ASSERT_LOCKED();
+ inp = sotoinpcb(so);
+
mtx_enter(&syn_cache_mtx);
sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid);
if (sc == NULL) {
mtx_leave(&syn_cache_mtx);
+ in_pcbsounlock_rele(inp, so);
return (NULL);
}
@@ -3557,6 +3637,7 @@ syn_cache_get(struct sockaddr *src, stru
refcnt_take(&sc->sc_refcnt);
mtx_leave(&syn_cache_mtx);
(void) syn_cache_respond(sc, m, now, do_ecn);
+ in_pcbsounlock_rele(inp, so);
syn_cache_put(sc);
return ((struct socket *)(-1));
}
@@ -3571,12 +3652,13 @@ syn_cache_get(struct sockaddr *src, stru
* connection when the SYN arrived. If we can't create
* the connection, abort it.
*/
- oso = so;
+ oldso = so;
+ oldinp = inp;
so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT);
if (so == NULL)
goto resetandabort;
-
- oldinp = sotoinpcb(oso);
+ soassertlocked(so);
+ soref(so);
inp = sotoinpcb(so);
#ifdef IPSEC
@@ -3637,7 +3719,7 @@ syn_cache_get(struct sockaddr *src, stru
(void) m_free(am);
tp = intotcpcb(inp);
- tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY);
+ tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY);
if (sc->sc_request_r_scale != 15) {
tp->requested_s_scale = sc->sc_requested_s_scale;
tp->request_r_scale = sc->sc_request_r_scale;
@@ -3649,6 +3731,7 @@ syn_cache_get(struct sockaddr *src, stru
tp->t_template = tcp_template(tp);
if (tp->t_template == 0) {
tp = tcp_drop(tp, ENOBUFS); /* destroys socket */
+ in_pcbsounlock_rele(inp, so);
so = NULL;
goto abort;
}
@@ -3700,8 +3783,9 @@ syn_cache_get(struct sockaddr *src, stru
tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
tp->last_ack_sent = tp->rcv_nxt;
- tcpstat_inc(tcps_sc_completed);
+ in_pcbsounlock_rele(oldinp, oldso);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_completed);
return (so);
resetandabort:
@@ -3711,6 +3795,8 @@ abort:
m_freem(m);
if (so != NULL)
soabort(so);
+ in_pcbsounlock_rele(inp, so);
+ in_pcbsounlock_rele(oldinp, oldso);
syn_cache_put(sc);
tcpstat_inc(tcps_sc_aborted);
return ((struct socket *)(-1));
@@ -3744,8 +3830,8 @@ syn_cache_reset(struct sockaddr *src, st
}
syn_cache_rm(sc);
mtx_leave(&syn_cache_mtx);
- tcpstat_inc(tcps_sc_reset);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_reset);
}
void
@@ -3785,8 +3871,8 @@ syn_cache_unreach(const struct sockaddr
syn_cache_rm(sc);
mtx_leave(&syn_cache_mtx);
- tcpstat_inc(tcps_sc_unreach);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_unreach);
}
/*
@@ -3814,7 +3900,7 @@ syn_cache_add(struct sockaddr *src, stru
struct syn_cache_head *scp;
struct mbuf *ipopts;
- NET_ASSERT_LOCKED();
+ soassertlocked(so);
tp = sototcpcb(so);
@@ -3972,9 +4058,8 @@ syn_cache_add(struct sockaddr *src, stru
if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
mtx_enter(&syn_cache_mtx);
/*
- * XXXSMP Currently exclusive netlock prevents another insert
- * after our syn_cache_lookup() and before syn_cache_insert().
- * Double insert should be handled and not rely on netlock.
+ * Socket lock prevents another insert after our
+ * syn_cache_lookup() and before syn_cache_insert().
*/
syn_cache_insert(sc, tp);
mtx_leave(&syn_cache_mtx);
Index: netinet/tcp_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_var.h,v
diff -u -p -r1.185 tcp_var.h
--- netinet/tcp_var.h 16 Jan 2025 11:59:20 -0000 1.185
+++ netinet/tcp_var.h 20 Jan 2025 10:44:00 -0000
@@ -718,6 +718,7 @@ int tcp_dooptions(struct tcpcb *, u_cha
struct mbuf *, int, struct tcp_opt_info *, u_int, uint64_t);
void tcp_init(void);
int tcp_input(struct mbuf **, int *, int, int);
+void tcp_input_mlist(struct mbuf_list *);
int tcp_mss(struct tcpcb *, int);
void tcp_mss_update(struct tcpcb *);
u_int tcp_hdrsz(struct tcpcb *);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c 5 Jan 2025 12:36:48 -0000 1.124
+++ netinet6/in6_proto.c 20 Jan 2025 10:44:24 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inet6domain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp6_ctlinput,
.pr_ctloutput = tcp_ctloutput,
Index: netinet6/nd6.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v
diff -u -p -r1.283 nd6.c
--- netinet6/nd6.c 4 Sep 2024 07:54:52 -0000 1.283
+++ netinet6/nd6.c 20 Jan 2025 10:44:24 -0000
@@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt)
struct llinfo_nd6 *ln;
struct ifnet *ifp;
+ /* XXX
NET_ASSERT_LOCKED_EXCLUSIVE();
+ */
ifp = if_get(rt->rt_ifidx);
if (ifp == NULL)
Index: sys/mbuf.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/sys/mbuf.h,v
diff -u -p -r1.265 mbuf.h
--- sys/mbuf.h 5 Nov 2024 13:15:13 -0000 1.265
+++ sys/mbuf.h 20 Jan 2025 10:44:00 -0000
@@ -128,7 +128,7 @@ struct pkthdr {
int64_t ph_timestamp; /* packet timestamp */
int len; /* total packet length */
u_int ph_rtableid; /* routing table id */
- u_int ph_ifidx; /* rcv interface index */
+ u_int16_t ph_ifidx; /* rcv interface index */
u_int16_t ph_tagsset; /* mtags attached */
u_int16_t ph_flowid; /* pseudo unique flow id */
u_int16_t csum_flags; /* checksum flags */
@@ -136,6 +136,8 @@ struct pkthdr {
u_int16_t ph_mss; /* TCP max segment size */
u_int8_t ph_loopcnt; /* mbuf is looping in kernel */
u_int8_t ph_family; /* af, used when queueing */
+ u_int8_t ph_softidx; /* index of softnet thread */
+ u_int8_t ph_pad[1];
struct pkthdr_pf pf;
};
TCP parallel input preview