Samo Penic
2018-06-02 e08bfff865c6deff8251d71d30cf9c3ec62c7fac
commit | author | age
57af9d 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
e08bff 12 from threading import Thread, Event
57af9d 13
def get_hostname():
    """Return the host name of this machine as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
16
def get_ip():
    """Return an IPv4 address of this host as a string.

    The lookup is tried in this order:

    1. Resolve the local host name and return the first address that does
       not start with ``"127."``.
    2. Otherwise connect a UDP socket towards 8.8.8.8:53 and return the
       local address the socket was bound to.
    3. Otherwise return the literal string ``"no IP found"``.

    Resolver and socket errors are treated as "this step produced nothing"
    instead of propagating, so the function always returns a string.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # socket.gaierror / socket.herror are OSError subclasses.
        addresses = []

    for ip in addresses:
        if not ip.startswith("127."):
            return ip

    try:
        # The context manager guarantees the socket is closed even when
        # connect() fails (e.g. no route to the outside world).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
19
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    """Register this client with the server and return the assigned client id.

    Sends a POST to ``<addr>/api/register/`` with the form fields ``ip``,
    ``hostname`` and ``subrun``.

    Args:
        addr: Base URL of the server (no trailing slash).
        my_ip: Value sent as ``ip``.
        my_hostname: Value sent as ``hostname``.
        subrun: Value sent as ``subrun``.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            dead server cannot block the caller forever.

    Returns:
        The value stored under ``"id"`` in the JSON response body.

    Raises:
        ValueError: If the server answers with a status other than 200.
        Exceptions raised by ``requests`` (connection errors, timeouts)
        propagate unchanged.
    """
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "client registration failed with HTTP status {}".format(response.status_code)
        )
    client_data = json.loads(response.text)
    return client_data['id']
29
30
def get_run(addr, cid, timeout=60):
    """Ask the server for a run assigned to client *cid*.

    Sends a GET to ``<addr>/api/getrun/<cid>/``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Client id; converted with ``str()`` when building the URL.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Returns:
        A tuple ``(rid, tape, vtu, status)`` taken from the ``"id"``,
        ``"tape"``, ``"lastVTU"`` and ``"status"`` keys of the JSON response.

    Raises:
        ValueError: If the server answers with status 400.
        NameError: If the server answers with any other non-200 status.
        Exceptions raised by ``requests`` propagate unchanged.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (
            client_data['id'],
            client_data['tape'],
            client_data['lastVTU'],
            client_data['status'],
        )

    # Show the server's explanation before signalling the failure.
    print(response.text)
    if response.status_code == 400:
        raise ValueError("getrun rejected with HTTP status 400")
    raise NameError("getrun failed with HTTP status {}".format(response.status_code))
57af9d 46
SP 47
def ping_run(addr, cid, rid, timeout=60):
    """Send a ping for run *rid* on behalf of client *cid*.

    Sends a POST to ``<addr>/api/ping/`` with the form fields ``client_id``
    and ``run_id``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Value sent as ``client_id``.
        rid: Value sent as ``run_id``.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Raises:
        ValueError: If the server answers with a status other than 200.
        Exceptions raised by ``requests`` propagate unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("run ping failed with HTTP status {}".format(response.status_code))
e08bff 55
def client_ping(addr, cid, timeout=60):
    """Send a ping for client *cid* that is not tied to any run.

    Sends a POST to ``<addr>/api/pingclient/`` with the form field
    ``client_id``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Value sent as ``client_id``.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Raises:
        ValueError: If the server answers with a status other than 200.
        Exceptions raised by ``requests`` propagate unchanged.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("client ping failed with HTTP status {}".format(response.status_code))
63
64
57af9d 65
def send_error_report(addr, cid, rid, errcode, timeout=60):
    """Report an error code for run *rid* to the server.

    Sends a POST to ``<addr>/api/reporterr/`` with the form fields
    ``client_id``, ``run_id`` and ``error_code``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Value sent as ``client_id``.
        rid: Value sent as ``run_id``.
        errcode: Value sent as ``error_code``.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Raises:
        ValueError: If the server answers with a status other than 200.
        Exceptions raised by ``requests`` propagate unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("error report failed with HTTP status {}".format(response.status_code))
73
def upload(addr, cid, rid, vtu, status, timeout=60):
    """Upload a VTU snapshot and its status value for run *rid*.

    Sends a POST to ``<addr>/api/upload/`` with the form fields
    ``client_id``, ``run_id``, ``lastVTU`` and ``status``.

    Args:
        addr: Base URL of the server (no trailing slash).
        cid: Value sent as ``client_id``.
        rid: Value sent as ``run_id``.
        vtu: Value sent as ``lastVTU``.
        status: Value sent as ``status``.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Raises:
        ValueError: If the server answers with a status other than 200.
        Exceptions raised by ``requests`` propagate unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload failed with HTTP status {}".format(response.status_code))
81
def getNewVTU(directory):
    """Return the set of entry names in *directory* matching ``timestep_*.vtu``.

    Only the bare names are returned, not full paths. An empty set means no
    matching entry is present.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
88
89
def removeDir(directory):
    """Recursively delete *directory*; print a message instead of raising on failure.

    The process working directory is changed to ``/`` before the removal
    (presumably so that the current directory is never inside the tree that
    is being deleted).

    Args:
        directory: Path of the directory tree to delete, as a string.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Best effort: a missing or undeletable directory must not stop the
        # client. Only filesystem errors are swallowed, not interrupts.
        print("Cannot remove directory "+directory+ "\n")
97
e08bff 98                     
57af9d 99
e08bff 100
class ClientThread(Thread):
    """Worker thread that fetches runs from the server and executes them.

    The thread registers with the server, repeatedly asks for a run, starts
    ``trisurf`` in a fresh working directory under ``/tmp``, uploads every
    ``timestep_*.vtu`` file that appears there and pings the server when
    there is nothing to upload. Use ``stop()`` followed by ``join()`` to
    shut it down.
    """

    def __init__(self, conn_address='http://beti.trisurf.eu', subid=0, update_seconds=100):
        """Create the worker.

        Args:
            conn_address: Base URL of the server.
            subid: Identifier of this worker; sent as ``subrun`` on
                registration and used as prefix in log messages.
            update_seconds: Length of one monitoring cycle in seconds.
        """
        super(ClientThread, self).__init__()
        self._stop_event = Event()
        self.p = None            # running trisurf subprocess.Popen, if any
        self.workingdir = None   # Directory of the current run, if any
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds

    def stop(self):
        """Ask the thread to finish; returns immediately."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread to end, then stop trisurf and delete its directory.

        Args:
            timeout: Optional timeout in seconds, forwarded to
                ``Thread.join``. If the thread is still alive afterwards no
                cleanup is done, because the worker still uses the process
                and the directory.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        self._cleanup_run()

    def sleep(self, s):
        """Sleep up to *s* seconds, waking early when the thread is stopped.

        Returns:
            False if the stop request was seen, True otherwise. For
            ``s <= 0`` nothing is waited for and True is returned.
        """
        if s <= 0:
            return True
        # Event.wait returns True as soon as the stop flag is set.
        return not self._stop_event.wait(s)

    def _cleanup_run(self):
        """Stop the current trisurf process (if any) and delete its working directory."""
        proc, self.p = self.p, None
        workdir, self.workingdir = self.workingdir, None
        if proc is not None and proc.poll() is None:
            proc.terminate()
            try:
                # Reap the child so it does not linger as a zombie and no
                # longer writes into the directory removed below.
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        if workdir is not None:
            removeDir(workdir.fullpath())

    def run(self):
        """Thread body: register with the server and serve runs until stopped."""
        while not self.isStopped():
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            self._serve_client(cid)

    def _serve_client(self, cid):
        """Fetch and execute runs for *cid*; return when a new client id is needed."""
        client_ping_time_elapsed = 0
        while not self.isStopped():
            try:
                (rid, tape, vtu, status) = get_run(self.conn_address, cid)
            except NameError:
                print("[{}] Could not get RID.".format(self.id))
                self.sleep(10)
                client_ping_time_elapsed += 10
                if client_ping_time_elapsed >= 250:
                    try:
                        client_ping(self.conn_address, cid)
                    except Exception:
                        return
                    client_ping_time_elapsed = 0
                # Returning here instead of retrying would make the client
                # ping unnecessary (note kept from the original author).
                continue
            except ValueError:
                print("[{}] Wrong CID? Getting new CID.".format(self.id))
                return
            except Exception:
                print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                return
            self._execute_run(cid, rid, tape, vtu, status)

    def _execute_run(self, cid, rid, tape, vtu, status):
        """Run trisurf for one run and upload its VTU files until it ends or fails."""
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        self.workingdir.goto()
        # All file access below uses explicit paths, so it does not depend on
        # the process-wide current directory (removeDir changes it to '/').
        workdir = self.workingdir.fullpath()
        with open(os.path.join(workdir, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workdir, ".status"), 'w') as f:
                # str() keeps this working when the server sends a number.
                f.write(str(status))
            with open(os.path.join(workdir, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workdir)
        s = int(status)
        while not self.isStopped():
            # Monitor for new files. If any is present, upload it.
            newVTU = getNewVTU(workdir)
            if newVTU:
                try:
                    for nv in sorted(newVTU):
                        vtu_path = os.path.join(workdir, nv)
                        with open(vtu_path, 'r') as f:
                            fc = f.read()
                        s = s + 1
                        print('[{}] Uploading {}.'.format(self.id, nv))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(vtu_path)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup_run()
                    break
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup_run()
                    break
            # Check whether trisurf is still running; if not, end this run.
            sleep(1)
            if self.p.poll() is not None:
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_run()
                break
            self.sleep(self.update_seconds - 1)
57af9d 235
SP 236
e08bff 237
SP 238 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join (maindir/simdir on Unix).
        '''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Method checks whether the path specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Method make() creates the directory (including missing parents). If it fails it prints an error message and exits the program with status 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit instead of the site builtin exit(), which is not
            # guaranteed to exist (e.g. when running with "python -S").
            sys.exit(1)

    def makeifnotexist(self):
        '''Method makeifnotexist() creates directory if it does not exist. Returns True when make() was called, False when the path already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes the directory if it exists. It uses os.rmdir, so only an empty directory can be removed; on failure it prints an error message and exits the program with status 1.'''
        if self.exists():
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). On failure only a message is printed. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
299
300
301
302 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    """Shut the worker down and exit the process.

    Stops and joins the module-level worker thread ``t``, prints which
    signal ended the process and exits with the signal number as status.

    Args:
        signal: Signal number passed in by the interpreter.
        frame: Current stack frame passed in by the interpreter (unused).
    """
    worker = t
    worker.stop()
    worker.join()
    print("Process ended with signal " +str(signal))
    raise SystemExit(signal)
308 #--- END SIGINT and SIGTERM----
309
if __name__ == '__main__':

    # Route both Ctrl-C and termination requests through signal_handler.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    # signal_handler finds the worker through the module-level name ``t``.
    t = ClientThread(update_seconds=100)
    t.start()

    # Keep the main thread alive; the process ends through signal_handler.
    while True:
        sleep(1000)