Download raw body.
TCP parallel input preview
> On 15 Jan 2025, at 18:49, Alexander Bluhm <bluhm@openbsd.org> wrote:
>
> Hi,
>
> This is my idea to unlock TCP input and run it in parallel. On my
> 4 CPU test machine I gain 100% TCP throughput when running
> 10 TCP streams in parallel. Unfortunately, single stream TCP performance
> drops by 25%.
>
> Before it can be committed, these issues have to be resolved:
> - sonewconn() must return locked socket a bit more consistently
> - nd6_nud_hint() must be made MP safe
> - single stream performance must not be much slower than currently
>
> In my flame graphs I see contention on the socket lock between
> soreceive() and tcp_input().
> http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-t10_-R-btrace-kstack.0.svg?s=rw_
>
> With the current exclusive net lock, the packets are queued before
> tcp_input(). With this single lock we do bulk input, which seems
> to perform better. Getting the socket lock for each TCP packet is
> slow.
>
> Multiple streams on multiple CPUs compensate for the locking effect.
> http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-P10_-t10_-R-btrace-kstack.0.svg?s=rw_
>
> bluhm
>
> Index: kern/uipc_socket2.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
> diff -u -p -r1.164 uipc_socket2.c
> --- kern/uipc_socket2.c 5 Jan 2025 12:36:48 -0000 1.164
> +++ kern/uipc_socket2.c 15 Jan 2025 15:23:05 -0000
> @@ -220,6 +220,8 @@ sonewconn(struct socket *head, int conns
> */
> if (persocket)
> solock(so);
> + else
> + rw_enter_write(&so->so_lock);
>
> /*
> * Inherit watermarks but those may get clamped in low mem situations.
> @@ -260,6 +262,8 @@ sonewconn(struct socket *head, int conns
> fail:
> if (persocket)
> sounlock(so);
> + else
> + rw_exit_write(&so->so_lock);
> sigio_free(&so->so_sigio);
> klist_free(&so->so_rcv.sb_klist);
> klist_free(&so->so_snd.sb_klist);
> Index: netinet/in_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
> diff -u -p -r1.121 in_proto.c
> --- netinet/in_proto.c 5 Jan 2025 12:36:48 -0000 1.121
> +++ netinet/in_proto.c 15 Jan 2025 15:23:05 -0000
> @@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
> .pr_type = SOCK_STREAM,
> .pr_domain = &inetdomain,
> .pr_protocol = IPPROTO_TCP,
> - .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
> + .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
> + PR_MPINPUT,
> .pr_input = tcp_input,
> .pr_ctlinput = tcp_ctlinput,
> .pr_ctloutput = tcp_ctloutput,
> Index: netinet/tcp_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
> diff -u -p -r1.422 tcp_input.c
> --- netinet/tcp_input.c 10 Jan 2025 20:19:03 -0000 1.422
> +++ netinet/tcp_input.c 15 Jan 2025 15:26:07 -0000
> @@ -605,6 +605,11 @@ findpcb:
> tcpstat_inc(tcps_noport);
> goto dropwithreset_ratelim;
> }
> + so = in_pcbsolock_ref(inp);
> + if (so == NULL) {
> + tcpstat_inc(tcps_noport);
> + goto dropwithreset_ratelim;
> + }
>
> KASSERT(sotoinpcb(inp->inp_socket) == inp);
> KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
> @@ -637,7 +642,6 @@ findpcb:
> else
> tiwin = th->th_win;
>
> - so = inp->inp_socket;
> if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
> union syn_cache_sa src;
> union syn_cache_sa dst;
> @@ -726,6 +730,7 @@ findpcb:
> * in use for the reply,
> * do not free it.
> */
> + so = NULL;
> m = *mp = NULL;
> goto drop;
> } else {
> @@ -733,13 +738,11 @@ findpcb:
> * We have created a
> * full-blown connection.
> */
> - tp = NULL;
> in_pcbunref(inp);
> inp = in_pcbref(sotoinpcb(so));
> tp = intotcpcb(inp);
> if (tp == NULL)
> goto badsyn; /*XXX*/
> -
> }
> break;
>
> @@ -845,6 +848,7 @@ findpcb:
> tcpstat_inc(tcps_dropsyn);
> goto drop;
> }
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
> }
> @@ -1020,6 +1024,7 @@ findpcb:
> if (so->so_snd.sb_cc ||
> tp->t_flags & TF_NEEDOUTPUT)
> (void) tcp_output(tp);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
> }
> @@ -1070,6 +1075,7 @@ findpcb:
> tp->t_flags &= ~TF_BLOCKOUTPUT;
> if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> (void) tcp_output(tp);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
> }
> @@ -1262,6 +1268,8 @@ trimthenstep6:
> ((arc4random() & 0x7fffffff) | 0x8000);
> reuse = &iss;
> tp = tcp_close(tp);
> + in_pcbsounlock_rele(inp, so);
> + so = NULL;
> in_pcbunref(inp);
> inp = NULL;
> goto findpcb;
> @@ -2062,6 +2070,7 @@ dodata: /* XXX */
> */
> if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> (void) tcp_output(tp);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
>
> @@ -2091,6 +2100,7 @@ dropafterack:
> m_freem(m);
> tp->t_flags |= TF_ACKNOW;
> (void) tcp_output(tp);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
>
> @@ -2126,6 +2136,7 @@ dropwithreset:
> (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
> }
> m_freem(m);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
>
> @@ -2137,6 +2148,7 @@ drop:
> tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
>
> m_freem(m);
> + in_pcbsounlock_rele(inp, so);
> in_pcbunref(inp);
> return IPPROTO_DONE;
> }
> @@ -3531,15 +3543,18 @@ syn_cache_get(struct sockaddr *src, stru
> struct inpcb *inp, *oldinp;
> struct tcpcb *tp = NULL;
> struct mbuf *am;
> - struct socket *oso;
> + struct socket *oldso;
> u_int rtableid;
>
> NET_ASSERT_LOCKED();
>
> + inp = sotoinpcb(so);
> +
> mtx_enter(&syn_cache_mtx);
> sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid);
> if (sc == NULL) {
> mtx_leave(&syn_cache_mtx);
> + in_pcbsounlock_rele(inp, so);
> return (NULL);
> }
>
> @@ -3553,6 +3568,7 @@ syn_cache_get(struct sockaddr *src, stru
> refcnt_take(&sc->sc_refcnt);
> mtx_leave(&syn_cache_mtx);
> (void) syn_cache_respond(sc, m, now, do_ecn);
> + in_pcbsounlock_rele(inp, so);
> syn_cache_put(sc);
> return ((struct socket *)(-1));
> }
> @@ -3567,12 +3583,13 @@ syn_cache_get(struct sockaddr *src, stru
> * connection when the SYN arrived. If we can't create
> * the connection, abort it.
> */
> - oso = so;
> + oldso = so;
> + oldinp = inp;
> so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT);
> if (so == NULL)
> goto resetandabort;
> -
> - oldinp = sotoinpcb(oso);
> + soassertlocked(so);
> + soref(so);
> inp = sotoinpcb(so);
>
> #ifdef IPSEC
> @@ -3633,7 +3650,7 @@ syn_cache_get(struct sockaddr *src, stru
> (void) m_free(am);
>
> tp = intotcpcb(inp);
> - tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY);
> + tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY);
> if (sc->sc_request_r_scale != 15) {
> tp->requested_s_scale = sc->sc_requested_s_scale;
> tp->request_r_scale = sc->sc_request_r_scale;
> @@ -3645,6 +3662,7 @@ syn_cache_get(struct sockaddr *src, stru
> tp->t_template = tcp_template(tp);
> if (tp->t_template == 0) {
> tp = tcp_drop(tp, ENOBUFS); /* destroys socket */
> + in_pcbsounlock_rele(inp, so);
> so = NULL;
> goto abort;
> }
> @@ -3696,8 +3714,9 @@ syn_cache_get(struct sockaddr *src, stru
> tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
> tp->last_ack_sent = tp->rcv_nxt;
>
> - tcpstat_inc(tcps_sc_completed);
> + in_pcbsounlock_rele(oldinp, oldso);
> syn_cache_put(sc);
> + tcpstat_inc(tcps_sc_completed);
> return (so);
>
> resetandabort:
> @@ -3707,6 +3726,8 @@ abort:
> m_freem(m);
> if (so != NULL)
> soabort(so);
> + in_pcbsounlock_rele(inp, so);
> + in_pcbsounlock_rele(oldinp, oldso);
> syn_cache_put(sc);
> tcpstat_inc(tcps_sc_aborted);
> return ((struct socket *)(-1));
> @@ -3740,8 +3761,8 @@ syn_cache_reset(struct sockaddr *src, st
> }
> syn_cache_rm(sc);
> mtx_leave(&syn_cache_mtx);
> - tcpstat_inc(tcps_sc_reset);
> syn_cache_put(sc);
> + tcpstat_inc(tcps_sc_reset);
> }
>
> void
> @@ -3781,8 +3802,8 @@ syn_cache_unreach(const struct sockaddr
>
> syn_cache_rm(sc);
> mtx_leave(&syn_cache_mtx);
> - tcpstat_inc(tcps_sc_unreach);
> syn_cache_put(sc);
> + tcpstat_inc(tcps_sc_unreach);
> }
>
> /*
> @@ -3810,7 +3831,7 @@ syn_cache_add(struct sockaddr *src, stru
> struct syn_cache_head *scp;
> struct mbuf *ipopts;
>
> - NET_ASSERT_LOCKED();
> + soassertlocked(so);
>
> tp = sototcpcb(so);
>
> @@ -3968,9 +3989,8 @@ syn_cache_add(struct sockaddr *src, stru
> if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
> mtx_enter(&syn_cache_mtx);
> /*
> - * XXXSMP Currently exclusive netlock prevents another insert
> - * after our syn_cache_lookup() and before syn_cache_insert().
> - * Double insert should be handled and not rely on netlock.
> + * Socket lock prevents another insert after our
> + * syn_cache_lookup() and before syn_cache_insert().
> */
> syn_cache_insert(sc, tp);
> mtx_leave(&syn_cache_mtx);
> Index: netinet6/in6_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
> diff -u -p -r1.124 in6_proto.c
> --- netinet6/in6_proto.c 5 Jan 2025 12:36:48 -0000 1.124
> +++ netinet6/in6_proto.c 15 Jan 2025 15:23:05 -0000
> @@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
> .pr_type = SOCK_STREAM,
> .pr_domain = &inet6domain,
> .pr_protocol = IPPROTO_TCP,
> - .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
> + .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
> + PR_MPINPUT,
> .pr_input = tcp_input,
> .pr_ctlinput = tcp6_ctlinput,
> .pr_ctloutput = tcp_ctloutput,
> Index: netinet6/nd6.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v
> diff -u -p -r1.283 nd6.c
> --- netinet6/nd6.c 4 Sep 2024 07:54:52 -0000 1.283
> +++ netinet6/nd6.c 15 Jan 2025 15:23:05 -0000
> @@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt)
> struct llinfo_nd6 *ln;
> struct ifnet *ifp;
>
> + /* XXX
> NET_ASSERT_LOCKED_EXCLUSIVE();
> + */
>
> ifp = if_get(rt->rt_ifidx);
> if (ifp == NULL)
>
TCP parallel input preview