summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin Young <qlyoung@cumulusnetworks.com>2017-02-06 23:39:06 +0000
committerQuentin Young <qlyoung@cumulusnetworks.com>2017-11-30 16:17:57 -0500
commitd3ecc69e5fba1873872a1f4dc359ff1934f81848 (patch)
treebcdf393161c7f7ca5e8fffbb1208362904bb80a6
parent69df82f3b5d3aa343c769f280fb811ce019601d4 (diff)
bgpd: move packet writes into dedicated pthread
* BGP_WRITE_ON() removed * BGP_WRITE_OFF() removed * peer_writes_on() added * peer_writes_off() added * bgp_write_proceed_actions() removed Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
-rw-r--r--bgpd/bgp_fsm.c41
-rw-r--r--bgpd/bgp_fsm.h12
-rw-r--r--bgpd/bgp_main.c4
-rw-r--r--bgpd/bgp_packet.c581
-rw-r--r--bgpd/bgp_packet.h11
-rw-r--r--bgpd/bgp_updgrp.c17
-rw-r--r--bgpd/bgp_updgrp.h1
-rw-r--r--bgpd/bgp_updgrp_adv.c12
-rw-r--r--bgpd/bgp_updgrp_packet.c3
-rw-r--r--bgpd/bgpd.c3
-rw-r--r--bgpd/bgpd.h3
11 files changed, 365 insertions, 323 deletions
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c
index 8de7e970de..bc4f8272f3 100644
--- a/bgpd/bgp_fsm.c
+++ b/bgpd/bgp_fsm.c
@@ -125,9 +125,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
from_peer->host, from_peer, from_peer->fd, peer,
peer->fd);
- BGP_WRITE_OFF(peer->t_write);
+ peer_writes_off(peer);
BGP_READ_OFF(peer->t_read);
- BGP_WRITE_OFF(from_peer->t_write);
+ peer_writes_off(from_peer);
BGP_READ_OFF(from_peer->t_read);
BGP_TIMER_OFF(peer->t_routeadv);
@@ -137,8 +137,18 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
peer->fd = from_peer->fd;
from_peer->fd = fd;
stream_reset(peer->ibuf);
- stream_fifo_clean(peer->obuf);
- stream_fifo_clean(from_peer->obuf);
+
+ pthread_mutex_lock(&peer->obuf_mtx);
+ {
+ stream_fifo_clean(peer->obuf);
+ }
+ pthread_mutex_unlock(&peer->obuf_mtx);
+
+ pthread_mutex_lock(&from_peer->obuf_mtx);
+ {
+ stream_fifo_clean(from_peer->obuf);
+ }
+ pthread_mutex_unlock(&from_peer->obuf_mtx);
peer->as = from_peer->as;
peer->v_holdtime = from_peer->v_holdtime;
@@ -217,7 +227,7 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
}
BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
+ peer_writes_on(peer);
if (from_peer)
peer_xfer_stats(peer, from_peer);
@@ -433,8 +443,6 @@ int bgp_routeadv_timer(struct thread *thread)
peer->synctime = bgp_clock();
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
-
/* MRAI timer will be started again when FIFO is built, no need to
* do it here.
*/
@@ -640,7 +648,6 @@ void bgp_adjust_routeadv(struct peer *peer)
BGP_TIMER_OFF(peer->t_routeadv);
peer->synctime = bgp_clock();
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
return;
}
@@ -956,6 +963,9 @@ int bgp_stop(struct peer *peer)
char orf_name[BUFSIZ];
int ret = 0;
+ // immediately remove from threads
+ peer_writes_off(peer);
+
if (peer_dynamic_neighbor(peer)
&& !(CHECK_FLAG(peer->flags, PEER_FLAG_DELETE))) {
if (bgp_debug_neighbor_events(peer))
@@ -1037,7 +1047,7 @@ int bgp_stop(struct peer *peer)
/* Stop read and write threads when exists. */
BGP_READ_OFF(peer->t_read);
- BGP_WRITE_OFF(peer->t_write);
+ peer_writes_off(peer);
/* Stop all timers. */
BGP_TIMER_OFF(peer->t_start);
@@ -1054,8 +1064,13 @@ int bgp_stop(struct peer *peer)
stream_reset(peer->ibuf);
if (peer->work)
stream_reset(peer->work);
- if (peer->obuf)
- stream_fifo_clean(peer->obuf);
+
+ pthread_mutex_lock(&peer->obuf_mtx);
+ {
+ if (peer->obuf)
+ stream_fifo_clean(peer->obuf);
+ }
+ pthread_mutex_unlock(&peer->obuf_mtx);
/* Close of file descriptor. */
if (peer->fd >= 0) {
@@ -1173,6 +1188,8 @@ static int bgp_connect_success(struct peer *peer)
return -1;
}
+ peer_writes_on(peer);
+
if (bgp_getsockname(peer) < 0) {
zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",
__FUNCTION__, peer->host, peer->fd);
@@ -1313,7 +1330,7 @@ int bgp_start(struct peer *peer)
return -1;
}
BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
+ peer_writes_on(peer);
break;
}
return 0;
diff --git a/bgpd/bgp_fsm.h b/bgpd/bgp_fsm.h
index 51d5d7aaa8..a7abfdb2f4 100644
--- a/bgpd/bgp_fsm.h
+++ b/bgpd/bgp_fsm.h
@@ -35,18 +35,6 @@
THREAD_READ_OFF(T); \
} while (0)
-#define BGP_WRITE_ON(T, F, V) \
- do { \
- if ((peer)->status != Deleted) \
- thread_add_write(bm->master, (F), (peer), (V), &(T)); \
- } while (0)
-
-#define BGP_PEER_WRITE_ON(T, F, V, peer) \
- do { \
- if ((peer)->status != Deleted) \
- thread_add_write(bm->master, (F), (peer), (V), &(T)); \
- } while (0)
-
#define BGP_WRITE_OFF(T) \
do { \
if (T) \
diff --git a/bgpd/bgp_main.c b/bgpd/bgp_main.c
index 1fac2936eb..b4ea45b3e0 100644
--- a/bgpd/bgp_main.c
+++ b/bgpd/bgp_main.c
@@ -54,6 +54,7 @@
#include "bgpd/bgp_debug.h"
#include "bgpd/bgp_filter.h"
#include "bgpd/bgp_zebra.h"
+#include "bgpd/bgp_packet.h"
#ifdef ENABLE_BGP_VNC
#include "bgpd/rfapi/rfapi_backend.h"
@@ -392,6 +393,9 @@ int main(int argc, char **argv)
snprintf(bgpd_di.startinfo, sizeof(bgpd_di.startinfo), ", bgp@%s:%d",
(bm->address ? bm->address : "<all>"), bm->port);
+ pthread_t packet_writes;
+ pthread_create(&packet_writes, NULL, &peer_writes_start, NULL);
+
frr_config_fork();
frr_run(bm->master);
diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c
index a955b3512c..e27b416d07 100644
--- a/bgpd/bgp_packet.c
+++ b/bgpd/bgp_packet.c
@@ -19,6 +19,7 @@
*/
#include <zebra.h>
+#include <sys/time.h>
#include "thread.h"
#include "stream.h"
@@ -55,6 +56,13 @@
#include "bgpd/bgp_updgrp.h"
#include "bgpd/bgp_label.h"
+/* Linked list of active peers */
+static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static struct list *plist;
+
+bool bgp_packet_writes_thread_run;
+
/* Set up BGP packet marker and packet type. */
int bgp_packet_set_marker(struct stream *s, u_char type)
{
@@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s)
return cp;
}
-/* Add new packet to the peer. */
-void bgp_packet_add(struct peer *peer, struct stream *s)
+/**
+ * Push a packet onto the beginning of the peer's output queue.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s)
{
/* Add packet to the end of list. */
stream_fifo_push(peer->obuf, s);
+ peer_writes_wake();
+}
+
+/*
+ * Push a packet onto the beginning of the peer's output queue.
+ * This function acquires the peer's write mutex before proceeding.
+ */
+static void bgp_packet_add(struct peer *peer, struct stream *s)
+{
+ pthread_mutex_lock(&peer->obuf_mtx);
+ bgp_packet_add_unsafe(peer, s);
+ pthread_mutex_unlock(&peer->obuf_mtx);
}
-/* Free first packet. */
-static void bgp_packet_delete(struct peer *peer)
+/**
+ * Pop a packet off the end of the peer's output queue.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_delete_unsafe(struct peer *peer)
{
stream_free(stream_fifo_pop(peer->obuf));
}
+
/* Check file descriptor whether connect is established. */
-int bgp_connect_check(struct peer *peer, int change_state)
+static int bgp_connect_check(struct peer *peer, int change_state)
{
int status;
socklen_t slen;
@@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state)
/* Anyway I have to reset read and write thread. */
BGP_READ_OFF(peer->t_read);
- BGP_WRITE_OFF(peer->t_write);
/* Check file descriptor. */
slen = sizeof(status);
@@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
}
bgp_packet_set_size(s);
- bgp_packet_add(peer, s);
+ bgp_packet_add_unsafe(peer, s);
return s;
}
@@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer)
}
- /*
- * Found a packet template to send, overwrite packet
- * with appropriate
- * attributes from peer and advance peer
- */
- s = bpacket_reformat_for_peer(next_pkt, paf);
- bpacket_queue_advance_peer(paf);
- return s;
- }
-
- return NULL;
-}
-
-/* The next action for the peer from a write perspective */
-static void bgp_write_proceed_actions(struct peer *peer)
-{
- afi_t afi;
- safi_t safi;
- struct peer_af *paf;
- struct bpacket *next_pkt;
- int fullq_found = 0;
- struct update_subgroup *subgrp;
-
- if (stream_fifo_head(peer->obuf)) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- FOREACH_AFI_SAFI (afi, safi) {
- paf = peer_af_find(peer, afi, safi);
- if (!paf)
- continue;
- subgrp = paf->subgroup;
- if (!subgrp)
- continue;
-
- next_pkt = paf->next_pkt_to_send;
- if (next_pkt && next_pkt->buffer) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- /* No packets readily available for AFI/SAFI, are there
- * subgroup packets
- * that need to be generated? */
- if (bpacket_queue_is_full(SUBGRP_INST(subgrp),
- SUBGRP_PKTQ(subgrp)))
- fullq_found = 1;
- else if (subgroup_packets_to_build(subgrp)) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- /* No packets to send, see if EOR is pending */
- if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) {
- if (!subgrp->t_coalesce && peer->afc_nego[afi][safi]
- && peer->synctime
- && !CHECK_FLAG(peer->af_sflags[afi][safi],
- PEER_STATUS_EOR_SEND)
- && safi != SAFI_MPLS_VPN) {
- BGP_WRITE_ON(peer->t_write, bgp_write,
- peer->fd);
- return;
- }
- }
- }
- if (fullq_found) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-}
-
-/* Write packet to the peer. */
-int bgp_write(struct thread *thread)
-{
- struct peer *peer;
- u_char type;
- struct stream *s;
- int num;
- int update_last_write = 0;
- unsigned int count = 0;
- unsigned int oc = 0;
-
- /* Yes first of all get peer pointer. */
- peer = THREAD_ARG(thread);
- peer->t_write = NULL;
-
- /* For non-blocking IO check. */
- if (peer->status == Connect) {
- bgp_connect_check(peer, 1);
- return 0;
- }
-
- s = bgp_write_packet(peer);
- if (!s) {
- bgp_write_proceed_actions(peer);
- return 0;
- }
-
- sockopt_cork(peer->fd, 1);
-
- oc = peer->update_out;
-
- /* Nonblocking write until TCP output buffer is full. */
- do {
- int writenum;
-
- /* Number of bytes to be sent. */
- writenum = stream_get_endp(s) - stream_get_getp(s);
-
- /* Call write() system call. */
- num = write(peer->fd, STREAM_PNT(s), writenum);
- if (num < 0) {
- /* write failed either retry needed or error */
- if (ERRNO_IO_RETRY(errno))
- break;
-
- BGP_EVENT_ADD(peer, TCP_fatal_error);
- return 0;
- }
-
- if (num != writenum) {
- /* Partial write */
- stream_forward_getp(s, num);
- break;
- }
-
- /* Retrieve BGP packet type. */
- stream_set_getp(s, BGP_MARKER_SIZE + 2);
- type = stream_getc(s);
-
- switch (type) {
- case BGP_MSG_OPEN:
- peer->open_out++;
- break;
- case BGP_MSG_UPDATE:
- peer->update_out++;
- break;
- case BGP_MSG_NOTIFY:
- peer->notify_out++;
- /* Double start timer. */
- peer->v_start *= 2;
-
- /* Overflow check. */
- if (peer->v_start >= (60 * 2))
- peer->v_start = (60 * 2);
-
- /* Flush any existing events */
- BGP_EVENT_ADD(peer, BGP_Stop);
- goto done;
-
- case BGP_MSG_KEEPALIVE:
- peer->keepalive_out++;
- break;
- case BGP_MSG_ROUTE_REFRESH_NEW:
- case BGP_MSG_ROUTE_REFRESH_OLD:
- peer->refresh_out++;
- break;
- case BGP_MSG_CAPABILITY:
- peer->dynamic_cap_out++;
- break;
+ /* Found a packet template to send, overwrite packet
+ * with appropriate
+ * attributes from peer and advance peer */
+ s = bpacket_reformat_for_peer(next_pkt, paf);
+ bgp_packet_add_unsafe(peer, s);
+ bpacket_queue_advance_peer(paf);
+ return s;
}
- /* OK we send packet so delete it. */
- bgp_packet_delete(peer);
- update_last_write = 1;
- } while (++count < peer->bgp->wpkt_quanta
- && (s = bgp_write_packet(peer)) != NULL);
-
- bgp_write_proceed_actions(peer);
-
-done:
- /* Update last_update if UPDATEs were written. */
- if (peer->update_out > oc)
- peer->last_update = bgp_clock();
-
- /* If we TXed any flavor of packet update last_write */
- if (update_last_write)
- peer->last_write = bgp_clock();
-
- sockopt_cork(peer->fd, 0);
- return 0;
-}
-
-/* This is only for sending NOTIFICATION message to neighbor. */
-static int bgp_write_notify(struct peer *peer)
-{
- int ret, val;
- u_char type;
- struct stream *s;
-
- /* There should be at least one packet. */
- s = stream_fifo_head(peer->obuf);
- if (!s)
- return 0;
- assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
-
- /* Stop collecting data within the socket */
- sockopt_cork(peer->fd, 0);
-
- /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
- * we only care about getting a clean shutdown at this point. */
- ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
-
- /* only connection reset/close gets counted as TCP_fatal_error, failure
- * to write the entire NOTIFY doesn't get different FSM treatment */
- if (ret <= 0) {
- BGP_EVENT_ADD(peer, TCP_fatal_error);
- return 0;
- }
-
- /* Disable Nagle, make NOTIFY packet go out right away */
- val = 1;
- (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
- sizeof(val));
-
- /* Retrieve BGP packet type. */
- stream_set_getp(s, BGP_MARKER_SIZE + 2);
- type = stream_getc(s);
-
- assert(type == BGP_MSG_NOTIFY);
-
- /* Type should be notify. */
- peer->notify_out++;
-
- /* Double start timer. */
- peer->v_start *= 2;
-
- /* Overflow check. */
- if (peer->v_start >= (60 * 2))
- peer->v_start = (60 * 2);
-
- /* Handle Graceful Restart case where the state changes to
- Connect instead of Idle */
- BGP_EVENT_ADD(peer, BGP_Stop);
-
- return 0;
+ return NULL;
}
-/* Make keepalive packet and send it to the peer. */
+/*
+ * Creates a BGP Keepalive packet and appends it to the peer's output queue.
+ */
void bgp_keepalive_send(struct peer *peer)
{
struct stream *s;
@@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Make open packet and send it to the peer. */
+/*
+ * Creates a BGP Open packet and appends it to the peer's output queue.
+ * Sets capabilities as necessary.
+ */
void bgp_open_send(struct peer *peer)
{
struct stream *s;
@@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Send BGP notify packet with data potion. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code BGP error code
+ * @param sub_code BGP error subcode
+ * @param data Data portion
+ * @param datalen length of data portion
+ */
void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
u_char *data, size_t datalen)
{
@@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Allocate new stream. */
s = stream_new(BGP_MAX_PACKET_SIZE);
- /* Make nitify packet. */
+ /* Make notify packet. */
bgp_packet_set_marker(s, BGP_MSG_NOTIFY);
/* Set notify packet values. */
@@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
length = bgp_packet_set_size(s);
/* Add packet to the peer. */
+ pthread_mutex_lock(&peer->obuf_mtx);
stream_fifo_clean(peer->obuf);
- bgp_packet_add(peer, s);
+ pthread_mutex_unlock(&peer->obuf_mtx);
/* For debug */
{
@@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
} else
peer->last_reset = PEER_DOWN_NOTIFY_SEND;
- /* Call immediately. */
- BGP_WRITE_OFF(peer->t_write);
-
- bgp_write_notify(peer);
+ /* Add packet to peer's output queue */
+ bgp_packet_add(peer, s);
+ /* Wake up the write thread to get the notify out ASAP */
+ peer_writes_wake();
}
-/* Send BGP notify packet. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code BGP error code
+ * @param sub_code BGP error subcode
+ */
void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code)
{
bgp_notify_send_with_data(peer, code, sub_code, NULL, 0);
}
-/* Send route refresh message to the peer. */
+/*
+ * Creates BGP Route Refresh packet and appends it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi Address Family Identifier
+ * @param safi Subsequent Address Family Identifier
+ * @param orf_type Outbound Route Filtering type
+ * @param when_to_refresh Whether to refresh immediately or defer
+ * @param remove Whether to remove ORF for specified AFI/SAFI
+ */
void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
u_char orf_type, u_char when_to_refresh, int remove)
{
@@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Send capability message to the peer. */
+/*
+ * Create a BGP Capability packet and append it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi Address Family Identifier
+ * @param safi Subsequent Address Family Identifier
+ * @param capability_code BGP Capability Code
+ * @param action Set or Remove capability
+ */
void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
int capability_code, int action)
{
@@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
/* RFC1771 6.8 Connection collision detection. */
@@ -2340,3 +2174,226 @@ done:
return 0;
}
+
+/* ------------- write thread ------------------ */
+
+/**
+ * Flush peer output buffer.
+ *
+ * This function pops packets off of peer->obuf and writes them to peer->fd.
+ * The amount of packets written is equal to the minimum of peer->wpkt_quanta
+ * and the number of packets on the output buffer.
+ *
+ * If write() returns an error, the appropriate FSM event is generated.
+ *
+ * The return value is equal to the number of packets written
+ * (which may be zero).
+ */
+static int bgp_write(struct peer *peer)
+{
+ u_char type;
+ struct stream *s;
+ int num;
+ int update_last_write = 0;
+ unsigned int count = 0;
+ unsigned int oc = 0;
+
+ /* For non-blocking IO check. */
+ if (peer->status == Connect) {
+ bgp_connect_check(peer, 1);
+ return 0;
+ }
+
+ /* Write packets. The number of packets written is the value of
+ * bgp->wpkt_quanta or the size of the output buffer, whichever is
+ * smaller.*/
+ while (count < peer->bgp->wpkt_quanta
+ && (s = bgp_write_packet(peer)) != NULL) {
+ int writenum;
+ do { // write a full packet, or return on error
+ writenum = stream_get_endp(s) - stream_get_getp(s);
+ num = write(peer->fd, STREAM_PNT(s), writenum);
+
+ if (num < 0) {
+ if (ERRNO_IO_RETRY(errno))
+ continue;
+
+ BGP_EVENT_ADD(peer, TCP_fatal_error);
+ goto done;
+ } else if (num != writenum) // incomplete write
+ stream_forward_getp(s, num);
+
+ } while (num != writenum);
+
+ /* Retrieve BGP packet type. */
+ stream_set_getp(s, BGP_MARKER_SIZE + 2);
+ type = stream_getc(s);
+
+ switch (type) {
+ case BGP_MSG_OPEN:
+ peer->open_out++;
+ break;
+ case BGP_MSG_UPDATE:
+ peer->update_out++;
+ break;
+ case BGP_MSG_NOTIFY:
+ peer->notify_out++;
+ /* Double start timer. */
+ peer->v_start *= 2;
+
+ /* Overflow check. */
+ if (peer->v_start >= (60 * 2))
+ peer->v_start = (60 * 2);
+
+ /* Handle Graceful Restart case where the state changes
+ to
+ Connect instead of Idle */
+ /* Flush any existing events */
+ BGP_EVENT_ADD(peer, BGP_Stop);
+ goto done;
+
+ case BGP_MSG_KEEPALIVE:
+ peer->keepalive_out++;
+ break;
+ case BGP_MSG_ROUTE_REFRESH_NEW:
+ case BGP_MSG_ROUTE_REFRESH_OLD:
+ peer->refresh_out++;
+ break;
+ case BGP_MSG_CAPABILITY:
+ peer->dynamic_cap_out++;
+ break;
+ }
+
+ count++;
+ /* OK we send packet so delete it. */
+ bgp_packet_delete_unsafe(peer);
+ update_last_write = 1;
+ }
+
+done : {
+ /* Update last_update if UPDATEs were written. */
+ if (peer->update_out > oc)
+ peer->last_update = bgp_clock();
+
+ /* If we TXed any flavor of packet update last_write */
+ if (update_last_write)
+ peer->last_write = bgp_clock();
+}
+
+ return count;
+}
+
+static void cleanup_handler(void *arg)
+{
+ if (plist)
+ list_delete(plist);
+
+ plist = NULL;
+
+ pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Entry function for peer packet flushing pthread.
+ *
+ * The plist must be initialized before calling this.
+ */
+void *peer_writes_start(void *arg)
+{
+ struct timeval currtime = {0, 0};
+ struct timeval sleeptime = {0, 500};
+ struct timespec next_update = {0, 0};
+
+ // initialize
+ pthread_mutex_lock(&plist_mtx);
+ plist = list_new();
+
+ struct listnode *ln;
+ struct peer *peer;
+
+ pthread_cleanup_push(&cleanup_handler, NULL);
+
+ bgp_packet_writes_thread_run = true;
+
+ while (bgp_packet_writes_thread_run) { // wait around until next update
+ // time
+ if (plist->count > 0)
+ pthread_cond_timedwait(&write_cond, &plist_mtx,
+ &next_update);
+ else // wait around until we have some peers
+ while (plist->count == 0
+ && bgp_packet_writes_thread_run)
+ pthread_cond_wait(&write_cond, &plist_mtx);
+
+ for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) {
+ pthread_mutex_lock(&peer->obuf_mtx);
+ {
+ bgp_write(peer);
+ }
+ pthread_mutex_unlock(&peer->obuf_mtx);
+ }
+
+ gettimeofday(&currtime, NULL);
+ timeradd(&currtime, &sleeptime, &currtime);
+ TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
+ }
+
+ // clean up
+ pthread_cleanup_pop(1);
+
+ return NULL;
+}
+
+/**
+ * Turns on packet writing for a peer.
+ */
+void peer_writes_on(struct peer *peer)
+{
+ if (peer->status == Deleted)
+ return;
+
+ pthread_mutex_lock(&plist_mtx);
+ {
+ struct listnode *ln, *nn;
+ struct peer *p;
+
+ // make sure this peer isn't already in the list
+ for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+ if (p == peer) {
+ pthread_mutex_unlock(&plist_mtx);
+ return;
+ }
+
+ peer_lock(peer);
+ listnode_add(plist, peer);
+ }
+ pthread_mutex_unlock(&plist_mtx);
+ peer_writes_wake();
+}
+
+/**
+ * Turns off packet writing for a peer.
+ */
+void peer_writes_off(struct peer *peer)
+{
+ struct listnode *ln, *nn;
+ struct peer *p;
+ pthread_mutex_lock(&plist_mtx);
+ {
+ for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+ if (p == peer) {
+ list_delete_node(plist, ln);
+ peer_unlock(peer);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Wakes up the write thread to do work.
+ */
+void peer_writes_wake()
+{
+ pthread_cond_signal(&write_cond);
+}
diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h
index 7bf498c37c..c18e4a2ebd 100644
--- a/bgpd/bgp_packet.h
+++ b/bgpd/bgp_packet.h
@@ -39,8 +39,6 @@
/* Packet send and receive function prototypes. */
extern int bgp_read(struct thread *);
-extern int bgp_write(struct thread *);
-extern int bgp_connect_check(struct peer *, int change_state);
extern void bgp_keepalive_send(struct peer *);
extern void bgp_open_send(struct peer *);
@@ -65,6 +63,13 @@ extern void bgp_check_update_delay(struct bgp *);
extern int bgp_packet_set_marker(struct stream *s, u_char type);
extern int bgp_packet_set_size(struct stream *s);
-extern void bgp_packet_add(struct peer *peer, struct stream *s);
+
+/* Control variable for write thread. */
+extern bool bgp_packet_writes_thread_run;
+
+extern void *peer_writes_start(void *arg);
+extern void peer_writes_on(struct peer *peer);
+extern void peer_writes_off(struct peer *peer);
+extern void peer_writes_wake(void);
#endif /* _QUAGGA_BGP_PACKET_H */
diff --git a/bgpd/bgp_updgrp.c b/bgpd/bgp_updgrp.c
index 8f67290600..585c6cebbc 100644
--- a/bgpd/bgp_updgrp.c
+++ b/bgpd/bgp_updgrp.c
@@ -1867,23 +1867,6 @@ void peer_af_announce_route(struct peer_af *paf, int combine)
subgrp->peer_count - 1);
}
-void subgroup_trigger_write(struct update_subgroup *subgrp)
-{
- struct peer_af *paf;
-
-#if 0
- if (bgp_debug_update(NULL, NULL, subgrp->update_group, 0))
- zlog_debug("u%llu:s%llu scheduling write thread for peers",
- subgrp->update_group->id, subgrp->id);
-#endif
- SUBGRP_FOREACH_PEER (subgrp, paf) {
- if (paf->peer->status == Established) {
- BGP_PEER_WRITE_ON(paf->peer->t_write, bgp_write,
- paf->peer->fd, paf->peer);
- }
- }
-}
-
int update_group_clear_update_dbg(struct update_group *updgrp, void *arg)
{
UPDGRP_PEER_DBG_OFF(updgrp);
diff --git a/bgpd/bgp_updgrp.h b/bgpd/bgp_updgrp.h
index 52a21679b8..a50bc05fed 100644
--- a/bgpd/bgp_updgrp.h
+++ b/bgpd/bgp_updgrp.h
@@ -442,7 +442,6 @@ extern void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
char withdraw, u_int32_t addpath_tx_id);
void subgroup_announce_table(struct update_subgroup *subgrp,
struct bgp_table *table);
-extern void subgroup_trigger_write(struct update_subgroup *subgrp);
extern int update_group_clear_update_dbg(struct update_group *updgrp,
void *arg);
diff --git a/bgpd/bgp_updgrp_adv.c b/bgpd/bgp_updgrp_adv.c
index b4f18c9f5e..1ec9915ee5 100644
--- a/bgpd/bgp_updgrp_adv.c
+++ b/bgpd/bgp_updgrp_adv.c
@@ -483,7 +483,6 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
{
struct bgp_adj_out *adj;
struct bgp_advertise *adv;
- char trigger_write;
if (DISABLE_BGP_ANNOUNCE)
return;
@@ -501,20 +500,9 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,
adv->rn = rn;
adv->adj = adj;
- /* Note if we need to trigger a packet write */
- if (BGP_ADV_FIFO_EMPTY(&subgrp->sync->withdraw))
- trigger_write = 1;
- else
- trigger_write = 0;
-
/* Add to synchronization entry for withdraw
* announcement. */
BGP_ADV_FIFO_ADD(&subgrp->sync->withdraw, &adv->fifo);
-
- /* Schedule packet write, if FIFO is getting its first
- * entry. */
- if (trigger_write)
- subgroup_trigger_write(subgrp);
} else {
/* Remove myself from adjacency. */
BGP_ADJ_OUT_DEL(rn, adj);
diff --git a/bgpd/bgp_updgrp_packet.c b/bgpd/bgp_updgrp_packet.c
index a35d814e47..77b3ce1937 100644
--- a/bgpd/bgp_updgrp_packet.c
+++ b/bgpd/bgp_updgrp_packet.c
@@ -633,7 +633,6 @@ struct stream *bpacket_reformat_for_peer(struct bpacket *pkt,
}
}
- bgp_packet_add(peer, s);
return s;
}
@@ -1149,7 +1148,6 @@ void subgroup_default_update_packet(struct update_subgroup *subgrp,
bgp_packet_set_size(s);
(void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, &vecarr);
- subgroup_trigger_write(subgrp);
}
void subgroup_default_withdraw_packet(struct update_subgroup *subgrp)
@@ -1242,7 +1240,6 @@ void subgroup_default_withdraw_packet(struct update_subgroup *subgrp)
bgp_packet_set_size(s);
(void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, NULL);
- subgroup_trigger_write(subgrp);
}
static void
diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c
index a4952be8a6..cee73e2c43 100644
--- a/bgpd/bgpd.c
+++ b/bgpd/bgpd.c
@@ -990,7 +990,7 @@ static void peer_free(struct peer *peer)
*/
bgp_timer_set(peer);
BGP_READ_OFF(peer->t_read);
- BGP_WRITE_OFF(peer->t_write);
+ peer_writes_off(peer);
BGP_EVENT_FLUSH(peer);
/* Free connected nexthop, if present */
@@ -1137,6 +1137,7 @@ struct peer *peer_new(struct bgp *bgp)
/* Create buffers. */
peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);
peer->obuf = stream_fifo_new();
+ pthread_mutex_init(&peer->obuf_mtx, NULL);
/* We use a larger buffer for peer->work in the event that:
* - We RX a BGP_UPDATE where the attributes alone are just
diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h
index 36bdaf0125..f427c5d9f5 100644
--- a/bgpd/bgpd.h
+++ b/bgpd/bgpd.h
@@ -22,6 +22,8 @@
#define _QUAGGA_BGPD_H
#include "qobj.h"
+#include <pthread.h>
+
#include "lib/json.h"
#include "vrf.h"
#include "vty.h"
@@ -584,6 +586,7 @@ struct peer {
/* Packet receive and send buffer. */
struct stream *ibuf;
+ pthread_mutex_t obuf_mtx;
struct stream_fifo *obuf;
struct stream *work;