From 9eb217ff69bb8c74191f9347b73959bc17ebe7df Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Fri, 2 Jun 2017 01:52:39 +0000 Subject: [PATCH] bgpd: batched i/o Instead of reading a packet header and the rest of the packet in two separate i/o cycles, read a chunk of data at one time and then parse as many packets as possible out of the chunk. Also changes bgp_packet.c to batch process packets. To avoid thrashing on useless mutex locks, the scheduling call for bgp_process_packet has been changed to always succeed at the cost of no longer being cancel-able. In this case this is acceptable; following the pattern of other event-based callbacks, an additional check in bgp_process_packet to ignore stray events is sufficient. Before deleting the peer, all events are cleared, which provides the requisite ordering. XXX: chunk hardcoded to 5, should use something similar to wpkt_quanta Signed-off-by: Quentin Young --- bgpd/bgp_fsm.c | 7 +- bgpd/bgp_io.c | 179 ++++++++++++++++++++++++++-------------------- bgpd/bgp_packet.c | 151 ++++++++++++++++++++------------------ bgpd/bgpd.c | 3 +- 4 files changed, 188 insertions(+), 152 deletions(-) diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index 0c0c29b740..961ee20abb 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -360,11 +360,9 @@ void bgp_timer_set(struct peer *peer) and keepalive must be turned off. */ if (peer->v_holdtime == 0) { BGP_TIMER_OFF(peer->t_holdtime); - bgp_keepalives_off(peer); } else { BGP_TIMER_ON(peer->t_holdtime, bgp_holdtime_timer, peer->v_holdtime); - bgp_keepalives_on(peer); } break; case Deleted: @@ -1553,7 +1551,10 @@ static int bgp_establish(struct peer *peer) hook_call(peer_established, peer); - /* Reset uptime, send keepalive, send current table. */ + /* Reset uptime, turn on keepalives, send current table. 
*/ + if (!peer->v_holdtime) + bgp_keepalives_on(peer); + peer->uptime = bgp_clock(); /* Send route-refresh when ORF is enabled */ diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index c833b6f3fd..71c2812959 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -47,10 +47,6 @@ static bool validate_header(struct peer *); #define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred #define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error -/* bgp_read() status codes */ -#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header -#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet - /* Start and stop routines for I/O pthread + control variables * ------------------------------------------------------------------------ */ bool bgp_packet_write_thread_run = false; @@ -174,6 +170,8 @@ void bgp_reads_on(struct peer *peer) listnode_delete(read_cancel, peer); thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); + thread_add_background(bm->master, bgp_process_packet, peer, 0, + &peer->t_process_packet); SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } pthread_mutex_unlock(work_mtx); @@ -236,76 +234,128 @@ static int bgp_process_writes(struct thread *thread) /** * Called from PTHREAD_IO when select() or poll() determines that the file * descriptor is ready to be read from. + * + * We read as much data as possible, process as many packets as we can and + * place them on peer->ibuf for secondary processing by the main thread. 
*/ static int bgp_process_reads(struct thread *thread) { - static struct peer *peer; + static struct peer *peer; // peer to read from + uint16_t status; // bgp_read status code + bool more = true; // whether we got more data + bool fatal = false; // whether fatal error occurred + bool added_pkt = false; // whether we pushed onto ->ibuf + bool header_valid = true; // whether header is valid + peer = THREAD_ARG(thread); - uint16_t status; if (peer->fd < 0) return -1; struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - bool reschedule = true; - - // execute read pthread_mutex_lock(&peer->io_mtx); { status = bgp_read(peer); } pthread_mutex_unlock(&peer->io_mtx); - // check results of read - bool header_valid = true; + /* error checking phase */ + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { + /* no problem; just don't process packets */ + more = false; + } - if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ + if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { + /* problem; tear down session */ + more = false; + fatal = true; } - if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) - reschedule = false; // problem + while (more) { + /* static buffer for transferring packets */ + static unsigned char pktbuf[BGP_MAX_PACKET_SIZE]; + /* shorter alias to peer's input buffer */ + struct stream *ibw = peer->ibuf_work; + /* offset of start of current packet */ + size_t offset = stream_get_getp(ibw); + /* packet size as given by header */ + u_int16_t pktsize = 0; + + /* check that we have enough data for a header */ + if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE) + break; - if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) { + /* validate header */ header_valid = validate_header(peer); + if (!header_valid) { - bgp_size_t packetsize = - MIN((int)stream_get_endp(peer->ibuf_work), - BGP_MAX_PACKET_SIZE); - memcpy(peer->last_reset_cause, peer->ibuf_work->data, - packetsize); - peer->last_reset_cause_size = packetsize; - // We're tearing the session down, no point in - // rescheduling. 
- // Additionally, bgp_read() will use the TLV if it's - // present to - // determine how much to read; if this is corrupt, we'll - // crash the - // program. - reschedule = false; + fatal = true; + break; } - } - // if we read a full packet, push it onto peer->ibuf, reset our WiP - // buffer - // and schedule a job to process it on the main thread - if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) { - pthread_mutex_lock(&peer->io_mtx); - { - stream_fifo_push(peer->ibuf, - stream_dup(peer->ibuf_work)); - } - pthread_mutex_unlock(&peer->io_mtx); - stream_reset(peer->ibuf_work); - assert(stream_get_endp(peer->ibuf_work) == 0); + /* header is valid; retrieve packet size */ + pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE); - thread_add_background(bm->master, bgp_process_packet, peer, 0, - &peer->t_process_packet); + /* if this fails we are seriously screwed */ + assert(pktsize <= BGP_MAX_PACKET_SIZE); + + /* If we have that much data, chuck it into its own + * stream and append to input queue for processing. */ + if (STREAM_READABLE(ibw) >= pktsize) { + struct stream *pkt = stream_new(pktsize); + stream_get(pktbuf, ibw, pktsize); + stream_put(pkt, pktbuf, pktsize); + + pthread_mutex_lock(&peer->io_mtx); + { + stream_fifo_push(peer->ibuf, pkt); + } + pthread_mutex_unlock(&peer->io_mtx); + + added_pkt = true; + } else + break; + } + + /* After reading: + * 1. Move unread data to stream start to make room for more. + * 2. Reschedule and return when we have additional data. + * + * XXX: Heavy abuse of stream API. This needs a ring buffer. 
+ */ + if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) { + void *from = stream_pnt(peer->ibuf_work); + void *to = peer->ibuf_work->data; + size_t siz = STREAM_READABLE(peer->ibuf_work); + memmove(to, from, siz); + stream_set_getp(peer->ibuf_work, 0); + stream_set_endp(peer->ibuf_work, siz); } - if (reschedule) + assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE); + + /* handle invalid header */ + if (fatal) { + if (!header_valid) { + bgp_size_t pktsize = BGP_HEADER_SIZE; + stream_get(peer->last_reset_cause, peer->ibuf_work, + pktsize); + peer->last_reset_cause_size = pktsize; + } + + /* wipe buffer just in case someone screwed up */ + stream_reset(peer->ibuf_work); + } else { thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); + if (added_pkt) + thread_add_event(bm->master, bgp_process_packet, peer, + 0, NULL); + // thread_add_background(bm->master, + // bgp_process_packet, peer, + // 0, NULL); + } return 0; } @@ -420,19 +470,9 @@ static uint16_t bgp_read(struct peer *peer) { int readsize; // how many bytes we want to read int nbytes; // how many bytes we actually read - bool have_header = false; uint16_t status = 0; - if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE) - readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work); - else { - // retrieve packet length from tlv and compute # bytes we still - // need - u_int16_t mlen = - stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE); - readsize = mlen - stream_get_endp(peer->ibuf_work); - have_header = true; - } + readsize = STREAM_WRITEABLE(peer->ibuf_work); nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize); @@ -491,25 +531,6 @@ static uint16_t bgp_read(struct peer *peer) return status; } - // If we didn't have the header before read(), and now we do, set the - // appropriate flag. The caller must validate the header for us. 
- if (!have_header - && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) { - SET_FLAG(status, BGP_IO_READ_HEADER); - have_header = true; - } - // If we read the # of bytes specified in the tlv, we have read a full - // packet. - // - // Note that the header may not have been validated here. This flag - // means - // ONLY that we read the # of bytes specified in the header; if the - // header is - // not valid, the packet MUST NOT be processed further. - if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE) - == stream_get_endp(peer->ibuf_work))) - SET_FLAG(status, BGP_IO_READ_FULLPACKET); - return status; } @@ -520,20 +541,22 @@ static uint16_t bgp_read(struct peer *peer) static bool validate_header(struct peer *peer) { u_int16_t size, type; + struct stream *pkt = peer->ibuf_work; + size_t getp = stream_get_getp(pkt); static uint8_t marker[BGP_MARKER_SIZE] = { 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; - if (memcmp(marker, peer->ibuf_work->data, BGP_MARKER_SIZE) != 0) { + if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) { bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR, BGP_NOTIFY_HEADER_NOT_SYNC); return false; } /* Get size and type. */ - size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE); - type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2); + size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE); + type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2); /* BGP type check. */ if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index c21e3cbe37..28964deeff 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -1966,84 +1966,97 @@ int bgp_process_packet(struct thread *thread) peer = THREAD_ARG(thread); /* Guard against scheduled events that occur after peer deletion. 
*/ - if (peer->status == Deleted) + if (peer->status == Deleted || peer->status == Clearing) return 0; - u_char type = 0; - bgp_size_t size; - char notify_data_length[2]; - u_int32_t notify_out; + int processed = 0; - /* Note notify_out so we can check later to see if we sent another one - */ - notify_out = peer->notify_out; + while (processed < 5 && peer->ibuf->count > 0) { + u_char type = 0; + bgp_size_t size; + char notify_data_length[2]; + u_int32_t notify_out; - pthread_mutex_lock(&peer->io_mtx); - { - peer->curr = stream_fifo_pop(peer->ibuf); - } - pthread_mutex_unlock(&peer->io_mtx); + /* Note notify_out so we can check later to see if we sent + * another one */ + notify_out = peer->notify_out; - if (peer->curr == NULL) // no packets to process, hmm... - return 0; + pthread_mutex_lock(&peer->io_mtx); + { + peer->curr = stream_fifo_pop(peer->ibuf); + } + pthread_mutex_unlock(&peer->io_mtx); - bgp_size_t actual_size = stream_get_endp(peer->curr); - - /* skip the marker and copy the packet length */ - stream_forward_getp(peer->curr, BGP_MARKER_SIZE); - memcpy(notify_data_length, stream_pnt(peer->curr), 2); - - /* read in the packet length and type */ - size = stream_getw(peer->curr); - type = stream_getc(peer->curr); - - /* BGP packet dump function. */ - bgp_dump_packet(peer, type, peer->curr); - - /* adjust size to exclude the marker + length + type */ - size -= BGP_HEADER_SIZE; - - /* Read rest of the packet and call each sort of packet routine */ - switch (type) { - case BGP_MSG_OPEN: - peer->open_in++; - bgp_open_receive(peer, size); /* XXX return value ignored! 
*/ - break; - case BGP_MSG_UPDATE: - peer->readtime = monotime(NULL); - bgp_update_receive(peer, size); - break; - case BGP_MSG_NOTIFY: - bgp_notify_receive(peer, size); - break; - case BGP_MSG_KEEPALIVE: - peer->readtime = monotime(NULL); - bgp_keepalive_receive(peer, size); - break; - case BGP_MSG_ROUTE_REFRESH_NEW: - case BGP_MSG_ROUTE_REFRESH_OLD: - peer->refresh_in++; - bgp_route_refresh_receive(peer, size); - break; - case BGP_MSG_CAPABILITY: - peer->dynamic_cap_in++; - bgp_capability_receive(peer, size); - break; - } + if (peer->curr == NULL) // no packets to process, hmm... + return 0; - /* If reading this packet caused us to send a NOTIFICATION then store a - * copy - * of the packet for troubleshooting purposes - */ - if (notify_out < peer->notify_out) { - memcpy(peer->last_reset_cause, peer->curr->data, actual_size); - peer->last_reset_cause_size = actual_size; + bgp_size_t actual_size = stream_get_endp(peer->curr); + + /* skip the marker and copy the packet length */ + stream_forward_getp(peer->curr, BGP_MARKER_SIZE); + memcpy(notify_data_length, stream_pnt(peer->curr), 2); + + /* read in the packet length and type */ + size = stream_getw(peer->curr); + type = stream_getc(peer->curr); + + /* BGP packet dump function. */ + bgp_dump_packet(peer, type, peer->curr); + + /* adjust size to exclude the marker + length + type */ + size -= BGP_HEADER_SIZE; + + /* Read rest of the packet and call each sort of packet routine + */ + switch (type) { + case BGP_MSG_OPEN: + peer->open_in++; + bgp_open_receive(peer, + size); /* XXX return value ignored! 
*/ + break; + case BGP_MSG_UPDATE: + peer->readtime = monotime(NULL); + bgp_update_receive(peer, size); + break; + case BGP_MSG_NOTIFY: + bgp_notify_receive(peer, size); + break; + case BGP_MSG_KEEPALIVE: + peer->readtime = monotime(NULL); + bgp_keepalive_receive(peer, size); + break; + case BGP_MSG_ROUTE_REFRESH_NEW: + case BGP_MSG_ROUTE_REFRESH_OLD: + peer->refresh_in++; + bgp_route_refresh_receive(peer, size); + break; + case BGP_MSG_CAPABILITY: + peer->dynamic_cap_in++; + bgp_capability_receive(peer, size); + break; + } + + /* If reading this packet caused us to send a NOTIFICATION then + * store a copy + * of the packet for troubleshooting purposes + */ + if (notify_out < peer->notify_out) { + memcpy(peer->last_reset_cause, peer->curr->data, + actual_size); + peer->last_reset_cause_size = actual_size; + } + + /* Delete packet and carry on. */ + if (peer->curr) { + stream_free(peer->curr); + peer->curr = NULL; + processed++; + } } - /* Delete packet and carry on. */ - if (peer->curr) { - stream_free(peer->curr); - peer->curr = NULL; + if (peer->ibuf->count > 0) { // more work to do, come back later + thread_add_background(bm->master, bgp_process_packet, peer, 0, + &peer->t_process_packet); } return 0; diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 361a995a69..527066e0a3 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -1162,10 +1162,9 @@ struct peer *peer_new(struct bgp *bgp) */ peer->obuf_work = stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW); - peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE); + peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * 5); peer->scratch = stream_new(BGP_MAX_PACKET_SIZE); - bgp_sync_init(peer); /* Get service port number. */ -- 2.39.5