]> git.puffer.fish Git - matthieu/frr.git/commitdiff
lib: Add support for stream buffer to expand
authorSoumya Roy <souroy@nvidia.com>
Fri, 14 Mar 2025 21:44:39 +0000 (21:44 +0000)
committerSoumya Roy <souroy@nvidia.com>
Thu, 20 Mar 2025 16:13:34 +0000 (16:13 +0000)
Issue:
 Currently, during encode time, if required memory is
 more than available space in stream buffer, stream buffer
 can't be expanded. This fix introduces new apis to support
 stream buffer expansion.

 Testing:
 Tested with zebra nexthop encoding with 512 nexthops, which triggers
 this new code changes, it works fine. Without fix, for same trigger
 it asserts.

Signed-off-by: Soumya Roy <souroy@nvidia.com>
lib/stream.c
lib/stream.h

index bb90f3b944e981d651ada12da2cd75f05828f94e..6ac51859d3b44b07dd00084a035a6ff6240bc3ac 100644 (file)
 #include "frr_pthread.h"
 #include "lib_errors.h"
 
+#define MIN_STREAM_EXPANSION_SZ 512
+
+/* Extra size  needed for a stream in bytes, given new write size */
+#define STREAM_EXPAND_SIZE(S, WSZ) ((WSZ)-STREAM_WRITEABLE(S))
+
 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
 
@@ -92,11 +97,20 @@ struct stream *stream_new(size_t size)
 
        assert(size > 0);
 
-       s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
+       s = XMALLOC(MTYPE_STREAM, sizeof(struct stream));
+       s->data = XMALLOC(MTYPE_STREAM, size);
 
        s->getp = s->endp = 0;
        s->next = NULL;
        s->size = size;
+       s->allow_expansion = false;
+       return s;
+}
+
+struct stream *stream_new_expandable(size_t size)
+{
+       struct stream *s = stream_new(size);
+       s->allow_expansion = true;
        return s;
 }
 
@@ -106,6 +120,7 @@ void stream_free(struct stream *s)
        if (!s)
                return;
 
+       XFREE(MTYPE_STREAM, s->data);
        XFREE(MTYPE_STREAM, s);
 }
 
@@ -115,7 +130,7 @@ struct stream *stream_copy(struct stream *dest, const struct stream *src)
 
        assert(dest != NULL);
        assert(STREAM_SIZE(dest) >= src->endp);
-
+       dest->allow_expansion = src->allow_expansion;
        dest->endp = src->endp;
        dest->getp = src->getp;
 
@@ -131,7 +146,7 @@ struct stream *stream_dup(const struct stream *s)
        STREAM_VERIFY_SANE(s);
 
        snew = stream_new(s->endp);
-
+       snew->allow_expansion = s->allow_expansion;
        return (stream_copy(snew, s));
 }
 
@@ -143,9 +158,16 @@ struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
        STREAM_VERIFY_SANE(s1);
        STREAM_VERIFY_SANE(s2);
 
+       if (offset > s1->endp) {
+               fprintf(stderr, "Error: Invalid offset %zu, exceeds s1->endp %zu\n", offset,
+                       s1->endp);
+               return NULL;
+       }
+
        if ((new = stream_new(s1->endp + s2->endp)) == NULL)
                return NULL;
 
+       new->allow_expansion = s1->allow_expansion || s2->allow_expansion;
        memcpy(new->data, s1->data, offset);
        memcpy(new->data + offset, s2->data, s2->endp);
        memcpy(new->data + offset + s2->endp, s1->data + offset,
@@ -160,7 +182,7 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
 
        STREAM_VERIFY_SANE(orig);
 
-       orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
+       orig->data = XREALLOC(MTYPE_STREAM, orig->data, newsize);
 
        orig->size = newsize;
 
@@ -175,6 +197,29 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
        return orig->size;
 }
 
+/* Helper function to expand stream if needed and allowed */
+static void stream_expand(struct stream *s, size_t expand_size)
+{
+       size_t new_size;
+       size_t actual_expand_size = expand_size;
+
+       /* Growth strategy:
+        * For small expansions (<= min expansion bytes): grow by min size
+        * otherwise grow by needed size
+        */
+       if (actual_expand_size <= MIN_STREAM_EXPANSION_SZ) {
+               actual_expand_size = MIN_STREAM_EXPANSION_SZ;
+       }
+
+       /* Calculate new total size */
+       new_size = s->size + actual_expand_size;
+       /* Reallocate the data buffer */
+       s->data = XREALLOC(MTYPE_STREAM, s->data, new_size);
+
+       /* Update the stream's data size */
+       s->size = new_size;
+}
+
 size_t stream_get_getp(const struct stream *s)
 {
        STREAM_VERIFY_SANE(s);
@@ -691,8 +736,12 @@ void stream_put(struct stream *s, const void *src, size_t size)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < size) {
-               STREAM_BOUND_WARN(s, "put");
-               return;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, size));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return;
+               }
        }
 
        if (src)
@@ -709,8 +758,12 @@ int stream_putc(struct stream *s, uint8_t c)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint8_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        s->data[s->endp++] = c;
@@ -723,8 +776,12 @@ int stream_putw(struct stream *s, uint16_t w)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint16_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        s->data[s->endp++] = (uint8_t)(w >> 8);
@@ -739,8 +796,12 @@ int stream_put3(struct stream *s, uint32_t l)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < 3) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, 3));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        s->data[s->endp++] = (uint8_t)(l >> 16);
@@ -756,8 +817,12 @@ int stream_putl(struct stream *s, uint32_t l)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        s->data[s->endp++] = (uint8_t)(l >> 24);
@@ -774,8 +839,12 @@ int stream_putq(struct stream *s, uint64_t q)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
-               STREAM_BOUND_WARN(s, "put quad");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint64_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        s->data[s->endp++] = (uint8_t)(q >> 56);
@@ -896,7 +965,13 @@ int stream_put_ipv4(struct stream *s, uint32_t l)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
-               STREAM_BOUND_WARN(s, "put");
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
+
                return 0;
        }
        memcpy(s->data + s->endp, &l, sizeof(uint32_t));
@@ -911,8 +986,12 @@ int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        memcpy(s->data + s->endp, addr, sizeof(uint32_t));
@@ -989,8 +1068,13 @@ int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
                psize_with_addpath = psize;
 
        if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s,
+                                     STREAM_EXPAND_SIZE(s, psize_with_addpath + sizeof(uint8_t)));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        if (addpath_capable) {
@@ -1027,8 +1111,12 @@ int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
        psize_with_addpath = psize + (addpath_capable ? 4 : 0);
 
        if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, psize_with_addpath + 3));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        if (addpath_capable) {
@@ -1167,8 +1255,12 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size)
        STREAM_VERIFY_SANE(s);
 
        if (STREAM_WRITEABLE(s) < size) {
-               STREAM_BOUND_WARN(s, "put");
-               return 0;
+               if (s->allow_expansion) {
+                       stream_expand(s, STREAM_EXPAND_SIZE(s, size));
+               } else {
+                       STREAM_BOUND_WARN(s, "put");
+                       return 0;
+               }
        }
 
        memcpy(s->data + s->endp, ptr, size);
index e48cedc6134513cb8670d0d67ccb2bea15ee05c2..437ba5aa7447582a46d7e796b089ae6c3c40a9aa 100644 (file)
@@ -95,7 +95,8 @@ struct stream {
        size_t getp;           /* next get position */
        size_t endp;           /* last valid data position */
        size_t size;           /* size of data segment */
-       unsigned char data[];  /* data pointer */
+       bool allow_expansion;  /* whether stream can be expanded */
+       unsigned char *data;   /* data pointer */
 };
 
 /* First in first out queue structure. */
@@ -132,6 +133,7 @@ struct stream_fifo {
  * q: quad (four words)
  */
 extern struct stream *stream_new(size_t);
+extern struct stream *stream_new_expandable(size_t);
 extern void stream_free(struct stream *);
 /* Copy 'src' into 'dest', returns 'dest' */
 extern struct stream *stream_copy(struct stream *dest,