Em um nível de superfície, a única coisa sobre a qual tenho dúvidas é a ordem de incrementar o grupo de espera e o enfileiramento do trabalho:
func (s *Scheduler) Enqueue(req interface{}) {
select {
case s.reqChan <- req:
s.wg.Add(1)
}
}
Não acho que o que foi dito acima vá causar muitos problemas nos treinos com essa carga de trabalho grande, mas acho que pode ser uma condição lógica de corrida. Em níveis mais baixos de simultaneidade e tamanhos de trabalho menores, ele pode enfileirar uma mensagem, alternar o conext para uma goroutine que inicia o trabalho nessa mensagem e DEPOIS o trabalho no grupo de espera.
Em seguida, você tem certeza de
process
método é threadsafe?? Eu suponho que, com base na documentação do redis go, execute com go run -race
tem alguma saída? Em algum momento, é completamente razoável e esperado que o desempenho caia. Eu recomendaria iniciar testes de desempenho para ver onde a latência e a taxa de transferência começam a cair:
talvez um pool de 10, 100, 500, 1.000, 2.500, 5.000, 10.000, ou o que fizer sentido. IMO parece que existem 3 variáveis importantes para ajustar:
- Tamanho do pool de workers
- Tamanho do buffer da fila de trabalho
- Redis
MaxActive
A maior coisa que se destaca é que parece que o redis.Pool está configurado para permitir um número ilimitado de conexões:
pool := &redis.Pool{
MaxIdle: 50,
IdleTimeout: 240 * time.Second,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
Dial: func() (redis.Conn, error) {
return dial("tcp", address, password)
},
}
// Número máximo de conexões alocadas pelo pool em um determinado momento.// Quando zero, não há limite no número de conexões no pool.MaxActive int
Eu pessoalmente tentaria entender onde e quando o desempenho começa a cair em relação ao tamanho do seu pool de trabalhadores. Isso pode tornar mais fácil entender quais são as restrições do seu programa.