{
/* Yes first of all get peer pointer. */
struct peer *peer;
+ uint32_t rpkt_quanta_old;
+
peer = THREAD_ARG(thread);
+ /* snapshot the configured read quanta; atomic because the value is
+ * written from the CLI path (see bgp_rpkt_quanta_config_vty) */
+ rpkt_quanta_old = atomic_load_explicit(&peer->bgp->rpkt_quanta,
+ memory_order_relaxed);
+
+ /*
+ * XXX: At present multiple packet reads per input cycle are
+ * problematic. The issue is that some of the packet processing
+ * functions perform their own FSM checks, that arguably should be
+ * located in bgp_fsm.c. For example if we are in OpenConfirm process a
+ * Keepalive, then a keepalive-received event is placed on the event
+ * queue to handle later. If we then process an Update before that
+ * event has popped, the update function checks that the peer status is
+ * in Established and if not tears down the session. Therefore we'll
+ * limit input processing to 1 packet per cycle, as it traditionally
+ * was, until this problem is rectified.
+ *
+ * @qlyoung June 2017
+ */
+ rpkt_quanta_old = 1;
/* Guard against scheduled events that occur after peer deletion. */
if (peer->status == Deleted || peer->status == Clearing)
return 0;
- int processed = 0;
+ unsigned int processed = 0;
- while (processed < 5 && peer->ibuf->count > 0) {
+ while (processed < rpkt_quanta_old) {
u_char type = 0;
bgp_size_t size;
char notify_data_length[2];
}
}
+ /* NOTE(review): io_mtx presumably serializes ibuf access with the
+ * I/O thread that fills it — confirm against bgp_io.c */
- if (peer->ibuf->count > 0) { // more work to do, come back later
- thread_add_background(bm->master, bgp_process_packet, peer, 0,
- &peer->t_process_packet);
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ if (peer->ibuf->count > 0) // more work to do, come back later
+ thread_add_event(bm->master, bgp_process_packet, peer,
+ 0, NULL);
}
+ pthread_mutex_unlock(&peer->io_mtx);
return 0;
}
#include "bgpd/bgp_packet.h"
#include "bgpd/bgp_updgrp.h"
#include "bgpd/bgp_bfd.h"
+#include "bgpd/bgp_io.h"
static struct peer_group *listen_range_exists(struct bgp *bgp,
struct prefix *range, int exact);
{
VTY_DECLVAR_CONTEXT(bgp, bgp);
- if (set)
- bgp->wpkt_quanta = strtoul(num, NULL, 10);
- else
- bgp->wpkt_quanta = BGP_WRITE_PACKET_MAX;
+ if (set) {
+ /* NOTE(review): strtoul result is unvalidated here; the range
+ * is presumably enforced by the CLI grammar "(1-10)" — confirm */
+ uint32_t quanta = strtoul(num, NULL, 10);
+ atomic_store_explicit(&bgp->wpkt_quanta, quanta,
+ memory_order_relaxed);
+ } else {
+ /* unset: restore the compiled-in default */
+ atomic_store_explicit(&bgp->wpkt_quanta, BGP_WRITE_PACKET_MAX,
+ memory_order_relaxed);
+ }
+
+ return CMD_SUCCESS;
+}
+
+/* Set (set=1) or restore to default (set=0) the per-I/O-cycle read quanta.
+ * Stored with atomics to pair with the atomic_load in bgp_process_packet. */
+static int bgp_rpkt_quanta_config_vty(struct vty *vty, const char *num,
+ char set)
+{
+ VTY_DECLVAR_CONTEXT(bgp, bgp);
+
+ if (set) {
+ /* range is enforced by the CLI grammar "(1-10)" */
+ uint32_t quanta = strtoul(num, NULL, 10);
+ atomic_store_explicit(&bgp->rpkt_quanta, quanta,
+ memory_order_relaxed);
+ } else {
+ atomic_store_explicit(&bgp->rpkt_quanta, BGP_READ_PACKET_MAX,
+ memory_order_relaxed);
+ }
return CMD_SUCCESS;
}
/* Emit the "write-quanta N" config line when N differs from the default. */
void bgp_config_write_wpkt_quanta(struct vty *vty, struct bgp *bgp)
{
- if (bgp->wpkt_quanta != BGP_WRITE_PACKET_MAX)
- vty_out(vty, " write-quanta %d\n", bgp->wpkt_quanta);
+ uint32_t quanta =
+ atomic_load_explicit(&bgp->wpkt_quanta, memory_order_relaxed);
+ /* quanta is uint32_t: use %u, not %d, to match the argument type */
+ if (quanta != BGP_WRITE_PACKET_MAX)
+ vty_out(vty, " write-quanta %u%s", quanta, VTY_NEWLINE);
}
+/* Emit the "read-quanta N" config line when N differs from the default. */
+void bgp_config_write_rpkt_quanta(struct vty *vty, struct bgp *bgp)
+{
+ uint32_t quanta =
+ atomic_load_explicit(&bgp->rpkt_quanta, memory_order_relaxed);
+ /* quanta is uint32_t: use %u, not %d, to match the argument type */
+ if (quanta != BGP_READ_PACKET_MAX)
+ vty_out(vty, " read-quanta %u%s", quanta, VTY_NEWLINE);
+}
-/* Update-delay configuration */
+/* Packet quanta configuration */
DEFUN (bgp_wpkt_quanta,
bgp_wpkt_quanta_cmd,
- "write-quanta (1-10000)",
+ "write-quanta (1-10)",
- "How many packets to write to peer socket per run\n"
+ "How many packets to write to peer socket per I/O cycle\n"
"Number of packets\n")
{
return bgp_wpkt_quanta_config_vty(vty, argv[idx_number]->arg, 1);
}
-/* Update-delay deconfiguration */
DEFUN (no_bgp_wpkt_quanta,
no_bgp_wpkt_quanta_cmd,
- "no write-quanta (1-10000)",
+ "no write-quanta (1-10)",
NO_STR
- "How many packets to write to peer socket per run\n"
+ "How many packets to write to peer socket per I/O cycle\n"
"Number of packets\n")
{
/* argv[2] is the "(1-10)" token ("no" is argv[0]) */
int idx_number = 2;
return bgp_wpkt_quanta_config_vty(vty, argv[idx_number]->arg, 0);
}
+/* CLI "read-quanta (1-10)": set how many packets to read per I/O cycle */
+DEFUN (bgp_rpkt_quanta,
+ bgp_rpkt_quanta_cmd,
+ "read-quanta (1-10)",
+ "How many packets to read from peer socket per I/O cycle\n"
+ "Number of packets\n")
+{
+ /* argv[1] is the "(1-10)" token */
+ int idx_number = 1;
+ return bgp_rpkt_quanta_config_vty(vty, argv[idx_number]->arg, 1);
+}
+
+/* CLI "no read-quanta": restore the default read quanta
+ * (BGP_READ_PACKET_MAX; see bgp_rpkt_quanta_config_vty with set=0) */
+DEFUN (no_bgp_rpkt_quanta,
+ no_bgp_rpkt_quanta_cmd,
+ "no read-quanta (1-10)",
+ NO_STR
+ "How many packets to read from peer socket per I/O cycle\n"
+ "Number of packets\n")
+{
+ /* argv[2] is the "(1-10)" token ("no" is argv[0]) */
+ int idx_number = 2;
+ return bgp_rpkt_quanta_config_vty(vty, argv[idx_number]->arg, 0);
+}
+
void bgp_config_write_coalesce_time(struct vty *vty, struct bgp *bgp)
{
if (bgp->coalesce_time != BGP_DEFAULT_SUBGROUP_COALESCE_TIME)
install_element(BGP_NODE, &bgp_wpkt_quanta_cmd);
install_element(BGP_NODE, &no_bgp_wpkt_quanta_cmd);
+ install_element(BGP_NODE, &bgp_rpkt_quanta_cmd);
+ install_element(BGP_NODE, &no_bgp_rpkt_quanta_cmd);
install_element(BGP_NODE, &bgp_coalesce_time_cmd);
install_element(BGP_NODE, &no_bgp_coalesce_time_cmd);
*/
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
- peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * 5);
+ /* size the input work buffer for the maximum read quanta per cycle,
+ * replacing the former magic constant 5 */
+ peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
bgp_sync_init(peer);
bgp->restart_time, &bgp->t_startup);
}
- bgp->wpkt_quanta = BGP_WRITE_PACKET_MAX;
+ /* quanta defaults; stored atomically to pair with the relaxed
+ * atomic loads in the packet-processing and config-write paths */
+ atomic_store_explicit(&bgp->wpkt_quanta, BGP_WRITE_PACKET_MAX,
+ memory_order_relaxed);
+ atomic_store_explicit(&bgp->rpkt_quanta, BGP_READ_PACKET_MAX,
+ memory_order_relaxed);
bgp->coalesce_time = BGP_DEFAULT_SUBGROUP_COALESCE_TIME;
QOBJ_REG(bgp, bgp);
/* write quanta */
bgp_config_write_wpkt_quanta(vty, bgp);
+ /* read quanta */
+ bgp_config_write_rpkt_quanta(vty, bgp);
/* coalesce time */
bgp_config_write_coalesce_time(vty, bgp);