44import sys
55import threading
66import time
7+ import asyncio
78
89from . import log
910
@@ -157,31 +158,42 @@ def has_socket(self):
157158
158159class BackgroundSocketCreator (threading .Thread ):
159160
160- def __init__ (self , host , port , message_q , output_q ):
161- self .__message_q = message_q
161+ def __init__ (self , host , port , output_q ):
162162 self .__output_q = output_q
163163 self .__host = host
164164 self .__port = port
165+ self .__socket_task = None
166+ self .__loop = None
165167 threading .Thread .__init__ (self )
166168
167169 @staticmethod
168170 def log (message ):
169171 log .Log (message , log .Logger .DEBUG )
170172
171173 def run (self ):
174+ # needed for python 3.5
175+ self .__loop = asyncio .new_event_loop ()
176+ asyncio .set_event_loop (self .__loop )
177+ self .__loop .run_until_complete (self .run_async ())
178+
179+ async def run_async (self ):
172180 self .log ("Started" )
173181 self .log ("Listening on port %s" % self .__port )
174182 try :
175183 s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
176- s .setblocking (1 )
184+ s .setblocking (False )
177185 s .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
178186 s .bind ((self .__host , self .__port ))
179- s .settimeout (5 ) # timeout after 5 seconds so we can check messages
180187 s .listen (5 )
181188 while 1 :
182189 try :
183- self .__peek_for_exit ()
184- client , address = s .accept ()
190+ # using ensure_future here since before 3.7, this is not a coroutine, but returns a future
191+ self .__socket_task = asyncio .ensure_future (self .__loop .sock_accept (s ))
192+ client , address = await self .__socket_task
193+ # set resulting socket to blocking
194+ client .setblocking (True )
195+ client .settimeout (5 )
196+
185197 self .log ("Found client, %s" % str (address ))
186198 self .__output_q .put ((client , address ))
187199 break
@@ -195,31 +207,30 @@ def run(self):
195207 if socket_error .errno == errno .EADDRINUSE :
196208 self .log ("Address already in use" )
197209 print ("Socket is already in use" )
198- except Exception :
210+ except asyncio .CancelledError as e :
211+ self .log ("Stopping server" )
212+ self .__socket_task = None
213+ except Exception as e :
199214 print ("Exception caught" )
200215 self .log ("Error: %s" % str (sys .exc_info ()))
201216 self .log ("Stopping server" )
202217 finally :
203218 self .log ("Finishing socket server" )
204219 s .close ()
205220
206- def __peek_for_exit (self ):
207- try :
208- # self.log("Checking for exit")
209- self .__check_exit (self .__message_q .get_nowait ())
210- except queue .Empty :
211- pass
221+ def _exit (self ):
222+ if self .__socket_task :
223+ # this will raise asyncio.CancelledError
224+ self .__socket_task .cancel ()
212225
213- @staticmethod
214- def __check_exit (message ):
215- if message == "exit" :
216- raise Exception ("Exiting" )
226+ # called from outside of the thread
227+ def exit (self ):
228+ self .__loop .call_soon_threadsafe (self ._exit )
217229
218230
219231class SocketServer :
220232
221233 def __init__ (self ):
222- self .__message_q = queue .Queue (0 )
223234 self .__socket_q = queue .Queue (1 )
224235 self .__thread = None
225236
@@ -229,7 +240,7 @@ def __del__(self):
229240 def start (self , host , port ):
230241 if not self .is_alive ():
231242 self .__thread = BackgroundSocketCreator (
232- host , port , self .__message_q , self . __socket_q )
243+ host , port , self .__socket_q )
233244 self .__thread .start ()
234245
235246 def is_alive (self ):
@@ -243,7 +254,7 @@ def socket(self):
243254
244255 def stop (self ):
245256 if self .is_alive ():
246- self .__message_q . put_nowait ( " exit" )
257+ self .__thread . exit ( )
247258 self .__thread .join (3000 )
248259 if self .has_socket ():
249260 self .socket ()[0 ].close ()
0 commit comments