SQL trabalha com e retorna dados tabulares (ou relações, se você preferir pensar assim, mas nem todas as tabelas SQL são relações). O que isso implica é que uma tabela aninhada, como descrita na pergunta, não é um recurso tão comum. Existem maneiras de produzir algo do tipo no Postgresql, por exemplo, usando arrays de JSON ou compostos, mas é perfeitamente possível apenas buscar dados tabulares e realizar o aninhamento no aplicativo. Python tem
itertools.groupby()
, que se encaixa muito bem, considerando os dados classificados. O erro
column "incoming.id" must appear in the GROUP BY clause...
está dizendo que não agregados na lista de seleção, cláusula de ter, etc. devem aparecer no GROUP BY
cláusula ou ser usado em um agregado, para que não tenham possivelmente valores indeterminados . Em outras palavras, o valor teria que ser escolhido apenas em alguma linha do grupo, porque GROUP BY
condensa as linhas agrupadas em uma única linha , e ninguém saberia de qual linha eles foram escolhidos. A implementação pode permitir isso, como o SQLite e o MySQL costumavam fazer, mas o padrão SQL proíbe isso. A exceção à regra é quando há uma dependência funcional
; o GROUP BY
cláusula determina os não agregados. Pense em uma junção entre as tabelas A e B agrupados por A chave primária de. Não importa em qual linha em um grupo o sistema escolheria os valores para A 's, elas seriam as mesmas, pois o agrupamento foi feito com base na chave primária. Para abordar a abordagem geral pretendida de 3 pontos, uma maneira seria selecionar uma união de entrada e saída, ordenada por seus timestamps. Como não há hierarquia de herança setup––como pode não haver um, não estou familiarizado com contabilidade––uma reversão para o uso de tuplas de resultados simples e Core facilita as coisas neste caso:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
Então, para formar a estrutura aninhada
itertools.groupby()
é usado:date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
O resultado final é uma lista de 2 tuplas de data e uma lista de dicionários de entradas em ordem crescente. Não é bem a solução ORM, mas faz o trabalho. Um exemplo:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [57]: session.commit()
In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
...: where(Incoming.accountID == 1)
...:
...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
...: where(Outgoing.accountID == 1)
...:
...: all_entries = incoming.union(outgoing)
...: all_entries = all_entries.order_by(all_entries.c.timestamp)
...: all_entries = db_session.execute(all_entries)
In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
Como mencionado, o Postgresql pode produzir praticamente o mesmo resultado que está usando um array de JSON:
from sqlalchemy.dialects.postgresql import aggregate_order_by
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)
stmt = select([day,
func.array_agg(aggregate_order_by(
func.row_to_json(literal_column('all_entries.*')),
all_entries.c.timestamp))]).\
group_by(day).\
order_by(day)
db_session.execute(stmt).fetchall()
Se de fato
Incoming
e Outgoing
podem ser considerados como filhos de uma base comum, por exemplo Entry
, o uso de uniões pode ser um pouco automatizado com herança de tabela concreta
:from sqlalchemy.ext.declarative import AbstractConcreteBase
class Entry(AbstractConcreteBase, Base):
pass
class Incoming(Entry):
__tablename__ = 'incoming'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="incomings")
__mapper_args__ = {
'polymorphic_identity': 'incoming',
'concrete': True
}
class Outgoing(Entry):
__tablename__ = 'outgoing'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="outgoings")
__mapper_args__ = {
'polymorphic_identity': 'outgoing',
'concrete': True
}
Infelizmente usando
AbstractConcreteBase
requer uma chamada manual para configure_mappers()
quando todas as classes necessárias tiverem sido definidas; neste caso, a primeira possibilidade é depois de definir User
, porque Account
depende dele por meio de relacionamentos:from sqlalchemy.orm import configure_mappers
configure_mappers()
Então, para buscar todos os
Incoming
e Outgoing
em uma única consulta ORM polimórfica use Entry
:session.query(Entry).\
filter(Entry.accountID == accountID).\
order_by(Entry.timestamp).\
all()
e continue usando
itertools.groupby()
como acima na lista resultante de Incoming
e Outgoing
.