diff options
| author | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-02-06 23:39:06 +0000 | 
|---|---|---|
| committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-11-30 16:17:57 -0500 | 
| commit | d3ecc69e5fba1873872a1f4dc359ff1934f81848 (patch) | |
| tree | bcdf393161c7f7ca5e8fffbb1208362904bb80a6 | |
| parent | 69df82f3b5d3aa343c769f280fb811ce019601d4 (diff) | |
bgpd: move packet writes into dedicated pthread
* BGP_WRITE_ON() removed
* BGP_WRITE_OFF() removed
* peer_writes_on() added
* peer_writes_off() added
* bgp_write_proceed_actions() removed
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
| -rw-r--r-- | bgpd/bgp_fsm.c | 41 | ||||
| -rw-r--r-- | bgpd/bgp_fsm.h | 12 | ||||
| -rw-r--r-- | bgpd/bgp_main.c | 4 | ||||
| -rw-r--r-- | bgpd/bgp_packet.c | 581 | ||||
| -rw-r--r-- | bgpd/bgp_packet.h | 11 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp.c | 17 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp.h | 1 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp_adv.c | 12 | ||||
| -rw-r--r-- | bgpd/bgp_updgrp_packet.c | 3 | ||||
| -rw-r--r-- | bgpd/bgpd.c | 3 | ||||
| -rw-r--r-- | bgpd/bgpd.h | 3 | 
11 files changed, 365 insertions, 323 deletions
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index 8de7e970de..bc4f8272f3 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -125,9 +125,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)  			   from_peer->host, from_peer, from_peer->fd, peer,  			   peer->fd); -	BGP_WRITE_OFF(peer->t_write); +	peer_writes_off(peer);  	BGP_READ_OFF(peer->t_read); -	BGP_WRITE_OFF(from_peer->t_write); +	peer_writes_off(from_peer);  	BGP_READ_OFF(from_peer->t_read);  	BGP_TIMER_OFF(peer->t_routeadv); @@ -137,8 +137,18 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)  	peer->fd = from_peer->fd;  	from_peer->fd = fd;  	stream_reset(peer->ibuf); -	stream_fifo_clean(peer->obuf); -	stream_fifo_clean(from_peer->obuf); + +	pthread_mutex_lock(&peer->obuf_mtx); +	{ +		stream_fifo_clean(peer->obuf); +	} +	pthread_mutex_unlock(&peer->obuf_mtx); + +	pthread_mutex_lock(&from_peer->obuf_mtx); +	{ +		stream_fifo_clean(from_peer->obuf); +	} +	pthread_mutex_unlock(&from_peer->obuf_mtx);  	peer->as = from_peer->as;  	peer->v_holdtime = from_peer->v_holdtime; @@ -217,7 +227,7 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)  	}  	BGP_READ_ON(peer->t_read, bgp_read, peer->fd); -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); +	peer_writes_on(peer);  	if (from_peer)  		peer_xfer_stats(peer, from_peer); @@ -433,8 +443,6 @@ int bgp_routeadv_timer(struct thread *thread)  	peer->synctime = bgp_clock(); -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); -  	/* MRAI timer will be started again when FIFO is built, no need to  	 * do it here.  	 */ @@ -640,7 +648,6 @@ void bgp_adjust_routeadv(struct peer *peer)  			BGP_TIMER_OFF(peer->t_routeadv);  		peer->synctime = bgp_clock(); -		BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);  		return;  	} @@ -956,6 +963,9 @@ int bgp_stop(struct peer *peer)  	char orf_name[BUFSIZ];  	int ret = 0; +	// immediately remove from threads +	peer_writes_off(peer); +  	if (peer_dynamic_neighbor(peer)  	    && !(CHECK_FLAG(peer->flags, PEER_FLAG_DELETE))) {  		if (bgp_debug_neighbor_events(peer)) @@ -1037,7 +1047,7 @@ int bgp_stop(struct peer *peer)  	/* Stop read and write threads when exists. */  	BGP_READ_OFF(peer->t_read); -	BGP_WRITE_OFF(peer->t_write); +	peer_writes_off(peer);  	/* Stop all timers. */  	BGP_TIMER_OFF(peer->t_start); @@ -1054,8 +1064,13 @@ int bgp_stop(struct peer *peer)  		stream_reset(peer->ibuf);  	if (peer->work)  		stream_reset(peer->work); -	if (peer->obuf) -		stream_fifo_clean(peer->obuf); + +	pthread_mutex_lock(&peer->obuf_mtx); +	{ +		if (peer->obuf) +			stream_fifo_clean(peer->obuf); +	} +	pthread_mutex_unlock(&peer->obuf_mtx);  	/* Close of file descriptor. */  	if (peer->fd >= 0) { @@ -1173,6 +1188,8 @@ static int bgp_connect_success(struct peer *peer)  		return -1;  	} +	peer_writes_on(peer); +  	if (bgp_getsockname(peer) < 0) {  		zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",  			 __FUNCTION__, peer->host, peer->fd); @@ -1313,7 +1330,7 @@ int bgp_start(struct peer *peer)  			return -1;  		}  		BGP_READ_ON(peer->t_read, bgp_read, peer->fd); -		BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); +		peer_writes_on(peer);  		break;  	}  	return 0; diff --git a/bgpd/bgp_fsm.h b/bgpd/bgp_fsm.h index 51d5d7aaa8..a7abfdb2f4 100644 --- a/bgpd/bgp_fsm.h +++ b/bgpd/bgp_fsm.h @@ -35,18 +35,6 @@  			THREAD_READ_OFF(T);                                    \  	} while (0) -#define BGP_WRITE_ON(T, F, V)                                                  \ -	do {                                                                   \ -		if ((peer)->status != Deleted)                                 \ -			thread_add_write(bm->master, (F), (peer), (V), &(T));  \ -	} while (0) - -#define BGP_PEER_WRITE_ON(T, F, V, peer)                                       \ -	do {                                                                   \ -		if ((peer)->status != Deleted)                                 \ -			thread_add_write(bm->master, (F), (peer), (V), &(T));  \ -	} while (0) -  #define BGP_WRITE_OFF(T)                                                       \  	do {                                                                   \  		if (T)                                                         \ diff --git a/bgpd/bgp_main.c b/bgpd/bgp_main.c index 1fac2936eb..b4ea45b3e0 100644 --- a/bgpd/bgp_main.c +++ b/bgpd/bgp_main.c @@ -54,6 +54,7 @@  #include "bgpd/bgp_debug.h"  #include "bgpd/bgp_filter.h"  #include "bgpd/bgp_zebra.h" +#include "bgpd/bgp_packet.h"  #ifdef ENABLE_BGP_VNC  #include "bgpd/rfapi/rfapi_backend.h" @@ -392,6 +393,9 @@ int main(int argc, char **argv)  	snprintf(bgpd_di.startinfo, sizeof(bgpd_di.startinfo), ", bgp@%s:%d",  		 (bm->address ? bm->address : "<all>"), bm->port); +	pthread_t packet_writes; +	pthread_create(&packet_writes, NULL, &peer_writes_start, NULL); +  	frr_config_fork();  	frr_run(bm->master); diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index a955b3512c..e27b416d07 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -19,6 +19,7 @@   */  #include <zebra.h> +#include <sys/time.h>  #include "thread.h"  #include "stream.h" @@ -55,6 +56,13 @@  #include "bgpd/bgp_updgrp.h"  #include "bgpd/bgp_label.h" +/* Linked list of active peers */ +static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static struct list *plist; + +bool bgp_packet_writes_thread_run; +  /* Set up BGP packet marker and packet type. */  int bgp_packet_set_marker(struct stream *s, u_char type)  { @@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s)  	return cp;  } -/* Add new packet to the peer. */ -void bgp_packet_add(struct peer *peer, struct stream *s) +/** + * Push a packet onto the beginning of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s)  {  	/* Add packet to the end of list. */  	stream_fifo_push(peer->obuf, s); +	peer_writes_wake(); +} + +/* + * Push a packet onto the beginning of the peer's output queue. + * This function acquires the peer's write mutex before proceeding. + */ +static void bgp_packet_add(struct peer *peer, struct stream *s) +{ +	pthread_mutex_lock(&peer->obuf_mtx); +	bgp_packet_add_unsafe(peer, s); +	pthread_mutex_unlock(&peer->obuf_mtx);  } -/* Free first packet. */ -static void bgp_packet_delete(struct peer *peer) +/** + * Pop a packet off the end of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_delete_unsafe(struct peer *peer)  {  	stream_free(stream_fifo_pop(peer->obuf));  } +  /* Check file descriptor whether connect is established. */ -int bgp_connect_check(struct peer *peer, int change_state) +static int bgp_connect_check(struct peer *peer, int change_state)  {  	int status;  	socklen_t slen; @@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state)  	/* Anyway I have to reset read and write thread. */  	BGP_READ_OFF(peer->t_read); -	BGP_WRITE_OFF(peer->t_write);  	/* Check file descriptor. */  	slen = sizeof(status); @@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,  	}  	bgp_packet_set_size(s); -	bgp_packet_add(peer, s); +	bgp_packet_add_unsafe(peer, s);  	return s;  } @@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer)  		} -		/* -		 * Found a packet template to send, overwrite packet -		 * with appropriate -		 * attributes from peer and advance peer -		 */ -		s = bpacket_reformat_for_peer(next_pkt, paf); -		bpacket_queue_advance_peer(paf); -		return s; -	} - -	return NULL; -} - -/* The next action for the peer from a write perspective */ -static void bgp_write_proceed_actions(struct peer *peer) -{ -	afi_t afi; -	safi_t safi; -	struct peer_af *paf; -	struct bpacket *next_pkt; -	int fullq_found = 0; -	struct update_subgroup *subgrp; - -	if (stream_fifo_head(peer->obuf)) { -		BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); -		return; -	} - -	FOREACH_AFI_SAFI (afi, safi) { -		paf = peer_af_find(peer, afi, safi); -		if (!paf) -			continue; -		subgrp = paf->subgroup; -		if (!subgrp) -			continue; - -		next_pkt = paf->next_pkt_to_send; -		if (next_pkt && next_pkt->buffer) { -			BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); -			return; -		} - -		/* No packets readily available for AFI/SAFI, are there -		 * subgroup packets -		 * that need to be generated? */ -		if (bpacket_queue_is_full(SUBGRP_INST(subgrp), -					  SUBGRP_PKTQ(subgrp))) -			fullq_found = 1; -		else if (subgroup_packets_to_build(subgrp)) { -			BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); -			return; -		} - -		/* No packets to send, see if EOR is pending */ -		if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) { -			if (!subgrp->t_coalesce && peer->afc_nego[afi][safi] -			    && peer->synctime -			    && !CHECK_FLAG(peer->af_sflags[afi][safi], -					   PEER_STATUS_EOR_SEND) -			    && safi != SAFI_MPLS_VPN) { -				BGP_WRITE_ON(peer->t_write, bgp_write, -					     peer->fd); -				return; -			} -		} -	} -	if (fullq_found) { -		BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); -		return; -	} -} - -/* Write packet to the peer. */ -int bgp_write(struct thread *thread) -{ -	struct peer *peer; -	u_char type; -	struct stream *s; -	int num; -	int update_last_write = 0; -	unsigned int count = 0; -	unsigned int oc = 0; - -	/* Yes first of all get peer pointer. */ -	peer = THREAD_ARG(thread); -	peer->t_write = NULL; - -	/* For non-blocking IO check. */ -	if (peer->status == Connect) { -		bgp_connect_check(peer, 1); -		return 0; -	} - -	s = bgp_write_packet(peer); -	if (!s) { -		bgp_write_proceed_actions(peer); -		return 0; -	} - -	sockopt_cork(peer->fd, 1); - -	oc = peer->update_out; - -	/* Nonblocking write until TCP output buffer is full.  */ -	do { -		int writenum; - -		/* Number of bytes to be sent.  */ -		writenum = stream_get_endp(s) - stream_get_getp(s); - -		/* Call write() system call.  */ -		num = write(peer->fd, STREAM_PNT(s), writenum); -		if (num < 0) { -			/* write failed either retry needed or error */ -			if (ERRNO_IO_RETRY(errno)) -				break; - -			BGP_EVENT_ADD(peer, TCP_fatal_error); -			return 0; -		} - -		if (num != writenum) { -			/* Partial write */ -			stream_forward_getp(s, num); -			break; -		} - -		/* Retrieve BGP packet type. */ -		stream_set_getp(s, BGP_MARKER_SIZE + 2); -		type = stream_getc(s); - -		switch (type) { -		case BGP_MSG_OPEN: -			peer->open_out++; -			break; -		case BGP_MSG_UPDATE: -			peer->update_out++; -			break; -		case BGP_MSG_NOTIFY: -			peer->notify_out++; -			/* Double start timer. */ -			peer->v_start *= 2; - -			/* Overflow check. */ -			if (peer->v_start >= (60 * 2)) -				peer->v_start = (60 * 2); - -			/* Flush any existing events */ -			BGP_EVENT_ADD(peer, BGP_Stop); -			goto done; - -		case BGP_MSG_KEEPALIVE: -			peer->keepalive_out++; -			break; -		case BGP_MSG_ROUTE_REFRESH_NEW: -		case BGP_MSG_ROUTE_REFRESH_OLD: -			peer->refresh_out++; -			break; -		case BGP_MSG_CAPABILITY: -			peer->dynamic_cap_out++; -			break; +			/* Found a packet template to send, overwrite packet +			 * with appropriate +			 * attributes from peer and advance peer */ +			s = bpacket_reformat_for_peer(next_pkt, paf); +			bgp_packet_add_unsafe(peer, s); +			bpacket_queue_advance_peer(paf); +			return s;  		} -		/* OK we send packet so delete it. */ -		bgp_packet_delete(peer); -		update_last_write = 1; -	} while (++count < peer->bgp->wpkt_quanta -		 && (s = bgp_write_packet(peer)) != NULL); - -	bgp_write_proceed_actions(peer); - -done: -	/* Update last_update if UPDATEs were written. */ -	if (peer->update_out > oc) -		peer->last_update = bgp_clock(); - -	/* If we TXed any flavor of packet update last_write */ -	if (update_last_write) -		peer->last_write = bgp_clock(); - -	sockopt_cork(peer->fd, 0); -	return 0; -} - -/* This is only for sending NOTIFICATION message to neighbor. */ -static int bgp_write_notify(struct peer *peer) -{ -	int ret, val; -	u_char type; -	struct stream *s; - -	/* There should be at least one packet. */ -	s = stream_fifo_head(peer->obuf); -	if (!s) -		return 0; -	assert(stream_get_endp(s) >= BGP_HEADER_SIZE); - -	/* Stop collecting data within the socket */ -	sockopt_cork(peer->fd, 0); - -	/* socket is in nonblocking mode, if we can't deliver the NOTIFY, well, -	 * we only care about getting a clean shutdown at this point. */ -	ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s)); - -	/* only connection reset/close gets counted as TCP_fatal_error, failure -	 * to write the entire NOTIFY doesn't get different FSM treatment */ -	if (ret <= 0) { -		BGP_EVENT_ADD(peer, TCP_fatal_error); -		return 0; -	} - -	/* Disable Nagle, make NOTIFY packet go out right away */ -	val = 1; -	(void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, -			 sizeof(val)); - -	/* Retrieve BGP packet type. */ -	stream_set_getp(s, BGP_MARKER_SIZE + 2); -	type = stream_getc(s); - -	assert(type == BGP_MSG_NOTIFY); - -	/* Type should be notify. */ -	peer->notify_out++; - -	/* Double start timer. */ -	peer->v_start *= 2; - -	/* Overflow check. */ -	if (peer->v_start >= (60 * 2)) -		peer->v_start = (60 * 2); - -	/* Handle Graceful Restart case where the state changes to -	   Connect instead of Idle */ -	BGP_EVENT_ADD(peer, BGP_Stop); - -	return 0; +	return NULL;  } -/* Make keepalive packet and send it to the peer. */ +/* + * Creates a BGP Keepalive packet and appends it to the peer's output queue. + */  void bgp_keepalive_send(struct peer *peer)  {  	struct stream *s; @@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer)  	/* Add packet to the peer. */  	bgp_packet_add(peer, s); - -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);  } -/* Make open packet and send it to the peer. */ +/* + * Creates a BGP Open packet and appends it to the peer's output queue. + * Sets capabilities as necessary. + */  void bgp_open_send(struct peer *peer)  {  	struct stream *s; @@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer)  	/* Add packet to the peer. */  	bgp_packet_add(peer, s); - -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);  } -/* Send BGP notify packet with data potion. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. + * + * @param peer + * @param code      BGP error code + * @param sub_code  BGP error subcode + * @param data      Data portion + * @param datalen   length of data portion + */  void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,  			       u_char *data, size_t datalen)  { @@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,  	/* Allocate new stream. */  	s = stream_new(BGP_MAX_PACKET_SIZE); -	/* Make nitify packet. */ +	/* Make notify packet. */  	bgp_packet_set_marker(s, BGP_MSG_NOTIFY);  	/* Set notify packet values. */ @@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,  	length = bgp_packet_set_size(s);  	/* Add packet to the peer. */ +	pthread_mutex_lock(&peer->obuf_mtx);  	stream_fifo_clean(peer->obuf); -	bgp_packet_add(peer, s); +	pthread_mutex_unlock(&peer->obuf_mtx);  	/* For debug */  	{ @@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,  	} else  		peer->last_reset = PEER_DOWN_NOTIFY_SEND; -	/* Call immediately. */ -	BGP_WRITE_OFF(peer->t_write); - -	bgp_write_notify(peer); +	/* Add packet to peer's output queue */ +	bgp_packet_add(peer, s); +	/* Wake up the write thread to get the notify out ASAP */ +	peer_writes_wake();  } -/* Send BGP notify packet. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. + * + * @param peer + * @param code      BGP error code + * @param sub_code  BGP error subcode + */  void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code)  {  	bgp_notify_send_with_data(peer, code, sub_code, NULL, 0);  } -/* Send route refresh message to the peer. */ +/* + * Creates BGP Route Refresh packet and appends it to the peer's output queue. + * + * @param peer + * @param afi               Address Family Identifier + * @param safi              Subsequent Address Family Identifier + * @param orf_type          Outbound Route Filtering type + * @param when_to_refresh   Whether to refresh immediately or defer + * @param remove            Whether to remove ORF for specified AFI/SAFI + */  void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,  			    u_char orf_type, u_char when_to_refresh, int remove)  { @@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,  	/* Add packet to the peer. */  	bgp_packet_add(peer, s); - -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);  } -/* Send capability message to the peer. */ +/* + * Create a BGP Capability packet and append it to the peer's output queue. + * + * @param peer + * @param afi              Address Family Identifier + * @param safi             Subsequent Address Family Identifier + * @param capability_code  BGP Capability Code + * @param action           Set or Remove capability + */  void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,  			 int capability_code, int action)  { @@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,  	/* Add packet to the peer. */  	bgp_packet_add(peer, s); - -	BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);  }  /* RFC1771 6.8 Connection collision detection. */ @@ -2340,3 +2174,226 @@ done:  	return 0;  } + +/* ------------- write thread ------------------ */ + +/** + * Flush peer output buffer. + * + * This function pops packets off of peer->obuf and writes them to peer->fd. + * The amount of packets written is equal to the minimum of peer->wpkt_quanta + * and the number of packets on the output buffer. + * + * If write() returns an error, the appropriate FSM event is generated. + * + * The return value is equal to the number of packets written + * (which may be zero). + */ +static int bgp_write(struct peer *peer) +{ +	u_char type; +	struct stream *s; +	int num; +	int update_last_write = 0; +	unsigned int count = 0; +	unsigned int oc = 0; + +	/* For non-blocking IO check. */ +	if (peer->status == Connect) { +		bgp_connect_check(peer, 1); +		return 0; +	} + +	/* Write packets. The number of packets written is the value of +	 * bgp->wpkt_quanta or the size of the output buffer, whichever is +	 * smaller.*/ +	while (count < peer->bgp->wpkt_quanta +	       && (s = bgp_write_packet(peer)) != NULL) { +		int writenum; +		do { // write a full packet, or return on error +			writenum = stream_get_endp(s) - stream_get_getp(s); +			num = write(peer->fd, STREAM_PNT(s), writenum); + +			if (num < 0) { +				if (ERRNO_IO_RETRY(errno)) +					continue; + +				BGP_EVENT_ADD(peer, TCP_fatal_error); +				goto done; +			} else if (num != writenum) // incomplete write +				stream_forward_getp(s, num); + +		} while (num != writenum); + +		/* Retrieve BGP packet type. */ +		stream_set_getp(s, BGP_MARKER_SIZE + 2); +		type = stream_getc(s); + +		switch (type) { +		case BGP_MSG_OPEN: +			peer->open_out++; +			break; +		case BGP_MSG_UPDATE: +			peer->update_out++; +			break; +		case BGP_MSG_NOTIFY: +			peer->notify_out++; +			/* Double start timer. */ +			peer->v_start *= 2; + +			/* Overflow check. */ +			if (peer->v_start >= (60 * 2)) +				peer->v_start = (60 * 2); + +			/* Handle Graceful Restart case where the state changes +			   to +			   Connect instead of Idle */ +			/* Flush any existing events */ +			BGP_EVENT_ADD(peer, BGP_Stop); +			goto done; + +		case BGP_MSG_KEEPALIVE: +			peer->keepalive_out++; +			break; +		case BGP_MSG_ROUTE_REFRESH_NEW: +		case BGP_MSG_ROUTE_REFRESH_OLD: +			peer->refresh_out++; +			break; +		case BGP_MSG_CAPABILITY: +			peer->dynamic_cap_out++; +			break; +		} + +		count++; +		/* OK we send packet so delete it. */ +		bgp_packet_delete_unsafe(peer); +		update_last_write = 1; +	} + +done : { +	/* Update last_update if UPDATEs were written. */ +	if (peer->update_out > oc) +		peer->last_update = bgp_clock(); + +	/* If we TXed any flavor of packet update last_write */ +	if (update_last_write) +		peer->last_write = bgp_clock(); +} + +	return count; +} + +static void cleanup_handler(void *arg) +{ +	if (plist) +		list_delete(plist); + +	plist = NULL; + +	pthread_mutex_unlock(&plist_mtx); +} + +/** + * Entry function for peer packet flushing pthread. + * + * The plist must be initialized before calling this. + */ +void *peer_writes_start(void *arg) +{ +	struct timeval currtime = {0, 0}; +	struct timeval sleeptime = {0, 500}; +	struct timespec next_update = {0, 0}; + +	// initialize +	pthread_mutex_lock(&plist_mtx); +	plist = list_new(); + +	struct listnode *ln; +	struct peer *peer; + +	pthread_cleanup_push(&cleanup_handler, NULL); + +	bgp_packet_writes_thread_run = true; + +	while (bgp_packet_writes_thread_run) { // wait around until next update +					       // time +		if (plist->count > 0) +			pthread_cond_timedwait(&write_cond, &plist_mtx, +					       &next_update); +		else // wait around until we have some peers +			while (plist->count == 0 +			       && bgp_packet_writes_thread_run) +				pthread_cond_wait(&write_cond, &plist_mtx); + +		for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) { +			pthread_mutex_lock(&peer->obuf_mtx); +			{ +				bgp_write(peer); +			} +			pthread_mutex_unlock(&peer->obuf_mtx); +		} + +		gettimeofday(&currtime, NULL); +		timeradd(&currtime, &sleeptime, &currtime); +		TIMEVAL_TO_TIMESPEC(&currtime, &next_update); +	} + +	// clean up +	pthread_cleanup_pop(1); + +	return NULL; +} + +/** + * Turns on packet writing for a peer. + */ +void peer_writes_on(struct peer *peer) +{ +	if (peer->status == Deleted) +		return; + +	pthread_mutex_lock(&plist_mtx); +	{ +		struct listnode *ln, *nn; +		struct peer *p; + +		// make sure this peer isn't already in the list +		for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) +			if (p == peer) { +				pthread_mutex_unlock(&plist_mtx); +				return; +			} + +		peer_lock(peer); +		listnode_add(plist, peer); +	} +	pthread_mutex_unlock(&plist_mtx); +	peer_writes_wake(); +} + +/** + * Turns off packet writing for a peer. + */ +void peer_writes_off(struct peer *peer) +{ +	struct listnode *ln, *nn; +	struct peer *p; +	pthread_mutex_lock(&plist_mtx); +	{ +		for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) +			if (p == peer) { +				list_delete_node(plist, ln); +				peer_unlock(peer); +				break; +			} +	} +	pthread_mutex_unlock(&plist_mtx); +} + +/** + * Wakes up the write thread to do work. + */ +void peer_writes_wake() +{ +	pthread_cond_signal(&write_cond); +} diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h index 7bf498c37c..c18e4a2ebd 100644 --- a/bgpd/bgp_packet.h +++ b/bgpd/bgp_packet.h @@ -39,8 +39,6 @@  /* Packet send and receive function prototypes. */  extern int bgp_read(struct thread *); -extern int bgp_write(struct thread *); -extern int bgp_connect_check(struct peer *, int change_state);  extern void bgp_keepalive_send(struct peer *);  extern void bgp_open_send(struct peer *); @@ -65,6 +63,13 @@ extern void bgp_check_update_delay(struct bgp *);  extern int bgp_packet_set_marker(struct stream *s, u_char type);  extern int bgp_packet_set_size(struct stream *s); -extern void bgp_packet_add(struct peer *peer, struct stream *s); + +/* Control variable for write thread. */ +extern bool bgp_packet_writes_thread_run; + +extern void *peer_writes_start(void *arg); +extern void peer_writes_on(struct peer *peer); +extern void peer_writes_off(struct peer *peer); +extern void peer_writes_wake(void);  #endif /* _QUAGGA_BGP_PACKET_H */ diff --git a/bgpd/bgp_updgrp.c b/bgpd/bgp_updgrp.c index 8f67290600..585c6cebbc 100644 --- a/bgpd/bgp_updgrp.c +++ b/bgpd/bgp_updgrp.c @@ -1867,23 +1867,6 @@ void peer_af_announce_route(struct peer_af *paf, int combine)  			    subgrp->peer_count - 1);  } -void subgroup_trigger_write(struct update_subgroup *subgrp) -{ -	struct peer_af *paf; - -#if 0 -  if (bgp_debug_update(NULL, NULL, subgrp->update_group, 0)) -    zlog_debug("u%llu:s%llu scheduling write thread for peers", -               subgrp->update_group->id, subgrp->id); -#endif -	SUBGRP_FOREACH_PEER (subgrp, paf) { -		if (paf->peer->status == Established) { -			BGP_PEER_WRITE_ON(paf->peer->t_write, bgp_write, -					  paf->peer->fd, paf->peer); -		} -	} -} -  int update_group_clear_update_dbg(struct update_group *updgrp, void *arg)  {  	UPDGRP_PEER_DBG_OFF(updgrp); diff --git a/bgpd/bgp_updgrp.h b/bgpd/bgp_updgrp.h index 52a21679b8..a50bc05fed 100644 --- a/bgpd/bgp_updgrp.h +++ b/bgpd/bgp_updgrp.h @@ -442,7 +442,6 @@ extern void bgp_adj_out_unset_subgroup(struct bgp_node *rn,  				       char withdraw, u_int32_t addpath_tx_id);  void subgroup_announce_table(struct update_subgroup *subgrp,  			     struct bgp_table *table); -extern void subgroup_trigger_write(struct update_subgroup *subgrp);  extern int update_group_clear_update_dbg(struct update_group *updgrp,  					 void *arg); diff --git a/bgpd/bgp_updgrp_adv.c b/bgpd/bgp_updgrp_adv.c index b4f18c9f5e..1ec9915ee5 100644 --- a/bgpd/bgp_updgrp_adv.c +++ b/bgpd/bgp_updgrp_adv.c @@ -483,7 +483,6 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,  {  	struct bgp_adj_out *adj;  	struct bgp_advertise *adv; -	char trigger_write;  	if (DISABLE_BGP_ANNOUNCE)  		return; @@ -501,20 +500,9 @@ void bgp_adj_out_unset_subgroup(struct bgp_node *rn,  			adv->rn = rn;  			adv->adj = adj; -			/* Note if we need to trigger a packet write */ -			if (BGP_ADV_FIFO_EMPTY(&subgrp->sync->withdraw)) -				trigger_write = 1; -			else -				trigger_write = 0; -  			/* Add to synchronization entry for withdraw  			 * announcement.  */  			BGP_ADV_FIFO_ADD(&subgrp->sync->withdraw, &adv->fifo); - -			/* Schedule packet write, if FIFO is getting its first -			 * entry. */ -			if (trigger_write) -				subgroup_trigger_write(subgrp);  		} else {  			/* Remove myself from adjacency. */  			BGP_ADJ_OUT_DEL(rn, adj); diff --git a/bgpd/bgp_updgrp_packet.c b/bgpd/bgp_updgrp_packet.c index a35d814e47..77b3ce1937 100644 --- a/bgpd/bgp_updgrp_packet.c +++ b/bgpd/bgp_updgrp_packet.c @@ -633,7 +633,6 @@ struct stream *bpacket_reformat_for_peer(struct bpacket *pkt,  		}  	} -	bgp_packet_add(peer, s);  	return s;  } @@ -1149,7 +1148,6 @@ void subgroup_default_update_packet(struct update_subgroup *subgrp,  	bgp_packet_set_size(s);  	(void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, &vecarr); -	subgroup_trigger_write(subgrp);  }  void subgroup_default_withdraw_packet(struct update_subgroup *subgrp) @@ -1242,7 +1240,6 @@ void subgroup_default_withdraw_packet(struct update_subgroup *subgrp)  	bgp_packet_set_size(s);  	(void)bpacket_queue_add(SUBGRP_PKTQ(subgrp), s, NULL); -	subgroup_trigger_write(subgrp);  }  static void diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index a4952be8a6..cee73e2c43 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -990,7 +990,7 @@ static void peer_free(struct peer *peer)  	 */  	bgp_timer_set(peer);  	BGP_READ_OFF(peer->t_read); -	BGP_WRITE_OFF(peer->t_write); +	peer_writes_off(peer);  	BGP_EVENT_FLUSH(peer);  	/* Free connected nexthop, if present */ @@ -1137,6 +1137,7 @@ struct peer *peer_new(struct bgp *bgp)  	/* Create buffers.  */  	peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);  	peer->obuf = stream_fifo_new(); +	pthread_mutex_init(&peer->obuf_mtx, NULL);  	/* We use a larger buffer for peer->work in the event that:  	 * - We RX a BGP_UPDATE where the attributes alone are just diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index 36bdaf0125..f427c5d9f5 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -22,6 +22,8 @@  #define _QUAGGA_BGPD_H  #include "qobj.h" +#include <pthread.h> +  #include "lib/json.h"  #include "vrf.h"  #include "vty.h" @@ -584,6 +586,7 @@ struct peer {  	/* Packet receive and send buffer. */  	struct stream *ibuf; +	pthread_mutex_t obuf_mtx;  	struct stream_fifo *obuf;  	struct stream *work;  | 
