PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

Fila de tarefas como tabela SQL com vários consumidores (PostgreSQL)


Eu uso postgres para uma fila FIFO também. Eu originalmente usei ACCESS EXCLUSIVE, que produz resultados corretos em alta simultaneidade, mas tem o efeito infeliz de ser mutuamente exclusivo com pg_dump, que adquire um bloqueio ACCESS SHARE durante sua execução. Isso faz com que minha função next() bloqueie por muito tempo (a duração do pg_dump). Isso não era aceitável, pois somos uma loja 24 horas por dia, 7 dias por semana e os clientes não gostaram do tempo morto na fila no meio da noite.

Achei que deveria haver um bloqueio menos restritivo que ainda seria seguro para concorrência e não bloquearia enquanto o pg_dump estivesse em execução. Minha busca me levou a este post SO.

Então fiz algumas pesquisas.

Os seguintes modos são suficientes para uma função FIFO queue NEXT() que atualizará o status de um trabalho de enfileirado para executar sem nenhuma falha de simultaneidade e também não bloquear contra pg_dump:
SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Inquerir:
begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Resultado se parece com:
UPDATE 1
 job_id
--------
     98
(1 row)

Aqui está um script de shell que testa todos os diferentes modos de bloqueio em alta simultaneidade (30).
#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

O código também está aqui se você quiser editar:https://gist.github.com/1083936

Estou atualizando meu aplicativo para usar o modo EXCLUSIVE, pois é o modo mais restritivo que a) está correto eb) não entra em conflito com pg_dump. Eu escolhi o mais restritivo, pois parece o menos arriscado em termos de alterar o aplicativo de ACCESS EXCLUSIVE sem ser um especialista em bloqueio de postgres.

Sinto-me bastante confortável com meu equipamento de teste e com as ideias gerais por trás da resposta. Espero que compartilhar isso ajude a resolver esse problema para outras pessoas.