From e08bfff865c6deff8251d71d30cf9c3ec62c7fac Mon Sep 17 00:00:00 2001 From: Samo Penic <samo.penic@gmail.com> Date: Sat, 02 Jun 2018 20:32:21 +0000 Subject: [PATCH] changed the directory --- tsclient.py | 332 +++++++++++++++++++++++++++++++++++++++---------------- 1 files changed, 234 insertions(+), 98 deletions(-) diff --git a/tsclient.py b/tsclient.py index 492badc..6d2be9e 100755 --- a/tsclient.py +++ b/tsclient.py @@ -4,34 +4,12 @@ from time import sleep import uuid import subprocess -from trisurf import trisurf import os import shutil import signal import sys import socket -CONNECT_ADDR='http://localhost:8000' - -p=None -workingdir=None - - - -#--- SIGINT and SIGTERM HANDLING --- -def signal_handler(signal,frame): - global p - global wirkingdir - if p is not None: - p.terminate() - if(workingdir is not None): - removeDir(workingdir.fullpath()) - print("Process ended with signal " +str(signal)) - sys.exit(0) -signal.signal(signal.SIGINT, signal_handler) -signal.signal(signal.SIGTERM, signal_handler) -#--- END SIGINT and SIGTERM---- - - +from threading import Thread, Event def get_hostname(): return socket.gethostname() @@ -39,8 +17,8 @@ def get_ip(): return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) -def get_client_id(addr, my_ip, my_hostname): - client_auth={'ip':my_ip,'hostname':my_hostname} +def get_client_id(addr, my_ip, my_hostname, subrun): + client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) @@ -61,7 +39,10 @@ return (rid,tape,vtu,status) else: print(response.text) - raise ValueError + if(response.status_code==400): + raise ValueError + else: + raise NameError def ping_run(addr,cid, rid): @@ -71,6 +52,16 @@ return else: raise ValueError + +def client_ping(addr,cid): + client_data={'client_id':cid} + response=requests.post(addr+"/api/pingclient/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + + def send_error_report(addr,cid, rid,errcode): client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} @@ -104,81 +95,226 @@ print("Cannot remove directory "+directory+ "\n") return + -while(True): - try: - cid=get_client_id(CONNECT_ADDR, get_ip(),get_hostname()) - except: - print("Cannot get CID.") - sleep(10) - continue - print("Got CID. getting RID.") - while(True): - try: - (rid,tape,vtu,status)=get_run(CONNECT_ADDR,cid) - except: - print("Could not get RID.") - sleep(10) - break - else: - #start separate thread with simulations. - workingdir=trisurf.Directory('/tmp/ts_'+str(uuid.uuid4())) - workingdir.makeifnotexist() - workingdir.goto() - with open(workingdir.fullpath()+"/tape", 'w') as f: - f.write(tape) - if(int(status)==-1): - cmd=['trisurf', '--force-from-tape'] - print("Run id="+str(rid)+ " :: Starting from tape") - else: - with open(workingdir.fullpath()+"/.status",'w') as f: - f.write(status) - with open(workingdir.fullpath()+"/initial.vtu",'w') as f: - f.write(vtu) - cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] - print("Run id="+str(rid)+ " :: Restoring from vtk, last timestep "+status) - p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) - s=int(status) - while(True): - #monitor for new file. If file is present, upload it! - newVTU=getNewVTU(workingdir.fullpath()) - if newVTU: #upload - try: - for nv in sorted(newVTU): - with open(nv,'r') as f: - fc=f.read() - s=s+1 - print('Uploading '+nv) - upload(CONNECT_ADDR, cid, rid, fc, s) - os.unlink(nv) - except: - print("Could not upload") - p.terminate() - removeDir(workingdir.fullpath()) - break - else: - print("VTU uploaded") - else: #ping - try: - ping_run(CONNECT_ADDR, cid, rid) - except: - print("Could not ping") - p.terminate() - removeDir(workingdir.fullpath()) - #stop simulations - break - #check if trisurf is still running. If not break the highest level loop. - sleep(1) - if(p.poll() is not None): # trisurf exited! - print("Trisurf was stopped with return code {}".format(p.returncode)) - if(p.returncode>0): + +class ClientThread(Thread): + + def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): + super(ClientThread,self).__init__() + self._stop_event = Event() + self._stop_event.clear() + self.p=None + self.workingdir=None + self.conn_address=conn_address + self.id=subid + self.ip=get_ip() + self.hostname=get_hostname() + self.update_seconds=update_seconds + + + def stop(self): + self._stop_event.set() + + def isStopped(self): + return self._stop_event.is_set() + + def join(self): + print('joining threads') + super(ClientThread, self).join() + if self.p is not None: + self.p.terminate() + if self.workingdir is not None: + removeDir(self.workingdir.fullpath()) + + + def sleep(self,s): + for i in range(0, s): + if(self.isStopped()): + return False + sleep(1) + return True + + def run(self): + while(not self.isStopped()): + try: + cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) + except: + print("[{}] Could not get CID.".format(self.id)) + self.sleep(10) + continue + #print("Got CID. getting RID.") + client_ping_time_elapsed=0 + while(not self.isStopped()): + try: + (rid,tape,vtu,status)=get_run(self.conn_address,cid) + except NameError: + print("[{}] Could not get RID.".format(self.id)) + self.sleep(10) + client_ping_time_elapsed+=10 + if(client_ping_time_elapsed>=250): try: - send_error_report(CONNECT_ADDR, cid, rid, p.returncode) + client_ping(self.conn_address,cid) except: - print("Server didn't accept error report") - removeDir(workingdir.fullpath()) + break + client_ping_time_elapsed=0 + #if you put break instead of continue, there is no need to ping client. And it is more robust... + continue + except ValueError: + print("[{}] Wrong CID? Getting new CID.".format(self.id)) + #self.sleep(10) break - sleep(100) + except: + print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) + break + else: + #start separate thread with simulations. + self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) + self.workingdir.makeifnotexist() + self.workingdir.goto() + with open(self.workingdir.fullpath()+"/tape", 'w') as f: + f.write(tape) + if(int(status)==-1): + cmd=['trisurf', '--force-from-tape'] + print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) + else: + with open(self.workingdir.fullpath()+"/.status",'w') as f: + f.write(status) + with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: + f.write(vtu) + cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] + print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) + self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) + s=int(status) + while(not self.isStopped()): + #monitor for new file. If file is present, upload it! + newVTU=getNewVTU(self.workingdir.fullpath()) + if newVTU: #upload + try: + for nv in sorted(newVTU): + with open(nv,'r') as f: + fc=f.read() + s=s+1 + print('[{}] Uploading {}.'.format(self.id,nv)) + upload(self.conn_address, cid, rid, fc, s) + os.unlink(nv) + except: + print("[{}] Could not upload".format(self.id)) + self.p.terminate() + removeDir(self.workingdir.fullpath()) + self.p=None + self.workingdir=None + break + else: + print("[{}] VTU uploaded.".format(self.id)) + else: #ping + try: + ping_run(self.conn_address, cid, rid) + except: + print("[{}] Could not ping.".format(self.id)) + self.p.terminate() + self.p=None + removeDir(self.workingdir.fullpath()) + self.workingdir=None + #stop simulations + break + #check if trisurf is still running. If not break the innermost loop. + sleep(1) + if(self.p.poll() is not None): # trisurf exited! + print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) + if(self.p.returncode>0): + try: + send_error_report(self.conn_address, cid, rid, self.p.returncode) + except: + print("[{}] Server didn't accept error report".format(self.id)) + removeDir(self.workingdir.fullpath()) + self.workingdir=None + self.p=None + break + self.sleep(self.update_seconds-1) - + +#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. +class Directory: + ''' + Class deals with the paths where the simulation is run and data is stored. + ''' + def __init__(self, maindir=".", simdir=""): + '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' + self.maindir=maindir + self.simdir=simdir + return + + def fullpath(self): + ''' + Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. + ''' + return os.path.join(self.maindir,self.simdir) + + def exists(self): + ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' + path=self.fullpath() + if(os.path.exists(path)): + return True + else: + return False + + def make(self): + ''' Method make() creates directory. If it fails it exits the program with error message.''' + try: + os.makedirs(self.fullpath()) + except: + print("Cannot make directory "+self.fullpath()+"\n") + exit(1) + return + + def makeifnotexist(self): + '''Method makeifnotexist() creates directory if it does not exist.''' + if(self.exists()==0): + self.make() + return True + else: + return False + + def remove(self): + '''Method remove() removes directory recursively. WARNING! No questions asked.''' + if(self.exists()): + try: + os.rmdir(self.fullpath()) + except: + print("Cannot remove directory "+self.fullpath()+ "\n") + exit(1) + return + + def goto(self): + ''' + Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. + ''' + try: + os.chdir(self.fullpath()) + except: + print("Cannot go to directory "+self.fullpath()+"\n") + return + + + +#--- SIGINT and SIGTERM HANDLING --- +def signal_handler(signal,frame): + t.stop() + t.join() + print("Process ended with signal " +str(signal)) + sys.exit(signal) +#--- END SIGINT and SIGTERM---- + +if __name__ == '__main__': + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + t=ClientThread(update_seconds=100) + t.start() + #t.join() + #print("main") + while(True): + sleep(1000) -- Gitblit v1.9.3