EDITAÇÃO ATUALIZADA NO FINAL:Mostra o código de trabalho. Módulo principal não modificado, exceto para código de depuração. Observação:eu experimentei o problema que já observei em relação à necessidade de cancelar a assinatura antes do término.
O código parece correto. Eu gostaria de ver como você está instanciando isso.
Em config/application.rb, você provavelmente tem pelo menos algo como:
require 'ws_communication'
config.middleware.use WsCommunication
Então, em seu cliente JavaScript, você deve ter algo assim:
var ws = new WebSocket(uri);
Você instancia outra instância de WsCommunication? Isso definiria @clients para uma matriz vazia e poderia exibir seus sintomas. Algo assim estaria incorreto:
var ws = new WsCommunication;
Ajudaria se você mostrasse o cliente e, talvez, config/application.rb se este post não ajudar.
A propósito, concordo com o comentário de que @clients deve ser protegido por um mutex em qualquer atualização, se não for lido também. É uma estrutura dinâmica que pode mudar a qualquer momento em um sistema orientado a eventos. redis-mutex é uma boa opção. (Espero que o link esteja correto, pois o Github parece estar lançando 500 erros em tudo no momento.)
Você também pode observar que $redis.publish retorna um valor inteiro do número de clientes que receberam a mensagem.
Por fim, você pode achar que precisa garantir que seu canal seja cancelado antes do encerramento. Já tive situações em que acabei enviando cada mensagem várias, até muitas vezes, por causa de assinaturas anteriores no mesmo canal que não foram limpas. Como você está se inscrevendo no canal dentro de um thread, você precisará cancelar a inscrição nesse mesmo thread ou o processo simplesmente "travará" esperando que o thread certo apareça magicamente. Eu lido com essa situação definindo um sinalizador de "cancelamento de inscrição" e enviando uma mensagem. Então, dentro do bloco on.message, eu testo o sinalizador de cancelamento de assinatura e emito o cancelamento de assinatura lá.
O módulo que você forneceu, com apenas pequenas modificações de depuração:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
O código do assinante de teste que forneci:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
O código do editor de teste que forneci. Editor e Assinante podem ser facilmente combinados, pois estes são apenas testes:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Um exemplo de config.ru que executa tudo isso na camada de middleware do rack:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Este é o Principal. Eu o retirei da minha versão em execução, então pode precisar de ajustes se você usá-lo:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end