Redis
 sql >> Base de Dados >  >> NoSQL >> Redis

Como implementar um fluxo de futuros para uma chamada de bloqueio usando futures.rs e Redis PubSub?


Ressalva pesada Eu nunca usei esta biblioteca antes, e meu conhecimento de baixo nível de alguns dos conceitos é um pouco... deficiente. Principalmente eu estou lendo através do tutorial. Tenho certeza de que qualquer pessoa que tenha feito trabalho assíncrono lerá isso e rirá, mas pode ser um ponto de partida útil para outras pessoas. Adverte emptor!

Vamos começar com algo um pouco mais simples, demonstrando como um Stream funciona. Podemos converter um iterador de Result s em um fluxo:
extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Isso nos mostra uma maneira de consumir o fluxo. Usamos and_then para fazer algo em cada carga útil (aqui apenas imprimindo) e, em seguida, for_each para converter o Stream de volta para um Future . Podemos então executar o futuro chamando o nome estranho forget método.

Em seguida, é necessário vincular a biblioteca Redis ao mix, manipulando apenas uma mensagem. Como o get_message() estiver bloqueando, precisamos introduzir algumas threads na mistura. Não é uma boa ideia realizar uma grande quantidade de trabalho nesse tipo de sistema assíncrono, pois todo o resto será bloqueado. Por exemplo:

A menos que seja organizado de outra forma, deve-se garantir que implementações desta função terminem muito rapidamente .

Em um mundo ideal, a caixa redis seria construída em cima de uma biblioteca como futuros e exporia tudo isso nativamente.
extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Meu entendimento fica mais confuso aqui. Em um thread separado, bloqueamos a mensagem e a enviamos para o canal quando a recebemos. O que eu não entendo é por que precisamos segurar a alça do fio. Eu esperaria que foo.forget estaria se bloqueando, esperando até que o fluxo esteja vazio.

Em uma conexão telnet com o servidor Redis, envie isto:
publish rust awesome

E você verá que funciona. Adicionar instruções de impressão mostra que (para mim) o foo.forget A instrução é executada antes do thread ser gerado.

Múltiplas mensagens é mais complicado. O Sender se consome para evitar que o lado gerador fique muito à frente do lado consumidor. Isso é feito retornando outro futuro de send ! Precisamos transportá-lo de volta para reutilizá-lo para a próxima iteração do loop:
extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Tenho certeza de que haverá mais ecossistema para esse tipo de interoperação com o passar do tempo. Por exemplo, a caixa futures-cpupool poderia provavelmente ser estendido para suportar um caso de uso semelhante a este.