git.puffer.fish Git - mirror/frr.git/commitdiff
bgpd: implement buffered reads
author Quentin Young <qlyoung@cumulusnetworks.com>
Tue, 2 May 2017 00:37:45 +0000 (00:37 +0000)
committer Quentin Young <qlyoung@cumulusnetworks.com>
Thu, 30 Nov 2017 21:17:59 +0000 (16:17 -0500)
* Move and modify all network input related code to bgp_io.c
* Add a real input buffer to `struct peer`
* Move connection initialization to its own thread.c task instead of
  piggybacking off of bgp_read()
* Tons of little fixups

Primary changes are in bgp_packet.[ch], bgp_io.[ch], bgp_fsm.[ch].
Changes made elsewhere are almost exclusively refactoring peer->ibuf to
peer->curr since peer->ibuf is now the true FIFO packet input buffer
while peer->curr represents the packet currently being processed by the
main pthread.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
17 files changed:
bgpd/bgp_attr.c
bgpd/bgp_fsm.c
bgpd/bgp_fsm.h
bgpd/bgp_io.c
bgpd/bgp_io.h
bgpd/bgp_keepalives.c
bgpd/bgp_keepalives.h
bgpd/bgp_packet.c
bgpd/bgp_packet.h
bgpd/bgp_updgrp.h
bgpd/bgp_vty.c
bgpd/bgpd.c
bgpd/bgpd.h
bgpd/rfapi/rfapi.c
bgpd/rfapi/vnc_zebra.c
lib/thread.c
lib/thread.h

index 6ddb2ec8a7deac4ab8f557f650fbe702322bb6c3..859158e3c2210d2a5a2ddbabc0edb15b5c01c9ce 100644 (file)
@@ -1156,7 +1156,7 @@ static int bgp_attr_aspath(struct bgp_attr_parser_args *args)
         * peer with AS4 => will get 4Byte ASnums
         * otherwise, will get 16 Bit
         */
-       attr->aspath = aspath_parse(peer->ibuf, length,
+       attr->aspath = aspath_parse(peer->curr, length,
                                    CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV));
 
        /* In case of IBGP, length will be zero. */
@@ -1230,7 +1230,7 @@ static int bgp_attr_as4_path(struct bgp_attr_parser_args *args,
        struct attr *const attr = args->attr;
        const bgp_size_t length = args->length;
 
-       *as4_path = aspath_parse(peer->ibuf, length, 1);
+       *as4_path = aspath_parse(peer->curr, length, 1);
 
        /* In case of IBGP, length will be zero. */
        if (!*as4_path) {
@@ -1271,7 +1271,7 @@ static bgp_attr_parse_ret_t bgp_attr_nexthop(struct bgp_attr_parser_args *args)
           logged locally (this is implemented somewhere else). The UPDATE
           message
           gets ignored in any of these cases. */
-       nexthop_n = stream_get_ipv4(peer->ibuf);
+       nexthop_n = stream_get_ipv4(peer->curr);
        nexthop_h = ntohl(nexthop_n);
        if ((IPV4_NET0(nexthop_h) || IPV4_NET127(nexthop_h)
             || IPV4_CLASS_DE(nexthop_h))
@@ -1307,7 +1307,7 @@ static bgp_attr_parse_ret_t bgp_attr_med(struct bgp_attr_parser_args *args)
                                          args->total);
        }
 
-       attr->med = stream_getl(peer->ibuf);
+       attr->med = stream_getl(peer->curr);
 
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_MULTI_EXIT_DISC);
 
@@ -1333,11 +1333,11 @@ bgp_attr_local_pref(struct bgp_attr_parser_args *args)
           external peer, then this attribute MUST be ignored by the
           receiving speaker. */
        if (peer->sort == BGP_PEER_EBGP) {
-               stream_forward_getp(peer->ibuf, length);
+               stream_forward_getp(peer->curr, length);
                return BGP_ATTR_PARSE_PROCEED;
        }
 
-       attr->local_pref = stream_getl(peer->ibuf);
+       attr->local_pref = stream_getl(peer->curr);
 
        /* Set the local-pref flag. */
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_LOCAL_PREF);
@@ -1386,10 +1386,10 @@ static int bgp_attr_aggregator(struct bgp_attr_parser_args *args)
        }
 
        if (CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV))
-               attr->aggregator_as = stream_getl(peer->ibuf);
+               attr->aggregator_as = stream_getl(peer->curr);
        else
-               attr->aggregator_as = stream_getw(peer->ibuf);
-       attr->aggregator_addr.s_addr = stream_get_ipv4(peer->ibuf);
+               attr->aggregator_as = stream_getw(peer->curr);
+       attr->aggregator_addr.s_addr = stream_get_ipv4(peer->curr);
 
        /* Set atomic aggregate flag. */
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AGGREGATOR);
@@ -1413,8 +1413,8 @@ bgp_attr_as4_aggregator(struct bgp_attr_parser_args *args,
                                          0);
        }
 
-       *as4_aggregator_as = stream_getl(peer->ibuf);
-       as4_aggregator_addr->s_addr = stream_get_ipv4(peer->ibuf);
+       *as4_aggregator_as = stream_getl(peer->curr);
+       as4_aggregator_addr->s_addr = stream_get_ipv4(peer->curr);
 
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AS4_AGGREGATOR);
 
@@ -1540,10 +1540,10 @@ bgp_attr_community(struct bgp_attr_parser_args *args)
        }
 
        attr->community =
-               community_parse((u_int32_t *)stream_pnt(peer->ibuf), length);
+               community_parse((u_int32_t *)stream_pnt(peer->curr), length);
 
        /* XXX: fix community_parse to use stream API and remove this */
-       stream_forward_getp(peer->ibuf, length);
+       stream_forward_getp(peer->curr, length);
 
        if (!attr->community)
                return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1570,7 +1570,7 @@ bgp_attr_originator_id(struct bgp_attr_parser_args *args)
                                          args->total);
        }
 
-       attr->originator_id.s_addr = stream_get_ipv4(peer->ibuf);
+       attr->originator_id.s_addr = stream_get_ipv4(peer->curr);
 
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_ORIGINATOR_ID);
 
@@ -1594,10 +1594,10 @@ bgp_attr_cluster_list(struct bgp_attr_parser_args *args)
        }
 
        attr->cluster =
-               cluster_parse((struct in_addr *)stream_pnt(peer->ibuf), length);
+               cluster_parse((struct in_addr *)stream_pnt(peer->curr), length);
 
        /* XXX: Fix cluster_parse to use stream API and then remove this */
-       stream_forward_getp(peer->ibuf, length);
+       stream_forward_getp(peer->curr, length);
 
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_CLUSTER_LIST);
 
@@ -1778,7 +1778,7 @@ int bgp_mp_unreach_parse(struct bgp_attr_parser_args *args,
        struct attr *const attr = args->attr;
        const bgp_size_t length = args->length;
 
-       s = peer->ibuf;
+       s = peer->curr;
 
 #define BGP_MP_UNREACH_MIN_SIZE 3
        if ((length > STREAM_READABLE(s)) || (length < BGP_MP_UNREACH_MIN_SIZE))
@@ -1832,9 +1832,9 @@ bgp_attr_large_community(struct bgp_attr_parser_args *args)
        }
 
        attr->lcommunity =
-               lcommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
+               lcommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
        /* XXX: fix ecommunity_parse to use stream API */
-       stream_forward_getp(peer->ibuf, length);
+       stream_forward_getp(peer->curr, length);
 
        if (!attr->lcommunity)
                return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1861,9 +1861,9 @@ bgp_attr_ext_communities(struct bgp_attr_parser_args *args)
        }
 
        attr->ecommunity =
-               ecommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
+               ecommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
        /* XXX: fix ecommunity_parse to use stream API */
-       stream_forward_getp(peer->ibuf, length);
+       stream_forward_getp(peer->curr, length);
 
        if (!attr->ecommunity)
                return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1957,7 +1957,7 @@ static int bgp_attr_encap(uint8_t type, struct peer *peer, /* IN */
                                      + sublength);
                tlv->type = subtype;
                tlv->length = sublength;
-               stream_get(tlv->value, peer->ibuf, sublength);
+               stream_get(tlv->value, peer->curr, sublength);
                length -= sublength;
 
                /* attach tlv to encap chain */
@@ -2025,8 +2025,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
 
        attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_PREFIX_SID);
 
-       type = stream_getc(peer->ibuf);
-       length = stream_getw(peer->ibuf);
+       type = stream_getc(peer->curr);
+       length = stream_getw(peer->curr);
 
        if (type == BGP_PREFIX_SID_LABEL_INDEX) {
                if (length != BGP_PREFIX_SID_LABEL_INDEX_LENGTH) {
@@ -2039,11 +2039,11 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
                }
 
                /* Ignore flags and reserved */
-               stream_getc(peer->ibuf);
-               stream_getw(peer->ibuf);
+               stream_getc(peer->curr);
+               stream_getw(peer->curr);
 
                /* Fetch the label index and see if it is valid. */
-               label_index = stream_getl(peer->ibuf);
+               label_index = stream_getl(peer->curr);
                if (label_index == BGP_INVALID_LABEL_INDEX)
                        return bgp_attr_malformed(
                                args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -2074,16 +2074,16 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
                }
 
                /* Ignore reserved */
-               stream_getc(peer->ibuf);
-               stream_getw(peer->ibuf);
+               stream_getc(peer->curr);
+               stream_getw(peer->curr);
 
-               stream_get(&ipv6_sid, peer->ibuf, 16);
+               stream_get(&ipv6_sid, peer->curr, 16);
        }
 
        /* Placeholder code for the Originator SRGB type */
        else if (type == BGP_PREFIX_SID_ORIGINATOR_SRGB) {
                /* Ignore flags */
-               stream_getw(peer->ibuf);
+               stream_getw(peer->curr);
 
                length -= 2;
 
@@ -2099,8 +2099,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
                srgb_count = length / BGP_PREFIX_SID_ORIGINATOR_SRGB_LENGTH;
 
                for (int i = 0; i < srgb_count; i++) {
-                       stream_get(&srgb_base, peer->ibuf, 3);
-                       stream_get(&srgb_range, peer->ibuf, 3);
+                       stream_get(&srgb_base, peer->curr, 3);
+                       stream_get(&srgb_range, peer->curr, 3);
                }
        }
 
@@ -2125,7 +2125,7 @@ static bgp_attr_parse_ret_t bgp_attr_unknown(struct bgp_attr_parser_args *args)
                        peer->host, type, length);
 
        /* Forward read pointer of input stream. */
-       stream_forward_getp(peer->ibuf, length);
+       stream_forward_getp(peer->curr, length);
 
        /* If any of the mandatory well-known attributes are not recognized,
           then the Error Subcode is set to Unrecognized Well-known
index 95e2f157cce778052ccc32e4c2776cb65c3af4bb..18262e6e943c3a589671aae8d4836d20514fadc9 100644 (file)
@@ -126,35 +126,61 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
                           from_peer->host, from_peer, from_peer->fd, peer,
                           peer->fd);
 
-       peer_writes_off(peer);
-       BGP_READ_OFF(peer->t_read);
-       peer_writes_off(from_peer);
-       BGP_READ_OFF(from_peer->t_read);
+       bgp_writes_off(peer);
+       bgp_reads_off(peer);
+       bgp_writes_off(from_peer);
+       bgp_reads_off(from_peer);
 
        BGP_TIMER_OFF(peer->t_routeadv);
+       BGP_TIMER_OFF(peer->t_connect);
+       BGP_TIMER_OFF(peer->t_connect_check);
        BGP_TIMER_OFF(from_peer->t_routeadv);
-
-       fd = peer->fd;
-       peer->fd = from_peer->fd;
-       from_peer->fd = fd;
-       stream_reset(peer->ibuf);
+       BGP_TIMER_OFF(from_peer->t_connect);
+       BGP_TIMER_OFF(from_peer->t_connect_check);
 
        // At this point in time, it is possible that there are packets pending
        // on
-       // from_peer->obuf. These need to be transferred to the new peer struct.
-       pthread_mutex_lock(&peer->obuf_mtx);
-       pthread_mutex_lock(&from_peer->obuf_mtx);
+       // various buffers. Those need to be transferred or dropped, otherwise
+       // we'll
+       // get spurious failures during session establishment.
+       pthread_mutex_lock(&peer->io_mtx);
+       pthread_mutex_lock(&from_peer->io_mtx);
        {
-               // wipe new peer's packet queue
+               fd = peer->fd;
+               peer->fd = from_peer->fd;
+               from_peer->fd = fd;
+
+               stream_fifo_clean(peer->ibuf);
                stream_fifo_clean(peer->obuf);
+               stream_reset(peer->ibuf_work);
 
-               // copy each packet from old peer's queue to new peer's queue
+               // this should never happen, since bgp_process_packet() is the
+               // only task
+               // that sets and unsets the current packet and it runs in our
+               // pthread.
+               if (peer->curr) {
+                       zlog_err(
+                               "[%s] Dropping pending packet on connection transfer:",
+                               peer->host);
+                       u_int16_t type = stream_getc_from(peer->curr,
+                                                         BGP_MARKER_SIZE + 2);
+                       bgp_dump_packet(peer, type, peer->curr);
+                       stream_free(peer->curr);
+                       peer->curr = NULL;
+               }
+
+               // copy each packet from old peer's output queue to new peer
                while (from_peer->obuf->head)
                        stream_fifo_push(peer->obuf,
                                         stream_fifo_pop(from_peer->obuf));
+
+               // copy each packet from old peer's input queue to new peer
+               while (from_peer->ibuf->head)
+                       stream_fifo_push(peer->ibuf,
+                                        stream_fifo_pop(from_peer->ibuf));
        }
-       pthread_mutex_unlock(&from_peer->obuf_mtx);
-       pthread_mutex_unlock(&peer->obuf_mtx);
+       pthread_mutex_unlock(&from_peer->io_mtx);
+       pthread_mutex_unlock(&peer->io_mtx);
 
        peer->as = from_peer->as;
        peer->v_holdtime = from_peer->v_holdtime;
@@ -232,8 +258,8 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
                }
        }
 
-       BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-       peer_writes_on(peer);
+       bgp_reads_on(peer);
+       bgp_writes_on(peer);
 
        if (from_peer)
                peer_xfer_stats(peer, from_peer);
@@ -381,6 +407,10 @@ static int bgp_connect_timer(struct thread *thread)
        int ret;
 
        peer = THREAD_ARG(thread);
+
+       assert(!peer->t_write);
+       assert(!peer->t_read);
+
        peer->t_connect = NULL;
 
        if (bgp_debug_neighbor_events(peer))
@@ -429,6 +459,9 @@ int bgp_routeadv_timer(struct thread *thread)
 
        peer->synctime = bgp_clock();
 
+       thread_add_background(bm->master, bgp_generate_updgrp_packets, peer, 0,
+                             &peer->t_generate_updgrp_packets);
+
        /* MRAI timer will be started again when FIFO is built, no need to
         * do it here.
         */
@@ -634,6 +667,9 @@ void bgp_adjust_routeadv(struct peer *peer)
                        BGP_TIMER_OFF(peer->t_routeadv);
 
                peer->synctime = bgp_clock();
+               thread_add_background(bm->master, bgp_generate_updgrp_packets,
+                                     peer, 0,
+                                     &peer->t_generate_updgrp_packets);
                return;
        }
 
@@ -1028,33 +1064,40 @@ int bgp_stop(struct peer *peer)
                bgp_bfd_deregister_peer(peer);
        }
 
-       /* Stop read and write threads when exists. */
-       BGP_READ_OFF(peer->t_read);
-       peer_writes_off(peer);
+       /* stop keepalives */
+       peer_keepalives_off(peer);
+
+       /* Stop read and write threads. */
+       bgp_writes_off(peer);
+       bgp_reads_off(peer);
+
+       THREAD_OFF(peer->t_connect_check);
 
        /* Stop all timers. */
        BGP_TIMER_OFF(peer->t_start);
        BGP_TIMER_OFF(peer->t_connect);
        BGP_TIMER_OFF(peer->t_holdtime);
-       peer_keepalives_off(peer);
        BGP_TIMER_OFF(peer->t_routeadv);
-       BGP_TIMER_OFF(peer->t_generate_updgrp_packets);
-
-       /* Stream reset. */
-       peer->packet_size = 0;
 
        /* Clear input and output buffer.  */
-       if (peer->ibuf)
-               stream_reset(peer->ibuf);
-       if (peer->work)
-               stream_reset(peer->work);
-
-       pthread_mutex_lock(&peer->obuf_mtx);
+       pthread_mutex_lock(&peer->io_mtx);
        {
+               if (peer->ibuf)
+                       stream_fifo_clean(peer->ibuf);
                if (peer->obuf)
                        stream_fifo_clean(peer->obuf);
+
+               if (peer->ibuf_work)
+                       stream_reset(peer->ibuf_work);
+               if (peer->obuf_work)
+                       stream_reset(peer->obuf_work);
+
+               if (peer->curr) {
+                       stream_free(peer->curr);
+                       peer->curr = NULL;
+               }
        }
-       pthread_mutex_unlock(&peer->obuf_mtx);
+       pthread_mutex_unlock(&peer->io_mtx);
 
        /* Close of file descriptor. */
        if (peer->fd >= 0) {
@@ -1177,10 +1220,12 @@ static int bgp_connect_check(struct thread *thread)
        struct peer *peer;
 
        peer = THREAD_ARG(thread);
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+       assert(!peer->t_read);
+       assert(!peer->t_write);
 
-       /* This value needs to be unset in order for bgp_read() to be scheduled
-        */
-       BGP_READ_OFF(peer->t_read);
+       peer->t_connect_check = NULL;
 
        /* Check file descriptor. */
        slen = sizeof(status);
@@ -1218,17 +1263,16 @@ static int bgp_connect_success(struct peer *peer)
                return -1;
        }
 
-       peer_writes_on(peer);
-
        if (bgp_getsockname(peer) < 0) {
                zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",
                         __FUNCTION__, peer->host, peer->fd);
                bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR,
                                0); /* internal error */
+               bgp_writes_on(peer);
                return -1;
        }
 
-       BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
+       bgp_reads_on(peer);
 
        if (bgp_debug_neighbor_events(peer)) {
                char buf1[SU_ADDRSTRLEN];
@@ -1332,6 +1376,10 @@ int bgp_start(struct peer *peer)
 #endif
        }
 
+       assert(!peer->t_write);
+       assert(!peer->t_read);
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
        status = bgp_connect(peer);
 
        switch (status) {
@@ -1362,7 +1410,8 @@ int bgp_start(struct peer *peer)
                // when the socket becomes ready (or fails to connect),
                // bgp_connect_check
                // will be called.
-               BGP_READ_ON(peer->t_read, bgp_connect_check, peer->fd);
+               thread_add_read(bm->master, bgp_connect_check, peer, peer->fd,
+                               &peer->t_connect_check);
                break;
        }
        return 0;
index a7abfdb2f4964d9389ce81f475546f85d662cf56..131de40b5b114aff19fed5c47ee717e28f27f680 100644 (file)
 #define _QUAGGA_BGP_FSM_H
 
 /* Macro for BGP read, write and timer thread.  */
-#define BGP_READ_ON(T, F, V)                                                   \
-       do {                                                                   \
-               if ((peer->status != Deleted))                                 \
-                       thread_add_read(bm->master, (F), peer, (V), &(T));     \
-       } while (0)
-
-#define BGP_READ_OFF(T)                                                        \
-       do {                                                                   \
-               if (T)                                                         \
-                       THREAD_READ_OFF(T);                                    \
-       } while (0)
-
-#define BGP_WRITE_OFF(T)                                                       \
-       do {                                                                   \
-               if (T)                                                         \
-                       THREAD_WRITE_OFF(T);                                   \
-       } while (0)
-
 #define BGP_TIMER_ON(T, F, V)                                                  \
        do {                                                                   \
                if ((peer->status != Deleted))                                 \
index 5d14737d2214a0c7cc09ca82c713889c441d4f78..4bdafa744cae291b428ededcc088d3c9e61a2085 100644 (file)
@@ -1,8 +1,10 @@
 /*
   BGP I/O.
-  Implements a consumer thread to flush packets destined for remote peers.
 
+  Implements packet I/O in a consumer pthread.
+  --------------------------------------------
   Copyright (C) 2017  Cumulus Networks
+  Quentin Young
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -31,7 +33,7 @@
 #include "log.h"
 #include "monotime.h"
 #include "network.h"
-#include "frr_pthread.h"
+#include "pqueue.h"
 
 #include "bgpd/bgpd.h"
 #include "bgpd/bgp_io.h"
 #include "bgpd/bgp_packet.h"
 #include "bgpd/bgp_fsm.h"
 
-static int bgp_write(struct peer *);
-static void peer_process_writes(struct hash_backet *, void *);
+/* forward declarations */
+static uint16_t bgp_write(struct peer *);
+static uint16_t bgp_read(struct peer *);
+static int bgp_process_writes(struct thread *);
+static int bgp_process_reads(struct thread *);
+static bool validate_header(struct peer *);
 
-bool bgp_packet_writes_thread_run = false;
+/* generic i/o status codes */
+#define BGP_IO_TRANS_ERR        (1 << 1) // EAGAIN or similar occurred
+#define BGP_IO_FATAL_ERR        (1 << 2) // some kind of fatal TCP error
 
-/* Hash table of peers to operate on, associated synchronization primitives and
- * hash table callbacks.
+/* bgp_read() status codes */
+#define BGP_IO_READ_HEADER      (1 << 3) // when read a full packet header
+#define BGP_IO_READ_FULLPACKET  (1 << 4) // read a full packet
+
+/* Start and stop routines for I/O pthread + control variables
  * ------------------------------------------------------------------------ */
-static struct hash *peerhash;
-/* Mutex to protect hash table */
-static pthread_mutex_t *peerhash_mtx;
-/* Condition variable used to notify the write thread that there is work to do
- */
-static pthread_cond_t *write_cond;
+bool bgp_packet_write_thread_run = false;
+pthread_mutex_t *work_mtx;
 
-static unsigned int peer_hash_key_make(void *p)
-{
-       struct peer *peer = p;
-       return sockunion_hash(&peer->su);
-}
+static struct list *read_cancel;
+static struct list *write_cancel;
 
-static int peer_hash_cmp(const void *p1, const void *p2)
+void bgp_io_init()
 {
-       const struct peer *peer1 = p1;
-       const struct peer *peer2 = p2;
-       return (sockunion_same(&peer1->su, &peer2->su)
-               && CHECK_FLAG(peer1->flags, PEER_FLAG_CONFIG_NODE)
-                          == CHECK_FLAG(peer2->flags, PEER_FLAG_CONFIG_NODE));
+       work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
+       pthread_mutex_init(work_mtx, NULL);
+
+       read_cancel = list_new();
+       write_cancel = list_new();
 }
-/* ------------------------------------------------------------------------ */
 
-void peer_writes_init(void)
+void *bgp_io_start(void *arg)
 {
-       peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
-       write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
-
-       // initialize mutex
-       pthread_mutex_init(peerhash_mtx, NULL);
-
-       // use monotonic clock with condition variable
-       pthread_condattr_t attrs;
-       pthread_condattr_init(&attrs);
-       pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
-       pthread_cond_init(write_cond, &attrs);
-       pthread_condattr_destroy(&attrs);
+       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+       // we definitely don't want to handle signals
+       fpt->master->handle_signals = false;
+
+       bgp_packet_write_thread_run = true;
+       struct thread task;
+
+       while (bgp_packet_write_thread_run) {
+               if (thread_fetch(fpt->master, &task)) {
+                       pthread_mutex_lock(work_mtx);
+                       {
+                               bool cancel = false;
+                               struct peer *peer = THREAD_ARG(&task);
+                               if ((task.func == bgp_process_reads
+                                    && listnode_lookup(read_cancel, peer))
+                                   || (task.func == bgp_process_writes
+                                       && listnode_lookup(write_cancel, peer)))
+                                       cancel = true;
+
+                               list_delete_all_node(write_cancel);
+                               list_delete_all_node(read_cancel);
+
+                               if (!cancel)
+                                       thread_call(&task);
+                       }
+                       pthread_mutex_unlock(work_mtx);
+               }
+       }
 
-       // initialize peerhash
-       peerhash = hash_create_size(2048, peer_hash_key_make, peer_hash_cmp);
+       return NULL;
 }
 
-static void peer_writes_finish(void *arg)
+int bgp_io_stop(void **result, struct frr_pthread *fpt)
 {
-       bgp_packet_writes_thread_run = false;
-
-       if (peerhash)
-               hash_free(peerhash);
-
-       peerhash = NULL;
+       fpt->master->spin = false;
+       bgp_packet_write_thread_run = false;
+       pthread_kill(fpt->thread, SIGINT);
+       pthread_join(fpt->thread, result);
 
-       pthread_mutex_unlock(peerhash_mtx);
-       pthread_mutex_destroy(peerhash_mtx);
-       pthread_cond_destroy(write_cond);
+       pthread_mutex_unlock(work_mtx);
+       pthread_mutex_destroy(work_mtx);
 
-       XFREE(MTYPE_PTHREAD, peerhash_mtx);
-       XFREE(MTYPE_PTHREAD, write_cond);
+       list_delete(read_cancel);
+       list_delete(write_cancel);
+       XFREE(MTYPE_TMP, work_mtx);
+       return 0;
 }
+/* ------------------------------------------------------------------------ */
 
-void *peer_writes_start(void *arg)
+void bgp_writes_on(struct peer *peer)
 {
-       struct timeval currtime = {0, 0};
-       struct timeval sleeptime = {0, 500};
-       struct timespec next_update = {0, 0};
-
-       pthread_mutex_lock(peerhash_mtx);
-
-       // register cleanup handler
-       pthread_cleanup_push(&peer_writes_finish, NULL);
-
-       bgp_packet_writes_thread_run = true;
+       assert(peer->status != Deleted);
+       assert(peer->obuf);
+       assert(peer->ibuf);
+       assert(peer->ibuf_work);
+       assert(!peer->t_connect_check);
+       assert(peer->fd);
 
-       while (bgp_packet_writes_thread_run) {
-               // wait around until next update time
-               if (peerhash->count > 0)
-                       pthread_cond_timedwait(write_cond, peerhash_mtx,
-                                              &next_update);
-               else // wait around until we have some peers
-                       while (peerhash->count == 0
-                              && bgp_packet_writes_thread_run)
-                               pthread_cond_wait(write_cond, peerhash_mtx);
+       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
 
-               hash_iterate(peerhash, peer_process_writes, NULL);
-
-               monotime(&currtime);
-               timeradd(&currtime, &sleeptime, &currtime);
-               TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
+       pthread_mutex_lock(work_mtx);
+       {
+               listnode_delete(write_cancel, peer);
+               thread_add_write(fpt->master, bgp_process_writes, peer,
+                                peer->fd, &peer->t_write);
+               SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
        }
+       pthread_mutex_unlock(work_mtx);
+}
 
-       // clean up
-       pthread_cleanup_pop(1);
+void bgp_writes_off(struct peer *peer)
+{
+       pthread_mutex_lock(work_mtx);
+       {
+               THREAD_OFF(peer->t_write);
+               THREAD_OFF(peer->t_generate_updgrp_packets);
+               listnode_add(write_cancel, peer);
 
-       return NULL;
+               // peer access by us after this point will result in pain
+               UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+       }
+       pthread_mutex_unlock(work_mtx);
+       /* upon return, i/o thread must not access the peer */
 }
 
-int peer_writes_stop(void **result)
+void bgp_reads_on(struct peer *peer)
 {
-       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE);
-       bgp_packet_writes_thread_run = false;
-       peer_writes_wake();
-       pthread_join(fpt->thread, result);
-       return 0;
+       assert(peer->status != Deleted);
+       assert(peer->ibuf);
+       assert(peer->fd);
+       assert(peer->ibuf_work);
+       assert(stream_get_endp(peer->ibuf_work) == 0);
+       assert(peer->obuf);
+       assert(!peer->t_connect_check);
+       assert(peer->fd);
+
+       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+       pthread_mutex_lock(work_mtx);
+       {
+               listnode_delete(read_cancel, peer);
+               thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
+                               &peer->t_read);
+               SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
+       }
+       pthread_mutex_unlock(work_mtx);
 }
 
-void peer_writes_on(struct peer *peer)
+void bgp_reads_off(struct peer *peer)
 {
-       if (peer->status == Deleted)
-               return;
-
-       pthread_mutex_lock(peerhash_mtx);
+       pthread_mutex_lock(work_mtx);
        {
-               if (!hash_lookup(peerhash, peer)) {
-                       hash_get(peerhash, peer, hash_alloc_intern);
-                       peer_lock(peer);
-               }
+               THREAD_OFF(peer->t_read);
+               THREAD_OFF(peer->t_process_packet);
+               listnode_add(read_cancel, peer);
 
-               SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+               // peer access by us after this point will result in pain
+               UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
        }
-       pthread_mutex_unlock(peerhash_mtx);
-       peer_writes_wake();
+       pthread_mutex_unlock(work_mtx);
 }
 
-void peer_writes_off(struct peer *peer)
+/**
+ * Called from PTHREAD_IO when select() or poll() determines that the file
+ * descriptor is ready to be written to.
+ */
+static int bgp_process_writes(struct thread *thread)
 {
-       pthread_mutex_lock(peerhash_mtx);
+       static struct peer *peer;
+       peer = THREAD_ARG(thread);
+       uint16_t status;
+
+       if (peer->fd < 0)
+               return -1;
+
+       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+       bool reschedule;
+       pthread_mutex_lock(&peer->io_mtx);
        {
-               if (hash_release(peerhash, peer)) {
-                       peer_unlock(peer);
-                       fprintf(stderr, "Releasing %p\n", peer);
-               }
+               status = bgp_write(peer);
+               reschedule = (stream_fifo_head(peer->obuf) != NULL);
+       }
+       pthread_mutex_unlock(&peer->io_mtx);
 
-               UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+       if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
        }
-       pthread_mutex_unlock(peerhash_mtx);
-}
 
-void peer_writes_wake()
-{
-       pthread_cond_signal(write_cond);
+       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
+               reschedule = 0; // problem
+
+       if (reschedule) {
+               thread_add_write(fpt->master, bgp_process_writes, peer,
+                                peer->fd, &peer->t_write);
+               thread_add_background(bm->master, bgp_generate_updgrp_packets,
+                                     peer, 0,
+                                     &peer->t_generate_updgrp_packets);
+       }
+
+       return 0;
 }
 
 /**
- * Callback for hash_iterate. Takes a hash bucket, unwraps it into a peer and
- * synchronously calls bgp_write() on the peer.
+ * Called from PTHREAD_IO when select() or poll() determines that the file
+ * descriptor is ready to be read from.
  */
-static void peer_process_writes(struct hash_backet *hb, void *arg)
+static int bgp_process_reads(struct thread *thread)
 {
        static struct peer *peer;
-       peer = hb->data;
-       pthread_mutex_lock(&peer->obuf_mtx);
+       peer = THREAD_ARG(thread);
+       uint16_t status;
+
+       if (peer->fd < 0)
+               return -1;
+
+       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+       bool reschedule = true;
+
+       // execute read
+       pthread_mutex_lock(&peer->io_mtx);
        {
-               bgp_write(peer);
+               status = bgp_read(peer);
+       }
+       pthread_mutex_unlock(&peer->io_mtx);
+
+       // check results of read
+       bool header_valid = true;
+
+       if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
        }
-       pthread_mutex_unlock(&peer->obuf_mtx);
 
-       // dispatch job on main thread
-       BGP_TIMER_ON(peer->t_generate_updgrp_packets,
-                    bgp_generate_updgrp_packets, 100);
+       if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
+               reschedule = false; // problem
+
+       if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
+               header_valid = validate_header(peer);
+               if (!header_valid) {
+                       bgp_size_t packetsize =
+                               MIN((int)stream_get_endp(peer->ibuf_work),
+                                   BGP_MAX_PACKET_SIZE);
+                       memcpy(peer->last_reset_cause, peer->ibuf_work->data,
+                              packetsize);
+                       peer->last_reset_cause_size = packetsize;
+                       // We're tearing the session down, no point in
+                       // rescheduling.
+                       // Additionally, bgp_read() will use the TLV if it's
+                       // present to
+                       // determine how much to read; if this is corrupt, we'll
+                       // crash the
+                       // program.
+                       reschedule = false;
+               }
+       }
+
+       // if we read a full packet, push it onto peer->ibuf, reset our WiP
+       // buffer
+       // and schedule a job to process it on the main thread
+       if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
+               pthread_mutex_lock(&peer->io_mtx);
+               {
+                       stream_fifo_push(peer->ibuf,
+                                        stream_dup(peer->ibuf_work));
+               }
+               pthread_mutex_unlock(&peer->io_mtx);
+               stream_reset(peer->ibuf_work);
+               assert(stream_get_endp(peer->ibuf_work) == 0);
+
+               thread_add_background(bm->master, bgp_process_packet, peer, 0,
+                                     &peer->t_process_packet);
+       }
+
+       if (reschedule)
+               thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
+                               &peer->t_read);
+
+       return 0;
 }
 
 /**
@@ -212,14 +320,14 @@ static void peer_process_writes(struct hash_backet *hb, void *arg)
  *
  * This function pops packets off of peer->obuf and writes them to peer->fd.
  * The amount of packets written is equal to the minimum of peer->wpkt_quanta
- * and the number of packets on the output buffer.
+ * and the number of packets on the output buffer, unless an error occurs.
  *
  * If write() returns an error, the appropriate FSM event is generated.
  *
  * The return value is equal to the number of packets written
  * (which may be zero).
  */
-static int bgp_write(struct peer *peer)
+static uint16_t bgp_write(struct peer *peer)
 {
        u_char type;
        struct stream *s;
@@ -227,10 +335,8 @@ static int bgp_write(struct peer *peer)
        int update_last_write = 0;
        unsigned int count = 0;
        unsigned int oc = 0;
+       uint16_t status = 0;
 
-       /* Write packets. The number of packets written is the value of
-        * bgp->wpkt_quanta or the size of the output buffer, whichever is
-        * smaller.*/
        while (count < peer->bgp->wpkt_quanta
               && (s = stream_fifo_head(peer->obuf))) {
                int writenum;
@@ -239,8 +345,12 @@ static int bgp_write(struct peer *peer)
                        num = write(peer->fd, STREAM_PNT(s), writenum);
 
                        if (num < 0) {
-                               if (!ERRNO_IO_RETRY(errno))
+                               if (!ERRNO_IO_RETRY(errno)) {
                                        BGP_EVENT_ADD(peer, TCP_fatal_error);
+                                       SET_FLAG(status, BGP_IO_FATAL_ERR);
+                               } else {
+                                       SET_FLAG(status, BGP_IO_TRANS_ERR);
+                               }
 
                                goto done;
                        } else if (num != writenum) // incomplete write
@@ -288,7 +398,7 @@ static int bgp_write(struct peer *peer)
                }
 
                count++;
-               /* OK we send packet so delete it. */
+
                stream_free(stream_fifo_pop(peer->obuf));
                update_last_write = 1;
        }
@@ -303,5 +413,170 @@ done : {
                peer->last_write = bgp_clock();
 }
 
-       return count;
+       return status;
+}
+
+/**
+ * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
+ *
+ * While fewer than BGP_HEADER_SIZE bytes are buffered, only the remainder of
+ * the header is requested. Once the header is buffered, the length field
+ * stored at offset BGP_MARKER_SIZE is used to request the remainder of the
+ * packet.
+ *
+ * On a fatal read error or a closed connection the matching FSM event is
+ * queued with BGP_EVENT_ADD() before returning.
+ *
+ * @param peer - peer whose fd is read and whose ibuf_work is filled
+ *
+ * @return bitmask of BGP_IO_* status flags:
+ *         BGP_IO_FATAL_ERR       - read failed fatally or the peer closed
+ *         BGP_IO_TRANS_ERR       - transient error, try again later
+ *         BGP_IO_READ_HEADER     - the header became complete in this call
+ *         BGP_IO_READ_FULLPACKET - buffered bytes equal the header's length
+ *         Zero means data was read but neither boundary was reached.
+ */
+static uint16_t bgp_read(struct peer *peer)
+{
+       int readsize; // how many bytes we want to read
+       int nbytes;   // how many bytes we actually read
+       bool have_header = false;
+       uint16_t status = 0;
+
+       // still collecting the fixed-size header: ask only for what is missing
+       if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
+               readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
+       else {
+               // retrieve packet length from tlv and compute # bytes we still
+               // need
+               u_int16_t mlen =
+                       stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
+               readsize = mlen - stream_get_endp(peer->ibuf_work);
+               have_header = true;
+       }
+
+       nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
+
+       if (nbytes <= 0) // handle errors
+       {
+               switch (nbytes) {
+               case -1: // fatal error; tear down the session
+                       zlog_err("%s [Error] bgp_read_packet error: %s",
+                                peer->host, safe_strerror(errno));
+
+                       // record why an Established session went down; in NSF
+                       // mode also flag the peer as waiting
+                       if (peer->status == Established) {
+                               if (CHECK_FLAG(peer->sflags,
+                                              PEER_STATUS_NSF_MODE)) {
+                                       peer->last_reset =
+                                               PEER_DOWN_NSF_CLOSE_SESSION;
+                                       SET_FLAG(peer->sflags,
+                                                PEER_STATUS_NSF_WAIT);
+                               } else
+                                       peer->last_reset =
+                                               PEER_DOWN_CLOSE_SESSION;
+                       }
+
+                       BGP_EVENT_ADD(peer, TCP_fatal_error);
+                       SET_FLAG(status, BGP_IO_FATAL_ERR);
+                       break;
+
+               case 0: // TCP session closed
+                       if (bgp_debug_neighbor_events(peer))
+                               zlog_debug(
+                                       "%s [Event] BGP connection closed fd %d",
+                                       peer->host, peer->fd);
+
+                       // same last_reset bookkeeping as the fatal-error case
+                       if (peer->status == Established) {
+                               if (CHECK_FLAG(peer->sflags,
+                                              PEER_STATUS_NSF_MODE)) {
+                                       peer->last_reset =
+                                               PEER_DOWN_NSF_CLOSE_SESSION;
+                                       SET_FLAG(peer->sflags,
+                                                PEER_STATUS_NSF_WAIT);
+                               } else
+                                       peer->last_reset =
+                                               PEER_DOWN_CLOSE_SESSION;
+                       }
+
+                       // a close is reported to the caller as fatal too, but
+                       // raises a different FSM event
+                       BGP_EVENT_ADD(peer, TCP_connection_closed);
+                       SET_FLAG(status, BGP_IO_FATAL_ERR);
+                       break;
+
+               case -2: // temporary error; come back later
+                       SET_FLAG(status, BGP_IO_TRANS_ERR);
+                       break;
+               default: // any other non-positive value: nothing is flagged
+                       break;
+               }
+
+               return status;
+       }
+
+       // If we didn't have the header before read(), and now we do, set the
+       // appropriate flag. The caller must validate the header for us.
+       if (!have_header
+           && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
+               SET_FLAG(status, BGP_IO_READ_HEADER);
+               have_header = true;
+       }
+       // If we read the # of bytes specified in the tlv, we have read a full
+       // packet.
+       //
+       // Note that the header may not have been validated here. This flag
+       // means ONLY that we read the # of bytes specified in the header; if
+       // the header is not valid, the packet MUST NOT be processed further.
+       if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
+                           == stream_get_endp(peer->ibuf_work)))
+               SET_FLAG(status, BGP_IO_READ_FULLPACKET);
+
+       return status;
+}
+
+/*
+ * Called after we have read a BGP packet header. Validates marker, message
+ * type and packet length. If any of these aren't correct, sends a notify.
+ *
+ * The header is read from the start of peer->ibuf_work: BGP_MARKER_SIZE
+ * marker bytes, then a 2-byte length, then a 1-byte type.
+ *
+ * The data attached to the NOTIFICATION echoes the offending field as raw
+ * octets: one byte for a bad type, two bytes in network byte order for a bad
+ * length. Both are built explicitly so the result does not depend on host
+ * endianness.
+ *
+ * @param peer - peer whose ibuf_work holds at least a full header
+ *
+ * @return true if the header is acceptable, false if a notify was sent
+ */
+static bool validate_header(struct peer *peer)
+{
+       u_int16_t size;
+       u_char type;
+
+       /* Marker check: every marker byte must be 0xff. */
+       for (int i = 0; i < BGP_MARKER_SIZE; i++)
+               if (peer->ibuf_work->data[i] != 0xff) {
+                       bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
+                                       BGP_NOTIFY_HEADER_NOT_SYNC);
+                       return false;
+               }
+
+       /* Get size and type. */
+       size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
+       type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);
+
+       /* BGP type check. */
+       if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
+           && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
+           && type != BGP_MSG_ROUTE_REFRESH_NEW
+           && type != BGP_MSG_ROUTE_REFRESH_OLD
+           && type != BGP_MSG_CAPABILITY) {
+               if (bgp_debug_neighbor_events(peer))
+                       zlog_debug("%s unknown message type 0x%02x", peer->host,
+                                  type);
+
+               /* type is a single octet, so its address is the byte to send */
+               bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
+                                         BGP_NOTIFY_HEADER_BAD_MESTYPE,
+                                         &type, 1);
+               return false;
+       }
+
+       /* Minimum packet length check. */
+       if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
+           || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
+           || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
+           || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
+           || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
+           || (type == BGP_MSG_ROUTE_REFRESH_NEW
+               && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+           || (type == BGP_MSG_ROUTE_REFRESH_OLD
+               && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+           || (type == BGP_MSG_CAPABILITY
+               && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
+               /* erroneous length, most significant octet first */
+               u_char size_be[2] = {(u_char)(size >> 8),
+                                    (u_char)(size & 0xff)};
+
+               /* NOTE(review): 128 is presumably BGP_MSG_ROUTE_REFRESH_OLD,
+                * which would be out of range for bgp_type_str - confirm. */
+               if (bgp_debug_neighbor_events(peer))
+                       zlog_debug("%s bad message length - %d for %s",
+                                  peer->host, size,
+                                  type == 128 ? "ROUTE-REFRESH"
+                                              : bgp_type_str[(int)type]);
+
+               bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
+                                         BGP_NOTIFY_HEADER_BAD_MESLEN,
+                                         size_be, 2);
+               return false;
+       }
+
+       return true;
 }
index 7b81b8ee3bbf8437c8f60d87e14240d56f85516f..fd5f7659db3aaca2bf4211fa950193a1c1aaa7ae 100644 (file)
 #ifndef _FRR_BGP_IO_H
 #define _FRR_BGP_IO_H
 
+#include "frr_pthread.h"
 #include "bgpd/bgpd.h"
 
 /**
  * Control variable for write thread.
  *
- * Setting this variable to false and calling peer_writes_wake() will
- * eventually result in thread termination.
+ * Setting this variable to false will eventually result in thread termination.
  */
 extern bool bgp_packet_writes_thread_run;
 
@@ -37,35 +37,32 @@ extern bool bgp_packet_writes_thread_run;
  * Initializes data structures and flags for the write thread.
  *
  * This function should be called from the main thread before
- * peer_writes_start() is invoked.
+ * bgp_io_start() is invoked.
  */
-extern void peer_writes_init(void);
+extern void bgp_io_init(void);
 
 /**
  * Start function for write thread.
  *
- * This function should be passed to pthread_create() during BGP startup.
+ * @param arg - unused
  */
-extern void *peer_writes_start(void *arg);
+extern void *bgp_io_start(void *arg);
 
 /**
  * Stop function for write thread.
  *
  * Uninitializes all resources and stops the thread.
  *
- * @param result -- where to store data result, unused
+ * @param result - where to store data result, unused
  */
-extern int peer_writes_stop(void **result);
+extern int bgp_io_stop(void **result, struct frr_pthread *fpt);
 
 /**
- * Registers a peer with the write thread.
- *
- * This function adds the peer to an internal data structure, which must be
- * locked for write access. This call will block until the structure can be
- * locked.
+ * Turns on packet writing for a peer.
  *
  * After this function is called, any packets placed on peer->obuf will be
- * written to peer->fd at regular intervals.
+ * written to peer->fd at regular intervals. Additionally it becomes unsafe to
+ * use peer->fd with select() or poll().
  *
  * This function increments the peer reference counter with peer_lock().
  *
@@ -73,32 +70,58 @@ extern int peer_writes_stop(void **result);
  *
  * @param peer - peer to register
  */
-extern void peer_writes_on(struct peer *peer);
+extern void bgp_writes_on(struct peer *peer);
 
 /**
- * Deregisters a peer with the write thread.
- *
- * This function removes the peer from an internal data structure, which must
- * be locked for write access. This call will block until the structure can be
- * locked.
+ * Turns off packet writing for a peer.
  *
  * After this function is called, any packets placed on peer->obuf will not be
- * written to peer->fd.
+ * written to peer->fd. After this function returns it is safe to use peer->fd
+ * with select() or poll().
  *
- * This function decrements the peer reference counter with peer_unlock().
+ * If the flush = true, a last-ditch effort will be made to flush any remaining
+ * packets to peer->fd. Upon encountering any error whatsoever, the attempt
+ * will abort. If the caller wishes to know whether the flush succeeded they
+ * may check peer->obuf->count against zero.
  *
  * If the peer is not registered, nothing happens.
  *
  * @param peer - peer to deregister
+ * @param flush - as described
+ */
+extern void bgp_writes_off(struct peer *peer);
+
+/**
+ * Turns on packet reading for a peer.
+ *
+ * After this function is called, any packets received on peer->fd will be read
+ * and copied into the FIFO queue peer->ibuf. Additionally it becomes unsafe to
+ * use peer->fd with select() or poll().
+ *
+ * When a full packet is read, bgp_process_packet() will be scheduled on the
+ * main thread.
+ *
+ * This function increments the peer reference counter with peer_lock().
+ *
+ * If the peer is already registered, nothing happens.
+ *
+ * @param peer - peer to register
  */
-extern void peer_writes_off(struct peer *peer);
+extern void bgp_reads_on(struct peer *peer);
 
 /**
- * Notifies the write thread that there is work to be done.
+ * Turns off packet reading for a peer.
+ *
+ * After this function is called, any packets received on peer->fd will not be
+ * read. After this function returns it is safe to use peer->fd with select()
+ * or poll().
  *
- * This function has the effect of waking the write thread if it is sleeping.
- * If the thread is not sleeping, this signal will be ignored.
+ * This function decrements the peer reference counter with peer_unlock().
+ *
+ * If the peer is not registered, nothing happens.
+ *
+ * @param peer - peer to deregister
  */
-extern void peer_writes_wake(void);
+extern void bgp_reads_off(struct peer *peer);
 
 #endif /* _FRR_BGP_IO_H */
index 23f3f517325f21402f95b9368123b9593b7d33b5..c7383ae77fb873ea435a4c7478bcfca597770907 100644 (file)
@@ -1,26 +1,26 @@
 /*
* BGP Keepalives.
- *
* Implements a producer thread to generate BGP keepalives for peers.
* ----------------------------------------
* Copyright (C) 2017 Cumulus Networks, Inc.
* Quentin Young
- *
* This file is part of FRRouting.
- *
* FRRouting is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2, or (at your option) any later
* version.
- *
* FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
* details.
- *
* You should have received a copy of the GNU General Public License along with
* FRRouting; see the file COPYING.  If not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+  BGP Keepalives.
+
+  Implements a producer thread to generate BGP keepalives for peers.
+  ----------------------------------------
+  Copyright (C) 2017 Cumulus Networks, Inc.
+  Quentin Young
+
+  This file is part of FRRouting.
+
+  FRRouting is free software; you can redistribute it and/or modify it under
+  the terms of the GNU General Public License as published by the Free
+  Software Foundation; either version 2, or (at your option) any later
+  version.
+
+  FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
+  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+  details.
+
+  You should have received a copy of the GNU General Public License along with
+  FRRouting; see the file COPYING.  If not, write to the Free Software
+  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
 #include <zebra.h>
 #include <signal.h>
@@ -73,7 +73,8 @@ static void pkat_del(void *pkat)
 
 
 /*
- * Walks the list of peers, sending keepalives to those that are due for them.
+ * Callback for hash_iterate. Determines if a peer needs a keepalive and if so,
+ * generates and sends it.
  *
  * For any given peer, if the elapsed time since its last keepalive exceeds its
  * configured keepalive timer, a keepalive is sent to the peer and its
@@ -143,8 +144,8 @@ static unsigned int peer_hash_key(void *arg)
 
 void peer_keepalives_init()
 {
-       peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
-       peerhash_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
+       peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
+       peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t));
 
        // initialize mutex
        pthread_mutex_init(peerhash_mtx, NULL);
@@ -175,8 +176,8 @@ static void peer_keepalives_finish(void *arg)
        pthread_mutex_destroy(peerhash_mtx);
        pthread_cond_destroy(peerhash_cond);
 
-       XFREE(MTYPE_PTHREAD, peerhash_mtx);
-       XFREE(MTYPE_PTHREAD, peerhash_cond);
+       XFREE(MTYPE_TMP, peerhash_mtx);
+       XFREE(MTYPE_TMP, peerhash_cond);
 }
 
 /**
@@ -275,9 +276,8 @@ void peer_keepalives_wake()
        pthread_mutex_unlock(peerhash_mtx);
 }
 
-int peer_keepalives_stop(void **result)
+int peer_keepalives_stop(void **result, struct frr_pthread *fpt)
 {
-       struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
        bgp_keepalives_thread_run = false;
        peer_keepalives_wake();
        pthread_join(fpt->thread, result);
index d74b69e90867a06f8907d5ffdabc531e4934e7f2..602b0da77a95ca78d2cce843f9c706c437642845 100644 (file)
@@ -24,6 +24,7 @@
 #ifndef _BGP_KEEPALIVES_H_
 #define _BGP_KEEPALIVES_H_
 
+#include "frr_pthread.h"
 #include "bgpd.h"
 
 /* Thread control flag.
@@ -88,6 +89,6 @@ extern void *peer_keepalives_start(void *arg);
 extern void peer_keepalives_wake(void);
 
 /* stop function */
-int peer_keepalives_stop(void **result);
+int peer_keepalives_stop(void **result, struct frr_pthread *fpt);
 
 #endif /* _BGP_KEEPALIVES_H */
index a89d72cc649094e12b512950efe502cac5470bb3..c21e3cbe379d355819e44f583e1b366e92c1204f 100644 (file)
 #include "bgpd/bgp_label.h"
 #include "bgpd/bgp_io.h"
 
-/* Linked list of active peers */
-static pthread_mutex_t *plist_mtx;
-static pthread_cond_t *write_cond;
-static struct list *plist;
-
-/* periodically scheduled thread to generate update-group updates */
-static struct thread *t_generate_updgrp_packets;
-
-bool bgp_packet_writes_thread_run = false;
-
 /* Set up BGP packet marker and packet type. */
 int bgp_packet_set_marker(struct stream *s, u_char type)
 {
@@ -107,11 +97,9 @@ int bgp_packet_set_size(struct stream *s)
  */
 static void bgp_packet_add(struct peer *peer, struct stream *s)
 {
-       pthread_mutex_lock(&peer->obuf_mtx);
+       pthread_mutex_lock(&peer->io_mtx);
        stream_fifo_push(peer->obuf, s);
-       pthread_mutex_unlock(&peer->obuf_mtx);
-
-       peer_writes_wake();
+       pthread_mutex_unlock(&peer->io_mtx);
 }
 
 static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
@@ -165,7 +153,6 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
 int bgp_generate_updgrp_packets(struct thread *thread)
 {
        struct peer *peer = THREAD_ARG(thread);
-       peer->t_generate_updgrp_packets = NULL;
 
        struct stream *s;
        struct peer_af *paf;
@@ -237,10 +224,13 @@ int bgp_generate_updgrp_packets(struct thread *thread)
 
                                                        if ((s = bgp_update_packet_eor(
                                                                     peer, afi,
-                                                                    safi)))
+                                                                    safi))) {
                                                                bgp_packet_add(
                                                                        peer,
                                                                        s);
+                                                               bgp_writes_on(
+                                                                       peer);
+                                                       }
                                                }
                                        }
                                        continue;
@@ -252,6 +242,7 @@ int bgp_generate_updgrp_packets(struct thread *thread)
                                 * attributes from peer and advance peer */
                                s = bpacket_reformat_for_peer(next_pkt, paf);
                                bgp_packet_add(peer, s);
+                               bgp_writes_on(peer);
                                bpacket_queue_advance_peer(paf);
                        }
        } while (s);
@@ -282,6 +273,8 @@ void bgp_keepalive_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /*
@@ -335,6 +328,67 @@ void bgp_open_send(struct peer *peer)
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
+}
+
+/*
+ * This is only for sending NOTIFICATION message to neighbor.
+ *
+ * Pops the packet at the head of peer->obuf and writes it to peer->fd with a
+ * single write() call. The popped stream is owned by this function and is
+ * freed on every path once it is no longer needed.
+ *
+ * peer->io_mtx is held only while the FIFO is touched and is always released
+ * before returning, including when the FIFO turns out to be empty.
+ *
+ * If write() returns <= 0 a TCP_fatal_error event is queued. Otherwise the
+ * notify counter is bumped, the start timer is doubled (capped at 120) and a
+ * BGP_Stop event is queued.
+ *
+ * @param peer - peer to send the notification to
+ *
+ * @return always 0
+ */
+static int bgp_write_notify(struct peer *peer)
+{
+       int ret, val;
+       u_char type;
+       struct stream *s;
+
+       /* There should be at least one packet. */
+       pthread_mutex_lock(&peer->io_mtx);
+       {
+               s = stream_fifo_pop(peer->obuf);
+       }
+       pthread_mutex_unlock(&peer->io_mtx);
+
+       /* Checked after unlocking so an empty FIFO cannot leave the mutex
+        * held. */
+       if (!s)
+               return 0;
+
+       assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
+
+       /* Stop collecting data within the socket */
+       sockopt_cork(peer->fd, 0);
+
+       /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
+        * we only care about getting a clean shutdown at this point. */
+       ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
+
+       /* only connection reset/close gets counted as TCP_fatal_error, failure
+        * to write the entire NOTIFY doesn't get different FSM treatment */
+       if (ret <= 0) {
+               stream_free(s);
+               BGP_EVENT_ADD(peer, TCP_fatal_error);
+               return 0;
+       }
+
+       /* Disable Nagle, make NOTIFY packet go out right away */
+       val = 1;
+       (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
+                        sizeof(val));
+
+       /* Retrieve BGP packet type. */
+       stream_set_getp(s, BGP_MARKER_SIZE + 2);
+       type = stream_getc(s);
+
+       assert(type == BGP_MSG_NOTIFY);
+
+       /* The packet was popped off the FIFO, so nothing else will free it. */
+       stream_free(s);
+
+       /* Type should be notify. */
+       peer->notify_out++;
+
+       /* Double start timer. */
+       peer->v_start *= 2;
+
+       /* Overflow check. */
+       if (peer->v_start >= (60 * 2))
+               peer->v_start = (60 * 2);
+
+       /* Handle Graceful Restart case where the state changes to
+          Connect instead of Idle */
+       BGP_EVENT_ADD(peer, BGP_Stop);
+
+       return 0;
 }
 
 /*
@@ -372,10 +426,12 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
        /* Set BGP packet length. */
        length = bgp_packet_set_size(s);
 
-       /* Add packet to the peer. */
-       pthread_mutex_lock(&peer->obuf_mtx);
-       stream_fifo_clean(peer->obuf);
-       pthread_mutex_unlock(&peer->obuf_mtx);
+       /* wipe output buffer */
+       pthread_mutex_lock(&peer->io_mtx);
+       {
+               stream_fifo_clean(peer->obuf);
+       }
+       pthread_mutex_unlock(&peer->io_mtx);
 
        /* For debug */
        {
@@ -428,8 +484,8 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
 
        /* Add packet to peer's output queue */
        bgp_packet_add(peer, s);
-       /* Wake up the write thread to get the notify out ASAP */
-       peer_writes_wake();
+
+       bgp_write_notify(peer);
 }
 
 /*
@@ -544,6 +600,8 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /*
@@ -593,6 +651,8 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
 
        /* Add packet to the peer. */
        bgp_packet_add(peer, s);
+
+       bgp_writes_on(peer);
 }
 
 /* RFC1771 6.8 Connection collision detection. */
@@ -696,13 +756,13 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
        u_int16_t *holdtime_ptr;
 
        /* Parse open packet. */
-       version = stream_getc(peer->ibuf);
-       memcpy(notify_data_remote_as, stream_pnt(peer->ibuf), 2);
-       remote_as = stream_getw(peer->ibuf);
-       holdtime_ptr = (u_int16_t *)stream_pnt(peer->ibuf);
-       holdtime = stream_getw(peer->ibuf);
-       memcpy(notify_data_remote_id, stream_pnt(peer->ibuf), 4);
-       remote_id.s_addr = stream_get_ipv4(peer->ibuf);
+       version = stream_getc(peer->curr);
+       memcpy(notify_data_remote_as, stream_pnt(peer->curr), 2);
+       remote_as = stream_getw(peer->curr);
+       holdtime_ptr = (u_int16_t *)stream_pnt(peer->curr);
+       holdtime = stream_getw(peer->curr);
+       memcpy(notify_data_remote_id, stream_pnt(peer->curr), 4);
+       remote_id.s_addr = stream_get_ipv4(peer->curr);
 
        /* Receive OPEN message log  */
        if (bgp_debug_neighbor_events(peer))
@@ -714,11 +774,11 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
 
        /* BEGIN to read the capability here, but dont do it yet */
        mp_capability = 0;
-       optlen = stream_getc(peer->ibuf);
+       optlen = stream_getc(peer->curr);
 
        if (optlen != 0) {
                /* If not enough bytes, it is an error. */
-               if (STREAM_READABLE(peer->ibuf) < optlen) {
+               if (STREAM_READABLE(peer->curr) < optlen) {
                        bgp_notify_send(peer, BGP_NOTIFY_OPEN_ERR,
                                        BGP_NOTIFY_OPEN_MALFORMED_ATTR);
                        return -1;
@@ -990,10 +1050,6 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
                return (ret);
        }
 
-       peer->packet_size = 0;
-       if (peer->ibuf)
-               stream_reset(peer->ibuf);
-
        return 0;
 }
 
@@ -1177,7 +1233,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
        memset(peer->rcvd_attr_str, 0, BUFSIZ);
        peer->rcvd_attr_printed = 0;
 
-       s = peer->ibuf;
+       s = peer->curr;
        end = stream_pnt(s) + size;
 
        /* RFC1771 6.3 If the Unfeasible Routes Length or Total Attribute
@@ -1424,8 +1480,8 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
                peer->notify.length = 0;
        }
 
-       bgp_notify.code = stream_getc(peer->ibuf);
-       bgp_notify.subcode = stream_getc(peer->ibuf);
+       bgp_notify.code = stream_getc(peer->curr);
+       bgp_notify.subcode = stream_getc(peer->curr);
        bgp_notify.length = size - 2;
        bgp_notify.data = NULL;
 
@@ -1436,7 +1492,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
        if (bgp_notify.length) {
                peer->notify.length = size - 2;
                peer->notify.data = XMALLOC(MTYPE_TMP, size - 2);
-               memcpy(peer->notify.data, stream_pnt(peer->ibuf), size - 2);
+               memcpy(peer->notify.data, stream_pnt(peer->curr), size - 2);
        }
 
        /* For debug */
@@ -1451,12 +1507,12 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
                        for (i = 0; i < bgp_notify.length; i++)
                                if (first) {
                                        sprintf(c, " %02x",
-                                               stream_getc(peer->ibuf));
+                                               stream_getc(peer->curr));
                                        strcat(bgp_notify.data, c);
                                } else {
                                        first = 1;
                                        sprintf(c, "%02x",
-                                               stream_getc(peer->ibuf));
+                                               stream_getc(peer->curr));
                                        strcpy(bgp_notify.data, c);
                                }
                        bgp_notify.raw_data = (u_char *)peer->notify.data;
@@ -1526,7 +1582,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
                return;
        }
 
-       s = peer->ibuf;
+       s = peer->curr;
 
        /* Parse packet. */
        pkt_afi = stream_getw(s);
@@ -1874,7 +1930,7 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
        u_char *pnt;
 
        /* Fetch pointer. */
-       pnt = stream_pnt(peer->ibuf);
+       pnt = stream_pnt(peer->curr);
 
        if (bgp_debug_neighbor_events(peer))
                zlog_debug("%s rcv CAPABILITY", peer->host);
@@ -1902,188 +1958,50 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
        return bgp_capability_msg_parse(peer, pnt, size);
 }
 
-/* BGP read utility function. */
-static int bgp_read_packet(struct peer *peer)
+/* Starting point of packet process function. */
+int bgp_process_packet(struct thread *thread)
 {
-       int nbytes;
-       int readsize;
-
-       readsize = peer->packet_size - stream_get_endp(peer->ibuf);
+       /* Yes first of all get peer pointer. */
+       struct peer *peer;
+       peer = THREAD_ARG(thread);
 
-       /* If size is zero then return. */
-       if (!readsize)
+       /* Guard against scheduled events that occur after peer deletion. */
+       if (peer->status == Deleted)
                return 0;
 
-       /* Read packet from fd. */
-       nbytes = stream_read_try(peer->ibuf, peer->fd, readsize);
-
-       /* If read byte is smaller than zero then error occured. */
-       if (nbytes < 0) {
-               /* Transient error should retry */
-               if (nbytes == -2)
-                       return -1;
-
-               zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
-                        safe_strerror(errno));
-
-               if (peer->status == Established) {
-                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
-                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
-                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
-                       } else
-                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
-               }
-
-               BGP_EVENT_ADD(peer, TCP_fatal_error);
-               return -1;
-       }
-
-       /* When read byte is zero : clear bgp peer and return */
-       if (nbytes == 0) {
-               if (bgp_debug_neighbor_events(peer))
-                       zlog_debug("%s [Event] BGP connection closed fd %d",
-                                  peer->host, peer->fd);
-
-               if (peer->status == Established) {
-                       if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
-                               peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
-                               SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
-                       } else
-                               peer->last_reset = PEER_DOWN_CLOSE_SESSION;
-               }
-
-               BGP_EVENT_ADD(peer, TCP_connection_closed);
-               return -1;
-       }
-
-       /* We read partial packet. */
-       if (stream_get_endp(peer->ibuf) != peer->packet_size)
-               return -1;
-
-       return 0;
-}
-
-/* Marker check. */
-static int bgp_marker_all_one(struct stream *s, int length)
-{
-       int i;
-
-       for (i = 0; i < length; i++)
-               if (s->data[i] != 0xff)
-                       return 0;
-
-       return 1;
-}
-
-/* Starting point of packet process function. */
-int bgp_read(struct thread *thread)
-{
-       int ret;
        u_char type = 0;
-       struct peer *peer;
        bgp_size_t size;
        char notify_data_length[2];
        u_int32_t notify_out;
 
-       /* Yes first of all get peer pointer. */
-       peer = THREAD_ARG(thread);
-       peer->t_read = NULL;
-
        /* Note notify_out so we can check later to see if we sent another one
         */
        notify_out = peer->notify_out;
 
-       if (peer->fd < 0) {
-               zlog_err("bgp_read(): peer's fd is negative value %d",
-                        peer->fd);
-               return -1;
+       pthread_mutex_lock(&peer->io_mtx);
+       {
+               peer->curr = stream_fifo_pop(peer->ibuf);
        }
+       pthread_mutex_unlock(&peer->io_mtx);
 
-       BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-
-       /* Read packet header to determine type of the packet */
-       if (peer->packet_size == 0)
-               peer->packet_size = BGP_HEADER_SIZE;
-
-       if (stream_get_endp(peer->ibuf) < BGP_HEADER_SIZE) {
-               ret = bgp_read_packet(peer);
-
-               /* Header read error or partial read packet. */
-               if (ret < 0)
-                       goto done;
-
-               /* Get size and type. */
-               stream_forward_getp(peer->ibuf, BGP_MARKER_SIZE);
-               memcpy(notify_data_length, stream_pnt(peer->ibuf), 2);
-               size = stream_getw(peer->ibuf);
-               type = stream_getc(peer->ibuf);
-
-               /* Marker check */
-               if (((type == BGP_MSG_OPEN) || (type == BGP_MSG_KEEPALIVE))
-                   && !bgp_marker_all_one(peer->ibuf, BGP_MARKER_SIZE)) {
-                       bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
-                                       BGP_NOTIFY_HEADER_NOT_SYNC);
-                       goto done;
-               }
-
-               /* BGP type check. */
-               if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
-                   && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
-                   && type != BGP_MSG_ROUTE_REFRESH_NEW
-                   && type != BGP_MSG_ROUTE_REFRESH_OLD
-                   && type != BGP_MSG_CAPABILITY) {
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug("%s unknown message type 0x%02x",
-                                          peer->host, type);
-                       bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
-                                                 BGP_NOTIFY_HEADER_BAD_MESTYPE,
-                                                 &type, 1);
-                       goto done;
-               }
-               /* Mimimum packet length check. */
-               if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
-                   || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
-                   || (type == BGP_MSG_UPDATE
-                       && size < BGP_MSG_UPDATE_MIN_SIZE)
-                   || (type == BGP_MSG_NOTIFY
-                       && size < BGP_MSG_NOTIFY_MIN_SIZE)
-                   || (type == BGP_MSG_KEEPALIVE
-                       && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
-                   || (type == BGP_MSG_ROUTE_REFRESH_NEW
-                       && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
-                   || (type == BGP_MSG_ROUTE_REFRESH_OLD
-                       && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
-                   || (type == BGP_MSG_CAPABILITY
-                       && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
-                       if (bgp_debug_neighbor_events(peer))
-                               zlog_debug("%s bad message length - %d for %s",
-                                          peer->host, size,
-                                          type == 128
-                                                  ? "ROUTE-REFRESH"
-                                                  : bgp_type_str[(int)type]);
-                       bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
-                                                 BGP_NOTIFY_HEADER_BAD_MESLEN,
-                                                 (u_char *)notify_data_length,
-                                                 2);
-                       goto done;
-               }
+       if (peer->curr == NULL) // no packets to process, hmm...
+               return 0;
 
-               /* Adjust size to message length. */
-               peer->packet_size = size;
-       }
+       bgp_size_t actual_size = stream_get_endp(peer->curr);
 
-       ret = bgp_read_packet(peer);
-       if (ret < 0)
-               goto done;
+       /* skip the marker and copy the packet length */
+       stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
+       memcpy(notify_data_length, stream_pnt(peer->curr), 2);
 
-       /* Get size and type again. */
-       (void)stream_getw_from(peer->ibuf, BGP_MARKER_SIZE);
-       type = stream_getc_from(peer->ibuf, BGP_MARKER_SIZE + 2);
+       /* read in the packet length and type */
+       size = stream_getw(peer->curr);
+       type = stream_getc(peer->curr);
 
        /* BGP packet dump function. */
-       bgp_dump_packet(peer, type, peer->ibuf);
+       bgp_dump_packet(peer, type, peer->curr);
 
-       size = (peer->packet_size - BGP_HEADER_SIZE);
+       /* adjust size to exclude the marker + length + type */
+       size -= BGP_HEADER_SIZE;
 
        /* Read rest of the packet and call each sort of packet routine */
        switch (type) {
@@ -2118,26 +2036,14 @@ int bgp_read(struct thread *thread)
         * of the packet for troubleshooting purposes
         */
        if (notify_out < peer->notify_out) {
-               memcpy(peer->last_reset_cause, peer->ibuf->data,
-                      peer->packet_size);
-               peer->last_reset_cause_size = peer->packet_size;
-               notify_out = peer->notify_out;
+               memcpy(peer->last_reset_cause, peer->curr->data, actual_size);
+               peer->last_reset_cause_size = actual_size;
        }
 
-       /* Clear input buffer. */
-       peer->packet_size = 0;
-       if (peer->ibuf)
-               stream_reset(peer->ibuf);
-
-done:
-       /* If reading this packet caused us to send a NOTIFICATION then store a
-        * copy
-        * of the packet for troubleshooting purposes
-        */
-       if (notify_out < peer->notify_out) {
-               memcpy(peer->last_reset_cause, peer->ibuf->data,
-                      peer->packet_size);
-               peer->last_reset_cause_size = peer->packet_size;
+       /* Delete packet and carry on. */
+       if (peer->curr) {
+               stream_free(peer->curr);
+               peer->curr = NULL;
        }
 
        return 0;
index d7080d7fb63861d74450f40940b0f17918c63d66..502dbbdeedd3df6994d1dfb3f3afe65b03a374a3 100644 (file)
@@ -38,8 +38,6 @@
 #define ORF_COMMON_PART_DENY       0x20 
 
 /* Packet send and receive function prototypes. */
-extern int bgp_read(struct thread *);
-
 extern void bgp_keepalive_send(struct peer *);
 extern void bgp_open_send(struct peer *);
 extern void bgp_notify_send(struct peer *, u_int8_t, u_int8_t);
@@ -68,5 +66,6 @@ extern int bgp_packet_set_size(struct stream *s);
 extern bool bgp_packet_writes_thread_run;
 
 extern int bgp_generate_updgrp_packets(struct thread *);
+extern int bgp_process_packet(struct thread *);
 
 #endif /* _QUAGGA_BGP_PACKET_H */
index a50bc05fedb5fdb19554d88f07028682190738df..3e503a8be7404cf0345bcc41fdd3d68ce81e6067 100644 (file)
@@ -179,7 +179,7 @@ struct update_subgroup {
        struct stream *work;
 
        /* We use a separate stream to encode MP_REACH_NLRI for efficient
-        * NLRI packing. peer->work stores all the other attributes. The
+        * NLRI packing. peer->obuf_work stores all the other attributes. The
         * actual packet is then constructed by concatenating the two.
         */
        struct stream *scratch;
index af702ac853e79b69cd40ce9550e7ed7dae40625b..0953790606f193ff513ce415f3e2f4de3495e2e3 100644 (file)
@@ -7068,14 +7068,40 @@ static int bgp_show_summary(struct vty *vty, struct bgp *bgp, int afi, int safi,
 
                        vty_out(vty, "4 %10u %7u %7u %8" PRIu64 " %4d %4zd %8s",
                                peer->as,
-                               peer->open_in + peer->update_in
-                                       + peer->keepalive_in + peer->notify_in
-                                       + peer->refresh_in
-                                       + peer->dynamic_cap_in,
-                               peer->open_out + peer->update_out
-                                       + peer->keepalive_out + peer->notify_out
-                                       + peer->refresh_out
-                                       + peer->dynamic_cap_out,
+                               atomic_load_explicit(&peer->open_in,
+                                                    memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->update_in,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->keepalive_in,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->notify_in,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->refresh_in,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->dynamic_cap_in,
+                                                 memory_order_relaxed),
+                               atomic_load_explicit(&peer->open_out,
+                                                    memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->update_out,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->keepalive_out,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->notify_out,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->refresh_out,
+                                                 memory_order_relaxed)
+                                       + atomic_load_explicit(
+                                                 &peer->dynamic_cap_out,
+                                                 memory_order_relaxed),
                                peer->version[afi][safi], 0, peer->obuf->count,
                                peer_uptime(peer->uptime, timebuf,
                                            BGP_UPTIME_LEN, 0, NULL));
index cfe5d5c670b0f6d5eb7430098d05102c49954694..c034eda24503671dc6a176b367a65e67b2e41b9a 100644 (file)
@@ -992,10 +992,14 @@ static void peer_free(struct peer *peer)
         * but just to be sure..
         */
        bgp_timer_set(peer);
-       BGP_READ_OFF(peer->t_read);
-       peer_writes_off(peer);
+       bgp_reads_off(peer);
+       bgp_writes_off(peer);
+       assert(!peer->t_write);
+       assert(!peer->t_read);
        BGP_EVENT_FLUSH(peer);
 
+       pthread_mutex_destroy(&peer->io_mtx);
+
        /* Free connected nexthop, if present */
        if (CHECK_FLAG(peer->flags, PEER_FLAG_CONFIG_NODE)
            && !peer_dynamic_neighbor(peer))
@@ -1138,11 +1142,11 @@ struct peer *peer_new(struct bgp *bgp)
        SET_FLAG(peer->sflags, PEER_STATUS_CAPABILITY_OPEN);
 
        /* Create buffers.  */
-       peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);
+       peer->ibuf = stream_fifo_new();
        peer->obuf = stream_fifo_new();
-       pthread_mutex_init(&peer->obuf_mtx, NULL);
+       pthread_mutex_init(&peer->io_mtx, NULL);
 
-       /* We use a larger buffer for peer->work in the event that:
+       /* We use a larger buffer for peer->obuf_work in the event that:
         * - We RX a BGP_UPDATE where the attributes alone are just
         *   under BGP_MAX_PACKET_SIZE
         * - The user configures an outbound route-map that does many as-path
@@ -1156,8 +1160,9 @@ struct peer *peer_new(struct bgp *bgp)
         * bounds
         * checking for every single attribute as we construct an UPDATE.
         */
-       peer->work =
+       peer->obuf_work =
                stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
+       peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE);
        peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
 
 
@@ -2086,6 +2091,11 @@ int peer_delete(struct peer *peer)
        bgp = peer->bgp;
        accept_peer = CHECK_FLAG(peer->sflags, PEER_STATUS_ACCEPT_PEER);
 
+       bgp_reads_off(peer);
+       bgp_writes_off(peer);
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+       assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
+
        if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT))
                peer_nsf_stop(peer);
 
@@ -2147,7 +2157,7 @@ int peer_delete(struct peer *peer)
 
        /* Buffers.  */
        if (peer->ibuf) {
-               stream_free(peer->ibuf);
+               stream_fifo_free(peer->ibuf);
                peer->ibuf = NULL;
        }
 
@@ -2156,9 +2166,14 @@ int peer_delete(struct peer *peer)
                peer->obuf = NULL;
        }
 
-       if (peer->work) {
-               stream_free(peer->work);
-               peer->work = NULL;
+       if (peer->ibuf_work) {
+               stream_free(peer->ibuf_work);
+               peer->ibuf_work = NULL;
+       }
+
+       if (peer->obuf_work) {
+               stream_free(peer->obuf_work);
+               peer->obuf_work = NULL;
        }
 
        if (peer->scratch) {
@@ -7389,20 +7404,31 @@ void bgp_pthreads_init()
 {
        frr_pthread_init();
 
-       frr_pthread_new("BGP write thread", PTHREAD_WRITE, peer_writes_start,
-                       peer_writes_stop);
+       frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start,
+                       bgp_io_stop);
        frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES,
                        peer_keepalives_start, peer_keepalives_stop);
 
        /* pre-run initialization */
        peer_keepalives_init();
-       peer_writes_init();
+       bgp_io_init();
 }
 
 void bgp_pthreads_run()
 {
-       frr_pthread_run(PTHREAD_WRITE, NULL, NULL);
-       frr_pthread_run(PTHREAD_KEEPALIVES, NULL, NULL);
+       /* Launch the I/O and keepalive pthreads with a shared attribute
+        * object, then release that object once both have been started.
+        * Assumes frr_pthread_run() creates the pthread before returning,
+        * so the attributes are no longer needed afterwards -- TODO confirm.
+        * NOTE(review): per POSIX, SCHED_FIFO set here is only honoured when
+        * inheritsched is PTHREAD_EXPLICIT_SCHED; confirm what is intended.
+        */
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
+
+       frr_pthread_run(PTHREAD_IO, &attr, NULL);
+       frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL);
+
+       pthread_attr_destroy(&attr);
 }
 
 void bgp_pthreads_finish()
index 4fb784e2455b8cc139e7f519341769bb2627006b..208cd897d7731b5132f03a885d729aa56557c856 100644 (file)
@@ -101,7 +101,7 @@ struct bgp_master {
        struct thread_master *master;
 
 /* BGP pthreads. */
-#define PTHREAD_WRITE           (1 << 1)
+#define PTHREAD_IO              (1 << 1)
 #define PTHREAD_KEEPALIVES      (1 << 2)
 
        /* work queues */
@@ -589,13 +589,17 @@ struct peer {
        struct in_addr local_id;
 
        /* Packet receive and send buffer. */
-       struct stream *ibuf;
-       pthread_mutex_t obuf_mtx;
-       struct stream_fifo *obuf;
-       struct stream *work;
+       pthread_mutex_t io_mtx;   // guards ibuf, obuf
+       struct stream_fifo *ibuf; // packets waiting to be processed
+       struct stream_fifo *obuf; // packets waiting to be written
+
+       struct stream *ibuf_work; // WiP buffer used by bgp_read() only
+       struct stream *obuf_work; // WiP buffer used to construct packets
+
+       struct stream *curr; // the current packet being parsed
 
        /* We use a separate stream to encode MP_REACH_NLRI for efficient
-        * NLRI packing. peer->work stores all the other attributes. The
+        * NLRI packing. peer->obuf_work stores all the other attributes. The
         * actual packet is then constructed by concatenating the two.
         */
        struct stream *scratch;
@@ -799,7 +803,9 @@ struct peer {
 
        /* Threads. */
        struct thread *t_read;
+       struct thread *t_write;
        struct thread *t_start;
+       struct thread *t_connect_check;
        struct thread *t_connect;
        struct thread *t_holdtime;
        struct thread *t_routeadv;
@@ -807,11 +813,13 @@ struct peer {
        struct thread *t_gr_restart;
        struct thread *t_gr_stale;
        struct thread *t_generate_updgrp_packets;
+       struct thread *t_process_packet;
 
        /* Thread flags. */
        u_int16_t thread_flags;
-#define PEER_THREAD_WRITES_ON         (1 << 0)
-#define PEER_THREAD_KEEPALIVES_ON     (1 << 1)
+#define PEER_THREAD_WRITES_ON         (1 << 1)
+#define PEER_THREAD_READS_ON          (1 << 2)
+#define PEER_THREAD_KEEPALIVES_ON     (1 << 3)
        /* workqueues */
        struct work_queue *clear_node_queue;
 
@@ -853,9 +861,6 @@ struct peer {
        /* Notify data. */
        struct bgp_notify notify;
 
-       /* Whole packet size to be read. */
-       unsigned long packet_size;
-
        /* Filter structure. */
        struct bgp_filter filter[AFI_MAX][SAFI_MAX];
 
@@ -1149,7 +1154,7 @@ enum bgp_clear_type {
 };
 
 /* Macros. */
-#define BGP_INPUT(P)         ((P)->ibuf)
+#define BGP_INPUT(P)         ((P)->curr)
 #define BGP_INPUT_PNT(P)     (STREAM_PNT(BGP_INPUT(P)))
 #define BGP_IS_VALID_STATE_FOR_NOTIF(S)                                        \
        (((S) == OpenSent) || ((S) == OpenConfirm) || ((S) == Established))
index 15a29442f47079001448e04433848e745ac4432c..1c342a2ff82295016e39c369a29b547f6b452c0d 100644 (file)
@@ -1304,18 +1304,29 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
        rfd->peer = peer_new(bgp);
        rfd->peer->status = Established; /* keep bgp core happy */
        bgp_sync_delete(rfd->peer);      /* don't need these */
-       if (rfd->peer->ibuf) {
-               stream_free(rfd->peer->ibuf); /* don't need it */
+
+       // since this peer is not on the I/O thread, this lock is not strictly
+       // necessary, but serves as a reminder to those who may meddle...
+       pthread_mutex_lock(&rfd->peer->io_mtx);
+       {
+               // we don't need any I/O related facilities
+               if (rfd->peer->ibuf)
+                       stream_fifo_free(rfd->peer->ibuf);
+               if (rfd->peer->obuf)
+                       stream_fifo_free(rfd->peer->obuf);
+
+               if (rfd->peer->ibuf_work)
+                       stream_free(rfd->peer->ibuf_work);
+               if (rfd->peer->obuf_work)
+                       stream_free(rfd->peer->obuf_work);
+
                rfd->peer->ibuf = NULL;
-       }
-       if (rfd->peer->obuf) {
-               stream_fifo_free(rfd->peer->obuf); /* don't need it */
                rfd->peer->obuf = NULL;
+               rfd->peer->obuf_work = NULL;
+               rfd->peer->ibuf_work = NULL;
        }
-       if (rfd->peer->work) {
-               stream_free(rfd->peer->work); /* don't need it */
-               rfd->peer->work = NULL;
-       }
+       pthread_mutex_unlock(&rfd->peer->io_mtx);
+
        { /* base code assumes have valid host pointer */
                char buf[BUFSIZ];
                buf[0] = 0;
index 5c71df238fe5951dc862e40326f302c3d2f11a91..5f798923811bcc9ae86c1f6116f3a49bd0ef0a5e 100644 (file)
@@ -183,22 +183,31 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
                        vncHD1VR.peer->status =
                                Established; /* keep bgp core happy */
                        bgp_sync_delete(vncHD1VR.peer); /* don't need these */
-                       if (vncHD1VR.peer->ibuf) {
-                               stream_free(vncHD1VR.peer
-                                                   ->ibuf); /* don't need it */
+
+                       // since this peer is not on the I/O thread, this lock
+                       // is not strictly
+                       // necessary, but serves as a reminder to those who may
+                       // meddle...
+                       pthread_mutex_lock(&vncHD1VR.peer->io_mtx);
+                       {
+                               // we don't need any I/O related facilities
+                               if (vncHD1VR.peer->ibuf)
+                                       stream_fifo_free(vncHD1VR.peer->ibuf);
+                               if (vncHD1VR.peer->obuf)
+                                       stream_fifo_free(vncHD1VR.peer->obuf);
+
+                               if (vncHD1VR.peer->ibuf_work)
+                                       stream_free(vncHD1VR.peer->ibuf_work);
+                               if (vncHD1VR.peer->obuf_work)
+                                       stream_free(vncHD1VR.peer->obuf_work);
+
                                vncHD1VR.peer->ibuf = NULL;
-                       }
-                       if (vncHD1VR.peer->obuf) {
-                               stream_fifo_free(
-                                       vncHD1VR.peer
-                                               ->obuf); /* don't need it */
                                vncHD1VR.peer->obuf = NULL;
+                               vncHD1VR.peer->obuf_work = NULL;
+                               vncHD1VR.peer->ibuf_work = NULL;
                        }
-                       if (vncHD1VR.peer->work) {
-                               stream_free(vncHD1VR.peer
-                                                   ->work); /* don't need it */
-                               vncHD1VR.peer->work = NULL;
-                       }
+                       pthread_mutex_unlock(&vncHD1VR.peer->io_mtx);
+
                        /* base code assumes have valid host pointer */
                        vncHD1VR.peer->host =
                                XSTRDUP(MTYPE_BGP_PEER_HOST, ".zebra.");
index a805ae34540fd99eb280e5d10f5499e90acaf2c5..cb5d1d47ae23b48c66c1b5762be80d8aa4bb551c 100644 (file)
@@ -36,7 +36,6 @@
 DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
 DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
 DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
-DEFINE_MTYPE(LIB, PTHREAD, "POSIX Thread")
 
 #if defined(__APPLE__)
 #include <mach/mach.h>
index 1cf19d987fc047462d73d0bb7399023406943abb..c830446e10209f4ead167c06729fae3d757eef41 100644 (file)
@@ -26,9 +26,6 @@
 #include <poll.h>
 #include "monotime.h"
 
-#include "memory.h"
-DECLARE_MTYPE(PTHREAD)
-
 struct rusage_t {
        struct rusage cpu;
        struct timeval real;