Index | Thread | Search

From:
Vitaliy Makkoveev <otto@bsdbox.dev>
Subject:
Re: parallel raw IP input
To:
Alexander Bluhm <bluhm@openbsd.org>
Cc:
tech@openbsd.org
Date:
Fri, 12 Apr 2024 01:03:39 +0300

Download raw body.

Thread
  • Alexander Bluhm:

    parallel raw IP input

    • Vitaliy Makkoveev:

      parallel raw IP input

> On 11 Apr 2024, at 23:24, Alexander Bluhm <bluhm@openbsd.org> wrote:
> 
> Hi,
> 
> As mvs@ mentioned, running raw IP in parallel is easier as it is
> less complex than UDP.  Especially there is no socket splicing.
> 
> So I fixed one race in rip_input() and reused my shared net lock
> ip_deliver() loop.
> 
> The idea is that ip_deliver() may run with shared or exclusive net
> lock.  The last parameter indicates the mode.  If is is running
> with shared netlock and encounters a protocol that needs exclusive
> lock, the packet is queued.
> 
> Before ip_ours() always queued the packet.  Now it tries to deliver
> with shared net lock, and if that is not possible, it queues the
> packet.
> 
> In case we have an IPv6 header chain that must switch from shared
> to exclusive processing, the next protocol and mbuf offset are
> stored in a mbuf tag.
> 
> The only drawback is that we have very limited test coverage for
> raw IP.  The ip_deliver() shared locking change works also with
> UDP, Hvroje has tested it in 2022.
> 
> ok?
> 

Sure, ok mvs.

> bluhm
> 
> Index: net/if_bridge.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_bridge.c,v
> diff -u -p -r1.369 if_bridge.c
> --- net/if_bridge.c	13 Feb 2024 12:22:09 -0000	1.369
> +++ net/if_bridge.c	11 Apr 2024 18:15:15 -0000
> @@ -1600,7 +1600,7 @@ bridge_ipsec(struct ifnet *ifp, struct e
> 			    off);
> 			tdb_unref(tdb);
> 			if (prot != IPPROTO_DONE)
> -				ip_deliver(&m, &hlen, prot, af);
> +				ip_deliver(&m, &hlen, prot, af, 0);
> 			return (1);
> 		} else {
> 			tdb_unref(tdb);
> Index: netinet/in_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
> diff -u -p -r1.103 in_proto.c
> --- netinet/in_proto.c	11 Jan 2024 14:15:12 -0000	1.103
> +++ netinet/in_proto.c	11 Apr 2024 18:15:15 -0000
> @@ -210,7 +210,7 @@ const struct protosw inetsw[] = {
>   .pr_type	= SOCK_RAW,
>   .pr_domain	= &inetdomain,
>   .pr_protocol	= IPPROTO_RAW,
> -  .pr_flags	= PR_ATOMIC|PR_ADDR,
> +  .pr_flags	= PR_ATOMIC|PR_ADDR|PR_MPINPUT,
>   .pr_input	= rip_input,
>   .pr_ctloutput	= rip_ctloutput,
>   .pr_usrreqs	= &rip_usrreqs,
> @@ -377,7 +377,7 @@ const struct protosw inetsw[] = {
>   /* raw wildcard */
>   .pr_type	= SOCK_RAW,
>   .pr_domain	= &inetdomain,
> -  .pr_flags	= PR_ATOMIC|PR_ADDR,
> +  .pr_flags	= PR_ATOMIC|PR_ADDR|PR_MPINPUT,
>   .pr_input	= rip_input,
>   .pr_ctloutput	= rip_ctloutput,
>   .pr_usrreqs	= &rip_usrreqs,
> Index: netinet/ip_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_input.c,v
> diff -u -p -r1.391 ip_input.c
> --- netinet/ip_input.c	28 Feb 2024 10:57:20 -0000	1.391
> +++ netinet/ip_input.c	11 Apr 2024 19:51:52 -0000
> @@ -245,6 +245,30 @@ ip_ours(struct mbuf **mp, int *offp, int
> 	if (af != AF_UNSPEC)
> 		return nxt;
> 
> +	nxt = ip_deliver(mp, offp, nxt, AF_INET, 1);
> +	if (nxt == IPPROTO_DONE)
> +		return IPPROTO_DONE;
> +
> +	/* save values for later, use after dequeue */
> +	if (*offp != sizeof(struct ip)) {
> +		struct m_tag *mtag;
> +		struct ipoffnxt *ion;
> +
> +		/* mbuf tags are expensive, but only used for header options */
> +		mtag = m_tag_get(PACKET_TAG_IP_OFFNXT, sizeof(*ion),
> +		    M_NOWAIT);
> +		if (mtag == NULL) {
> +			ipstat_inc(ips_idropped);
> +			m_freemp(mp);
> +			return IPPROTO_DONE;
> +		}
> +		ion = (struct ipoffnxt *)(mtag + 1);
> +		ion->ion_off = *offp;
> +		ion->ion_nxt = nxt;
> +
> +		m_tag_prepend(*mp, mtag);
> +	}
> +
> 	niq_enqueue(&ipintrq, *mp);
> 	*mp = NULL;
> 	return IPPROTO_DONE;
> @@ -260,18 +284,31 @@ ipintr(void)
> 	struct mbuf *m;
> 
> 	while ((m = niq_dequeue(&ipintrq)) != NULL) {
> -		struct ip *ip;
> +		struct m_tag *mtag;
> 		int off, nxt;
> 
> #ifdef DIAGNOSTIC
> 		if ((m->m_flags & M_PKTHDR) == 0)
> 			panic("ipintr no HDR");
> #endif
> -		ip = mtod(m, struct ip *);
> -		off = ip->ip_hl << 2;
> -		nxt = ip->ip_p;
> +		mtag = m_tag_find(m, PACKET_TAG_IP_OFFNXT, NULL);
> +		if (mtag != NULL) {
> +			struct ipoffnxt *ion;
> +
> +			ion = (struct ipoffnxt *)(mtag + 1);
> +			off = ion->ion_off;
> +			nxt = ion->ion_nxt;
> +
> +			m_tag_delete(m, mtag);
> +		} else {
> +			struct ip *ip;
> 
> -		nxt = ip_deliver(&m, &off, nxt, AF_INET);
> +			ip = mtod(m, struct ip *);
> +			off = ip->ip_hl << 2;
> +			nxt = ip->ip_p;
> +		}
> +
> +		nxt = ip_deliver(&m, &off, nxt, AF_INET, 0);
> 		KASSERT(nxt == IPPROTO_DONE);
> 	}
> }
> @@ -675,15 +712,11 @@ ip_fragcheck(struct mbuf **mp, int *offp
> #endif
> 
> int
> -ip_deliver(struct mbuf **mp, int *offp, int nxt, int af)
> +ip_deliver(struct mbuf **mp, int *offp, int nxt, int af, int shared)
> {
> -	const struct protosw *psw;
> -	int naf = af;
> #ifdef INET6
> 	int nest = 0;
> -#endif /* INET6 */
> -
> -	NET_ASSERT_LOCKED_EXCLUSIVE();
> +#endif
> 
> 	/*
> 	 * Tell launch routine the next header
> @@ -691,13 +724,41 @@ ip_deliver(struct mbuf **mp, int *offp, 
> 	IPSTAT_INC(delivered);
> 
> 	while (nxt != IPPROTO_DONE) {
> +		const struct protosw *psw;
> +		int naf;
> +
> +		switch (af) {
> +		case AF_INET:
> +			psw = &inetsw[ip_protox[nxt]];
> +			break;
> +#ifdef INET6
> +		case AF_INET6:
> +			psw = &inet6sw[ip6_protox[nxt]];
> +			break;
> +#endif
> +		}
> +		if (shared && !ISSET(psw->pr_flags, PR_MPINPUT)) {
> +			/* delivery not finished, decrement counter, queue */
> +			switch (af) {
> +			case AF_INET:
> +				counters_dec(ipcounters, ips_delivered);
> +				break;
> +#ifdef INET6
> +			case AF_INET6:
> +				counters_dec(ip6counters, ip6s_delivered);
> +				break;
> +#endif
> +			}
> +			break;
> +		}
> +
> #ifdef INET6
> 		if (af == AF_INET6 &&
> 		    ip6_hdrnestlimit && (++nest > ip6_hdrnestlimit)) {
> 			ip6stat_inc(ip6s_toomanyhdr);
> 			goto bad;
> 		}
> -#endif /* INET6 */
> +#endif
> 
> 		/*
> 		 * protection against faulty packet - there should be
> @@ -716,7 +777,7 @@ ip_deliver(struct mbuf **mp, int *offp, 
> 			}
> 		}
> 		/* Otherwise, just fall through and deliver the packet */
> -#endif /* IPSEC */
> +#endif
> 
> 		switch (nxt) {
> 		case IPPROTO_IPV4:
> @@ -728,17 +789,10 @@ ip_deliver(struct mbuf **mp, int *offp, 
> 			naf = AF_INET6;
> 			ip6stat_inc(ip6s_delivered);
> 			break;
> -#endif /* INET6 */
> -		}
> -		switch (af) {
> -		case AF_INET:
> -			psw = &inetsw[ip_protox[nxt]];
> -			break;
> -#ifdef INET6
> -		case AF_INET6:
> -			psw = &inet6sw[ip6_protox[nxt]];
> +#endif
> +		default:
> +			naf = af;
> 			break;
> -#endif /* INET6 */
> 		}
> 		nxt = (*psw->pr_input)(mp, offp, nxt, af);
> 		af = naf;
> Index: netinet/ip_var.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_var.h,v
> diff -u -p -r1.114 ip_var.h
> --- netinet/ip_var.h	5 Mar 2024 09:45:13 -0000	1.114
> +++ netinet/ip_var.h	11 Apr 2024 18:15:15 -0000
> @@ -198,6 +198,11 @@ struct ipq {
> 	struct	  in_addr ipq_src, ipq_dst;
> };
> 
> +struct ipoffnxt {
> +	int	ion_off;
> +	int	ion_nxt;
> +};
> +
> /* flags passed to ip_output */
> #define	IP_FORWARDING		0x1		/* most of ip header exists */
> #define	IP_RAWOUTPUT		0x2		/* raw ip header exists */
> @@ -254,7 +259,7 @@ int	 ip_sysctl(int *, u_int, void *, siz
> void	 ip_savecontrol(struct inpcb *, struct mbuf **, struct ip *,
> 	    struct mbuf *);
> int	 ip_input_if(struct mbuf **, int *, int, int, struct ifnet *);
> -int	 ip_deliver(struct mbuf **, int *, int, int);
> +int	 ip_deliver(struct mbuf **, int *, int, int, int);
> void	 ip_forward(struct mbuf *, struct ifnet *, struct rtentry *, int);
> int	 rip_ctloutput(int, struct socket *, int, int, struct mbuf *);
> void	 rip_init(void);
> Index: netinet/raw_ip.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/raw_ip.c,v
> diff -u -p -r1.157 raw_ip.c
> --- netinet/raw_ip.c	5 Mar 2024 09:45:13 -0000	1.157
> +++ netinet/raw_ip.c	11 Apr 2024 18:15:15 -0000
> @@ -172,7 +172,13 @@ rip_input(struct mbuf **mp, int *offp, i
> 	TAILQ_FOREACH(inp, &rawcbtable.inpt_queue, inp_queue) {
> 		KASSERT(!ISSET(inp->inp_flags, INP_IPV6));
> 
> -		if (inp->inp_socket->so_rcv.sb_state & SS_CANTRCVMORE)
> +		/*
> +		 * Packet must not be inserted after disconnected wakeup
> +		 * call.  To avoid race, check again when holding receive
> +		 * buffer mutex.
> +		 */
> +		if (ISSET(READ_ONCE(inp->inp_socket->so_rcv.sb_state),
> +		    SS_CANTRCVMORE))
> 			continue;
> 		if (rtable_l2(inp->inp_rtableid) !=
> 		    rtable_l2(m->m_pkthdr.ph_rtableid))
> @@ -219,21 +225,24 @@ rip_input(struct mbuf **mp, int *offp, i
> 			n = m_copym(m, 0, M_COPYALL, M_NOWAIT);
> 		if (n != NULL) {
> 			struct socket *so = inp->inp_socket;
> -			int ret;
> +			int ret = 0;
> 
> 			if (inp->inp_flags & INP_CONTROLOPTS ||
> 			    so->so_options & SO_TIMESTAMP)
> 				ip_savecontrol(inp, &opts, ip, n);
> 
> 			mtx_enter(&so->so_rcv.sb_mtx);
> -			ret = sbappendaddr(so, &so->so_rcv,
> -			    sintosa(&ripsrc), n, opts);
> +			if (!ISSET(inp->inp_socket->so_rcv.sb_state,
> +			    SS_CANTRCVMORE)) {
> +				ret = sbappendaddr(so, &so->so_rcv,
> +				    sintosa(&ripsrc), n, opts);
> +			}
> 			mtx_leave(&so->so_rcv.sb_mtx);
> 
> 			if (ret == 0) {
> -				/* should notify about lost packet */
> 				m_freem(n);
> 				m_freem(opts);
> +				ipstat_inc(ips_noproto);
> 			} else
> 				sorwakeup(so);
> 		}
> Index: netinet6/ip6_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/ip6_input.c,v
> diff -u -p -r1.259 ip6_input.c
> --- netinet6/ip6_input.c	28 Feb 2024 10:57:20 -0000	1.259
> +++ netinet6/ip6_input.c	11 Apr 2024 18:15:15 -0000
> @@ -166,11 +166,6 @@ ip6_init(void)
> #endif
> }
> 
> -struct ip6_offnxt {
> -	int	ion_off;
> -	int	ion_nxt;
> -};
> -
> /*
>  * Enqueue packet for local delivery.  Queuing is used as a boundary
>  * between the network layer (input/forward path) running with
> @@ -190,10 +185,14 @@ ip6_ours(struct mbuf **mp, int *offp, in
> 	if (af != AF_UNSPEC)
> 		return nxt;
> 
> +	nxt = ip_deliver(mp, offp, nxt, AF_INET6, 1);
> +	if (nxt == IPPROTO_DONE)
> +		return IPPROTO_DONE;
> +
> 	/* save values for later, use after dequeue */
> 	if (*offp != sizeof(struct ip6_hdr)) {
> 		struct m_tag *mtag;
> -		struct ip6_offnxt *ion;
> +		struct ipoffnxt *ion;
> 
> 		/* mbuf tags are expensive, but only used for header options */
> 		mtag = m_tag_get(PACKET_TAG_IP6_OFFNXT, sizeof(*ion),
> @@ -203,7 +202,7 @@ ip6_ours(struct mbuf **mp, int *offp, in
> 			m_freemp(mp);
> 			return IPPROTO_DONE;
> 		}
> -		ion = (struct ip6_offnxt *)(mtag + 1);
> +		ion = (struct ipoffnxt *)(mtag + 1);
> 		ion->ion_off = *offp;
> 		ion->ion_nxt = nxt;
> 
> @@ -234,9 +233,9 @@ ip6intr(void)
> #endif
> 		mtag = m_tag_find(m, PACKET_TAG_IP6_OFFNXT, NULL);
> 		if (mtag != NULL) {
> -			struct ip6_offnxt *ion;
> +			struct ipoffnxt *ion;
> 
> -			ion = (struct ip6_offnxt *)(mtag + 1);
> +			ion = (struct ipoffnxt *)(mtag + 1);
> 			off = ion->ion_off;
> 			nxt = ion->ion_nxt;
> 
> @@ -248,7 +247,7 @@ ip6intr(void)
> 			off = sizeof(struct ip6_hdr);
> 			nxt = ip6->ip6_nxt;
> 		}
> -		nxt = ip_deliver(&m, &off, nxt, AF_INET6);
> +		nxt = ip_deliver(&m, &off, nxt, AF_INET6, 0);
> 		KASSERT(nxt == IPPROTO_DONE);
> 	}
> }
> Index: sys/mbuf.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/sys/mbuf.h,v
> diff -u -p -r1.262 mbuf.h
> --- sys/mbuf.h	21 Feb 2024 13:42:06 -0000	1.262
> +++ sys/mbuf.h	11 Apr 2024 18:15:15 -0000
> @@ -471,6 +471,8 @@ struct m_tag *m_tag_next(struct mbuf *, 
> #define PACKET_TAG_IPSEC_IN_DONE	0x0001  /* IPsec applied, in */
> #define PACKET_TAG_IPSEC_OUT_DONE	0x0002  /* IPsec applied, out */
> #define PACKET_TAG_IPSEC_FLOWINFO	0x0004	/* IPsec flowinfo */
> +#define PACKET_TAG_IP_OFFNXT		0x0010  /* IPv4 offset and next proto */
> +#define PACKET_TAG_IP6_OFFNXT		0x0020  /* IPv6 offset and next proto */
> #define PACKET_TAG_WIREGUARD		0x0040  /* WireGuard data */
> #define PACKET_TAG_GRE			0x0080  /* GRE processing done */
> #define PACKET_TAG_DLT			0x0100 /* data link layer type */
> @@ -479,7 +481,6 @@ struct m_tag *m_tag_next(struct mbuf *, 
> #define PACKET_TAG_SRCROUTE		0x1000 /* IPv4 source routing options */
> #define PACKET_TAG_TUNNEL		0x2000	/* Tunnel endpoint address */
> #define PACKET_TAG_CARP_BAL_IP		0x4000  /* carp(4) ip balanced marker */
> -#define PACKET_TAG_IP6_OFFNXT		0x8000  /* IPv6 offset and next proto */
> 
> #define MTAG_BITS \
>     ("\20\1IPSEC_IN_DONE\2IPSEC_OUT_DONE\3IPSEC_FLOWINFO" \
> Index: sys/protosw.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/sys/protosw.h,v
> diff -u -p -r1.65 protosw.h
> --- sys/protosw.h	3 Feb 2024 22:50:09 -0000	1.65
> +++ sys/protosw.h	11 Apr 2024 18:15:15 -0000
> @@ -132,6 +132,7 @@ struct protosw {
> #define PR_ABRTACPTDIS	0x0020		/* abort on accept(2) to disconnected
> 					   socket */
> #define PR_SPLICE	0x0040		/* socket splicing is possible */
> +#define PR_MPINPUT	0x0080		/* input runs with shared netlock */
> 
> /*
>  * The arguments to usrreq are:
>