#include <zebra.h>
#include <stddef.h>
+#include <pthread.h>
#include "stream.h"
#include "memory.h"
struct stream_fifo *new;
new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+ pthread_mutex_init(&new->mtx, NULL);
return new;
}
fifo->tail = s;
fifo->tail->next = NULL;
- fifo->count++;
+ atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+}
+
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ stream_fifo_push(fifo, s);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
}
/* Delete first stream from fifo. */
if (fifo->head == NULL)
fifo->tail = NULL;
- fifo->count--;
+ atomic_fetch_sub_explicit(&fifo->count, 1,
+ memory_order_release);
/* ensure stream is scrubbed of references to this fifo */
s->next = NULL;
return s;
}
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+ struct stream *ret;
+
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ ret = stream_fifo_pop(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+
+ return ret;
+}
+
struct stream *stream_fifo_head(struct stream_fifo *fifo)
{
return fifo->head;
}
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+ struct stream *ret;
+
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ ret = stream_fifo_head(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+
+ return ret;
+}
+
void stream_fifo_clean(struct stream_fifo *fifo)
{
struct stream *s;
stream_free(s);
}
fifo->head = fifo->tail = NULL;
- fifo->count = 0;
+ atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+ pthread_mutex_lock(&fifo->mtx);
+ {
+ stream_fifo_clean(fifo);
+ }
+ pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+ return atomic_load_explicit(&fifo->count, memory_order_acquire);
}
void stream_fifo_free(struct stream_fifo *fifo)
{
stream_fifo_clean(fifo);
+ pthread_mutex_destroy(&fifo->mtx);
XFREE(MTYPE_STREAM_FIFO, fifo);
}
#ifndef _ZEBRA_STREAM_H
#define _ZEBRA_STREAM_H
+#include <pthread.h>
+
+#include "frratomic.h"
#include "mpls.h"
#include "prefix.h"
/* First in first out queue structure. */
struct stream_fifo {
- size_t count;
+ /* lock for mt-safe operations */
+ pthread_mutex_t mtx;
+
+ /* number of streams in this fifo */
+ _Atomic size_t count;
struct stream *head;
struct stream *tail;
/* deprecated */
extern uint8_t *stream_pnt(struct stream *);
-/* Stream fifo. */
+/*
+ * Operations on struct stream_fifo.
+ *
+ * Each function has a safe variant, which ensures that the operation performed
+ * is atomic with respect to the operations performed by all other safe
+ * variants. In other words, the safe variants lock the stream_fifo's mutex
+ * before performing their action. These are provided for convenience when
+ * using stream_fifo in a multithreaded context, to alleviate the need for the
+ * caller to implement their own synchronization around the stream_fifo.
+ *
+ * The following functions do not have safe variants. The caller must ensure
+ * that these operations are performed safely in a multithreaded context:
+ * - stream_fifo_new
+ * - stream_fifo_free
+ */
+
+/*
+ * Create a new stream_fifo.
+ *
+ * Returns:
+ * newly created stream_fifo
+ */
extern struct stream_fifo *stream_fifo_new(void);
+
+/*
+ * Push a stream onto a stream_fifo.
+ *
+ * fifo
+ * the stream_fifo to push onto
+ *
+ * s
+ * the stream to push onto the stream_fifo
+ */
extern void stream_fifo_push(struct stream_fifo *fifo, struct stream *s);
+extern void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s);
+
+/*
+ * Pop a stream off a stream_fifo.
+ *
+ * fifo
+ * the stream_fifo to pop from
+ *
+ * Returns:
+ * the next stream in the stream_fifo
+ */
extern struct stream *stream_fifo_pop(struct stream_fifo *fifo);
+extern struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo);
+
+/*
+ * Retrieve the next stream from a stream_fifo without popping it.
+ *
+ * fifo
+ * the stream_fifo to operate on
+ *
+ * Returns:
+ * the next stream that would be returned from stream_fifo_pop
+ */
extern struct stream *stream_fifo_head(struct stream_fifo *fifo);
+extern struct stream *stream_fifo_head_safe(struct stream_fifo *fifo);
+
+/*
+ * Remove all streams from a stream_fifo.
+ *
+ * fifo
+ * the stream_fifo to clean
+ */
extern void stream_fifo_clean(struct stream_fifo *fifo);
+extern void stream_fifo_clean_safe(struct stream_fifo *fifo);
+
+/*
+ * Retrieve number of streams on a stream_fifo.
+ *
+ * fifo
+ * the stream_fifo to retrieve the count for
+ *
+ * Returns:
+ * the number of streams on the stream_fifo
+ */
+extern size_t stream_fifo_count_safe(struct stream_fifo *fifo);
+
+/*
+ * Free a stream_fifo.
+ *
+ * Calls stream_fifo_clean, then deinitializes the stream_fifo and frees it.
+ *
+ * fifo
+ * the stream_fifo to free
+ */
extern void stream_fifo_free(struct stream_fifo *fifo);
/* This is here because "<< 24" is particularly problematic in C.