Redis
 sql >> Base de Dados >  >> NoSQL >> Redis

Acessando uma variável dentro de um thread de trilhos


EDITAÇÃO ATUALIZADA NO FINAL:Mostra o código de trabalho. Módulo principal não modificado, exceto para código de depuração. Observação:eu experimentei o problema que já observei em relação à necessidade de cancelar a assinatura antes do término.

O código parece correto. Eu gostaria de ver como você está instanciando isso.

Em config/application.rb, você provavelmente tem pelo menos algo como:
require 'ws_communication'
config.middleware.use WsCommunication

Então, em seu cliente JavaScript, você deve ter algo assim:
var ws = new WebSocket(uri);

Você instancia outra instância de WsCommunication? Isso definiria @clients para uma matriz vazia e poderia exibir seus sintomas. Algo assim estaria incorreto:
var ws = new WsCommunication;

Ajudaria se você mostrasse o cliente e, talvez, config/application.rb se este post não ajudar.

A propósito, concordo com o comentário de que @clients deve ser protegido por um mutex em qualquer atualização, se não for lido também. É uma estrutura dinâmica que pode mudar a qualquer momento em um sistema orientado a eventos. redis-mutex é uma boa opção. (Espero que o link esteja correto, pois o Github parece estar lançando 500 erros em tudo no momento.)

Você também pode observar que $redis.publish retorna um valor inteiro do número de clientes que receberam a mensagem.

Por fim, você pode achar que precisa garantir que seu canal seja cancelado antes do encerramento. Já tive situações em que acabei enviando cada mensagem várias, até muitas vezes, por causa de assinaturas anteriores no mesmo canal que não foram limpas. Como você está se inscrevendo no canal dentro de um thread, você precisará cancelar a inscrição nesse mesmo thread ou o processo simplesmente "travará" esperando que o thread certo apareça magicamente. Eu lido com essa situação definindo um sinalizador de "cancelamento de inscrição" e enviando uma mensagem. Então, dentro do bloco on.message, eu testo o sinalizador de cancelamento de assinatura e emito o cancelamento de assinatura lá.

O módulo que você forneceu, com apenas pequenas modificações de depuração:
require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

O código do assinante de teste que forneci:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

O código do editor de teste que forneci. Editor e Assinante podem ser facilmente combinados, pois estes são apenas testes:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Um exemplo de config.ru que executa tudo isso na camada de middleware do rack:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

Este é o Principal. Eu o retirei da minha versão em execução, então pode precisar de ajustes se você usá-lo:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end