#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error
-/* bgp_read() status codes */
-#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header
-#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet
-
/* Start and stop routines for I/O pthread + control variables
* ------------------------------------------------------------------------ */
bool bgp_packet_write_thread_run = false;
listnode_delete(read_cancel, peer);
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
+ thread_add_background(bm->master, bgp_process_packet, peer, 0,
+ &peer->t_process_packet);
SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}
pthread_mutex_unlock(work_mtx);
/**
* Called from PTHREAD_IO when select() or poll() determines that the file
* descriptor is ready to be read from.
+ *
+ * We read as much data as possible, process as many packets as we can and
+ * place them on peer->ibuf for secondary processing by the main thread.
+ *
+ * Steps, in order:
+ *  1. Call bgp_read() while holding peer->io_mtx.
+ *  2. Map the returned status onto `more` / `fatal`: BGP_IO_TRANS_ERR only
+ *     skips packet extraction, BGP_IO_FATAL_ERR additionally marks the
+ *     read as fatal.
+ *  3. While peer->ibuf_work holds at least BGP_HEADER_SIZE readable bytes,
+ *     validate the header, fetch the length field located BGP_MARKER_SIZE
+ *     bytes into the packet and, if that many bytes are readable, copy them
+ *     into a new stream that is pushed onto peer->ibuf under peer->io_mtx.
+ *  4. If `more` is set and fewer than BGP_MAX_PACKET_SIZE bytes are
+ *     writeable, move the unread bytes to the start of peer->ibuf_work.
+ *  5. On a fatal condition reset peer->ibuf_work (after copying the
+ *     offending header into peer->last_reset_cause when the header was
+ *     invalid) and do not re-arm the read. Otherwise re-arm the read on
+ *     peer->fd and, if at least one packet was queued, schedule
+ *     bgp_process_packet on bm->master.
+ *
+ * Takes the thread whose THREAD_ARG is the struct peer to read from.
+ * Returns -1 when peer->fd is negative, 0 in every other case.
+ *
+ * NOTE(review): `peer` and `pktbuf` are declared static, so every
+ * invocation shares them; this looks safe only if a single thread ever
+ * runs this function -- confirm.
*/
static int bgp_process_reads(struct thread *thread)
{
- static struct peer *peer;
+ static struct peer *peer; // peer to read from
+ uint16_t status; // bgp_read status code
+ bool more = true; // whether we got more data
+ bool fatal = false; // whether fatal error occurred
+ bool added_pkt = false; // whether we pushed onto ->ibuf
+ bool header_valid = true; // whether header is valid
+
peer = THREAD_ARG(thread);
- uint16_t status;
if (peer->fd < 0)
return -1;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
- bool reschedule = true;
-
- // execute read
pthread_mutex_lock(&peer->io_mtx);
{
status = bgp_read(peer);
}
pthread_mutex_unlock(&peer->io_mtx);
- // check results of read
- bool header_valid = true;
+ /* error checking phase */
+ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
+ /* no problem; just don't process packets */
+ more = false;
+ }
- if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
+ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
+ /* problem; tear down session */
+ more = false;
+ fatal = true;
}
- if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
- reschedule = false; // problem
+ while (more) {
+ /* static buffer for transferring packets */
+ static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
+ /* shorter alias to peer's input buffer */
+ struct stream *ibw = peer->ibuf_work;
+ /* offset of start of current packet */
+ size_t offset = stream_get_getp(ibw);
+ /* packet size as given by header */
+ u_int16_t pktsize = 0;
+
+ /* check that we have enough data for a header */
+ if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
+ break;
- if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
+ /* validate header */
header_valid = validate_header(peer);
+
if (!header_valid) {
- bgp_size_t packetsize =
- MIN((int)stream_get_endp(peer->ibuf_work),
- BGP_MAX_PACKET_SIZE);
- memcpy(peer->last_reset_cause, peer->ibuf_work->data,
- packetsize);
- peer->last_reset_cause_size = packetsize;
- // We're tearing the session down, no point in
- // rescheduling.
- // Additionally, bgp_read() will use the TLV if it's
- // present to
- // determine how much to read; if this is corrupt, we'll
- // crash the
- // program.
- reschedule = false;
+ fatal = true;
+ break;
}
- }
- // if we read a full packet, push it onto peer->ibuf, reset our WiP
- // buffer
- // and schedule a job to process it on the main thread
- if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
- pthread_mutex_lock(&peer->io_mtx);
- {
- stream_fifo_push(peer->ibuf,
- stream_dup(peer->ibuf_work));
- }
- pthread_mutex_unlock(&peer->io_mtx);
- stream_reset(peer->ibuf_work);
- assert(stream_get_endp(peer->ibuf_work) == 0);
+ /* header is valid; retrieve packet size */
+ pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
- thread_add_background(bm->master, bgp_process_packet, peer, 0,
- &peer->t_process_packet);
+ /* if this fails we are seriously screwed
+ *
+ * NOTE(review): this depends on validate_header() having bounded
+ * the length field; its size checks are not visible from here --
+ * confirm. The loop below also only makes progress when pktsize is
+ * non-zero, since nothing else advances the read position or
+ * clears `more`.
+ */
+ assert(pktsize <= BGP_MAX_PACKET_SIZE);
+
+ /* If we have that much data, chuck it into its own
+ * stream and append to input queue for processing. */
+ if (STREAM_READABLE(ibw) >= pktsize) {
+ struct stream *pkt = stream_new(pktsize);
+ stream_get(pktbuf, ibw, pktsize);
+ stream_put(pkt, pktbuf, pktsize);
+
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ stream_fifo_push(peer->ibuf, pkt);
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
+
+ added_pkt = true;
+ } else
+ break;
+ }
+
+ /* After reading:
+ * 1. Move unread data to stream start to make room for more.
+ * 2. Reschedule and return when we have additional data.
+ *
+ * XXX: Heavy abuse of stream API. This needs a ring buffer.
+ *
+ * NOTE(review): the move is skipped whenever `more` is false (either
+ * error flag was set), yet the assert that follows is unconditional.
+ * That presumably relies on bgp_read() not consuming buffer space on
+ * its error paths, and on peer->ibuf_work being sized well above
+ * BGP_MAX_PACKET_SIZE -- neither is visible here; confirm.
+ */
+ if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
+ void *from = stream_pnt(peer->ibuf_work);
+ void *to = peer->ibuf_work->data;
+ size_t siz = STREAM_READABLE(peer->ibuf_work);
+ memmove(to, from, siz);
+ stream_set_getp(peer->ibuf_work, 0);
+ stream_set_endp(peer->ibuf_work, siz);
}
- if (reschedule)
+ assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
+
+ /* handle fatal condition; if it was caused by an invalid header, keep
+ * the first BGP_HEADER_SIZE bytes of the bad packet for diagnostics */
+ if (fatal) {
+ if (!header_valid) {
+ bgp_size_t pktsize = BGP_HEADER_SIZE;
+ stream_get(peer->last_reset_cause, peer->ibuf_work,
+ pktsize);
+ peer->last_reset_cause_size = pktsize;
+ }
+
+ /* wipe buffer just in case someone screwed up */
+ stream_reset(peer->ibuf_work);
+ } else {
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
+ if (added_pkt)
+ thread_add_event(bm->master, bgp_process_packet, peer,
+ 0, NULL);
+ // thread_add_background(bm->master,
+ // bgp_process_packet, peer,
+ // 0, NULL);
+ }
return 0;
}
{
int readsize; // how many bytes we want to read
int nbytes; // how many bytes we actually read
- bool have_header = false;
uint16_t status = 0;
- if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
- readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
- else {
- // retrieve packet length from tlv and compute # bytes we still
- // need
- u_int16_t mlen =
- stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
- readsize = mlen - stream_get_endp(peer->ibuf_work);
- have_header = true;
- }
+ readsize = STREAM_WRITEABLE(peer->ibuf_work);
nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
return status;
}
- // If we didn't have the header before read(), and now we do, set the
- // appropriate flag. The caller must validate the header for us.
- if (!have_header
- && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
- SET_FLAG(status, BGP_IO_READ_HEADER);
- have_header = true;
- }
- // If we read the # of bytes specified in the tlv, we have read a full
- // packet.
- //
- // Note that the header may not have been validated here. This flag
- // means
- // ONLY that we read the # of bytes specified in the header; if the
- // header is
- // not valid, the packet MUST NOT be processed further.
- if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
- == stream_get_endp(peer->ibuf_work)))
- SET_FLAG(status, BGP_IO_READ_FULLPACKET);
-
return status;
}
static bool validate_header(struct peer *peer)
{
u_int16_t size, type;
+ struct stream *pkt = peer->ibuf_work;
+ size_t getp = stream_get_getp(pkt);
static uint8_t marker[BGP_MARKER_SIZE] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
- if (memcmp(marker, peer->ibuf_work->data, BGP_MARKER_SIZE) != 0) {
+ if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
return false;
}
/* Get size and type. */
- size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
- type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);
+ size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
+ type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
peer = THREAD_ARG(thread);
/* Guard against scheduled events that occur after peer deletion. */
- if (peer->status == Deleted)
+ if (peer->status == Deleted || peer->status == Clearing)
return 0;
- u_char type = 0;
- bgp_size_t size;
- char notify_data_length[2];
- u_int32_t notify_out;
+ int processed = 0;
- /* Note notify_out so we can check later to see if we sent another one
- */
- notify_out = peer->notify_out;
+ while (processed < 5 && peer->ibuf->count > 0) {
+ u_char type = 0;
+ bgp_size_t size;
+ char notify_data_length[2];
+ u_int32_t notify_out;
- pthread_mutex_lock(&peer->io_mtx);
- {
- peer->curr = stream_fifo_pop(peer->ibuf);
- }
- pthread_mutex_unlock(&peer->io_mtx);
+ /* Note notify_out so we can check later to see if we sent
+ * another one */
+ notify_out = peer->notify_out;
- if (peer->curr == NULL) // no packets to process, hmm...
- return 0;
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ peer->curr = stream_fifo_pop(peer->ibuf);
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
- bgp_size_t actual_size = stream_get_endp(peer->curr);
-
- /* skip the marker and copy the packet length */
- stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
- memcpy(notify_data_length, stream_pnt(peer->curr), 2);
-
- /* read in the packet length and type */
- size = stream_getw(peer->curr);
- type = stream_getc(peer->curr);
-
- /* BGP packet dump function. */
- bgp_dump_packet(peer, type, peer->curr);
-
- /* adjust size to exclude the marker + length + type */
- size -= BGP_HEADER_SIZE;
-
- /* Read rest of the packet and call each sort of packet routine */
- switch (type) {
- case BGP_MSG_OPEN:
- peer->open_in++;
- bgp_open_receive(peer, size); /* XXX return value ignored! */
- break;
- case BGP_MSG_UPDATE:
- peer->readtime = monotime(NULL);
- bgp_update_receive(peer, size);
- break;
- case BGP_MSG_NOTIFY:
- bgp_notify_receive(peer, size);
- break;
- case BGP_MSG_KEEPALIVE:
- peer->readtime = monotime(NULL);
- bgp_keepalive_receive(peer, size);
- break;
- case BGP_MSG_ROUTE_REFRESH_NEW:
- case BGP_MSG_ROUTE_REFRESH_OLD:
- peer->refresh_in++;
- bgp_route_refresh_receive(peer, size);
- break;
- case BGP_MSG_CAPABILITY:
- peer->dynamic_cap_in++;
- bgp_capability_receive(peer, size);
- break;
- }
+ if (peer->curr == NULL) // no packets to process, hmm...
+ return 0;
- /* If reading this packet caused us to send a NOTIFICATION then store a
- * copy
- * of the packet for troubleshooting purposes
- */
- if (notify_out < peer->notify_out) {
- memcpy(peer->last_reset_cause, peer->curr->data, actual_size);
- peer->last_reset_cause_size = actual_size;
+ bgp_size_t actual_size = stream_get_endp(peer->curr);
+
+ /* skip the marker and copy the packet length */
+ stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
+ memcpy(notify_data_length, stream_pnt(peer->curr), 2);
+
+ /* read in the packet length and type */
+ size = stream_getw(peer->curr);
+ type = stream_getc(peer->curr);
+
+ /* BGP packet dump function. */
+ bgp_dump_packet(peer, type, peer->curr);
+
+ /* adjust size to exclude the marker + length + type */
+ size -= BGP_HEADER_SIZE;
+
+ /* Read rest of the packet and call each sort of packet routine
+ */
+ switch (type) {
+ case BGP_MSG_OPEN:
+ peer->open_in++;
+ bgp_open_receive(peer,
+ size); /* XXX return value ignored! */
+ break;
+ case BGP_MSG_UPDATE:
+ peer->readtime = monotime(NULL);
+ bgp_update_receive(peer, size);
+ break;
+ case BGP_MSG_NOTIFY:
+ bgp_notify_receive(peer, size);
+ break;
+ case BGP_MSG_KEEPALIVE:
+ peer->readtime = monotime(NULL);
+ bgp_keepalive_receive(peer, size);
+ break;
+ case BGP_MSG_ROUTE_REFRESH_NEW:
+ case BGP_MSG_ROUTE_REFRESH_OLD:
+ peer->refresh_in++;
+ bgp_route_refresh_receive(peer, size);
+ break;
+ case BGP_MSG_CAPABILITY:
+ peer->dynamic_cap_in++;
+ bgp_capability_receive(peer, size);
+ break;
+ }
+
+ /* If reading this packet caused us to send a NOTIFICATION then
+ * store a copy
+ * of the packet for troubleshooting purposes
+ */
+ if (notify_out < peer->notify_out) {
+ memcpy(peer->last_reset_cause, peer->curr->data,
+ actual_size);
+ peer->last_reset_cause_size = actual_size;
+ }
+
+ /* Delete packet and carry on. */
+ if (peer->curr) {
+ stream_free(peer->curr);
+ peer->curr = NULL;
+ processed++;
+ }
}
- /* Delete packet and carry on. */
- if (peer->curr) {
- stream_free(peer->curr);
- peer->curr = NULL;
+ if (peer->ibuf->count > 0) { // more work to do, come back later
+ thread_add_background(bm->master, bgp_process_packet, peer, 0,
+ &peer->t_process_packet);
}
return 0;