|
@@ -5,37 +5,23 @@ import signal
|
|
|
|
|
|
|
|
|
|
class Server:
|
|
class Server:
|
|
- def __init__(self, host='localhost', input_port=3001, output_port=3002,
|
|
|
|
|
|
+ def __init__(self, host='localhost', port=3001,
|
|
buffer_chunk_size=10**4, buffer_length_limit=10**4):
|
|
buffer_chunk_size=10**4, buffer_length_limit=10**4):
|
|
self._host = host
|
|
self._host = host
|
|
- self._input_port = input_port
|
|
|
|
- self._output_port = output_port
|
|
|
|
- self._reader = None
|
|
|
|
- self._writer = None
|
|
|
|
|
|
+ self._port = port
|
|
self._stopping = False
|
|
self._stopping = False
|
|
self.buffer = collections.deque() # Shared queue of bytes
|
|
self.buffer = collections.deque() # Shared queue of bytes
|
|
self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk
|
|
self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk
|
|
self._buffer_length_limit = buffer_length_limit # How many chunks in buffer
|
|
self._buffer_length_limit = buffer_length_limit # How many chunks in buffer
|
|
|
|
+ self._working = False
|
|
|
|
|
|
@property
|
|
@property
|
|
def host(self) -> str:
|
|
def host(self) -> str:
|
|
return self._host
|
|
return self._host
|
|
|
|
|
|
@property
|
|
@property
|
|
- def input_port(self) -> int:
|
|
|
|
- return self._input_port
|
|
|
|
-
|
|
|
|
- @property
|
|
|
|
- def output_port(self) -> int:
|
|
|
|
- return self._output_port
|
|
|
|
-
|
|
|
|
- @property
|
|
|
|
- def reader(self) -> asyncio.StreamReader:
|
|
|
|
- return self._reader
|
|
|
|
-
|
|
|
|
- @property
|
|
|
|
- def writer(self) -> asyncio.StreamWriter:
|
|
|
|
- return self._writer
|
|
|
|
|
|
+ def port(self) -> int:
|
|
|
|
+ return self._port
|
|
|
|
|
|
@property
|
|
@property
|
|
def stopping(self) -> bool:
|
|
def stopping(self) -> bool:
|
|
@@ -49,49 +35,11 @@ class Server:
|
|
def buffer_chunk_size(self) -> int:
|
|
def buffer_chunk_size(self) -> int:
|
|
return self._buffer_chunk_size
|
|
return self._buffer_chunk_size
|
|
|
|
|
|
- def set_reader(self, reader=None, writer=None):
|
|
|
|
- self._reader = reader
|
|
|
|
- _ = writer
|
|
|
|
-
|
|
|
|
- def set_writer(self, reader=None, writer=None):
|
|
|
|
- _ = reader
|
|
|
|
- self._writer = writer
|
|
|
|
-
|
|
|
|
- async def run_server(self):
|
|
|
|
- logging.info("===== Started server...")
|
|
|
|
- if 0: # loop.create_connection method
|
|
|
|
- reader, _ = await loop.create_connection(asyncio.Transport, host=self.host, port=self.input_port)
|
|
|
|
- self._reader = reader
|
|
|
|
- _, writer = await loop.create_connection(asyncio.Transport, host=self.host, port=self.output_port)
|
|
|
|
- self._writer = writer
|
|
|
|
- elif 1: # asyncio.open_connection method
|
|
|
|
- reader, _ = await asyncio.open_connection(self.host, self.input_port)
|
|
|
|
- self._reader = reader
|
|
|
|
- _, writer = await asyncio.open_connection(self.host, self.output_port)
|
|
|
|
- self._writer = writer
|
|
|
|
- else: # asyncio.start_server method
|
|
|
|
- await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port)
|
|
|
|
- await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port)
|
|
|
|
- while (not self.stopping) and (self.reader is None or self.writer is None):
|
|
|
|
- logging.info("==... waiting for connections...==")
|
|
|
|
- await asyncio.sleep(.5)
|
|
|
|
- await asyncio.gather(
|
|
|
|
- self.run_writer(),
|
|
|
|
- self.run_reader()
|
|
|
|
- )
|
|
|
|
- logging.info("... stopped server. =====")
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
- async def test_client(self):
|
|
|
|
- logging.info("===== Started client test...")
|
|
|
|
- await asyncio.gather(
|
|
|
|
- self.send('/home/davte/Provaaaaa.txt'),
|
|
|
|
- self.receive('/home/davte/Ricevuto.log')
|
|
|
|
- )
|
|
|
|
- logging.info("... stopped client test. =====")
|
|
|
|
- return
|
|
|
|
|
|
+ @property
|
|
|
|
+ def working(self) -> bool:
|
|
|
|
+ return self._working
|
|
|
|
|
|
- async def run_reader(self):
|
|
|
|
|
|
+ async def run_reader(self, reader):
|
|
while not self.stopping:
|
|
while not self.stopping:
|
|
try:
|
|
try:
|
|
# Stop if buffer is full
|
|
# Stop if buffer is full
|
|
@@ -99,14 +47,14 @@ class Server:
|
|
await asyncio.sleep(1)
|
|
await asyncio.sleep(1)
|
|
continue
|
|
continue
|
|
try:
|
|
try:
|
|
- input_data = await self.reader.readexactly(self.buffer_chunk_size)
|
|
|
|
|
|
+ input_data = await reader.readexactly(self.buffer_chunk_size)
|
|
except asyncio.IncompleteReadError as e:
|
|
except asyncio.IncompleteReadError as e:
|
|
input_data = e.partial
|
|
input_data = e.partial
|
|
self.buffer.append(input_data)
|
|
self.buffer.append(input_data)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(e)
|
|
|
|
|
|
- async def run_writer(self):
|
|
|
|
|
|
+ async def run_writer(self, writer):
|
|
while not self.stopping:
|
|
while not self.stopping:
|
|
try:
|
|
try:
|
|
# Slow down if buffer is short
|
|
# Slow down if buffer is short
|
|
@@ -116,41 +64,32 @@ class Server:
|
|
input_data = self.buffer.popleft()
|
|
input_data = self.buffer.popleft()
|
|
except IndexError:
|
|
except IndexError:
|
|
continue
|
|
continue
|
|
- self.writer.write(input_data)
|
|
|
|
- await self.writer.drain()
|
|
|
|
|
|
+ writer.write(input_data)
|
|
|
|
+ await writer.drain()
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(e)
|
|
|
|
|
|
|
|
+ async def forward_bytes(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
|
|
+ self._working = True
|
|
|
|
+ asyncio.ensure_future(self.run_reader(reader=reader))
|
|
|
|
+ asyncio.ensure_future(self.run_writer(writer=writer))
|
|
|
|
+
|
|
|
|
+ async def run_server(self):
|
|
|
|
+ reader_server = await asyncio.start_server(client_connected_cb=self.forward_bytes,
|
|
|
|
+ host=self.host, port=self.port)
|
|
|
|
+ async with reader_server:
|
|
|
|
+ await reader_server.serve_forever()
|
|
|
|
+ return
|
|
|
|
+
|
|
def stop(self, *_):
|
|
def stop(self, *_):
|
|
- self._stopping = True
|
|
|
|
-
|
|
|
|
- async def send(self, file_path):
|
|
|
|
- await asyncio.start_server(client_connected_cb=self.set_writer, host='5.249.159.33', port='5000')
|
|
|
|
- while (not self.stopping) and self.writer is None:
|
|
|
|
- logging.info("==... waiting for connections...==")
|
|
|
|
- await asyncio.sleep(.5)
|
|
|
|
- with open(file_path, 'rb') as file_to_send:
|
|
|
|
- output_data = file_to_send.read(self.buffer_chunk_size)
|
|
|
|
- self.writer.write(output_data)
|
|
|
|
- await self.writer.drain()
|
|
|
|
-
|
|
|
|
- async def receive(self, file_path):
|
|
|
|
- await asyncio.start_server(client_connected_cb=self.set_reader, host='5.249.159.33', port='5001')
|
|
|
|
- while (not self.stopping) and self.reader is None:
|
|
|
|
- logging.info("==... waiting for connections...==")
|
|
|
|
- await asyncio.sleep(.5)
|
|
|
|
- with open(file_path, 'wb') as file_to_receive:
|
|
|
|
- try:
|
|
|
|
- input_data = await self.reader.readexactly(self.buffer_chunk_size)
|
|
|
|
- except asyncio.IncompleteReadError as e:
|
|
|
|
- input_data = e.partial
|
|
|
|
- file_to_receive.write(input_data)
|
|
|
|
|
|
+ if self.working:
|
|
|
|
+ logging.info("Received interruption signal, stopping...")
|
|
|
|
+ self._stopping = True
|
|
|
|
+ else:
|
|
|
|
+ raise KeyboardInterrupt("Not working yet...")
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
- print("https://stackoverflow.com/questions/48023911/aiohttp-error-connect-call-failed-for-multiple-async-requests"
|
|
|
|
- "-to-localhost")
|
|
|
|
- exit()
|
|
|
|
log_formatter = logging.Formatter(
|
|
log_formatter = logging.Formatter(
|
|
"%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s",
|
|
"%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s",
|
|
style='%'
|
|
style='%'
|
|
@@ -165,11 +104,11 @@ if __name__ == '__main__':
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
loop = asyncio.get_event_loop()
|
|
server = Server(
|
|
server = Server(
|
|
- input_port=5000,
|
|
|
|
- output_port=5001
|
|
|
|
|
|
+ host='127.0.0.1',
|
|
|
|
+ port=5000,
|
|
)
|
|
)
|
|
loop.add_signal_handler(signal.SIGINT, server.stop, loop)
|
|
loop.add_signal_handler(signal.SIGINT, server.stop, loop)
|
|
logging.info("Starting file bridging server...")
|
|
logging.info("Starting file bridging server...")
|
|
loop.run_until_complete(server.run_server())
|
|
loop.run_until_complete(server.run_server())
|
|
- logging.info("Received KeyboardInterrupt, stopping...")
|
|
|
|
loop.close()
|
|
loop.close()
|
|
|
|
+ logging.info("Stopped server")
|