Samo Penic
2018-05-05 eccf3a5f7218be8d6a10dbdc5a8202c26341c75f
commit | author | age
4db06c 1 #!/usr/bin/python3
SP 2 import requests
3 import json
4 from time import sleep
5 import uuid
6 import subprocess
7 import os
8 import shutil
9 import signal
10 import sys
11 import socket
12 from threading import Thread, Event
13
def get_hostname():
    '''Return the host name of the machine this client is running on.'''
    host = socket.gethostname()
    return host
16
def get_ip():
    '''
    Return a non-loopback IPv4 address of this machine as a string, or "no IP found".

    Strategy:
    1. Take the first address the local hostname resolves to that does not start
       with "127.".
    2. Otherwise connect a UDP socket towards 8.8.8.8:53 and report the local
       address the kernel picked for it (a UDP connect only selects a route; no
       datagram is sent).
    3. If both steps fail with a socket error, return "no IP found" instead of raising.
    '''
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Hostname not resolvable; fall through to the routing probe.
        addresses = []
    for address in addresses:
        if not address.startswith("127."):
            return address
    try:
        # `with` guarantees the probe socket is closed even if connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        return "no IP found"
19
def get_client_id(addr, my_ip, my_hostname, timeout=60):
    '''
    Register this client with the server and return the client id it assigns.

    addr        -- base URL of the server, e.g. 'http://localhost:8000'.
    my_ip       -- IP address reported to the server (form field 'ip').
    my_hostname -- host name reported to the server (form field 'hostname').
    timeout     -- seconds requests waits for the server; without a timeout a
                   stalled server would block the calling thread indefinitely.

    Returns the 'id' field of the JSON response.
    Raises ValueError on a non-200 response or a body that is not valid JSON,
    KeyError if the JSON has no 'id'; requests errors (connection failure,
    timeout) propagate unchanged.
    '''
    client_auth = {'ip': my_ip, 'hostname': my_hostname}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("register failed with HTTP status {}".format(response.status_code))
    client_data = json.loads(response.text)
    return client_data['id']
29
30
def get_run(addr, cid, timeout=60):
    '''
    Ask the server for a run to execute for client `cid`.

    addr    -- base URL of the server.
    cid     -- client id obtained from get_client_id().
    timeout -- seconds requests waits for the server; without a timeout a
               stalled server would block the calling thread indefinitely.

    Returns the tuple (rid, tape, vtu, status) taken from the 'id', 'tape',
    'lastVTU' and 'status' fields of the JSON response.
    On a non-200 response the response body is printed and ValueError raised.
    '''
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code != 200:
        print(response.text)
        raise ValueError("getrun failed with HTTP status {}".format(response.status_code))
    client_data = json.loads(response.text)
    rid = client_data['id']
    tape = client_data['tape']
    vtu = client_data['lastVTU']
    status = client_data['status']
    return (rid, tape, vtu, status)
43
44
def ping_run(addr, cid, rid, timeout=60):
    '''
    Tell the server that client `cid` is still working on run `rid`.

    timeout -- seconds requests waits for the server; without a timeout a
               stalled server would block the calling thread indefinitely.
    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("ping failed with HTTP status {}".format(response.status_code))
52
def send_error_report(addr, cid, rid, errcode, timeout=60):
    '''
    Report to the server that run `rid` on client `cid` failed with `errcode`.

    timeout -- seconds requests waits for the server; without a timeout a
               stalled server would block the calling thread indefinitely.
    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("reporterr failed with HTTP status {}".format(response.status_code))
60
def upload(addr, cid, rid, vtu, status, timeout=60):
    '''
    Send the contents of a VTU file for run `rid` to the server.

    vtu     -- file contents, sent as form field 'lastVTU'.
    status  -- value sent as form field 'status'.
    timeout -- seconds requests waits for the server (per connect/read); without
               a timeout a stalled server would block the calling thread indefinitely.
    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload failed with HTTP status {}".format(response.status_code))
68
def getNewVTU(directory):
    '''
    Return the set of entry names in `directory` that look like trisurf
    timestep output: they start with "timestep_" and end with ".vtu".
    '''
    return {
        name
        for name in os.listdir(directory)
        if name.startswith("timestep_") and name.endswith(".vtu")
    }
75
76
def removeDir(directory):
    '''
    Recursively delete `directory` on a best-effort basis.

    The current working directory is first changed to '/', so the process is
    not left inside the tree being deleted. Filesystem errors are printed
    instead of raised, because callers use this during cleanup.
    '''
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError as err:
        # format() (not +) so the message itself cannot fail on a non-str path.
        print("Cannot remove directory {}: {}\n".format(directory, err))
84
85                     
86
87
class ClientThread(Thread):
    '''
    Worker thread that executes trisurf runs handed out by the server.

    Loop: register with the server, fetch a run, start `trisurf` for it in a
    fresh directory under /tmp, then periodically either upload new
    timestep_*.vtu files or ping the server, until trisurf exits, the server
    stops answering, or stop() is called.
    '''

    def __init__(self, conn_address='http://localhost:8000', subid=0, update_seconds=100):
        '''
        conn_address   -- base URL of the server API.
        subid          -- label printed in front of this thread's log messages.
        update_seconds -- approximate period, in seconds, of the upload/ping cycle.
        '''
        super(ClientThread, self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p = None           # subprocess.Popen of the running trisurf, or None
        self.workingdir = None  # Directory of the current run, or None
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds

    def stop(self):
        '''Request the thread to finish; run() checks this flag between steps.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self, timeout=None):
        '''
        Wait for the thread to finish, then terminate a leftover trisurf process
        and remove the run's scratch directory.

        `timeout` is passed on to Thread.join(). If the thread is still alive when
        it expires, cleanup is skipped because run() may still be using those
        resources.
        '''
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if not self.is_alive():
            self._abort_simulation()

    def sleep(self, s):
        '''
        Sleep for `s` seconds in 1-second steps, checking the stop flag before each
        step. Returns False as soon as the thread is stopped, True otherwise.
        '''
        for i in range(0, s):
            if self.isStopped():
                return False
            sleep(1)  # module-level time.sleep, not this method
        return True

    def run(self):
        '''Main loop: obtain a client id, then keep fetching and executing runs until stopped.'''
        while not self.isStopped():
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            while not self.isStopped():
                try:
                    (rid, tape, vtu, status) = get_run(self.conn_address, cid)
                except Exception:
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    break  # go back to registering with the server
                self._start_simulation(rid, tape, vtu, status)
                self._monitor_simulation(cid, rid, int(status))

    def _start_simulation(self, rid, tape, vtu, status):
        '''Create a scratch directory, write the run's input files into it and launch trisurf there.'''
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        self.workingdir.goto()
        path = self.workingdir.fullpath()
        with open(os.path.join(path, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(path, ".status"), 'w') as f:
                # str(): the server may send status as a number, and write() needs text.
                f.write(str(status))
            with open(os.path.join(path, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        # Explicit cwd, so trisurf's relative file names do not depend on the process cwd.
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=path)

    def _monitor_simulation(self, cid, rid, counter):
        '''
        Supervise the running trisurf process.

        Each cycle uploads every new timestep VTU file (incrementing `counter` per
        file and sending it as the status), or pings the server if there is nothing
        new. The run is aborted when an upload or ping fails, and cleaned up when
        trisurf exits (non-zero positive exit codes are reported to the server).
        '''
        while not self.isStopped():
            path = self.workingdir.fullpath()
            new_vtu = getNewVTU(path)
            if new_vtu:
                try:
                    for name in sorted(new_vtu):
                        vtu_path = os.path.join(path, name)
                        with open(vtu_path, 'r') as f:
                            contents = f.read()
                        counter = counter + 1
                        print('[{}] Uploading {}.'.format(self.id, name))
                        upload(self.conn_address, cid, rid, contents, counter)
                        os.unlink(vtu_path)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._abort_simulation()
                    break
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._abort_simulation()
                    break
            sleep(1)
            if self.p.poll() is not None:  # trisurf exited
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._abort_simulation()
                break
            self.sleep(self.update_seconds - 1)

    def _abort_simulation(self):
        '''Terminate trisurf if it is still running and delete the run's scratch directory.'''
        if self.p is not None:
            if self.p.poll() is None:
                self.p.terminate()
                try:
                    # Reap the child so it cannot keep writing into the directory
                    # removed below and does not linger as a zombie.
                    self.p.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    self.p.kill()
                    self.p.wait()
            self.p = None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir = None
206
207
208
# Copied from the trisurf library, so that this client does not depend on the library being installed.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Store maindir and simdir. With the defaults, the path is the current directory.'''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''Return the path of the directory, os.path.join(maindir, simdir).'''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Return True if the path given by fullpath() exists, otherwise False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''
        Create the directory, including missing parent directories.
        On failure print an error message and exit the program with status 1.
        '''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory " + self.fullpath() + "\n")
            # sys.exit rather than the `exit` builtin, which only exists when `site` is loaded.
            sys.exit(1)

    def makeifnotexist(self):
        '''
        Create the directory if it does not exist yet.
        Returns True if it was created, False if it already existed.
        '''
        if not self.exists():
            self.make()
            return True
        return False

    def remove(self):
        '''
        Remove the directory recursively, together with everything inside it.
        WARNING! No questions asked. On failure print an error message and exit
        the program with status 1.
        '''
        if self.exists():
            try:
                # rmtree (not os.rmdir, which only removes empty directories) so
                # the behaviour matches the documented recursive removal.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory " + self.fullpath() + "\n")
                sys.exit(1)

    def goto(self):
        '''
        Change the current working directory to fullpath(); on failure only print a message.
        WARNING: when using relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory " + self.fullpath() + "\n")
270
271
272
273 #--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal, frame):
    '''
    SIGINT/SIGTERM handler: stop and join the worker thread stored in the global
    `t` (its join() also cleans up trisurf and the scratch directory), then exit
    with the signal number as the exit status.

    A signal can arrive before `t` exists or before it was started (e.g. while
    ClientThread is still being constructed). In that case there is nothing to
    join, and the handler exits directly instead of failing with NameError or
    RuntimeError.

    NOTE: the parameter name `signal` shadows the signal module inside this
    function. It is kept to preserve the handler's signature.
    '''
    worker = globals().get('t')
    if worker is not None:
        worker.stop()
        if worker.ident is not None:  # Thread.join() is only valid after start()
            worker.join()
    print("Process ended with signal " + str(signal))
    sys.exit(signal)
4db06c 279 #--- END SIGINT and SIGTERM----
SP 280
if __name__ == '__main__':
    # Ctrl+C and `kill` both go through the same shutdown path.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    t = ClientThread(update_seconds=100)  # global name used by signal_handler
    t.start()
    print("main")
    # Idle forever; the process terminates only via signal_handler.
    while True:
        sleep(1000)