No meu último artigo, falei sobre os benefícios de implementar o processamento assíncrono usando o Service Broker no SQL Server em relação aos outros métodos que existem para o processamento desacoplado de tarefas longas. Neste artigo, veremos todos os componentes que precisam ser configurados para uma configuração básica do Service Broker em um único banco de dados e as considerações importantes para o gerenciamento de conversação entre os serviços do broker. Para começar, precisaremos criar um banco de dados e habilitar o banco de dados para uso do Service Broker:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Configurando componentes do corretor
Os objetos básicos que precisam ser criados no banco de dados são os tipos de mensagens para as mensagens, um contrato que define como as mensagens serão enviadas entre os serviços, uma fila e o serviço iniciador e uma fila e o serviço de destino. Muitos exemplos online para o Service Broker mostram a nomenclatura de objetos complexos para os tipos de mensagens, contratos e serviços do Service Broker. No entanto, não há um requisito para que os nomes sejam complexos e nomes de objetos simples podem ser usados para qualquer um dos objetos.
Para as mensagens, precisaremos criar um tipo de mensagem para a solicitação, que será chamada de
AsyncRequest
, e um tipo de mensagem para o resultado, que será chamado de AsyncResult
. Ambos utilizarão XML que será validado como corretamente formado pelos serviços do broker para enviar e receber os dados requeridos pelos serviços. -- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
O contrato especifica que o
AsyncRequest
será enviado pelo serviço inicial para o serviço de destino e que o serviço de destino retornará um AsyncResult
mensagem de volta para o serviço inicial. O contrato também pode especificar vários tipos de mensagem para o iniciador e o destino, ou que um tipo de mensagem específico pode ser enviado por qualquer serviço, se o processamento específico exigir. -- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Para cada um dos serviços, uma fila deve ser criada para fornecer armazenamento das mensagens recebidas pelo serviço. O serviço de destino para o qual a solicitação será enviada precisa ser criado especificando o
AsyncContract
para permitir que as mensagens sejam enviadas para o serviço. Nesse caso, o serviço é denominado ProcessingService
e será criado no ProcessingQueue
dentro do banco de dados. O serviço iniciador não exige a especificação de um contrato, o que o torna capaz de receber apenas mensagens em resposta a uma conversa iniciada a partir dele. -- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Enviando uma mensagem para processamento
Como expliquei no artigo anterior, prefiro implementar um procedimento armazenado de wrapper para enviar uma nova mensagem a um serviço de agente, para que possa ser modificado uma vez para dimensionar o desempenho, se necessário. Este procedimento é um wrapper simples para criar uma nova conversa e enviar a mensagem para o
ProcessingService
. -- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
Usando o procedimento armazenado wrapper, agora podemos enviar uma mensagem de teste para o
ProcessingService
para validar que configuramos os serviços do corretor corretamente. -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Processando mensagens
Embora pudéssemos processar manualmente as mensagens do
ProcessingQueue
, provavelmente desejaremos que as mensagens sejam processadas automaticamente à medida que são enviadas para o ProcessingService
. Para fazer isso, é necessário criar um procedimento armazenado de ativação que testaremos e depois vincularemos à fila para automatizar o processamento na ativação da fila. Para processar uma mensagem, precisamos RECEIVE
a mensagem da fila dentro de uma transação, juntamente com o tipo de mensagem e o identificador de conversa para a mensagem. O tipo de mensagem garante que a lógica apropriada seja aplicada à mensagem que está sendo processada e o identificador de conversa permite que uma resposta seja enviada de volta ao serviço inicial quando a mensagem for processada. O
RECEIVE
O comando permite que uma única mensagem ou várias mensagens dentro do mesmo identificador ou grupo de conversação sejam processadas em uma única transação. Para processar várias mensagens, uma variável de tabela deve ser usada, ou para processar uma única mensagem, uma variável local pode ser usada. O procedimento de ativação abaixo recupera uma única mensagem da fila, verifica o tipo de mensagem para determinar se é um AsyncRequest
mensagem e, em seguida, executa o processo de longa execução com base nas informações da mensagem recebidas. Caso não receba uma mensagem dentro do loop, aguardará até 5000ms, ou 5 segundos, para que outra mensagem entre na fila antes de sair do loop e encerrar sua execução. Depois de processar uma mensagem, ele cria um AsyncResult
mensagem e a envia de volta ao iniciador no mesmo identificador de conversa do qual a mensagem foi recebida. O procedimento também verifica o tipo de mensagem para determinar se um EndDialog
ou Error
mensagem foi recebida para limpar a conversa terminando-a. -- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
A
RequestQueue
também precisará processar as mensagens que são enviadas a ele, portanto, um procedimento adicional para processar o AsyncResult
mensagens retornadas pelo procedimento ProcessingQueueActivation precisam ser criadas. Como sabemos que a mensagem AsnycResult significa que todo o trabalho de processamento foi concluído, a conversa pode ser encerrada assim que processarmos essa mensagem, que enviará uma mensagem EndDialog ao ProcessingService, que será processada por seu procedimento de ativação para encerrar o conversa limpando tudo e evitando o fogo e esquecendo os problemas que acontecem quando as conversas são encerradas corretamente. -- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Teste dos procedimentos
Antes de automatizar o processamento da fila para nossos serviços, é importante testar os procedimentos de ativação para garantir que eles processem as mensagens adequadamente e para evitar que uma fila seja desabilitada caso ocorra um erro que não seja tratado adequadamente. Como já existe uma mensagem no
ProcessingQueue
a ProcessingQueueActivation
procedimento pode ser executado para processar essa mensagem. Tenha em mente que o WAITFOR
fará com que o procedimento demore 5 segundos para terminar, mesmo que a mensagem seja processada imediatamente da fila. Após processar a mensagem, podemos verificar se o procedimento funcionou corretamente consultando o RequestQueue
para ver se um AsyncResult
mensagem existe, e então podemos verificar se o RequestQueueActivation
procedimento funciona corretamente executando-o. -- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Automatizando o processamento
Neste ponto, todos os componentes estão completos para automatizar totalmente nosso processamento. A única coisa que resta é vincular os procedimentos de ativação às suas filas apropriadas e, em seguida, enviar outra mensagem de teste para validar que ela seja processada e nada permaneça nas filas depois.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Resumo
Os componentes básicos para processamento assíncrono automatizado no SQL Server Service Broker podem ser configurados em uma única configuração de banco de dados para permitir o processamento desacoplado de tarefas de longa execução. Essa pode ser uma ferramenta poderosa para melhorar o desempenho do aplicativo, a partir da experiência do usuário final, desvinculando o processamento das interações do usuário final com o aplicativo.