我使用服务代理作为我的消息传递系统来安排和运行作业。Eash 作业由称为引擎的多个任务或步骤组成。
我的服务代理对象是:
- 消息类型:SubmitJob、JobResponse、SubmitTask、TaskResponse
- 合同:JobContract、TaskContract
- 队列:ClientQueue、JobQueue、EngineQueue、ExternalActivatorQueue
- 服务:ClientService、JobService、EngineService、ExternalActivatorService
- 事件通知:EventNotificationEngineQueue
我在作业队列上有内部激活(存储过程)。对于 SubmitJob 消息类型,存储过程获取该作业的第一个任务,启动与 EngineService 的对话并向该队列 (StartTask) 发送消息然后他们会被提交到 EngineQueue,如果没有,那么这个作业的任务就完成了(发送消息并清理。)
这一切似乎都很好。但是,我想要一个处理 EngineQueue 消息的外部应用程序(引擎)。所以我使用了微软的外部激活机制(ssbeas.exe)。花了很长时间,但我终于让它工作了。一条消息进入 EngineQueue,EventNotificationEngineQueue 启动我的应用程序并排空队列。到目前为止,一切都很好。但是,我的应用程序似乎运行了多次。我的测试应用程序配置为在完成时发送电子邮件。即使我只发送一项任务的作业,我也会收到多封电子邮件(表明程序运行了多次。)
这是我的应用程序(vb.net)的代码(代理是一个封装服务代理服务(发送、接收等)的对象:
While True
oBroker.tran = oBroker.cnn.BeginTransaction
oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)
If dialogHandle = System.Guid.Empty Then
'Console.WriteLine("An Error Occurred. Program Terminated.")
oBroker.tran.Commit()
Exit While
End If
ConsoleWriteLine("Received: " & msgType)
If (msg Is Nothing) Then
ConsoleWriteLine("commiting and exiting")
oBroker.tran.Commit()
Exit While
Else
Select Case (msgType)
Case "SubmitTask"
ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")
Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
oBroker.EndDialog(dialogHandle)
Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
oBroker.EndDialog(dialogHandle)
End Select
End If
ConsoleWriteLine("commiting...")
oBroker.tran.Commit()
End While
我不明白为什么该应用程序会多次运行,但除此之外,我不明白为什么后续版本仍然能够看到队列中的消息。毕竟,第一个化身应该已经将消息锁定在队列中。它确实锁定了队列,因为我能够使用查询管理器进行测试,以尝试在我的应用程序运行并且它被阻止时接收消息。
我尝试过使用 EAService.config 中的并发值。当我将它设置为 min="0" 和 max="1" 时,我确实将应用程序运行的次数减少到了两次以前,使用 min="0" 和 max="10",它正在运行它好像是18份。
我希望这是有道理的,并对长度感到抱歉。有人知道这里发生了什么吗?我的 .net 应用程序编码有误吗?
谢谢马丁
编辑:添加引擎运行后创建的日志:
2010-02-08 09:31:39 - 主要
2010-02-08 09:31:39 - 收到:提交任务
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010 -02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - 发送电子邮件
2010-02-08 09:31:40 - 提交...
2010-02-08 09:31:40 -睡眠
2010-02-08 09:32:10 - 睡眠完成。
2010-02-08 09:32:10 - 主要完成
2010-02-08 09:32:10 - 外部激活的应用程序成功并立即终止。
2010-02-08 09:32:10 - 主要
2010-02-08 09:32:10 - 收到:提交任务
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010 -02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - 发送电子邮件
2010-02-08 09:32:10 - 提交...
2010-02-08 09:32:10 - 睡觉
2010-02-08 09:32:40 - 睡觉完成。
2010-02-08 09:32:40 - 主要完成
2010-02-08 09:32:40 - 外部激活的应用程序成功并立即终止。
你可以看到它通过了整个应用程序两次(主要,接收,dowork,sendemail,完成。)
编辑 2:这是在将作业提交到队列时激活的存储过程(调试语句和所有)的最新版本:
ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN
DECLARE @message_type_name sysname
DECLARE @dialog uniqueidentifier
DECLARE @message_sequence_number bigint
DECLARE @error_message_sequence_number bigint
DECLARE @message_body xml
DECLARE @cgid uniqueidentifier
DECLARE @JobID int
DECLARE @Params varchar(MAX)
DECLARE @ErrorNumber bigint
DECLARE @ErrorText nvarchar(MAX)
DECLARE @TaskID int
DECLARE @TaskService varchar(100)
DECLARE @TaskKey int
DECLARE @chEngine uniqueidentifier
DECLARE @Step int
DECLARE @NextStep int
DECLARE @jobch uniqueidentifier
DECLARE @EngineMsg XML
DECLARE @TimeStarted datetime
DECLARE @TaskStatus int
-- This procedure will just sit in a loop processing Task messages in the queue
-- until the queue is empty
SET NOCOUNT ON
SET @error_message_sequence_number = -100
PRINT 'pr_ProcessJob: Start'
WHILE (1=1) BEGIN
BEGIN TRY
PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
BEGIN TRANSACTION
-- first lets get the conversation group id for the next message.
WAITFOR (
GET CONVERSATION GROUP @cgid FROM [JobQueue]
), TIMEOUT 1000
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
ROLLBACK TRANSACTION
BREAK
END
PRINT @CGID
-- Inner Loop (Message Processing)
WHILE (1=1) BEGIN
-- Receive the next available message
PRINT 'Receiving Message.'
WAITFOR (
RECEIVE top(1) -- just handle one message at a time
@message_type_name=message_type_name, --the type of message received
@message_body=CAST(message_body AS XML), -- the message contents
@message_sequence_number=message_sequence_number,
@dialog = conversation_handle -- the identifier of the dialog this message was received on
FROM [JobQueue]
WHERE conversation_group_id=@cgid
), TIMEOUT 3000 -- if the queue is empty for three seconds, give up and go away
-- If we didn't get anything, the queue is empty so bail out
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
BREAK
END --IF (@@ROWCOUNT = 0)
PRINT 'Message Received: ' + @message_type_name
SAVE TRANSACTION MessageReceivedSavePoint
-- Handle the End Conversation Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
-- When we receive an End Dialog, we need to end also.
PRINT 'ENDING CONVERSATION'
END CONVERSATION @dialog
END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
ELSE BEGIN
-- Handle the Conversation Error Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
-- We can't return anything here because the dialog at the other end is closed so just log
-- an error and close our end of the conversation.
PRINT 'ENDING CONVERSATION w/Error'
END CONVERSATION @dialog
END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
ELSE BEGIN
IF (@message_type_name = 'SubmitJob') BEGIN -- Process normal Job messages..
PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'
-- Pull the information out of the task message with XQuery
SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
@Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')
PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
PRINT 'pr_ProcessJob::@Params = ' + @Params
SELECT @ErrorNumber = 0, @ErrorText = N''
-- Do something with the job
-- save state
-- we are looking for the first step
SET @Step=1
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =Task.TaskKey
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'Step='+cast(@step as varchar(max))
PRINT 'TaskID='+cast(@TaskID as varchar(max))
PRINT 'TaskService='+cast(@TaskService as varchar(max))
PRINT'TaskKey='+cast(@TaskKey as varchar(max))
PRINT 'BEGIN DIALOG with ' + @TaskService
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT CAST(@EngineMsg as varchar(max))
PRINT 'Sending Message Type SubmitTask to Engine.';
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask
(@EngineMsg)
PRINT 'Inserting into jobstate'
INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)
END -- IF (@message_type_name = 'SubmitJob')
ELSE BEGIN
IF (@message_type_name = 'TaskResponse') BEGIN
PRINT 'Processing MessageType TaskResponse'
SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')
PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))
PRINT 'Loading State'
--LoadState
SELECT @JobID=jobid,
@Step=Step,
@jobch=jobch,
@TimeStarted=sysdate
FROM Jobstate
WHERE cgid=@cgid
PRINT 'Loading State complete'
PRINT @jobch
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =task.TaskKey,
@NextStep = task.Step
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'NextTask: ['+@TaskService+']'
if (@TaskService is null) BEGIN
PRINT '@TaskService is NULL: BEGIN'
-- no more tasks
--END CONVERSATION @jobch
PRINT 'Removing from state table'
DELETE FROM JobState
WHERE @cgid=cgid
PRINT @@ROWCOUNT
PRINT 'Removing from state table-completed'
DECLARE @ResponseDoc xml
-- Send a response message saying we're done
DECLARE @Time nvarchar(100)
SET @Time = cast(getdate() as nvarchar(100))
DECLARE @TimeStartedText nvarchar(100)
SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))
SET @ResponseDoc = N'<Job/>'
SET @ResponseDoc.modify(
'insert (<JobID>{ sql:variable("@JobID") }</JobID>,
<JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
<ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
<ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
<TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>,
<TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>)
as last into /Job [1]');
SEND ON CONVERSATION @jobch
MESSAGE TYPE [JobResponse] (@ResponseDoc)
END CONVERSATION @jobch
PRINT '@TaskService is NULL: END'
END --if (@TaskService is null) BEGIN
ELSE BEGIN
-- there are more tasks
PRINT '@TaskService is not null: BEGIN'
PRINT 'BEGIN DIALOG with ' + @TaskService
--another task
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT 'SEND ' +cast(@EngineMsg as varchar(max));
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask (@EngineMsg)
PRINT 'SAVING State: ' +str(@step)
-- save state
Update JobState
SET step = @NextStep
FROM JobState
WHERE cgid=@cgid
PRINT '@TaskService is not null: END'
END -- ELSE (@TaskService is NOT NULL)
PRINT 'Processing MessageType TaskResponse...Complete'
END -- IF (@message_type_name = 'TaskCompleted')
END -- ELSE IF (@message_type_name <> 'JobRequest')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
END -- WHILE (1=1) BEGIN
PRINT 'COMMIT TRANSACTION'
COMMIT TRANSACTION
END TRY
BEGIN CATCH
--rollback transaction
DECLARE @ErrNum int
DECLARE @ErrMsg varchar(max)
SELECT
ERROR_NUMBER() AS ErrorNumber
,ERROR_SEVERITY() AS ErrorSeverity
,ERROR_STATE() AS ErrorState
,ERROR_PROCEDURE() AS ErrorProcedure
,ERROR_LINE() AS ErrorLine
,ERROR_MESSAGE() AS ErrorMessage;
PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'
if (error_number()=1205) BEGIN
-- a deadlock occurred. We can try it again.
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
--CONTINUE
END --if (error_number()=1205)
ELSE BEGIN
if (error_number()=9617) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
END
ELSE BEGIN -- (error_number()<>9617)
-- another error occurred. The message cant be procesed sucessfully
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
ROLLBACK TRANSACTION MessageReceivedSavePoint
END --ELSE (error_number()<>9617)
END -- if (error_number()<>1205)
END CATCH
END -- while loop
PRINT 'pr_ProcessJob: Complete'
END -- CREATE PROCEDURE [dbo].[ProcessJobProc]