Module briar_wrapper.models.socket_listener

Wrapper around Briar API's websocket stream

Expand source code
# Copyright (c) 2019 Nico Alt
# SPDX-License-Identifier: AGPL-3.0-only
# License-Filename: LICENSE.md
"""
Wrapper around Briar API's websocket stream
"""

import asyncio
import json
from threading import Thread, Lock

import websockets

from briar_wrapper.constants import WEBSOCKET_URL
from briar_wrapper.model import Model


class SocketListener(Model):  # pylint: disable=too-few-public-methods

    def __init__(self, api):
        super().__init__(api)
        self._signals = dict()
        self._signals_lock = Lock()
        self._highest_signal_id = -1
        self._start_websocket_thread()

    def connect(self, event, callback):
        """
        Connects to one of websocket API's `event`s. If the websocket API sends
        out a message with given `event`, `callback` will be called.

        Returns
        -------
        int
            Signal ID used for
            `briar_wrapper.models.socket_listener.SocketListener.disconnect`
            later on

        .. versionadded:: 0.0.3
        """
        self._signals_lock.acquire()
        signal_id = self._add_signal(event, callback)
        self._signals_lock.release()
        return signal_id

    def disconnect(self, signal_id):
        """
        Disconnect signal with `signal_id` from
        `briar_wrapper.models.socket_listener.SocketListener`. The `callback`
        given at
        `briar_wrapper.models.socket_listener.SocketListener.connect` will not
        be called anymore.

        .. versionadded:: 0.0.3
        """
        self._signals_lock.acquire()
        self._remove_signal(signal_id)
        self._signals_lock.release()

    def _add_signal(self, event, callback):
        self._highest_signal_id += 1
        signal_id = self._highest_signal_id
        self._signals[signal_id] = {
            "event": event,
            "callback": callback
        }
        return signal_id

    def _remove_signal(self, signal_id):
        del self._signals[signal_id]

    def _start_websocket_thread(self):
        websocket_thread = Thread(target=self._start_watch_loop,
                                  daemon=True)
        websocket_thread.start()

    def _start_watch_loop(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.create_task(self._start_websocket())
        loop.run_forever()
        loop.close()

    async def _start_websocket(self):
        async with websockets.connect(WEBSOCKET_URL) as websocket:
            await websocket.send(self._api.auth_token)
            await self._watch_messages(websocket)

    async def _watch_messages(self, websocket):
        while not websocket.closed and not\
                asyncio.get_event_loop().is_closed():
            message_json = await websocket.recv()
            message = json.loads(message_json)
            self._call_signal_callbacks(message)
        if not asyncio.get_event_loop().is_closed():
            asyncio.get_event_loop().create_task(
                self._watch_messages(websocket))

    def _call_signal_callbacks(self, message):
        self._signals_lock.acquire()
        for _, signal in self._signals.items():
            if signal["event"] == message['name']:
                signal["callback"](message)
        self._signals_lock.release()

Classes

class SocketListener (api)

Initialize with Api instance api

Expand source code
class SocketListener(Model):  # pylint: disable=too-few-public-methods

    def __init__(self, api):
        super().__init__(api)
        self._signals = dict()
        self._signals_lock = Lock()
        self._highest_signal_id = -1
        self._start_websocket_thread()

    def connect(self, event, callback):
        """
        Connects to one of websocket API's `event`s. If the websocket API sends
        out a message with given `event`, `callback` will be called.

        Returns
        -------
        int
            Signal ID used for
            `briar_wrapper.models.socket_listener.SocketListener.disconnect`
            later on

        .. versionadded:: 0.0.3
        """
        self._signals_lock.acquire()
        signal_id = self._add_signal(event, callback)
        self._signals_lock.release()
        return signal_id

    def disconnect(self, signal_id):
        """
        Disconnect signal with `signal_id` from
        `briar_wrapper.models.socket_listener.SocketListener`. The `callback`
        given at
        `briar_wrapper.models.socket_listener.SocketListener.connect` will not
        be called anymore.

        .. versionadded:: 0.0.3
        """
        self._signals_lock.acquire()
        self._remove_signal(signal_id)
        self._signals_lock.release()

    def _add_signal(self, event, callback):
        self._highest_signal_id += 1
        signal_id = self._highest_signal_id
        self._signals[signal_id] = {
            "event": event,
            "callback": callback
        }
        return signal_id

    def _remove_signal(self, signal_id):
        del self._signals[signal_id]

    def _start_websocket_thread(self):
        websocket_thread = Thread(target=self._start_watch_loop,
                                  daemon=True)
        websocket_thread.start()

    def _start_watch_loop(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.create_task(self._start_websocket())
        loop.run_forever()
        loop.close()

    async def _start_websocket(self):
        async with websockets.connect(WEBSOCKET_URL) as websocket:
            await websocket.send(self._api.auth_token)
            await self._watch_messages(websocket)

    async def _watch_messages(self, websocket):
        while not websocket.closed and not\
                asyncio.get_event_loop().is_closed():
            message_json = await websocket.recv()
            message = json.loads(message_json)
            self._call_signal_callbacks(message)
        if not asyncio.get_event_loop().is_closed():
            asyncio.get_event_loop().create_task(
                self._watch_messages(websocket))

    def _call_signal_callbacks(self, message):
        self._signals_lock.acquire()
        for _, signal in self._signals.items():
            if signal["event"] == message['name']:
                signal["callback"](message)
        self._signals_lock.release()

Ancestors

Methods

def connect(self, event, callback)

Connects to one of websocket API's events. If the websocket API sends out a message with given event, callback will be called.

Returns

int
Signal ID used for SocketListener.disconnect() later on

Added in version: 0.0.3

Expand source code
def connect(self, event, callback):
    """
    Connects to one of websocket API's `event`s. If the websocket API sends
    out a message with given `event`, `callback` will be called.

    Returns
    -------
    int
        Signal ID used for
        `briar_wrapper.models.socket_listener.SocketListener.disconnect`
        later on

    .. versionadded:: 0.0.3
    """
    self._signals_lock.acquire()
    signal_id = self._add_signal(event, callback)
    self._signals_lock.release()
    return signal_id
def disconnect(self, signal_id)

Disconnect signal with signal_id from SocketListener. The callback given at SocketListener.connect() will not be called anymore.

Added in version: 0.0.3

Expand source code
def disconnect(self, signal_id):
    """
    Disconnect signal with `signal_id` from
    `briar_wrapper.models.socket_listener.SocketListener`. The `callback`
    given at
    `briar_wrapper.models.socket_listener.SocketListener.connect` will not
    be called anymore.

    .. versionadded:: 0.0.3
    """
    self._signals_lock.acquire()
    self._remove_signal(signal_id)
    self._signals_lock.release()