1
如何从一个自定义manage.py命令将消息发送到一个Django消费者如何从一个命令将消息发送到一个频道
from django.core.management.base import BaseCommand, CommandError
from channels import Channel
class Command(BaseCommand):
help = 'Sends a message to a Django channel from the thing'
def add_arguments(self, parser):
parser.add_argument('json_object', nargs='+', type=str)
def handle(self, *args, **options):
self.stdout.write("TEST !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print Channel("test").channel_layer
Channel("test").send({'op':options['json_object'][0]})
这是我的消费者
class MyConsumer(WebsocketConsumer):
@classmethod
def channel_names(self):
return {"test"}
def connection_groups(self):
return ["test"]
def dispatch(self, message, **kwargs):
return self.get_handler(message, **kwargs)(message, **kwargs)
def get_handler(self, message, **kwargs):
channel_routing = [
consumers.MyConsumer.as_route(path=r"^/test/"),
route("test.receive", consumers.chat_join),
] for _filter, value in kwargs.items():
filter_mapping = getattr(self, _filter + '_mapping', None)
if not filter_mapping:
continue
consumer = getattr(self, filter_mapping.get(value), None)
if consumer:
return consumer
raise ValueError('Message')
def connect(self,message):
self.message.reply_channel.send({"accept": True})
def receive(self,text=None, bytes= None):
print text
def disconnect(self,message):
pass
当我尝试但是运行命令,我得到这个消息
2017年3月8日03:45:33839 - 错误 - 工人 - 无法找到匹配的消息上测试!检查你的路由。
如果它是贴切的,这里是我的路由
channel_routing = [
consumers.MyConsumer.as_route(path=r"^/test/"),
]
我有一个非常类似的问题的原因,你有没有设法解决这个问题? – kunambi
我实际上创建了RedisChannelLayer的自定义版本并覆盖了这些方法 – cjds