Source code for tg_pubsub.worker

import asyncio
import json
import logging
import time

import websockets

from django.utils.encoding import force_text
from rest_framework.utils import encoders

from .exceptions import InvalidMessageException, IgnoreMessageException

from . import pubsub

from .config import get_protocol_handler_klass, get_hello_packets, get_pubsub_server_ping_delta
from .messages import registry


logger = logging.getLogger('tg_pubsub.server')


[docs]class HandlerProtocol(object): def __init__(self, ws, request): self.socket = ws self.request = request assert self.socket is not None @property def open(self): return self.socket.open @property def user(self): if self.request is None: from django.contrib.auth.models import AnonymousUser return AnonymousUser() return self.request.user @property def logging_key(self): return 'tg_pubsub.handler-%s' % (self.user.pk or 'none') @asyncio.coroutine
[docs] def ping(self): yield from self.socket.ping()
@asyncio.coroutine
[docs] def send(self, data): if isinstance(data, dict): data = json.dumps(data, cls=encoders.JSONEncoder) yield from self.socket.send(data)
[docs]class WebSocketHandler(object): def __init__(self, ws, request): super().__init__() self.ws = HandlerProtocol(ws, request) self.logger = logging.getLogger(self.ws.logging_key) @asyncio.coroutine
[docs] def run(self): yield from self.send_hello() yield from self.send_on_change()
@asyncio.coroutine
[docs] def ping(self): yield from self.ws.ping()
@asyncio.coroutine
[docs] def send_hello(self): packets = get_hello_packets() self.logger.debug("Sending %d hello packets", len(packets)) for packet in packets: yield from self.ws.send(packet.prepare_for_send(self.ws, packet.data)) self.logger.debug("Sent %d hello packets", len(packets))
@asyncio.coroutine
[docs] def send_on_change(self): # Create Redis connection, subscribe channels self.logger.debug("Entering main Redis loop") r = pubsub.create_redis_connection() p = r.pubsub() # Subscribe to django channel. p.subscribe('django') last = None should_ping = get_pubsub_server_ping_delta() while self.ws.open: now = time.time() if should_ping: if last is None or last < now - should_ping: self.logger.debug('send ping: last: %s, current_time: %s', last, now) last = now yield from self.ping() msg = p.get_message(ignore_subscribe_messages=True) if msg is None: # If there was no message, sleep for one second, and try again yield from asyncio.sleep(1.0) continue try: self.logger.debug("Got update from Redis: %s", msg) identifier, data = self.message_valid(msg) except InvalidMessageException: continue else: try: yield from self.ws.send(identifier.prepare_for_send(self.ws, data)) except IgnoreMessageException as e: self.logger.debug('%s' % e)
@classmethod
[docs] def message_valid(cls, message): msg_data = force_text(message['data']) if not msg_data or ':' not in msg_data: raise InvalidMessageException() identifier, data = force_text(msg_data).split(':', 1) if not (identifier and data): raise InvalidMessageException() # Only accept valid identifiers if identifier not in registry.keys(): raise InvalidMessageException() # Parse the message body try: data = json.loads(data) except: raise InvalidMessageException() return registry[identifier], data
@asyncio.coroutine
[docs]def client_handler(ws, path, request=None): handler = WebSocketHandler(ws, request) yield from handler.run()
[docs]def run_pubsub_server(host, port): logger.info("Starting pubsub server on %s:%d", host, port) start_server = websockets.serve(client_handler, host, port, klass=get_protocol_handler_klass()) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()