Esta parte do tutorial detalha como implementar uma fila de tarefas do Redis para lidar com o processamento de texto.
Atualizações:
- 12/02/2020:atualizado para o Python versão 3.8.1, bem como para as versões mais recentes do Redis, Python Redis e RQ. Veja abaixo para detalhes. Mencione um bug na versão RQ mais recente e forneça uma solução. Resolvido o bug http antes de https.
- 22/03/2016:atualizado para o Python versão 3.5.1, bem como para as versões mais recentes do Redis, Python Redis e RQ. Veja abaixo os detalhes.
- 22/02/2015:Adicionado suporte para Python 3.
Bônus grátis: Clique aqui para obter acesso a um tutorial em vídeo gratuito do Flask + Python que mostra como criar o aplicativo Web Flask, passo a passo.
Lembre-se:Aqui está o que estamos construindo:um aplicativo Flask que calcula pares de frequência de palavras com base no texto de um determinado URL.
- Parte um:configurar um ambiente de desenvolvimento local e, em seguida, implantar um ambiente de preparação e de produção no Heroku.
- Parte dois:configure um banco de dados PostgreSQL junto com SQLAlchemy e Alembic para lidar com migrações.
- Parte três:adicione a lógica de back-end para extrair e, em seguida, processar a contagem de palavras de uma página da Web usando as bibliotecas de solicitações, BeautifulSoup e Natural Language Toolkit (NLTK).
- Parte quatro:Implemente uma fila de tarefas do Redis para lidar com o processamento de texto. (atual )
- Parte cinco:configure o Angular no front-end para pesquisar continuamente o back-end para ver se o processamento da solicitação foi concluído.
- Parte seis:enviar para o servidor de teste no Heroku - configurando o Redis e detalhando como executar dois processos (web e worker) em um único Dyno.
- Parte sete:atualize o front-end para torná-lo mais fácil de usar.
- Parte oito:crie uma diretiva angular personalizada para exibir um gráfico de distribuição de frequência usando JavaScript e D3.
Precisa do código? Pegue-o no repositório.
Requisitos de instalação
Ferramentas usadas:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) - uma biblioteca simples para criar uma fila de tarefas
Comece baixando e instalando o Redis do site oficial ou via Homebrew (
brew install redis
). Uma vez instalado, inicie o servidor Redis:$ redis-server
Em seguida, instale o Python Redis e o RQ em uma nova janela de terminal:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Configurar o trabalhador
Vamos começar criando um processo de trabalho para ouvir as tarefas enfileiradas. Crie um novo arquivo worker.py , e adicione este código:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Aqui, ouvimos uma fila chamada
default
e estabeleceu uma conexão com o servidor Redis em localhost:6379
. Acione isso em outra janela de terminal:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Agora precisamos atualizar nosso app.py para enviar trabalhos para a fila…
Atualizar app.py
Adicione as seguintes importações a app.py :
from rq import Queue
from rq.job import Job
from worker import conn
Em seguida, atualize a seção de configuração:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
configurar uma conexão Redis e inicializar uma fila com base nessa conexão. Mova a funcionalidade de processamento de texto de nossa rota de índice para uma nova função chamada
count_and_save_words()
. Essa função aceita um argumento, uma URL, que passaremos para ela quando a chamarmos de nossa rota de índice. def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Observe o seguinte código:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Observação: Precisamos importar ascount_and_save_words
função em nossa funçãoindex
pois o pacote RQ atualmente tem um bug, onde não encontra funções no mesmo módulo.
Aqui usamos a fila que inicializamos anteriormente e chamamos de
enqueue_call()
função. Isso adicionou um novo trabalho à fila e esse trabalho executou o count_and_save_words()
função com o URL como argumento. O result_ttl=5000
O argumento de linha informa ao RQ por quanto tempo manter o resultado do trabalho por - 5.000 segundos, neste caso. Em seguida, enviamos o ID do trabalho para o terminal. Este id é necessário para ver se o trabalho é processado. Vamos configurar uma nova rota para isso…
Obter resultados
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Vamos testar isso.
Inicie o servidor, navegue até http://localhost:5000/, use a URL https://realpython.com e pegue o ID do trabalho no terminal. Em seguida, use esse id no endpoint '/results/' - ou seja, http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Contanto que menos de 5.000 segundos tenham se passado antes de você verificar o status, você deverá ver um número de identificação, que é gerado quando adicionamos os resultados ao banco de dados:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Agora, vamos refatorar um pouco a rota para retornar os resultados reais do banco de dados em JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Certifique-se de adicionar a importação:
from flask import jsonify
Teste isso novamente. Se tudo correu bem, você deve ver algo semelhante em seu navegador:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
O que vem a seguir?
Bônus grátis: Clique aqui para obter acesso a um tutorial em vídeo gratuito do Flask + Python que mostra como criar o aplicativo Web Flask, passo a passo.
Na Parte 5, juntaremos o cliente e o servidor adicionando Angular à mistura para criar um poller, que enviará uma solicitação a cada cinco segundos para o
/results/<job_key>
endpoint solicitando atualizações. Assim que os dados estiverem disponíveis, iremos adicioná-los ao DOM. Saúde!
Esta é uma colaboração entre Cam Linke, cofundador da Startup Edmonton, e o pessoal da Real Python