diff options
| author | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-04-21 16:51:54 -0400 |
|---|---|---|
| committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-05-07 11:37:07 -0400 |
| commit | 363e24c65140d07c2ced580698bd1427c6b99a55 (patch) | |
| tree | 9fa78138b63b883535e8b1a8f099d7fd721eb17a /lib/stream.c | |
| parent | e8f95403e4b4d55c605ef49a0d772f30834134a4 (diff) | |
lib: add mt-safe variants for stream_fifo ops
stream_fifo is used as our standard internal message queue. Message
queues are useful in multithreaded environments. Up until now I have
been doing my own synchronization when using stream_fifo in this way;
this patch gets rid of the need for that boilerplate and decreases the
risk of locking mistakes when working with this datastructure.
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'lib/stream.c')
| -rw-r--r-- | lib/stream.c | 60 |
1 files changed, 56 insertions, 4 deletions
diff --git a/lib/stream.c b/lib/stream.c index c4edd3d5bf..aba4c20166 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -21,6 +21,7 @@ #include <zebra.h> #include <stddef.h> +#include <pthread.h> #include "stream.h" #include "memory.h" @@ -1101,6 +1102,7 @@ struct stream_fifo *stream_fifo_new(void) struct stream_fifo *new; new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo)); + pthread_mutex_init(&new->mtx, NULL); return new; } @@ -1115,7 +1117,16 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) fifo->tail = s; fifo->tail->next = NULL; - fifo->count++; + atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); +} + +void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_push(fifo, s); + } + pthread_mutex_unlock(&fifo->mtx); } /* Delete first stream from fifo. */ @@ -1131,7 +1142,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) if (fifo->head == NULL) fifo->tail = NULL; - fifo->count--; + atomic_fetch_sub_explicit(&fifo->count, 1, + memory_order_release); /* ensure stream is scrubbed of references to this fifo */ s->next = NULL; @@ -1140,12 +1152,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) return s; } -/* Return first fifo entry. */ +struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_pop(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + struct stream *stream_fifo_head(struct stream_fifo *fifo) { return fifo->head; } +struct stream *stream_fifo_head_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_head(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + void stream_fifo_clean(struct stream_fifo *fifo) { struct stream *s; @@ -1156,11 +1193,26 @@ void stream_fifo_clean(struct stream_fifo *fifo) stream_free(s); } fifo->head = fifo->tail = NULL; - fifo->count = 0; + atomic_store_explicit(&fifo->count, 0, memory_order_release); +} + +void stream_fifo_clean_safe(struct stream_fifo *fifo) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_clean(fifo); + } + pthread_mutex_unlock(&fifo->mtx); +} + +size_t stream_fifo_count_safe(struct stream_fifo *fifo) +{ + return atomic_load_explicit(&fifo->count, memory_order_acquire); } void stream_fifo_free(struct stream_fifo *fifo) { stream_fifo_clean(fifo); + pthread_mutex_destroy(&fifo->mtx); XFREE(MTYPE_STREAM_FIFO, fifo); } |
