]> git.puffer.fish Git - mirror/frr.git/commitdiff
bgpd: batched i/o
authorQuentin Young <qlyoung@cumulusnetworks.com>
Fri, 2 Jun 2017 01:52:39 +0000 (01:52 +0000)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Thu, 30 Nov 2017 21:18:00 +0000 (16:18 -0500)
Instead of reading a packet header and the rest of the packet in two
separate i/o cycles, instead read a chunk of data at one time and then
parse as many packets as possible out of the chunk.

Also changes bgp_packet.c to batch process packets.

To avoid thrashing on useless mutex locks, the scheduling call for
bgp_process_packet has been changed to always succeed at the cost of no
longer being cancel-able. In this case this is acceptable; following the
pattern of other event-based callbacks, an additional check in
bgp_process_packet to ignore stray events is sufficient. Before deleting
the peer all events are cleared which provides the requisite ordering.

XXX: chunk hardcoded to 5, should use something similar to wpkt_quanta

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
bgpd/bgp_fsm.c
bgpd/bgp_io.c
bgpd/bgp_packet.c
bgpd/bgpd.c

index 0c0c29b740ed762e9684cb8373f58885aa8610a0..961ee20abbb4aef9cc46d6cdb31071b710b14dee 100644 (file)
@@ -360,11 +360,9 @@ void bgp_timer_set(struct peer *peer)
                   and keepalive must be turned off. */
                if (peer->v_holdtime == 0) {
                        BGP_TIMER_OFF(peer->t_holdtime);
-                       bgp_keepalives_off(peer);
                } else {
                        BGP_TIMER_ON(peer->t_holdtime, bgp_holdtime_timer,
                                     peer->v_holdtime);
-                       bgp_keepalives_on(peer);
                }
                break;
        case Deleted:
@@ -1553,7 +1551,10 @@ static int bgp_establish(struct peer *peer)
 
        hook_call(peer_established, peer);
 
-       /* Reset uptime, send keepalive, send current table. */
+       /* Reset uptime, turn on keepalives, send current table. */
+       if (!peer->v_holdtime)
+               bgp_keepalives_on(peer);
+
        peer->uptime = bgp_clock();
 
        /* Send route-refresh when ORF is enabled */
index c833b6f3fd2b5703da0d7e04686075fc6f3de6d8..71c2812959007ec06be87953e17c1c5ad71a47f5 100644 (file)
@@ -47,10 +47,6 @@ static bool validate_header(struct peer *);
 #define BGP_IO_TRANS_ERR        (1 << 1) // EAGAIN or similar occurred
 #define BGP_IO_FATAL_ERR        (1 << 2) // some kind of fatal TCP error
 
-/* bgp_read() status codes */
-#define BGP_IO_READ_HEADER      (1 << 3) // when read a full packet header
-#define BGP_IO_READ_FULLPACKET  (1 << 4) // read a full packet
-
 /* Start and stop routines for I/O pthread + control variables
  * ------------------------------------------------------------------------ */
 bool bgp_packet_write_thread_run = false;
@@ -174,6 +170,8 @@ void bgp_reads_on(struct peer *peer)
                listnode_delete(read_cancel, peer);
                thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
                                &peer->t_read);
+               thread_add_background(bm->master, bgp_process_packet, peer, 0,
+                                     &peer->t_process_packet);
                SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
        }
        pthread_mutex_unlock(work_mtx);
@@ -236,76 +234,128 @@ static int bgp_process_writes(struct thread *thread)
 /**
  * Called from PTHREAD_IO when select() or poll() determines that the file
  * descriptor is ready to be read from.
+ *
+ * We read as much data as possible, process as many packets as we can and
+ * place them on peer->ibuf for secondary processing by the main thread.
  */
 static int bgp_process_reads(struct thread *thread)
 {
-       static struct peer *peer;
+       static struct peer *peer; // peer to read from
+       uint16_t status;          // bgp_read status code
+       bool more = true;        // whether we got more data
+       bool fatal = false;       // whether fatal error occurred
+       bool added_pkt = false;   // whether we pushed onto ->ibuf
+       bool header_valid = true; // whether header is valid
+
        peer = THREAD_ARG(thread);
-       uint16_t status;
 
        if (peer->fd < 0)
                return -1;
 
        struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
 
-       bool reschedule = true;
-
-       // execute read
        pthread_mutex_lock(&peer->io_mtx);
        {
                status = bgp_read(peer);
        }
        pthread_mutex_unlock(&peer->io_mtx);
 
-       // check results of read
-       bool header_valid = true;
+       /* error checking phase */
+       if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
+               /* no problem; just don't process packets */
+               more = false;
+       }
 
-       if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
+       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
+               /* problem; tear down session */
+               more = false;
+               fatal = true;
        }
 
-       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
-               reschedule = false; // problem
+       while (more) {
+               /* static buffer for transferring packets */
+               static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
+               /* shorter alias to peer's input buffer */
+               struct stream *ibw = peer->ibuf_work;
+               /* offset of start of current packet */
+               size_t offset = stream_get_getp(ibw);
+               /* packet size as given by header */
+               u_int16_t pktsize = 0;
+
+               /* check that we have enough data for a header */
+               if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
+                       break;
 
-       if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
+               /* validate header */
                header_valid = validate_header(peer);
+
                if (!header_valid) {
-                       bgp_size_t packetsize =
-                               MIN((int)stream_get_endp(peer->ibuf_work),
-                                   BGP_MAX_PACKET_SIZE);
-                       memcpy(peer->last_reset_cause, peer->ibuf_work->data,
-                              packetsize);
-                       peer->last_reset_cause_size = packetsize;
-                       // We're tearing the session down, no point in
-                       // rescheduling.
-                       // Additionally, bgp_read() will use the TLV if it's
-                       // present to
-                       // determine how much to read; if this is corrupt, we'll
-                       // crash the
-                       // program.
-                       reschedule = false;
+                       fatal = true;
+                       break;
                }
-       }
 
-       // if we read a full packet, push it onto peer->ibuf, reset our WiP
-       // buffer
-       // and schedule a job to process it on the main thread
-       if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
-               pthread_mutex_lock(&peer->io_mtx);
-               {
-                       stream_fifo_push(peer->ibuf,
-                                        stream_dup(peer->ibuf_work));
-               }
-               pthread_mutex_unlock(&peer->io_mtx);
-               stream_reset(peer->ibuf_work);
-               assert(stream_get_endp(peer->ibuf_work) == 0);
+               /* header is valid; retrieve packet size */
+               pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
 
-               thread_add_background(bm->master, bgp_process_packet, peer, 0,
-                                     &peer->t_process_packet);
+               /* if this fails we are seriously screwed */
+               assert(pktsize <= BGP_MAX_PACKET_SIZE);
+
+               /* If we have that much data, chuck it into its own
+                * stream and append to input queue for processing. */
+               if (STREAM_READABLE(ibw) >= pktsize) {
+                       struct stream *pkt = stream_new(pktsize);
+                       stream_get(pktbuf, ibw, pktsize);
+                       stream_put(pkt, pktbuf, pktsize);
+
+                       pthread_mutex_lock(&peer->io_mtx);
+                       {
+                               stream_fifo_push(peer->ibuf, pkt);
+                       }
+                       pthread_mutex_unlock(&peer->io_mtx);
+
+                       added_pkt = true;
+               } else
+                       break;
+       }
+
+       /* After reading:
+        * 1. Move unread data to stream start to make room for more.
+        * 2. Reschedule and return when we have additional data.
+        *
+        * XXX: Heavy abuse of stream API. This needs a ring buffer.
+        */
+       if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
+               void *from = stream_pnt(peer->ibuf_work);
+               void *to = peer->ibuf_work->data;
+               size_t siz = STREAM_READABLE(peer->ibuf_work);
+               memmove(to, from, siz);
+               stream_set_getp(peer->ibuf_work, 0);
+               stream_set_endp(peer->ibuf_work, siz);
        }
 
-       if (reschedule)
+       assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
+
+       /* handle invalid header */
+       if (fatal) {
+               if (!header_valid) {
+                       bgp_size_t pktsize = BGP_HEADER_SIZE;
+                       stream_get(peer->last_reset_cause, peer->ibuf_work,
+                                  pktsize);
+                       peer->last_reset_cause_size = pktsize;
+               }
+
+               /* wipe buffer just in case someone screwed up */
+               stream_reset(peer->ibuf_work);
+       } else {
                thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
                                &peer->t_read);
+               if (added_pkt)
+                       thread_add_event(bm->master, bgp_process_packet, peer,
+                                        0, NULL);
+               //                        thread_add_background(bm->master,
+               //                        bgp_process_packet, peer,
+               //                                         0, NULL);
+       }
 
        return 0;
 }
@@ -420,19 +470,9 @@ static uint16_t bgp_read(struct peer *peer)
 {
        int readsize; // how many bytes we want to read
        int nbytes;   // how many bytes we actually read
-       bool have_header = false;
        uint16_t status = 0;
 
-       if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
-               readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
-       else {
-               // retrieve packet length from tlv and compute # bytes we still
-               // need
-               u_int16_t mlen =
-                       stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
-               readsize = mlen - stream_get_endp(peer->ibuf_work);
-               have_header = true;
-       }
+       readsize = STREAM_WRITEABLE(peer->ibuf_work);
 
        nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
 
@@ -491,25 +531,6 @@ static uint16_t bgp_read(struct peer *peer)
                return status;
        }
 
-       // If we didn't have the header before read(), and now we do, set the
-       // appropriate flag. The caller must validate the header for us.
-       if (!have_header
-           && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
-               SET_FLAG(status, BGP_IO_READ_HEADER);
-               have_header = true;
-       }
-       // If we read the # of bytes specified in the tlv, we have read a full
-       // packet.
-       //
-       // Note that the header may not have been validated here. This flag
-       // means
-       // ONLY that we read the # of bytes specified in the header; if the
-       // header is
-       // not valid, the packet MUST NOT be processed further.
-       if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
-                           == stream_get_endp(peer->ibuf_work)))
-               SET_FLAG(status, BGP_IO_READ_FULLPACKET);
-
        return status;
 }
 
@@ -520,20 +541,22 @@ static uint16_t bgp_read(struct peer *peer)
 static bool validate_header(struct peer *peer)
 {
        u_int16_t size, type;
+       struct stream *pkt = peer->ibuf_work;
+       size_t getp = stream_get_getp(pkt);
 
        static uint8_t marker[BGP_MARKER_SIZE] = {
                0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
 
-       if (memcmp(marker, peer->ibuf_work->data, BGP_MARKER_SIZE) != 0) {
+       if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
                bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
                                BGP_NOTIFY_HEADER_NOT_SYNC);
                return false;
        }
 
        /* Get size and type. */
-       size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
-       type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);
+       size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
+       type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
 
        /* BGP type check. */
        if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
index c21e3cbe379d355819e44f583e1b366e92c1204f..28964deeff7388097bc33a057998a5911c484e05 100644 (file)
@@ -1966,84 +1966,97 @@ int bgp_process_packet(struct thread *thread)
        peer = THREAD_ARG(thread);
 
        /* Guard against scheduled events that occur after peer deletion. */
-       if (peer->status == Deleted)
+       if (peer->status == Deleted || peer->status == Clearing)
                return 0;
 
-       u_char type = 0;
-       bgp_size_t size;
-       char notify_data_length[2];
-       u_int32_t notify_out;
+       int processed = 0;
 
-       /* Note notify_out so we can check later to see if we sent another one
-        */
-       notify_out = peer->notify_out;
+       while (processed < 5 && peer->ibuf->count > 0) {
+               u_char type = 0;
+               bgp_size_t size;
+               char notify_data_length[2];
+               u_int32_t notify_out;
 
-       pthread_mutex_lock(&peer->io_mtx);
-       {
-               peer->curr = stream_fifo_pop(peer->ibuf);
-       }
-       pthread_mutex_unlock(&peer->io_mtx);
+               /* Note notify_out so we can check later to see if we sent
+                * another one */
+               notify_out = peer->notify_out;
 
-       if (peer->curr == NULL) // no packets to process, hmm...
-               return 0;
+               pthread_mutex_lock(&peer->io_mtx);
+               {
+                       peer->curr = stream_fifo_pop(peer->ibuf);
+               }
+               pthread_mutex_unlock(&peer->io_mtx);
 
-       bgp_size_t actual_size = stream_get_endp(peer->curr);
-
-       /* skip the marker and copy the packet length */
-       stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
-       memcpy(notify_data_length, stream_pnt(peer->curr), 2);
-
-       /* read in the packet length and type */
-       size = stream_getw(peer->curr);
-       type = stream_getc(peer->curr);
-
-       /* BGP packet dump function. */
-       bgp_dump_packet(peer, type, peer->curr);
-
-       /* adjust size to exclude the marker + length + type */
-       size -= BGP_HEADER_SIZE;
-
-       /* Read rest of the packet and call each sort of packet routine */
-       switch (type) {
-       case BGP_MSG_OPEN:
-               peer->open_in++;
-               bgp_open_receive(peer, size); /* XXX return value ignored! */
-               break;
-       case BGP_MSG_UPDATE:
-               peer->readtime = monotime(NULL);
-               bgp_update_receive(peer, size);
-               break;
-       case BGP_MSG_NOTIFY:
-               bgp_notify_receive(peer, size);
-               break;
-       case BGP_MSG_KEEPALIVE:
-               peer->readtime = monotime(NULL);
-               bgp_keepalive_receive(peer, size);
-               break;
-       case BGP_MSG_ROUTE_REFRESH_NEW:
-       case BGP_MSG_ROUTE_REFRESH_OLD:
-               peer->refresh_in++;
-               bgp_route_refresh_receive(peer, size);
-               break;
-       case BGP_MSG_CAPABILITY:
-               peer->dynamic_cap_in++;
-               bgp_capability_receive(peer, size);
-               break;
-       }
+               if (peer->curr == NULL) // no packets to process, hmm...
+                       return 0;
 
-       /* If reading this packet caused us to send a NOTIFICATION then store a
-        * copy
-        * of the packet for troubleshooting purposes
-        */
-       if (notify_out < peer->notify_out) {
-               memcpy(peer->last_reset_cause, peer->curr->data, actual_size);
-               peer->last_reset_cause_size = actual_size;
+               bgp_size_t actual_size = stream_get_endp(peer->curr);
+
+               /* skip the marker and copy the packet length */
+               stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
+               memcpy(notify_data_length, stream_pnt(peer->curr), 2);
+
+               /* read in the packet length and type */
+               size = stream_getw(peer->curr);
+               type = stream_getc(peer->curr);
+
+               /* BGP packet dump function. */
+               bgp_dump_packet(peer, type, peer->curr);
+
+               /* adjust size to exclude the marker + length + type */
+               size -= BGP_HEADER_SIZE;
+
+               /* Read rest of the packet and call each sort of packet routine
+                */
+               switch (type) {
+               case BGP_MSG_OPEN:
+                       peer->open_in++;
+                       bgp_open_receive(peer,
+                                        size); /* XXX return value ignored! */
+                       break;
+               case BGP_MSG_UPDATE:
+                       peer->readtime = monotime(NULL);
+                       bgp_update_receive(peer, size);
+                       break;
+               case BGP_MSG_NOTIFY:
+                       bgp_notify_receive(peer, size);
+                       break;
+               case BGP_MSG_KEEPALIVE:
+                       peer->readtime = monotime(NULL);
+                       bgp_keepalive_receive(peer, size);
+                       break;
+               case BGP_MSG_ROUTE_REFRESH_NEW:
+               case BGP_MSG_ROUTE_REFRESH_OLD:
+                       peer->refresh_in++;
+                       bgp_route_refresh_receive(peer, size);
+                       break;
+               case BGP_MSG_CAPABILITY:
+                       peer->dynamic_cap_in++;
+                       bgp_capability_receive(peer, size);
+                       break;
+               }
+
+               /* If reading this packet caused us to send a NOTIFICATION then
+                * store a copy
+                * of the packet for troubleshooting purposes
+                */
+               if (notify_out < peer->notify_out) {
+                       memcpy(peer->last_reset_cause, peer->curr->data,
+                              actual_size);
+                       peer->last_reset_cause_size = actual_size;
+               }
+
+               /* Delete packet and carry on. */
+               if (peer->curr) {
+                       stream_free(peer->curr);
+                       peer->curr = NULL;
+                       processed++;
+               }
        }
 
-       /* Delete packet and carry on. */
-       if (peer->curr) {
-               stream_free(peer->curr);
-               peer->curr = NULL;
+       if (peer->ibuf->count > 0) { // more work to do, come back later
+               thread_add_background(bm->master, bgp_process_packet, peer, 0,
+                                     &peer->t_process_packet);
        }
 
        return 0;
index 361a995a696575bd28c13a0c5e907795cedad306..527066e0a3abf5e05b14afba3fce455de852cbcf 100644 (file)
@@ -1162,10 +1162,9 @@ struct peer *peer_new(struct bgp *bgp)
         */
        peer->obuf_work =
                stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
-       peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE);
+       peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * 5);
        peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
 
-
        bgp_sync_init(peer);
 
        /* Get service port number.  */