From: Claudio Jeker Subject: bgpd: use msgbuf_queuelen where appropriate To: tech@openbsd.org Date: Tue, 20 Aug 2024 12:25:45 +0200 When working with msgbuf and not imsg buffers use msgbuf_queuelen() instead of wbuf.queue. Also export the message queue length as a stat instead of grabbing it out of struct peer in bgpctl. PS: there will be a imsgbuf_queuelen() which does the same for imsgbufs. -- :wq Claudio Index: bgpctl/output.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpctl/output.c,v diff -u -p -r1.53 output.c --- bgpctl/output.c 14 Aug 2024 19:10:51 -0000 1.53 +++ bgpctl/output.c 20 Aug 2024 09:35:32 -0000 @@ -119,7 +119,7 @@ show_summary(struct peer *p) p->stats.msg_sent_open + p->stats.msg_sent_notification + p->stats.msg_sent_update + p->stats.msg_sent_keepalive + p->stats.msg_sent_rrefresh, - p->wbuf.queued, + p->stats.msg_queue_len, fmt_monotime(p->stats.last_updown)); if (p->state == STATE_ESTABLISHED) { printf("%6u", p->stats.prefix_cnt); Index: bgpd/control.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/control.c,v diff -u -p -r1.117 control.c --- bgpd/control.c 22 Apr 2024 09:36:04 -0000 1.117 +++ bgpd/control.c 20 Aug 2024 09:27:06 -0000 @@ -564,6 +564,7 @@ control_imsg_relay(struct imsg *imsg, st p->stats.prefix_sent_eor = stats.prefix_sent_eor; p->stats.pending_update = stats.pending_update; p->stats.pending_withdraw = stats.pending_withdraw; + p->stats.msg_queue_len = msgbuf_queuelen(&p->wbuf); return imsg_compose(&c->imsgbuf, type, 0, pid, -1, p, sizeof(*p)); Index: bgpd/rde.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v diff -u -p -r1.626 rde.c --- bgpd/rde.c 14 Aug 2024 19:09:51 -0000 1.626 +++ bgpd/rde.c 20 Aug 2024 09:34:30 -0000 @@ -245,7 +245,7 @@ rde_main(int debug, int verbose) if (i >= pfd_elms) fatalx("poll pfd too small"); - if (mctx->mrt.wbuf.queued) { + if (msgbuf_queuelen(&mctx->mrt.wbuf) > 0) { pfd[i].fd = mctx->mrt.wbuf.fd; pfd[i].events = POLLOUT; i++; @@ -3175,7 +3175,7 @@ rde_mrt_throttled(void *arg) { struct mrt *mrt = arg; - return (mrt->wbuf.queued > SESS_MSG_LOW_MARK); + return (msgbuf_queuelen(&mrt->wbuf) > SESS_MSG_LOW_MARK); } static void Index: bgpd/rtr_proto.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/rtr_proto.c,v diff -u -p -r1.38 rtr_proto.c --- bgpd/rtr_proto.c 12 Aug 2024 09:04:23 -0000 1.38 +++ bgpd/rtr_proto.c 20 Aug 2024 09:29:34 -0000 @@ -1263,7 +1263,7 @@ rtr_dispatch_msg(struct pollfd *pfd, str rtr_fsm(rs, RTR_EVNT_CON_CLOSE); return; } - if (pfd->revents & POLLOUT && rs->w.queued) { + if (pfd->revents & POLLOUT && msgbuf_queuelen(&rs->w) > 0) { if ((error = ibuf_write(&rs->w)) == -1) { if (errno != EAGAIN) { log_warn("rtr %s: write error", log_rtr(rs)); @@ -1272,7 +1272,8 @@ rtr_dispatch_msg(struct pollfd *pfd, str } if (error == 0) rtr_fsm(rs, RTR_EVNT_CON_CLOSE); - if (rs->w.queued == 0 && rs->state == RTR_STATE_ERROR) + if (rs->state == RTR_STATE_ERROR && + msgbuf_queuelen(&rs->w) == 0) rtr_fsm(rs, RTR_EVNT_CON_CLOSE); } if (pfd->revents & POLLIN) { @@ -1378,7 +1379,7 @@ rtr_poll_events(struct pollfd *pfds, siz pfd->fd = rs->fd; pfd->events = 0; - if (rs->w.queued) + if (msgbuf_queuelen(&rs->w) > 0) pfd->events |= POLLOUT; if (rs->state >= RTR_STATE_ESTABLISHED) pfd->events |= POLLIN; Index: bgpd/session.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/session.c,v diff -u -p -r1.480 session.c --- bgpd/session.c 10 Jun 2024 12:51:25 -0000 1.480 +++ bgpd/session.c 20 Aug 2024 09:34:12 -0000 @@ -305,7 +305,7 @@ session_main(int debug, int verbose) free(m); continue; } - if (m->wbuf.queued) + if (msgbuf_queuelen(&m->wbuf) > 0) mrt_cnt++; } @@ -415,7 +415,8 @@ session_main(int debug, int verbose) /* are we waiting for a write? */ events = POLLIN; - if (p->wbuf.queued > 0 || p->state == STATE_CONNECT) + if (msgbuf_queuelen(&p->wbuf) > 0 || + p->state == STATE_CONNECT) events |= POLLOUT; /* is there still work to do? */ if (p->rpending && p->rbuf && p->rbuf->wpos) @@ -433,7 +434,7 @@ session_main(int debug, int verbose) idx_peers = i; LIST_FOREACH(m, &mrthead, entry) - if (m->wbuf.queued) { + if (msgbuf_queuelen(&m->wbuf) > 0) { pfd[i].fd = m->wbuf.fd; pfd[i].events = POLLOUT; mrt_l[i - idx_peers] = m; @@ -884,7 +885,8 @@ change_state(struct peer *peer, enum ses * try to write out what's buffered (maybe a notification), * don't bother if it fails */ - if (peer->state >= STATE_OPENSENT && peer->wbuf.queued) + if (peer->state >= STATE_OPENSENT && + msgbuf_queuelen(&peer->wbuf) > 0) msgbuf_write(&peer->wbuf); /* @@ -1429,7 +1431,7 @@ session_sendmsg(struct bgp_msg *msg, str } ibuf_close(&p->wbuf, msg->buf); - if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) { + if (!p->throttled && msgbuf_queuelen(&p->wbuf) > SESS_MSG_HIGH_MARK) { if (imsg_rde(IMSG_XOFF, p->conf.id, NULL, 0) == -1) log_peer_warn(&p->conf, "imsg_compose XOFF"); else @@ -1936,7 +1938,7 @@ session_dispatch_msg(struct pollfd *pfd, return (1); } - if (pfd->revents & POLLOUT && p->wbuf.queued) { + if (pfd->revents & POLLOUT && msgbuf_queuelen(&p->wbuf) > 0) { if ((error = msgbuf_write(&p->wbuf)) <= 0 && errno != EAGAIN) { if (error == 0) log_peer_warnx(&p->conf, "Connection closed"); @@ -1947,7 +1949,8 @@ session_dispatch_msg(struct pollfd *pfd, } p->stats.last_write = getmonotime(); start_timer_sendholdtime(p); - if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) { + if (p->throttled && + msgbuf_queuelen(&p->wbuf) < SESS_MSG_LOW_MARK) { if (imsg_rde(IMSG_XON, p->conf.id, NULL, 0) == -1) log_peer_warn(&p->conf, "imsg_compose XON"); else Index: bgpd/session.h =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/session.h,v diff -u -p -r1.171 session.h --- bgpd/session.h 12 Aug 2024 09:04:23 -0000 1.171 +++ bgpd/session.h 20 Aug 2024 09:24:20 -0000 @@ -151,6 +151,7 @@ struct peer_stats { time_t last_updown; time_t last_read; time_t last_write; + uint32_t msg_queue_len; uint32_t prefix_cnt; uint32_t prefix_out_cnt; uint32_t pending_update;