From c0c46bad15a5f3f69032678177ac7d00b7cd31be Mon Sep 17 00:00:00 2001 From: Soumya Roy Date: Fri, 14 Mar 2025 21:44:39 +0000 Subject: [PATCH] lib: Add support for stream buffer to expand Issue: Currently, during encode time, if required memory is more than available space in stream buffer, stream buffer can't be expanded. This fix introduces new apis to support stream buffer expansion. Testing: Tested with zebra nexthop encoding with 512 nexthops, which triggers this new code changes, it works fine. Without fix, for same trigger it asserts. Signed-off-by: Soumya Roy --- lib/stream.c | 142 ++++++++++++++++++++++++++++++++++++++++++--------- lib/stream.h | 4 +- 2 files changed, 120 insertions(+), 26 deletions(-) diff --git a/lib/stream.c b/lib/stream.c index bb90f3b944..6ac51859d3 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -16,6 +16,11 @@ #include "frr_pthread.h" #include "lib_errors.h" +#define MIN_STREAM_EXPANSION_SZ 512 + +/* Extra size needed for a stream in bytes, given new write size */ +#define STREAM_EXPAND_SIZE(S, WSZ) ((WSZ)-STREAM_WRITEABLE(S)) + DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream"); DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO"); @@ -92,11 +97,20 @@ struct stream *stream_new(size_t size) assert(size > 0); - s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); + s = XMALLOC(MTYPE_STREAM, sizeof(struct stream)); + s->data = XMALLOC(MTYPE_STREAM, size); s->getp = s->endp = 0; s->next = NULL; s->size = size; + s->allow_expansion = false; + return s; +} + +struct stream *stream_new_expandable(size_t size) +{ + struct stream *s = stream_new(size); + s->allow_expansion = true; return s; } @@ -106,6 +120,7 @@ void stream_free(struct stream *s) if (!s) return; + XFREE(MTYPE_STREAM, s->data); XFREE(MTYPE_STREAM, s); } @@ -115,7 +130,7 @@ struct stream *stream_copy(struct stream *dest, const struct stream *src) assert(dest != NULL); assert(STREAM_SIZE(dest) >= src->endp); - + dest->allow_expansion = src->allow_expansion; dest->endp = src->endp; dest->getp = src->getp; @@ -131,7 +146,7 @@ struct stream *stream_dup(const struct stream *s) STREAM_VERIFY_SANE(s); snew = stream_new(s->endp); - + snew->allow_expansion = s->allow_expansion; return (stream_copy(snew, s)); } @@ -143,9 +158,16 @@ struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2, STREAM_VERIFY_SANE(s1); STREAM_VERIFY_SANE(s2); + if (offset > s1->endp) { + fprintf(stderr, "Error: Invalid offset %zu, exceeds s1->endp %zu\n", offset, + s1->endp); + return NULL; + } + if ((new = stream_new(s1->endp + s2->endp)) == NULL) return NULL; + new->allow_expansion = s1->allow_expansion || s2->allow_expansion; memcpy(new->data, s1->data, offset); memcpy(new->data + offset, s2->data, s2->endp); memcpy(new->data + offset + s2->endp, s1->data + offset, @@ -160,7 +182,7 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize) STREAM_VERIFY_SANE(orig); - orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); + orig->data = XREALLOC(MTYPE_STREAM, orig->data, newsize); orig->size = newsize; @@ -175,6 +197,29 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize) return orig->size; } +/* Helper function to expand stream if needed and allowed */ +static void stream_expand(struct stream *s, size_t expand_size) +{ + size_t new_size; + size_t actual_expand_size = expand_size; + + /* Growth strategy: + * For small expansions (<= min expansion bytes): grow by min size + * otherwise grow by needed size + */ + if (actual_expand_size <= MIN_STREAM_EXPANSION_SZ) { + actual_expand_size = MIN_STREAM_EXPANSION_SZ; + } + + /* Calculate new total size */ + new_size = s->size + actual_expand_size; + /* Reallocate the data buffer */ + s->data = XREALLOC(MTYPE_STREAM, s->data, new_size); + + /* Update the stream's data size */ + s->size = new_size; +} + size_t stream_get_getp(const struct stream *s) { STREAM_VERIFY_SANE(s); @@ -691,8 +736,12 @@ void stream_put(struct stream *s, const void *src, size_t size) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < size) { - STREAM_BOUND_WARN(s, "put"); - return; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, size)); + } else { + STREAM_BOUND_WARN(s, "put"); + return; + } } if (src) @@ -709,8 +758,12 @@ int stream_putc(struct stream *s, uint8_t c) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint8_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = c; @@ -723,8 +776,12 @@ int stream_putw(struct stream *s, uint16_t w) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint16_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(w >> 8); @@ -739,8 +796,12 @@ int stream_put3(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < 3) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, 3)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(l >> 16); @@ -756,8 +817,12 @@ int stream_putl(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(l >> 24); @@ -774,8 +839,12 @@ int stream_putq(struct stream *s, uint64_t q) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) { - STREAM_BOUND_WARN(s, "put quad"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint64_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } s->data[s->endp++] = (uint8_t)(q >> 56); @@ -896,7 +965,13 @@ int stream_put_ipv4(struct stream *s, uint32_t l) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + return 0; } memcpy(s->data + s->endp, &l, sizeof(uint32_t)); @@ -911,8 +986,12 @@ int stream_put_in_addr(struct stream *s, const struct in_addr *addr) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } memcpy(s->data + s->endp, addr, sizeof(uint32_t)); @@ -989,8 +1068,13 @@ int stream_put_prefix_addpath(struct stream *s, const struct prefix *p, psize_with_addpath = psize; if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, + STREAM_EXPAND_SIZE(s, psize_with_addpath + sizeof(uint8_t))); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } if (addpath_capable) { @@ -1027,8 +1111,12 @@ int stream_put_labeled_prefix(struct stream *s, const struct prefix *p, psize_with_addpath = psize + (addpath_capable ? 4 : 0); if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, psize_with_addpath + 3)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } if (addpath_capable) { @@ -1167,8 +1255,12 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size) STREAM_VERIFY_SANE(s); if (STREAM_WRITEABLE(s) < size) { - STREAM_BOUND_WARN(s, "put"); - return 0; + if (s->allow_expansion) { + stream_expand(s, STREAM_EXPAND_SIZE(s, size)); + } else { + STREAM_BOUND_WARN(s, "put"); + return 0; + } } memcpy(s->data + s->endp, ptr, size); diff --git a/lib/stream.h b/lib/stream.h index e48cedc613..437ba5aa74 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -95,7 +95,8 @@ struct stream { size_t getp; /* next get position */ size_t endp; /* last valid data position */ size_t size; /* size of data segment */ - unsigned char data[]; /* data pointer */ + bool allow_expansion; /* whether stream can be expanded */ + unsigned char *data; /* data pointer */ }; /* First in first out queue structure. */ @@ -132,6 +133,7 @@ struct stream_fifo { * q: quad (four words) */ extern struct stream *stream_new(size_t); +extern struct stream *stream_new_expandable(size_t); extern void stream_free(struct stream *); /* Copy 'src' into 'dest', returns 'dest' */ extern struct stream *stream_copy(struct stream *dest, -- 2.39.5