Queer European MD passionate about IT
Forráskód Böngészése

Working on server but getting errors

ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 5000)
Davte 5 éve
szülő
commit
cab7527c63
2 módosított fájl, 205 hozzáadás és 8 törlés
  1. 130 0
      src/client.py
  2. 75 8
      src/server.py

+ 130 - 0
src/client.py

@@ -0,0 +1,130 @@
+import asyncio
+import collections
+import logging
+import signal
+
+
+class Server:
+    def __init__(self, host='localhost', input_port=3001, output_port=3002,
+                 buffer_chunk_size=10**4, buffer_length_limit=10**4):
+        self._host = host
+        self._input_port = input_port
+        self._output_port = output_port
+        self._reader = None
+        self._writer = None
+        self._stopping = False
+        self.buffer = collections.deque()  # Shared queue of bytes
+        self._buffer_chunk_size = buffer_chunk_size   # How many bytes per chunk
+        self._buffer_length_limit = buffer_length_limit  # How many chunks in buffer
+
+    @property
+    def host(self) -> str:
+        return self._host
+
+    @property
+    def input_port(self) -> int:
+        return self._input_port
+
+    @property
+    def output_port(self) -> int:
+        return self._output_port
+
+    @property
+    def reader(self) -> asyncio.StreamReader:
+        return self._reader
+
+    @property
+    def writer(self) -> asyncio.StreamWriter:
+        return self._writer
+
+    @property
+    def stopping(self) -> bool:
+        return self._stopping
+
+    @property
+    def buffer_length_limit(self) -> int:
+        return self._buffer_length_limit
+
+    @property
+    def buffer_chunk_size(self) -> int:
+        return self._buffer_chunk_size
+
+    def set_reader(self, reader=None, writer=None):
+        self._reader = reader
+        _ = writer
+
+    def set_writer(self, reader=None, writer=None):
+        _ = reader
+        self._writer = writer
+
+    async def run_server(self):
+        logging.info("===== Started server...")
+        await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port)
+        await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port)
+        while (not self.stopping) and (self.reader is None or self.writer is None):
+            logging.info("==... waiting for connections...==")
+            await asyncio.sleep(.5)
+        await asyncio.gather(
+            self.run_reader(),
+            self.run_writer()
+        )
+        logging.info("... stopped server. =====")
+        return
+
+    async def run_reader(self):
+        while not self.stopping:
+            try:
+                # Stop if buffer is full
+                while len(self.buffer) >= self.buffer_length_limit:
+                    await asyncio.sleep(1)
+                    continue
+                try:
+                    input_data = await self.reader.readexactly(self.buffer_chunk_size)
+                except asyncio.IncompleteReadError as e:
+                    input_data = e.partial
+                self.buffer.append(input_data)
+            except Exception as e:
+                logging.error(e)
+
+    async def run_writer(self):
+        while not self.stopping:
+            try:
+                # Slow down if buffer is short
+                if len(self.buffer) < 3:
+                    await asyncio.sleep(.1)
+                try:
+                    input_data = self.buffer.popleft()
+                except IndexError:
+                    continue
+                self.writer.write(input_data)
+                await self.writer.drain()
+            except Exception as e:
+                logging.error(e)
+
+    def stop(self, *_):
+        self._stopping = True
+
+
+if 1 or __name__ == '__main__':
+    log_formatter = logging.Formatter(
+        "%(asctime)s [%(module)-15s %(levelname)-8s]     %(message)s",
+        style='%'
+    )
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.DEBUG)
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(log_formatter)
+    console_handler.setLevel(logging.DEBUG)
+    root_logger.addHandler(console_handler)
+
+    loop = asyncio.get_event_loop()
+    server = Server(
+        input_port=0,
+        output_port=0
+    )
+    loop.add_signal_handler(signal.SIGINT, server.stop, loop)
+    logging.info("Starting file bridging server...")
+    loop.run_until_complete(server.run_server())
+    logging.info("Received KeyboardInterrupt, stopping...")
+    loop.close()

+ 75 - 8
src/server.py

@@ -49,18 +49,49 @@ class Server:
     def buffer_chunk_size(self) -> int:
         return self._buffer_chunk_size
 
+    def set_reader(self, reader=None, writer=None):
+        self._reader = reader
+        _ = writer
+
+    def set_writer(self, reader=None, writer=None):
+        _ = reader
+        self._writer = writer
+
     async def run_server(self):
         logging.info("===== Started server...")
+        if 0:  # loop.create_connection method (search about Protocol?)
+            reader, _ = await loop.create_connection(host=self.host, port=self.input_port)
+            self._reader = reader
+            _, writer = await loop.create_connection(host=self.host, port=self.output_port)
+            self._writer = writer
+        elif 1:  # asyncio.open_connection method
+            reader, _ = await asyncio.open_connection(self.host, self.input_port)
+            self._reader = reader
+            _, writer = await asyncio.open_connection(self.host, self.output_port)
+            self._writer = writer
+        else:  # asyncio.start_server method
+            await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port)
+            await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port)
+        while (not self.stopping) and (self.reader is None or self.writer is None):
+            logging.info("==... waiting for connections...==")
+            await asyncio.sleep(.5)
         await asyncio.gather(
-            self.run_reader(),
-            self.run_writer()
+            self.run_writer(),
+            self.run_reader()
         )
         logging.info("... stopped server. =====")
         return
 
+    async def test_client(self):
+        logging.info("===== Started client test...")
+        await asyncio.gather(
+            self.send('/home/davte/Provaaaaa.txt'),
+            self.receive('/home/davte/Ricevuto.log')
+        )
+        logging.info("... stopped client test. =====")
+        return
+
     async def run_reader(self):
-        reader, _ = await asyncio.open_connection(self.host, self.input_port)
-        self._reader = reader
         while not self.stopping:
             try:
                 # Stop if buffer is full
@@ -76,8 +107,6 @@ class Server:
                 logging.error(e)
 
     async def run_writer(self):
-        _, writer = await asyncio.open_connection(self.host, self.output_port)
-        self._writer = writer
         while not self.stopping:
             try:
                 # Slow down if buffer is short
@@ -88,16 +117,54 @@ class Server:
                 except IndexError:
                     continue
                 self.writer.write(input_data)
+                await self.writer.drain()
             except Exception as e:
                 logging.error(e)
 
-    def stop(self):
+    def stop(self, *_):
         self._stopping = True
 
+    async def send(self, file_path):
+        await asyncio.start_server(client_connected_cb=self.set_writer, host='5.249.159.33', port='5000')
+        while (not self.stopping) and self.writer is None:
+            logging.info("==... waiting for connections...==")
+            await asyncio.sleep(.5)
+        with open(file_path, 'rb') as file_to_send:
+            output_data = file_to_send.read(self.buffer_chunk_size)
+            self.writer.write(output_data)
+            await self.writer.drain()
+
+    async def receive(self, file_path):
+        await asyncio.start_server(client_connected_cb=self.set_reader, host='5.249.159.33', port='5001')
+        while (not self.stopping) and self.reader is None:
+            logging.info("==... waiting for connections...==")
+            await asyncio.sleep(.5)
+        with open(file_path, 'wb') as file_to_receive:
+            try:
+                input_data = await self.reader.readexactly(self.buffer_chunk_size)
+            except asyncio.IncompleteReadError as e:
+                input_data = e.partial
+            file_to_receive.write(input_data)
+
 
 if __name__ == '__main__':
+    log_formatter = logging.Formatter(
+        "%(asctime)s [%(module)-15s %(levelname)-8s]     %(message)s",
+        style='%'
+    )
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.DEBUG)
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(log_formatter)
+    console_handler.setLevel(logging.DEBUG)
+    root_logger.addHandler(console_handler)
+
     loop = asyncio.get_event_loop()
-    server = Server()
+    server = Server(
+        input_port=5000,
+        output_port=5001
+    )
     loop.add_signal_handler(signal.SIGINT, server.stop, loop)
     logging.info("Starting file bridging server...")
     loop.run_until_complete(server.run_server())