2017-07-28 30 views
3

我正在使用使用websockets的Django和Django频道构建web应用程序。从另一个消费者在一个Django频道消费者中断开循环

当用户点击浏览器上的按钮,可将WebSocket将数据发送到我的服务器和客户服务器上启动每秒消息发送到客户端一次(环路)。

我想创建另一个按钮,将停止该数据发送过程。当用户点击这个新按钮时,websocket会向服务器发送另一个数据,并且服务器上的使用者必须以某种方式停止之前使用者的循环。此外,我会要求这停止客户端断开连接时的循环。

我觉得想要使用全局变量。但是Django Channels文档声明,他们强烈建议不要使用全局变量,因为他们希望保持应用程序网络的透明性(不太了解这一点)。

我试过使用通道会话。我让第二个消费者更新频道会话中的值,但频道会话值没有在第一个消费者中更新。

这是简化的代码。 浏览器:在服务器

var socket = new WebSocket("ws://" + window.location.host + "/socket/"); 
$('#button1').on('click', function() { 
    socket.send(JSON.stringify({action: 'start_getting_values'})) 
}); 
$('#button2').on('click', function() { 
    socket.send(JSON.stringify({action: 'stop_getting_values'})) 
}); 

消费者:

@channel_session 
def ws_message(message): 
    text = json.loads(message.content['text']) 

    if text['action'] == 'start_getting_values': 
     while True: 
      # Getting some data here 
      # ... 
      message.reply_channel.send({"text": some_data}, immediately=True) 
      time.sleep(1) 

    if text['action'] == 'stop_getting_values': 
     do_something_to_stop_the_loop_above() 
+0

你找到一个解决这个问题? – Vingtoft

+0

@Vingtoft是的,请检查我的答案。 –

回答

3

好吧,我设法解决这个任务后,我自己我接触Django的渠道开发。

使内消费者循环的做法是不好的,因为它会阻止该网站一旦消费者将要运行的次数等于相当于运行此消费的所有工人的线程数量。

所以我的方法如下:一旦消费者获得'start_getting_values'信号,它将当前的回复通道添加到组中,并在连接到的Redis服务器上增加值(我使用Redis作为通道层后端,但它将工作在任何其他后端)。

它增加了什么值?在Redis上,我有一个关键字,表示散列对象类型的“组”。该键的每个键表示通道中的一个组,而值表示该组中回复通道的数量。

然后我创建,我连接到相同的Redis服务器一个新的Python文件。在这个文件中,我运行无限循环,从Redis的关键'组'中加载字典。然后我在这个词典中循环每个键(每个键代表通道组的名称)并将数据广播到具有非零值的每个组。当我运行这个文件时,它作为单独的进程运行,因此不会阻止消费者的任何事情。

要停止向用户播放,当我从他那里得到适当的信号时,我只是将他从组中删除,并减少相应的Redis值。

消费者代码:

import redis 

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) 

@channel_session_user 
def ws_message(message): 

    text = json.loads(message.content['text']) 

    if text['stream'] == 'start_getting_values': 
     value_id = text['value_id'] 
     redis_client.hincrby('redis_some_key', value_id, 1) 
     Group(value_id).add(message.reply_channel) 
     channel_session['value_id'] = value_id 
     return 0 

    if text['stream'] == 'stop_getting_values': 
     if message.channel_session['value_id'] != '': 
      value_id = message.channel_session['value_id'] 
      Group(value_id).discard(message.reply_channel) 

      l = redis_client.lock(name='del_lock') 
      val = redis_client.hincrby('redis_some_key', value_id, -1) 
      if (val <= 0): 
       redis_client.hdel('redis_some_key', value_id) 
      l.release() 
     return 0 

单独的Python文件:

import time 
import redis 
from threading import Thread 
import asgi_redis 


redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) 
cl = asgi_redis.RedisChannelLayer() 

def some_action(value_id): 

    # getting some data based on value_id 
    # .... 

    cl.send_group(value_id, { 
     "text": some_data, 
    }) 


while True: 
    value_ids = redis_client.hgetall('redis_some_key') 

    ths = [] 
    for b_value_id in value_ids.keys(): 
     value_id = b_value_id.decode("utf-8") 
     ths.append(Thread(target=some_action, args=(value_id,))) 

    for th in ths: 
     th.start() 
    for th in ths: 
     th.join() 


    time.sleep(1) 
+0

感谢您的解释!你有任何链接到你使用的资源或代码示例?谢谢! – Vingtoft

+1

@Vingtoft不客气。添加了代码片段。 –

相关问题