PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

@Tailable(spring-data-reactive-mongodb) equivalente em spring-data-r2dbc


Eu estava com o mesmo problema, não tenho certeza se você encontrou uma solução ou não, mas consegui fazer algo semelhante fazendo o seguinte. Primeiro, adicionei o gatilho à minha tabela
CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

Isso definirá um gatilho na tabela sempre que uma linha for atualizada, excluída ou inserida. Em seguida, ele chamará a função de gatilho que configurei, que se parecia com isso:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

Isso me permitirá 'ouvir' qualquer uma dessas atualizações do meu projeto de inicialização de mola e enviará a linha inteira como uma carga útil. Em seguida, no meu projeto de inicialização de mola, configurei uma conexão com meu banco de dados.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

Com isso, eu faço o Autowire (injeção de dependência) no construtor na minha classe de serviço e o converto em uma classe r2dbc PostgressqlConnection assim:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

Agora queremos 'ouvir' a nossa mesa e ser notificado quando realizarmos alguma atualização na nossa mesa. Para fazer isso, configuramos um método de inicialização que é executado após a injeção de dependência usando a anotação @PostContruct
@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

Observe que ouvimos qualquer nome que colocamos dentro do método pg_notify. Também queremos configurar um método para fechar a conexão quando o bean estiver prestes a ser jogado fora, assim:
@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

Agora eu simplesmente crio um método que retorna um Flux do que estiver atualmente na minha tabela, e também faço o merge dele com minhas notificações, como eu disse antes das notificações chegarem como um json, então tive que desserializar e resolvi usar Mapeador de Objetos. Então, ficará mais ou menos assim:
private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

Espero que isso ajude.Cheers!