From 4c9734b25313f93c6277189bcf792151370c7225 Mon Sep 17 00:00:00 2001 From: Samo Penic <samo.penic@gmail.com> Date: Fri, 22 Jun 2018 08:51:41 +0000 Subject: [PATCH] Added sending information of trisurf version to the server --- tsclient.py | 378 ++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 284 insertions(+), 94 deletions(-) diff --git a/tsclient.py b/tsclient.py index 972fd81..ba76880 100755 --- a/tsclient.py +++ b/tsclient.py @@ -4,34 +4,23 @@ from time import sleep import uuid import subprocess -from trisurf import trisurf import os import shutil import signal import sys import socket -CONNECT_ADDR='http://localhost:8000' +from threading import Thread, Event +import re -p=None -workingdir=None - - - -#--- SIGINT and SIGTERM HANDLING --- -def signal_handler(signal,frame): - global p - global wirkingdir - if p is not None: - p.terminate() - if(workingdir is not None): - removeDir(workingdir.fullpath()) - print("Process ended with signal " +str(signal)) - sys.exit(0) -signal.signal(signal.SIGINT, signal_handler) -signal.signal(signal.SIGTERM, signal_handler) -#--- END SIGINT and SIGTERM---- - - +def getTrisurfVersion(): + p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + lines=p.stdout.readlines() + version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii')) + p.wait() + if(len(version)): + return version[0] + else: + return "unknown version" def get_hostname(): return socket.gethostname() @@ -39,8 +28,8 @@ def get_ip(): return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) -def get_client_id(addr, my_ip, my_hostname): - client_auth={'ip':my_ip,'hostname':my_hostname} +def get_client_id(addr, my_ip, my_hostname, subrun): + client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':getTrisurfVersion() } response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) @@ -61,12 +50,35 @@ return (rid,tape,vtu,status) else: print(response.text) - raise ValueError + if(response.status_code==400): + raise ValueError + else: + raise NameError def ping_run(addr,cid, rid): client_data={'client_id':cid, 'run_id':rid} response=requests.post(addr+"/api/ping/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + +def client_ping(addr,cid): + client_data={'client_id':cid} + response=requests.post(addr+"/api/pingclient/", data=client_data) + if(response.status_code==200): + client_data=json.loads(response.text) + + return client_data['concurrent_runs'] + else: + raise ValueError + + + +def send_error_report(addr,cid, rid,errcode): + client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} + response=requests.post(addr+"/api/reporterr/", data=client_data) if(response.status_code==200): return else: @@ -96,76 +108,254 @@ print("Cannot remove directory "+directory+ "\n") return + -while(True): - try: - cid=get_client_id(CONNECT_ADDR, get_ip(),get_hostname()) - except: - print("Cannot get CID.") - sleep(10) - continue - print("Got CID. getting RID.") - while(True): - try: - (rid,tape,vtu,status)=get_run(CONNECT_ADDR,cid) - except: - print("Could not get RID.") - sleep(10) - break - else: - #start separate thread with simulations. - workingdir=trisurf.Directory('/tmp/ts_'+str(uuid.uuid4())) - workingdir.makeifnotexist() - workingdir.goto() - with open(workingdir.fullpath()+"/tape", 'w') as f: - f.write(tape) - if(int(status)==-1): - cmd=['trisurf', '--force-from-tape'] - print("Run id="+str(rid)+ " :: Starting from tape") - else: - with open(workingdir.fullpath()+"/.status",'w') as f: - f.write(status) - with open(workingdir.fullpath()+"/initial.vtu",'w') as f: - f.write(vtu) - cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] - print("Run id="+str(rid)+ " :: Restoring from vtk, last timestep "+status) - p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) - s=int(status) - while(True): - #monitor for new file. If file is present, upload it! - newVTU=getNewVTU(workingdir.fullpath()) - if newVTU: #upload - try: - for nv in sorted(newVTU): - with open(nv,'r') as f: - fc=f.read() - s=s+1 - print('Uploading '+nv) - upload(CONNECT_ADDR, cid, rid, fc, s) - os.unlink(nv) - except: - print("Could not upload") - p.terminate() - removeDir(workingdir.fullpath()) - break - else: - print("VTU uploaded") - else: #ping - try: - ping_run(CONNECT_ADDR, cid, rid) - except: - print("Could not ping") - p.terminate() - removeDir(workingdir.fullpath()) - #stop simulations - break - #check if trisurf is still running. If not break the highest level loop. - sleep(1) - if(p.poll() is not None): # trisurf exited! - print("Trisurf was stopped") - removeDir(workingdir.fullpath()) + +class ClientThread(Thread): + def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): + super(ClientThread,self).__init__() + self._stop_event = Event() + self._stop_event.clear() + self.p=None + self.workingdir=None + self.conn_address=conn_address + self.id=subid + self.ip=get_ip() + self.hostname=get_hostname() + self.update_seconds=update_seconds + self.max_client_ping_time_elapsed=250 + + self.subruns=[] + + def stop(self): + self._stop_event.set() + + def isStopped(self): + return self._stop_event.is_set() + + def join(self): + print('joining threads') + super(ClientThread, self).join() + for sub in self.subruns: + sub.stop() + sub.join() + if self.p is not None: + self.p.terminate() + if self.workingdir is not None: + removeDir(self.workingdir.fullpath()) + + + def sleep(self,s): + for i in range(0, s): + if(self.isStopped()): + return False + sleep(1) + return True + + def subrunsStartStop(self,nr): + while(self.id==0 and nr>len(self.subruns)+1): + #spawning a new worker: + print("[{}] Spawning a new worker".format(self.id)) + t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds) + t.start() + self.subruns.append(t) + while(self.id==0 and nr<len(self.subruns)+1): + print("[{}] Stopping a worker".format(self.id)) + self.subruns[-1].stop() + self.subruns[-1].join() + del self.subruns[-1] + + def run(self): + while(not self.isStopped()): #try to register + try: + cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) + except: + print("[{}] Could not get CID.".format(self.id)) + self.sleep(10) + continue + #print("Got CID. getting RID.") + client_ping_time_elapsed=0 + concurrent_runs=client_ping(self.conn_address,cid) + self.subrunsStartStop(concurrent_runs) + while(not self.isStopped()): #successfully registered, now start pinging and searching for job + try: + (rid,tape,vtu,status)=get_run(self.conn_address,cid) + except NameError: + print("[{}] Could not get RID.".format(self.id)) + self.sleep(10) + client_ping_time_elapsed+=10 + if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed): + try: + concurrent_runs=client_ping(self.conn_address,cid) + self.subrunsStartStop(concurrent_runs) + except: + break + client_ping_time_elapsed=0 + #if you put break instead of continue, there is no need to ping client. And it is more robust... + continue + except ValueError: + print("[{}] Wrong CID? Getting new CID.".format(self.id)) + #self.sleep(10) break - sleep(100) + except: + print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) + break + else: + #start separate thread with simulations. + self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) + self.workingdir.makeifnotexist() + self.workingdir.goto() + #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath())) + with open(self.workingdir.fullpath()+"/tape", 'w') as f: + f.write(tape) + if(int(status)==-1): + cmd=['trisurf', '--force-from-tape'] + print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) + else: + with open(self.workingdir.fullpath()+"/.status",'w') as f: + f.write(status) + with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: + f.write(vtu) + cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] + print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) + self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) + s=int(status) + while(not self.isStopped()): + #monitor for new file. If file is present, upload it! + newVTU=getNewVTU(self.workingdir.fullpath()) + if newVTU: #upload + try: + for nvfile in sorted(newVTU): + nv=os.path.join(self.workingdir.fullpath(),nvfile) + with open(nv,'r') as f: + fc=f.read() + s=s+1 + print('[{}] Uploading {}.'.format(self.id,nvfile)) + upload(self.conn_address, cid, rid, fc, s) + os.unlink(nv) + except Exception as e: + print("[{}] Could not upload".format(self.id)) + print(e) + self.p.terminate() + removeDir(self.workingdir.fullpath()) + self.p=None + self.workingdir=None + break + else: + print("[{}] VTU uploaded.".format(self.id)) + else: #ping + try: + ping_run(self.conn_address, cid, rid) + except: + print("[{}] Could not ping.".format(self.id)) + self.p.terminate() + self.p=None + removeDir(self.workingdir.fullpath()) + self.workingdir=None + #stop simulations + break + #check if trisurf is still running. If not break the innermost loop. + sleep(1) + if(self.p.poll() is not None): # trisurf exited! + print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) + if(self.p.returncode>0): + try: + send_error_report(self.conn_address, cid, rid, self.p.returncode) + except: + print("[{}] Server didn't accept error report".format(self.id)) + removeDir(self.workingdir.fullpath()) + self.workingdir=None + self.p=None + break + self.sleep(self.update_seconds-1) + client_ping_time_elapsed+=self.update_seconds + if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2): + concurrent_runs=client_ping(self.conn_address,cid) + self.subrunsStartStop(concurrent_runs) + client_ping_time_elapsed=0 - + +#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. +class Directory: + ''' + Class deals with the paths where the simulation is run and data is stored. + ''' + def __init__(self, maindir=".", simdir=""): + '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' + self.maindir=maindir + self.simdir=simdir + return + + def fullpath(self): + ''' + Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. + ''' + return os.path.join(self.maindir,self.simdir) + + def exists(self): + ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' + path=self.fullpath() + if(os.path.exists(path)): + return True + else: + return False + + def make(self): + ''' Method make() creates directory. If it fails it exits the program with error message.''' + try: + os.makedirs(self.fullpath()) + except: + print("Cannot make directory "+self.fullpath()+"\n") + exit(1) + return + + def makeifnotexist(self): + '''Method makeifnotexist() creates directory if it does not exist.''' + if(self.exists()==0): + self.make() + return True + else: + return False + + def remove(self): + '''Method remove() removes directory recursively. WARNING! No questions asked.''' + if(self.exists()): + try: + os.rmdir(self.fullpath()) + except: + print("Cannot remove directory "+self.fullpath()+ "\n") + exit(1) + return + + def goto(self): + ''' + Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. + ''' + try: + os.chdir(self.fullpath()) + except: + print("Cannot go to directory "+self.fullpath()+"\n") + return + + + +#--- SIGINT and SIGTERM HANDLING --- +def signal_handler(signal,frame): + t.stop() + t.join() + print("Process ended with signal " +str(signal)) + sys.exit(signal) +#--- END SIGINT and SIGTERM---- + +if __name__ == '__main__': + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + t=ClientThread(update_seconds=100) + t.start() + #t.join() + #print("main") + while(True): + sleep(1000) -- Gitblit v1.9.3