1

我正在遍历数据集的行并将其插入到活动空间环境中(由 tibco 提供,它是一个内存数据库)。我就是这样做的。

有没有更快的方法来解决这个问题?

我正在考虑对行进行分区,然后将每个分区并行化,但我不知道这是否会使其更快。

System.Threading.Tasks.Parallel.ForEach(
    dataSet.Tables[0].Rows,
    currRow =>
    {
        var tuple = Com.Tibco.As.Space.Tuple.Create();

        for (int i = 0; i < currRow.Values.Length; i++)
        {
            if (currRow.Values[i] != null)
            {
                var k = ConvertToAny(currRow.Values[i].ToString());

                if (k.GetType().IsEquivalentTo(typeof(DateTime)))
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], (DateTime)k);
                }
                else if (k.GetType().IsEquivalentTo(typeof(double)))
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], (double)k);
                }
                else
                {
                    tuple.Put(dataSet.Tables[0].ColumnNames[i], k.ToString());
                }
            }
        }
        try
        {
            inSpace_.Put(tuple);
        }
        catch (Exception e)
        {
        }
    }
);

我正在考虑一次批量处理大约 1000 个,如果有人可以请帮助:(

编辑:

List tuplesToAdd = new List(); for (int i = 0; i < dataSet.Tables[0].Rows.Length; i++) { var tuple = Com.Tibco.As.Space.Tuple.Create();

            for (int j = 0; j < dataSet.Tables[0].Rows[i].Values.Length; j++)
            {
                if (dataSet.Tables[0].Rows[i].Values[j] != null)
                {
                    var k = ConvertToAny(dataSet.Tables[0].Rows[i].Values[j].ToString());
                    if (k is DateTime)
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], (DateTime)k);
                    }
                    else if (k is Double)
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], (Double)k);
                    }
                    else
                    {
                        tuple.Put(dataSet.Tables[0].ColumnNames[j], k.ToString());
                    }
                }
            }

            tuplesToAdd.Add(tuple);

            if (i % 100000 == 0 || i == dataSet.Tables[0].Rows.Length - 1)
            {

                ThreadStart TUPLE_WORKER = delegate
                {
                    inSpace_.PutAll(tuplesToAdd);
                };
                new Thread(TUPLE_WORKER).Start();
                tuplesToAdd.Clear();
            }
        }

这是我尝试做的新方法(通过批处理)

4

1 回答 1

1

我不确定,但看起来你可以避免ToString在转换代码中使用。也就是说,而不是:

var k = ConvertToAny(currRow.Values[i].ToString());

if (k.GetType().IsEquivalentTo(typeof(DateTime)))

可以换成...

var k = currRow.Values[i];
if (k is DateTime)
{
    tuple.Put(dataSet.Tables[0].ColumnNames[i], (DateTime)k);
}

这应该可以节省您转换为字符串然后返回的时间。

为回应评论而添加

首先,你ConvertToAny是不必要的。中的项目currRow.Values[i]已经是正确的类型。你只是不知道它是什么类型。除非您说它可能是 aDateTime或 a的字符串表示形式Double。例如,如果类型已经是双精度类型,则没有理由转换为字符串、解析然后再转换回来。也就是说,以下两段代码做同样的事情:

object o = 3.14;
var k = ConvertToAny(o.ToString());
if (k.GetType.IsEquivalentTo(typeof(double))

object o = 3.14;
if (o is double)

唯一的区别是第二个会快得多。

但是,如果你有

object o = "3.14";

并且您希望将其转换为double,那么您必须进行转换。

您的批处理代码必须在添加和更新时锁定列表。否则你会破坏它。我会建议:

lock (tuplesToAdd)
{
    tuplesToAdd.Add(tuple);
    if ((tuplesToAdd.Count % 10000) == 0)
    {
        // push them all to the database.
        inspace_.PutAll(tuplesToAdd);
        tuplesToAdd.Clear();
    }
}

当你全部完成时(即Parallel.Foreach完成):

if (tuplesToAdd.Count > 0)
{
    // push the remaining items
}

现在,如果您想避免在更新期间阻塞所有线程,您可以发挥一些创意。

首先,创建两个可以锁定的对象:

private object lockObject = new Object();
private object pushLock = new Object();

创建tuplesToAdd列表后立即创建。然后,当您要添加项目时:

Monitor.Enter(lockObject); // acquires the lock
tuplesToAdd.Add(tuple);
if (tuplesToAdd.Count == 100000)
{
    var tuplesToPush = tuplesToAdd;
    tuplesToAdd = new List<tuple>(10000);
    Monitor.Exit(lockObject);  // releases the lock so other threads can process
    lock (pushLock)  // prevent multiple threads from pushing at the same time
    {
        inspace_.PutAll(tuplesToPush);
    }
}
else
{
    Monitor.Exit(lockObject);
}

这样,当一个线程更新数据库时,其他线程可以在下一次填充列表。


在我仔细考虑之后,您甚至可能不需要对这项任务使用并行处理。很可能您的大部分时间都花在等待Put调用的线程上。使用单个线程对这些进行批处理并批量编写它们可能会比您的原始解决方案执行得更快。您决定的并行版本会更快,但我怀疑它会更快

于 2013-03-08T17:01:36.970 回答