Database
 sql >> Base de Dados >  >> RDS >> Database

Configurando o Service Broker para processamento assíncrono


No meu último artigo, falei sobre os benefícios de implementar o processamento assíncrono usando o Service Broker no SQL Server em relação aos outros métodos que existem para o processamento desacoplado de tarefas longas. Neste artigo, veremos todos os componentes que precisam ser configurados para uma configuração básica do Service Broker em um único banco de dados e as considerações importantes para o gerenciamento de conversação entre os serviços do broker. Para começar, precisaremos criar um banco de dados e habilitar o banco de dados para uso do Service Broker:
CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Configurando componentes do corretor


Os objetos básicos que precisam ser criados no banco de dados são os tipos de mensagens para as mensagens, um contrato que define como as mensagens serão enviadas entre os serviços, uma fila e o serviço iniciador e uma fila e o serviço de destino. Muitos exemplos online para o Service Broker mostram a nomenclatura de objetos complexos para os tipos de mensagens, contratos e serviços do Service Broker. No entanto, não há um requisito para que os nomes sejam complexos e nomes de objetos simples podem ser usados ​​para qualquer um dos objetos.

Para as mensagens, precisaremos criar um tipo de mensagem para a solicitação, que será chamada de AsyncRequest , e um tipo de mensagem para o resultado, que será chamado de AsyncResult . Ambos utilizarão XML que será validado como corretamente formado pelos serviços do broker para enviar e receber os dados requeridos pelos serviços.
-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

O contrato especifica que o AsyncRequest será enviado pelo serviço inicial para o serviço de destino e que o serviço de destino retornará um AsyncResult mensagem de volta para o serviço inicial. O contrato também pode especificar vários tipos de mensagem para o iniciador e o destino, ou que um tipo de mensagem específico pode ser enviado por qualquer serviço, se o processamento específico exigir.
-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Para cada um dos serviços, uma fila deve ser criada para fornecer armazenamento das mensagens recebidas pelo serviço. O serviço de destino para o qual a solicitação será enviada precisa ser criado especificando o AsyncContract para permitir que as mensagens sejam enviadas para o serviço. Nesse caso, o serviço é denominado ProcessingService e será criado no ProcessingQueue dentro do banco de dados. O serviço iniciador não exige a especificação de um contrato, o que o torna capaz de receber apenas mensagens em resposta a uma conversa iniciada a partir dele.
-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Enviando uma mensagem para processamento


Como expliquei no artigo anterior, prefiro implementar um procedimento armazenado de wrapper para enviar uma nova mensagem a um serviço de agente, para que possa ser modificado uma vez para dimensionar o desempenho, se necessário. Este procedimento é um wrapper simples para criar uma nova conversa e enviar a mensagem para o ProcessingService .
-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Usando o procedimento armazenado wrapper, agora podemos enviar uma mensagem de teste para o ProcessingService para validar que configuramos os serviços do corretor corretamente.
-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Processando mensagens


Embora pudéssemos processar manualmente as mensagens do ProcessingQueue , provavelmente desejaremos que as mensagens sejam processadas automaticamente à medida que são enviadas para o ProcessingService . Para fazer isso, é necessário criar um procedimento armazenado de ativação que testaremos e depois vincularemos à fila para automatizar o processamento na ativação da fila. Para processar uma mensagem, precisamos RECEIVE a mensagem da fila dentro de uma transação, juntamente com o tipo de mensagem e o identificador de conversa para a mensagem. O tipo de mensagem garante que a lógica apropriada seja aplicada à mensagem que está sendo processada e o identificador de conversa permite que uma resposta seja enviada de volta ao serviço inicial quando a mensagem for processada.

O RECEIVE O comando permite que uma única mensagem ou várias mensagens dentro do mesmo identificador ou grupo de conversação sejam processadas em uma única transação. Para processar várias mensagens, uma variável de tabela deve ser usada, ou para processar uma única mensagem, uma variável local pode ser usada. O procedimento de ativação abaixo recupera uma única mensagem da fila, verifica o tipo de mensagem para determinar se é um AsyncRequest mensagem e, em seguida, executa o processo de longa execução com base nas informações da mensagem recebidas. Caso não receba uma mensagem dentro do loop, aguardará até 5000ms, ou 5 segundos, para que outra mensagem entre na fila antes de sair do loop e encerrar sua execução. Depois de processar uma mensagem, ele cria um AsyncResult mensagem e a envia de volta ao iniciador no mesmo identificador de conversa do qual a mensagem foi recebida. O procedimento também verifica o tipo de mensagem para determinar se um EndDialog ou Error mensagem foi recebida para limpar a conversa terminando-a.
-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

A RequestQueue também precisará processar as mensagens que são enviadas a ele, portanto, um procedimento adicional para processar o AsyncResult mensagens retornadas pelo procedimento ProcessingQueueActivation precisam ser criadas. Como sabemos que a mensagem AsnycResult significa que todo o trabalho de processamento foi concluído, a conversa pode ser encerrada assim que processarmos essa mensagem, que enviará uma mensagem EndDialog ao ProcessingService, que será processada por seu procedimento de ativação para encerrar o conversa limpando tudo e evitando o fogo e esquecendo os problemas que acontecem quando as conversas são encerradas corretamente.
-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Teste dos procedimentos


Antes de automatizar o processamento da fila para nossos serviços, é importante testar os procedimentos de ativação para garantir que eles processem as mensagens adequadamente e para evitar que uma fila seja desabilitada caso ocorra um erro que não seja tratado adequadamente. Como já existe uma mensagem no ProcessingQueue a ProcessingQueueActivation procedimento pode ser executado para processar essa mensagem. Tenha em mente que o WAITFOR fará com que o procedimento demore 5 segundos para terminar, mesmo que a mensagem seja processada imediatamente da fila. Após processar a mensagem, podemos verificar se o procedimento funcionou corretamente consultando o RequestQueue para ver se um AsyncResult mensagem existe, e então podemos verificar se o RequestQueueActivation procedimento funciona corretamente executando-o.
-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatizando o processamento


Neste ponto, todos os componentes estão completos para automatizar totalmente nosso processamento. A única coisa que resta é vincular os procedimentos de ativação às suas filas apropriadas e, em seguida, enviar outra mensagem de teste para validar que ela seja processada e nada permaneça nas filas depois.
-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Resumo


Os componentes básicos para processamento assíncrono automatizado no SQL Server Service Broker podem ser configurados em uma única configuração de banco de dados para permitir o processamento desacoplado de tarefas de longa execução. Essa pode ser uma ferramenta poderosa para melhorar o desempenho do aplicativo, a partir da experiência do usuário final, desvinculando o processamento das interações do usuário final com o aplicativo.