summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bgpd/bgp_fsm.c8
-rw-r--r--bgpd/bgp_io.c90
-rw-r--r--bgpd/bgpd.c7
-rw-r--r--bgpd/bgpd.h4
-rw-r--r--bgpd/rfapi/rfapi.c3
-rw-r--r--bgpd/rfapi/vnc_zebra.c3
6 files changed, 55 insertions, 60 deletions
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c
index 9e58e466e1..831aca1e35 100644
--- a/bgpd/bgp_fsm.c
+++ b/bgpd/bgp_fsm.c
@@ -27,6 +27,7 @@
#include "thread.h"
#include "log.h"
#include "stream.h"
+#include "ringbuf.h"
#include "memory.h"
#include "plist.h"
#include "workqueue.h"
@@ -155,7 +156,6 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
- stream_reset(peer->ibuf_work);
/*
* this should never happen, since bgp_process_packet() is the
@@ -183,7 +183,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
stream_fifo_push(peer->ibuf,
stream_fifo_pop(from_peer->ibuf));
- stream_copy(peer->ibuf_work, from_peer->ibuf_work);
+ ringbuf_wipe(peer->ibuf_work);
+ ringbuf_copy(peer->ibuf_work, from_peer->ibuf_work,
+ ringbuf_remain(from_peer->ibuf_work));
}
pthread_mutex_unlock(&from_peer->io_mtx);
pthread_mutex_unlock(&peer->io_mtx);
@@ -1097,7 +1099,7 @@ int bgp_stop(struct peer *peer)
stream_fifo_clean(peer->obuf);
if (peer->ibuf_work)
- stream_reset(peer->ibuf_work);
+ ringbuf_wipe(peer->ibuf_work);
if (peer->obuf_work)
stream_reset(peer->obuf_work);
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c
index 548167b3a3..24f55a66f6 100644
--- a/bgpd/bgp_io.c
+++ b/bgpd/bgp_io.c
@@ -29,6 +29,7 @@
#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
#include "network.h" // for ERRNO_IO_RETRY
#include "stream.h" // for stream_get_endp, stream_getw_from, str...
+#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_...
#include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread...
#include "zassert.h" // for assert
@@ -273,14 +274,12 @@ static int bgp_process_reads(struct thread *thread)
/* static buffer for transferring packets */
static unsigned char pktbuf[BGP_MAX_PACKET_SIZE];
/* shorter alias to peer's input buffer */
- struct stream *ibw = peer->ibuf_work;
- /* offset of start of current packet */
- size_t offset = stream_get_getp(ibw);
+ struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
- u_int16_t pktsize = 0;
+ uint16_t pktsize = 0;
/* check that we have enough data for a header */
- if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE)
+ if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
break;
/* validate header */
@@ -292,16 +291,18 @@ static int bgp_process_reads(struct thread *thread)
}
/* header is valid; retrieve packet size */
- pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE);
+ ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));
+
+ pktsize = ntohs(pktsize);
/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);
/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
- if (STREAM_READABLE(ibw) >= pktsize) {
+ if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
- stream_get(pktbuf, ibw, pktsize);
+ assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
stream_put(pkt, pktbuf, pktsize);
pthread_mutex_lock(&peer->io_mtx);
@@ -315,28 +316,12 @@ static int bgp_process_reads(struct thread *thread)
break;
}
- /*
- * After reading:
- * 1. Move unread data to stream start to make room for more.
- * 2. Reschedule and return when we have additional data.
- *
- * XXX: Heavy abuse of stream API. This needs a ring buffer.
- */
- if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) {
- void *from = stream_pnt(peer->ibuf_work);
- void *to = peer->ibuf_work->data;
- size_t siz = STREAM_READABLE(peer->ibuf_work);
- memmove(to, from, siz);
- stream_set_getp(peer->ibuf_work, 0);
- stream_set_endp(peer->ibuf_work, siz);
- }
-
- assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
+ assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE);
/* handle invalid header */
if (fatal) {
/* wipe buffer just in case someone screwed up */
- stream_reset(peer->ibuf_work);
+ ringbuf_wipe(peer->ibuf_work);
} else {
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
@@ -474,14 +459,16 @@ static uint16_t bgp_read(struct peer *peer)
size_t readsize; // how many bytes we want to read
ssize_t nbytes; // how many bytes we actually read
uint16_t status = 0;
+ static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];
- readsize = STREAM_WRITEABLE(peer->ibuf_work);
-
- nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
+ readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw));
+ nbytes = read(peer->fd, ibw, readsize);
- switch (nbytes) {
+ /* EAGAIN or EWOULDBLOCK; come back later */
+ if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
+ SET_FLAG(status, BGP_IO_TRANS_ERR);
/* Fatal error; tear down session */
- case -1:
+ } else if (nbytes < 0) {
zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno));
@@ -495,10 +482,8 @@ static uint16_t bgp_read(struct peer *peer)
BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR);
- break;
-
/* Received EOF / TCP session closed */
- case 0:
+ } else if (nbytes == 0) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd);
@@ -513,14 +498,9 @@ static uint16_t bgp_read(struct peer *peer)
BGP_EVENT_ADD(peer, TCP_connection_closed);
SET_FLAG(status, BGP_IO_FATAL_ERR);
- break;
-
- /* EAGAIN or EWOULDBLOCK; come back later */
- case -2:
- SET_FLAG(status, BGP_IO_TRANS_ERR);
- break;
- default:
- break;
+ } else {
+ assert(ringbuf_put(peer->ibuf_work, ibw, nbytes)
+ == (size_t)nbytes);
}
return status;
@@ -529,27 +509,35 @@ static uint16_t bgp_read(struct peer *peer)
/*
* Called after we have read a BGP packet header. Validates marker, message
* type and packet length. If any of these aren't correct, sends a notify.
+ *
+ * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
+ * buffer.
*/
static bool validate_header(struct peer *peer)
{
uint16_t size;
uint8_t type;
- struct stream *pkt = peer->ibuf_work;
- size_t getp = stream_get_getp(pkt);
+ struct ringbuf *pkt = peer->ibuf_work;
- static uint8_t marker[BGP_MARKER_SIZE] = {
- 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
- 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+ static uint8_t m_correct[BGP_MARKER_SIZE] = {
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+ uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};
- if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) {
+ if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
+ return false;
+
+ if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
return false;
}
- /* Get size and type in host byte order. */
- size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE);
- type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2);
+ /* Get size and type in network byte order. */
+ ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
+ ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));
+
+ size = ntohs(size);
/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c
index 6dcb603cb6..7b29820db1 100644
--- a/bgpd/bgpd.c
+++ b/bgpd/bgpd.c
@@ -24,6 +24,7 @@
#include "thread.h"
#include "buffer.h"
#include "stream.h"
+#include "ringbuf.h"
#include "command.h"
#include "sockunion.h"
#include "sockopt.h"
@@ -1162,7 +1163,9 @@ struct peer *peer_new(struct bgp *bgp)
*/
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
- peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
+ peer->ibuf_work =
+ ringbuf_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX);
+
peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
bgp_sync_init(peer);
@@ -2179,7 +2182,7 @@ int peer_delete(struct peer *peer)
}
if (peer->ibuf_work) {
- stream_free(peer->ibuf_work);
+ ringbuf_del(peer->ibuf_work);
peer->ibuf_work = NULL;
}
diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h
index 6bb49e0c53..ee829f33d1 100644
--- a/bgpd/bgpd.h
+++ b/bgpd/bgpd.h
@@ -598,8 +598,8 @@ struct peer {
struct stream_fifo *ibuf; // packets waiting to be processed
struct stream_fifo *obuf; // packets waiting to be written
- struct stream *ibuf_work; // WiP buffer used by bgp_read() only
- struct stream *obuf_work; // WiP buffer used to construct packets
+ struct ringbuf *ibuf_work; // WiP buffer used by bgp_read() only
+ struct stream *obuf_work; // WiP buffer used to construct packets
struct stream *curr; // the current packet being parsed
diff --git a/bgpd/rfapi/rfapi.c b/bgpd/rfapi/rfapi.c
index 5ba7a96a8f..1e3c5a0352 100644
--- a/bgpd/rfapi/rfapi.c
+++ b/bgpd/rfapi/rfapi.c
@@ -31,6 +31,7 @@
#include "lib/linklist.h"
#include "lib/command.h"
#include "lib/stream.h"
+#include "lib/ringbuf.h"
#include "bgpd/bgpd.h"
#include "bgpd/bgp_ecommunity.h"
@@ -1310,7 +1311,7 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
stream_fifo_free(rfd->peer->obuf);
if (rfd->peer->ibuf_work)
- stream_free(rfd->peer->ibuf_work);
+ ringbuf_del(rfd->peer->ibuf_work);
if (rfd->peer->obuf_work)
stream_free(rfd->peer->obuf_work);
diff --git a/bgpd/rfapi/vnc_zebra.c b/bgpd/rfapi/vnc_zebra.c
index 07be7833b6..f8b38468f5 100644
--- a/bgpd/rfapi/vnc_zebra.c
+++ b/bgpd/rfapi/vnc_zebra.c
@@ -30,6 +30,7 @@
#include "lib/command.h"
#include "lib/zclient.h"
#include "lib/stream.h"
+#include "lib/ringbuf.h"
#include "lib/memory.h"
#include "bgpd/bgpd.h"
@@ -198,7 +199,7 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
stream_fifo_free(vncHD1VR.peer->obuf);
if (vncHD1VR.peer->ibuf_work)
- stream_free(vncHD1VR.peer->ibuf_work);
+ ringbuf_del(vncHD1VR.peer->ibuf_work);
if (vncHD1VR.peer->obuf_work)
stream_free(vncHD1VR.peer->obuf_work);