Skip to content

Commit 6955437

Browse files
Merge pull request #479 from meldafert/async-socket-creator
Make BackgroundSocketCreator async using asyncio
2 parents 29c154a + c711f60 commit 6955437

1 file changed

Lines changed: 31 additions & 20 deletions

File tree

‎python3/vdebug/connection.py‎

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import threading
66
import time
7+
import asyncio
78

89
from . import log
910

@@ -157,31 +158,42 @@ def has_socket(self):
157158

158159
class BackgroundSocketCreator(threading.Thread):
159160

160-
def __init__(self, host, port, message_q, output_q):
161-
self.__message_q = message_q
161+
def __init__(self, host, port, output_q):
162162
self.__output_q = output_q
163163
self.__host = host
164164
self.__port = port
165+
self.__socket_task = None
166+
self.__loop = None
165167
threading.Thread.__init__(self)
166168

167169
@staticmethod
168170
def log(message):
169171
log.Log(message, log.Logger.DEBUG)
170172

171173
def run(self):
174+
# needed for python 3.5
175+
self.__loop = asyncio.new_event_loop()
176+
asyncio.set_event_loop(self.__loop)
177+
self.__loop.run_until_complete(self.run_async())
178+
179+
async def run_async(self):
172180
self.log("Started")
173181
self.log("Listening on port %s" % self.__port)
174182
try:
175183
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
176-
s.setblocking(1)
184+
s.setblocking(False)
177185
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
178186
s.bind((self.__host, self.__port))
179-
s.settimeout(5) # timeout after 5 seconds so we can check messages
180187
s.listen(5)
181188
while 1:
182189
try:
183-
self.__peek_for_exit()
184-
client, address = s.accept()
190+
# using ensure_future here since before 3.7, this is not a coroutine, but returns a future
191+
self.__socket_task = asyncio.ensure_future(self.__loop.sock_accept(s))
192+
client, address = await self.__socket_task
193+
# set resulting socket to blocking
194+
client.setblocking(True)
195+
client.settimeout(5)
196+
185197
self.log("Found client, %s" % str(address))
186198
self.__output_q.put((client, address))
187199
break
@@ -195,31 +207,30 @@ def run(self):
195207
if socket_error.errno == errno.EADDRINUSE:
196208
self.log("Address already in use")
197209
print("Socket is already in use")
198-
except Exception:
210+
except asyncio.CancelledError as e:
211+
self.log("Stopping server")
212+
self.__socket_task = None
213+
except Exception as e:
199214
print("Exception caught")
200215
self.log("Error: %s" % str(sys.exc_info()))
201216
self.log("Stopping server")
202217
finally:
203218
self.log("Finishing socket server")
204219
s.close()
205220

206-
def __peek_for_exit(self):
207-
try:
208-
# self.log("Checking for exit")
209-
self.__check_exit(self.__message_q.get_nowait())
210-
except queue.Empty:
211-
pass
221+
def _exit(self):
222+
if self.__socket_task:
223+
# this will raise asyncio.CancelledError
224+
self.__socket_task.cancel()
212225

213-
@staticmethod
214-
def __check_exit(message):
215-
if message == "exit":
216-
raise Exception("Exiting")
226+
# called from outside of the thread
227+
def exit(self):
228+
self.__loop.call_soon_threadsafe(self._exit)
217229

218230

219231
class SocketServer:
220232

221233
def __init__(self):
222-
self.__message_q = queue.Queue(0)
223234
self.__socket_q = queue.Queue(1)
224235
self.__thread = None
225236

@@ -229,7 +240,7 @@ def __del__(self):
229240
def start(self, host, port):
230241
if not self.is_alive():
231242
self.__thread = BackgroundSocketCreator(
232-
host, port, self.__message_q, self.__socket_q)
243+
host, port, self.__socket_q)
233244
self.__thread.start()
234245

235246
def is_alive(self):
@@ -243,7 +254,7 @@ def socket(self):
243254

244255
def stop(self):
245256
if self.is_alive():
246-
self.__message_q.put_nowait("exit")
257+
self.__thread.exit()
247258
self.__thread.join(3000)
248259
if self.has_socket():
249260
self.socket()[0].close()

0 commit comments

Comments
 (0)