Module briar_wrapper.models.socket_listener
Wrapper around Briar API's websocket stream
Expand source code
# Copyright (c) 2019 Nico Alt
# SPDX-License-Identifier: AGPL-3.0-only
# License-Filename: LICENSE.md
"""
Wrapper around Briar API's websocket stream
"""
import asyncio
import json
from threading import Thread, Lock
import websockets
from briar_wrapper.constants import WEBSOCKET_URL
from briar_wrapper.model import Model
class SocketListener(Model): # pylint: disable=too-few-public-methods
def __init__(self, api):
super().__init__(api)
self._signals = dict()
self._signals_lock = Lock()
self._highest_signal_id = -1
self._start_websocket_thread()
def connect(self, event, callback):
"""
Connects to one of websocket API's `event`s. If the websocket API sends
out a message with given `event`, `callback` will be called.
Returns
-------
int
Signal ID used for
`briar_wrapper.models.socket_listener.SocketListener.disconnect`
later on
.. versionadded:: 0.0.3
"""
self._signals_lock.acquire()
signal_id = self._add_signal(event, callback)
self._signals_lock.release()
return signal_id
def disconnect(self, signal_id):
"""
Disconnect signal with `signal_id` from
`briar_wrapper.models.socket_listener.SocketListener`. The `callback`
given at
`briar_wrapper.models.socket_listener.SocketListener.connect` will not
be called anymore.
.. versionadded:: 0.0.3
"""
self._signals_lock.acquire()
self._remove_signal(signal_id)
self._signals_lock.release()
def _add_signal(self, event, callback):
self._highest_signal_id += 1
signal_id = self._highest_signal_id
self._signals[signal_id] = {
"event": event,
"callback": callback
}
return signal_id
def _remove_signal(self, signal_id):
del self._signals[signal_id]
def _start_websocket_thread(self):
websocket_thread = Thread(target=self._start_watch_loop,
daemon=True)
websocket_thread.start()
def _start_watch_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(self._start_websocket())
loop.run_forever()
loop.close()
async def _start_websocket(self):
async with websockets.connect(WEBSOCKET_URL) as websocket:
await websocket.send(self._api.auth_token)
await self._watch_messages(websocket)
async def _watch_messages(self, websocket):
while not websocket.closed and not\
asyncio.get_event_loop().is_closed():
message_json = await websocket.recv()
message = json.loads(message_json)
self._call_signal_callbacks(message)
if not asyncio.get_event_loop().is_closed():
asyncio.get_event_loop().create_task(
self._watch_messages(websocket))
def _call_signal_callbacks(self, message):
self._signals_lock.acquire()
for _, signal in self._signals.items():
if signal["event"] == message['name']:
signal["callback"](message)
self._signals_lock.release()
Classes
class SocketListener (api)
-
Initialize with
Api
instanceapi
Expand source code
class SocketListener(Model): # pylint: disable=too-few-public-methods def __init__(self, api): super().__init__(api) self._signals = dict() self._signals_lock = Lock() self._highest_signal_id = -1 self._start_websocket_thread() def connect(self, event, callback): """ Connects to one of websocket API's `event`s. If the websocket API sends out a message with given `event`, `callback` will be called. Returns ------- int Signal ID used for `briar_wrapper.models.socket_listener.SocketListener.disconnect` later on .. versionadded:: 0.0.3 """ self._signals_lock.acquire() signal_id = self._add_signal(event, callback) self._signals_lock.release() return signal_id def disconnect(self, signal_id): """ Disconnect signal with `signal_id` from `briar_wrapper.models.socket_listener.SocketListener`. The `callback` given at `briar_wrapper.models.socket_listener.SocketListener.connect` will not be called anymore. .. versionadded:: 0.0.3 """ self._signals_lock.acquire() self._remove_signal(signal_id) self._signals_lock.release() def _add_signal(self, event, callback): self._highest_signal_id += 1 signal_id = self._highest_signal_id self._signals[signal_id] = { "event": event, "callback": callback } return signal_id def _remove_signal(self, signal_id): del self._signals[signal_id] def _start_websocket_thread(self): websocket_thread = Thread(target=self._start_watch_loop, daemon=True) websocket_thread.start() def _start_watch_loop(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.create_task(self._start_websocket()) loop.run_forever() loop.close() async def _start_websocket(self): async with websockets.connect(WEBSOCKET_URL) as websocket: await websocket.send(self._api.auth_token) await self._watch_messages(websocket) async def _watch_messages(self, websocket): while not websocket.closed and not\ asyncio.get_event_loop().is_closed(): message_json = await websocket.recv() message = json.loads(message_json) self._call_signal_callbacks(message) if not asyncio.get_event_loop().is_closed(): asyncio.get_event_loop().create_task( self._watch_messages(websocket)) def _call_signal_callbacks(self, message): self._signals_lock.acquire() for _, signal in self._signals.items(): if signal["event"] == message['name']: signal["callback"](message) self._signals_lock.release()
Ancestors
Methods
def connect(self, event, callback)
-
Connects to one of websocket API's
event
s. If the websocket API sends out a message with givenevent
,callback
will be called.Returns
int
- Signal ID used for
SocketListener.disconnect()
later on
Added in version: 0.0.3
Expand source code
def connect(self, event, callback): """ Connects to one of websocket API's `event`s. If the websocket API sends out a message with given `event`, `callback` will be called. Returns ------- int Signal ID used for `briar_wrapper.models.socket_listener.SocketListener.disconnect` later on .. versionadded:: 0.0.3 """ self._signals_lock.acquire() signal_id = self._add_signal(event, callback) self._signals_lock.release() return signal_id
def disconnect(self, signal_id)
-
Disconnect signal with
signal_id
fromSocketListener
. Thecallback
given atSocketListener.connect()
will not be called anymore.Added in version: 0.0.3
Expand source code
def disconnect(self, signal_id): """ Disconnect signal with `signal_id` from `briar_wrapper.models.socket_listener.SocketListener`. The `callback` given at `briar_wrapper.models.socket_listener.SocketListener.connect` will not be called anymore. .. versionadded:: 0.0.3 """ self._signals_lock.acquire() self._remove_signal(signal_id) self._signals_lock.release()