Samo Penic
2018-12-11 79ee3747258bc94c4c80ecee1fbc0e8f00103d64
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
4c9734 13 import re
SP 14
8058ab 15
SP 16
# Trisurf version string included in the registration data sent to the server.
# This placeholder is overwritten at start-up when the file runs as a script.
glob_ts_version = '00000'
18
def getTrisurfVersion():
    """Return the short git hash reported by ``trisurf --version``.

    The first line of the program's combined stdout/stderr is searched for a
    seven-digit lowercase hexadecimal hash, optionally followed by
    ``-dirty``.

    Returns:
        The first match, or the string ``"unknown version"`` when trisurf
        cannot be started, prints nothing, or prints no recognisable hash.
    """
    try:
        # An argument list avoids spawning an intermediate shell; run() also
        # waits for the child and closes the pipe.
        completed = subprocess.run(
            ['trisurf', '--version'],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except OSError:
        # trisurf is not installed or is not executable.
        return "unknown version"

    lines = completed.stdout.splitlines()
    if not lines:
        # No output at all: indexing lines[0] would raise IndexError.
        return "unknown version"

    first_line = lines[0].decode('ascii', errors='replace')
    version = re.findall(r'[0-9a-f]{7}(?:-dirty)?', first_line)
    return version[0] if version else "unknown version"
57af9d 28
def get_hostname():
    """Return this machine's host name as reported by ``socket.gethostname``."""
    hostname = socket.gethostname()
    return hostname
31
def get_ip():
    """Return an IPv4 address of this machine as a string.

    Strategy, in order:
      1. Resolve the local host name and return the first address that does
         not start with ``127.``.
      2. Otherwise connect a UDP socket to 8.8.8.8:53 and return the local
         address bound to that socket.
      3. Otherwise return the literal string ``"no IP found"``.

    Lookup and socket errors are treated as "this step found nothing" rather
    than propagated, so the final fallback is actually reachable.
    """
    try:
        candidates = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Host name does not resolve; fall through to the UDP probe.
        candidates = []
    for ip in candidates:
        if not ip.startswith("127."):
            return ip

    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        return "no IP found"
34
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    """Register this client with the server and return the assigned id.

    Args:
        addr: Base URL of the server; ``/api/register/`` is appended to it.
        my_ip: IP address reported to the server.
        my_hostname: Host name reported to the server.
        subrun: Sub-run (worker) number reported to the server.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    # The module-level version string is only read here, so no ``global``
    # declaration is needed.
    client_auth = {
        'ip': my_ip,
        'hostname': my_hostname,
        'subrun': subrun,
        'trisurf_version': glob_ts_version,
    }
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "registration failed with HTTP status {}".format(response.status_code)
        )
    client_data = json.loads(response.text)
    return client_data['id']
45
46
def get_run(addr, cid, timeout=60):
    """Ask the server for a run to execute on behalf of client *cid*.

    Args:
        addr: Base URL of the server; ``/api/getrun/<cid>/`` is appended.
        cid: Client id obtained at registration.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Returns:
        A tuple ``(rid, tape, vtu, status)`` taken from the ``id``, ``tape``,
        ``lastVTU`` and ``status`` fields of the JSON response.

    Raises:
        ValueError: The server answered with status 400.
        NameError: The server answered with any other non-200 status.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (
            client_data['id'],
            client_data['tape'],
            client_data['lastVTU'],
            client_data['status'],
        )
    # Callers distinguish the two failure kinds by exception type, so the
    # types must stay exactly as they are.
    if response.status_code == 400:
        raise ValueError("getrun rejected with HTTP status 400")
    raise NameError(
        "getrun failed with HTTP status {}".format(response.status_code)
    )
57af9d 62
SP 63
def ping_run(addr, cid, rid, timeout=60):
    """Send a ping for run *rid* of client *cid* to the server.

    Args:
        addr: Base URL of the server; ``/api/ping/`` is appended.
        cid: Client id.
        rid: Run id.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "run ping failed with HTTP status {}".format(response.status_code)
        )
e08bff 71
def client_ping(addr, cid, timeout=60):
    """Ping the server as client *cid* and return its concurrent-run count.

    Args:
        addr: Base URL of the server; ``/api/pingclient/`` is appended.
        cid: Client id.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Returns:
        The ``concurrent_runs`` field of the JSON response.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "client ping failed with HTTP status {}".format(response.status_code)
        )
    reply = json.loads(response.text)
    return reply['concurrent_runs']
81
82
57af9d 83
def send_error_report(addr, cid, rid, errcode, timeout=60):
    """Report error code *errcode* for run *rid* of client *cid*.

    Args:
        addr: Base URL of the server; ``/api/reporterr/`` is appended.
        cid: Client id.
        rid: Run id.
        errcode: Error code sent as the ``error_code`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "error report failed with HTTP status {}".format(response.status_code)
        )
91
def upload(addr, cid, rid, vtu, status, timeout=60):
    """Upload VTU content and a status value for run *rid* to the server.

    Args:
        addr: Base URL of the server; ``/api/upload/`` is appended.
        cid: Client id.
        rid: Run id.
        vtu: VTU content, sent as the ``lastVTU`` form field.
        status: Value sent as the ``status`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the calling thread indefinitely.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "upload failed with HTTP status {}".format(response.status_code)
        )
99
def getNewVTU(directory):
    """Collect the names of timestep VTU files present in *directory*.

    Returns a set of bare file names (not full paths) that start with
    ``timestep_`` and end with ``.vtu``.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
106
107
def removeDir(directory):
    """Recursively delete *directory*, reporting (not raising) on failure.

    The process working directory is changed to ``/`` first, presumably so
    that it is not inside the tree being removed. Note that this change is
    process-wide and is not undone afterwards.

    Args:
        directory: Path of the directory tree to delete.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Best effort: only filesystem errors are swallowed, so programming
        # errors and interpreter-exit requests are no longer hidden.
        print("Cannot remove directory "+directory+ "\n")
115
e08bff 116                     
57af9d 117
e08bff 118
class ClientThread(Thread):
    """Worker thread that registers with the server and runs simulations.

    The thread registers as a client, repeatedly asks the server for a run,
    starts ``trisurf`` in a fresh directory under ``/tmp``, uploads every new
    ``timestep_*.vtu`` file and pings the server while the simulation runs.
    The thread with ``subid == 0`` additionally spawns and stops further
    ClientThread workers so that their number matches the ``concurrent_runs``
    value returned by the server.
    """

    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        """Store the configuration; no network traffic to the server happens here.

        Args:
            conn_address: Base URL of the server.
            subid: Worker number; 0 marks the worker that manages the others.
            update_seconds: Seconds between two checks of a running simulation.
        """
        super(ClientThread,self).__init__()
        self._stop_event = Event()  # a new Event starts out cleared
        self.p=None                 # Popen object of the running simulation
        self.workingdir=None        # Directory of the running simulation
        self.conn_address=conn_address
        self.id=subid
        self.ip=get_ip()
        self.hostname=get_hostname()
        self.update_seconds=update_seconds
        # Seconds of accumulated waiting after which the client itself (not
        # a particular run) is pinged again.
        self.max_client_ping_time_elapsed=250

        self.subruns=[]             # workers spawned by this thread

    def stop(self):
        """Ask the thread to finish; run() notices at its next check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread, then stop its workers and clean up.

        Args:
            timeout: Passed to ``Thread.join``. If the thread is still alive
                when the timeout expires, nothing is cleaned up, because
                run() may still be using the process and the directory.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        for sub in self.subruns:
            sub.stop()
            sub.join()
        if self.p is not None:
            self.p.terminate()
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())

    def sleep(self,s):
        """Sleep *s* seconds in one-second steps, aborting when stopped.

        Returns:
            False if a stop request was seen, True if the full time elapsed.
        """
        for i in range(0, s):
            if(self.isStopped()):
                return False
            sleep(1)
        return True

    def subrunsStartStop(self,nr):
        """Spawn or stop workers until *nr* threads exist, this one included.

        Only the thread with id 0 manages workers; for every other thread
        this method does nothing.
        """
        while(self.id==0 and nr>len(self.subruns)+1):
            #spawning a new worker:
            print("[{}] Spawning a new worker".format(self.id))
            t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        while(self.id==0 and nr<len(self.subruns)+1):
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]

    def _cleanup_run(self, terminate=True):
        """Forget the current simulation and delete its working directory.

        Args:
            terminate: When True, the simulation process is terminated first.
        """
        if terminate and self.p is not None:
            self.p.terminate()
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
        self.p=None
        self.workingdir=None

    def run(self):
        """Thread body: register, fetch runs, simulate, upload; until stopped."""
        while(not self.isStopped()): #try to register
            try:
                cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            client_ping_time_elapsed=0
            try:
                concurrent_runs=client_ping(self.conn_address,cid)
            except Exception:
                # An error here used to escape run() and end the thread.
                print("[{}] Could not ping server. Registering again.".format(self.id))
                self.sleep(10)
                continue
            self.subrunsStartStop(concurrent_runs)
            while(not self.isStopped()): #successfully registered, now start pinging and searching for job
                try:
                    (rid,tape,vtu,status)=get_run(self.conn_address,cid)
                except NameError:
                    # No run obtained: wait, and ping as a client now and then.
                    self.sleep(10)
                    client_ping_time_elapsed+=10
                    if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                        try:
                            concurrent_runs=client_ping(self.conn_address,cid)
                            self.subrunsStartStop(concurrent_runs)
                        except Exception:
                            break
                        client_ping_time_elapsed=0
                    #if you put break instead of continue, there is no need to ping client. And it is more robust...
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break
                else:
                    #start separate process with simulations.
                    self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
                    self.workingdir.makeifnotexist()
                    workdir=self.workingdir.fullpath()
                    with open(os.path.join(workdir,"tape"), 'w') as f:
                        f.write(tape)
                    if(int(status)==-1):
                        cmd=['trisurf', '--force-from-tape']
                        print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
                    else:
                        with open(os.path.join(workdir,".status"),'w') as f:
                            # status is also fed to int(), so it may not be a
                            # str; write() accepts only str.
                            f.write(str(status))
                        with open(os.path.join(workdir,"initial.vtu"),'w') as f:
                            f.write(vtu)
                        cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
                        print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
                    # The working directory is handed to the child directly.
                    # os.chdir() is process-wide, so with several worker
                    # threads it could start trisurf in another worker's
                    # directory.
                    self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workdir)
                    s=int(status)
                    while(not self.isStopped()):
                        #monitor for new file. If file is present, upload it!
                        newVTU=getNewVTU(workdir)
                        if newVTU: #upload
                            try:
                                for nvfile in sorted(newVTU):
                                    nv=os.path.join(workdir,nvfile)
                                    with open(nv,'r') as f:
                                        fc=f.read()
                                    s=s+1
                                    print('[{}] Uploading {}.'.format(self.id,nvfile))
                                    upload(self.conn_address, cid, rid, fc, s)
                                    os.unlink(nv)
                            except Exception as e:
                                print("[{}] Could not upload".format(self.id))
                                print(e)
                                self._cleanup_run()
                                break
                            else:
                                print("[{}] VTU uploaded.".format(self.id))
                        else: #ping
                            try:
                                ping_run(self.conn_address, cid, rid)
                            except Exception:
                                print("[{}] Could not ping.".format(self.id))
                                #stop simulations
                                self._cleanup_run()
                                break
                        #check if trisurf is still running. If not break the innermost loop.
                        sleep(1)
                        if(self.p.poll() is not None): # trisurf exited!
                            print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                            if(self.p.returncode>0):
                                try:
                                    send_error_report(self.conn_address, cid, rid, self.p.returncode)
                                except Exception:
                                    print("[{}] Server didn't accept error report".format(self.id))
                            self._cleanup_run(terminate=False)
                            break
                        self.sleep(self.update_seconds-1)
                        client_ping_time_elapsed+=self.update_seconds
                        if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                            try:
                                concurrent_runs=client_ping(self.conn_address,cid)
                                self.subrunsStartStop(concurrent_runs)
                            except Exception:
                                # A failed client ping must not end the thread
                                # while trisurf is running; the counter is
                                # left alone so the ping is retried next round.
                                print("[{}] Could not ping server. Simulation keeps running.".format(self.id))
                            else:
                                client_ping_time_elapsed=0
57af9d 281
SP 282
e08bff 283
SP 284 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir=maindir
        self.simdir=simdir

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join().
        '''
        return os.path.join(self.maindir,self.simdir)

    def exists(self):
        ''' Method checks whether the path given by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        ''' Method make() creates the directory, including missing parents. If that fails with an OS error it prints a message and exits the program with status 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit() is always available; the bare exit() helper is
            # installed by the site module and may be missing.
            sys.exit(1)

    def makeifnotexist(self):
        '''Method makeifnotexist() creates the directory if it does not exist. Returns True when it was created and False when it already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes the directory if it exists. It calls os.rmdir(), which removes only an empty directory, so this is NOT recursive. On an OS error it prints a message and exits the program with status 1.'''
        if self.exists():
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). A failure is only reported by a printed message. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
345
346
347
348 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    """Shut the client down when a handled signal arrives.

    Stops the module-level thread ``t``, waits for it to finish, prints the
    signal and exits the process with the signal as exit status.
    """
    worker = t
    worker.stop()
    worker.join()
    message = "Process ended with signal " + str(signal)
    print(message)
    sys.exit(signal)
354 #--- END SIGINT and SIGTERM----
355
if __name__ == '__main__':
    # Determine the installed trisurf version once, before any thread starts.
    glob_ts_version = getTrisurfVersion()

    # Install the same shutdown handler for both termination signals.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    t = ClientThread(update_seconds=100)
    t.start()

    # Idle forever in the main thread; the process ends through
    # signal_handler.
    while True:
        sleep(1000)