From b1b8fc9f25fcff8bc145889abfaefe4ea48bf629 Mon Sep 17 00:00:00 2001 From: Samo Penic <samo.penic@gmail.com> Date: Tue, 15 May 2018 17:42:19 +0000 Subject: [PATCH] Merge branch 'master' of ssh://git.penic.eu:29418/trisurf-client --- playground/tsclient.py | 320 +++++++++++++++++++++++++++++++++++++++++++++++++++++ tsclient.py | 1 2 files changed, 321 insertions(+), 0 deletions(-) diff --git a/playground/tsclient.py b/playground/tsclient.py new file mode 100755 index 0000000..4b0724c --- /dev/null +++ b/playground/tsclient.py @@ -0,0 +1,320 @@ +#!/usr/bin/python3 +import requests +import json +from time import sleep +import uuid +import subprocess +import os +import shutil +import signal +import sys +import socket +from threading import Thread, Event + +def get_hostname(): + return socket.gethostname() + +def get_ip(): + return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) + +def get_client_id(addr, my_ip, my_hostname, subrun): + client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} + response=requests.post(addr+"/api/register/", data=client_auth) + if(response.status_code==200): + client_data=json.loads(response.text) + client_id=client_data['id'] + return client_id + else: + raise ValueError + + +def get_run(addr,cid): + response=requests.get(addr+"/api/getrun/"+str(cid)+"/") + if(response.status_code==200): + client_data=json.loads(response.text) + rid=client_data['id'] + tape=client_data['tape'] + vtu=client_data['lastVTU'] + status=client_data['status'] + return (rid,tape,vtu,status) + else: + print(response.text) + if(response.status_code==400): + raise ValueError + else: + raise NameError + + +def ping_run(addr,cid, rid): + client_data={'client_id':cid, 'run_id':rid} + response=requests.post(addr+"/api/ping/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + +def client_ping(addr,cid): + client_data={'client_id':cid} + response=requests.post(addr+"/api/pingclient/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + + + +def send_error_report(addr,cid, rid,errcode): + client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} + response=requests.post(addr+"/api/reporterr/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + +def upload(addr,cid, rid, vtu, status): + client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status} + response=requests.post(addr+"/api/upload/", data=client_data) + if(response.status_code==200): + return + else: + raise ValueError + +def getNewVTU(directory): + fset=set() + for file in os.listdir(directory): + if file.endswith(".vtu") and file.startswith("timestep_"): + fset.add(file) + return fset + + +def removeDir(directory): + os.chdir('/') + try: + shutil.rmtree(directory) + except: + print("Cannot remove directory "+directory+ "\n") + return + + + + +class ClientThread(Thread): + + def __init__(self,conn_address='http://localhost:8000',subid=0, update_seconds=100): + super(ClientThread,self).__init__() + self._stop_event = Event() + self._stop_event.clear() + self.p=None + self.workingdir=None + self.conn_address=conn_address + self.id=subid + self.ip=get_ip() + self.hostname=get_hostname() + self.update_seconds=update_seconds + + + def stop(self): + self._stop_event.set() + + def isStopped(self): + return self._stop_event.is_set() + + def join(self): + print('joining threads') + super(ClientThread, self).join() + if self.p is not None: + self.p.terminate() + if self.workingdir is not None: + removeDir(self.workingdir.fullpath()) + + + def sleep(self,s): + for i in range(0, s): + if(self.isStopped()): + return False + sleep(1) + return True + + def run(self): + while(not self.isStopped()): + try: + cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) + except: + print("[{}] Could not get CID.".format(self.id)) + self.sleep(10) + continue + #print("Got CID. getting RID.") + client_ping_time_elapsed=0 + while(not self.isStopped()): + try: + (rid,tape,vtu,status)=get_run(self.conn_address,cid) + except NameError: + print("[{}] Could not get RID.".format(self.id)) + self.sleep(10) + client_ping_time_elapsed+=10 + if(client_ping_time_elapsed>=250): + try: + client_ping(self.conn_address,cid) + except: + break + client_ping_time_elapsed=0 + #if you put break instead of continue, there is no need to ping client. And it is more robust... + continue + except ValueError: + print("[{}] Wrong CID? Getting new CID.".format(self.id)) + #self.sleep(10) + break + except: + print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) + break + else: + #start separate thread with simulations. + self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) + self.workingdir.makeifnotexist() + self.workingdir.goto() + with open(self.workingdir.fullpath()+"/tape", 'w') as f: + f.write(tape) + if(int(status)==-1): + cmd=['trisurf', '--force-from-tape'] + print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) + else: + with open(self.workingdir.fullpath()+"/.status",'w') as f: + f.write(status) + with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: + f.write(vtu) + cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] + print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) + self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) + s=int(status) + while(not self.isStopped()): + #monitor for new file. If file is present, upload it! + newVTU=getNewVTU(self.workingdir.fullpath()) + if newVTU: #upload + try: + for nv in sorted(newVTU): + with open(nv,'r') as f: + fc=f.read() + s=s+1 + print('[{}] Uploading {}.'.format(self.id,nv)) + upload(self.conn_address, cid, rid, fc, s) + os.unlink(nv) + except: + print("[{}] Could not upload".format(self.id)) + self.p.terminate() + removeDir(self.workingdir.fullpath()) + self.p=None + self.workingdir=None + break + else: + print("[{}] VTU uploaded.".format(self.id)) + else: #ping + try: + ping_run(self.conn_address, cid, rid) + except: + print("[{}] Could not ping.".format(self.id)) + self.p.terminate() + self.p=None + removeDir(self.workingdir.fullpath()) + self.workingdir=None + #stop simulations + break + #check if trisurf is still running. If not break the innermost loop. + sleep(1) + if(self.p.poll() is not None): # trisurf exited! + print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) + if(self.p.returncode>0): + try: + send_error_report(self.conn_address, cid, rid, self.p.returncode) + except: + print("[{}] Server didn't accept error report".format(self.id)) + removeDir(self.workingdir.fullpath()) + self.workingdir=None + self.p=None + break + self.sleep(self.update_seconds-1) + + + +#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. +class Directory: + ''' + Class deals with the paths where the simulation is run and data is stored. + ''' + def __init__(self, maindir=".", simdir=""): + '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' + self.maindir=maindir + self.simdir=simdir + return + + def fullpath(self): + ''' + Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. + ''' + return os.path.join(self.maindir,self.simdir) + + def exists(self): + ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' + path=self.fullpath() + if(os.path.exists(path)): + return True + else: + return False + + def make(self): + ''' Method make() creates directory. If it fails it exits the program with error message.''' + try: + os.makedirs(self.fullpath()) + except: + print("Cannot make directory "+self.fullpath()+"\n") + exit(1) + return + + def makeifnotexist(self): + '''Method makeifnotexist() creates directory if it does not exist.''' + if(self.exists()==0): + self.make() + return True + else: + return False + + def remove(self): + '''Method remove() removes directory recursively. WARNING! No questions asked.''' + if(self.exists()): + try: + os.rmdir(self.fullpath()) + except: + print("Cannot remove directory "+self.fullpath()+ "\n") + exit(1) + return + + def goto(self): + ''' + Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. + ''' + try: + os.chdir(self.fullpath()) + except: + print("Cannot go to directory "+self.fullpath()+"\n") + return + + + +#--- SIGINT and SIGTERM HANDLING --- +def signal_handler(signal,frame): + t.stop() + t.join() + print("Process ended with signal " +str(signal)) + sys.exit(signal) +#--- END SIGINT and SIGTERM---- + +if __name__ == '__main__': + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + t=ClientThread(update_seconds=100) + t.start() + #t.join() + #print("main") + while(True): + sleep(1000) diff --git a/tsclient.py b/tsclient.py index a736951..492badc 100755 --- a/tsclient.py +++ b/tsclient.py @@ -60,6 +60,7 @@ status=client_data['status'] return (rid,tape,vtu,status) else: + print(response.text) raise ValueError -- Gitblit v1.9.3