diff options
| author | Soumya Roy <souroy@nvidia.com> | 2025-03-14 21:44:39 +0000 | 
|---|---|---|
| committer | Soumya Roy <souroy@nvidia.com> | 2025-03-20 16:13:34 +0000 | 
| commit | c0c46bad15a5f3f69032678177ac7d00b7cd31be (patch) | |
| tree | 2b72f2221423f27c55e224aa85b97a0a201c88ee | |
| parent | 8f8d0923a7d0670abd48d3bc15a6c7a6296d6d2b (diff) | |
lib: Add support for stream buffer to expand
Issue:
 Currently, during encode time, if required memory is
 more than available space in stream buffer, stream buffer
 can't be expanded. This fix introduces new apis to support
 stream buffer expansion.
 Testing:
 Tested with zebra nexthop encoding with 512 nexthops, which triggers
 this new code changes, it works fine. Without fix, for same trigger
 it asserts.
 Signed-off-by: Soumya Roy <souroy@nvidia.com>
| -rw-r--r-- | lib/stream.c | 142 | ||||
| -rw-r--r-- | lib/stream.h | 4 | 
2 files changed, 120 insertions, 26 deletions
diff --git a/lib/stream.c b/lib/stream.c index bb90f3b944..6ac51859d3 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -16,6 +16,11 @@  #include "frr_pthread.h"  #include "lib_errors.h" +#define MIN_STREAM_EXPANSION_SZ 512 + +/* Extra size  needed for a stream in bytes, given new write size */ +#define STREAM_EXPAND_SIZE(S, WSZ) ((WSZ)-STREAM_WRITEABLE(S)) +  DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");  DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO"); @@ -92,11 +97,20 @@ struct stream *stream_new(size_t size)  	assert(size > 0); -	s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); +	s = XMALLOC(MTYPE_STREAM, sizeof(struct stream)); +	s->data = XMALLOC(MTYPE_STREAM, size);  	s->getp = s->endp = 0;  	s->next = NULL;  	s->size = size; +	s->allow_expansion = false; +	return s; +} + +struct stream *stream_new_expandable(size_t size) +{ +	struct stream *s = stream_new(size); +	s->allow_expansion = true;  	return s;  } @@ -106,6 +120,7 @@ void stream_free(struct stream *s)  	if (!s)  		return; +	XFREE(MTYPE_STREAM, s->data);  	XFREE(MTYPE_STREAM, s);  } @@ -115,7 +130,7 @@ struct stream *stream_copy(struct stream *dest, const struct stream *src)  	assert(dest != NULL);  	assert(STREAM_SIZE(dest) >= src->endp); - +	dest->allow_expansion = src->allow_expansion;  	dest->endp = src->endp;  	dest->getp = src->getp; @@ -131,7 +146,7 @@ struct stream *stream_dup(const struct stream *s)  	STREAM_VERIFY_SANE(s);  	snew = stream_new(s->endp); - +	snew->allow_expansion = s->allow_expansion;  	return (stream_copy(snew, s));  } @@ -143,9 +158,16 @@ struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,  	STREAM_VERIFY_SANE(s1);  	STREAM_VERIFY_SANE(s2); +	if (offset > s1->endp) { +		fprintf(stderr, "Error: Invalid offset %zu, exceeds s1->endp %zu\n", offset, +			s1->endp); +		return NULL; +	} +  	if ((new = stream_new(s1->endp + s2->endp)) == NULL)  		return NULL; +	new->allow_expansion = s1->allow_expansion || s2->allow_expansion;  	memcpy(new->data, s1->data, offset);  	memcpy(new->data + offset, s2->data, s2->endp);  	memcpy(new->data + offset + s2->endp, s1->data + offset, @@ -160,7 +182,7 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)  	STREAM_VERIFY_SANE(orig); -	orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); +	orig->data = XREALLOC(MTYPE_STREAM, orig->data, newsize);  	orig->size = newsize; @@ -175,6 +197,29 @@ size_t stream_resize_inplace(struct stream **sptr, size_t newsize)  	return orig->size;  } +/* Helper function to expand stream if needed and allowed */ +static void stream_expand(struct stream *s, size_t expand_size) +{ +	size_t new_size; +	size_t actual_expand_size = expand_size; + +	/* Growth strategy: +	 * For small expansions (<= min expansion bytes): grow by min size +	 * otherwise grow by needed size +	 */ +	if (actual_expand_size <= MIN_STREAM_EXPANSION_SZ) { +		actual_expand_size = MIN_STREAM_EXPANSION_SZ; +	} + +	/* Calculate new total size */ +	new_size = s->size + actual_expand_size; +	/* Reallocate the data buffer */ +	s->data = XREALLOC(MTYPE_STREAM, s->data, new_size); + +	/* Update the stream's data size */ +	s->size = new_size; +} +  size_t stream_get_getp(const struct stream *s)  {  	STREAM_VERIFY_SANE(s); @@ -691,8 +736,12 @@ void stream_put(struct stream *s, const void *src, size_t size)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < size) { -		STREAM_BOUND_WARN(s, "put"); -		return; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, size)); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return; +		}  	}  	if (src) @@ -709,8 +758,12 @@ int stream_putc(struct stream *s, uint8_t c)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint8_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	s->data[s->endp++] = c; @@ -723,8 +776,12 @@ int stream_putw(struct stream *s, uint16_t w)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint16_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	s->data[s->endp++] = (uint8_t)(w >> 8); @@ -739,8 +796,12 @@ int stream_put3(struct stream *s, uint32_t l)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < 3) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, 3)); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	s->data[s->endp++] = (uint8_t)(l >> 16); @@ -756,8 +817,12 @@ int stream_putl(struct stream *s, uint32_t l)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	s->data[s->endp++] = (uint8_t)(l >> 24); @@ -774,8 +839,12 @@ int stream_putq(struct stream *s, uint64_t q)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) { -		STREAM_BOUND_WARN(s, "put quad"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint64_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	s->data[s->endp++] = (uint8_t)(q >> 56); @@ -896,7 +965,13 @@ int stream_put_ipv4(struct stream *s, uint32_t l)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { -		STREAM_BOUND_WARN(s, "put"); +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		} +  		return 0;  	}  	memcpy(s->data + s->endp, &l, sizeof(uint32_t)); @@ -911,8 +986,12 @@ int stream_put_in_addr(struct stream *s, const struct in_addr *addr)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, sizeof(uint32_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	memcpy(s->data + s->endp, addr, sizeof(uint32_t)); @@ -989,8 +1068,13 @@ int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,  		psize_with_addpath = psize;  	if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, +				      STREAM_EXPAND_SIZE(s, psize_with_addpath + sizeof(uint8_t))); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	if (addpath_capable) { @@ -1027,8 +1111,12 @@ int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,  	psize_with_addpath = psize + (addpath_capable ? 4 : 0);  	if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, psize_with_addpath + 3)); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	if (addpath_capable) { @@ -1167,8 +1255,12 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size)  	STREAM_VERIFY_SANE(s);  	if (STREAM_WRITEABLE(s) < size) { -		STREAM_BOUND_WARN(s, "put"); -		return 0; +		if (s->allow_expansion) { +			stream_expand(s, STREAM_EXPAND_SIZE(s, size)); +		} else { +			STREAM_BOUND_WARN(s, "put"); +			return 0; +		}  	}  	memcpy(s->data + s->endp, ptr, size); diff --git a/lib/stream.h b/lib/stream.h index e48cedc613..437ba5aa74 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -95,7 +95,8 @@ struct stream {  	size_t getp;	       /* next get position */  	size_t endp;	       /* last valid data position */  	size_t size;	       /* size of data segment */ -	unsigned char data[];  /* data pointer */ +	bool allow_expansion;  /* whether stream can be expanded */ +	unsigned char *data;   /* data pointer */  };  /* First in first out queue structure. */ @@ -132,6 +133,7 @@ struct stream_fifo {   * q: quad (four words)   */  extern struct stream *stream_new(size_t); +extern struct stream *stream_new_expandable(size_t);  extern void stream_free(struct stream *);  /* Copy 'src' into 'dest', returns 'dest' */  extern struct stream *stream_copy(struct stream *dest,  | 
