#!/usr/bin/python3 import requests import json from time import sleep import uuid import subprocess #from trisurf import trisurf import os import shutil import signal import sys import socket from threading import Thread, Event """ p=None workingdir=None #--- SIGINT and SIGTERM HANDLING --- def signal_handler(signal,frame): global p global wirkingdir if p is not None: p.terminate() if(workingdir is not None): removeDir(workingdir.fullpath()) print("Process ended with signal " +str(signal)) sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) #--- END SIGINT and SIGTERM---- """ def get_hostname(): return socket.gethostname() def get_ip(): return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) def get_client_id(addr, my_ip, my_hostname): client_auth={'ip':my_ip,'hostname':my_hostname} response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) client_id=client_data['id'] return client_id else: raise ValueError def get_run(addr,cid): response=requests.get(addr+"/api/getrun/"+str(cid)+"/") if(response.status_code==200): client_data=json.loads(response.text) rid=client_data['id'] tape=client_data['tape'] vtu=client_data['lastVTU'] status=client_data['status'] return (rid,tape,vtu,status) else: print(response.text) raise ValueError def ping_run(addr,cid, rid): client_data={'client_id':cid, 'run_id':rid} response=requests.post(addr+"/api/ping/", data=client_data) if(response.status_code==200): return else: raise ValueError def send_error_report(addr,cid, rid,errcode): client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} response=requests.post(addr+"/api/reporterr/", data=client_data) if(response.status_code==200): return else: raise ValueError def upload(addr,cid, rid, vtu, status): client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status} response=requests.post(addr+"/api/upload/", data=client_data) if(response.status_code==200): return else: raise ValueError def getNewVTU(directory): fset=set() for file in os.listdir(directory): if file.endswith(".vtu") and file.startswith("timestep_"): fset.add(file) return fset def removeDir(directory): os.chdir('/') try: shutil.rmtree(directory) except: print("Cannot remove directory "+directory+ "\n") return class ClientThread(Thread): def __init__(self,conn_address='http://localhost:8000',subid=0, update_seconds=100): super(ClientThread,self).__init__() self._stop_event = Event() self._stop_event.clear() self.p=None self.workingdir=None self.conn_address=conn_address self.id=subid self.ip=get_ip() self.hostname=get_hostname() self.update_seconds=update_seconds def stop(self): self._stop_event.set() def isStopped(self): return self._stop_event.is_set() def join(self): print('joining threads') super(ClientThread, self).join() if self.p is not None: self.p.terminate() if self.workingdir is not None: removeDir(self.workingdir.fullpath()) def sleep(self,s): for i in range(0, s): if(self.isStopped()): return False sleep(1) return True def run(self): while(not self.isStopped()): try: cid=get_client_id(self.conn_address, self.ip, self.hostname) except: print("[{}] Could not get CID.".format(self.id)) self.sleep(10) continue #print("Got CID. getting RID.") while(not self.isStopped()): try: (rid,tape,vtu,status)=get_run(self.conn_address,cid) except: print("[{}] Could not get RID.".format(self.id)) self.sleep(10) break else: #start separate thread with simulations. self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) self.workingdir.makeifnotexist() self.workingdir.goto() with open(self.workingdir.fullpath()+"/tape", 'w') as f: f.write(tape) if(int(status)==-1): cmd=['trisurf', '--force-from-tape'] print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) else: with open(self.workingdir.fullpath()+"/.status",'w') as f: f.write(status) with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: f.write(vtu) cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) s=int(status) while(not self.isStopped()): #monitor for new file. If file is present, upload it! newVTU=getNewVTU(self.workingdir.fullpath()) if newVTU: #upload try: for nv in sorted(newVTU): with open(nv,'r') as f: fc=f.read() s=s+1 print('[{}] Uploading {}.'.format(self.id,nv)) upload(self.conn_address, cid, rid, fc, s) os.unlink(nv) except: print("[{}] Could not upload".format(self.id)) self.p.terminate() removeDir(self.workingdir.fullpath()) self.p=None self.workingdir=None break else: print("[{}] VTU uploaded.".format(self.id)) else: #ping try: ping_run(self.conn_address, cid, rid) except: print("[{}] Could not ping.".format(self.id)) self.p.terminate() self.p=None removeDir(self.workingdir.fullpath()) self.workingdir=None #stop simulations break #check if trisurf is still running. If not break the innermost loop. sleep(1) if(self.p.poll() is not None): # trisurf exited! print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) if(self.p.returncode>0): try: send_error_report(self.conn_address, cid, rid, self.p.returncode) except: print("[{}] Server didn't accept error report".format(self.id)) removeDir(self.workingdir.fullpath()) self.workingdir=None self.p=None break self.sleep(self.update_seconds-1) #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. class Directory: ''' Class deals with the paths where the simulation is run and data is stored. ''' def __init__(self, maindir=".", simdir=""): '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' self.maindir=maindir self.simdir=simdir return def fullpath(self): ''' Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. ''' return os.path.join(self.maindir,self.simdir) def exists(self): ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' path=self.fullpath() if(os.path.exists(path)): return True else: return False def make(self): ''' Method make() creates directory. If it fails it exits the program with error message.''' try: os.makedirs(self.fullpath()) except: print("Cannot make directory "+self.fullpath()+"\n") exit(1) return def makeifnotexist(self): '''Method makeifnotexist() creates directory if it does not exist.''' if(self.exists()==0): self.make() return True else: return False def remove(self): '''Method remove() removes directory recursively. WARNING! No questions asked.''' if(self.exists()): try: os.rmdir(self.fullpath()) except: print("Cannot remove directory "+self.fullpath()+ "\n") exit(1) return def goto(self): ''' Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. ''' try: os.chdir(self.fullpath()) except: print("Cannot go to directory "+self.fullpath()+"\n") return #--- SIGINT and SIGTERM HANDLING --- def signal_handler(signal,frame): t.stop() t.join() print("Process ended with signal " +str(signal)) sys.exit(0) #--- END SIGINT and SIGTERM---- if __name__ == '__main__': signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) t=ClientThread(update_seconds=100) t.start() print("main") while(True): sleep(1000)