diff options
Diffstat (limited to 'bgpd/bgp_io.c')
| -rw-r--r-- | bgpd/bgp_io.c | 199 |
1 files changed, 104 insertions, 95 deletions
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index 650adc1c9a..b07e69ac31 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -29,11 +29,11 @@ /* clang-format on */ /* forward declarations */ -static uint16_t bgp_write(struct peer *); -static uint16_t bgp_read(struct peer *peer, int *code_p); +static uint16_t bgp_write(struct peer_connection *connection); +static uint16_t bgp_read(struct peer_connection *connection, int *code_p); static void bgp_process_writes(struct event *event); static void bgp_process_reads(struct event *event); -static bool validate_header(struct peer *); +static bool validate_header(struct peer_connection *connection); /* generic i/o status codes */ #define BGP_IO_TRANS_ERR (1 << 0) /* EAGAIN or similar occurred */ @@ -42,65 +42,67 @@ static bool validate_header(struct peer *); /* Thread external API ----------------------------------------------------- */ -void bgp_writes_on(struct peer *peer) +void bgp_writes_on(struct peer_connection *connection) { struct frr_pthread *fpt = bgp_pth_io; + assert(fpt->running); - assert(peer->status != Deleted); - assert(peer->obuf); - assert(peer->ibuf); - assert(peer->ibuf_work); - assert(!peer->t_connect_check_r); - assert(!peer->t_connect_check_w); - assert(peer->fd); - - event_add_write(fpt->master, bgp_process_writes, peer, peer->fd, - &peer->t_write); - SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + assert(connection->status != Deleted); + assert(connection->obuf); + assert(connection->ibuf); + assert(connection->ibuf_work); + assert(!connection->t_connect_check_r); + assert(!connection->t_connect_check_w); + assert(connection->fd); + + event_add_write(fpt->master, bgp_process_writes, connection, + connection->fd, &connection->t_write); + SET_FLAG(connection->thread_flags, PEER_THREAD_WRITES_ON); } -void bgp_writes_off(struct peer *peer) +void bgp_writes_off(struct peer_connection *connection) { + struct peer *peer = connection->peer; struct frr_pthread *fpt = bgp_pth_io; assert(fpt->running); - event_cancel_async(fpt->master, &peer->t_write, NULL); - EVENT_OFF(peer->t_generate_updgrp_packets); + event_cancel_async(fpt->master, &connection->t_write, NULL); + EVENT_OFF(connection->t_generate_updgrp_packets); - UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + UNSET_FLAG(peer->connection->thread_flags, PEER_THREAD_WRITES_ON); } -void bgp_reads_on(struct peer *peer) +void bgp_reads_on(struct peer_connection *connection) { struct frr_pthread *fpt = bgp_pth_io; assert(fpt->running); - assert(peer->status != Deleted); - assert(peer->ibuf); - assert(peer->fd); - assert(peer->ibuf_work); - assert(peer->obuf); - assert(!peer->t_connect_check_r); - assert(!peer->t_connect_check_w); - assert(peer->fd); + assert(connection->status != Deleted); + assert(connection->ibuf); + assert(connection->fd); + assert(connection->ibuf_work); + assert(connection->obuf); + assert(!connection->t_connect_check_r); + assert(!connection->t_connect_check_w); + assert(connection->fd); - event_add_read(fpt->master, bgp_process_reads, peer, peer->fd, - &peer->t_read); + event_add_read(fpt->master, bgp_process_reads, connection, + connection->fd, &connection->t_read); - SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); + SET_FLAG(connection->thread_flags, PEER_THREAD_READS_ON); } -void bgp_reads_off(struct peer *peer) +void bgp_reads_off(struct peer_connection *connection) { struct frr_pthread *fpt = bgp_pth_io; assert(fpt->running); - event_cancel_async(fpt->master, &peer->t_read, NULL); - EVENT_OFF(peer->t_process_packet); - EVENT_OFF(peer->t_process_packet_error); + event_cancel_async(fpt->master, &connection->t_read, NULL); + EVENT_OFF(connection->t_process_packet); + EVENT_OFF(connection->t_process_packet_error); - UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); + UNSET_FLAG(connection->thread_flags, PEER_THREAD_READS_ON); } /* Thread internal functions ----------------------------------------------- */ @@ -111,19 +113,21 @@ void bgp_reads_off(struct peer *peer) static void bgp_process_writes(struct event *thread) { static struct peer *peer; - peer = EVENT_ARG(thread); + struct peer_connection *connection = EVENT_ARG(thread); uint16_t status; bool reschedule; bool fatal = false; - if (peer->fd < 0) + peer = connection->peer; + + if (connection->fd < 0) return; struct frr_pthread *fpt = bgp_pth_io; - frr_with_mutex (&peer->io_mtx) { - status = bgp_write(peer); - reschedule = (stream_fifo_head(peer->obuf) != NULL); + frr_with_mutex (&connection->io_mtx) { + status = bgp_write(connection); + reschedule = (stream_fifo_head(connection->obuf) != NULL); } /* no problem */ @@ -142,26 +146,26 @@ static void bgp_process_writes(struct event *thread) * sent in the update message */ if (reschedule) { - event_add_write(fpt->master, bgp_process_writes, peer, peer->fd, - &peer->t_write); + event_add_write(fpt->master, bgp_process_writes, connection, + connection->fd, &connection->t_write); } else if (!fatal) { - BGP_UPDATE_GROUP_TIMER_ON(&peer->t_generate_updgrp_packets, + BGP_UPDATE_GROUP_TIMER_ON(&connection->t_generate_updgrp_packets, bgp_generate_updgrp_packets); } } -static int read_ibuf_work(struct peer *peer) +static int read_ibuf_work(struct peer_connection *connection) { /* static buffer for transferring packets */ /* shorter alias to peer's input buffer */ - struct ringbuf *ibw = peer->ibuf_work; + struct ringbuf *ibw = connection->ibuf_work; /* packet size as given by header */ uint16_t pktsize = 0; struct stream *pkt; /* ============================================== */ - frr_with_mutex (&peer->io_mtx) { - if (peer->ibuf->count >= bm->inq_limit) + frr_with_mutex (&connection->io_mtx) { + if (connection->ibuf->count >= bm->inq_limit) return -ENOMEM; } @@ -170,7 +174,7 @@ static int read_ibuf_work(struct peer *peer) return 0; /* check that header is valid */ - if (!validate_header(peer)) + if (!validate_header(connection)) return -EBADMSG; /* header is valid; retrieve packet size */ @@ -179,7 +183,7 @@ static int read_ibuf_work(struct peer *peer) pktsize = ntohs(pktsize); /* if this fails we are seriously screwed */ - assert(pktsize <= peer->max_packet_size); + assert(pktsize <= connection->peer->max_packet_size); /* * If we have that much data, chuck it into its own @@ -195,9 +199,9 @@ static int read_ibuf_work(struct peer *peer) assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize); stream_set_endp(pkt, pktsize); - frrtrace(2, frr_bgp, packet_read, peer, pkt); - frr_with_mutex (&peer->io_mtx) { - stream_fifo_push(peer->ibuf, pkt); + frrtrace(2, frr_bgp, packet_read, connection->peer, pkt); + frr_with_mutex (&connection->io_mtx) { + stream_fifo_push(connection->ibuf, pkt); } return pktsize; @@ -208,30 +212,31 @@ static int read_ibuf_work(struct peer *peer) * or has hung up. * * We read as much data as possible, process as many packets as we can and - * place them on peer->ibuf for secondary processing by the main thread. + * place them on peer->connection.ibuf for secondary processing by the main + * thread. */ static void bgp_process_reads(struct event *thread) { /* clang-format off */ + struct peer_connection *connection = EVENT_ARG(thread); static struct peer *peer; /* peer to read from */ uint16_t status; /* bgp_read status code */ bool fatal = false; /* whether fatal error occurred */ - bool added_pkt = false; /* whether we pushed onto ->ibuf */ + bool added_pkt = false; /* whether we pushed onto ->connection.ibuf */ int code = 0; /* FSM code if error occurred */ - bool ibuf_full = false; /* Is peer fifo IN Buffer full */ static bool ibuf_full_logged; /* Have we logged full already */ int ret = 1; /* clang-format on */ - peer = EVENT_ARG(thread); + peer = connection->peer; - if (peer->fd < 0 || bm->terminating) + if (bm->terminating || connection->fd < 0) return; struct frr_pthread *fpt = bgp_pth_io; - frr_with_mutex (&peer->io_mtx) { - status = bgp_read(peer, &code); + frr_with_mutex (&connection->io_mtx) { + status = bgp_read(connection, &code); } /* error checking phase */ @@ -247,13 +252,13 @@ static void bgp_process_reads(struct event *thread) /* Handle the error in the main pthread, include the * specific state change from 'bgp_read'. */ - event_add_event(bm->master, bgp_packet_process_error, peer, - code, &peer->t_process_packet_error); + event_add_event(bm->master, bgp_packet_process_error, connection, + code, &connection->t_process_packet_error); goto done; } while (true) { - ret = read_ibuf_work(peer); + ret = read_ibuf_work(connection); if (ret <= 0) break; @@ -265,7 +270,6 @@ static void bgp_process_reads(struct event *thread) fatal = true; break; case -ENOMEM: - ibuf_full = true; if (!ibuf_full_logged) { if (bgp_debug_neighbor_events(peer)) zlog_debug( @@ -284,35 +288,33 @@ done: /* handle invalid header */ if (fatal) { /* wipe buffer just in case someone screwed up */ - ringbuf_wipe(peer->ibuf_work); + ringbuf_wipe(connection->ibuf_work); return; } - /* ringbuf should be fully drained unless ibuf is full */ - if (!ibuf_full) - assert(ringbuf_space(peer->ibuf_work) >= peer->max_packet_size); - - event_add_read(fpt->master, bgp_process_reads, peer, peer->fd, - &peer->t_read); + event_add_read(fpt->master, bgp_process_reads, connection, + connection->fd, &connection->t_read); if (added_pkt) - event_add_event(bm->master, bgp_process_packet, peer, 0, - &peer->t_process_packet); + event_add_event(bm->master, bgp_process_packet, connection, 0, + &connection->t_process_packet); } /* * Flush peer output buffer. * - * This function pops packets off of peer->obuf and writes them to peer->fd. - * The amount of packets written is equal to the minimum of peer->wpkt_quanta - * and the number of packets on the output buffer, unless an error occurs. + * This function pops packets off of peer->connection.obuf and writes them to + * peer->connection.fd. The amount of packets written is equal to the minimum of + * peer->wpkt_quanta and the number of packets on the output buffer, unless an + * error occurs. * * If write() returns an error, the appropriate FSM event is generated. * * The return value is equal to the number of packets written * (which may be zero). */ -static uint16_t bgp_write(struct peer *peer) +static uint16_t bgp_write(struct peer_connection *connection) { + struct peer *peer = connection->peer; uint8_t type; struct stream *s; int update_last_write = 0; @@ -334,7 +336,7 @@ static uint16_t bgp_write(struct peer *peer) struct stream **streams = ostreams; struct iovec iov[wpkt_quanta_old]; - s = stream_fifo_head(peer->obuf); + s = stream_fifo_head(connection->obuf); if (!s) goto done; @@ -354,11 +356,11 @@ static uint16_t bgp_write(struct peer *peer) total_written = 0; do { - num = writev(peer->fd, iov, iovsz); + num = writev(connection->fd, iov, iovsz); if (num < 0) { if (!ERRNO_IO_RETRY(errno)) { - BGP_EVENT_ADD(peer, TCP_fatal_error); + BGP_EVENT_ADD(connection, TCP_fatal_error); SET_FLAG(status, BGP_IO_FATAL_ERR); } else { SET_FLAG(status, BGP_IO_TRANS_ERR); @@ -403,7 +405,7 @@ static uint16_t bgp_write(struct peer *peer) /* Handle statistics */ for (unsigned int i = 0; i < total_written; i++) { - s = stream_fifo_pop(peer->obuf); + s = stream_fifo_pop(connection->obuf); assert(s == ostreams[i]); @@ -435,7 +437,7 @@ static uint16_t bgp_write(struct peer *peer) * Handle Graceful Restart case where the state changes * to Connect instead of Idle. */ - BGP_EVENT_ADD(peer, BGP_Stop); + BGP_EVENT_ADD(connection, BGP_Stop); goto done; case BGP_MSG_KEEPALIVE: @@ -480,31 +482,37 @@ done : { return status; } +uint8_t ibuf_scratch[BGP_EXTENDED_MESSAGE_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX]; /* - * Reads a chunk of data from peer->fd into peer->ibuf_work. + * Reads a chunk of data from peer->connection.fd into + * peer->connection.ibuf_work. * * code_p * Pointer to location to store FSM event code in case of fatal error. * * @return status flag (see top-of-file) + * + * PLEASE NOTE: If we ever transform the bgp_read to be a pthread + * per peer then we need to rethink the global ibuf_scratch + * data structure above. */ -static uint16_t bgp_read(struct peer *peer, int *code_p) +static uint16_t bgp_read(struct peer_connection *connection, int *code_p) { size_t readsize; /* how many bytes we want to read */ ssize_t nbytes; /* how many bytes we actually read */ size_t ibuf_work_space; /* space we can read into the work buf */ uint16_t status = 0; - ibuf_work_space = ringbuf_space(peer->ibuf_work); + ibuf_work_space = ringbuf_space(connection->ibuf_work); if (ibuf_work_space == 0) { SET_FLAG(status, BGP_IO_WORK_FULL_ERR); return status; } - readsize = MIN(ibuf_work_space, sizeof(peer->ibuf_scratch)); + readsize = MIN(ibuf_work_space, sizeof(ibuf_scratch)); - nbytes = read(peer->fd, peer->ibuf_scratch, readsize); + nbytes = read(connection->fd, ibuf_scratch, readsize); /* EAGAIN or EWOULDBLOCK; come back later */ if (nbytes < 0 && ERRNO_IO_RETRY(errno)) { @@ -512,8 +520,8 @@ static uint16_t bgp_read(struct peer *peer, int *code_p) } else if (nbytes < 0) { /* Fatal error; tear down session */ flog_err(EC_BGP_UPDATE_RCV, - "%s [Error] bgp_read_packet error: %s", peer->host, - safe_strerror(errno)); + "%s [Error] bgp_read_packet error: %s", + connection->peer->host, safe_strerror(errno)); /* Handle the error in the main pthread. */ if (code_p) @@ -523,9 +531,9 @@ static uint16_t bgp_read(struct peer *peer, int *code_p) } else if (nbytes == 0) { /* Received EOF / TCP session closed */ - if (bgp_debug_neighbor_events(peer)) + if (bgp_debug_neighbor_events(connection->peer)) zlog_debug("%s [Event] BGP connection closed fd %d", - peer->host, peer->fd); + connection->peer->host, connection->fd); /* Handle the error in the main pthread. */ if (code_p) @@ -533,8 +541,8 @@ static uint16_t bgp_read(struct peer *peer, int *code_p) SET_FLAG(status, BGP_IO_FATAL_ERR); } else { - assert(ringbuf_put(peer->ibuf_work, peer->ibuf_scratch, nbytes) - == (size_t)nbytes); + assert(ringbuf_put(connection->ibuf_work, ibuf_scratch, + nbytes) == (size_t)nbytes); } return status; @@ -547,11 +555,12 @@ static uint16_t bgp_read(struct peer *peer, int *code_p) * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input * buffer. */ -static bool validate_header(struct peer *peer) +static bool validate_header(struct peer_connection *connection) { + struct peer *peer = connection->peer; uint16_t size; uint8_t type; - struct ringbuf *pkt = peer->ibuf_work; + struct ringbuf *pkt = connection->ibuf_work; static const uint8_t m_correct[BGP_MARKER_SIZE] = { 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, |
