Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import asyncio 

2import msgpack 

3from .request import RequestType 

4import logging 

5from typing import Callable, Any 

6 

7 

8logger = logging.getLogger(__name__) 

9 

10 

11class Server(object): 

12 """RPC server""" 

13 

14 def __init__(self, 

15 handler: Any, 

16 *, 

17 packer: msgpack.Packer = None, 

18 unpacker_factory: Callable[[], msgpack.Unpacker] = lambda: msgpack.Unpacker(raw=False), 

19 loop: asyncio.AbstractEventLoop = None) -> None: 

20 self._handler = handler 

21 self._packer = packer if packer is not None else msgpack.Packer(use_bin_type=True) 

22 self._unpacked_factory = unpacker_factory 

23 self._loop = loop if loop is not None else asyncio.get_event_loop() 

24 

25 async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: 

26 """Coroutine to serve a client connection""" 

27 try: 

28 # create an unpacker for this connection 

29 unpacker = self._unpacked_factory() 

30 while True: 

31 data = await reader.read(n=4096) 

32 if not data: 

33 raise ConnectionError("Client connection closed") 

34 unpacker.feed(data) 

35 for obj in unpacker: 

36 # for every received object create a task to handle it 

37 self._loop.create_task(self._handle_request(obj, writer)) 

38 except ConnectionError: 

39 pass 

40 except Exception: 

41 logger.exception("Uncaught exception in client servicer") 

42 

43 async def _handle_request(self, obj: Any, writer: asyncio.StreamWriter) -> None: 

44 try: 

45 if obj[0] == RequestType.REQUEST: 

46 _, msgid, name, params = obj 

47 try: 

48 # handler can be a coroutine or a plain function 

49 result = getattr(self._handler, name)(*params) 

50 if asyncio.iscoroutine(result): 

51 result = await result 

52 response = (RequestType.RESPONSE, msgid, None, result) 

53 except Exception as e: 

54 logger.info("Exception %r in call handler %r", e, name) 

55 response = (RequestType.RESPONSE, msgid, str(e), None) 

56 writer.write(self._packer.pack(response)) 

57 

58 elif obj[0] == RequestType.NOTIFY: 

59 _, name, params = obj 

60 try: 

61 result = getattr(self._handler, name)(*params) 

62 if asyncio.iscoroutine(result): 

63 result = await result 

64 except Exception: 

65 logger.exception("Exception in notification handler %r", name) 

66 else: 

67 raise RuntimeError("unknown request type, {}".format(obj[0])) 

68 except Exception: 

69 logger.exception("Exception while handling rpc request: %r", obj)