1

我使用服务代理作为我的消息传递系统来安排和运行作业。Eash 作业由称为引擎的多个任务或步骤组成。

我的服务代理对象是:

  • 消息类型:SubmitJob、JobResponse、SubmitTask、TaskResponse
  • 合同:JobContract、TaskContract
  • 队列:ClientQueue、JobQueue、EngineQueue、ExternalActivatorQueue
  • 服务:ClientService、JobService、EngineService、ExternalActivatorService
  • 事件通知:EventNotificationEngineQueue

我在作业队列上有内部激活(存储过程)。对于 SubmitJob 消息类型,存储过程获取该作业的第一个任务,启动与 EngineService 的对话并向该队列 (StartTask) 发送消息然后他们会被提交到 EngineQueue,如果没有,那么这个作业的任务就完成了(发送消息并清理。)

这一切似乎都很好。但是,我想要一个处理 EngineQueue 消息的外部应用程序(引擎)。所以我使用了微软的外部激活机制(ssbeas.exe)。花了很长时间,但我终于让它工作了。一条消息进入 EngineQueue,EventNotificationEngineQueue 启动我的应用程序并排空队列。到目前为止,一切都很好。但是,我的应用程序似乎运行了多次。我的测试应用程序配置为在完成时发送电子邮件。即使我只发送一项任务的作业,我也会收到多封电子邮件(表明程序运行了多次。)

这是我的应用程序(vb.net)的代码(代理是一个封装服务代理服务(发送、接收等)的对象:

While True

            oBroker.tran = oBroker.cnn.BeginTransaction

            oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)

            If dialogHandle = System.Guid.Empty Then
                'Console.WriteLine("An Error Occurred. Program Terminated.")
                oBroker.tran.Commit()
                Exit While
            End If

            ConsoleWriteLine("Received: " & msgType)

            If (msg Is Nothing) Then
                ConsoleWriteLine("commiting and exiting")
                oBroker.tran.Commit()
                Exit While

            Else
                Select Case (msgType)
                    Case "SubmitTask"
                        ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                        oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                        oBroker.EndDialog(dialogHandle)

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
                        oBroker.EndDialog(dialogHandle)

                End Select
            End If


            ConsoleWriteLine("commiting...")
            oBroker.tran.Commit()

        End While

我不明白为什么该应用程序会多次运行,但除此之外,我不明白为什么后续版本仍然能够看到队列中的消息。毕竟,第一个化身应该已经将消息锁定在队列中。它确实锁定了队列,因为我能够使用查询管理器进行测试,以尝试在我的应用程序运行并且它被阻止时接收消息。

我尝试过使用 EAService.config 中的并发值。当我将它设置为 min="0" 和 max="1" 时,我确实将应用程序运行的次数减少到了两次以前,使用 min="0" 和 max="10",它正在运行它好像是18份。

我希望这是有道理的,并对长度感到抱歉。有人知道这里发生了什么吗?我的 .net 应用程序编码有误吗?

谢谢马丁

编辑:添加引擎运行后创建的日志:

2010-02-08 09:31:39 - 主要
2010-02-08 09:31:39 - 收到:提交任务
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010 -02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - 发送电子邮件
2010-02-08 09:31:40 - 提交...
2010-02-08 09:31:40 -睡眠
2010-02-08 09:32:10 - 睡眠完成。
2010-02-08 09:32:10 - 主要完成
2010-02-08 09:32:10 - 外部激活的应用程序成功并立即终止。
2010-02-08 09:32:10 - 主要
2010-02-08 09:32:10 - 收到:提交任务
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010 -02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - 发送电子邮件
2010-02-08 09:32:10 - 提交...
2010-02-08 09:32:10 - 睡觉
2010-02-08 09:32:40 - 睡觉完成。
2010-02-08 09:32:40 - 主要完成
2010-02-08 09:32:40 - 外部激活的应用程序成功并立即终止。

你可以看到它通过了整个应用程序两次(主要,接收,dowork,sendemail,完成。)

编辑 2:这是在将作业提交到队列时激活的存储过程(调试语句和所有)的最新版本:

ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN

    DECLARE     @message_type_name      sysname
    DECLARE     @dialog             uniqueidentifier 
    DECLARE     @message_sequence_number    bigint
    DECLARE     @error_message_sequence_number  bigint
    DECLARE     @message_body           xml
    DECLARE     @cgid               uniqueidentifier
    DECLARE     @JobID              int
    DECLARE     @Params             varchar(MAX)

    DECLARE     @ErrorNumber            bigint
    DECLARE     @ErrorText          nvarchar(MAX)

    DECLARE         @TaskID             int 
    DECLARE     @TaskService            varchar(100)
    DECLARE     @TaskKey            int
    DECLARE     @chEngine           uniqueidentifier
    DECLARE     @Step               int
    DECLARE     @NextStep           int
    DECLARE     @jobch              uniqueidentifier
    DECLARE     @EngineMsg          XML
    DECLARE     @TimeStarted            datetime
    DECLARE     @TaskStatus         int

    --  This procedure will just sit in a loop processing Task messages in the queue
    --  until the queue is empty
    SET NOCOUNT ON
    SET @error_message_sequence_number = -100


    PRINT 'pr_ProcessJob: Start'

    WHILE (1=1) BEGIN

        BEGIN TRY

            PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
            BEGIN TRANSACTION

            -- first lets get the conversation group id for the next message.
            WAITFOR (
                GET CONVERSATION GROUP @cgid FROM [JobQueue]
            ), TIMEOUT 1000

            IF (@@ROWCOUNT = 0) BEGIN

                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
                ROLLBACK TRANSACTION
                BREAK
            END

            PRINT @CGID

            -- Inner Loop (Message Processing)
            WHILE (1=1) BEGIN

                -- Receive the next available message
                PRINT 'Receiving Message.'
                WAITFOR (
                    RECEIVE top(1) -- just handle one message at a time
                        @message_type_name=message_type_name,  --the type of message received
                        @message_body=CAST(message_body AS XML),      -- the message contents
                        @message_sequence_number=message_sequence_number,
                        @dialog = conversation_handle    -- the identifier of the dialog this message was received on
                        FROM [JobQueue]
                        WHERE conversation_group_id=@cgid
                ), TIMEOUT 3000  -- if the queue is empty for three seconds, give up and go away

                -- If we didn't get anything, the queue is empty so bail out

                IF (@@ROWCOUNT = 0) BEGIN               
                    PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
                    BREAK
                END --IF (@@ROWCOUNT = 0)

                PRINT 'Message Received: ' + @message_type_name
                SAVE TRANSACTION MessageReceivedSavePoint

                -- Handle the End Conversation Message
                IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                    -- When we receive an End Dialog, we need to end also.
                    PRINT 'ENDING CONVERSATION'
                    END CONVERSATION @dialog
                END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                ELSE BEGIN
                    -- Handle the Conversation Error Message
                    IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
                        -- We can't return anything here because the dialog at the other end is closed so just log 
                        -- an error and close our end of the conversation.

                        PRINT 'ENDING CONVERSATION w/Error'
                        END CONVERSATION @dialog
                    END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                    ELSE BEGIN
                        IF (@message_type_name = 'SubmitJob')  BEGIN -- Process normal Job messages..   

                            PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'

                            -- Pull the information out of the task message with XQuery
                            SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
                                @Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')

                            PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
                            PRINT 'pr_ProcessJob::@Params = ' + @Params


                            SELECT  @ErrorNumber = 0, @ErrorText = N''

                            -- Do something with the job
                            -- save state

                            -- we are looking for the first step 
                            SET @Step=1

                            PRINT 'Selecting from JobTask'
                            ---------------------------------------------------------
                            -- Get the next task
                            ---------------------------------------------------------
                            SELECT  TOP 1
                                @TaskID=task.TaskID, 
                                @TaskService=tt.TaskService, 
                                @TaskKey =Task.TaskKey
                            FROM JobTask task INNER JOIN TaskType tt
                                ON task.TaskTypeID = tt.TaskTypeID      
                            WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
                            ORDER BY Task.step
                            ---------------------------------------------------------
                            PRINT 'Selecting from JobTask: complete'

                            PRINT 'Step='+cast(@step as varchar(max))               
                            PRINT 'TaskID='+cast(@TaskID as varchar(max))
                            PRINT 'TaskService='+cast(@TaskService as varchar(max))
                            PRINT'TaskKey='+cast(@TaskKey as varchar(max))                      

                            PRINT 'BEGIN DIALOG with ' + @TaskService   

                            BEGIN DIALOG @chEngine
                                FROM SERVICE [JobService]
                                TO SERVICE @TaskService
                                ON CONTRACT [TaskContract]
                                WITH RELATED_CONVERSATION=@dialog;

                            PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'

                            SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                            PRINT CAST(@EngineMsg as varchar(max))

                            PRINT 'Sending Message Type SubmitTask to Engine.';

                            SEND ON CONVERSATION @chEngine
                                MESSAGE TYPE SubmitTask
                                (@EngineMsg)

                            PRINT 'Inserting into jobstate'

                            INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)

                        END -- IF (@message_type_name = 'SubmitJob') 
                        ELSE BEGIN

                            IF (@message_type_name = 'TaskResponse')  BEGIN 

                                PRINT 'Processing MessageType TaskResponse'

                                SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')

                                PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))

                                PRINT 'Loading State'

                                --LoadState
                                SELECT  @JobID=jobid, 
                                    @Step=Step, 
                                    @jobch=jobch,
                                    @TimeStarted=sysdate    
                                FROM Jobstate
                                WHERE cgid=@cgid

                                PRINT 'Loading State complete' 

                                PRINT @jobch

                                PRINT 'Selecting from JobTask'
                                ---------------------------------------------------------
                                -- Get the next task
                                ---------------------------------------------------------
                                SELECT  TOP 1
                                    @TaskID=task.TaskID, 
                                    @TaskService=tt.TaskService, 
                                    @TaskKey =task.TaskKey,
                                    @NextStep = task.Step

                                FROM JobTask task INNER JOIN TaskType tt
                                    ON task.TaskTypeID = tt.TaskTypeID      
                                WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
                                ORDER BY Task.step
                                ---------------------------------------------------------
                                PRINT 'Selecting from JobTask: complete'

                                PRINT 'NextTask: ['+@TaskService+']'

                                if (@TaskService is null) BEGIN

                                    PRINT '@TaskService is NULL: BEGIN'

                                    -- no more tasks
                                    --END CONVERSATION @jobch

                                    PRINT 'Removing from state table'                               
                                    DELETE FROM JobState 
                                    WHERE @cgid=cgid
                                    PRINT @@ROWCOUNT
                                    PRINT 'Removing from state table-completed'

                                    DECLARE @ResponseDoc xml

                                    -- Send a response message saying we're done
                                    DECLARE @Time nvarchar(100)                 
                                    SET @Time = cast(getdate() as nvarchar(100))

                                    DECLARE @TimeStartedText nvarchar(100)
                                    SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))

                                    SET @ResponseDoc = N'<Job/>'
                                    SET @ResponseDoc.modify(
                                    'insert (<JobID>{ sql:variable("@JobID") }</JobID>, 
                                    <JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
                                    <ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
                                    <ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
                                    <TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>, 
                                    <TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>) 
                                    as last into /Job [1]'); 

                                    SEND ON CONVERSATION @jobch 
                                        MESSAGE TYPE [JobResponse] (@ResponseDoc)

                                    END CONVERSATION @jobch             

                                    PRINT '@TaskService is NULL: END'

                                END --if (@TaskService is null) BEGIN
                                ELSE BEGIN
                                    -- there are more tasks 
                                    PRINT '@TaskService is not null: BEGIN'

                                    PRINT 'BEGIN DIALOG with ' + @TaskService

                                    --another task
                                    BEGIN DIALOG @chEngine
                                        FROM SERVICE [JobService]
                                        TO SERVICE @TaskService
                                        ON CONTRACT [TaskContract]
                                        WITH RELATED_CONVERSATION=@dialog;

                                    SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                                    PRINT 'SEND ' +cast(@EngineMsg as varchar(max));

                                    SEND ON CONVERSATION @chEngine
                                    MESSAGE TYPE SubmitTask (@EngineMsg)                                    

                                    PRINT 'SAVING State: ' +str(@step)

                                    -- save state
                                    Update JobState
                                        SET step = @NextStep
                                    FROM JobState
                                    WHERE cgid=@cgid

                                    PRINT '@TaskService is not null: END'

                                END -- ELSE (@TaskService is NOT NULL)

                                PRINT 'Processing MessageType TaskResponse...Complete'

                            END -- IF (@message_type_name = 'TaskCompleted')

                        END -- ELSE IF (@message_type_name <> 'JobRequest') 
                    END -- ELSE  (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            END -- WHILE (1=1) BEGIN

            PRINT 'COMMIT TRANSACTION'
            COMMIT TRANSACTION

        END TRY
        BEGIN CATCH

            --rollback transaction

            DECLARE @ErrNum int
            DECLARE @ErrMsg varchar(max)


                SELECT
                    ERROR_NUMBER() AS ErrorNumber
                    ,ERROR_SEVERITY() AS ErrorSeverity
                    ,ERROR_STATE() AS ErrorState
                    ,ERROR_PROCEDURE() AS ErrorProcedure
                    ,ERROR_LINE() AS ErrorLine
                    ,ERROR_MESSAGE() AS ErrorMessage;

            PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'


            if (error_number()=1205) BEGIN
                -- a deadlock occurred. We can try it again.
                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                ROLLBACK TRANSACTION
                --CONTINUE
            END --if (error_number()=1205)
            ELSE BEGIN
                if (error_number()=9617) BEGIN
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                    ROLLBACK TRANSACTION
                END
                ELSE BEGIN -- (error_number()<>9617) 
                    -- another error occurred.  The message cant be procesed sucessfully
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
                    ROLLBACK TRANSACTION MessageReceivedSavePoint

                END --ELSE (error_number()<>9617)       
            END -- if (error_number()<>1205)
        END CATCH

    END -- while loop

PRINT 'pr_ProcessJob: Complete' 

END -- CREATE PROCEDURE [dbo].[ProcessJobProc]
4

1 回答 1

1

如果您的应用程序的后续实例能够看到该消息,则仅意味着一件事:前一个实例必须已回滚接收。乍一看,您提供的代码看起来不错,所以我会去寻找您正在使用的对象模型中的错误。如果其中一个方法抛出异常,应用程序将被终止,并且收到消息的事务会自动回滚。

如果您希望一次只运行一个应用程序实例,请将 Max 设置保持为 1,否则默认情况下它将同时运行更多实例以跟上负载。

于 2010-02-09T19:07:44.603 回答