diff options
| -rw-r--r-- | bgpd/bgp_fsm.c | 8 | ||||
| -rw-r--r-- | bgpd/bgp_io.c | 90 | ||||
| -rw-r--r-- | bgpd/bgpd.c | 7 | ||||
| -rw-r--r-- | bgpd/bgpd.h | 4 | ||||
| -rw-r--r-- | bgpd/rfapi/rfapi.c | 3 | ||||
| -rw-r--r-- | bgpd/rfapi/vnc_zebra.c | 3 |
6 files changed, 55 insertions, 60 deletions
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index 9e58e466e1..831aca1e35 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -27,6 +27,7 @@ #include "thread.h" #include "log.h" #include "stream.h" +#include "ringbuf.h" #include "memory.h" #include "plist.h" #include "workqueue.h" @@ -155,7 +156,6 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) stream_fifo_clean(peer->ibuf); stream_fifo_clean(peer->obuf); - stream_reset(peer->ibuf_work); /* * this should never happen, since bgp_process_packet() is the @@ -183,7 +183,9 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) stream_fifo_push(peer->ibuf, stream_fifo_pop(from_peer->ibuf)); - stream_copy(peer->ibuf_work, from_peer->ibuf_work); + ringbuf_wipe(peer->ibuf_work); + ringbuf_copy(peer->ibuf_work, from_peer->ibuf_work, + ringbuf_remain(from_peer->ibuf_work)); } pthread_mutex_unlock(&from_peer->io_mtx); pthread_mutex_unlock(&peer->io_mtx); @@ -1097,7 +1099,7 @@ int bgp_stop(struct peer *peer) stream_fifo_clean(peer->obuf); if (peer->ibuf_work) - stream_reset(peer->ibuf_work); + ringbuf_wipe(peer->ibuf_work); if (peer->obuf_work) stream_reset(peer->obuf_work); diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index 548167b3a3..24f55a66f6 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -29,6 +29,7 @@ #include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE #include "network.h" // for ERRNO_IO_RETRY #include "stream.h" // for stream_get_endp, stream_getw_from, str... +#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_... #include "thread.h" // for THREAD_OFF, THREAD_ARG, thread, thread... #include "zassert.h" // for assert @@ -273,14 +274,12 @@ static int bgp_process_reads(struct thread *thread) /* static buffer for transferring packets */ static unsigned char pktbuf[BGP_MAX_PACKET_SIZE]; /* shorter alias to peer's input buffer */ - struct stream *ibw = peer->ibuf_work; - /* offset of start of current packet */ - size_t offset = stream_get_getp(ibw); + struct ringbuf *ibw = peer->ibuf_work; /* packet size as given by header */ - u_int16_t pktsize = 0; + uint16_t pktsize = 0; /* check that we have enough data for a header */ - if (STREAM_READABLE(ibw) < BGP_HEADER_SIZE) + if (ringbuf_remain(ibw) < BGP_HEADER_SIZE) break; /* validate header */ @@ -292,16 +291,18 @@ static int bgp_process_reads(struct thread *thread) } /* header is valid; retrieve packet size */ - pktsize = stream_getw_from(ibw, offset + BGP_MARKER_SIZE); + ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize)); + + pktsize = ntohs(pktsize); /* if this fails we are seriously screwed */ assert(pktsize <= BGP_MAX_PACKET_SIZE); /* If we have that much data, chuck it into its own * stream and append to input queue for processing. */ - if (STREAM_READABLE(ibw) >= pktsize) { + if (ringbuf_remain(ibw) >= pktsize) { struct stream *pkt = stream_new(pktsize); - stream_get(pktbuf, ibw, pktsize); + assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize); stream_put(pkt, pktbuf, pktsize); pthread_mutex_lock(&peer->io_mtx); @@ -315,28 +316,12 @@ static int bgp_process_reads(struct thread *thread) break; } - /* - * After reading: - * 1. Move unread data to stream start to make room for more. - * 2. Reschedule and return when we have additional data. - * - * XXX: Heavy abuse of stream API. This needs a ring buffer. - */ - if (more && STREAM_WRITEABLE(peer->ibuf_work) < BGP_MAX_PACKET_SIZE) { - void *from = stream_pnt(peer->ibuf_work); - void *to = peer->ibuf_work->data; - size_t siz = STREAM_READABLE(peer->ibuf_work); - memmove(to, from, siz); - stream_set_getp(peer->ibuf_work, 0); - stream_set_endp(peer->ibuf_work, siz); - } - - assert(STREAM_WRITEABLE(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE); + assert(ringbuf_space(peer->ibuf_work) >= BGP_MAX_PACKET_SIZE); /* handle invalid header */ if (fatal) { /* wipe buffer just in case someone screwed up */ - stream_reset(peer->ibuf_work); + ringbuf_wipe(peer->ibuf_work); } else { thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); @@ -474,14 +459,16 @@ static uint16_t bgp_read(struct peer *peer) size_t readsize; // how many bytes we want to read ssize_t nbytes; // how many bytes we actually read uint16_t status = 0; + static uint8_t ibw[BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX]; - readsize = STREAM_WRITEABLE(peer->ibuf_work); - - nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize); + readsize = MIN(ringbuf_space(peer->ibuf_work), sizeof(ibw)); + nbytes = read(peer->fd, ibw, readsize); - switch (nbytes) { + /* EAGAIN or EWOULDBLOCK; come back later */ + if (nbytes < 0 && ERRNO_IO_RETRY(errno)) { + SET_FLAG(status, BGP_IO_TRANS_ERR); /* Fatal error; tear down session */ - case -1: + } else if (nbytes < 0) { zlog_err("%s [Error] bgp_read_packet error: %s", peer->host, safe_strerror(errno)); @@ -495,10 +482,8 @@ static uint16_t bgp_read(struct peer *peer) BGP_EVENT_ADD(peer, TCP_fatal_error); SET_FLAG(status, BGP_IO_FATAL_ERR); - break; - /* Received EOF / TCP session closed */ - case 0: + } else if (nbytes == 0) { if (bgp_debug_neighbor_events(peer)) zlog_debug("%s [Event] BGP connection closed fd %d", peer->host, peer->fd); @@ -513,14 +498,9 @@ static uint16_t bgp_read(struct peer *peer) BGP_EVENT_ADD(peer, TCP_connection_closed); SET_FLAG(status, BGP_IO_FATAL_ERR); - break; - - /* EAGAIN or EWOULDBLOCK; come back later */ - case -2: - SET_FLAG(status, BGP_IO_TRANS_ERR); - break; - default: - break; + } else { + assert(ringbuf_put(peer->ibuf_work, ibw, nbytes) + == (size_t)nbytes); } return status; @@ -529,27 +509,35 @@ static uint16_t bgp_read(struct peer *peer) /* * Called after we have read a BGP packet header. Validates marker, message * type and packet length. If any of these aren't correct, sends a notify. + * + * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input + * buffer. */ static bool validate_header(struct peer *peer) { uint16_t size; uint8_t type; - struct stream *pkt = peer->ibuf_work; - size_t getp = stream_get_getp(pkt); + struct ringbuf *pkt = peer->ibuf_work; - static uint8_t marker[BGP_MARKER_SIZE] = { - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; + static uint8_t m_correct[BGP_MARKER_SIZE] = { + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; + uint8_t m_rx[BGP_MARKER_SIZE] = {0x00}; - if (memcmp(marker, stream_pnt(pkt), BGP_MARKER_SIZE) != 0) { + if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE) + return false; + + if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) { bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR, BGP_NOTIFY_HEADER_NOT_SYNC); return false; } - /* Get size and type in host byte order. */ - size = stream_getw_from(pkt, getp + BGP_MARKER_SIZE); - type = stream_getc_from(pkt, getp + BGP_MARKER_SIZE + 2); + /* Get size and type in network byte order. */ + ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size)); + ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type)); + + size = ntohs(size); /* BGP type check. */ if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 6dcb603cb6..7b29820db1 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -24,6 +24,7 @@ #include "thread.h" #include "buffer.h" #include "stream.h" +#include "ringbuf.h" #include "command.h" #include "sockunion.h" #include "sockopt.h" @@ -1162,7 +1163,9 @@ struct peer *peer_new(struct bgp *bgp) */ peer->obuf_work = stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW); - peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX); + peer->ibuf_work = + ringbuf_new(BGP_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX); + peer->scratch = stream_new(BGP_MAX_PACKET_SIZE); bgp_sync_init(peer); @@ -2179,7 +2182,7 @@ int peer_delete(struct peer *peer) } if (peer->ibuf_work) { - stream_free(peer->ibuf_work); + ringbuf_del(peer->ibuf_work); peer->ibuf_work = NULL; } diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index 6bb49e0c53..ee829f33d1 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -598,8 +598,8 @@ struct peer { struct stream_fifo *ibuf; // packets waiting to be processed struct stream_fifo *obuf; // packets waiting to be written - struct stream *ibuf_work; // WiP buffer used by bgp_read() only - struct stream *obuf_work; // WiP buffer used to construct packets + struct ringbuf *ibuf_work; // WiP buffer used by bgp_read() only + struct stream *obuf_work; // WiP buffer used to construct packets struct stream *curr; // the current packet being parsed diff --git a/bgpd/rfapi/rfapi.c b/bgpd/rfapi/rfapi.c index 5ba7a96a8f..1e3c5a0352 100644 --- a/bgpd/rfapi/rfapi.c +++ b/bgpd/rfapi/rfapi.c @@ -31,6 +31,7 @@ #include "lib/linklist.h" #include "lib/command.h" #include "lib/stream.h" +#include "lib/ringbuf.h" #include "bgpd/bgpd.h" #include "bgpd/bgp_ecommunity.h" @@ -1310,7 +1311,7 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp, stream_fifo_free(rfd->peer->obuf); if (rfd->peer->ibuf_work) - stream_free(rfd->peer->ibuf_work); + ringbuf_del(rfd->peer->ibuf_work); if (rfd->peer->obuf_work) stream_free(rfd->peer->obuf_work); diff --git a/bgpd/rfapi/vnc_zebra.c b/bgpd/rfapi/vnc_zebra.c index 07be7833b6..f8b38468f5 100644 --- a/bgpd/rfapi/vnc_zebra.c +++ b/bgpd/rfapi/vnc_zebra.c @@ -30,6 +30,7 @@ #include "lib/command.h" #include "lib/zclient.h" #include "lib/stream.h" +#include "lib/ringbuf.h" #include "lib/memory.h" #include "bgpd/bgpd.h" @@ -198,7 +199,7 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric, stream_fifo_free(vncHD1VR.peer->obuf); if (vncHD1VR.peer->ibuf_work) - stream_free(vncHD1VR.peer->ibuf_work); + ringbuf_del(vncHD1VR.peer->ibuf_work); if (vncHD1VR.peer->obuf_work) stream_free(vncHD1VR.peer->obuf_work); |
