Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
import asyncio
import logging
from typing import Any, Optional

import msgpack

from .error import RPCResponseError
from .request import RequestType
# Module-level logger, named after this module, used by the RPC client below.
logger = logging.getLogger(__name__)
class Client(object):
    """RPC client exchanging msgpack-encoded messages over an asyncio stream.

    Requests are written to ``writer``.  A background task reads ``reader``,
    decodes the incoming msgpack objects and resolves the future of the
    matching pending call, so several calls may be in flight at once.
    """

    def __init__(self,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter,
                 *,
                 packer: Optional["msgpack.Packer"] = None,
                 unpacker: Optional["msgpack.Unpacker"] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None,
                 response_timeout: Optional[float] = None) -> None:
        """Create a client on top of an established stream pair.

        Args:
            reader: Stream the responses are read from.
            writer: Stream the requests and notifications are written to.
            packer: msgpack packer to use; defaults to
                ``msgpack.Packer(use_bin_type=True)``.
            unpacker: msgpack unpacker to use; defaults to
                ``msgpack.Unpacker(raw=False)``.
            loop: Event loop used to create futures and the receiver task;
                defaults to ``asyncio.get_event_loop()``.
            response_timeout: Default timeout in seconds for :meth:`call`;
                ``None`` means wait indefinitely.
        """
        self._reader = reader
        self._writer = writer
        self._packer = packer if packer is not None else msgpack.Packer(use_bin_type=True)
        self._unpacker = unpacker if unpacker is not None else msgpack.Unpacker(raw=False)
        self._loop = loop if loop is not None else asyncio.get_event_loop()

        # exposed state
        self.response_timeout = response_timeout

        # internal mutable state
        self._next_msgid = 0
        # msgid -> (request tuple, future resolved with the response)
        self._pending_requests = {}
        self._receiver_task = None

    async def _receiver(self) -> None:
        """Background task to receive objects from the stream

        This allows parallel/overlapping rpc calls.  The task runs only while
        there are pending requests; :meth:`call` restarts it when needed.
        """
        try:
            unpacker = self._unpacker
            reader = self._reader
            logger.info("starting receiver")
            while self._pending_requests:
                data = await reader.read(n=2048)
                if not data:
                    # an empty read means EOF on the stream
                    raise ConnectionError("Connection has been closed")
                unpacker.feed(data)
                for obj in unpacker:
                    self._on_recv(obj)
        except ConnectionError:
            logger.info("Server connection has closed")
            # No response can arrive any more: wake up the waiting callers
            # instead of leaving them blocked until their timeout (if any).
            self._fail_pending("Connection has been closed")
        except Exception:
            logger.exception("exception in client receiver")
        finally:
            logger.info("ending receiver")

    def _fail_pending(self, reason: str) -> None:
        """Forget all pending requests and fail their futures.

        Every future that is not done yet receives its own
        ``ConnectionError(reason)`` so awaiting callers are released.
        """
        pending, self._pending_requests = self._pending_requests, {}
        for _, future in pending.values():
            if not future.done():
                future.set_exception(ConnectionError(reason))

    def close(self) -> None:
        """Remove all pending responses and close the underlying connection

        Calls still waiting for a response fail with ``ConnectionError``.
        """
        self._fail_pending("Client has been closed")
        self._writer.close()

        if self._receiver_task is not None:
            self._receiver_task.cancel()

    def _on_recv(self, obj) -> None:
        """Handler for the reception of msgpack objects

        A well-formed response resolves the future of the matching pending
        request.  Anything else is logged and dropped so that one bad object
        cannot terminate the receiver task.
        """
        try:
            if obj[0] != RequestType.RESPONSE:
                logger.error("received non-response object %r", obj)
                return
            _, msgid, error, result = obj
            entry = self._pending_requests.get(msgid)
        except (LookupError, TypeError, ValueError):
            # not indexable, wrong number of fields, or unhashable msgid
            logger.error("received unknown object type %r", obj)
            return

        if entry is None:
            # e.g. the call already timed out or the client was closed
            logger.warning("received response for unknown msgid %r", msgid)
            return

        _, future = entry
        if future.done():
            # cancelled/timed out, cleanup callback has not run yet
            logger.debug("dropping response for finished request %r", msgid)
            return
        if error:
            future.set_exception(RPCResponseError(error))
        else:
            future.set_result(result)

    def _get_next_msgid(self) -> int:
        """return the next msgid to be used (wraps around after 32 bits)"""
        val = self._next_msgid
        self._next_msgid = (self._next_msgid + 1) & 0xFFFFFFFF
        return val

    async def call(self, name: str, *args, timeout: Optional[float] = None) -> Any:
        """Call a remote function

        If timeout is not given the instance attribute response_timeout will
        be used.

        Args:
            name: Name of the remote function.
            *args: Positional arguments sent with the request.
            timeout: Seconds to wait for the response; ``None`` falls back to
                ``self.response_timeout``.

        Returns:
            The result field of the response.

        Raises:
            RPCResponseError: The response carried an error.
            asyncio.TimeoutError: No response arrived within the timeout.
            ConnectionError: The connection or the client was closed while
                waiting for the response.
        """
        logger.debug("call: %s%r", name, args)
        timeout = timeout if timeout is not None else self.response_timeout

        msgid = self._get_next_msgid()
        request = (RequestType.REQUEST, msgid, name, args)

        # create a future for the response and make it responsible for its own
        # cleanup; the default keeps the callback safe when the entry has
        # already been dropped (e.g. by close())
        future_response = self._loop.create_future()
        self._pending_requests[msgid] = (request, future_response)
        future_response.add_done_callback(
            lambda fut: self._pending_requests.pop(msgid, None))

        try:
            self._writer.write(self._packer.pack(request))
        except Exception:
            # nothing was sent: finish the future so its entry is cleaned up
            future_response.cancel()
            raise

        # start the receiver if it's not already active
        if self._receiver_task is None or self._receiver_task.done():
            self._receiver_task = self._loop.create_task(self._receiver())
        # wait for the future or the timeout to complete
        return await asyncio.wait_for(future_response, timeout=timeout)

    async def notify(self, name: str, *args: Any) -> None:
        """Send a one-way notification to the server

        No response is expected; the coroutine returns once the writer has
        been drained.
        """
        logger.debug("notify: %s%r", name, args)
        request = (RequestType.NOTIFY, name, args)
        self._writer.write(self._packer.pack(request))
        await self._writer.drain()