diff options
| -rw-r--r-- | bgpd/bgp_fsm.c | 41 | ||||
| -rw-r--r-- | bgpd/bgp_fsm.h | 12 | ||||
| -rw-r--r-- | bgpd/bgp_main.c | 4 | ||||
| -rw-r--r-- | bgpd/bgp_packet.c | 581 | ||||
| -rw-r--r-- | bgpd/bgp_packet.h | 11 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp.c | 17 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp.h | 1 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp_adv.c | 12 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp_packet.c | 3 | ||||
| -rw-r--r-- | bgpd/bgpd.c | 3 | ||||
| -rw-r--r-- | bgpd/bgpd.h | 3 |
11 files changed, 365 insertions, 323 deletions
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index 8de7e970de..bc4f8272f3 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -125,9 +125,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) from_peer->host, from_peer, from_peer->fd, peer, peer->fd); - BGP_WRITE_OFF(peer->t_write); + peer_writes_off(peer); BGP_READ_OFF(peer->t_read); - BGP_WRITE_OFF(from_peer->t_write); + peer_writes_off(from_peer); BGP_READ_OFF(from_peer->t_read); BGP_TIMER_OFF(peer->t_routeadv); @@ -137,8 +137,18 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) peer->fd = from_peer->fd; from_peer->fd = fd; stream_reset(peer->ibuf); - stream_fifo_clean(peer->obuf); - stream_fifo_clean(from_peer->obuf); + + pthread_mutex_lock(&peer->obuf_mtx); + { + stream_fifo_clean(peer->obuf); + } + pthread_mutex_unlock(&peer->obuf_mtx); + + pthread_mutex_lock(&from_peer->obuf_mtx); + { + stream_fifo_clean(from_peer->obuf); + } + pthread_mutex_unlock(&from_peer->obuf_mtx); peer->as = from_peer->as; peer->v_holdtime = from_peer->v_holdtime; @@ -217,7 +227,7 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) } BGP_READ_ON(peer->t_read, bgp_read, peer->fd); - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); + peer_writes_on(peer); if (from_peer) peer_xfer_stats(peer, from_peer); @@ -433,8 +443,6 @@ int bgp_routeadv_timer(struct thread *thread) peer->synctime = bgp_clock(); - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - /* MRAI timer will be started again when FIFO is built, no need to * do it here. */ @@ -640,7 +648,6 @@ void bgp_adjust_routeadv(struct peer *peer) BGP_TIMER_OFF(peer->t_routeadv); peer->synctime = bgp_clock(); - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); return; } @@ -956,6 +963,9 @@ int bgp_stop(struct peer *peer) char orf_name[BUFSIZ]; int ret = 0; + // immediately remove from threads + peer_writes_off(peer); + if (peer_dynamic_neighbor(peer) && !(CHECK_FLAG(peer->flags, PEER_FLAG_DELETE))) { if (bgp_debug_neighbor_events(peer)) @@ -1037,7 +1047,7 @@ int bgp_stop(struct peer *peer) /* Stop read and write threads when exists. */ BGP_READ_OFF(peer->t_read); - BGP_WRITE_OFF(peer->t_write); + peer_writes_off(peer); /* Stop all timers. */ BGP_TIMER_OFF(peer->t_start); @@ -1054,8 +1064,13 @@ int bgp_stop(struct peer *peer) stream_reset(peer->ibuf); if (peer->work) stream_reset(peer->work); - if (peer->obuf) - stream_fifo_clean(peer->obuf); + + pthread_mutex_lock(&peer->obuf_mtx); + { + if (peer->obuf) + stream_fifo_clean(peer->obuf); + } + pthread_mutex_unlock(&peer->obuf_mtx); /* Close of file descriptor. */ if (peer->fd >= 0) { @@ -1173,6 +1188,8 @@ static int bgp_connect_success(struct peer *peer) return -1; } + peer_writes_on(peer); + if (bgp_getsockname(peer) < 0) { zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d", __FUNCTION__, peer->host, peer->fd); @@ -1313,7 +1330,7 @@ int bgp_start(struct peer *peer) return -1; } BGP_READ_ON(peer->t_read, bgp_read, peer->fd); - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); + peer_writes_on(peer); break; } return 0; diff --git a/bgpd/bgp_fsm.h b/bgpd/bgp_fsm.h index 51d5d7aaa8..a7abfdb2f4 100644 --- a/bgpd/bgp_fsm.h +++ b/bgpd/bgp_fsm.h @@ -35,18 +35,6 @@ THREAD_READ_OFF(T); \ } while (0) -#define BGP_WRITE_ON(T, F, V) \ - do { \ - if ((peer)->status != Deleted) \ - thread_add_write(bm->master, (F), (peer), (V), &(T)); \ - } while (0) - -#define BGP_PEER_WRITE_ON(T, F, V, peer) \ - do { \ - if ((peer)->status != Deleted) \ - thread_add_write(bm->master, (F), (peer), (V), &(T)); \ - } while (0) - #define BGP_WRITE_OFF(T) \ do { \ if (T) \ diff --git a/bgpd/bgp_main.c b/bgpd/bgp_main.c index 1fac2936eb..b4ea45b3e0 100644 --- a/bgpd/bgp_main.c +++ b/bgpd/bgp_main.c @@ -54,6 +54,7 @@ #include "bgpd/bgp_debug.h" #include "bgpd/bgp_filter.h" #include "bgpd/bgp_zebra.h" +#include "bgpd/bgp_packet.h" #ifdef ENABLE_BGP_VNC #include "bgpd/rfapi/rfapi_backend.h" @@ -392,6 +393,9 @@ int main(int argc, char **argv) snprintf(bgpd_di.startinfo, sizeof(bgpd_di.startinfo), ", bgp@%s:%d", (bm->address ? bm->address : "<all>"), bm->port); + pthread_t packet_writes; + pthread_create(&packet_writes, NULL, &peer_writes_start, NULL); + frr_config_fork(); frr_run(bm->master); diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index a955b3512c..e27b416d07 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -19,6 +19,7 @@ */ #include <zebra.h> +#include <sys/time.h> #include "thread.h" #include "stream.h" @@ -55,6 +56,13 @@ #include "bgpd/bgp_updgrp.h" #include "bgpd/bgp_label.h" +/* Linked list of active peers */ +static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static struct list *plist; + +bool bgp_packet_writes_thread_run; + /* Set up BGP packet marker and packet type. */ int bgp_packet_set_marker(struct stream *s, u_char type) { @@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s) return cp; } -/* Add new packet to the peer. */ -void bgp_packet_add(struct peer *peer, struct stream *s) +/** + * Push a packet onto the beginning of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s) { /* Add packet to the end of list. */ stream_fifo_push(peer->obuf, s); + peer_writes_wake(); +} + +/* + * Push a packet onto the beginning of the peer's output queue. + * This function acquires the peer's write mutex before proceeding. + */ +static void bgp_packet_add(struct peer *peer, struct stream *s) +{ + pthread_mutex_lock(&peer->obuf_mtx); + bgp_packet_add_unsafe(peer, s); + pthread_mutex_unlock(&peer->obuf_mtx); } -/* Free first packet. */ -static void bgp_packet_delete(struct peer *peer) +/** + * Pop a packet off the end of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_delete_unsafe(struct peer *peer) { stream_free(stream_fifo_pop(peer->obuf)); } + /* Check file descriptor whether connect is established. */ -int bgp_connect_check(struct peer *peer, int change_state) +static int bgp_connect_check(struct peer *peer, int change_state) { int status; socklen_t slen; @@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state) /* Anyway I have to reset read and write thread. */ BGP_READ_OFF(peer->t_read); - BGP_WRITE_OFF(peer->t_write); /* Check file descriptor. */ slen = sizeof(status); @@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, } bgp_packet_set_size(s); - bgp_packet_add(peer, s); + bgp_packet_add_unsafe(peer, s); return s; } @@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer) } - /* - * Found a packet template to send, overwrite packet - * with appropriate - * attributes from peer and advance peer - */ - s = bpacket_reformat_for_peer(next_pkt, paf); - bpacket_queue_advance_peer(paf); - return s; - } - - return NULL; -} - -/* The next action for the peer from a write perspective */ -static void bgp_write_proceed_actions(struct peer *peer) -{ - afi_t afi; - safi_t safi; - struct peer_af *paf; - struct bpacket *next_pkt; - int fullq_found = 0; - struct update_subgroup *subgrp; - - if (stream_fifo_head(peer->obuf)) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - FOREACH_AFI_SAFI (afi, safi) { - paf = peer_af_find(peer, afi, safi); - if (!paf) - continue; - subgrp = paf->subgroup; - if (!subgrp) - continue; - - next_pkt = paf->next_pkt_to_send; - if (next_pkt && next_pkt->buffer) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - /* No packets readily available for AFI/SAFI, are there - * subgroup packets - * that need to be generated? */ - if (bpacket_queue_is_full(SUBGRP_INST(subgrp), - SUBGRP_PKTQ(subgrp))) - fullq_found = 1; - else if (subgroup_packets_to_build(subgrp)) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - /* No packets to send, see if EOR is pending */ - if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) { - if (!subgrp->t_coalesce && peer->afc_nego[afi][safi] - && peer->synctime - && !CHECK_FLAG(peer->af_sflags[afi][safi], - PEER_STATUS_EOR_SEND) - && safi != SAFI_MPLS_VPN) { - BGP_WRITE_ON(peer->t_write, bgp_write, - peer->fd); - return; - } - } - } - if (fullq_found) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } -} - -/* Write packet to the peer. */ -int bgp_write(struct thread *thread) -{ - struct peer *peer; - u_char type; - struct stream *s; - int num; - int update_last_write = 0; - unsigned int count = 0; - unsigned int oc = 0; - - /* Yes first of all get peer pointer. */ - peer = THREAD_ARG(thread); - peer->t_write = NULL; - - /* For non-blocking IO check. */ - if (peer->status == Connect) { - bgp_connect_check(peer, 1); - return 0; - } - - s = bgp_write_packet(peer); - if (!s) { - bgp_write_proceed_actions(peer); - return 0; - } - - sockopt_cork(peer->fd, 1); - - oc = peer->update_out; - - /* Nonblocking write until TCP output buffer is full. */ - do { - int writenum; - - /* Number of bytes to be sent. */ - writenum = stream_get_endp(s) - stream_get_getp(s); - - /* Call write() system call. */ - num = write(peer->fd, STREAM_PNT(s), writenum); - if (num < 0) { - /* write failed either retry needed or error */ - if (ERRNO_IO_RETRY(errno)) - break; - - BGP_EVENT_ADD(peer, TCP_fatal_error); - return 0; - } - - if (num != writenum) { - /* Partial write */ - stream_forward_getp(s, num); - break; - } - - /* Retrieve BGP packet type. */ - stream_set_getp(s, BGP_MARKER_SIZE + 2); - type = stream_getc(s); - - switch (type) { - case BGP_MSG_OPEN: - peer->open_out++; - break; - case BGP_MSG_UPDATE: - peer->update_out++; - break; - case BGP_MSG_NOTIFY: - peer->notify_out++; - /* Double start timer. */ - peer->v_start *= 2; - - /* Overflow check. */ - if (peer->v_start >= (60 * 2)) - peer->v_start = (60 * 2); - - /* Flush any existing events */ - BGP_EVENT_ADD(peer, BGP_Stop); - goto done; - - case BGP_MSG_KEEPALIVE: - peer->keepalive_out++; - break; - case BGP_MSG_ROUTE_REFRESH_NEW: - case BGP_MSG_ROUTE_REFRESH_OLD: - peer->refresh_out++; - break; - case BGP_MSG_CAPABILITY: - peer->dynamic_cap_out++; - break; + /* Found a packet template to send, overwrite packet + * with appropriate + * attributes from peer and advance peer */ + s = bpacket_reformat_for_peer(next_pkt, paf); + bgp_packet_add_unsafe(peer, s); + bpacket_queue_advance_peer(paf); + return s; } - /* OK we send packet so delete it. */ - bgp_packet_delete(peer); - update_last_write = 1; - } while (++count < peer->bgp->wpkt_quanta - && (s = bgp_write_packet(peer)) != NULL); - - bgp_write_proceed_actions(peer); - -done: - /* Update last_update if UPDATEs were written. */ - if (peer->update_out > oc) - peer->last_update = bgp_clock(); - - /* If we TXed any flavor of packet update last_write */ - if (update_last_write) - peer->last_write = bgp_clock(); - - sockopt_cork(peer->fd, 0); - return 0; -} - -/* This is only for sending NOTIFICATION message to neighbor. */ -static int bgp_write_notify(struct peer *peer) -{ - int ret, val; - u_char type; - struct stream *s; - - /* There should be at least one packet. */ - s = stream_fifo_head(peer->obuf); - if (!s) - return 0; - assert(stream_get_endp(s) >= BGP_HEADER_SIZE); - - /* Stop collecting data within the socket */ - sockopt_cork(peer->fd, 0); - - /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well, - * we only care about getting a clean shutdown at this point. */ - ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s)); - - /* only connection reset/close gets counted as TCP_fatal_error, failure - * to write the entire NOTIFY doesn't get different FSM treatment */ - if (ret <= 0) { - BGP_EVENT_ADD(peer, TCP_fatal_error); - return 0; - } - - /* Disable Nagle, make NOTIFY packet go out right away */ - val = 1; - (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, - sizeof(val)); - - /* Retrieve BGP packet type. */ - stream_set_getp(s, BGP_MARKER_SIZE + 2); - type = stream_getc(s); - - assert(type == BGP_MSG_NOTIFY); - - /* Type should be notify. */ - peer->notify_out++; - - /* Double start timer. */ - peer->v_start *= 2; - - /* Overflow check. */ - if (peer->v_start >= (60 * 2)) - peer->v_start = (60 * 2); - - /* Handle Graceful Restart case where the state changes to - Connect instead of Idle */ - BGP_EVENT_ADD(peer, BGP_Stop); - - return 0; + return NULL; } -/* Make keepalive packet and send it to the peer. */ +/* + * Creates a BGP Keepalive packet and appends it to the peer's output queue. + */ void bgp_keepalive_send(struct peer *peer) { struct stream *s; @@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Make open packet and send it to the peer. */ +/* + * Creates a BGP Open packet and appends it to the peer's output queue. + * Sets capabilities as necessary. + */ void bgp_open_send(struct peer *peer) { struct stream *s; @@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Send BGP notify packet with data potion. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. + * + * @param peer + * @param code BGP error code + * @param sub_code BGP error subcode + * @param data Data portion + * @param datalen length of data portion + */ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, u_char *data, size_t datalen) { @@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, /* Allocate new stream. */ s = stream_new(BGP_MAX_PACKET_SIZE); - /* Make nitify packet. */ + /* Make notify packet. */ bgp_packet_set_marker(s, BGP_MSG_NOTIFY); /* Set notify packet values. */ @@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, length = bgp_packet_set_size(s); /* Add packet to the peer. */ + pthread_mutex_lock(&peer->obuf_mtx); stream_fifo_clean(peer->obuf); - bgp_packet_add(peer, s); + pthread_mutex_unlock(&peer->obuf_mtx); /* For debug */ { @@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, } else peer->last_reset = PEER_DOWN_NOTIFY_SEND; - /* Call immediately. */ - BGP_WRITE_OFF(peer->t_write); - - bgp_write_notify(peer); + /* Add packet to peer's output queue */ + bgp_packet_add(peer, s); + /* Wake up the write thread to get the notify out ASAP */ + peer_writes_wake(); } -/* Send BGP notify packet. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. + * + * @param peer + * @param code BGP error code + * @param sub_code BGP error subcode + */ void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code) { bgp_notify_send_with_data(peer, code, sub_code, NULL, 0); } -/* Send route refresh message to the peer. */ +/* + * Creates BGP Route Refresh packet and appends it to the peer's output queue. + * + * @param peer + * @param afi Address Family Identifier + * @param safi Subsequent Address Family Identifier + * @param orf_type Outbound Route Filtering type + * @param when_to_refresh Whether to refresh immediately or defer + * @param remove Whether to remove ORF for specified AFI/SAFI + */ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi, u_char orf_type, u_char when_to_refresh, int remove) { @@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Send capability message to the peer. */ +/* + * Create a BGP Capability packet and append it to the peer's output queue. + * + * @param peer + * @param afi Address Family Identifier + * @param safi Subsequent Address Family Identifier + * @param capability_code BGP Capability Code + * @param action Set or Remove capability + */ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi, int capability_code, int action) { @@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } /* RFC1771 6.8 Connection collision detection. */ @@ -2340,3 +2174,226 @@ done: return 0; } + +/* ------------- write thread ------------------ */ + +/** + * Flush peer output buffer. + * + * This function pops packets off of peer->obuf and writes them to peer->fd. + * The amount of packets written is equal to the minimum of peer->wpkt_quanta + * and the number of packets on the output buffer. + * + * If write() returns an error, the appropriate FSM event is generated. + * + * The return value is equal to the number of packets written + * (which may be zero). + */ +static int bgp_write(struct peer *peer) +{ + u_char type; + struct stream *s; + int num; + int update_last_write = 0; + unsigned int count = 0; + unsigned int oc = 0; + + /* For non-blocking IO check. */ + if (peer->status == Connect) { + bgp_connect_check(peer, 1); + return 0; + } + + /* Write packets. The number of packets written is the value of + * bgp->wpkt_quanta or the size of the output buffer, whichever is + * smaller.*/ + while (count < peer->bgp->wpkt_quanta + && (s = bgp_write_packet(peer)) != NULL) { + int writenum; + do { // write a full packet, or return on error + writenum = stream_get_endp(s) - stream_get_getp(s); + num = write(peer->fd, STREAM_PNT(s), writenum); + + if (num < 0) { + if (ERRNO_IO_RETRY(errno)) + continue; + + BGP_EVENT_ADD(peer, TCP_fatal_error); + goto done; + } else if (num != writenum) // incomplete write + stream_forward_getp(s, num); + + } while (num != writenum); + + /* Retrieve BGP packet type. */ + stream_set_getp(s, BGP_MARKER_SIZE + 2); + type = stream_getc(s); + + switch (type) { + case BGP_MSG_OPEN: + peer->open_out++; + break; + case BGP_MSG_UPDATE: + peer->update_out++; + break; + case BGP_MSG_NOTIFY: + peer->notify_out++; + /* Double start timer. */ + peer->v_start *= 2; + + /* Overflow check. */ + if (peer->v_start >= (60 * 2)) + peer->v_start = (60 * 2); + + /* Handle Graceful Restart case where the state changes + to + Connect instead of Idle */ + /* Flush any existing events */ + BGP_EVENT_ADD(peer, BGP_Stop); + goto done; + + case BGP_MSG_KEEPALIVE: + peer->keepalive_out++; + break; + case BGP_MSG_ROUTE_REFRESH_NEW: + case BGP_MSG_ROUTE_REFRESH_OLD: + peer->refresh_out++; + break; + case BGP_MSG_CAPABILITY: + peer->dynamic_cap_out++; + break; + } + + count++; + /* OK we send packet so delete it. */ + bgp_packet_delete_unsafe(peer); + update_last_write = 1; + } + +done : { + /* Update last_update if UPDATEs were written. */ + if (peer->update_out > oc) + peer->last_update = bgp_clock(); + + /* If we TXed any flavor of packet update last_write */ + if (update_last_write) + peer->last_write = bgp_clock(); +} + + return count; +} + +static void cleanup_handler(void *arg) +{ + if (plist) + list_delete(plist); + + plist = NULL; + + pthread_mutex_unlock(&plist_mtx); +} + +/** + * Entry function for peer packet flushing pthread. + * + * The plist must be initialized before calling this. + */ +void *peer_writes_start(void *arg) +{ + struct timeval currtime = {0, 0}; + struct timeval sleeptime = {0, 500}; + struct timespec next_update = {0, 0}; + + // initialize + pthread_mutex_lock(&plist_mtx); + plist = list_new(); + + struct listnode *ln; + struct peer *peer; + + pthread_cleanup_push(&cleanup_handler, NULL); + + bgp_packet_writes_thread_run = true; + + while (bgp_packet_writes_thread_run) { // wait around until next update + // time + if (plist->count > 0) + pthread_cond_timedwait(&write_cond, &plist_mtx, + &next_update); + else // wait around until we have some peers + while (plist->count == 0 + && bgp_packet_writes_thread_run) + pthread_cond_wait(&write_cond, &plist_mtx); + + for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) { + pthread_mutex_lock(&peer->obuf_mtx); + { + bgp_write(peer); + } + pthread_mutex_unlock(&peer->obuf_mtx); + } + + gettimeofday(&currtime, NULL); + timeradd(&currtime, &sleeptime, &currtime); + TIMEVAL_TO_TIMESPEC(&currtime, &next_update); + } + + // clean up + pthread_cleanup_pop(1); + + return NULL; +} + +/** + * Turns on packet writing for a peer. + */ +void peer_writes_on(struct peer *peer) +{ + if (peer->status == Deleted) + return; + + pthread_mutex_lock(&plist_mtx); + { + struct listnode *ln, *nn; + struct peer *p; + + // make sure this peer isn't already in the list + for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) + if (p == peer) { + pthread_mutex_unlock(&plist_mtx); + return; + } + + peer_lock(peer); + listnode_add(plist, peer); + } + pthread_mutex_unlock(&plist_mtx); + peer_writes_wake(); +} + +/** + * Turns off packet writing for a peer. + */ +void peer_writes_off(struct peer *peer) +{ + struct listnode *ln, *nn; + struct peer *p; + pthread_mutex_lock(&plist_mtx); + { + for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) + if (p == peer) { + list_delete_node(plist, ln); + peer_unlock(peer); + break; + } + } + pthread_mutex_unlock(&plist_mtx); +} + +/** + * Wakes up the write thread to do work. + */ +void peer_writes_wake() +{ + pthread_cond_signal(&write_cond); +} diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h index 7bf498c37c..c18e4a2ebd 100644 --- a/bgpd/bgp_packet.h +++ b/bgpd/bgp_packet.h @@ -39,8 +39,6 @@ /* Packet send and receive function prototypes. */ extern int bgp_read(struct thread *); -extern int bgp_write(struct thread *); -extern int bgp_connect_check(struct peer *, int change_state); extern void bgp_keepalive_send(struct peer *); extern void bgp_open_send(struct peer *); @@ -65,6 +63,13 @@ extern void bgp_check_update_delay(struct bgp *); extern int bgp_packet_set_marker(struct stream *s, u_char type); extern int bgp_packet_set_size(struct stream *s); -extern void bgp_packet_add(struct peer *peer, struct stream *s); + +/* Control variable for write thread. */ +extern bool bgp_packet_writes_thread_run; + +extern void *peer_writes_start(void *arg); +extern void peer_writes_on(struct peer *peer); +extern void peer_writes_off(struct peer *peer); +extern void peer_writes_wake(void); #endif /* _QUAGGA_BGP_PACKET_H */ diff --git a/bgpd/bgp_updgrp.c b/bgpd/bgp_updgrp.c index 8f67290600..585c6cebbc 100644 --- a/bgpd/bgp_updgrp.c +++ b/bgpd/bgp_updgrp.c @@ -1867,23 +1867,6 @@ void peer_af_announce_route(struct peer_af *paf, int combine) subgrp->peer_count - 1); } -void subgroup_trigger_write(struct update_subgroup *subgrp) -{ - struct peer_af *paf; - -#if 0 - if (bgp_debug_update(NULL, NULL, subgrp->update_group, 0)) - zlog_debug("u%llu:s%llu scheduling write thread for peers", - subgrp->update_group->id, subgrp->id); -#endif - SUBGRP_FOREACH_PEER (subgrp, paf) { - if (paf->peer->status == Established) { - BGP_PEER_WRITE_ON(paf->peer->t_write, bgp_write, - paf->peer->fd, paf->peer); - } - } -} - int update_group_clear_update_dbg(struct update_group *updgrp, void *arg) { UPDGRP_PEER_DBG_OFF(updgrp); diff --git a/bgpd/bgp_updgrp.h b/bgpd/bgp_updgrp.h index 52a21679b8..a50bc05fed 100644 --- a/bgpd/bgp_updgrp.h +++ b/bgpd/bgp_updgrp.h @@ -442,7 +442,6 @@ extern void bgp_adj_out_unset_subgroup(struct bgp_node *rn, char withdraw, u_int32_t addpath_tx_id); void subgroup_announce_table(struct update_subgroup *subgrp, struct bgp_table *table); -extern void subgroup_trigger_write(struct update_subgroup *subgrp); extern int update_group_clear_update_dbg(struct update_group *updgrp, void *arg); diff --git a/bgpd/bgp_updgrp_adv.c b/bgpd/bgp_updgrp_adv.c index b4f18c9f5e..1ec9915ee5 100644 --- a/bgpd/bgp_updgrp_adv.c +++ b/bgpd/bgp_updgrp_adv.c @@ -483,7 +483,6 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn, { struct bgp_adj_out *adj; struct bgp_advertise *adv; - char trigger_write; if (DISABLE_BGP_ANNOUNCE) return; @@ -501,20 +500,9 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn, adv->rn = rn; adv->adj = adj; - /* Note if we need to trigger a packet write */ - if (BGP_ADV_FIFO_EMPTY(&subgrp->sync->withdraw)) - trigger_write = 1; - else - trigger_write = 0; - /* Add to synchronization entry for withdraw * announcement. */ BGP_ADV_FIFO_ADD(&subgrp->sync->withdraw, &adv->fifo); - - /* Schedule packet write, if FIFO is getting its first - * entry. */ - if (trigger_write) - subgroup_trigger_write(subgrp); } else { /* Remove myself from adjacency. */ BGP_ADJ_OUT_DEL(rn, adj); diff --git a/bgpd/bgp_updgrp_packet.c b/bgpd/bgp_updgrp_packet.c index a35d814e47..77b3ce1937 100644 --- a/bgpd/bgp_updgrp_packet.c +++ b/bgpd/bgp_updgrp_packet.c @@ -633,7 +633,6 @@ struct stream *bpacket_reformat_for_peer(struct bpacket *pkt, } } - bgp_packet_add(peer, s); return s; } @@ -1149,7 +1148,6 @@ void subgroup_default_update_packet(struct update_subgroup *subgrp, bgp_packet_set_size(s); (void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, &vecarr); - subgroup_trigger_write(subgrp); } void subgroup_default_withdraw_packet(struct update_subgroup *subgrp) @@ -1242,7 +1240,6 @@ void subgroup_default_withdraw_packet(struct update_subgroup *subgrp) bgp_packet_set_size(s); (void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, NULL); - subgroup_trigger_write(subgrp); } static void diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index a4952be8a6..cee73e2c43 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -990,7 +990,7 @@ static void peer_free(struct peer *peer) */ bgp_timer_set(peer); BGP_READ_OFF(peer->t_read); - BGP_WRITE_OFF(peer->t_write); + peer_writes_off(peer); BGP_EVENT_FLUSH(peer); /* Free connected nexthop, if present */ @@ -1137,6 +1137,7 @@ struct peer *peer_new(struct bgp *bgp) /* Create buffers. */ peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE); peer->obuf = stream_fifo_new(); + pthread_mutex_init(&peer->obuf_mtx, NULL); /* We use a larger buffer for peer->work in the event that: * - We RX a BGP_UPDATE where the attributes alone are just diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index 36bdaf0125..f427c5d9f5 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -22,6 +22,8 @@ #define _QUAGGA_BGPD_H #include "qobj.h" +#include <pthread.h> + #include "lib/json.h" #include "vrf.h" #include "vty.h" @@ -584,6 +586,7 @@ struct peer { /* Packet receive and send buffer. */ struct stream *ibuf; + pthread_mutex_t obuf_mtx; struct stream_fifo *obuf; struct stream *work; |
