我正在使用 LINQ 将两个数据集相互比较以创建新行并更新现有行。我注意到完整的比较持续了大约 1.5 小时,并且两个核心中只有一个处于忙碌状态(任务管理器的 CPU 使用率为 50-52%)。我必须承认我对并行 LINQ 完全陌生,但我认为它可以显着提高性能。
所以我的问题是,我应该如何以及什么并行化?
这些是原始查询(简化为要点):
'check for new data
Dim srcUnique = From row In src.Email_Total
Select Ticket_ID = row.ticket_id, Interaction = row.interaction, ModifiedAt = row.modified_time
Dim destUnique = From row In dest.ContactDetail
Where row.ContactRow.fiContactType = emailContactType.idContactType
Select row.ContactRow.Ticket_ID, row.Interaction, row.ModifiedAt
'get all emails(contactdetails) that are in source but not in destination
Dim diffRows = srcUnique.Except(destUnique).ToList
'get all new emails(according to ticket_id) for calculating contact columns
Dim newRowsTickets = (From row In src.Email_Total
Join d In diffRows
On row.ticket_id Equals d.Ticket_ID _
And row.interaction Equals d.Interaction _
And row.modified_time Equals d.ModifiedAt
Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList
For Each ticket In newRowsTickets
Dim contact = dest.Contact.FindByTicket_IDfiContactType(ticket.Ticket_ID, emailContactType.idContactType)
If contact Is Nothing Then
' Create new Contact with many sub-queries on this ticket(omitted) ****'
Dim newContact = Me.dest.Contact.NewContactRow
dest.Contact.AddContactRow(newContact)
contact = newContact
Else
' Update Contact with many sub-queries on this ticket(omitted) '
End If
daContact.Update(dest.Contact)
' Add new ContactDetail-Rows from this Ticket(this is the counterpart of the src.Email_Total-Rows, details omitted) '
For Each newRow In ticket.NewTicketRows
Dim newContactDetail = dest.ContactDetail.NewContactDetailRow
newContactDetail.ContactRow = contact
dest.ContactDetail.AddContactDetailRow(newContactDetail)
Next
daContactDetails.Update(dest.ContactDetail)
Next
注意: daContact
and daContactDetails
are SqlDataAdapters
, source
and dest
areDataSets
和Contact
and ContactDetail
are DataTables
, 其中每个 ContactDetail 都属于一个联系人。
即使不是两个核心都使用 100% CPU,我认为如果我并行化查询会显着提高性能,因为第二个核心几乎是空闲的。这for each
也可能是一个优化的好地方,因为票证彼此不相关。所以我假设我可以循环使用多个线程并并行创建/更新记录。但是如何用 PLINQ 做到这一点?
旁注:正如我在评论中提到的那样,到目前为止,性能对我来说并不是一个关键因素,因为服务器的唯一目的是将 MySQL 数据库(在另一台服务器上)与 MS SQL 服务器(在同一台服务器上)同步作为这个 Windows 服务)。它充当由其他服务生成的报告的来源。但这些报告每天只生成一次。但除此之外,我对学习 PLINQ 很感兴趣,因为我认为这可能是一个很好的练习。仅当目标数据库为空并且必须创建所有记录时才需要提到的 1,5h。如果两个数据库几乎同步,则此方法只需约 1 分钟。在未来的表现将变得更加重要,因为电子邮件只是几种联系类型中的一种(聊天+通话将超过 100 万条记录)。我认为无论如何我都需要某种(LINQ)数据分页。
如果有不清楚的地方,我会相应地更新我的答案。提前致谢。
编辑:这是我的调查和尝试的结果:
问题:如何使用连接“PLINQ”现有的 LINQ 查询?
答:请注意,一些 LINQ 运算符是二进制的——它们将两个 IEnumerables 作为输入。Join 是这种运算符的完美示例。在这些情况下,最左侧数据源的类型决定了使用 LINQ 还是 PLINQ。因此,您只需在第一个数据源上调用 AsParallel 即可让您的查询并行运行:
IEnumerable<T> leftData = ..., rightData = ...;
var q = from x in leftData.AsParallel()
join y in rightData on x.a == y.b
select f(x, y);
但是,如果我以下列方式更改我的查询(注意AsParallel
):
Dim newRowsTickets = (From row In src.Email_Total.AsParallel()
Join d In diffRows
On row.ticket_id Equals d.Ticket_ID _
And row.interaction Equals d.Interaction _
And row.modified_time Equals d.ModifiedAt
Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList
编译器会抱怨我也需要添加AsParallel
到正确的数据源。所以这似乎是一个 VB.NET 问题或缺乏文档(文章来自 2007 年)。我假设是后者,因为(除了那篇值得推荐的)文章还说您需要System.Concurrency.dll
手动添加,但实际上它是 .NET 4.0 Framework 和 Namespace 的一部分Sytem.Threading.Tasks
。
我意识到我不会从并行化中受益,Except
因为查询在顺序模式下足够快(即使两个集合中的行数几乎相同,这会导致最大比较次数,我在不到 30 秒内就得到了结果)。但为了完整起见,我稍后会添加它。
因此,我决定将for-each
与 LINQ 查询一样简单的东西并行化,您只需AsParallel()
在最后添加即可。但我意识到我需要强制使用 .NET 的并行性WithExecutionMode(ParallelExecutionMode.ForceParallelism)
,否则 .NET 决定在这个循环中只使用一个核心。我还想告诉 .NET 我希望使用尽可能多的线程,但不超过 8 个:WithDegreeOfParallelism(8).
现在两个内核同时工作,但 CPU 使用率保持在 54%。
所以这是迄今为止的 PLINQ 版本:
Dim diffRows = srcUnique.AsParallel.Except(destUnique.AsParallel).ToList
Dim newRowsTickets = (From row In src.Email_Total.AsParallel()
Join d In diffRows.AsParallel()
On row.ticket_id Equals d.Ticket_ID _
And row.interaction Equals d.Interaction _
And row.modified_time Equals d.ModifiedAt
Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList
For Each ticket In newRowsTickets.
AsParallel().
WithDegreeOfParallelism(8).
WithExecutionMode(ParallelExecutionMode.ForceParallelism)
' blah,blah ... '
'add new ContactDetails for this Ticket(only new rows)
For Each newRow In ticket.NewTicketRows.
AsParallel().
WithExecutionMode(ParallelExecutionMode.Default)
' blah,blah ... '
Next
daContactDetails.Update(dest.ContactDetail)
Next
AsParallel
不幸的是,与顺序模式相比,我没有看到任何性能优势:
与for each
(AsParallel
hh:mm:ss.mm):
09/29/2011 18:54:36: Contacts/ContactDetails created or modified. Duration: 01:21:34.40
并且没有:
09/29/2011 16:02:55: Contacts/ContactDetails created or modified. Duration: 01:21:24.50
有人可以解释一下这个结果吗?数据库的写访问是否for each
负责类似的时间?
以下是推荐阅读: