Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import msgpack 

2import asyncio 

3import logging 

4from typing import Any 

5 

6from .error import RPCResponseError 

7from .request import RequestType 

8 

9logger = logging.getLogger(__name__) 

10 

11 

12class Client(object): 

13 """RPC Client""" 

14 

15 def __init__(self, 

16 reader: asyncio.StreamReader, 

17 writer: asyncio.StreamWriter, 

18 *, 

19 packer: msgpack.Packer = None, 

20 unpacker: msgpack.Unpacker = None, 

21 loop: asyncio.AbstractEventLoop = None, 

22 response_timeout: float = None) -> None: 

23 self._reader = reader 

24 self._writer = writer 

25 self._packer = packer if packer is not None else msgpack.Packer(use_bin_type=True) 

26 self._unpacker = unpacker if unpacker is not None else msgpack.Unpacker(raw=False) 

27 self._loop = loop if loop is not None else asyncio.get_event_loop() 

28 

29 # exposed state 

30 self.response_timeout = response_timeout 

31 

32 # internal mutable state 

33 self._next_msgid = 0 

34 self._pending_requests = {} 

35 self._receiver_task = None 

36 

37 async def _receiver(self) -> None: 

38 """Background task to receive objects from the stream 

39 

40 This allows parallel/overlapping rpc calls 

41 """ 

42 try: 

43 unpacker = self._unpacker 

44 reader = self._reader 

45 logger.info("starting receiver") 

46 while len(self._pending_requests): 

47 data = await reader.read(n=2048) 

48 if not data: 

49 raise ConnectionError("Connection has been closed") 

50 unpacker.feed(data) 

51 for obj in unpacker: 

52 self._on_recv(obj) 

53 except ConnectionError: 

54 logger.info("Server connection has closed") 

55 except Exception: 

56 logger.exception("exception in client receiver") 

57 finally: 

58 logger.info("ending receiver") 

59 

60 def close(self) -> None: 

61 """Remove all pending responses and close the underlying connection""" 

62 self._pending_requests = {} 

63 self._writer.close() 

64 

65 if self._receiver_task is not None: 

66 self._receiver_task.cancel() 

67 

68 def _on_recv(self, obj) -> None: 

69 """Handler for the reception of msgpack objects""" 

70 try: 

71 if obj[0] == RequestType.RESPONSE: 

72 _, msgid, error, result = obj 

73 _, future = self._pending_requests[msgid] 

74 if error: 

75 future.set_exception(RPCResponseError(error)) 

76 else: 

77 future.set_result(result) 

78 else: 

79 logger.error("received non-response object %r", obj) 

80 except LookupError: 

81 logger.error("received unknown object type %r", obj) 

82 

83 def _get_next_msgid(self) -> int: 

84 """return the next msgid to be used""" 

85 val = self._next_msgid 

86 self._next_msgid = (self._next_msgid + 1) & 0xFFFFFFFF 

87 return val 

88 

89 async def call(self, name: str, *args, timeout: float = None) -> Any: 

90 """Call a remote function 

91 

92 If timeout is not given the class attribute response_timeout will be used. 

93 """ 

94 logger.debug("call: %s%r", name, args) 

95 timeout = timeout if timeout is not None else self.response_timeout 

96 

97 request = (RequestType.REQUEST, self._get_next_msgid(), name, args) 

98 

99 # create a future for the response and make it responsable for its own cleanup 

100 future_response = self._loop.create_future() 

101 self._pending_requests[request[1]] = (request, future_response) 

102 future_response.add_done_callback(lambda fut: self._pending_requests.pop(request[1])) 

103 

104 self._writer.write(self._packer.pack(request)) 

105 

106 # start the receiver if its not already active 

107 if self._receiver_task is None or self._receiver_task.done(): 

108 self._receiver_task = self._loop.create_task(self._receiver()) 

109 # wait for the future or the timeout to complete 

110 return await asyncio.wait_for(future_response, timeout=timeout) 

111 

112 async def notify(self, name: str, *args: Any) -> asyncio.Future: 

113 """Send a one-way notification to the server""" 

114 logger.debug("notify: %s%r", name, args) 

115 request = (RequestType.NOTIFY, name, args) 

116 self._writer.write(self._packer.pack(request)) 

117 await self._writer.drain()