diff options
| author | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-01-22 14:23:55 -0500 | 
|---|---|---|
| committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-01-24 15:30:50 -0500 | 
| commit | a45dc9742c3e4e504f66bf18b7bc9da3c218233a (patch) | |
| tree | 091042b0cbc6b0c7c46f2dfe6eda1ba5093f9dca /lib/frr_pthread.c | |
| parent | 872c4e980e45e7185530b76e9e8541fa2b6c9375 (diff) | |
lib: streamline frr_pthreads, add default loop
Some work on FRR's pthread wrapper.
* Provide a built-in way to synchronize thread startup
* Make utility functions take frr_pthread * instead of its integer ID
* Pass frr_pthread * as pthread start function argument
* Correct some comment styling
* Rename some variables to match naming conventions in the file
* Change parameter ordering in stop function prototype to follow the
  convention in the other functions
* Default new frr_pthreads to using a vanilla event loop
For the last point, the original goal when designing the implementation
of pthreads into FRR was to be able to use the thread.c event based
system inside pthreads. This code essentially encapuslates all the
thread.c functionality into an easy to use pthread out of the box.
Creating a new frr_pthread with a null attributes field will cause the
created frr_pthread to run a thread.c event loop. The upshot of this is
that it is now possible to safely run existing functions in a pthread in
roughly 3 lines of code. It also serves as an example / starting point
for others.
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'lib/frr_pthread.c')
| -rw-r--r-- | lib/frr_pthread.c | 231 | 
1 files changed, 160 insertions, 71 deletions
diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c index de522e5ef9..72b47ae5c3 100644 --- a/lib/frr_pthread.c +++ b/lib/frr_pthread.c @@ -1,5 +1,5 @@  /* - * Utilities and interfaces for managing POSIX threads + * Utilities and interfaces for managing POSIX threads within FRR.   * Copyright (C) 2017  Cumulus Networks   *   * This program is free software; you can redistribute it and/or modify @@ -25,79 +25,99 @@  #include "memory.h"  #include "hash.h" -DEFINE_MTYPE_STATIC(LIB, FRR_PTHREAD, "FRR POSIX Thread"); +DEFINE_MTYPE(LIB, FRR_PTHREAD, "FRR POSIX Thread");  DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives"); +/* id for next created pthread */  static unsigned int next_id = 0; -/* Hash table of all frr_pthreads along with synchronization primitive(s) and - * hash table callbacks. - * ------------------------------------------------------------------------ */ -static struct hash *pthread_table; -static pthread_mutex_t pthread_table_mtx = PTHREAD_MUTEX_INITIALIZER; +/* default frr_pthread start/stop routine prototypes */ +static void *fpt_run(void *arg); +static int fpt_halt(struct frr_pthread *fpt, void **res); -/* pthread_table->hash_cmp */ -static int pthread_table_hash_cmp(const void *value1, const void *value2) +/* default frr_pthread attributes */ +struct frr_pthread_attr frr_pthread_attr_default = { +	.id = 0, +	.start = fpt_run, +	.stop = fpt_halt, +	.name = "Anonymous", +}; + +/* hash table to keep track of all frr_pthreads */ +static struct hash *frr_pthread_hash; +static pthread_mutex_t frr_pthread_hash_mtx = PTHREAD_MUTEX_INITIALIZER; + +/* frr_pthread_hash->hash_cmp */ +static int frr_pthread_hash_cmp(const void *value1, const void *value2)  {  	const struct frr_pthread *tq1 = value1;  	const struct frr_pthread *tq2 = value2; -	return (tq1->id == tq2->id); +	return (tq1->attr.id == tq2->attr.id);  } -/* pthread_table->hash_key */ -static unsigned int pthread_table_hash_key(void *value) +/* frr_pthread_hash->hash_key */ +static unsigned int frr_pthread_hash_key(void *value)  { -	return ((struct frr_pthread *)value)->id; +	return ((struct frr_pthread *)value)->attr.id;  } +  /* ------------------------------------------------------------------------ */  void frr_pthread_init()  { -	pthread_mutex_lock(&pthread_table_mtx); +	pthread_mutex_lock(&frr_pthread_hash_mtx);  	{ -		pthread_table = hash_create(pthread_table_hash_key, -					    pthread_table_hash_cmp, NULL); +		frr_pthread_hash = hash_create(frr_pthread_hash_key, +					       frr_pthread_hash_cmp, NULL);  	} -	pthread_mutex_unlock(&pthread_table_mtx); +	pthread_mutex_unlock(&frr_pthread_hash_mtx);  }  void frr_pthread_finish()  { -	pthread_mutex_lock(&pthread_table_mtx); +	pthread_mutex_lock(&frr_pthread_hash_mtx);  	{ -		hash_clean(pthread_table, +		hash_clean(frr_pthread_hash,  			   (void (*)(void *))frr_pthread_destroy); -		hash_free(pthread_table); +		hash_free(frr_pthread_hash);  	} -	pthread_mutex_unlock(&pthread_table_mtx); +	pthread_mutex_unlock(&frr_pthread_hash_mtx);  } -struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, -				    void *(*start_routine)(void *), -				    int (*stop_routine)(void **, -							struct frr_pthread *)) +struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr)  {  	static struct frr_pthread holder = {0};  	struct frr_pthread *fpt = NULL; -	pthread_mutex_lock(&pthread_table_mtx); +	attr = attr ? attr : &frr_pthread_attr_default; + +	pthread_mutex_lock(&frr_pthread_hash_mtx);  	{ -		holder.id = id; - -		if (!hash_lookup(pthread_table, &holder)) { -			struct frr_pthread *fpt = XCALLOC( -				MTYPE_FRR_PTHREAD, sizeof(struct frr_pthread)); -			fpt->id = id; -			fpt->master = thread_master_create(name); -			fpt->start_routine = start_routine; -			fpt->stop_routine = stop_routine; -			fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name); - -			hash_get(pthread_table, fpt, hash_alloc_intern); +		holder.attr.id = attr->id; + +		if (!hash_lookup(frr_pthread_hash, &holder)) { +			fpt = XCALLOC(MTYPE_FRR_PTHREAD, +				      sizeof(struct frr_pthread)); +			/* create new thread master */ +			fpt->master = thread_master_create(attr->name); +			/* set attributes */ +			fpt->attr = *attr; +			if (attr == &frr_pthread_attr_default) +				fpt->attr.id = frr_pthread_get_id(); +			/* initialize startup synchronization primitives */ +			fpt->running_cond_mtx = XCALLOC( +				MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t)); +			fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, +						    sizeof(pthread_cond_t)); +			pthread_mutex_init(fpt->running_cond_mtx, NULL); +			pthread_cond_init(fpt->running_cond, NULL); + +			/* insert into global thread hash */ +			hash_get(frr_pthread_hash, fpt, hash_alloc_intern);  		}  	} -	pthread_mutex_unlock(&pthread_table_mtx); +	pthread_mutex_unlock(&frr_pthread_hash_mtx);  	return fpt;  } @@ -105,7 +125,11 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id,  void frr_pthread_destroy(struct frr_pthread *fpt)  {  	thread_master_free(fpt->master); -	XFREE(MTYPE_FRR_PTHREAD, fpt->name); + +	pthread_mutex_destroy(fpt->running_cond_mtx); +	pthread_cond_destroy(fpt->running_cond); +	XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond_mtx); +	XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond);  	XFREE(MTYPE_FRR_PTHREAD, fpt);  } @@ -114,74 +138,82 @@ struct frr_pthread *frr_pthread_get(unsigned int id)  	static struct frr_pthread holder = {0};  	struct frr_pthread *fpt; -	pthread_mutex_lock(&pthread_table_mtx); +	pthread_mutex_lock(&frr_pthread_hash_mtx);  	{ -		holder.id = id; -		fpt = hash_lookup(pthread_table, &holder); +		holder.attr.id = id; +		fpt = hash_lookup(frr_pthread_hash, &holder);  	} -	pthread_mutex_unlock(&pthread_table_mtx); +	pthread_mutex_unlock(&frr_pthread_hash_mtx);  	return fpt;  } -int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg) +int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr)  { -	struct frr_pthread *fpt = frr_pthread_get(id);  	int ret; -	if (!fpt) -		return -1; - -	ret = pthread_create(&fpt->thread, attr, fpt->start_routine, arg); +	ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt); -	/* Per pthread_create(3), the contents of fpt->thread are undefined if -	 * pthread_create() did not succeed. Reset this value to zero. */ +	/* +	 * Per pthread_create(3), the contents of fpt->thread are undefined if +	 * pthread_create() did not succeed. Reset this value to zero. +	 */  	if (ret < 0)  		memset(&fpt->thread, 0x00, sizeof(fpt->thread));  	return ret;  } -/** - * Calls the stop routine for the frr_pthread and resets any relevant fields. - * - * @param fpt - the frr_pthread to stop - * @param result - pointer to result pointer - * @return the return code from the stop routine - */ -static int frr_pthread_stop_actual(struct frr_pthread *fpt, void **result) +void frr_pthread_wait_running(struct frr_pthread *fpt)  { -	int ret = (*fpt->stop_routine)(result, fpt); -	memset(&fpt->thread, 0x00, sizeof(fpt->thread)); -	return ret; +	pthread_mutex_lock(fpt->running_cond_mtx); +	{ +		while (!fpt->running) +			pthread_cond_wait(fpt->running_cond, +					  fpt->running_cond_mtx); +	} +	pthread_mutex_unlock(fpt->running_cond_mtx);  } -int frr_pthread_stop(unsigned int id, void **result) +void frr_pthread_notify_running(struct frr_pthread *fpt)  { -	struct frr_pthread *fpt = frr_pthread_get(id); -	return frr_pthread_stop_actual(fpt, result); +	pthread_mutex_lock(fpt->running_cond_mtx); +	{ +		fpt->running = true; +		pthread_cond_signal(fpt->running_cond); +	} +	pthread_mutex_unlock(fpt->running_cond_mtx);  } -/** +int frr_pthread_stop(struct frr_pthread *fpt, void **result) +{ +	int ret = (*fpt->attr.stop)(fpt, result); +	memset(&fpt->thread, 0x00, sizeof(fpt->thread)); +	return ret; +} + +/*   * Callback for hash_iterate to stop all frr_pthread's.   */  static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg)  {  	struct frr_pthread *fpt = hb->data; -	frr_pthread_stop_actual(fpt, NULL); +	frr_pthread_stop(fpt, NULL);  }  void frr_pthread_stop_all()  { -	pthread_mutex_lock(&pthread_table_mtx); +	pthread_mutex_lock(&frr_pthread_hash_mtx);  	{ -		hash_iterate(pthread_table, frr_pthread_stop_all_iter, NULL); +		hash_iterate(frr_pthread_hash, frr_pthread_stop_all_iter, NULL);  	} -	pthread_mutex_unlock(&pthread_table_mtx); +	pthread_mutex_unlock(&frr_pthread_hash_mtx);  }  unsigned int frr_pthread_get_id()  { +	/* just a sanity check, this should never happen */ +	assert(next_id <= INT_MAX - 1);  	return next_id++;  } @@ -189,3 +221,60 @@ void frr_pthread_yield(void)  {  	(void)sched_yield();  } + +/* + * ---------------------------------------------------------------------------- + * Default Event Loop + * ---------------------------------------------------------------------------- + */ + +/* dummy task for sleeper pipe */ +static int fpt_dummy(struct thread *thread) +{ +	return 0; +} + +/* poison pill task to end event loop */ +static int fpt_finish(struct thread *thread) +{ +	struct frr_pthread *fpt = THREAD_ARG(thread); +	atomic_store_explicit(&fpt->running, false, memory_order_relaxed); +	return 0; +} + +/* stop function, called from other threads to halt this one */ +static int fpt_halt(struct frr_pthread *fpt, void **res) +{ +	thread_add_event(fpt->master, &fpt_finish, fpt, 0, NULL); +	pthread_join(fpt->thread, res); +	fpt = NULL; + +	return 0; +} + +/* entry pthread function & main event loop */ +static void *fpt_run(void *arg) +{ +	struct frr_pthread *fpt = arg; +	fpt->master->owner = pthread_self(); + +	int sleeper[2]; +	pipe(sleeper); +	thread_add_read(fpt->master, &fpt_dummy, NULL, sleeper[0], NULL); + +	fpt->master->handle_signals = false; + +	frr_pthread_notify_running(fpt); + +	struct thread task; +	while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) { +		if (thread_fetch(fpt->master, &task)) { +			thread_call(&task); +		} +	} + +	close(sleeper[1]); +	close(sleeper[0]); + +	return NULL; +}  | 
