diff options
Diffstat (limited to 'flow/gsl/gsloputil.c')
-rw-r--r-- | flow/gsl/gsloputil.c | 721 |
1 files changed, 721 insertions, 0 deletions
diff --git a/flow/gsl/gsloputil.c b/flow/gsl/gsloputil.c new file mode 100644 index 0000000..9adce89 --- /dev/null +++ b/flow/gsl/gsloputil.c @@ -0,0 +1,721 @@ +/* GSL Engine - Flow module operation engine + * Copyright (C) 2001 Tim Janik + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ +#include "gsloputil.h" + +#include "gslcommon.h" +#include "gslopnode.h" +#include "gslopschedule.h" +#include "gslsignal.h" +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <math.h> + + +/* --- UserThread --- */ +GslOStream* +_engine_alloc_ostreams (guint n) +{ + if (n) + { + guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n; + GslOStream *streams = gsl_alloc_memblock0 (i); + gfloat *buffers = (gfloat*) (streams + n); + + for (i = 0; i < n; i++) + { + streams[i].values = buffers; + buffers += gsl_engine_block_size (); + } + return streams; + } + else + return NULL; +} + +static void +free_node (EngineNode *node) +{ + guint j; + + g_return_if_fail (node != NULL); + g_return_if_fail (node->output_nodes == NULL); + g_return_if_fail (node->integrated == FALSE); + g_return_if_fail (node->sched_tag == FALSE); + g_return_if_fail (node->sched_router_tag == FALSE); + + if (node->module.klass->free) + node->module.klass->free (node->module.user_data, node->module.klass); + gsl_rec_mutex_destroy (&node->rec_mutex); + if (node->module.ostreams) + { + guint n = ENGINE_NODE_N_OSTREAMS (node); + guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n; + + gsl_free_memblock (i, node->module.ostreams); + gsl_delete_structs (EngineOutput, ENGINE_NODE_N_OSTREAMS (node), node->outputs); + } + if (node->module.istreams) + { + gsl_delete_structs (GslIStream, ENGINE_NODE_N_ISTREAMS (node), node->module.istreams); + gsl_delete_structs (EngineInput, ENGINE_NODE_N_ISTREAMS (node), node->inputs); + } + for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++) + { + g_free (node->jinputs[j]); + g_free (node->module.jstreams[j].values); + } + if (node->module.jstreams) + { + gsl_delete_structs (GslJStream, ENGINE_NODE_N_JSTREAMS (node), node->module.jstreams); + gsl_delete_structs (EngineJInput*, ENGINE_NODE_N_JSTREAMS (node), node->jinputs); + } + gsl_delete_struct (EngineNode, node); +} + +static void +free_job (GslJob *job) +{ + g_return_if_fail (job != NULL); + + switch (job->job_id) + { + case ENGINE_JOB_ACCESS: + if (job->data.access.free_func) + job->data.access.free_func (job->data.access.data); + break; + case ENGINE_JOB_DEBUG: + g_free (job->data.debug); + break; + case ENGINE_JOB_ADD_POLL: + case ENGINE_JOB_REMOVE_POLL: + g_free (job->data.poll.fds); + if (job->data.poll.free_func) + job->data.poll.free_func (job->data.poll.data); + break; + case ENGINE_JOB_DISCARD: + free_node (job->data.node); + break; + default: ; + } + gsl_delete_struct (GslJob, job); +} + +static void +free_flow_job (EngineFlowJob *fjob) +{ + switch (fjob->fjob_id) + { + case ENGINE_FLOW_JOB_SUSPEND: + case ENGINE_FLOW_JOB_RESUME: + gsl_delete_struct (EngineFlowJobAny, &fjob->any); + break; + case ENGINE_FLOW_JOB_ACCESS: + if (fjob->access.free_func) + fjob->access.free_func (fjob->access.data); + gsl_delete_struct (EngineFlowJobAccess, &fjob->access); + break; + default: + g_assert_not_reached (); + } +} + +void +_engine_free_trans (GslTrans *trans) +{ + GslJob *job; + + g_return_if_fail (trans != NULL); + g_return_if_fail (trans->comitted == FALSE); + if (trans->jobs_tail) + g_return_if_fail (trans->jobs_tail->next == NULL); /* paranoid */ + + job = trans->jobs_head; + while (job) + { + GslJob *tmp = job->next; + + free_job (job); + job = tmp; + } + gsl_delete_struct (GslTrans, trans); +} + + +/* -- master node list --- */ +static EngineNode *master_node_list_head = NULL; +static EngineNode *master_node_list_tail = NULL; + +EngineNode* +_engine_mnl_head (void) +{ + return master_node_list_head; +} + +void +_engine_mnl_remove (EngineNode *node) +{ + g_return_if_fail (node->integrated == TRUE); + + node->integrated = FALSE; + /* remove */ + if (node->mnl_prev) + node->mnl_prev->mnl_next = node->mnl_next; + else + master_node_list_head = node->mnl_next; + if (node->mnl_next) + node->mnl_next->mnl_prev = node->mnl_prev; + else + master_node_list_tail = node->mnl_prev; + node->mnl_prev = NULL; + node->mnl_next = NULL; +} + +void +_engine_mnl_integrate (EngineNode *node) +{ + g_return_if_fail (node->integrated == FALSE); + g_return_if_fail (node->flow_jobs == NULL); + + node->integrated = TRUE; + /* append */ + if (master_node_list_tail) + master_node_list_tail->mnl_next = node; + node->mnl_prev = master_node_list_tail; + master_node_list_tail = node; + if (!master_node_list_head) + master_node_list_head = master_node_list_tail; + g_assert (node->mnl_next == NULL); +} + +void +_engine_mnl_reorder (EngineNode *node) +{ + EngineNode *sibling; + + g_return_if_fail (node->integrated == TRUE); + + /* the master node list is partially sorted. that is, all + * nodes which are not scheduled and have pending flow_jobs + * are agglomerated at the head. + */ + sibling = node->mnl_prev ? node->mnl_prev : node->mnl_next; + if (sibling && GSL_MNL_HEAD_NODE (node) != GSL_MNL_HEAD_NODE (sibling)) + { + /* remove */ + if (node->mnl_prev) + node->mnl_prev->mnl_next = node->mnl_next; + else + master_node_list_head = node->mnl_next; + if (node->mnl_next) + node->mnl_next->mnl_prev = node->mnl_prev; + else + master_node_list_tail = node->mnl_prev; + if (GSL_MNL_HEAD_NODE (node)) /* move towards head */ + { + /* prepend to non-NULL list */ + master_node_list_head->mnl_prev = node; + node->mnl_next = master_node_list_head; + master_node_list_head = node; + node->mnl_prev = NULL; + } + else /* move towards tail */ + { + /* append to non-NULL list */ + master_node_list_tail->mnl_next = node; + node->mnl_prev = master_node_list_tail; + master_node_list_tail = node; + node->mnl_next = NULL; + } + } +} + + +/* --- const value blocks --- */ +typedef struct +{ + guint n_nodes; + gfloat **nodes; + guint8 *nodes_used; +} ConstValuesArray; + +static const guint8 CONST_VALUES_EXPIRE = 16; /* expire value after being unused for 16 times */ + +static inline gfloat** +const_values_lookup_nextmost (ConstValuesArray *array, + gfloat key_value) +{ + guint n_nodes = array->n_nodes; + + if (n_nodes > 0) + { + gfloat **nodes = array->nodes; + gfloat **check; + + nodes -= 1; + do + { + guint i; + register gfloat cmp; + + i = (n_nodes + 1) >> 1; + check = nodes + i; + cmp = key_value - **check; + if (cmp > GSL_SIGNAL_EPSILON) + { + n_nodes -= i; + nodes = check; + } + else if (cmp < -GSL_SIGNAL_EPSILON) + n_nodes = i - 1; + else /* cmp ~==~ 0.0 */ + return check; /* matched */ + } + while (n_nodes); + + return check; /* nextmost */ + } + + return NULL; +} + +static inline guint +upper_power2 (guint number) +{ + return gsl_alloc_upper_power2 (MAX (number, 8)); +} + +static inline void +const_values_insert (ConstValuesArray *array, + guint index, + gfloat *value_block) +{ + if (array->n_nodes == 0) + { + guint new_size = upper_power2 (sizeof (gfloat*)); + + array->nodes = g_realloc (array->nodes, new_size); + array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof (gfloat*)); + array->n_nodes = 1; + + g_assert (index == 0); + } + else + { + guint n_nodes = array->n_nodes++; + + if (*array->nodes[index] < *value_block) + index++; + + if (1) + { + guint new_size = upper_power2 (array->n_nodes * sizeof (gfloat*)); + guint old_size = upper_power2 (n_nodes * sizeof (gfloat*)); + + if (new_size != old_size) + { + array->nodes = g_realloc (array->nodes, new_size); + array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof(gfloat*)); + } + } + g_memmove (array->nodes + index + 1, array->nodes + index, (n_nodes - index) * sizeof (array->nodes[0])); + g_memmove (array->nodes_used + index + 1, array->nodes_used + index, (n_nodes - index) * sizeof (array->nodes_used[0])); + } + + array->nodes[index] = value_block; + array->nodes_used[index] = CONST_VALUES_EXPIRE; +} + +static ConstValuesArray cvalue_array = { 0, NULL, NULL }; + +gfloat* +gsl_engine_const_values (gfloat value) +{ + extern const gfloat gsl_engine_master_zero_block[]; + gfloat **block; + + if (fabs (value) < GSL_SIGNAL_EPSILON) + return (gfloat*) gsl_engine_master_zero_block; + + block = const_values_lookup_nextmost (&cvalue_array, value); + + /* found correct match? */ + if (block && fabs (**block - value) < GSL_SIGNAL_EPSILON) + { + cvalue_array.nodes_used[block - cvalue_array.nodes] = CONST_VALUES_EXPIRE; + return *block; + } + else + { + /* create new value block */ + gfloat *values = g_new (gfloat, gsl_engine_block_size ()); + guint i; + + for (i = 0; i < gsl_engine_block_size (); i++) + values[i] = value; + + if (block) + const_values_insert (&cvalue_array, block - cvalue_array.nodes, values); + else + const_values_insert (&cvalue_array, 0, values); + + return values; + } +} + +void +_engine_recycle_const_values (void) +{ + gfloat **nodes = cvalue_array.nodes; + guint8 *used = cvalue_array.nodes_used; + guint count = cvalue_array.n_nodes, e = 0, i; + + for (i = 0; i < count; i++) + { + used[i]--; /* invariant: use counts are never 0 */ + + if (used[i] == 0) + g_free (nodes[i]); + else /* preserve node */ + { + if (e < i) + { + nodes[e] = nodes[i]; + used[e] = used[i]; + } + e++; + } + } + cvalue_array.n_nodes = e; +} + +/* --- job transactions --- */ +static GslMutex cqueue_trans = { 0, }; +static GslTrans *cqueue_trans_pending_head = NULL; +static GslTrans *cqueue_trans_pending_tail = NULL; +static GslCond cqueue_trans_cond = { 0, }; +static GslTrans *cqueue_trans_trash = NULL; +static GslTrans *cqueue_trans_active_head = NULL; +static GslTrans *cqueue_trans_active_tail = NULL; +static EngineFlowJob *cqueue_trash_fjobs = NULL; +static GslJob *cqueue_trans_job = NULL; + +void +_engine_enqueue_trans (GslTrans *trans) +{ + g_return_if_fail (trans != NULL); + g_return_if_fail (trans->comitted == TRUE); + g_return_if_fail (trans->jobs_head != NULL); + g_return_if_fail (trans->cqt_next == NULL); + + GSL_SPIN_LOCK (&cqueue_trans); + if (cqueue_trans_pending_tail) + { + cqueue_trans_pending_tail->cqt_next = trans; + cqueue_trans_pending_tail->jobs_tail->next = trans->jobs_head; + } + else + cqueue_trans_pending_head = trans; + cqueue_trans_pending_tail = trans; + GSL_SPIN_UNLOCK (&cqueue_trans); + gsl_cond_signal (&cqueue_trans_cond); +} + +void +_engine_wait_on_trans (void) +{ + GSL_SPIN_LOCK (&cqueue_trans); + while (cqueue_trans_pending_head || cqueue_trans_active_head) + gsl_cond_wait (&cqueue_trans_cond, &cqueue_trans); + GSL_SPIN_UNLOCK (&cqueue_trans); +} + +gboolean +_engine_job_pending (void) +{ + gboolean pending = cqueue_trans_job != NULL; + + if (!pending) + { + GSL_SPIN_LOCK (&cqueue_trans); + pending = cqueue_trans_pending_head != NULL; + GSL_SPIN_UNLOCK (&cqueue_trans); + } + return pending; +} + +GslJob* +_engine_pop_job (void) /* (glong max_useconds) */ +{ + /* clean up if necessary and try fetching new jobs */ + if (!cqueue_trans_job) + { + if (cqueue_trans_active_head) + { + GSL_SPIN_LOCK (&cqueue_trans); + /* get rid of processed transaction and + * signal UserThread which might be in + * op_com_wait_on_trans() + */ + cqueue_trans_active_tail->cqt_next = cqueue_trans_trash; + cqueue_trans_trash = cqueue_trans_active_head; + /* fetch new transaction */ + cqueue_trans_active_head = cqueue_trans_pending_head; + cqueue_trans_active_tail = cqueue_trans_pending_tail; + cqueue_trans_pending_head = NULL; + cqueue_trans_pending_tail = NULL; + GSL_SPIN_UNLOCK (&cqueue_trans); + gsl_cond_signal (&cqueue_trans_cond); + } + else + { + GSL_SPIN_LOCK (&cqueue_trans); + /* fetch new transaction */ + cqueue_trans_active_head = cqueue_trans_pending_head; + cqueue_trans_active_tail = cqueue_trans_pending_tail; + cqueue_trans_pending_head = NULL; + cqueue_trans_pending_tail = NULL; + GSL_SPIN_UNLOCK (&cqueue_trans); + } + cqueue_trans_job = cqueue_trans_active_head ? cqueue_trans_active_head->jobs_head : NULL; + } + + /* pick new job and out of here */ + if (cqueue_trans_job) + { + GslJob *job = cqueue_trans_job; + + cqueue_trans_job = job->next; + return job; + } + +#if 0 + /* wait until jobs are present */ + if (max_useconds != 0) + { + GSL_SPIN_LOCK (&cqueue_trans); + if (!cqueue_trans_pending_head) + gsl_cond_wait_timed (&cqueue_trans_cond, + &cqueue_trans, + max_useconds); + GSL_SPIN_UNLOCK (&cqueue_trans); + + /* there may be jobs now, start from scratch */ + return op_com_pop_job_timed (max_useconds < 0 ? -1 : 0); + } +#endif + + /* time expired, no jobs... */ + return NULL; +} + + +/* --- user thread garbage collection --- */ +/** + * gsl_engine_garbage_collect + * + * GSL Engine user thread function. Collects processed jobs + * and transactions from the engine and frees them, this + * involves callback invocation of GslFreeFunc() functions, + * e.g. from gsl_job_access() or gsl_flow_job_access() + * jobs. + * This function may only be called from the user thread, + * as GslFreeFunc() functions are guranteed to be executed + * in the user thread. + */ +void +gsl_engine_garbage_collect (void) +{ + GslTrans *trans; + EngineFlowJob *fjobs; + + GSL_SPIN_LOCK (&cqueue_trans); + trans = cqueue_trans_trash; + cqueue_trans_trash = NULL; + fjobs = cqueue_trash_fjobs; + cqueue_trash_fjobs = NULL; + GSL_SPIN_UNLOCK (&cqueue_trans); + + while (trans) + { + GslTrans *t = trans; + + trans = t->cqt_next; + t->cqt_next = NULL; + t->jobs_tail->next = NULL; + t->comitted = FALSE; + _engine_free_trans (t); + } + + while (fjobs) + { + EngineFlowJob *j = fjobs; + + fjobs = j->any.next; + j->any.next = NULL; + free_flow_job (j); + } +} + + +/* --- node processing queue --- */ +static GslMutex pqueue_mutex = { 0, }; +static EngineSchedule *pqueue_schedule = NULL; +static guint pqueue_n_nodes = 0; +static guint pqueue_n_cycles = 0; +static GslCond pqueue_done_cond = { 0, }; +static EngineFlowJob *pqueue_trash_fjobs_first = NULL; +static EngineFlowJob *pqueue_trash_fjobs_last = NULL; + +void +_engine_set_schedule (EngineSchedule *sched) +{ + g_return_if_fail (sched != NULL); + g_return_if_fail (sched->secured == TRUE); + + GSL_SPIN_LOCK (&pqueue_mutex); + if_reject (pqueue_schedule) + { + GSL_SPIN_UNLOCK (&pqueue_mutex); + g_warning (G_STRLOC ": schedule already set"); + return; + } + pqueue_schedule = sched; + sched->in_pqueue = TRUE; + GSL_SPIN_UNLOCK (&pqueue_mutex); +} + +void +_engine_unset_schedule (EngineSchedule *sched) +{ + EngineFlowJob *trash_fjobs_first, *trash_fjobs_last; + + g_return_if_fail (sched != NULL); + + GSL_SPIN_LOCK (&pqueue_mutex); + if_reject (pqueue_schedule != sched) + { + GSL_SPIN_UNLOCK (&pqueue_mutex); + g_warning (G_STRLOC ": schedule(%p) not currently set", sched); + return; + } + if_reject (pqueue_n_nodes || pqueue_n_cycles) + g_warning (G_STRLOC ": schedule(%p) still busy", sched); + + sched->in_pqueue = FALSE; + pqueue_schedule = NULL; + trash_fjobs_first = pqueue_trash_fjobs_first; + trash_fjobs_last = pqueue_trash_fjobs_last; + pqueue_trash_fjobs_first = NULL; + pqueue_trash_fjobs_last = NULL; + GSL_SPIN_UNLOCK (&pqueue_mutex); + + if (trash_fjobs_first) /* move trash flow jobs */ + { + GSL_SPIN_LOCK (&cqueue_trans); + trash_fjobs_last->any.next = cqueue_trash_fjobs; + cqueue_trash_fjobs = trash_fjobs_first; + GSL_SPIN_UNLOCK (&cqueue_trans); + } +} + +EngineNode* +_engine_pop_unprocessed_node (void) +{ + EngineNode *node; + + GSL_SPIN_LOCK (&pqueue_mutex); + node = pqueue_schedule ? _engine_schedule_pop_node (pqueue_schedule) : NULL; + if (node) + pqueue_n_nodes += 1; + GSL_SPIN_UNLOCK (&pqueue_mutex); + + if (node) + ENGINE_NODE_LOCK (node); + + return node; +} + +void +_engine_push_processed_node (EngineNode *node) +{ + g_return_if_fail (node != NULL); + g_return_if_fail (pqueue_n_nodes > 0); + g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (node)); + + GSL_SPIN_LOCK (&pqueue_mutex); + g_assert (pqueue_n_nodes > 0); /* paranoid */ + if (node->fjob_first) /* collect trash flow jobs */ + { + node->fjob_last->any.next = pqueue_trash_fjobs_first; + pqueue_trash_fjobs_first = node->fjob_first; + if (!pqueue_trash_fjobs_last) + pqueue_trash_fjobs_last = node->fjob_last; + node->fjob_first = NULL; + node->fjob_last = NULL; + } + pqueue_n_nodes -= 1; + ENGINE_NODE_UNLOCK (node); + if (!pqueue_n_nodes && !pqueue_n_cycles && GSL_SCHEDULE_NONPOPABLE (pqueue_schedule)) + gsl_cond_signal (&pqueue_done_cond); + GSL_SPIN_UNLOCK (&pqueue_mutex); +} + +GslRing* +_engine_pop_unprocessed_cycle (void) +{ + return NULL; +} + +void +_engine_push_processed_cycle (GslRing *cycle) +{ + g_return_if_fail (cycle != NULL); + g_return_if_fail (pqueue_n_cycles > 0); + g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (cycle->data)); +} + +void +_engine_wait_on_unprocessed (void) +{ + GSL_SPIN_LOCK (&pqueue_mutex); + while (pqueue_n_nodes || pqueue_n_cycles || !GSL_SCHEDULE_NONPOPABLE (pqueue_schedule)) + gsl_cond_wait (&pqueue_done_cond, &pqueue_mutex); + GSL_SPIN_UNLOCK (&pqueue_mutex); +} + + +/* --- initialization --- */ +void +_gsl_init_engine_utils (void) +{ + static gboolean initialized = FALSE; + + g_assert (initialized == FALSE); /* single invocation */ + initialized++; + + gsl_mutex_init (&cqueue_trans); + gsl_cond_init (&cqueue_trans_cond); + gsl_mutex_init (&pqueue_mutex); + gsl_cond_init (&pqueue_done_cond); +} + + +/* vim:set ts=8 sts=2 sw=2: */ |