Index | Thread | Search

From:
Vitaliy Makkoveev <otto@bsdbox.dev>
Subject:
Re: TCP parallel input preview
To:
Alexander Bluhm <bluhm@openbsd.org>
Cc:
tech@openbsd.org
Date:
Thu, 16 Jan 2025 01:11:32 +0300

Download raw body.

Thread
> On 15 Jan 2025, at 18:49, Alexander Bluhm <bluhm@openbsd.org> wrote:
> 
> Hi,
> 
> This is my idea to unlock TCP input and run it in parallel.  On my
> 4 CPU test machine I gain 100% percent TCP throughput when running
> 10 TCP streams in parallel.  Unfortunately single TCP performance
> drops by 25%.
> 
> Before it can be commited these issues have to be resolved:
> - sonewconn() must return locked socket a bit more consistently
> - nd6_nud_hint() must be made MP safe
> - single stream performance must not be much slower than currently
> 
> In my flame graphs I see contention on the socket lock between
> soreceive() and tcp_input().
> http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-t10_-R-btrace-kstack.0.svg?s=rw_
> 
> With current exclusive net lock the packets are queued before
> tcp_input().  With this single lock we do bulk input which seems
> to perform better.  Getting the socket lock for each TCP packet is
> slow.
> 
> Multiple streams on multiple CPU compensate the locking effect.
> http://bluhm.genua.de/perform/results/2025-01-14T10%3A27%3A19Z/patch-sys-tcp-mpinput.0/btrace/iperf3_-c10.3.45.35_-w1m_-P10_-t10_-R-btrace-kstack.0.svg?s=rw_
> 
> bluhm
> 
> Index: kern/uipc_socket2.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
> diff -u -p -r1.164 uipc_socket2.c
> --- kern/uipc_socket2.c	5 Jan 2025 12:36:48 -0000	1.164
> +++ kern/uipc_socket2.c	15 Jan 2025 15:23:05 -0000
> @@ -220,6 +220,8 @@ sonewconn(struct socket *head, int conns
> 	 */
> 	if (persocket)
> 		solock(so);
> +	else
> +		rw_enter_write(&so->so_lock);
> 
> 	/*
> 	 * Inherit watermarks but those may get clamped in low mem situations.
> @@ -260,6 +262,8 @@ sonewconn(struct socket *head, int conns
> fail:
> 	if (persocket)
> 		sounlock(so);
> +	else
> +		rw_exit_write(&so->so_lock);
> 	sigio_free(&so->so_sigio);
> 	klist_free(&so->so_rcv.sb_klist);
> 	klist_free(&so->so_snd.sb_klist);
> Index: netinet/in_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
> diff -u -p -r1.121 in_proto.c
> --- netinet/in_proto.c	5 Jan 2025 12:36:48 -0000	1.121
> +++ netinet/in_proto.c	15 Jan 2025 15:23:05 -0000
> @@ -197,7 +197,8 @@ const struct protosw inetsw[] = {
>   .pr_type	= SOCK_STREAM,
>   .pr_domain	= &inetdomain,
>   .pr_protocol	= IPPROTO_TCP,
> -  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
> +  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
> +		    PR_MPINPUT,
>   .pr_input	= tcp_input,
>   .pr_ctlinput	= tcp_ctlinput,
>   .pr_ctloutput	= tcp_ctloutput,
> Index: netinet/tcp_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/tcp_input.c,v
> diff -u -p -r1.422 tcp_input.c
> --- netinet/tcp_input.c	10 Jan 2025 20:19:03 -0000	1.422
> +++ netinet/tcp_input.c	15 Jan 2025 15:26:07 -0000
> @@ -605,6 +605,11 @@ findpcb:
> 		tcpstat_inc(tcps_noport);
> 		goto dropwithreset_ratelim;
> 	}
> +	so = in_pcbsolock_ref(inp);
> +	if (so == NULL) {
> +		tcpstat_inc(tcps_noport);
> +		goto dropwithreset_ratelim;
> +	}
> 
> 	KASSERT(sotoinpcb(inp->inp_socket) == inp);
> 	KASSERT(intotcpcb(inp) == NULL || intotcpcb(inp)->t_inpcb == inp);
> @@ -637,7 +642,6 @@ findpcb:
> 	else
> 		tiwin = th->th_win;
> 
> -	so = inp->inp_socket;
> 	if (so->so_options & (SO_DEBUG|SO_ACCEPTCONN)) {
> 		union syn_cache_sa src;
> 		union syn_cache_sa dst;
> @@ -726,6 +730,7 @@ findpcb:
> 					 * in use for the reply,
> 					 * do not free it.
> 					 */
> +					so = NULL;
> 					m = *mp = NULL;
> 					goto drop;
> 				} else {
> @@ -733,13 +738,11 @@ findpcb:
> 					 * We have created a
> 					 * full-blown connection.
> 					 */
> -					tp = NULL;
> 					in_pcbunref(inp);
> 					inp = in_pcbref(sotoinpcb(so));
> 					tp = intotcpcb(inp);
> 					if (tp == NULL)
> 						goto badsyn;	/*XXX*/
> -
> 				}
> 				break;
> 
> @@ -845,6 +848,7 @@ findpcb:
> 					tcpstat_inc(tcps_dropsyn);
> 					goto drop;
> 				}
> +				in_pcbsounlock_rele(inp, so);
> 				in_pcbunref(inp);
> 				return IPPROTO_DONE;
> 			}
> @@ -1020,6 +1024,7 @@ findpcb:
> 				if (so->so_snd.sb_cc ||
> 				    tp->t_flags & TF_NEEDOUTPUT)
> 					(void) tcp_output(tp);
> +				in_pcbsounlock_rele(inp, so);
> 				in_pcbunref(inp);
> 				return IPPROTO_DONE;
> 			}
> @@ -1070,6 +1075,7 @@ findpcb:
> 			tp->t_flags &= ~TF_BLOCKOUTPUT;
> 			if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> 				(void) tcp_output(tp);
> +			in_pcbsounlock_rele(inp, so);
> 			in_pcbunref(inp);
> 			return IPPROTO_DONE;
> 		}
> @@ -1262,6 +1268,8 @@ trimthenstep6:
> 			    ((arc4random() & 0x7fffffff) | 0x8000);
> 			reuse = &iss;
> 			tp = tcp_close(tp);
> +			in_pcbsounlock_rele(inp, so);
> +			so = NULL;
> 			in_pcbunref(inp);
> 			inp = NULL;
> 			goto findpcb;
> @@ -2062,6 +2070,7 @@ dodata:							/* XXX */
> 	 */
> 	if (tp->t_flags & (TF_ACKNOW|TF_NEEDOUTPUT))
> 		(void) tcp_output(tp);
> +	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
> 
> @@ -2091,6 +2100,7 @@ dropafterack:
> 	m_freem(m);
> 	tp->t_flags |= TF_ACKNOW;
> 	(void) tcp_output(tp);
> +	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
> 
> @@ -2126,6 +2136,7 @@ dropwithreset:
> 		    (tcp_seq)0, TH_RST|TH_ACK, m->m_pkthdr.ph_rtableid, now);
> 	}
> 	m_freem(m);
> +	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
> 
> @@ -2137,6 +2148,7 @@ drop:
> 		tcp_trace(TA_DROP, ostate, tp, otp, &saveti.caddr, 0, tlen);
> 
> 	m_freem(m);
> +	in_pcbsounlock_rele(inp, so);
> 	in_pcbunref(inp);
> 	return IPPROTO_DONE;
> }
> @@ -3531,15 +3543,18 @@ syn_cache_get(struct sockaddr *src, stru
> 	struct inpcb *inp, *oldinp;
> 	struct tcpcb *tp = NULL;
> 	struct mbuf *am;
> -	struct socket *oso;
> +	struct socket *oldso;
> 	u_int rtableid;
> 
> 	NET_ASSERT_LOCKED();
> 
> +	inp = sotoinpcb(so);
> +
> 	mtx_enter(&syn_cache_mtx);
> 	sc = syn_cache_lookup(src, dst, &scp, sotoinpcb(so)->inp_rtableid);
> 	if (sc == NULL) {
> 		mtx_leave(&syn_cache_mtx);
> +		in_pcbsounlock_rele(inp, so);
> 		return (NULL);
> 	}
> 
> @@ -3553,6 +3568,7 @@ syn_cache_get(struct sockaddr *src, stru
> 		refcnt_take(&sc->sc_refcnt);
> 		mtx_leave(&syn_cache_mtx);
> 		(void) syn_cache_respond(sc, m, now, do_ecn);
> +		in_pcbsounlock_rele(inp, so);
> 		syn_cache_put(sc);
> 		return ((struct socket *)(-1));
> 	}
> @@ -3567,12 +3583,13 @@ syn_cache_get(struct sockaddr *src, stru
> 	 * connection when the SYN arrived.  If we can't create
> 	 * the connection, abort it.
> 	 */
> -	oso = so;
> +	oldso = so;
> +	oldinp = inp;
> 	so = sonewconn(so, SS_ISCONNECTED, M_DONTWAIT);
> 	if (so == NULL)
> 		goto resetandabort;
> -
> -	oldinp = sotoinpcb(oso);
> +	soassertlocked(so);
> +	soref(so);
> 	inp = sotoinpcb(so);
> 
> #ifdef IPSEC
> @@ -3633,7 +3650,7 @@ syn_cache_get(struct sockaddr *src, stru
> 	(void) m_free(am);
> 
> 	tp = intotcpcb(inp);
> -	tp->t_flags = sototcpcb(oso)->t_flags & (TF_NOPUSH|TF_NODELAY);
> +	tp->t_flags = sototcpcb(oldso)->t_flags & (TF_NOPUSH|TF_NODELAY);
> 	if (sc->sc_request_r_scale != 15) {
> 		tp->requested_s_scale = sc->sc_requested_s_scale;
> 		tp->request_r_scale = sc->sc_request_r_scale;
> @@ -3645,6 +3662,7 @@ syn_cache_get(struct sockaddr *src, stru
> 	tp->t_template = tcp_template(tp);
> 	if (tp->t_template == 0) {
> 		tp = tcp_drop(tp, ENOBUFS);	/* destroys socket */
> +		in_pcbsounlock_rele(inp, so);
> 		so = NULL;
> 		goto abort;
> 	}
> @@ -3696,8 +3714,9 @@ syn_cache_get(struct sockaddr *src, stru
> 		tp->rcv_adv = tp->rcv_nxt + sc->sc_win;
> 	tp->last_ack_sent = tp->rcv_nxt;
> 
> -	tcpstat_inc(tcps_sc_completed);
> +	in_pcbsounlock_rele(oldinp, oldso);
> 	syn_cache_put(sc);
> +	tcpstat_inc(tcps_sc_completed);
> 	return (so);
> 
> resetandabort:
> @@ -3707,6 +3726,8 @@ abort:
> 	m_freem(m);
> 	if (so != NULL)
> 		soabort(so);
> +	in_pcbsounlock_rele(inp, so);
> +	in_pcbsounlock_rele(oldinp, oldso);
> 	syn_cache_put(sc);
> 	tcpstat_inc(tcps_sc_aborted);
> 	return ((struct socket *)(-1));
> @@ -3740,8 +3761,8 @@ syn_cache_reset(struct sockaddr *src, st
> 	}
> 	syn_cache_rm(sc);
> 	mtx_leave(&syn_cache_mtx);
> -	tcpstat_inc(tcps_sc_reset);
> 	syn_cache_put(sc);
> +	tcpstat_inc(tcps_sc_reset);
> }
> 
> void
> @@ -3781,8 +3802,8 @@ syn_cache_unreach(const struct sockaddr 
> 
> 	syn_cache_rm(sc);
> 	mtx_leave(&syn_cache_mtx);
> -	tcpstat_inc(tcps_sc_unreach);
> 	syn_cache_put(sc);
> +	tcpstat_inc(tcps_sc_unreach);
> }
> 
> /*
> @@ -3810,7 +3831,7 @@ syn_cache_add(struct sockaddr *src, stru
> 	struct syn_cache_head *scp;
> 	struct mbuf *ipopts;
> 
> -	NET_ASSERT_LOCKED();
> +	soassertlocked(so);
> 
> 	tp = sototcpcb(so);
> 
> @@ -3968,9 +3989,8 @@ syn_cache_add(struct sockaddr *src, stru
> 	if (syn_cache_respond(sc, m, now, do_ecn) == 0) {
> 		mtx_enter(&syn_cache_mtx);
> 		/*
> -		 * XXXSMP Currently exclusive netlock prevents another insert
> -		 * after our syn_cache_lookup() and before syn_cache_insert().
> -		 * Double insert should be handled and not rely on netlock.
> +		 * Socket lock prevents another insert after our
> +		 * syn_cache_lookup() and before syn_cache_insert().
> 		 */
> 		syn_cache_insert(sc, tp);
> 		mtx_leave(&syn_cache_mtx);
> Index: netinet6/in6_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
> diff -u -p -r1.124 in6_proto.c
> --- netinet6/in6_proto.c	5 Jan 2025 12:36:48 -0000	1.124
> +++ netinet6/in6_proto.c	15 Jan 2025 15:23:05 -0000
> @@ -147,7 +147,8 @@ const struct protosw inet6sw[] = {
>   .pr_type	= SOCK_STREAM,
>   .pr_domain	= &inet6domain,
>   .pr_protocol	= IPPROTO_TCP,
> -  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE,
> +  .pr_flags	= PR_CONNREQUIRED|PR_WANTRCVD|PR_ABRTACPTDIS|PR_SPLICE|
> +		    PR_MPINPUT,
>   .pr_input	= tcp_input,
>   .pr_ctlinput	= tcp6_ctlinput,
>   .pr_ctloutput	= tcp_ctloutput,
> Index: netinet6/nd6.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/nd6.c,v
> diff -u -p -r1.283 nd6.c
> --- netinet6/nd6.c	4 Sep 2024 07:54:52 -0000	1.283
> +++ netinet6/nd6.c	15 Jan 2025 15:23:05 -0000
> @@ -709,7 +709,9 @@ nd6_nud_hint(struct rtentry *rt)
> 	struct llinfo_nd6 *ln;
> 	struct ifnet *ifp;
> 
> +	/* XXX
> 	NET_ASSERT_LOCKED_EXCLUSIVE();
> +	*/
> 
> 	ifp = if_get(rt->rt_ifidx);
> 	if (ifp == NULL)
>