1

我必须将少量数据(~1k 字节)从thread A所有线程threads B-1发送到threads B-n.

我当前的实现相当复杂:

用于GHashTable将队列映射到thread ids。通过和将所有threads B-x内容置于等待状态。将一个指针推送到所有线程应该从每个队列接收的数据,通过广播更新(它们都使用相同的实例)。如果一个线程决定结束(即远程 DC),首先从 Queue 中移除 Queue ,清除队列并销毁内容。还有一些我遗漏的细节(比如竞争条件、等待块周围的中间 ref/unref)。GCondg_cond_wait(_until)thread Ag_cond_broadcastGCondGHashTable

这是一个理智的方法吗?我该如何改善这一点。它根本没有“感觉”有效率。

仅供参考,附上一些草稿代码:

typedef struct {
    //TODO verify this is not too stupid
    // if we use that mutex too often, all parallel foo is pointless
    GMutex mutex;
    GHashTable *hashmap; //full of queues
    gint refs;
    GDestroyNotify fx_ref;
    GDestroyNotify fx_unref;
} Foo;


Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
    Foo *foo;

    foo = g_new0 (Foo, 1);
    g_assert (foo);
    g_mutex_init (&(foo->mutex));
    foo->hashmap = g_hash_table_new_full ();
    foo->refs = 1;
    foo->fx_ref = fx_ref; //just asume this increases the refcount atomically
    foo->fx_unref = fx_unref; //"" decreases ""
    return foo;
}

void
foo_register_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    foo_lock (obj);
    aq = a_queue_new ((GDestroyNotify)i_do_unref);

    g_hash_table_insert (obj->hashmap, id, aq);
    foo_unlock (obj);
}

void
foo_unregister_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    foo_lock (obj);
    g_hash_table_remove (obj->hashmap, id);
    // broadcast _after_ removing the queue from the hashtable,
    // so the thread wakes up and quits its foo_thread_wait_until_ready call
    g_cond_broadcast (obj->cond);
    foo_unlock (obj);
    // allow somebody to sneak in
    foo_lock (obj);
    a_queue_unref (aq)
    foo_unlock (obj);
}

void
foo_enqueue (Foo *obj, gpointer data)
{
    GHashTableIter iter;
    gint key;
    GAsyncQueue *queue;

    //wave after wave, not wave intermixing 
    g_mutex_lock (&obj->mutex);

    g_hash_table_iter_init (iter, obj->ht);
    while (g_hash_table_iter_next (&iter, &id, &queue)) {
        if (foo->fx_ref)
            foo->fx_ref (data);
        g_queue_push_tail (queue, data);
    }
    g_cond_broadcast (cond);

    g_mutex_unlock (&obj->mutex);
}


gpointer
foo_thread_pop (Foo *obj, gint id)
{
    AQueue *aq;
    gpointer data = NULL;

    g_return_val_if_fail (obj, NULL);
    g_return_val_if_fail (id>0, NULL);

    foo_lock (obj);
    aq = g_hash_table_lookup (obj->hashmap, id);
    if (aq) {
        data = g_queue_pop_head ((GQueue*)aq);
    }
    foo_unlock (obj);
    return data;
}


/**
 * wait until the queue gets removed or until data is ready to be read
 */
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
    gpointer data = NULL;
    AQueue *aq;

    foo_lock (obj);
    aq = (AQueue*)g_hash_table_lookup (obj->hashmap, id);
    if (!aq)
        return NULL;

    // just in case stuff gets cleaned up in the meantime
    a_queue_ref (aq);


    while (g_queue_peek_head ((GQueue*)aq)==NULL) {
        g_cond_wait_until (&(obj->cond), &(obj->mutex))
        // make sure queue still exists, if not this means this thread is dying
        if (g_hash_table_lookup (obj->hashmap, id) != (gpointer)aq)
            break;
    }

    data = g_queue_pop_head ((GQueue*)aq);

    a_queue_unref (aq);

    foo_unlock (obj);

    return data;
}


void
foo_destroy (Foo *obj)
{
    g_return_if_fail (obj);
    g_mutex_clear (&obj->mutex);
    g_cond_clear (&obj->cond);
}

void
foo_unref (Foo *obj)
{
    g_return_if_fail (obj);
    if (g_atomic_int_dec_and_test (&obj->refs))
        foo_destroy (obj);
}

void
foo_ref (Foo *obj)
{
    g_return_if_fail (obj);
    g_atomic_int_inc (&obj->refs);
}

void
foo_lock (Foo *obj)
{
    g_return_if_fail (obj);
    g_atomic_int_inc (&obj->refs);
    g_mutex_lock (&obj->mutex);
}

void
foo_unlock (Foo *obj)
{
    g_return_if_fail (obj);
    g_mutex_unlock (&obj->mutex);
    foo_unref (obj);
}
4

1 回答 1

0

我用叉子做了类似的事情。以下是该算法的高级概要:

假设:每个线程不需要向父级广播回内存。可以按任何顺序处理每个请求。

  • 在多线程(分叉)之前,创建一个指向结构的指针数组(结构提供处理排队请求的所有必要信息)。分配一个以 0 作为初始值的共享的、互斥控制的整数。
  • 可以根据需要多次分叉。(我使用了逻辑 CPU 的数量 +1,因为父级大部分是空闲的)。

然后每个分叉如下:

  • 如果共享整数未锁定,则将其锁定并递增。解锁它。处理数组中对应于前一个增量值的元素。
  • 如果锁定,请稍后再试。
  • 如果该值大于全局请求数,则退出。

父级只是等待所有线程都完成。

于 2013-10-09T17:57:28.580 回答