Samo Penic
2018-06-02 59a59b50a078d7b74a5ba42446d2b5c1a1bf7514
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
57af9d 13
def get_hostname():
    """Return the host name of this machine as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
16
def get_ip():
    """Return an IPv4 address of this machine as a string.

    Two strategies are tried in order:

    1. Resolve our own host name and take the first address that is not in
       the 127.x.x.x loopback range.
    2. Connect a UDP socket towards 8.8.8.8:53 and read back the local
       address the socket was bound to for that destination.

    Returns:
        The address found, or the literal string ``"no IP found"`` when both
        strategies fail (unresolvable host name, no usable route, ...).
    """
    # Strategy 1: addresses the resolver associates with our own host name.
    try:
        candidates = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # gaierror/herror are OSError subclasses; fall through to strategy 2.
        candidates = []
    for ip in candidates:
        if not ip.startswith("127."):
            return ip

    # Strategy 2: let the OS choose the local address for an outbound route.
    # The context manager closes the socket even when connect() fails.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
19
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=30):
    """Register this client with the server and return the id it assigns.

    Args:
        addr: Base URL of the server; ``/api/register/`` is appended to it.
        my_ip: IP address sent as the ``ip`` form field.
        my_hostname: Host name sent as the ``hostname`` form field.
        subrun: Value sent as the ``subrun`` form field.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Returns:
        The ``id`` entry of the JSON document returned by the server.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "registration failed with HTTP status {}".format(response.status_code))
    return json.loads(response.text)['id']
29
30
def get_run(addr, cid, timeout=30):
    """Ask the server for a run to execute on behalf of client ``cid``.

    Args:
        addr: Base URL of the server; ``/api/getrun/<cid>/`` is appended.
        cid: Client id obtained at registration.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Returns:
        Tuple ``(rid, tape, vtu, status)`` taken from the ``id``, ``tape``,
        ``lastVTU`` and ``status`` entries of the JSON reply.

    Raises:
        ValueError: If the server answers with status 400.
        NameError: If the server answers with any other non-200 status.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        run = json.loads(response.text)
        return (run['id'], run['tape'], run['lastVTU'], run['status'])

    # Show the server's explanation before signalling the failure.
    print(response.text)
    if response.status_code == 400:
        raise ValueError("server rejected client id {}".format(cid))
    raise NameError(
        "no run available (HTTP status {})".format(response.status_code))
57af9d 46
SP 47
def ping_run(addr, cid, rid, timeout=30):
    """Post a ping for run ``rid`` of client ``cid`` to ``/api/ping/``.

    Args:
        addr: Base URL of the server.
        cid: Client id, sent as the ``client_id`` form field.
        rid: Run id, sent as the ``run_id`` form field.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "run ping failed with HTTP status {}".format(response.status_code))
e08bff 55
def client_ping(addr, cid, timeout=30):
    """Post a ping for client ``cid`` to ``/api/pingclient/``.

    Only the HTTP status is checked. The response body is not inspected, so
    a 200 reply with an empty or non-JSON body still counts as success.

    Args:
        addr: Base URL of the server.
        cid: Client id, sent as the ``client_id`` form field.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "client ping failed with HTTP status {}".format(response.status_code))
65
66
57af9d 67
def send_error_report(addr, cid, rid, errcode, timeout=30):
    """Report error code ``errcode`` for run ``rid`` to ``/api/reporterr/``.

    Args:
        addr: Base URL of the server.
        cid: Client id, sent as the ``client_id`` form field.
        rid: Run id, sent as the ``run_id`` form field.
        errcode: Value sent as the ``error_code`` form field.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "error report failed with HTTP status {}".format(response.status_code))
75
def upload(addr, cid, rid, vtu, status, timeout=30):
    """Upload a VTU snapshot of run ``rid`` to ``/api/upload/``.

    Args:
        addr: Base URL of the server.
        cid: Client id, sent as the ``client_id`` form field.
        rid: Run id, sent as the ``run_id`` form field.
        vtu: Snapshot contents, sent as the ``lastVTU`` form field.
        status: Value sent as the ``status`` form field.
        timeout: Seconds to wait for the server before requests gives up.
            Without a timeout a silent server would block the caller forever.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "upload failed with HTTP status {}".format(response.status_code))
83
def getNewVTU(directory):
    """Return the set of entry names in ``directory`` that look like snapshots.

    An entry qualifies when its name starts with ``timestep_`` and ends with
    ``.vtu``. Names are returned without the directory part.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
90
91
def removeDir(directory):
    """Recursively delete ``directory``, reporting (not raising) on failure.

    The process first changes its working directory to ``/`` so that it is
    not sitting inside the tree that is about to be deleted. Note that this
    affects the whole process, not just the calling thread.

    Args:
        directory: Path of the directory tree to delete.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Best effort: a missing or undeletable directory must not stop the client.
        print("Cannot remove directory "+directory+ "\n")
    return
99
e08bff 100                     
57af9d 101
e08bff 102
class ClientThread(Thread):
    """Worker thread that fetches runs from the server and executes trisurf.

    The thread registers with the server to obtain a client id, repeatedly
    asks for a run, writes the run's input files into a fresh directory under
    ``/tmp``, starts ``trisurf`` there and then alternates between uploading
    newly written ``timestep_*.vtu`` files and pinging the server. It keeps
    going until :meth:`stop` is called.
    """

    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        """Set up the worker.

        Args:
            conn_address: Base URL of the server.
            subid: Identifier of this worker; sent as ``subrun`` at
                registration and used as the prefix of log messages.
            update_seconds: Seconds between two upload/ping rounds while a
                simulation is running.
        """
        super(ClientThread,self).__init__()
        self._stop_event = Event()  # a new Event starts out cleared
        self.p=None  # subprocess.Popen of the running trisurf, or None
        self.workingdir=None  # Directory of the current run, or None
        self.conn_address=conn_address
        self.id=subid
        self.ip=get_ip()
        self.hostname=get_hostname()
        self.update_seconds=update_seconds
        # Accumulated seconds after which the client itself is pinged.
        self.max_client_ping_time_elapsed=250

    def stop(self):
        """Ask the thread to finish; it notices at its next wait or loop check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once :meth:`stop` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread to end, then stop trisurf and delete its files.

        Args:
            timeout: Optional number of seconds to wait, as for
                ``Thread.join``. If the thread is still running afterwards
                nothing is cleaned up, because ``run`` still uses the
                process and the directory.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        self._cleanup_run()

    def sleep(self,s):
        """Wait up to ``s`` seconds, waking early if a stop is requested.

        Returns:
            False if a stop has been requested, True otherwise.
        """
        # Event.wait() returns as soon as stop() sets the event, so there is
        # no need to poll once per second.
        return not self._stop_event.wait(max(0, s))

    def _cleanup_run(self):
        """Terminate trisurf (if any), delete the run directory and reset state."""
        proc, self.p = self.p, None
        workdir, self.workingdir = self.workingdir, None
        if proc is not None:
            proc.terminate()
            try:
                # Reap the child so it does not linger as a zombie.
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        if workdir is not None:
            removeDir(workdir.fullpath())

    def run(self):
        """Thread body: register, serve runs, and re-register when needed."""
        try:
            while(not self.isStopped()): #try to register
                try:
                    cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
                except Exception:
                    print("[{}] Could not get CID.".format(self.id))
                    self.sleep(10)
                    continue
                self._serve_client(cid)
        finally:
            # Also reached on unexpected errors, so that neither trisurf nor
            # the run directory is left behind.
            self._cleanup_run()

    def _serve_client(self, cid):
        """Fetch and execute runs for ``cid`` until a new registration is needed."""
        client_ping_time_elapsed=0  # seconds accumulated since the last client_ping
        while(not self.isStopped()):
            try:
                (rid,tape,vtu,status)=get_run(self.conn_address,cid)
            except NameError:
                # No run handed out: wait, and ping from time to time.
                print("[{}] Could not get RID.".format(self.id))
                self.sleep(10)
                client_ping_time_elapsed+=10
                if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                    try:
                        client_ping(self.conn_address,cid)
                    except Exception:
                        return
                    client_ping_time_elapsed=0
                continue
            except ValueError:
                print("[{}] Wrong CID? Getting new CID.".format(self.id))
                return
            except Exception:
                print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                return
            client_ping_time_elapsed=self._run_simulation(
                cid, rid, tape, vtu, status, client_ping_time_elapsed)

    def _run_simulation(self, cid, rid, tape, vtu, status, client_ping_time_elapsed):
        """Run trisurf for one run and return the updated ping counter.

        The run ends when trisurf exits, when an upload or ping fails, or
        when a stop is requested. In the last case the process and directory
        are left in place for ``run``/``join`` to clean up.
        """
        self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        # Work with explicit paths and Popen(cwd=...) instead of changing the
        # process-wide current directory, which is shared by all threads.
        workdir=self.workingdir.fullpath()
        with open(os.path.join(workdir, "tape"), 'w') as f:
            f.write(tape)
        s=int(status)  # number of the last uploaded timestep
        if(s==-1):
            cmd=['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workdir, ".status"),'w') as f:
                # status is converted with int() above, so it may arrive as a
                # number; write() only accepts text.
                f.write(str(status))
            with open(os.path.join(workdir, "initial.vtu"),'w') as f:
                f.write(vtu)
            cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
        self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workdir)

        while(not self.isStopped()):
            #monitor for new file. If file is present, upload it!
            newVTU=getNewVTU(workdir)
            if newVTU: #upload
                try:
                    for nv in sorted(newVTU):
                        path=os.path.join(workdir, nv)
                        with open(path,'r') as f:
                            fc=f.read()
                        s=s+1
                        print('[{}] Uploading {}.'.format(self.id,nv))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(path)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup_run()
                    return client_ping_time_elapsed
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else: #ping
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup_run()
                    return client_ping_time_elapsed
            #check if trisurf is still running. If not, the run is over.
            sleep(1)
            if(self.p.poll() is not None): # trisurf exited!
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if(self.p.returncode>0):
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_run()
                return client_ping_time_elapsed
            self.sleep(self.update_seconds-1)
            client_ping_time_elapsed+=self.update_seconds
            if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                try:
                    client_ping(self.conn_address,cid)
                except Exception:
                    # A failed client ping must not kill the thread; give the
                    # run up and let the caller ask the server again.
                    print("[{}] Could not ping client.".format(self.id))
                    self._cleanup_run()
                    return client_ping_time_elapsed
                client_ping_time_elapsed=0
        return client_ping_time_elapsed
57af9d 241
SP 242
e08bff 243
SP 244 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir=maindir
        self.simdir=simdir
        return

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join (maindir/simdir on Unix).
        '''
        return os.path.join(self.maindir,self.simdir)

    def exists(self):
        ''' Method checks whether the path specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        ''' Method make() creates the directory, including missing parents. If that fails (for example because it already exists) it prints a message and raises SystemExit with code 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit() rather than the exit() helper, which the site module
            # only installs for interactive use.
            sys.exit(1)
        return

    def makeifnotexist(self):
        '''Method makeifnotexist() creates the directory if it does not exist. It returns True when the directory was created and False when it was already there.'''
        if(self.exists()):
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes the directory if it exists. It uses os.rmdir, so only an EMPTY directory can be removed; on failure it prints a message and raises SystemExit with code 1.'''
        if(self.exists()):
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)
        return

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). On failure it only prints a message. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
        return
305
306
307
308 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    """Shut the worker down on SIGINT/SIGTERM and exit the process.

    Stops and joins the module-level thread ``t``, prints which signal ended
    the process and exits with the signal number as the exit status.
    """
    worker = t
    worker.stop()
    worker.join()
    farewell = "Process ended with signal " +str(signal)
    print(farewell)
    sys.exit(signal)
314 #--- END SIGINT and SIGTERM----
315
if __name__ == '__main__':
    # Route both Ctrl-C and termination requests through the same handler.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    # signal_handler() finds the worker through this module-level name.
    t = ClientThread(update_seconds=100)
    t.start()

    # Keep the main thread alive; the process ends through signal_handler().
    while True:
        sleep(1000)