Redis
 sql >> Base de Dados >  >> NoSQL >> Redis

Frasco por exemplo - Implementando uma fila de tarefas do Redis


Esta parte do tutorial detalha como implementar uma fila de tarefas do Redis para lidar com o processamento de texto.

Atualizações:
  • 12/02/2020:atualizado para o Python versão 3.8.1, bem como para as versões mais recentes do Redis, Python Redis e RQ. Veja abaixo para detalhes. Mencione um bug na versão RQ mais recente e forneça uma solução. Resolvido o bug http antes de https.
  • 22/03/2016:atualizado para o Python versão 3.5.1, bem como para as versões mais recentes do Redis, Python Redis e RQ. Veja abaixo os detalhes.
  • 22/02/2015:Adicionado suporte para Python 3.

Bônus grátis: Clique aqui para obter acesso a um tutorial em vídeo gratuito do Flask + Python que mostra como criar o aplicativo Web Flask, passo a passo.

Lembre-se:Aqui está o que estamos construindo:um aplicativo Flask que calcula pares de frequência de palavras com base no texto de um determinado URL.
  1. Parte um:configurar um ambiente de desenvolvimento local e, em seguida, implantar um ambiente de preparação e de produção no Heroku.
  2. Parte dois:configure um banco de dados PostgreSQL junto com SQLAlchemy e Alembic para lidar com migrações.
  3. Parte três:adicione a lógica de back-end para extrair e, em seguida, processar a contagem de palavras de uma página da Web usando as bibliotecas de solicitações, BeautifulSoup e Natural Language Toolkit (NLTK).
  4. Parte quatro:Implemente uma fila de tarefas do Redis para lidar com o processamento de texto. (atual )
  5. Parte cinco:configure o Angular no front-end para pesquisar continuamente o back-end para ver se o processamento da solicitação foi concluído.
  6. Parte seis:enviar para o servidor de teste no Heroku - configurando o Redis e detalhando como executar dois processos (web e worker) em um único Dyno.
  7. Parte sete:atualize o front-end para torná-lo mais fácil de usar.
  8. Parte oito:crie uma diretiva angular personalizada para exibir um gráfico de distribuição de frequência usando JavaScript e D3.

Precisa do código? Pegue-o no repositório.

Requisitos de instalação


Ferramentas usadas:
  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - uma biblioteca simples para criar uma fila de tarefas

Comece baixando e instalando o Redis do site oficial ou via Homebrew (brew install redis ). Uma vez instalado, inicie o servidor Redis:
$ redis-server

Em seguida, instale o Python Redis e o RQ em uma nova janela de terminal:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Configurar o trabalhador


Vamos começar criando um processo de trabalho para ouvir as tarefas enfileiradas. Crie um novo arquivo worker.py , e adicione este código:
import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Aqui, ouvimos uma fila chamada default e estabeleceu uma conexão com o servidor Redis em localhost:6379 .

Acione isso em outra janela de terminal:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Agora precisamos atualizar nosso app.py para enviar trabalhos para a fila…


Atualizar app.py


Adicione as seguintes importações a app.py :
from rq import Queue
from rq.job import Job
from worker import conn

Em seguida, atualize a seção de configuração:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) configurar uma conexão Redis e inicializar uma fila com base nessa conexão.

Mova a funcionalidade de processamento de texto de nossa rota de índice para uma nova função chamada count_and_save_words() . Essa função aceita um argumento, uma URL, que passaremos para ela quando a chamarmos de nossa rota de índice.
def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Observe o seguinte código:
job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Observação: Precisamos importar as count_and_save_words função em nossa função index pois o pacote RQ atualmente tem um bug, onde não encontra funções no mesmo módulo.

Aqui usamos a fila que inicializamos anteriormente e chamamos de enqueue_call() função. Isso adicionou um novo trabalho à fila e esse trabalho executou o count_and_save_words() função com o URL como argumento. O result_ttl=5000 O argumento de linha informa ao RQ por quanto tempo manter o resultado do trabalho por - 5.000 segundos, neste caso. Em seguida, enviamos o ID do trabalho para o terminal. Este id é necessário para ver se o trabalho é processado.

Vamos configurar uma nova rota para isso…


Obter resultados

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Vamos testar isso.

Inicie o servidor, navegue até http://localhost:5000/, use a URL https://realpython.com e pegue o ID do trabalho no terminal. Em seguida, use esse id no endpoint '/results/' - ou seja, http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Contanto que menos de 5.000 segundos tenham se passado antes de você verificar o status, você deverá ver um número de identificação, que é gerado quando adicionamos os resultados ao banco de dados:
# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Agora, vamos refatorar um pouco a rota para retornar os resultados reais do banco de dados em JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Certifique-se de adicionar a importação:
from flask import jsonify

Teste isso novamente. Se tudo correu bem, você deve ver algo semelhante em seu navegador:
[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


O que vem a seguir?


Bônus grátis: Clique aqui para obter acesso a um tutorial em vídeo gratuito do Flask + Python que mostra como criar o aplicativo Web Flask, passo a passo.

Na Parte 5, juntaremos o cliente e o servidor adicionando Angular à mistura para criar um poller, que enviará uma solicitação a cada cinco segundos para o /results/<job_key> endpoint solicitando atualizações. Assim que os dados estiverem disponíveis, iremos adicioná-los ao DOM.

Saúde!

Esta é uma colaboração entre Cam Linke, cofundador da Startup Edmonton, e o pessoal da Real Python