Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import msgpack
3from .request import RequestType
4import logging
5from typing import Callable, Any
8logger = logging.getLogger(__name__)
class Server(object):
    """RPC server dispatching msgpack-encoded messages to a handler object.

    An instance is an async callable taking ``(reader, writer)``; this
    signature matches the ``client_connected_cb`` accepted by
    ``asyncio.start_server``.

    Wire format, as decoded by the unpacker:

    * request:  ``(RequestType.REQUEST, msgid, name, params)``
    * notify:   ``(RequestType.NOTIFY, name, params)``
    * response: ``(RequestType.RESPONSE, msgid, error, result)`` where exactly
      one of ``error`` (a string) and ``result`` is meaningful.
    """

    def __init__(self,
                 handler: Any,
                 *,
                 packer: msgpack.Packer = None,
                 unpacker_factory: Callable[[], msgpack.Unpacker] = lambda: msgpack.Unpacker(raw=False),
                 loop: asyncio.AbstractEventLoop = None) -> None:
        """Create a server.

        Args:
            handler: Object whose public attributes are looked up by name and
                called with the request parameters. The callables may be plain
                functions or may return a coroutine/future.
            packer: Packer used to serialize responses. Defaults to
                ``msgpack.Packer(use_bin_type=True)``.
            unpacker_factory: Zero-argument callable returning a fresh
                unpacker; called once per client connection.
            loop: Event loop used to schedule request tasks. Defaults to
                ``asyncio.get_event_loop()``.
        """
        self._handler = handler
        self._packer = packer if packer is not None else msgpack.Packer(use_bin_type=True)
        # NOTE: the attribute name (sic) is kept as-is so that subclasses
        # referring to it keep working.
        self._unpacked_factory = unpacker_factory
        self._loop = loop if loop is not None else asyncio.get_event_loop()

    async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve one client connection until EOF or a connection error.

        Every decoded message is handled in its own task so slow handlers do
        not block reading. When the read loop ends, the in-flight tasks are
        awaited (so their responses can still be written) and the writer is
        closed.
        """
        # Strong references to running tasks: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected before
        # it finishes.
        in_flight = set()
        try:
            # create an unpacker for this connection
            unpacker = self._unpacked_factory()
            while True:
                data = await reader.read(n=4096)
                if not data:
                    # EOF: the client closed (its side of) the connection
                    break
                unpacker.feed(data)
                for obj in unpacker:
                    # for every received object create a task to handle it
                    task = self._loop.create_task(self._handle_request(obj, writer))
                    in_flight.add(task)
                    task.add_done_callback(in_flight.discard)
        except ConnectionError:
            pass
        except asyncio.CancelledError:
            # The connection is being torn down: stop outstanding work too.
            for task in in_flight:
                task.cancel()
            raise
        except Exception:
            logger.exception("Uncaught exception in client servicer")
        finally:
            try:
                if in_flight:
                    # Snapshot: done-callbacks mutate the set while we wait.
                    await asyncio.wait(list(in_flight))
            finally:
                # Always release the transport, even if the wait is cancelled.
                writer.close()

    async def _invoke(self, name: Any, params: Any) -> Any:
        """Call the handler attribute ``name`` with ``*params``.

        Names that are not strings or that start with an underscore are
        rejected, so a remote peer cannot reach private or special attributes
        of the handler.

        Returns:
            The handler's return value; if the handler returned a coroutine or
            a future, the awaited result.

        Raises:
            AttributeError: If ``name`` is not allowed or does not exist.
            Exception: Whatever the handler itself raises.
        """
        if not isinstance(name, str) or name.startswith("_"):
            raise AttributeError("method {!r} is not available".format(name))
        # handler can be a coroutine or a plain function
        result = getattr(self._handler, name)(*params)
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            result = await result
        return result

    async def _handle_request(self, obj: Any, writer: asyncio.StreamWriter) -> None:
        """Handle one decoded message and, for requests, write the response.

        Never raises for ordinary errors: handler failures are reported to the
        client (requests) or logged (notifications); malformed messages are
        logged.
        """
        try:
            kind = obj[0]
            if kind == RequestType.REQUEST:
                _, msgid, name, params = obj
                try:
                    result = await self._invoke(name, params)
                    # Serialize inside the try: an unserializable result must
                    # still produce an error response for this msgid.
                    payload = self._packer.pack((RequestType.RESPONSE, msgid, None, result))
                except Exception as e:
                    logger.info("Exception %r in call handler %r", e, name)
                    payload = self._packer.pack((RequestType.RESPONSE, msgid, str(e), None))
                writer.write(payload)

            elif kind == RequestType.NOTIFY:
                _, name, params = obj
                try:
                    # notifications have no response; the result is discarded
                    await self._invoke(name, params)
                except Exception:
                    logger.exception("Exception in notification handler %r", name)
            else:
                raise RuntimeError("unknown request type, {}".format(obj[0]))
        except Exception:
            logger.exception("Exception while handling rpc request: %r", obj)
69 logger.exception("Exception while handling rpc request: %r", obj)