Samo Penic
2018-06-04 d357f502ab18a6c5574e7b0e573c19a01c7dfa32
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
57af9d 13
def get_hostname():
    """Return the host name of this machine as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
16
def get_ip():
    """Return an IPv4 address of this machine as a string.

    Resolution order:

    1. The first address the resolver lists for the local host name that
       does not start with ``127.``.
    2. The local address a UDP socket gets when it is connected towards
       ``8.8.8.8:53``.
    3. The literal string ``"no IP found"`` when both attempts fail.

    Returns:
        str: the detected address, or ``"no IP found"``.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Host name not resolvable (socket.gaierror is an OSError): fall
        # through to the socket-based probe instead of crashing.
        addresses = []

    for ip in addresses:
        if not ip.startswith("127."):
            return ip

    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        return "no IP found"
19
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    """Register this client with the server and return the id it assigns.

    Posts ``ip``, ``hostname`` and ``subrun`` as form data to
    ``<addr>/api/register/`` and reads the ``id`` field of the JSON reply.

    Args:
        addr: base URL of the server, without a trailing slash.
        my_ip: IP address reported for this client.
        my_hostname: host name reported for this client.
        subrun: sub-run number reported for this client.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Returns:
        The value of the ``id`` key in the server's JSON response.

    Raises:
        ValueError: if the server does not answer with HTTP 200 (a body
            that is not valid JSON also surfaces as a ``ValueError``
            subclass from ``json.loads``).
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "registration failed with HTTP status {}".format(response.status_code)
        )
    client_data = json.loads(response.text)
    return client_data['id']
29
30
def get_run(addr, cid, timeout=60):
    """Ask the server for a run to execute on behalf of client ``cid``.

    Issues a GET to ``<addr>/api/getrun/<cid>/``.

    Args:
        addr: base URL of the server, without a trailing slash.
        cid: client id obtained from registration.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Returns:
        tuple: ``(rid, tape, vtu, status)`` taken from the ``id``, ``tape``,
        ``lastVTU`` and ``status`` keys of the JSON response.

    Raises:
        ValueError: if the server answers with HTTP 400.
        NameError: if the server answers with any other non-200 status.
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (
            client_data['id'],
            client_data['tape'],
            client_data['lastVTU'],
            client_data['status'],
        )

    # Show the server's explanation before signalling the failure.
    print(response.text)
    # Callers distinguish the two exception types, so they must stay as is.
    if response.status_code == 400:
        raise ValueError("getrun rejected with HTTP status 400")
    raise NameError("getrun failed with HTTP status {}".format(response.status_code))
57af9d 46
SP 47
def ping_run(addr, cid, rid, timeout=60):
    """Send a ping for run ``rid`` of client ``cid`` to the server.

    Posts ``client_id`` and ``run_id`` as form data to ``<addr>/api/ping/``.

    Args:
        addr: base URL of the server, without a trailing slash.
        cid: client id.
        rid: run id.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "run ping failed with HTTP status {}".format(response.status_code)
        )
e08bff 55
def client_ping(addr, cid, timeout=60):
    """Ping the server for client ``cid`` and return its ``concurrent_runs`` value.

    Posts ``client_id`` as form data to ``<addr>/api/pingclient/``.

    Args:
        addr: base URL of the server, without a trailing slash.
        cid: client id.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Returns:
        The value of the ``concurrent_runs`` key in the JSON response.

    Raises:
        ValueError: if the server does not answer with HTTP 200 (a body
            that is not valid JSON also surfaces as a ``ValueError``
            subclass from ``json.loads``).
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "client ping failed with HTTP status {}".format(response.status_code)
        )
    reply = json.loads(response.text)
    return reply['concurrent_runs']
65
66
57af9d 67
def send_error_report(addr, cid, rid, errcode, timeout=60):
    """Report error code ``errcode`` for run ``rid`` of client ``cid``.

    Posts ``client_id``, ``run_id`` and ``error_code`` as form data to
    ``<addr>/api/reporterr/``.

    Args:
        addr: base URL of the server, without a trailing slash.
        cid: client id.
        rid: run id.
        errcode: error code to report.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "error report failed with HTTP status {}".format(response.status_code)
        )
75
def upload(addr, cid, rid, vtu, status, timeout=60):
    """Upload the latest VTU content and status of run ``rid`` to the server.

    Posts ``client_id``, ``run_id``, ``lastVTU`` and ``status`` as form data
    to ``<addr>/api/upload/``.

    Args:
        addr: base URL of the server, without a trailing slash.
        cid: client id.
        rid: run id.
        vtu: content sent as ``lastVTU``.
        status: value sent as ``status``.
        timeout: seconds passed to ``requests`` as the request timeout, so
            that an unresponsive server cannot block the caller forever.

    Raises:
        ValueError: if the server does not answer with HTTP 200.
        requests.exceptions.RequestException: on connection problems or
            timeouts; raised by ``requests`` and not translated here.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "upload failed with HTTP status {}".format(response.status_code)
        )
83
def getNewVTU(directory):
    """Return the set of timestep VTU file names found in ``directory``.

    Only direct entries of ``directory`` are considered; a name qualifies
    when it starts with ``timestep_`` and ends with ``.vtu``.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
90
91
def removeDir(directory):
    """Recursively delete ``directory``, reporting (not raising) failures.

    The process first changes its working directory to ``/`` so that it is
    not sitting inside the tree being deleted. Filesystem errors are printed
    and otherwise ignored: removal is best effort.

    Args:
        directory: path of the directory tree to delete.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Only filesystem failures are expected here; anything else (e.g.
        # KeyboardInterrupt) must not be swallowed.
        print("Cannot remove directory "+directory+ "\n")
99
e08bff 100                     
57af9d 101
e08bff 102
class ClientThread(Thread):
    """Worker thread that fetches simulation runs from the server and executes them.

    The thread registers with the server, asks for a run, starts ``trisurf``
    in a fresh directory under ``/tmp`` and then uploads every
    ``timestep_*.vtu`` file that appears there, pinging the server when
    there is nothing to upload. The instance created with ``subid=0``
    additionally starts and stops further ``ClientThread`` instances so that
    the total number of workers follows the ``concurrent_runs`` value
    returned by ``client_ping``.
    """

    def __init__(self, conn_address='http://beti.trisurf.eu', subid=0, update_seconds=100):
        """Set up the worker; the thread does not run until ``start()`` is called.

        Args:
            conn_address: base URL of the server.
            subid: worker number; ``0`` marks the worker that manages the others.
            update_seconds: seconds between two checks of a running
                simulation. Must be an integer because it is fed to ``range``
                in ``sleep()``.
        """
        super(ClientThread, self).__init__()
        self._stop_event = Event()
        self.p = None            # subprocess.Popen of the running trisurf, if any
        self.workingdir = None   # Directory of the running simulation, if any
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds
        # Accumulated seconds after which the client itself pings the server.
        self.max_client_ping_time_elapsed = 250
        self.subruns = []        # additional workers started by worker 0

    def stop(self):
        """Ask the thread to stop; it notices at its next stop check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread to end, then shut down sub-workers and clean up.

        Args:
            timeout: forwarded to ``Thread.join``. If the thread is still
                alive when the timeout expires, nothing is cleaned up.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            # Timed out: do not pull the simulation away from a running thread.
            return
        # Signal every sub-worker first so that they wind down in parallel.
        for sub in self.subruns:
            sub.stop()
        for sub in self.subruns:
            sub.join()
        if self.p is not None:
            self.p.terminate()
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())

    def sleep(self, s):
        """Sleep for ``s`` seconds in one-second steps, aborting on stop.

        Args:
            s: whole number of seconds to sleep.

        Returns:
            bool: False if a stop request was seen before a step, True if
            the full time elapsed.
        """
        for _ in range(0, s):
            if self.isStopped():
                return False
            sleep(1)
        return True

    def subrunsStartStop(self, nr):
        """Start or stop sub-workers until ``nr`` workers exist in total.

        Only the worker with id 0 manages sub-workers; for every other
        worker this is a no-op. The total counts this worker itself, hence
        the ``+ 1``.

        Args:
            nr: desired total number of workers.
        """
        if self.id != 0:
            return
        while nr > len(self.subruns) + 1:
            print("[{}] Spawning a new worker".format(self.id))
            t = ClientThread(conn_address=self.conn_address,
                             subid=len(self.subruns) + 1,
                             update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        while nr < len(self.subruns) + 1:
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]

    def run(self):
        """Thread body: register with the server, then serve runs until stopped."""
        while not self.isStopped():  # try to register
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            try:
                self._sync_subruns(cid)
            except Exception:
                # A failed ping must not kill the thread; register again later.
                print("[{}] Could not ping client.".format(self.id))
                self.sleep(10)
                continue
            self._serve(cid)

    def _sync_subruns(self, cid):
        """Ping the server as client ``cid`` and adjust the number of workers."""
        concurrent_runs = client_ping(self.conn_address, cid)
        self.subrunsStartStop(concurrent_runs)

    def _serve(self, cid):
        """Fetch and execute runs for ``cid``; return when re-registration is needed."""
        client_ping_time_elapsed = 0
        while not self.isStopped():
            try:
                (rid, tape, vtu, status) = get_run(self.conn_address, cid)
            except NameError:
                # The server answered, but did not hand out a run: wait and
                # ask again, pinging the client every now and then.
                print("[{}] Could not get RID.".format(self.id))
                self.sleep(10)
                client_ping_time_elapsed += 10
                if client_ping_time_elapsed >= self.max_client_ping_time_elapsed:
                    try:
                        self._sync_subruns(cid)
                    except Exception:
                        return
                    client_ping_time_elapsed = 0
                continue
            except ValueError:
                print("[{}] Wrong CID? Getting new CID.".format(self.id))
                return
            except Exception:
                print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                return
            client_ping_time_elapsed = self._run_simulation(
                cid, rid, tape, vtu, status, client_ping_time_elapsed)

    def _start_simulation(self, rid, tape, vtu, status):
        """Create the working directory, write the input files and launch trisurf."""
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        workpath = self.workingdir.fullpath()
        with open(os.path.join(workpath, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workpath, ".status"), 'w') as f:
                # str() keeps this working whether the server sends the
                # status as a number or as text.
                f.write(str(status))
            with open(os.path.join(workpath, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        # The working directory is handed to the child explicitly: changing
        # the process-wide current directory would race with other workers.
        self.p = subprocess.Popen(cmd, cwd=workpath, stdout=subprocess.DEVNULL)

    def _abort_simulation(self):
        """Terminate the running trisurf process and delete its working directory."""
        self.p.terminate()
        self.p = None
        removeDir(self.workingdir.fullpath())
        self.workingdir = None

    def _run_simulation(self, cid, rid, tape, vtu, status, client_ping_time_elapsed):
        """Run one simulation and monitor it until it ends, fails or a stop is requested.

        Returns:
            The updated ``client_ping_time_elapsed`` counter.
        """
        self._start_simulation(rid, tape, vtu, status)
        workpath = self.workingdir.fullpath()
        s = int(status)
        while not self.isStopped():
            # Monitor for new files. If some are present, upload them.
            newVTU = getNewVTU(workpath)
            if newVTU:
                try:
                    for nvfile in sorted(newVTU):
                        nv = os.path.join(workpath, nvfile)
                        with open(nv, 'r') as f:
                            fc = f.read()
                        s = s + 1
                        print('[{}] Uploading {}.'.format(self.id, nvfile))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(nv)
                except Exception as e:
                    print("[{}] Could not upload".format(self.id))
                    print(e)
                    self._abort_simulation()
                    break
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._abort_simulation()
                    break
            # Check whether trisurf is still running; if not, this run is over.
            sleep(1)
            if self.p.poll() is not None:
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                removeDir(workpath)
                self.workingdir = None
                self.p = None
                break
            self.sleep(self.update_seconds - 1)
            client_ping_time_elapsed += self.update_seconds
            if client_ping_time_elapsed > self.max_client_ping_time_elapsed - self.update_seconds / 2:
                try:
                    self._sync_subruns(cid)
                except Exception:
                    # Keep the simulation alive; the counter is left as is
                    # so the ping is retried on the next cycle.
                    print("[{}] Could not ping client.".format(self.id))
                else:
                    client_ping_time_elapsed = 0
        return client_ping_time_elapsed
57af9d 265
SP 266
e08bff 267
SP 268 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    """Handle the path where a simulation is run and its data is stored."""

    def __init__(self, maindir=".", simdir=""):
        """Store the two path components.

        Args:
            maindir: base directory; defaults to the current directory.
            simdir: sub-directory below ``maindir``; defaults to none.
        """
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        """Return ``maindir`` and ``simdir`` joined with ``os.path.join``."""
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        """Return True if the path given by ``fullpath()`` exists, else False."""
        return os.path.exists(self.fullpath())

    def make(self):
        """Create the directory, including missing parents.

        On failure an error message is printed and ``SystemExit(1)`` is raised.
        """
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to exist in every interpreter setup.
            sys.exit(1)

    def makeifnotexist(self):
        """Create the directory if it does not exist.

        Returns:
            bool: True if the directory was created, False if it already existed.
        """
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        """Remove the directory recursively. WARNING! No questions asked.

        Does nothing if the path does not exist. On failure an error message
        is printed and ``SystemExit(1)`` is raised.
        """
        if self.exists():
            try:
                # rmtree, not rmdir: the documented contract is a recursive
                # removal, and rmdir fails on any non-empty directory.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        """Change the current directory of the process to ``fullpath()``.

        WARNING: when using relative paths, do not call this method multiple
        times. A failure is printed and otherwise ignored.
        """
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
329
330
331
332 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal, frame):
    """Shut down the worker thread ``t`` and exit the process.

    Args:
        signal: number of the received signal; also used as the exit status.
        frame: current stack frame supplied by the interpreter (unused).
    """
    worker = t
    worker.stop()
    worker.join()
    message = "Process ended with signal " + str(signal)
    print(message)
    sys.exit(signal)
338 #--- END SIGINT and SIGTERM----
339
if __name__ == '__main__':

    # Route both termination signals to the same shutdown handler.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    t = ClientThread(update_seconds=100)
    t.start()
    # Keep the main thread alive; signal_handler stops the worker and exits.
    while True:
        sleep(1000)