Download raw body.
TCP parallel input preview
Hi,
This is my idea to unlock TCP input and run it in parallel. On my
4-CPU test machine, TCP throughput increases by 100% when running
10 TCP streams in parallel. Unfortunately, single-stream TCP
performance drops by 25%.
Before it can be committed, these issues have to be resolved:
- sonewconn() must return a locked socket a bit more consistently
- nd6_nud_hint() must be made MP safe
- single-stream performance must not be much slower than it is currently
In my flame graphs, I see contention on the socket lock between
soreceive() and tcp_input().
http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-t10_-R-btrace-kstack.0.svg?s=rw_
With the current exclusive net lock, packets are queued before
tcp_input(). Under this single lock we do bulk input, which seems
to perform better. Acquiring the socket lock for each TCP packet is
slow.
Multiple streams on multiple CPUs compensate for the locking cost.
http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-P10_-t10_-R-btrace-kstack.0.svg?s=rw_
bluhm
Index: kern/uipc_socket2.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
diff -u -p -r1.164 uipc_socket2.c
--- kern/uipc_socket2.c 5 Jan 2025 12:36:48 -0000 1.164
+++ kern/uipc_socket2.c 15 Jan 2025 15:23:05 -0000
@@ -220,6 +220,8 @@ sonewconn(struct socket *head, int conns
*/
if (persocket)
solock(so);
+ else
+ rw_enter_write(&so->so_lock);
/*
* Inherit watermarks but those may get clamped in low mem situations.
@@ -260,6 +262,8 @@ sonewconn(struct socket *head, int conns
fail:
if (persocket)
sounlock(so);
+ else
+ rw_exit_write(&so->so_lock);
sigio_free(&so->so_sigio);
klist_free(&so->so_rcv.sb_klist);
klist_free(&so->so_snd.sb_klist);
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
diff -u -p -r1.121 in_proto.c
--- netinet/in_proto.c 5 Jan 2025 12:36:48 -0000 1.121
+++ netinet/in_proto.c 15 Jan 2025 15:23:05 -0000
@@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inetdomain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp_ctlinput,
.pr_ctloutput = tcp_ctloutput,
Index: netinet/tcp_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
diff -u -p -r1.422 tcp_input.c
--- netinet/tcp_input.c 10 Jan 2025 20:19:03 -0000 1.422
+++ netinet/tcp_input.c 15 Jan 2025 15:26:07 -0000
@@ -605,6 +605,11 @@ findpcb:
tcpstat_inc(tcps_noport);
goto dropwithreset_ratelim;
}
+ so = in_pcbsolock_ref(inp);
+ if (so == NULL) {
+ tcpstat_inc(tcps_noport);
+ goto dropwithreset_ratelim;
+ }
KASSERT(sotoinpcb(inp->inp_socket) == inp);
KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
@@ -637,7 +642,6 @@ findpcb:
else
tiwin = th->th_win;
- so = inp->inp_socket;
if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
union syn_cache_sa src;
union syn_cache_sa dst;
@@ -726,6 +730,7 @@ findpcb:
* in use for the reply,
* do not free it.
*/
+ so = NULL;
m = *mp = NULL;
goto drop;
} else {
@@ -733,13 +738,11 @@ findpcb:
* We have created a
* full-blown connection.
*/
- tp = NULL;
in_pcbunref(inp);
inp = in_pcbref(sotoinpcb(so));
tp = intotcpcb(inp);
if (tp == NULL)
goto badsyn; /*XXX*/
-
}
break;
@@ -845,6 +848,7 @@ findpcb:
tcpstat_inc(tcps_dropsyn);
goto drop;
}
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1020,6 +1024,7 @@ findpcb:
if (so->so_snd.sb_cc ||
tp->t_flags & TF_NEEDOUTPUT)
(void) tcp_output(tp);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1070,6 +1075,7 @@ findpcb:
tp->t_flags &= ~TF_BLOCKOUTPUT;
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -1262,6 +1268,8 @@ trimthenstep6:
((arc4random() & 0x7fffffff) | 0x8000);
reuse = &iss;
tp = tcp_close(tp);
+ in_pcbsounlock_rele(inp, so);
+ so = NULL;
in_pcbunref(inp);
inp = NULL;
goto findpcb;
@@ -2062,6 +2070,7 @@ dodata: /* XXX */
*/
if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
(void) tcp_output(tp);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2091,6 +2100,7 @@ dropafterack:
m_freem(m);
tp->t_flags |= TF_ACKNOW;
(void) tcp_output(tp);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2126,6 +2136,7 @@ dropwithreset:
(tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
}
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
@@ -2137,6 +2148,7 @@ drop:
tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
m_freem(m);
+ in_pcbsounlock_rele(inp, so);
in_pcbunref(inp);
return IPPROTO_DONE;
}
@@ -3531,15 +3543,18 @@ syn_cache_get(struct sockaddr *src, stru
struct inpcb *inp, *oldinp;
struct tcpcb *tp = NULL;
struct mbuf *am;
- struct socket *oso;
+ struct socket *oldso;
u_int rtableid;
NET_ASSERT_LOCKED();
+ inp = sotoinpcb(so);
+
mtx_enter(&syn_cache_mtx);
sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid);
if (sc == NULL) {
mtx_leave(&syn_cache_mtx);
+ in_pcbsounlock_rele(inp, so);
return (NULL);
}
@@ -3553,6 +3568,7 @@ syn_cache_get(struct sockaddr *src, stru
refcnt_take(&sc->sc_refcnt);
mtx_leave(&syn_cache_mtx);
(void) syn_cache_respond(sc, m, now, do_ecn);
+ in_pcbsounlock_rele(inp, so);
syn_cache_put(sc);
return ((struct socket *)(-1));
}
@@ -3567,12 +3583,13 @@ syn_cache_get(struct sockaddr *src, stru
* connection when the SYN arrived. If we can't create
* the connection, abort it.
*/
- oso = so;
+ oldso = so;
+ oldinp = inp;
so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT);
if (so == NULL)
goto resetandabort;
-
- oldinp = sotoinpcb(oso);
+ soassertlocked(so);
+ soref(so);
inp = sotoinpcb(so);
#ifdef IPSEC
@@ -3633,7 +3650,7 @@ syn_cache_get(struct sockaddr *src, stru
(void) m_free(am);
tp = intotcpcb(inp);
- tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY);
+ tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY);
if (sc->sc_request_r_scale != 15) {
tp->requested_s_scale = sc->sc_requested_s_scale;
tp->request_r_scale = sc->sc_request_r_scale;
@@ -3645,6 +3662,7 @@ syn_cache_get(struct sockaddr *src, stru
tp->t_template = tcp_template(tp);
if (tp->t_template == 0) {
tp = tcp_drop(tp, ENOBUFS); /* destroys socket */
+ in_pcbsounlock_rele(inp, so);
so = NULL;
goto abort;
}
@@ -3696,8 +3714,9 @@ syn_cache_get(struct sockaddr *src, stru
tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
tp->last_ack_sent = tp->rcv_nxt;
- tcpstat_inc(tcps_sc_completed);
+ in_pcbsounlock_rele(oldinp, oldso);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_completed);
return (so);
resetandabort:
@@ -3707,6 +3726,8 @@ abort:
m_freem(m);
if (so != NULL)
soabort(so);
+ in_pcbsounlock_rele(inp, so);
+ in_pcbsounlock_rele(oldinp, oldso);
syn_cache_put(sc);
tcpstat_inc(tcps_sc_aborted);
return ((struct socket *)(-1));
@@ -3740,8 +3761,8 @@ syn_cache_reset(struct sockaddr *src, st
}
syn_cache_rm(sc);
mtx_leave(&syn_cache_mtx);
- tcpstat_inc(tcps_sc_reset);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_reset);
}
void
@@ -3781,8 +3802,8 @@ syn_cache_unreach(const struct sockaddr
syn_cache_rm(sc);
mtx_leave(&syn_cache_mtx);
- tcpstat_inc(tcps_sc_unreach);
syn_cache_put(sc);
+ tcpstat_inc(tcps_sc_unreach);
}
/*
@@ -3810,7 +3831,7 @@ syn_cache_add(struct sockaddr *src, stru
struct syn_cache_head *scp;
struct mbuf *ipopts;
- NET_ASSERT_LOCKED();
+ soassertlocked(so);
tp = sototcpcb(so);
@@ -3968,9 +3989,8 @@ syn_cache_add(struct sockaddr *src, stru
if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
mtx_enter(&syn_cache_mtx);
/*
- * XXXSMP Currently exclusive netlock prevents another insert
- * after our syn_cache_lookup() and before syn_cache_insert().
- * Double insert should be handled and not rely on netlock.
+ * Socket lock prevents another insert after our
+ * syn_cache_lookup() and before syn_cache_insert().
*/
syn_cache_insert(sc, tp);
mtx_leave(&syn_cache_mtx);
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
diff -u -p -r1.124 in6_proto.c
--- netinet6/in6_proto.c 5 Jan 2025 12:36:48 -0000 1.124
+++ netinet6/in6_proto.c 15 Jan 2025 15:23:05 -0000
@@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
.pr_type = SOCK_STREAM,
.pr_domain = &inet6domain,
.pr_protocol = IPPROTO_TCP,
- .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
+ .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
+ PR_MPINPUT,
.pr_input = tcp_input,
.pr_ctlinput = tcp6_ctlinput,
.pr_ctloutput = tcp_ctloutput,
Index: netinet6/nd6.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v
diff -u -p -r1.283 nd6.c
--- netinet6/nd6.c 4 Sep 2024 07:54:52 -0000 1.283
+++ netinet6/nd6.c 15 Jan 2025 15:23:05 -0000
@@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt)
struct llinfo_nd6 *ln;
struct ifnet *ifp;
+ /* XXX
NET_ASSERT_LOCKED_EXCLUSIVE();
+ */
ifp = if_get(rt->rt_ifidx);
if (ifp == NULL)
TCP parallel input preview