Samo Penic
2018-12-23 1bc9bba5a3bb201345fcf6b690b1c884f8ab8a7d
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
4c9734 13 import re
SP 14
8058ab 15
SP 16
# Placeholder trisurf version; when run as a script, __main__ replaces it with getTrisurfVersion().
glob_ts_version = '00000'


def getTrisurfVersion():
    '''
    Return the version tag printed by ``trisurf --version``.

    The first line of the command's combined stdout/stderr is searched for a
    7-hex-digit revision, optionally followed by ``-dirty``. The first match is
    returned. "unknown version" is returned when trisurf cannot be started,
    prints nothing, or prints no such tag.
    '''
    try:
        proc = subprocess.Popen(['trisurf', '--version'],
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        # trisurf is not installed or not executable.
        return "unknown version"
    # communicate() reads all output, waits for the process and closes the pipe.
    output, _ = proc.communicate()
    # 'replace' keeps a localized/non-ASCII message from raising UnicodeDecodeError;
    # split() always yields at least one element, so empty output is safe.
    first_line = output.decode('ascii', errors='replace').split('\n', 1)[0]
    version = re.findall(r'[0-9a-f]{7}(?:-dirty)?', first_line)
    return version[0] if version else "unknown version"
57af9d 28
def get_hostname():
    '''Return the name of this machine as reported by ``socket.gethostname()``.'''
    hostname = socket.gethostname()
    return hostname
31
def get_ip():
    '''
    Return a best-guess non-loopback IPv4 address of this machine as a string.

    Lookup order:
    1. The addresses the local hostname resolves to; the first one outside
       127.* is returned.
    2. Otherwise, a UDP socket is connected to 8.8.8.8:53 and the local address
       chosen by the OS for that route is returned (connect() on a UDP socket
       transmits no data).
    3. If both lookups fail, the string "no IP found" is returned instead of
       raising.
    '''
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # gaierror/herror (both OSError subclasses): hostname not resolvable.
        addresses = []
    for address in addresses:
        if not address.startswith("127."):
            return address
    try:
        # The with-block closes the socket even if connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        # No usable route (e.g. no network at all).
        return "no IP found"
34
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    '''
    Register this client (or one of its worker sub-runs) with the server.

    POSTs the IP, hostname, sub-run index and the module-level glob_ts_version
    to ``<addr>/api/register/`` and returns the ``id`` field of the JSON reply.

    :param timeout: seconds handed to requests as connect/read timeout, so an
        unresponsive server cannot block the calling thread forever.
    :raises ValueError: if the server answers with a status other than 200.
    Network errors from requests and malformed replies propagate unchanged.
    '''
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun,
                   'trisurf_version': glob_ts_version}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("registration failed with HTTP status {}".format(response.status_code))
    client_data = json.loads(response.text)
    return client_data['id']
45
46
def get_run(addr, cid, timeout=60):
    '''
    Ask the server for a run for client cid.

    Calls ``GET <addr>/api/getrun/<cid>/`` and returns the tuple
    (run id, tape, lastVTU, status) taken from the JSON reply.

    :param timeout: seconds handed to requests as connect/read timeout.
    :raises ValueError: on HTTP 400. ClientThread.run treats this as an
        invalid client id and re-registers.
    :raises NameError: on any other non-200 status. NameError is used here as
        a signal rather than in its usual meaning, because ClientThread.run
        catches it explicitly as "no run available, retry later".
    '''
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        run = json.loads(response.text)
        return (run['id'], run['tape'], run['lastVTU'], run['status'])
    if response.status_code == 400:
        raise ValueError("server rejected client id {}".format(cid))
    raise NameError("no run available (HTTP status {})".format(response.status_code))
57af9d 62
SP 63
def ping_run(addr, cid, rid, timeout=60):
    '''
    Tell the server that client cid is still working on run rid.

    POSTs to ``<addr>/api/ping/``. ClientThread reports a failure here as
    losing the lease on the run.

    :param timeout: seconds handed to requests as connect/read timeout.
    :raises ValueError: if the server answers with a status other than 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("run ping failed with HTTP status {}".format(response.status_code))
e08bff 71
def client_ping(addr, cid, timeout=60):
    '''
    Report client cid as alive and return the server's ``concurrent_runs`` value.

    POSTs to ``<addr>/api/pingclient/``. ClientThread uses the returned value
    to decide how many worker threads to keep.

    :param timeout: seconds handed to requests as connect/read timeout.
    :raises ValueError: if the server answers with a status other than 200.
    '''
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("client ping failed with HTTP status {}".format(response.status_code))
    reply = json.loads(response.text)
    return reply['concurrent_runs']
81
82
57af9d 83
def send_error_report(addr, cid, rid, errcode, timeout=60):
    '''
    Report to the server that run rid on client cid failed with errcode.

    POSTs to ``<addr>/api/reporterr/``. ClientThread passes the trisurf
    process return code as errcode.

    :param timeout: seconds handed to requests as connect/read timeout.
    :raises ValueError: if the server answers with a status other than 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("error report rejected with HTTP status {}".format(response.status_code))
91
def upload(addr, cid, rid, vtu, status, timeout=60):
    '''
    Send one VTU snapshot of run rid to the server.

    vtu is the file content (ClientThread reads it as text). status is sent as
    the run's ``status`` field; ClientThread passes a counter that it increments
    once per uploaded file. Data is POSTed to ``<addr>/api/upload/``.

    :param timeout: seconds handed to requests as connect/read timeout (it
        bounds each socket wait, not the whole transfer).
    :raises ValueError: if the server answers with a status other than 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload failed with HTTP status {}".format(response.status_code))
99
def getNewVTU(directory):
    '''
    Return the set of trisurf snapshot file names present in directory.

    Only entries whose names start with "timestep_" and end with ".vtu" are
    included. Names are returned bare, not joined with directory.
    '''
    return {name for name in os.listdir(directory)
            if name.startswith("timestep_") and name.endswith(".vtu")}
106
107
def removeDir(directory):
    '''
    Recursively delete directory, reporting rather than raising failures.

    The process working directory is first moved to "/", so the process is not
    left with a cwd inside the deleted tree. Filesystem errors (missing path,
    permissions, ...) are printed and swallowed. Unlike a bare ``except:``,
    this no longer hides KeyboardInterrupt or SystemExit.
    '''
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        print("Cannot remove directory "+directory+ "\n")
115
e08bff 116                     
57af9d 117
e08bff 118
class ClientThread(Thread):
    '''
    Worker thread that registers with the trisurf server, fetches runs and executes them.

    The thread created with subid 0 is the primary client. Every time the
    server reports ``concurrent_runs``, it starts or stops extra ClientThread
    workers (subid 1, 2, ...) so that the total number of workers matches that
    value. Each worker:
    - runs one ``trisurf`` process at a time in a fresh ``/tmp/ts_<uuid>``
      directory,
    - uploads every new ``timestep_*.vtu`` file the process writes,
    - otherwise pings the server for the run about every update_seconds.
    '''

    def __init__(self, conn_address='http://beti.trisurf.eu', subid=0, update_seconds=100):
        super(ClientThread, self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p = None  # Popen of the trisurf process currently running, if any
        self.workingdir = None  # Directory of the current run, if any
        self.conn_address = conn_address
        self.id = subid  # 0 = primary client, >0 = worker spawned by the primary
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds
        # Roughly how many seconds may accumulate between two client pings.
        self.max_client_ping_time_elapsed = 250
        self.subruns = []  # extra workers; only ever filled on the primary (id 0)

    def stop(self):
        '''Ask the thread to finish; run() and sleep() poll this flag.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self):
        '''
        Wait for run() to return, then shut down everything this thread owns.

        Spawned workers are stopped and joined. The trisurf process that may
        still be running is terminated, and its run directory is removed.
        '''
        print('joining threads')
        super(ClientThread, self).join()
        for sub in self.subruns:
            sub.stop()
            sub.join()
        self._stop_simulation()

    def sleep(self, s):
        '''
        Sleep for s seconds in 1-second steps.

        Returns False as soon as stop() has been called, True after the full interval.
        '''
        for _ in range(0, s):
            if self.isStopped():
                return False
            sleep(1)  # module-level time.sleep, not this method
        return True

    def _stop_simulation(self):
        '''Terminate the current trisurf process (if still running), delete its run directory and forget both.'''
        if self.p is not None and self.p.poll() is None:
            self.p.terminate()
            try:
                # Reap the process so it neither lingers as a zombie nor keeps
                # writing into the directory that is deleted next.
                self.p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.p.kill()
                self.p.wait()
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
        self.p = None
        self.workingdir = None

    def subrunsStartStop(self, nr):
        '''
        Primary thread only (id 0): resize self.subruns so that, counting this thread, there are nr workers.

        New workers inherit conn_address and update_seconds. Surplus workers
        are taken from the end of the list, stopped and joined; joining also
        kills their trisurf process and removes their run directory.
        '''
        while self.id == 0 and nr > len(self.subruns) + 1:
            print("[{}] Spawning a new worker".format(self.id))
            t = ClientThread(conn_address=self.conn_address, subid=len(self.subruns) + 1,
                             update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        while self.id == 0 and nr < len(self.subruns) + 1:
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]

    def run(self):
        '''Main loop: (re)register, look for runs, then execute and monitor them until stop() is called.'''
        while not self.isStopped():  # try to register
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            print("[{}] Connected and got client ID {}.".format(self.id, cid))
            try:
                concurrent_runs = client_ping(self.conn_address, cid)
                client_ping_time_elapsed = 0
            except Exception:
                self.sleep(10)
                continue
            self.subrunsStartStop(concurrent_runs)
            while not self.isStopped():  # registered: ping and search for a job
                try:
                    (rid, tape, vtu, status) = get_run(self.conn_address, cid)
                except NameError:
                    # get_run signals "no run available" with NameError: wait, and
                    # client-ping once enough time has accumulated.
                    self.sleep(10)
                    client_ping_time_elapsed += 10
                    if client_ping_time_elapsed >= self.max_client_ping_time_elapsed:
                        try:
                            concurrent_runs = client_ping(self.conn_address, cid)
                            self.subrunsStartStop(concurrent_runs)
                        except Exception:
                            break  # re-register
                        client_ping_time_elapsed = 0
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break

                # Got a run: prepare a private directory and start trisurf in it.
                self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
                self.workingdir.makeifnotexist()
                rundir = self.workingdir.fullpath()
                with open(rundir + "/tape", 'w') as f:
                    f.write(tape)
                if int(status) == -1:
                    cmd = ['trisurf', '--force-from-tape']
                    print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
                else:
                    with open(rundir + "/.status", 'w') as f:
                        # str(): the JSON value may arrive as a number, and write() needs text.
                        f.write(str(status))
                    with open(rundir + "/initial.vtu", 'w') as f:
                        f.write(vtu)
                    cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
                    print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
                # cwd= instead of os.chdir(): the working directory is process-wide and
                # is changed concurrently by other ClientThreads (and by removeDir).
                self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=rundir)
                s = int(status)  # counter sent along with each uploaded VTU

                while not self.isStopped():
                    newVTU = getNewVTU(rundir)
                    if newVTU:  # upload new snapshots in name order
                        # NOTE(review): a snapshot trisurf is still writing could be picked
                        # up here; confirm trisurf creates these files atomically.
                        try:
                            for nvfile in sorted(newVTU):
                                nv = os.path.join(rundir, nvfile)
                                with open(nv, 'r') as f:
                                    fc = f.read()
                                s = s + 1
                                print('[{}] Uploading {}.'.format(self.id, nvfile))
                                upload(self.conn_address, cid, rid, fc, s)
                                os.unlink(nv)
                        except Exception as e:
                            print("[{}] Could not upload".format(self.id))
                            print(e)
                            self._stop_simulation()
                            break
                        else:
                            print("[{}] VTU uploaded.".format(self.id))
                    else:  # nothing new: prolong the lease on the run
                        try:
                            ping_run(self.conn_address, cid, rid)
                        except Exception:
                            print("[{}] Could not prolong a lease on the run.".format(self.id))
                            self._stop_simulation()
                            break
                    # If trisurf has exited, report a failure and go look for the next run.
                    sleep(1)
                    if self.p.poll() is not None:
                        print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                        if self.p.returncode > 0:
                            try:
                                send_error_report(self.conn_address, cid, rid, self.p.returncode)
                            except Exception:
                                print("[{}] Server didn't accept error report".format(self.id))
                        self._stop_simulation()
                        break
                    self.sleep(self.update_seconds - 1)
                    client_ping_time_elapsed += self.update_seconds
                    if client_ping_time_elapsed > self.max_client_ping_time_elapsed - self.update_seconds / 2:
                        try:
                            concurrent_runs = client_ping(self.conn_address, cid)
                        except Exception:
                            # Previously referenced the nonexistent self.sid and crashed the thread.
                            print("[{}] Could not client ping.".format(self.id))
                            self._stop_simulation()
                            break
                        self.subrunsStartStop(concurrent_runs)
                        client_ping_time_elapsed = 0
57af9d 293
SP 294
e08bff 295
# Copied from the trisurf library so that this client does not depend on that library being installed.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.

    The managed path is maindir joined with simdir (os.path.join).
    make() and remove() end with exit status 1 (SystemExit) when the
    filesystem operation fails. Raised in a non-main thread, SystemExit
    ends only that thread.
    '''

    def __init__(self, maindir=".", simdir=""):
        '''Store maindir and simdir; both default to the current directory / nothing.'''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''Return maindir and simdir combined with os.path.join (maindir/simdir on Unix).'''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Return True if fullpath() exists, False otherwise.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Create fullpath() including missing parents; print an error and exit(1) on failure.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            sys.exit(1)

    def makeifnotexist(self):
        '''Create the directory if it does not exist; return True if it was created, False otherwise.'''
        if not self.exists():
            self.make()
            return True
        return False

    def remove(self):
        '''
        Remove the directory recursively, contents included. WARNING! No questions asked.

        Does nothing if the path does not exist; prints an error and exit(1) on failure.
        '''
        if self.exists():
            try:
                # rmtree, not os.rmdir: rmdir only removes empty directories,
                # contradicting the documented recursive behaviour.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Change the process working directory to fullpath(); print an error on failure.

        WARNING: with relative paths, do not call this more than once. The
        working directory is process-wide, so this affects all threads.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
357
358
359
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal, frame):
    '''
    Handle SIGINT/SIGTERM: stop and join the client thread, then exit.

    The process exits with the signal number as its status. t is the
    module-level ClientThread created under __main__. A signal that arrives
    before t exists (e.g. while ClientThread() is still being constructed) now
    exits cleanly instead of failing with NameError.
    '''
    # The parameter name shadows the signal module inside this function.
    client = globals().get('t')
    if client is not None:
        client.stop()
        client.join()
    print("Process ended with signal " +str(signal))
    sys.exit(signal)
#--- END SIGINT and SIGTERM----
367
if __name__ == '__main__':
    # Record the installed trisurf version; get_client_id() sends it when registering.
    glob_ts_version = getTrisurfVersion()
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, signal_handler)

    # Primary client thread; signal_handler stops and joins it through this global name.
    t = ClientThread(update_seconds=100)
    t.start()
    # Keep the main thread alive, and able to receive signals, while the client works.
    while True:
        sleep(1000)