Samo Penic
2018-06-25 c0d5bcca06dcb6b7e3b570b739a5685d59f4ab04
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
4c9734 13 import re
SP 14
def getTrisurfVersion():
    """Return the version tag printed by ``trisurf --version``.

    The first output line (stdout and stderr merged) is searched for a
    seven-digit hexadecimal hash, optionally followed by ``-dirty``.

    Returns:
        The first matching tag, or the string "unknown version" when the
        executable cannot be started, prints nothing, or prints no tag.
    """
    try:
        # A fixed argument list needs no shell; a missing binary shows up
        # as OSError here instead of as a shell error message.
        p = subprocess.Popen(['trisurf', '--version'],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        return "unknown version"
    output, _ = p.communicate()  # reads everything and reaps the child
    lines = output.splitlines()
    if not lines:
        return "unknown version"
    # 'replace' keeps unexpected non-ASCII bytes from raising UnicodeDecodeError.
    first_line = lines[0].decode('ascii', 'replace')
    version = re.findall(r'[0-9a-f]{7}(?:-dirty)?', first_line)
    if version:
        return version[0]
    return "unknown version"
57af9d 24
def get_hostname():
    """Return this machine's host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
27
def get_ip():
    """Return an IPv4 address of this machine that is not a loopback address.

    Strategy:
      1. Resolve the local host name and take the first address that does
         not start with "127.".
      2. Otherwise connect a UDP socket towards 8.8.8.8:53 and read back the
         local address the operating system bound it to.

    Returns:
        The address as a string, or "no IP found" when both strategies fail.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Host name does not resolve (socket.gaierror is an OSError).
        addresses = []
    for ip in addresses:
        if not ip.startswith("127."):
            return ip

    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
30
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=30):
    """Register this client with the server and return the id it assigns.

    Args:
        addr: Base URL of the server (no trailing slash).
        my_ip: IP address reported to the server.
        my_hostname: Host name reported to the server.
        subrun: Sub-worker number reported to the server.
        timeout: Seconds to wait for the server before requests gives up,
            so an unresponsive server cannot block the caller forever.

    Returns:
        The value of the 'id' field of the JSON response.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    client_auth = {
        'ip': my_ip,
        'hostname': my_hostname,
        'subrun': subrun,
        'trisurf_version': getTrisurfVersion(),
    }
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("registration failed with HTTP status {}".format(response.status_code))
    client_data = json.loads(response.text)
    return client_data['id']
40
41
def get_run(addr, cid, timeout=30):
    """Ask the server for a run to execute on behalf of client ``cid``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id obtained at registration.
        timeout: Seconds to wait for the server before requests gives up.

    Returns:
        Tuple ``(rid, tape, vtu, status)`` taken from the 'id', 'tape',
        'lastVTU' and 'status' fields of the JSON response.

    Raises:
        ValueError: if the server answers with HTTP 400.
        NameError: if the server answers with any other non-200 status.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (
            client_data['id'],
            client_data['tape'],
            client_data['lastVTU'],
            client_data['status'],
        )
    # Show the server's explanation before signalling the failure.
    print(response.text)
    if response.status_code == 400:
        raise ValueError("getrun rejected with HTTP status 400")
    raise NameError("getrun failed with HTTP status {}".format(response.status_code))
57af9d 57
SP 58
def ping_run(addr, cid, rid, timeout=30):
    """Post a ping for run ``rid`` of client ``cid`` to ``/api/ping/``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id obtained at registration.
        rid: Id of the run being executed.
        timeout: Seconds to wait for the server before requests gives up.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("run ping failed with HTTP status {}".format(response.status_code))
e08bff 66
def client_ping(addr, cid, timeout=30):
    """Post a ping for client ``cid`` to ``/api/pingclient/``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id obtained at registration.
        timeout: Seconds to wait for the server before requests gives up.

    Returns:
        The value of the 'concurrent_runs' field of the JSON response.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("client ping failed with HTTP status {}".format(response.status_code))
    reply = json.loads(response.text)
    return reply['concurrent_runs']
76
77
57af9d 78
def send_error_report(addr, cid, rid, errcode, timeout=30):
    """Post an error code for run ``rid`` to ``/api/reporterr/``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id obtained at registration.
        rid: Id of the run that failed.
        errcode: Error code to report.
        timeout: Seconds to wait for the server before requests gives up.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("error report failed with HTTP status {}".format(response.status_code))
86
def upload(addr, cid, rid, vtu, status, timeout=30):
    """Post a VTU snapshot of run ``rid`` to ``/api/upload/``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id obtained at registration.
        rid: Id of the run the snapshot belongs to.
        vtu: Snapshot content, sent as the 'lastVTU' field.
        status: Value sent as the 'status' field.
        timeout: Seconds to wait for the server before requests gives up.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        Connection problems and timeouts surface as the exceptions raised
        by ``requests`` itself.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload failed with HTTP status {}".format(response.status_code))
94
def getNewVTU(directory):
    """Return the set of file names in ``directory`` shaped like ``timestep_*.vtu``.

    Only names are returned (no directory prefix). A name qualifies when it
    starts with "timestep_" and ends with ".vtu".
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
101
102
def removeDir(directory):
    """Delete ``directory`` and everything below it, best effort.

    The process first changes its working directory to '/' so that it is
    not left inside the tree being deleted. A failure to delete is printed
    and otherwise ignored.

    Args:
        directory: Path of the directory to delete.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Only filesystem errors are expected here; SystemExit and
        # KeyboardInterrupt must not be swallowed.
        print("Cannot remove directory "+directory+ "\n")
    return
110
e08bff 111                     
57af9d 112
e08bff 113
class ClientThread(Thread):
    """Worker thread that registers with the server and runs trisurf jobs.

    Life cycle of ``run()``:
      1. register and obtain a client id,
      2. ping the server and adjust the number of sub-workers to the
         'concurrent_runs' value it returns (only the thread with id 0
         manages sub-workers),
      3. repeatedly ask for a run; for each run start trisurf in a fresh
         directory under /tmp, upload ``timestep_*.vtu`` files as they
         appear and ping the server in between.

    ``stop()`` requests termination; ``join()`` waits for the thread and then
    cleans up sub-workers, the trisurf process and the working directory.
    """

    def __init__(self, conn_address='http://beti.trisurf.eu', subid=0, update_seconds=100):
        """
        Args:
            conn_address: Base URL of the server.
            subid: Number of this worker; 0 is the worker that spawns others.
            update_seconds: Integer number of seconds between two checks of a
                running simulation.
        """
        super(ClientThread, self).__init__()
        self._stop_event = Event()  # a new Event starts cleared
        self.p = None               # Popen handle of the running trisurf, if any
        self.workingdir = None      # Directory of the current run, if any
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds
        self.max_client_ping_time_elapsed = 250
        # Seconds accumulated since the last client ping; touched only by run().
        self._client_ping_time_elapsed = 0
        self.subruns = []

    def stop(self):
        """Ask the thread to finish; takes effect at its next stop check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread, then stop sub-workers and clean up the run.

        Args:
            timeout: Passed to ``Thread.join``. If the thread is still alive
                when the timeout expires, nothing is cleaned up because
                ``run()`` still owns the process and the directory.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        # Signal every sub-worker first so they wind down in parallel.
        for sub in self.subruns:
            sub.stop()
        for sub in self.subruns:
            sub.join()
        self._cleanup_run()

    def sleep(self, s):
        """Sleep up to ``s`` whole seconds, waking early when stopped.

        Returns:
            False if a stop request was seen, True if the full time elapsed.
        """
        for _ in range(0, s):
            # Event.wait returns True as soon as the stop flag is set.
            if self._stop_event.wait(1):
                return False
        return True

    def subrunsStartStop(self, nr):
        """Make the total number of workers (this one included) equal ``nr``.

        Only the worker with id 0 spawns or stops sub-workers; for any other
        worker the call does nothing.
        """
        if self.id != 0:
            return
        while nr > len(self.subruns) + 1:
            print("[{}] Spawning a new worker".format(self.id))
            t = ClientThread(conn_address=self.conn_address,
                             subid=len(self.subruns) + 1,
                             update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        # Guard on a non-empty list: nr < 1 must not index an empty list.
        while self.subruns and nr < len(self.subruns) + 1:
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]

    def run(self):
        """Register with the server and serve runs until stopped."""
        while not self.isStopped():  # try to register
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            self._client_ping_time_elapsed = 0
            try:
                self._sync_workers(cid)
            except Exception:
                # A failed ping must not kill the thread; register again.
                print("[{}] Could not ping server. Getting new CID.".format(self.id))
                self.sleep(10)
                continue
            self._serve(cid)

    def _sync_workers(self, cid):
        """Ping the server as client ``cid`` and apply the worker count it returns."""
        concurrent_runs = client_ping(self.conn_address, cid)
        self.subrunsStartStop(concurrent_runs)

    def _serve(self, cid):
        """Fetch and execute runs for ``cid``; return when re-registration is needed."""
        while not self.isStopped():
            try:
                (rid, tape, vtu, status) = get_run(self.conn_address, cid)
            except NameError:
                print("[{}] Could not get RID.".format(self.id))
                self.sleep(10)
                self._client_ping_time_elapsed += 10
                if self._client_ping_time_elapsed >= self.max_client_ping_time_elapsed:
                    try:
                        self._sync_workers(cid)
                    except Exception:
                        return
                    self._client_ping_time_elapsed = 0
                continue
            except ValueError:
                print("[{}] Wrong CID? Getting new CID.".format(self.id))
                return
            except Exception:
                print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                return
            self._start_trisurf(rid, tape, vtu, status)
            self._monitor_run(cid, rid, status)

    def _start_trisurf(self, rid, tape, vtu, status):
        """Create a fresh working directory, write the input files and launch trisurf."""
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        workdir = self.workingdir.fullpath()
        with open(os.path.join(workdir, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workdir, ".status"), 'w') as f:
                # str() so a numeric status from the server can be written too
                f.write(str(status))
            with open(os.path.join(workdir, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        # The working directory is handed to the child explicitly: the
        # process-wide cwd is shared by all worker threads and changed by
        # removeDir(), so relying on os.chdir() here would be a race.
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workdir)

    def _monitor_run(self, cid, rid, status):
        """Upload new VTU files and ping the server until the run ends or fails."""
        s = int(status)
        while not self.isStopped():
            # monitor for new file. If file is present, upload it!
            newVTU = getNewVTU(self.workingdir.fullpath())
            if newVTU:
                try:
                    for nvfile in sorted(newVTU):
                        nv = os.path.join(self.workingdir.fullpath(), nvfile)
                        with open(nv, 'r') as f:
                            fc = f.read()
                        s = s + 1
                        print('[{}] Uploading {}.'.format(self.id, nvfile))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(nv)
                except Exception as e:
                    print("[{}] Could not upload".format(self.id))
                    print(e)
                    self._cleanup_run()
                    return
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup_run()
                    return
            # check if trisurf is still running
            sleep(1)
            if self.p.poll() is not None:  # trisurf exited!
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_run()
                return
            self.sleep(self.update_seconds - 1)
            self._client_ping_time_elapsed += self.update_seconds
            if self._client_ping_time_elapsed > self.max_client_ping_time_elapsed - self.update_seconds / 2:
                try:
                    self._sync_workers(cid)
                except Exception:
                    # Same handling as a failed run ping: stop the simulation
                    # rather than let the thread die with trisurf orphaned.
                    print("[{}] Could not ping client.".format(self.id))
                    self._cleanup_run()
                    return
                self._client_ping_time_elapsed = 0

    def _cleanup_run(self):
        """Terminate and reap trisurf (if any) and delete the working directory."""
        if self.p is not None:
            self.p.terminate()
            try:
                # Reap the child so it is gone before its directory is removed.
                self.p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.p.kill()
                self.p.wait()
            self.p = None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir = None
57af9d 276
SP 277
e08bff 278
SP 279 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join (maindir/simdir on Unix).
        '''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Method checks whether the path specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Method make() creates the directory, including missing parents. If it fails it prints an error message and exits the program with status 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            sys.exit(1)

    def makeifnotexist(self):
        '''Method makeifnotexist() creates the directory if it does not exist. It returns True when the directory was created and False when it already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes the directory recursively. WARNING! No questions asked. If removal fails it prints an error message and exits the program with status 1.'''
        if self.exists():
            try:
                # rmtree, not os.rmdir: the directory normally still holds
                # files, and os.rmdir only removes empty directories.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). A failure is printed and otherwise ignored. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
340
341
342
343 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal, frame):
    """Shut the client down when a handled signal arrives.

    Stops and joins the module-level thread ``t``, prints which signal ended
    the process and exits with the signal number as exit status.

    Args:
        signal: Number of the received signal.
        frame: Current stack frame (unused).
    """
    t.stop()
    t.join()
    message = "Process ended with signal {!s}".format(signal)
    print(message)
    sys.exit(signal)
349 #--- END SIGINT and SIGTERM----
350
if __name__ == '__main__':
    # Route both termination signals to the same shutdown handler.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    # Must be the module-level name `t`: signal_handler refers to it.
    t = ClientThread(update_seconds=100)
    t.start()

    # Keep the main thread alive; shutdown happens in signal_handler.
    while True:
        sleep(1000)