PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

SQLAlchemy:agrupar por dia em várias tabelas


SQL trabalha com e retorna dados tabulares (ou relações, se você preferir pensar assim, mas nem todas as tabelas SQL são relações). O que isso implica é que uma tabela aninhada, como descrita na pergunta, não é um recurso tão comum. Existem maneiras de produzir algo do tipo no Postgresql, por exemplo, usando arrays de JSON ou compostos, mas é perfeitamente possível apenas buscar dados tabulares e realizar o aninhamento no aplicativo. Python tem itertools.groupby() , que se encaixa muito bem, considerando os dados classificados.

O erro column "incoming.id" must appear in the GROUP BY clause... está dizendo que não agregados na lista de seleção, cláusula de ter, etc. devem aparecer no GROUP BY cláusula ou ser usado em um agregado, para que não tenham possivelmente valores indeterminados . Em outras palavras, o valor teria que ser escolhido apenas em alguma linha do grupo, porque GROUP BY condensa as linhas agrupadas em uma única linha , e ninguém saberia de qual linha eles foram escolhidos. A implementação pode permitir isso, como o SQLite e o MySQL costumavam fazer, mas o padrão SQL proíbe isso. A exceção à regra é quando há uma dependência funcional ; o GROUP BY cláusula determina os não agregados. Pense em uma junção entre as tabelas A e B agrupados por A chave primária de. Não importa em qual linha em um grupo o sistema escolheria os valores para A 's, elas seriam as mesmas, pois o agrupamento foi feito com base na chave primária.

Para abordar a abordagem geral pretendida de 3 pontos, uma maneira seria selecionar uma união de entrada e saída, ordenada por seus timestamps. Como não há hierarquia de herança setup––como pode não haver um, não estou familiarizado com contabilidade––uma reversão para o uso de tuplas de resultados simples e Core facilita as coisas neste caso:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

Então, para formar a estrutura aninhada itertools.groupby() é usado:
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

O resultado final é uma lista de 2 tuplas de data e uma lista de dicionários de entradas em ordem crescente. Não é bem a solução ORM, mas faz o trabalho. Um exemplo:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

Como mencionado, o Postgresql pode produzir praticamente o mesmo resultado que está usando um array de JSON:
from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()

Se de fato Incoming e Outgoing podem ser considerados como filhos de uma base comum, por exemplo Entry , o uso de uniões pode ser um pouco automatizado com herança de tabela concreta :
from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

Infelizmente usando AbstractConcreteBase requer uma chamada manual para configure_mappers() quando todas as classes necessárias tiverem sido definidas; neste caso, a primeira possibilidade é depois de definir User , porque Account depende dele por meio de relacionamentos:
from sqlalchemy.orm import configure_mappers
configure_mappers()

Então, para buscar todos os Incoming e Outgoing em uma única consulta ORM polimórfica use Entry :
session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

e continue usando itertools.groupby() como acima na lista resultante de Incoming e Outgoing .