EVENT_OFF(keeper->t_delayopen);
EVENT_OFF(keeper->t_connect_check_r);
EVENT_OFF(keeper->t_connect_check_w);
- EVENT_OFF(keeper->t_process_packet);
+
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (peer_connection_fifo_member(&bm->connection_fifo, keeper))
+ peer_connection_fifo_del(&bm->connection_fifo, keeper);
+ }
/*
* At this point in time, it is possible that there are packets pending
bgp_reads_on(keeper);
bgp_writes_on(keeper);
- event_add_event(bm->master, bgp_process_packet, keeper, 0,
- &keeper->t_process_packet);
+
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (!peer_connection_fifo_member(&bm->connection_fifo, keeper)) {
+ peer_connection_fifo_add_tail(&bm->connection_fifo, keeper);
+ }
+ }
+ event_add_event(bm->master, bgp_process_packet, NULL, 0, &bm->e_process_packet);
return (peer);
}
assert(fpt->running);
event_cancel_async(fpt->master, &connection->t_read, NULL);
- EVENT_OFF(connection->t_process_packet);
+
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (peer_connection_fifo_member(&bm->connection_fifo, connection))
+ peer_connection_fifo_del(&bm->connection_fifo, connection);
+ }
UNSET_FLAG(connection->thread_flags, PEER_THREAD_READS_ON);
}
event_add_read(fpt->master, bgp_process_reads, connection,
connection->fd, &connection->t_read);
- if (added_pkt)
- event_add_event(bm->master, bgp_process_packet, connection, 0,
- &connection->t_process_packet);
+ if (added_pkt) {
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (!peer_connection_fifo_member(&bm->connection_fifo, connection))
+ peer_connection_fifo_add_tail(&bm->connection_fifo, connection);
+ }
+ event_add_event(bm->master, bgp_process_packet, NULL, 0, &bm->e_process_packet);
+ }
}
/*
* would not, making event flow difficult to understand. Please think twice
* before hacking this.
*
+ * packet_processing is now a FIFO of connections that need to be handled
+ * This loop has a maximum run of 100(BGP_PACKET_PROCESS_LIMIT) packets,
+ * but each individual connection can only handle the quanta value as
+ * specified in bgp_vty.c. If the connection still has work to do, place it
+ * back on the back of the queue for more work. Do note that event_should_yield
+ * is also being called to figure out if processing should stop and work
+ * picked up after other items can run. This was added *After* withdrawals
+ * started being processed at scale and this function was taking cpu for 40+ seconds
+ * On my machine we are getting 2-3 packets before a yield should happen in the
+ * update case. Withdrawal is 1 packet being processed(note this is a very very
+ * fast computer) before other items should be run.
+ *
* Thread type: EVENT_EVENT
* @param thread
* @return 0
uint32_t rpkt_quanta_old; // how many packets to read
int fsm_update_result; // return code of bgp_event_update()
int mprc; // message processing return code
+ uint32_t processed = 0, curr_connection_processed = 0;
+ bool more_work = false;
+ size_t count;
+ uint32_t total_packets_to_process, total_processed = 0;
+
+ frr_with_mutex (&bm->peer_connection_mtx)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
+
+ if (!connection)
+ goto done;
- connection = EVENT_ARG(thread);
+ total_packets_to_process = BGP_PACKET_PROCESS_LIMIT;
peer = connection->peer;
rpkt_quanta_old = atomic_load_explicit(&peer->bgp->rpkt_quanta,
memory_order_relaxed);
+
fsm_update_result = 0;
- /* Guard against scheduled events that occur after peer deletion. */
- if (connection->status == Deleted || connection->status == Clearing)
- return;
+ while ((processed < total_packets_to_process) && connection) {
+ total_processed++;
+ /* Guard against scheduled events that occur after peer deletion. */
+ if (connection->status == Deleted || connection->status == Clearing) {
+ frr_with_mutex (&bm->peer_connection_mtx)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
- unsigned int processed = 0;
+ if (connection)
+ peer = connection->peer;
+
+ continue;
+ }
- while (processed < rpkt_quanta_old) {
uint8_t type = 0;
bgp_size_t size;
char notify_data_length[2];
- frr_with_mutex (&connection->io_mtx) {
+ frr_with_mutex (&connection->io_mtx)
peer->curr = stream_fifo_pop(connection->ibuf);
- }
- if (peer->curr == NULL) // no packets to process, hmm...
- return;
+ if (peer->curr == NULL) {
+ frr_with_mutex (&bm->peer_connection_mtx)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
+
+
+ if (connection)
+ peer = connection->peer;
+
+ continue;
+ }
/* skip the marker and copy the packet length */
stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
stream_free(peer->curr);
peer->curr = NULL;
processed++;
+ curr_connection_processed++;
/* Update FSM */
if (mprc != BGP_PACKET_NOOP)
fsm_update_result = bgp_event_update(connection, mprc);
- else
- continue;
/*
* If peer was deleted, do not process any more packets. This
* is usually due to executing BGP_Stop or a stub deletion.
*/
- if (fsm_update_result == FSM_PEER_TRANSFERRED
- || fsm_update_result == FSM_PEER_STOPPED)
- break;
+ if (fsm_update_result == FSM_PEER_TRANSFERRED ||
+ fsm_update_result == FSM_PEER_STOPPED) {
+ frr_with_mutex (&bm->peer_connection_mtx)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
+
+ if (connection)
+ peer = connection->peer;
+
+ continue;
+ }
+
+ bool yield = event_should_yield(thread);
+ if (curr_connection_processed >= rpkt_quanta_old || yield) {
+ curr_connection_processed = 0;
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (!peer_connection_fifo_member(&bm->connection_fifo, connection))
+ peer_connection_fifo_add_tail(&bm->connection_fifo,
+ connection);
+ if (!yield)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
+ else
+ connection = NULL;
+ }
+ if (connection)
+ peer = connection->peer;
+
+ continue;
+ }
+
+ frr_with_mutex (&connection->io_mtx) {
+ if (connection->ibuf->count > 0)
+ more_work = true;
+ else
+ more_work = false;
+ }
+
+ if (!more_work) {
+ frr_with_mutex (&bm->peer_connection_mtx)
+ connection = peer_connection_fifo_pop(&bm->connection_fifo);
+
+ if (connection)
+ peer = connection->peer;
+ }
}
- if (fsm_update_result != FSM_PEER_TRANSFERRED
- && fsm_update_result != FSM_PEER_STOPPED) {
+ if (connection) {
frr_with_mutex (&connection->io_mtx) {
- // more work to do, come back later
if (connection->ibuf->count > 0)
- event_add_event(bm->master, bgp_process_packet,
- connection, 0,
- &connection->t_process_packet);
+ more_work = true;
+ else
+ more_work = false;
+ }
+ frr_with_mutex (&bm->peer_connection_mtx) {
+ if (more_work &&
+ !peer_connection_fifo_member(&bm->connection_fifo, connection))
+ peer_connection_fifo_add_tail(&bm->connection_fifo, connection);
}
}
+
+done:
+ frr_with_mutex (&bm->peer_connection_mtx)
+ count = peer_connection_fifo_count(&bm->connection_fifo);
+
+ if (count)
+ event_add_event(bm->master, bgp_process_packet, NULL, 0, &bm->e_process_packet);
}
/* Send EOR when routes are processed by selection deferral timer */
extern struct frr_pthread *bgp_pth_io;
extern struct frr_pthread *bgp_pth_ka;
+/* FIFO list for peer connections */
+PREDECL_LIST(peer_connection_fifo);
+
/* BGP master for system wide configurations and variables. */
struct bgp_master {
/* BGP instance list. */
/* BGP port number. */
uint16_t port;
+ /* FIFO list head for peer connections */
+ struct peer_connection_fifo_head connection_fifo;
+ struct event *e_process_packet;
+ pthread_mutex_t peer_connection_mtx;
+
/* Listener addresses */
struct list *addresses;
struct event *t_pmax_restart;
struct event *t_routeadv;
- struct event *t_process_packet;
struct event *t_stop_with_notify;
union sockunion *su_local; /* Sockunion of local address. */
union sockunion *su_remote; /* Sockunion of remote address. */
+
+ /* For FIFO list */
+ struct peer_connection_fifo_item fifo_item;
};
+
+/* Declare the FIFO list implementation */
+DECLARE_LIST(peer_connection_fifo, struct peer_connection, fifo_item);
+
const char *bgp_peer_get_connection_direction(struct peer_connection *connection);
extern struct peer_connection *bgp_peer_connection_new(struct peer *peer);
extern void bgp_peer_connection_free(struct peer_connection **connection);