我必须将少量数据(~1k 字节)从thread A
所有线程threads B-1
发送到threads B-n
.
我当前的实现相当复杂:
用于GHashTable
将队列映射到thread id
s。通过和将所有threads B-x
内容置于等待状态。将一个指针推送到所有线程应该从每个队列接收的数据,通过广播更新(它们都使用相同的实例)。如果一个线程决定结束(即远程 DC),首先从 Queue 中移除 Queue ,清除队列并销毁内容。还有一些我遗漏的细节(比如竞争条件、等待块周围的中间 ref/unref)。GCond
g_cond_wait(_until)
thread A
g_cond_broadcast
GCond
GHashTable
这是一个理智的方法吗?我该如何改善这一点。它根本没有“感觉”有效率。
仅供参考,附上一些草稿代码:
typedef struct {
//TODO verify this is not too stupid
// if we use that mutex too often, all parallel foo is pointless
GMutex mutex;
GHashTable *hashmap; //full of queues
gint refs;
GDestroyNotify fx_ref;
GDestroyNotify fx_unref;
} Foo;
Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
Foo *foo;
foo = g_new0 (Foo, 1);
g_assert (foo);
g_mutex_init (&(foo->mutex));
foo->hashmap = g_hash_table_new_full ();
foo->refs = 1;
foo->fx_ref = fx_ref; //just asume this increases the refcount atomically
foo->fx_unref = fx_unref; //"" decreases ""
return foo;
}
void
foo_register_thread (Foo *obj, gint threadid)
{
AQueue *aq;
foo_lock (obj);
aq = a_queue_new ((GDestroyNotify)i_do_unref);
g_hash_table_insert (obj->hashmap, id, aq);
foo_unlock (obj);
}
void
foo_unregister_thread (Foo *obj, gint threadid)
{
AQueue *aq;
foo_lock (obj);
g_hash_table_remove (obj->hashmap, id);
// broadcast _after_ removing the queue from the hashtable,
// so the thread wakes up and quits its foo_thread_wait_until_ready call
g_cond_broadcast (obj->cond);
foo_unlock (obj);
// allow somebody to sneak in
foo_lock (obj);
a_queue_unref (aq)
foo_unlock (obj);
}
void
foo_enqueue (Foo *obj, gpointer data)
{
GHashTableIter iter;
gint key;
GAsyncQueue *queue;
//wave after wave, not wave intermixing
g_mutex_lock (&obj->mutex);
g_hash_table_iter_init (iter, obj->ht);
while (g_hash_table_iter_next (&iter, &id, &queue)) {
if (foo->fx_ref)
foo->fx_ref (data);
g_queue_push_tail (queue, data);
}
g_cond_broadcast (cond);
g_mutex_unlock (&obj->mutex);
}
gpointer
foo_thread_pop (Foo *obj, gint id)
{
AQueue *aq;
gpointer data = NULL;
g_return_val_if_fail (obj, NULL);
g_return_val_if_fail (id>0, NULL);
foo_lock (obj);
aq = g_hash_table_lookup (obj->hashmap, id);
if (aq) {
data = g_queue_pop_head ((GQueue*)aq);
}
foo_unlock (obj);
return data;
}
/**
* wait until the queue gets removed or until data is ready to be read
*/
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
gpointer data = NULL;
AQueue *aq;
foo_lock (obj);
aq = (AQueue*)g_hash_table_lookup (obj->hashmap, id);
if (!aq)
return NULL;
// just in case stuff gets cleaned up in the meantime
a_queue_ref (aq);
while (g_queue_peek_head ((GQueue*)aq)==NULL) {
g_cond_wait_until (&(obj->cond), &(obj->mutex))
// make sure queue still exists, if not this means this thread is dying
if (g_hash_table_lookup (obj->hashmap, id) != (gpointer)aq)
break;
}
data = g_queue_pop_head ((GQueue*)aq);
a_queue_unref (aq);
foo_unlock (obj);
return data;
}
void
foo_destroy (Foo *obj)
{
g_return_if_fail (obj);
g_mutex_clear (&obj->mutex);
g_cond_clear (&obj->cond);
}
void
foo_unref (Foo *obj)
{
g_return_if_fail (obj);
if (g_atomic_int_dec_and_test (&obj->refs))
foo_destroy (obj);
}
void
foo_ref (Foo *obj)
{
g_return_if_fail (obj);
g_atomic_int_inc (&obj->refs);
}
void
foo_lock (Foo *obj)
{
g_return_if_fail (obj);
g_atomic_int_inc (&obj->refs);
g_mutex_lock (&obj->mutex);
}
void
foo_unlock (Foo *obj)
{
g_return_if_fail (obj);
g_mutex_unlock (&obj->mutex);
foo_unref (obj);
}