Download raw body.
parallel TCP input
On Wed, Feb 05, 2025 at 07:25:11PM +0100, Alexander Bluhm wrote:
> To remember the softnet thread and packet offset, I use the cookie
> field in the mbuf header. Hopefully nothing else uses cookies in
> this path.
We now have struct netstack as softnet-thread-local storage.  That
means we can temporarily store the TCP input mbuf list there.  We
no longer risk that an mbuf whose ph_cookie points to the softnet
thread gets queued somewhere else.
Note that I still need the cookie to store the TCP header offset.
But no other subsystem will reuse the cookie while the mbuf is in
the TCP input mbuf list.
Mark Patruck has reported problems with WireGuard and the previous
diff.  Maybe they were related to the mbuf cookie.
Updated diff below.
bluhm
Index: net/if.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if.c,v
diff -u -p -r1.728 if.c
--- net/if.c 2 Mar 2025 21:28:31 -0000 1.728
+++ net/if.c 2 Mar 2025 22:28:06 -0000
@@ -1001,10 +1001,21 @@ if_input_process(struct ifnet *ifp, stru
*/
sn = net_sn(idx);
+ ml_init(&sn->sn_netstack.ns_tcp_ml);
+#ifdef INET6
+ ml_init(&sn->sn_netstack.ns_tcp6_ml);
+#endif
NET_LOCK_SHARED();
+
while ((m = ml_dequeue(ml)) != NULL)
(*ifp->if_input)(ifp, m, &sn->sn_netstack);
+
+ tcp_input_mlist(&sn->sn_netstack.ns_tcp_ml, AF_INET);
+#ifdef INET6
+ tcp_input_mlist(&sn->sn_netstack.ns_tcp6_ml, AF_INET6);
+#endif
+
NET_UNLOCK_SHARED();
}
Index: net/if_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_var.h,v
diff -u -p -r1.136 if_var.h
--- net/if_var.h 2 Mar 2025 21:28:32 -0000 1.136
+++ net/if_var.h 2 Mar 2025 22:28:06 -0000
@@ -92,7 +92,9 @@ struct task;
struct cpumem;
struct netstack {
- struct route ns_route;
+ struct route ns_route;
+ struct mbuf_list ns_tcp_ml;
+ struct mbuf_list ns_tcp6_ml;
};
/*
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c 5 Jan 2025 12:36:48 -0000 1.121
+++ netinet/in_proto.c 2 Mar 2025 22:28:06 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inetdomain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp_ctlinput,
.pr_ctloutput = tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.433 tcp_input.c
--- netinet/tcp_input.c 2 Mar 2025 21:28:32 -0000 1.433
+++ netinet/tcp_input.c 2 Mar 2025 22:28:06 -0000
@@ -100,9 +100,6 @@
#include <net/pfvar.h>
#endif
-int tcp_mss_adv(struct rtentry *, int);
-int tcp_flush_queue(struct tcpcb *);
-
#ifdef INET6
#include <netinet6/in6_var.h>
#include <netinet6/nd6.h>
@@ -177,6 +174,9 @@ do { \
if_put(ifp); \
} while (0)
+int tcp_input_solocked(struct mbuf **, int *, int, int, struct socket **);
+int tcp_mss_adv(struct rtentry *, int);
+int tcp_flush_queue(struct tcpcb *);
void tcp_sack_partialack(struct tcpcb *, struct tcphdr *);
void tcp_newreno_partialack(struct tcpcb *, struct tcphdr *);
@@ -347,12 +347,53 @@ tcp_flush_queue(struct tcpcb *tp)
return (flags);
}
+int
+tcp_input(struct mbuf **mp, int *offp, int proto, int af, struct netstack *ns)
+{
+ if (ns == NULL)
+ return tcp_input_solocked(mp, offp, proto, af, NULL);
+ (*mp)->m_pkthdr.ph_cookie = (void *)(long)(*offp);
+ switch (af) {
+ case AF_INET:
+ ml_enqueue(&ns->ns_tcp_ml, *mp);
+ break;
+#ifdef INET6
+ case AF_INET6:
+ ml_enqueue(&ns->ns_tcp6_ml, *mp);
+ break;
+#endif
+ default:
+ m_freemp(mp);
+ }
+ *mp = NULL;
+ return IPPROTO_DONE;
+}
+
+void
+tcp_input_mlist(struct mbuf_list *ml, int af)
+{
+ struct socket *so = NULL;
+ struct mbuf *m;
+
+ while ((m = ml_dequeue(ml)) != NULL) {
+ int off, nxt;
+
+ off = (long)m->m_pkthdr.ph_cookie;
+ m->m_pkthdr.ph_cookie = NULL;
+ nxt = tcp_input_solocked(&m, &off, IPPROTO_TCP, af, &so);
+ KASSERT(nxt == IPPROTO_DONE);
+ }
+
+ in_pcbsounlock_rele(NULL, so);
+}
+
/*
* TCP input routine, follows pages 65-76 of the
* protocol specification dated September, 1981 very closely.
*/
int
-tcp_input(struct mbuf **mp, int *offp, int proto, int af, struct netstack *ns)
+tcp_input_solocked(struct mbuf **mp, int *offp, int proto, int af,
+ struct socket ** solocked)
{
struct mbuf *m = *mp;
int iphlen = *offp;
@@ -603,6 +644,25 @@ findpcb:
tcpstat_inc(tcps_noport);
goto dropwithreset_ratelim;
}
+ /*
+ * Avoid needless lock and unlock operation when handling multiple
+ * TCP packets from the same stream consecutively.
+ */
+ if (solocked != NULL && *solocked != NULL &&
+ sotoinpcb(*solocked) == inp) {
+ so = *solocked;
+ *solocked = NULL;
+ } else {
+ if (solocked != NULL && *solocked != NULL) {
+ in_pcbsounlock_rele(NULL, *solocked);
+ *solocked = NULL;
+ }
+ so = in_pcbsolock_ref(inp);
+ }
+ if (so == NULL) {
+ tcpstat_inc(tcps_noport);
+ goto dropwithreset_ratelim;
+ }
KASSERT(sotoinpcb(inp->inp_socket) == inp);
KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -635,7 +695,6 @@ findpcb:
else
tiwin = th->th_win;
- so = inp->inp_socket;
if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
union syn_cache_sa src;
union syn_cache_sa dst;
@@ -724,6 +783,7 @@ findpcb:
* in use for the reply,
* do not free it.
*/
+ so = NULL;
m = *mp = NULL;
goto drop;
} else {
@@ -731,13 +791,11 @@ findpcb:
* We have created a
* full-blown connection.
*/
- tp = NULL;
in_pcbunref(inp);
inp = in_pcbref(sotoinpcb(so));
tp = intotcpcb(inp);
if (tp == NULL)
goto badsyn; /*XXX*/
-
}
break;
@@ -843,6 +901,10 @@ findpcb:
tcpstat_inc(tcps_dropsyn);
goto drop;
}
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1018,6 +1080,10 @@ findpcb:
if (so->so_snd.sb_cc ||
tp->t_flags & TF_NEEDOUTPUT)
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1068,6 +1134,10 @@ findpcb:
tp->t_flags &= ~TF_BLOCKOUTPUT;
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1261,6 +1331,8 @@ trimthenstep6:
((arc4random() & 0x7fffffff) | 0x8000);
reuse = &iss;
tp = tcp_close(tp);
+ in_pcbsounlock_rele(inp, so);
+ so = NULL;
in_pcbunref(inp);
inp = NULL;
goto findpcb;
@@ -2065,6 +2137,10 @@ dodata: /* XXX */
*/
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2094,6 +2170,10 @@ dropafterack:
m_freem(m);
tp->t_flags |= TF_ACKNOW;
(void) tcp_output(tp);
+ if (solocked != NULL)
+ *solocked = so;
+ else
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2129,6 +2209,7 @@ dropwithreset:
(tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
}
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2140,6 +2221,7 @@ drop:
tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -3543,6 +3625,7 @@ syn_cache_get(struct sockaddr *src, stru
sc = syn_cache_lookup(src, dst, &scp, inp->inp_rtableid);
if (sc == NULL) {
mtx_leave(&syn_cache_mtx);
+ in_pcbsounlock_rele(inp, so);
return (NULL);
}
@@ -3556,6 +3639,7 @@ syn_cache_get(struct sockaddr *src, stru
refcnt_take(&sc->sc_refcnt);
mtx_leave(&syn_cache_mtx);
(void) syn_cache_respond(sc, m, now, do_ecn);
+ in_pcbsounlock_rele(inp, so);
syn_cache_put(sc);
return ((struct socket *)(-1));
}
@@ -3696,7 +3780,7 @@ syn_cache_get(struct sockaddr *src, stru
tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
tp->last_ack_sent = tp->rcv_nxt;
- in_pcbsounlock_rele(inp, so);
+ in_pcbsounlock_rele(listeninp, listenso);
tcpstat_inc(tcps_sc_completed);
syn_cache_put(sc);
return (so);
@@ -3709,6 +3793,7 @@ abort:
tp = tcp_drop(tp, ECONNABORTED); /* destroys socket */
m_freem(m);
in_pcbsounlock_rele(inp, so);
+ in_pcbsounlock_rele(listeninp, listenso);
syn_cache_put(sc);
tcpstat_inc(tcps_sc_aborted);
return ((struct socket *)(-1));
@@ -3813,7 +3898,7 @@ syn_cache_add(struct sockaddr *src, stru
struct mbuf *ipopts;
struct rtentry *rt = NULL;
- NET_ASSERT_LOCKED();
+ soassertlocked(so);
tp = sototcpcb(so);
@@ -3989,9 +4074,8 @@ syn_cache_add(struct sockaddr *src, stru
if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
mtx_enter(&syn_cache_mtx);
/*
- * XXXSMP Currently exclusive netlock prevents another insert
- * after our syn_cache_lookup() and before syn_cache_insert().
- * Double insert should be handled and not rely on netlock.
+ * Socket lock prevents another insert after our
+ * syn_cache_lookup() and before syn_cache_insert().
*/
syn_cache_insert(sc, tp);
mtx_leave(&syn_cache_mtx);
Index: netinet/tcp_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_var.h,v
diff -u -p -r1.186 tcp_var.h
--- netinet/tcp_var.h 2 Mar 2025 21:28:32 -0000 1.186
+++ netinet/tcp_var.h 2 Mar 2025 22:28:06 -0000
@@ -718,6 +718,7 @@ int tcp_dooptions(struct tcpcb *, u_cha
struct mbuf *, int, struct tcp_opt_info *, u_int, uint64_t);
void tcp_init(void);
int tcp_input(struct mbuf **, int *, int, int, struct netstack *);
+void tcp_input_mlist(struct mbuf_list *, int);
int tcp_mss(struct tcpcb *, int);
void tcp_mss_update(struct tcpcb *);
u_int tcp_hdrsz(struct tcpcb *);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c 5 Jan 2025 12:36:48 -0000 1.124
+++ netinet6/in6_proto.c 2 Mar 2025 22:28:06 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inet6domain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp6_ctlinput,
.pr_ctloutput = tcp_ctloutput,
parallel TCP input