From e08bfff865c6deff8251d71d30cf9c3ec62c7fac Mon Sep 17 00:00:00 2001 From: Samo Penic <samo.penic@gmail.com> Date: Sat, 02 Jun 2018 20:32:21 +0000 Subject: [PATCH] changed the directory --- /dev/null | 320 ----------------------------- tsclient.py | 332 +++++++++++++++++++++-------- 2 files changed, 234 insertions(+), 418 deletions(-) diff --git a/playground/tsclient.py b/playground/tsclient.py deleted file mode 100755 index 4b0724c..0000000 --- a/playground/tsclient.py +++ /dev/null @@ -1,320 +0,0 @@ -#!/usr/bin/python3 -import requests -import json -from time import sleep -import uuid -import subprocess -import os -import shutil -import signal -import sys -import socket -from threading import Thread, Event - -def get_hostname(): - return socket.gethostname() - -def get_ip(): - return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) - -def get_client_id(addr, my_ip, my_hostname, subrun): - client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} - response=requests.post(addr+"/api/register/", data=client_auth) - if(response.status_code==200): - client_data=json.loads(response.text) - client_id=client_data['id'] - return client_id - else: - raise ValueError - - -def get_run(addr,cid): - response=requests.get(addr+"/api/getrun/"+str(cid)+"/") - if(response.status_code==200): - client_data=json.loads(response.text) - rid=client_data['id'] - tape=client_data['tape'] - vtu=client_data['lastVTU'] - status=client_data['status'] - return (rid,tape,vtu,status) - else: - print(response.text) - if(response.status_code==400): - raise ValueError - else: - raise NameError - - -def ping_run(addr,cid, rid): - client_data={'client_id':cid, 'run_id':rid} - response=requests.post(addr+"/api/ping/", data=client_data) - if(response.status_code==200): - return - else: - raise ValueError - -def client_ping(addr,cid): - client_data={'client_id':cid} - response=requests.post(addr+"/api/pingclient/", data=client_data) - if(response.status_code==200): - return - else: - raise ValueError - - - -def send_error_report(addr,cid, rid,errcode): - client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} - response=requests.post(addr+"/api/reporterr/", data=client_data) - if(response.status_code==200): - return - else: - raise ValueError - -def upload(addr,cid, rid, vtu, status): - client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status} - response=requests.post(addr+"/api/upload/", data=client_data) - if(response.status_code==200): - return - else: - raise ValueError - -def getNewVTU(directory): - fset=set() - for file in os.listdir(directory): - if file.endswith(".vtu") and file.startswith("timestep_"): - fset.add(file) - return fset - - -def removeDir(directory): - os.chdir('/') - try: - shutil.rmtree(directory) - except: - print("Cannot remove directory "+directory+ "\n") - return - - - - -class ClientThread(Thread): - - def __init__(self,conn_address='http://localhost:8000',subid=0, update_seconds=100): - super(ClientThread,self).__init__() - self._stop_event = Event() - self._stop_event.clear() - self.p=None - self.workingdir=None - self.conn_address=conn_address - self.id=subid - self.ip=get_ip() - self.hostname=get_hostname() - self.update_seconds=update_seconds - - - def stop(self): - self._stop_event.set() - - def isStopped(self): - return self._stop_event.is_set() - - def join(self): - print('joining threads') - super(ClientThread, self).join() - if self.p is not None: - self.p.terminate() - if self.workingdir is not None: - removeDir(self.workingdir.fullpath()) - - - def sleep(self,s): - for i in range(0, s): - if(self.isStopped()): - return False - sleep(1) - return True - - def run(self): - while(not self.isStopped()): - try: - cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) - except: - print("[{}] Could not get CID.".format(self.id)) - self.sleep(10) - continue - #print("Got CID. getting RID.") - client_ping_time_elapsed=0 - while(not self.isStopped()): - try: - (rid,tape,vtu,status)=get_run(self.conn_address,cid) - except NameError: - print("[{}] Could not get RID.".format(self.id)) - self.sleep(10) - client_ping_time_elapsed+=10 - if(client_ping_time_elapsed>=250): - try: - client_ping(self.conn_address,cid) - except: - break - client_ping_time_elapsed=0 - #if you put break instead of continue, there is no need to ping client. And it is more robust... - continue - except ValueError: - print("[{}] Wrong CID? Getting new CID.".format(self.id)) - #self.sleep(10) - break - except: - print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) - break - else: - #start separate thread with simulations. - self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) - self.workingdir.makeifnotexist() - self.workingdir.goto() - with open(self.workingdir.fullpath()+"/tape", 'w') as f: - f.write(tape) - if(int(status)==-1): - cmd=['trisurf', '--force-from-tape'] - print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) - else: - with open(self.workingdir.fullpath()+"/.status",'w') as f: - f.write(status) - with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: - f.write(vtu) - cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] - print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) - self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) - s=int(status) - while(not self.isStopped()): - #monitor for new file. If file is present, upload it! - newVTU=getNewVTU(self.workingdir.fullpath()) - if newVTU: #upload - try: - for nv in sorted(newVTU): - with open(nv,'r') as f: - fc=f.read() - s=s+1 - print('[{}] Uploading {}.'.format(self.id,nv)) - upload(self.conn_address, cid, rid, fc, s) - os.unlink(nv) - except: - print("[{}] Could not upload".format(self.id)) - self.p.terminate() - removeDir(self.workingdir.fullpath()) - self.p=None - self.workingdir=None - break - else: - print("[{}] VTU uploaded.".format(self.id)) - else: #ping - try: - ping_run(self.conn_address, cid, rid) - except: - print("[{}] Could not ping.".format(self.id)) - self.p.terminate() - self.p=None - removeDir(self.workingdir.fullpath()) - self.workingdir=None - #stop simulations - break - #check if trisurf is still running. If not break the innermost loop. - sleep(1) - if(self.p.poll() is not None): # trisurf exited! - print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) - if(self.p.returncode>0): - try: - send_error_report(self.conn_address, cid, rid, self.p.returncode) - except: - print("[{}] Server didn't accept error report".format(self.id)) - removeDir(self.workingdir.fullpath()) - self.workingdir=None - self.p=None - break - self.sleep(self.update_seconds-1) - - - -#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. -class Directory: - ''' - Class deals with the paths where the simulation is run and data is stored. - ''' - def __init__(self, maindir=".", simdir=""): - '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' - self.maindir=maindir - self.simdir=simdir - return - - def fullpath(self): - ''' - Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. - ''' - return os.path.join(self.maindir,self.simdir) - - def exists(self): - ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' - path=self.fullpath() - if(os.path.exists(path)): - return True - else: - return False - - def make(self): - ''' Method make() creates directory. If it fails it exits the program with error message.''' - try: - os.makedirs(self.fullpath()) - except: - print("Cannot make directory "+self.fullpath()+"\n") - exit(1) - return - - def makeifnotexist(self): - '''Method makeifnotexist() creates directory if it does not exist.''' - if(self.exists()==0): - self.make() - return True - else: - return False - - def remove(self): - '''Method remove() removes directory recursively. WARNING! No questions asked.''' - if(self.exists()): - try: - os.rmdir(self.fullpath()) - except: - print("Cannot remove directory "+self.fullpath()+ "\n") - exit(1) - return - - def goto(self): - ''' - Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. - ''' - try: - os.chdir(self.fullpath()) - except: - print("Cannot go to directory "+self.fullpath()+"\n") - return - - - -#--- SIGINT and SIGTERM HANDLING --- -def signal_handler(signal,frame): - t.stop() - t.join() - print("Process ended with signal " +str(signal)) - sys.exit(signal) -#--- END SIGINT and SIGTERM---- - -if __name__ == '__main__': - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - t=ClientThread(update_seconds=100) - t.start() - #t.join() - #print("main") - while(True): - sleep(1000) diff --git a/tsclient.py b/tsclient.py index 492badc..6d2be9e 100755 --- a/tsclient.py +++ b/tsclient.py @@ -4,34 +4,12 @@ from time import sleep import uuid import subprocess -from trisurf import trisurf import os import shutil import signal import sys import socket -CONNECT_ADDR='http://localhost:8000' - -p=None -workingdir=None - - - -#--- SIGINT and SIGTERM HANDLING --- -def signal_handler(signal,frame): - global p - global wirkingdir - if p is not None: - p.terminate() - if(workingdir is not None): - removeDir(workingdir.fullpath()) - print("Process ended with signal " +str(signal)) - sys.exit(0) -signal.signal(signal.SIGINT, signal_handler) -signal.signal(signal.SIGTERM, signal_handler) -#--- END SIGINT and SIGTERM---- - - +from threading import Thread, Event def get_hostname(): return socket.gethostname() @@ -39,8 +17,8 @@ def get_ip(): return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) -def get_client_id(addr, my_ip, my_hostname): - client_auth={'ip':my_ip,'hostname':my_hostname} +def get_client_id(addr, my_ip, my_hostname, subrun): + client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) @@ -61,7 +39,10 @@ return (rid,tape,vtu,status) else: print(response.text) - raise ValueError + if(response.status_code==400): + raise ValueError + else: + raise NameError def ping_run(addr,cid, rid): @@ -71,6 +52,16 @@ return else: raise ValueError + +def client_ping(addr,cid): + client_data={'client_id':cid} + response=requests.post(addr+"/api/pingclient/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + + def send_error_report(addr,cid, rid,errcode): client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} @@ -104,81 +95,226 @@ print("Cannot remove directory "+directory+ "\n") return + -while(True): - try: - cid=get_client_id(CONNECT_ADDR, get_ip(),get_hostname()) - except: - print("Cannot get CID.") - sleep(10) - continue - print("Got CID. getting RID.") - while(True): - try: - (rid,tape,vtu,status)=get_run(CONNECT_ADDR,cid) - except: - print("Could not get RID.") - sleep(10) - break - else: - #start separate thread with simulations. - workingdir=trisurf.Directory('/tmp/ts_'+str(uuid.uuid4())) - workingdir.makeifnotexist() - workingdir.goto() - with open(workingdir.fullpath()+"/tape", 'w') as f: - f.write(tape) - if(int(status)==-1): - cmd=['trisurf', '--force-from-tape'] - print("Run id="+str(rid)+ " :: Starting from tape") - else: - with open(workingdir.fullpath()+"/.status",'w') as f: - f.write(status) - with open(workingdir.fullpath()+"/initial.vtu",'w') as f: - f.write(vtu) - cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] - print("Run id="+str(rid)+ " :: Restoring from vtk, last timestep "+status) - p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) - s=int(status) - while(True): - #monitor for new file. If file is present, upload it! - newVTU=getNewVTU(workingdir.fullpath()) - if newVTU: #upload - try: - for nv in sorted(newVTU): - with open(nv,'r') as f: - fc=f.read() - s=s+1 - print('Uploading '+nv) - upload(CONNECT_ADDR, cid, rid, fc, s) - os.unlink(nv) - except: - print("Could not upload") - p.terminate() - removeDir(workingdir.fullpath()) - break - else: - print("VTU uploaded") - else: #ping - try: - ping_run(CONNECT_ADDR, cid, rid) - except: - print("Could not ping") - p.terminate() - removeDir(workingdir.fullpath()) - #stop simulations - break - #check if trisurf is still running. If not break the highest level loop. - sleep(1) - if(p.poll() is not None): # trisurf exited! - print("Trisurf was stopped with return code {}".format(p.returncode)) - if(p.returncode>0): + +class ClientThread(Thread): + + def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): + super(ClientThread,self).__init__() + self._stop_event = Event() + self._stop_event.clear() + self.p=None + self.workingdir=None + self.conn_address=conn_address + self.id=subid + self.ip=get_ip() + self.hostname=get_hostname() + self.update_seconds=update_seconds + + + def stop(self): + self._stop_event.set() + + def isStopped(self): + return self._stop_event.is_set() + + def join(self): + print('joining threads') + super(ClientThread, self).join() + if self.p is not None: + self.p.terminate() + if self.workingdir is not None: + removeDir(self.workingdir.fullpath()) + + + def sleep(self,s): + for i in range(0, s): + if(self.isStopped()): + return False + sleep(1) + return True + + def run(self): + while(not self.isStopped()): + try: + cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) + except: + print("[{}] Could not get CID.".format(self.id)) + self.sleep(10) + continue + #print("Got CID. getting RID.") + client_ping_time_elapsed=0 + while(not self.isStopped()): + try: + (rid,tape,vtu,status)=get_run(self.conn_address,cid) + except NameError: + print("[{}] Could not get RID.".format(self.id)) + self.sleep(10) + client_ping_time_elapsed+=10 + if(client_ping_time_elapsed>=250): try: - send_error_report(CONNECT_ADDR, cid, rid, p.returncode) + client_ping(self.conn_address,cid) except: - print("Server didn't accept error report") - removeDir(workingdir.fullpath()) + break + client_ping_time_elapsed=0 + #if you put break instead of continue, there is no need to ping client. And it is more robust... + continue + except ValueError: + print("[{}] Wrong CID? Getting new CID.".format(self.id)) + #self.sleep(10) break - sleep(100) + except: + print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) + break + else: + #start separate thread with simulations. + self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) + self.workingdir.makeifnotexist() + self.workingdir.goto() + with open(self.workingdir.fullpath()+"/tape", 'w') as f: + f.write(tape) + if(int(status)==-1): + cmd=['trisurf', '--force-from-tape'] + print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) + else: + with open(self.workingdir.fullpath()+"/.status",'w') as f: + f.write(status) + with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: + f.write(vtu) + cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] + print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) + self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) + s=int(status) + while(not self.isStopped()): + #monitor for new file. If file is present, upload it! + newVTU=getNewVTU(self.workingdir.fullpath()) + if newVTU: #upload + try: + for nv in sorted(newVTU): + with open(nv,'r') as f: + fc=f.read() + s=s+1 + print('[{}] Uploading {}.'.format(self.id,nv)) + upload(self.conn_address, cid, rid, fc, s) + os.unlink(nv) + except: + print("[{}] Could not upload".format(self.id)) + self.p.terminate() + removeDir(self.workingdir.fullpath()) + self.p=None + self.workingdir=None + break + else: + print("[{}] VTU uploaded.".format(self.id)) + else: #ping + try: + ping_run(self.conn_address, cid, rid) + except: + print("[{}] Could not ping.".format(self.id)) + self.p.terminate() + self.p=None + removeDir(self.workingdir.fullpath()) + self.workingdir=None + #stop simulations + break + #check if trisurf is still running. If not break the innermost loop. + sleep(1) + if(self.p.poll() is not None): # trisurf exited! + print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) + if(self.p.returncode>0): + try: + send_error_report(self.conn_address, cid, rid, self.p.returncode) + except: + print("[{}] Server didn't accept error report".format(self.id)) + removeDir(self.workingdir.fullpath()) + self.workingdir=None + self.p=None + break + self.sleep(self.update_seconds-1) - + +#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. +class Directory: + ''' + Class deals with the paths where the simulation is run and data is stored. + ''' + def __init__(self, maindir=".", simdir=""): + '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' + self.maindir=maindir + self.simdir=simdir + return + + def fullpath(self): + ''' + Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. + ''' + return os.path.join(self.maindir,self.simdir) + + def exists(self): + ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' + path=self.fullpath() + if(os.path.exists(path)): + return True + else: + return False + + def make(self): + ''' Method make() creates directory. If it fails it exits the program with error message.''' + try: + os.makedirs(self.fullpath()) + except: + print("Cannot make directory "+self.fullpath()+"\n") + exit(1) + return + + def makeifnotexist(self): + '''Method makeifnotexist() creates directory if it does not exist.''' + if(self.exists()==0): + self.make() + return True + else: + return False + + def remove(self): + '''Method remove() removes directory recursively. WARNING! No questions asked.''' + if(self.exists()): + try: + os.rmdir(self.fullpath()) + except: + print("Cannot remove directory "+self.fullpath()+ "\n") + exit(1) + return + + def goto(self): + ''' + Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. + ''' + try: + os.chdir(self.fullpath()) + except: + print("Cannot go to directory "+self.fullpath()+"\n") + return + + + +#--- SIGINT and SIGTERM HANDLING --- +def signal_handler(signal,frame): + t.stop() + t.join() + print("Process ended with signal " +str(signal)) + sys.exit(signal) +#--- END SIGINT and SIGTERM---- + +if __name__ == '__main__': + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + t=ClientThread(update_seconds=100) + t.start() + #t.join() + #print("main") + while(True): + sleep(1000) -- Gitblit v1.9.3