import websockets
from importlib import import_module
from urllib.parse import parse_qs, urlparse
from django.conf import settings
from django.utils.functional import SimpleLazyObject
[docs]class FakeRequest(object):
def __init__(self, path, session_key):
from django.contrib.auth.middleware import get_user
self.session_key = session_key
self.path = path
self.user = SimpleLazyObject(lambda: get_user(self))
@property
def session(self):
if not hasattr(self, '_session'):
engine = import_module(settings.SESSION_ENGINE)
setattr(self, '_session', engine.SessionStore(self.session_key))
return getattr(self, '_session')
[docs]class RequestServerProtocol(websockets.server.WebSocketServerProtocol):
""" WebSocketServerProtocol that gives handler a request-like
object which might contain the session/user if token was provided.
"""
TOKEN_PARAM = 'token'
[docs] def get_handler_kwargs(self, path, get_header):
query_dict = parse_qs(urlparse(path).query)
session_key = query_dict.get(getattr(settings, 'TG_PUBSUB_TOKEN_PARAM', self.TOKEN_PARAM), [''])[0]
# Construct a request
request = FakeRequest(path, session_key)
return {
'request': request,
}
[docs]class SessionRequiredServerProtocol(RequestServerProtocol):
""" WebSocketServerProtocol which only allows handshakes with a valid token
"""
[docs] def get_handler_kwargs(self, path, get_header):
handler_kwargs = super().get_handler_kwargs(path, get_header)
# If no token provided, stop the handshake.
if not handler_kwargs['request'].session_key:
raise websockets.InvalidOrigin('Permission denied')
return handler_kwargs
[docs]class AnyUserServerProtocol(SessionRequiredServerProtocol):
""" WebSocketServerProtocol implementation that allows any users (that provide a token)
"""
[docs] def has_permissions(self, request):
return request is not None
[docs] def get_handler_kwargs(self, path, get_header):
handler_kwargs = super().get_handler_kwargs(path, get_header)
if not self.has_permissions(handler_kwargs['request']):
raise websockets.InvalidOrigin('Permission denied')
return handler_kwargs
[docs]class AnonymousUserServerProtocol(AnyUserServerProtocol):
""" WebSocketServerProtocol implementation that only allows anonymous users
"""
[docs] def has_permissions(self, request):
if super().has_permissions(request):
return request.user.is_anonymous()
return False
[docs]class AuthenticatedUserServerProtocol(AnyUserServerProtocol):
""" WebSocketServerProtocol implementation that only allows authenticated users
"""
[docs] def has_permissions(self, request):
if super().has_permissions(request):
return request.user.is_authenticated()
return False
[docs]class StaffUserServerProtocol(AuthenticatedUserServerProtocol):
""" WebSocketServerProtocol implementation that only allows staff users
"""
[docs] def has_permissions(self, request):
if super().has_permissions(request):
return request.user.is_staff() or request.user.is_superuser()
return False
[docs]class SuperUserServerProtocol(StaffUserServerProtocol):
""" WebSocketServerProtocol implementation that only allows superusers
"""
[docs] def has_permissions(self, request):
if super().has_permissions(request):
return request.user.is_superuser()
return False