|
@@ -1,27 +1,32 @@
|
|
|
import asyncio
|
|
|
+import collections
|
|
|
import logging
|
|
|
+import signal
|
|
|
|
|
|
|
|
|
class Server:
|
|
|
- def __init__(self, host='localhost', input_port=None, output_port=None):
|
|
|
+ def __init__(self, host='localhost', input_port=3001, output_port=3002,
|
|
|
+ buffer_chunk_size=10**4, buffer_length_limit=10**4):
|
|
|
self._host = host
|
|
|
self._input_port = input_port
|
|
|
self._output_port = output_port
|
|
|
self._reader = None
|
|
|
self._writer = None
|
|
|
self._stopping = False
|
|
|
- self.buffer = [] # Shared list
|
|
|
+ self.buffer = collections.deque() # Shared queue of bytes
|
|
|
+ self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk
|
|
|
+ self._buffer_length_limit = buffer_length_limit # How many chunks in buffer
|
|
|
|
|
|
@property
|
|
|
- def host(self):
|
|
|
+ def host(self) -> str:
|
|
|
return self._host
|
|
|
|
|
|
@property
|
|
|
- def input_port(self):
|
|
|
+ def input_port(self) -> int:
|
|
|
return self._input_port
|
|
|
|
|
|
@property
|
|
|
- def output_port(self):
|
|
|
+ def output_port(self) -> int:
|
|
|
return self._output_port
|
|
|
|
|
|
@property
|
|
@@ -29,45 +34,72 @@ class Server:
|
|
|
return self._reader
|
|
|
|
|
|
@property
|
|
|
- def writer(self) -> asyncio.StreamReader:
|
|
|
+ def writer(self) -> asyncio.StreamWriter:
|
|
|
return self._writer
|
|
|
|
|
|
@property
|
|
|
def stopping(self) -> bool:
|
|
|
return self._stopping
|
|
|
|
|
|
- async def setup_reader(self):
|
|
|
- reader, _ = await asyncio.open_connection(self.host, self.input_port)
|
|
|
- self._reader = reader
|
|
|
+ @property
|
|
|
+ def buffer_length_limit(self) -> int:
|
|
|
+ return self._buffer_length_limit
|
|
|
|
|
|
- async def setup_writer(self):
|
|
|
- _, writer = await asyncio.open_connection(self.host, self.output_port)
|
|
|
- self._writer = writer
|
|
|
+ @property
|
|
|
+ def buffer_chunk_size(self) -> int:
|
|
|
+ return self._buffer_chunk_size
|
|
|
|
|
|
async def run_server(self):
|
|
|
- await self.setup_reader()
|
|
|
+ logging.info("===== Started server...")
|
|
|
+ await asyncio.gather(
|
|
|
+ self.run_reader(),
|
|
|
+ self.run_writer()
|
|
|
+ )
|
|
|
+ logging.info("... stopped server. =====")
|
|
|
+ return
|
|
|
+
|
|
|
+ async def run_reader(self):
|
|
|
+ reader, _ = await asyncio.open_connection(self.host, self.input_port)
|
|
|
+ self._reader = reader
|
|
|
while not self.stopping:
|
|
|
try:
|
|
|
- while len(self.buffer) >= 10**4:
|
|
|
+ # Stop if buffer is full
|
|
|
+ while len(self.buffer) >= self.buffer_length_limit:
|
|
|
await asyncio.sleep(1)
|
|
|
+ continue
|
|
|
try:
|
|
|
- input_data = await self.reader.readexactly(10**4)
|
|
|
+ input_data = await self.reader.readexactly(self.buffer_chunk_size)
|
|
|
except asyncio.IncompleteReadError as e:
|
|
|
input_data = e.partial
|
|
|
self.buffer.append(input_data)
|
|
|
except Exception as e:
|
|
|
logging.error(e)
|
|
|
|
|
|
+ async def run_writer(self):
|
|
|
+ _, writer = await asyncio.open_connection(self.host, self.output_port)
|
|
|
+ self._writer = writer
|
|
|
+ while not self.stopping:
|
|
|
+ try:
|
|
|
+ # Slow down if buffer is short
|
|
|
+ if len(self.buffer) < 3:
|
|
|
+ await asyncio.sleep(.1)
|
|
|
+ try:
|
|
|
+ input_data = self.buffer.popleft()
|
|
|
+ except IndexError:
|
|
|
+ continue
|
|
|
+ self.writer.write(input_data)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(e)
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ self._stopping = True
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
loop = asyncio.get_event_loop()
|
|
|
- try:
|
|
|
- logging.info("Starting file bridging server...")
|
|
|
- # loop.run_until_complete(
|
|
|
- # asyncio.gather(
|
|
|
- # read_input_socket,
|
|
|
- # write_on_output_socket
|
|
|
- # )
|
|
|
- # )
|
|
|
- except KeyboardInterrupt():
|
|
|
- logging.info("Received KeyboardInterrupt, stopping...")
|
|
|
+ server = Server()
|
|
|
+ loop.add_signal_handler(signal.SIGINT, server.stop, loop)
|
|
|
+ logging.info("Starting file bridging server...")
|
|
|
+ loop.run_until_complete(server.run_server())
|
|
|
+ logging.info("Received KeyboardInterrupt, stopping...")
|
|
|
+ loop.close()
|