From: Alexander Bluhm Subject: TCP parallel input preview To: tech@openbsd.org Date: Wed, 15 Jan 2025 16:49:37 +0100 Hi, This is my idea to unlock TCP input and run it in parallel. On my 4 CPU test machine I gain 100% TCP throughput when running 10 TCP streams in parallel. Unfortunately single stream TCP performance drops by 25%. Before it can be committed these issues have to be resolved: - sonewconn() must return a locked socket a bit more consistently - nd6_nud_hint() must be made MP safe - single stream performance must not be much slower than currently In my flame graphs I see contention on the socket lock between soreceive() and tcp_input(). http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-t10_-R-btrace-kstack.0.svg?s=rw_ With the current exclusive net lock the packets are queued before tcp_input(). With this single lock we do bulk input which seems to perform better. Getting the socket lock for each TCP packet is slow. Multiple streams on multiple CPUs compensate for the locking effect. http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-P10_-t10_-R-btrace-kstack.0.svg?s=rw_ bluhm Index: kern/uipc_socket2.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v diff -u -p -r1.164 uipc_socket2.c --- kern/uipc_socket2.c 5 Jan 2025 12:36:48 -0000 1.164 +++ kern/uipc_socket2.c 15 Jan 2025 15:23:05 -0000 @@ -220,6 +220,8 @@ sonewconn(struct socket *head, int conns */ if (persocket) solock(so); + else + rw_enter_write(&so->so_lock); /* * Inherit watermarks but those may get clamped in low mem situations. 
@@ -260,6 +262,8 @@ sonewconn(struct socket *head, int conns fail: if (persocket) sounlock(so); + else + rw_exit_write(&so->so_lock); sigio_free(&so->so_sigio); klist_free(&so->so_rcv.sb_klist); klist_free(&so->so_snd.sb_klist); Index: netinet/in_proto.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v diff -u -p -r1.121 in_proto.c --- netinet/in_proto.c 5 Jan 2025 12:36:48 -0000 1.121 +++ netinet/in_proto.c 15 Jan 2025 15:23:05 -0000 @@ -197,7 +197,8 @@ const struct protosw inetsw[] = { .pr_type = SOCK_STREAM, .pr_domain = &inetdomain, .pr_protocol = IPPROTO_TCP, - .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE, + .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE| + PR_MPINPUT, .pr_input = tcp_input, .pr_ctlinput = tcp_ctlinput, .pr_ctloutput = tcp_ctloutput, Index: netinet/tcp_input.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v diff -u -p -r1.422 tcp_input.c --- netinet/tcp_input.c 10 Jan 2025 20:19:03 -0000 1.422 +++ netinet/tcp_input.c 15 Jan 2025 15:26:07 -0000 @@ -605,6 +605,11 @@ findpcb: tcpstat_inc(tcps_noport); goto dropwithreset_ratelim; } + so = in_pcbsolock_ref(inp); + if (so == NULL) { + tcpstat_inc(tcps_noport); + goto dropwithreset_ratelim; + } KASSERT(sotoinpcb(inp->inp_socket) == inp); KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp); @@ -637,7 +642,6 @@ findpcb: else tiwin = th->th_win; - so = inp->inp_socket; if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) { union syn_cache_sa src; union syn_cache_sa dst; @@ -726,6 +730,7 @@ findpcb: * in use for the reply, * do not free it. */ + so = NULL; m = *mp = NULL; goto drop; } else { @@ -733,13 +738,11 @@ findpcb: * We have created a * full-blown connection. 
*/ - tp = NULL; in_pcbunref(inp); inp = in_pcbref(sotoinpcb(so)); tp = intotcpcb(inp); if (tp == NULL) goto badsyn; /*XXX*/ - } break; @@ -845,6 +848,7 @@ findpcb: tcpstat_inc(tcps_dropsyn); goto drop; } + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; } @@ -1020,6 +1024,7 @@ findpcb: if (so->so_snd.sb_cc || tp->t_flags & TF_NEEDOUTPUT) (void) tcp_output(tp); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; } @@ -1070,6 +1075,7 @@ findpcb: tp->t_flags &= ~TF_BLOCKOUTPUT; if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT)) (void) tcp_output(tp); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; } @@ -1262,6 +1268,8 @@ trimthenstep6: ((arc4random() & 0x7fffffff) | 0x8000); reuse = &iss; tp = tcp_close(tp); + in_pcbsounlock_rele(inp, so); + so = NULL; in_pcbunref(inp); inp = NULL; goto findpcb; @@ -2062,6 +2070,7 @@ dodata: /* XXX */ */ if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT)) (void) tcp_output(tp); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; @@ -2091,6 +2100,7 @@ dropafterack: m_freem(m); tp->t_flags |= TF_ACKNOW; (void) tcp_output(tp); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; @@ -2126,6 +2136,7 @@ dropwithreset: (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now); } m_freem(m); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; @@ -2137,6 +2148,7 @@ drop: tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen); m_freem(m); + in_pcbsounlock_rele(inp, so); in_pcbunref(inp); return IPPROTO_DONE; } @@ -3531,15 +3543,18 @@ syn_cache_get(struct sockaddr *src, stru struct inpcb *inp, *oldinp; struct tcpcb *tp = NULL; struct mbuf *am; - struct socket *oso; + struct socket *oldso; u_int rtableid; NET_ASSERT_LOCKED(); + inp = sotoinpcb(so); + mtx_enter(&syn_cache_mtx); sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid); if (sc == NULL) { mtx_leave(&syn_cache_mtx); + in_pcbsounlock_rele(inp, so); return (NULL); } @@ 
-3553,6 +3568,7 @@ syn_cache_get(struct sockaddr *src, stru refcnt_take(&sc->sc_refcnt); mtx_leave(&syn_cache_mtx); (void) syn_cache_respond(sc, m, now, do_ecn); + in_pcbsounlock_rele(inp, so); syn_cache_put(sc); return ((struct socket *)(-1)); } @@ -3567,12 +3583,13 @@ syn_cache_get(struct sockaddr *src, stru * connection when the SYN arrived. If we can't create * the connection, abort it. */ - oso = so; + oldso = so; + oldinp = inp; so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT); if (so == NULL) goto resetandabort; - - oldinp = sotoinpcb(oso); + soassertlocked(so); + soref(so); inp = sotoinpcb(so); #ifdef IPSEC @@ -3633,7 +3650,7 @@ syn_cache_get(struct sockaddr *src, stru (void) m_free(am); tp = intotcpcb(inp); - tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY); + tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY); if (sc->sc_request_r_scale != 15) { tp->requested_s_scale = sc->sc_requested_s_scale; tp->request_r_scale = sc->sc_request_r_scale; @@ -3645,6 +3662,7 @@ syn_cache_get(struct sockaddr *src, stru tp->t_template = tcp_template(tp); if (tp->t_template == 0) { tp = tcp_drop(tp, ENOBUFS); /* destroys socket */ + in_pcbsounlock_rele(inp, so); so = NULL; goto abort; } @@ -3696,8 +3714,9 @@ syn_cache_get(struct sockaddr *src, stru tp->rcv_adv = tp->rcv_nxt + sc->sc_win; tp->last_ack_sent = tp->rcv_nxt; - tcpstat_inc(tcps_sc_completed); + in_pcbsounlock_rele(oldinp, oldso); syn_cache_put(sc); + tcpstat_inc(tcps_sc_completed); return (so); resetandabort: @@ -3707,6 +3726,8 @@ abort: m_freem(m); if (so != NULL) soabort(so); + in_pcbsounlock_rele(inp, so); + in_pcbsounlock_rele(oldinp, oldso); syn_cache_put(sc); tcpstat_inc(tcps_sc_aborted); return ((struct socket *)(-1)); @@ -3740,8 +3761,8 @@ syn_cache_reset(struct sockaddr *src, st } syn_cache_rm(sc); mtx_leave(&syn_cache_mtx); - tcpstat_inc(tcps_sc_reset); syn_cache_put(sc); + tcpstat_inc(tcps_sc_reset); } void @@ -3781,8 +3802,8 @@ syn_cache_unreach(const struct sockaddr 
syn_cache_rm(sc); mtx_leave(&syn_cache_mtx); - tcpstat_inc(tcps_sc_unreach); syn_cache_put(sc); + tcpstat_inc(tcps_sc_unreach); } /* @@ -3810,7 +3831,7 @@ syn_cache_add(struct sockaddr *src, stru struct syn_cache_head *scp; struct mbuf *ipopts; - NET_ASSERT_LOCKED(); + soassertlocked(so); tp = sototcpcb(so); @@ -3968,9 +3989,8 @@ syn_cache_add(struct sockaddr *src, stru if (syn_cache_respond(sc, m, now, do_ecn) == 0) { mtx_enter(&syn_cache_mtx); /* - * XXXSMP Currently exclusive netlock prevents another insert - * after our syn_cache_lookup() and before syn_cache_insert(). - * Double insert should be handled and not rely on netlock. + * Socket lock prevents another insert after our + * syn_cache_lookup() and before syn_cache_insert(). */ syn_cache_insert(sc, tp); mtx_leave(&syn_cache_mtx); Index: netinet6/in6_proto.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v diff -u -p -r1.124 in6_proto.c --- netinet6/in6_proto.c 5 Jan 2025 12:36:48 -0000 1.124 +++ netinet6/in6_proto.c 15 Jan 2025 15:23:05 -0000 @@ -147,7 +147,8 @@ const struct protosw inet6sw[] = { .pr_type = SOCK_STREAM, .pr_domain = &inet6domain, .pr_protocol = IPPROTO_TCP, - .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE, + .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE| + PR_MPINPUT, .pr_input = tcp_input, .pr_ctlinput = tcp6_ctlinput, .pr_ctloutput = tcp_ctloutput, Index: netinet6/nd6.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v diff -u -p -r1.283 nd6.c --- netinet6/nd6.c 4 Sep 2024 07:54:52 -0000 1.283 +++ netinet6/nd6.c 15 Jan 2025 15:23:05 -0000 @@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt) struct llinfo_nd6 *ln; struct ifnet *ifp; + /* XXX NET_ASSERT_LOCKED_EXCLUSIVE(); + */ ifp = if_get(rt->rt_ifidx); if (ifp == NULL)