Samo Penic
2018-05-05 4db06c65975abc8aa75f6b82367d564ad7727a12
commit | author | age
4db06c 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 #from trisurf import trisurf
8 import os
9 import shutil
10 import signal
11 import sys
12 import socket
13 from threading import Thread, Event
# An earlier module-level SIGINT/SIGTERM handler, built on the globals `p`
# and `workingdir`, used to sit here disabled inside a string literal.
# It has been removed; signal handling is done by `signal_handler` further
# down in this file, which stops and joins the ClientThread.
34
35
def get_hostname():
    '''Return the host name of the machine this client runs on.'''
    hostname = socket.gethostname()
    return hostname
38
def get_ip():
    '''
    Return a best-guess non-loopback IPv4 address of this machine as a string.

    Strategy:
    1. Resolve the local host name and return the first address that does not
       start with "127.".
    2. Otherwise "connect" a UDP socket towards 8.8.8.8:53 and return the local
       address the OS selected for that route (connect() on a UDP socket only
       picks a route; it does not send a datagram).
    3. If both fail (e.g. unresolvable host name, no network), return the
       string "no IP found" instead of raising.
    '''
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:  # socket.gaierror / herror are OSError subclasses
        addresses = []
    for address in addresses:
        if not address.startswith("127."):
            return address
    try:
        # The context manager guarantees the socket is closed even if
        # connect() fails (the original one-liner leaked it in that case).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        return "no IP found"
41
def get_client_id(addr, my_ip, my_hostname, timeout=60):
    '''
    Register this client with the server and return the client id it assigns.

    :param addr: base URL of the server (e.g. 'http://localhost:8000').
    :param my_ip: IP address reported to the server.
    :param my_hostname: host name reported to the server.
    :param timeout: seconds to wait for the server, passed to requests.
        Without a timeout a stalled server would block the caller forever.
    :returns: the 'id' field of the server's JSON reply.
    :raises ValueError: if the server does not answer with HTTP 200.
    :raises requests.RequestException: on connection errors or timeout.
    '''
    client_auth = {'ip': my_ip, 'hostname': my_hostname}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("register failed with HTTP {}".format(response.status_code))
    client_data = json.loads(response.text)
    return client_data['id']
51
52
def get_run(addr, cid, timeout=60):
    '''
    Ask the server for a run to execute on behalf of client `cid`.

    :param addr: base URL of the server.
    :param cid: client id returned by get_client_id().
    :param timeout: seconds to wait for the server, passed to requests.
    :returns: tuple (run id, tape contents, last VTU contents, status) taken
        from the JSON fields 'id', 'tape', 'lastVTU' and 'status'.
    :raises ValueError: if the server does not answer with HTTP 200 (the
        response body is printed first).
    :raises requests.RequestException: on connection errors or timeout.
    '''
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code != 200:
        print(response.text)
        raise ValueError("getrun failed with HTTP {}".format(response.status_code))
    client_data = json.loads(response.text)
    rid = client_data['id']
    tape = client_data['tape']
    vtu = client_data['lastVTU']
    status = client_data['status']
    return (rid, tape, vtu, status)
65
66
def ping_run(addr, cid, rid, timeout=60):
    '''
    Tell the server that client `cid` is still working on run `rid`.

    :param timeout: seconds to wait for the server, passed to requests.
    :raises ValueError: if the server does not answer with HTTP 200.
    :raises requests.RequestException: on connection errors or timeout.
    '''
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("ping failed with HTTP {}".format(response.status_code))
74
def send_error_report(addr, cid, rid, errcode, timeout=60):
    '''
    Report to the server that run `rid` failed with `errcode`.

    ClientThread passes the trisurf process return code as `errcode`.

    :param timeout: seconds to wait for the server, passed to requests.
    :raises ValueError: if the server does not answer with HTTP 200.
    :raises requests.RequestException: on connection errors or timeout.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("reporterr failed with HTTP {}".format(response.status_code))
82
def upload(addr, cid, rid, vtu, status, timeout=60):
    '''
    Upload the contents of one VTU file for run `rid`.

    :param vtu: text of the VTU file, sent as 'lastVTU'.
    :param status: value sent as 'status' (ClientThread passes a counter that
        it increments for every uploaded file).
    :param timeout: seconds to wait for the server, passed to requests.
    :raises ValueError: if the server does not answer with HTTP 200.
    :raises requests.RequestException: on connection errors or timeout.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload failed with HTTP {}".format(response.status_code))
90
def getNewVTU(directory):
    '''
    Collect the names of trisurf output files in `directory`.

    Only plain names (not paths) matching ``timestep_*.vtu`` are returned,
    as a set; other files are ignored.
    '''
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
97
98
def removeDir(directory):
    '''
    Recursively delete `directory`, best effort.

    The process working directory is first changed to '/', so the process is
    not left inside the tree being deleted (the client may have chdir'ed into
    it). Note that this changes the cwd for the whole process.
    Filesystem errors are reported on stdout instead of raised. Only OSError
    is caught, so SystemExit/KeyboardInterrupt (e.g. raised by a signal
    handler while rmtree runs) still propagate.
    '''
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        print("Cannot remove directory "+directory+ "\n")
    return
106
107                     
108
109
class ClientThread(Thread):
    '''
    Worker thread that repeatedly registers with the server, fetches a run,
    executes it with the external `trisurf` program in a fresh /tmp directory
    and uploads the produced ``timestep_*.vtu`` files back to the server.
    '''

    def __init__(self, conn_address='http://localhost:8000', subid=0, update_seconds=100):
        '''
        :param conn_address: base URL of the server.
        :param subid: label used to prefix this thread's log lines.
        :param update_seconds: approximate period, in seconds, between checks
            for new VTU files; each check either uploads files or pings.
        '''
        super(ClientThread, self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p = None            # running trisurf subprocess.Popen, or None
        self.workingdir = None   # Directory of the current run, or None
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds

    def stop(self):
        '''Ask the thread to finish; run() notices within about a second.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self, timeout=None):
        '''
        Wait for the thread (optionally at most `timeout` seconds, as in
        Thread.join), then kill trisurf and delete the working directory.

        Cleanup is skipped if the thread is still alive after the timeout,
        because run() may still be using the process and the directory.
        '''
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if not self.is_alive():
            self._cleanup()

    def sleep(self, s):
        '''
        Sleep `s` seconds in 1-second steps. Return False as soon as stop()
        has been requested, True if the full interval elapsed.
        '''
        for _ in range(0, s):
            if self.isStopped():
                return False
            sleep(1)
        return True

    def run(self):
        '''Register, fetch runs and execute them until stop() is called.'''
        while not self.isStopped():
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            while not self.isStopped():
                try:
                    (rid, tape, vtu, status) = get_run(self.conn_address, cid)
                except Exception:
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    # Leave to the outer loop, which registers again.
                    break
                self._start_trisurf(rid, tape, vtu, status)
                self._monitor_run(cid, rid, status)

    def _start_trisurf(self, rid, tape, vtu, status):
        '''Create a fresh working directory, write the inputs, launch trisurf.'''
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        path = self.workingdir.fullpath()
        with open(os.path.join(path, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(path, ".status"), 'w') as f:
                # status may arrive as a JSON number; file.write needs str.
                f.write(str(status))
            with open(os.path.join(path, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        # cwd= instead of chdir: the process-wide cwd is shared by all threads
        # and removeDir() changes it to '/'.
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=path)

    def _monitor_run(self, cid, rid, status):
        '''
        Every `update_seconds`, upload new VTU files (or ping the server when
        there are none) until trisurf exits, the server stops answering, or
        stop() is called. On stop() the process and directory are left for
        join() to clean up.
        '''
        s = int(status)
        while not self.isStopped():
            directory = self.workingdir.fullpath()
            newVTU = getNewVTU(directory)
            if newVTU:
                try:
                    for nv in sorted(newVTU):
                        # Absolute path: do not rely on the shared process cwd.
                        path = os.path.join(directory, nv)
                        with open(path, 'r') as f:
                            fc = f.read()
                        s = s + 1
                        print('[{}] Uploading {}.'.format(self.id, nv))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(path)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup()
                    return
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup()
                    return
            sleep(1)
            if self.p.poll() is not None:  # trisurf exited on its own
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup()
                return
            self.sleep(self.update_seconds - 1)

    def _cleanup(self):
        '''Terminate and reap trisurf (if any), then delete the working dir.'''
        if self.p is not None:
            # terminate() is a no-op for an already-reaped process.
            self.p.terminate()
            try:
                # Reap the child so it does not linger as a zombie and stops
                # writing before its directory is removed.
                self.p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.p.kill()
                self.p.wait()
            self.p = None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir = None
227
228
229
# Copied from the trisurf library, so this client does not depend on that
# library being installed.
class Directory:
    '''
    Helper for the directory where a simulation is run and its data stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''
        Store `maindir` and `simdir`; both default to the current directory
        (maindir "." and an empty simdir).
        '''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''
        Return os.path.join(maindir, simdir). With an empty simdir the result
        ends with a path separator.
        '''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Return True if the path from fullpath() exists, else False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Create the directory (with parents); exit with status 1 on failure.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit, not the site-provided exit(), which is not guaranteed
            # to exist (e.g. under `python -S`).
            sys.exit(1)

    def makeifnotexist(self):
        '''Create the directory if missing; return True if it was created.'''
        if not self.exists():
            self.make()
            return True
        return False

    def remove(self):
        '''
        Remove the directory if it exists. Uses os.rmdir, so only an EMPTY
        directory can be removed; on failure the program exits with status 1.
        '''
        if self.exists():
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Change the process working directory to fullpath(); print a message
        on failure. WARNING: with relative paths, do not call this repeatedly.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
291
292
293
294
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal, frame):
    '''
    Stop and join the worker thread `t`, print the signal number and exit
    with status 0.

    The __main__ block installs this handler before it creates `t` (and the
    ClientThread constructor performs network lookups), so `t` may not exist
    yet. In that case there is nothing to clean up and the process just exits,
    instead of failing with NameError inside the handler.
    '''
    thread = globals().get('t')
    if thread is not None:
        thread.stop()
        thread.join()
    print("Process ended with signal " +str(signal))
    sys.exit(0)
#--- END SIGINT and SIGTERM----
302
if __name__ == '__main__':

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    t = ClientThread(update_seconds=100)
    t.start()
    print("main")
    # Keep the main thread alive: CPython runs Python signal handlers only in
    # the main thread. Poll the worker instead of sleeping forever, so that if
    # it dies from an unhandled exception the process cleans up (join kills
    # trisurf and removes the working directory) and exits rather than idling.
    while t.is_alive():
        sleep(1)
    t.join()