]> git.puffer.fish Git - mirror/frr.git/commitdiff
bgpd: atomize write-quanta, add read-quanta
authorQuentin Young <qlyoung@cumulusnetworks.com>
Mon, 5 Jun 2017 20:14:47 +0000 (20:14 +0000)
committerQuentin Young <qlyoung@cumulusnetworks.com>
Thu, 30 Nov 2017 21:18:00 +0000 (16:18 -0500)
bgpd supports setting a write-quanta that serves as a hint on how many
packets to write per I/O cycle. Now that input is buffered, it makes
sense to add the equivalent parameter for how many packets are processed
per cycle. This is *not* how many packets are read off the wire per I/O
cycle; rather it is how many packets are processed from the input buffer
in a given cycle after having been read off the wire and sanitized.

Since these values must be used from multiple threads, they have also
been made atomic.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
bgpd/bgp_io.c
bgpd/bgp_io.h
bgpd/bgp_packet.c
bgpd/bgp_packet.h
bgpd/bgp_vty.c
bgpd/bgp_vty.h
bgpd/bgpd.c
bgpd/bgpd.h

index 71c2812959007ec06be87953e17c1c5ad71a47f5..c9e3002625ae26856b2711c8ba98367829c7139f 100644 (file)
@@ -381,9 +381,13 @@ static uint16_t bgp_write(struct peer *peer)
        unsigned int count = 0;
        unsigned int oc = 0;
        uint16_t status = 0;
+       uint32_t wpkt_quanta_old;
 
-       while (count < peer->bgp->wpkt_quanta
-              && (s = stream_fifo_head(peer->obuf))) {
+       // cache current write quanta
+       wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
+                                              memory_order_relaxed);
+
+       while (count < wpkt_quanta_old && (s = stream_fifo_head(peer->obuf))) {
                int writenum;
                do {
                        writenum = stream_get_endp(s) - stream_get_getp(s);
index 3e1701003e46ce5fd60e2ee0d59563578b5fd412..6cda227a052c90fff59790b28810024f4e35fad2 100644 (file)
@@ -23,6 +23,9 @@
 #ifndef _FRR_BGP_IO_H
 #define _FRR_BGP_IO_H
 
+#define BGP_WRITE_PACKET_MAX 10U
+#define BGP_READ_PACKET_MAX  10U
+
 #include "bgpd/bgpd.h"
 #include "frr_pthread.h"
 
index 28964deeff7388097bc33a057998a5911c484e05..7c924fd7335309f2ab6a4fb13be31fabcddcdc53 100644 (file)
@@ -1963,15 +1963,35 @@ int bgp_process_packet(struct thread *thread)
 {
        /* Yes first of all get peer pointer. */
        struct peer *peer;
+       uint32_t rpkt_quanta_old;
+
        peer = THREAD_ARG(thread);
+       rpkt_quanta_old = atomic_load_explicit(&peer->bgp->rpkt_quanta,
+                                              memory_order_relaxed);
+
+       /*
+        * XXX: At present multiple packet reads per input cycle are
+        * problematic. The issue is that some of the packet processing
+        * functions perform their own FSM checks, that arguably should be
+        * located in bgp_fsm.c. For example if we are in OpenConfirm process a
+        * Keepalive, then a keepalive-received event is placed on the event
+        * queue to handle later. If we then process an Update before that
+        * event has popped, the update function checks that the peer status is
+        * in Established and if not tears down the session. Therefore we'll
+        * limit input processing to 1 packet per cycle, as it traditionally
+        * was, until this problem is rectified.
+        *
+        * @qlyoung June 2017
+        */
+       rpkt_quanta_old = 1;
 
        /* Guard against scheduled events that occur after peer deletion. */
        if (peer->status == Deleted || peer->status == Clearing)
                return 0;
 
-       int processed = 0;
+       unsigned int processed = 0;
 
-       while (processed < 5 && peer->ibuf->count > 0) {
+       while (processed < rpkt_quanta_old) {
                u_char type = 0;
                bgp_size_t size;
                char notify_data_length[2];
@@ -2054,10 +2074,13 @@ int bgp_process_packet(struct thread *thread)
                }
        }
 
-       if (peer->ibuf->count > 0) { // more work to do, come back later
-               thread_add_background(bm->master, bgp_process_packet, peer, 0,
-                                     &peer->t_process_packet);
+       pthread_mutex_lock(&peer->io_mtx);
+       {
+               if (peer->ibuf->count > 0) // more work to do, come back later
+                       thread_add_event(bm->master, bgp_process_packet, peer,
+                                        0, NULL);
        }
+       pthread_mutex_unlock(&peer->io_mtx);
 
        return 0;
 }
index 502dbbdeedd3df6994d1dfb3f3afe65b03a374a3..62c3fe671bf1cd3d5a76d6a9e4d1a715c85756d2 100644 (file)
@@ -24,7 +24,6 @@
 #define BGP_NLRI_LENGTH       1U
 #define BGP_TOTAL_ATTR_LEN    2U
 #define BGP_UNFEASIBLE_LEN    2U
-#define BGP_WRITE_PACKET_MAX 10U
 
 /* When to refresh */
 #define REFRESH_IMMEDIATE 1
index 0953790606f193ff513ce415f3e2f4de3495e2e3..6f1620faca376613c723f301a8509f2b099e30f1 100644 (file)
@@ -57,6 +57,7 @@
 #include "bgpd/bgp_packet.h"
 #include "bgpd/bgp_updgrp.h"
 #include "bgpd/bgp_bfd.h"
+#include "bgpd/bgp_io.h"
 
 static struct peer_group *listen_range_exists(struct bgp *bgp,
                                              struct prefix *range, int exact);
@@ -1332,25 +1333,55 @@ static int bgp_wpkt_quanta_config_vty(struct vty *vty, const char *num,
 {
        VTY_DECLVAR_CONTEXT(bgp, bgp);
 
-       if (set)
-               bgp->wpkt_quanta = strtoul(num, NULL, 10);
-       else
-               bgp->wpkt_quanta = BGP_WRITE_PACKET_MAX;
+       if (set) {
+               uint32_t quanta = strtoul(num, NULL, 10);
+               atomic_store_explicit(&bgp->wpkt_quanta, quanta,
+                                     memory_order_relaxed);
+       } else {
+               atomic_store_explicit(&bgp->wpkt_quanta, BGP_WRITE_PACKET_MAX,
+                                     memory_order_relaxed);
+       }
+
+       return CMD_SUCCESS;
+}
+
+static int bgp_rpkt_quanta_config_vty(struct vty *vty, const char *num,
+                                     char set)
+{
+       VTY_DECLVAR_CONTEXT(bgp, bgp);
+
+       if (set) {
+               uint32_t quanta = strtoul(num, NULL, 10);
+               atomic_store_explicit(&bgp->rpkt_quanta, quanta,
+                                     memory_order_relaxed);
+       } else {
+               atomic_store_explicit(&bgp->rpkt_quanta, BGP_READ_PACKET_MAX,
+                                     memory_order_relaxed);
+       }
 
        return CMD_SUCCESS;
 }
 
 void bgp_config_write_wpkt_quanta(struct vty *vty, struct bgp *bgp)
 {
-       if (bgp->wpkt_quanta != BGP_WRITE_PACKET_MAX)
-               vty_out(vty, " write-quanta %d\n", bgp->wpkt_quanta);
+       uint32_t quanta =
+               atomic_load_explicit(&bgp->wpkt_quanta, memory_order_relaxed);
+       if (quanta != BGP_WRITE_PACKET_MAX)
+               vty_out(vty, " write-quanta %d%s", quanta, VTY_NEWLINE);
 }
 
+void bgp_config_write_rpkt_quanta(struct vty *vty, struct bgp *bgp)
+{
+       uint32_t quanta =
+               atomic_load_explicit(&bgp->rpkt_quanta, memory_order_relaxed);
+       if (quanta != BGP_READ_PACKET_MAX)
+               vty_out(vty, " read-quanta %d%s", quanta, VTY_NEWLINE);
+}
 
-/* Update-delay configuration */
+/* Packet quanta configuration */
 DEFUN (bgp_wpkt_quanta,
        bgp_wpkt_quanta_cmd,
-       "write-quanta (1-10000)",
+       "write-quanta (1-10)",
        "How many packets to write to peer socket per run\n"
        "Number of packets\n")
 {
@@ -1358,18 +1389,38 @@ DEFUN (bgp_wpkt_quanta,
        return bgp_wpkt_quanta_config_vty(vty, argv[idx_number]->arg, 1);
 }
 
-/* Update-delay deconfiguration */
 DEFUN (no_bgp_wpkt_quanta,
        no_bgp_wpkt_quanta_cmd,
-       "no write-quanta (1-10000)",
+       "no write-quanta (1-10)",
        NO_STR
-       "How many packets to write to peer socket per run\n"
+       "How many packets to write to peer socket per I/O cycle\n"
        "Number of packets\n")
 {
        int idx_number = 2;
        return bgp_wpkt_quanta_config_vty(vty, argv[idx_number]->arg, 0);
 }
 
+DEFUN (bgp_rpkt_quanta,
+       bgp_rpkt_quanta_cmd,
+       "read-quanta (1-10)",
+       "How many packets to read from peer socket per I/O cycle\n"
+       "Number of packets\n")
+{
+       int idx_number = 1;
+       return bgp_rpkt_quanta_config_vty(vty, argv[idx_number]->arg, 1);
+}
+
+DEFUN (no_bgp_rpkt_quanta,
+       no_bgp_rpkt_quanta_cmd,
+       "no read-quanta (1-10)",
+       NO_STR
+       "How many packets to read from peer socket per I/O cycle\n"
+       "Number of packets\n")
+{
+       int idx_number = 2;
+       return bgp_rpkt_quanta_config_vty(vty, argv[idx_number]->arg, 0);
+}
+
 void bgp_config_write_coalesce_time(struct vty *vty, struct bgp *bgp)
 {
        if (bgp->coalesce_time != BGP_DEFAULT_SUBGROUP_COALESCE_TIME)
@@ -11375,6 +11426,8 @@ void bgp_vty_init(void)
 
        install_element(BGP_NODE, &bgp_wpkt_quanta_cmd);
        install_element(BGP_NODE, &no_bgp_wpkt_quanta_cmd);
+       install_element(BGP_NODE, &bgp_rpkt_quanta_cmd);
+       install_element(BGP_NODE, &no_bgp_rpkt_quanta_cmd);
 
        install_element(BGP_NODE, &bgp_coalesce_time_cmd);
        install_element(BGP_NODE, &no_bgp_coalesce_time_cmd);
index 59bc0126615486b558de24fad308f9844b44f25e..e456f7caed0611ae69420ab8bb135982da091db0 100644 (file)
@@ -48,6 +48,7 @@ extern const char *afi_safi_print(afi_t, safi_t);
 extern const char *afi_safi_json(afi_t, safi_t);
 extern void bgp_config_write_update_delay(struct vty *, struct bgp *);
 extern void bgp_config_write_wpkt_quanta(struct vty *vty, struct bgp *bgp);
+extern void bgp_config_write_rpkt_quanta(struct vty *vty, struct bgp *bgp);
 extern void bgp_config_write_listen(struct vty *vty, struct bgp *bgp);
 extern void bgp_config_write_coalesce_time(struct vty *vty, struct bgp *bgp);
 extern int bgp_vty_return(struct vty *vty, int ret);
index 527066e0a3abf5e05b14afba3fce455de852cbcf..b546056c16121be2f4b86c46e71890be22bbc7fa 100644 (file)
@@ -1162,7 +1162,7 @@ struct peer *peer_new(struct bgp *bgp)
         */
        peer->obuf_work =
                stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
-       peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * 5);
+       peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
        peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
 
        bgp_sync_init(peer);
@@ -2908,7 +2908,10 @@ static struct bgp *bgp_create(as_t *as, const char *name,
                                 bgp->restart_time, &bgp->t_startup);
        }
 
-       bgp->wpkt_quanta = BGP_WRITE_PACKET_MAX;
+       atomic_store_explicit(&bgp->wpkt_quanta, BGP_WRITE_PACKET_MAX,
+                             memory_order_relaxed);
+       atomic_store_explicit(&bgp->rpkt_quanta, BGP_READ_PACKET_MAX,
+                             memory_order_relaxed);
        bgp->coalesce_time = BGP_DEFAULT_SUBGROUP_COALESCE_TIME;
 
        QOBJ_REG(bgp, bgp);
@@ -7192,6 +7195,8 @@ int bgp_config_write(struct vty *vty)
 
                /* write quanta */
                bgp_config_write_wpkt_quanta(vty, bgp);
+               /* read quanta */
+               bgp_config_write_rpkt_quanta(vty, bgp);
 
                /* coalesce time */
                bgp_config_write_coalesce_time(vty, bgp);
index 208cd897d7731b5132f03a885d729aa56557c856..79c445aac5b420322398d284dc7bc96a02a25a49 100644 (file)
@@ -378,7 +378,9 @@ struct bgp {
 #define BGP_FLAG_IBGP_MULTIPATH_SAME_CLUSTERLEN (1 << 0)
        } maxpaths[AFI_MAX][SAFI_MAX];
 
-       u_int32_t wpkt_quanta; /* per peer packet quanta to write */
+       _Atomic uint32_t wpkt_quanta; // max # packets to write per i/o cycle
+       _Atomic uint32_t rpkt_quanta; // max # packets to read per i/o cycle
+
        u_int32_t coalesce_time;
 
        u_int32_t addpath_tx_id;