New file |
| | |
| | | #!/usr/bin/python3 |
| | | import requests |
| | | import json |
| | | from time import sleep |
| | | import uuid |
| | | import subprocess |
| | | import os |
| | | import shutil |
| | | import signal |
| | | import sys |
| | | import socket |
| | | from threading import Thread, Event |
| | | |
def get_hostname():
    """Return this machine's host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
| | | |
def get_ip():
    """Return a non-loopback IPv4 address of this machine as a string.

    Two strategies are tried in order:

    1. Resolve the local host name and take the first address that does not
       start with ``127.``.
    2. Connect a UDP socket to ``8.8.8.8:53`` and read the local address the
       operating system selected for that socket.

    Returns:
        The address found, or the literal string ``"no IP found"`` when both
        strategies fail (for example when the host name does not resolve and
        there is no route to the outside).
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # socket.gaierror / socket.herror are OSError subclasses; an
        # unresolvable host name must not prevent the fallback below.
        addresses = []

    for ip in addresses:
        if not ip.startswith("127."):
            return ip

    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
| | | |
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    """Register this client with the server and return the id it hands out.

    Args:
        addr: Base URL of the server; ``/api/register/`` is appended to it.
        my_ip: Sent as the ``ip`` form field.
        my_hostname: Sent as the ``hostname`` form field.
        subrun: Sent as the ``subrun`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Returns:
        The ``id`` entry of the JSON document in the response body.

    Raises:
        ValueError: The server answered with a status other than 200.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "registration failed with HTTP status {}".format(response.status_code)
        )
    client_data = json.loads(response.text)
    return client_data['id']
| | | |
| | | |
def get_run(addr, cid, timeout=60):
    """Ask the server for a run to execute on behalf of client ``cid``.

    Args:
        addr: Base URL of the server; ``/api/getrun/<cid>/`` is appended.
        cid: Client id, converted with ``str()`` when building the URL.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Returns:
        A tuple ``(rid, tape, vtu, status)`` taken from the ``id``, ``tape``,
        ``lastVTU`` and ``status`` entries of the JSON response body.

    Raises:
        ValueError: The server answered with status 400.
        NameError: The server answered with any other non-200 status.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (
            client_data['id'],
            client_data['tape'],
            client_data['lastVTU'],
            client_data['status'],
        )

    # Show the server's explanation before signalling the failure.
    print(response.text)
    if response.status_code == 400:
        raise ValueError("getrun rejected the request with HTTP status 400")
    # NameError is kept as the exception type because callers distinguish the
    # two failure modes by catching ValueError and NameError separately.
    raise NameError(
        "getrun failed with HTTP status {}".format(response.status_code)
    )
| | | |
| | | |
def ping_run(addr, cid, rid, timeout=60):
    """Post ``client_id`` and ``run_id`` to the server's ``/api/ping/`` endpoint.

    Args:
        addr: Base URL of the server.
        cid: Sent as the ``client_id`` form field.
        rid: Sent as the ``run_id`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Raises:
        ValueError: The server answered with a status other than 200.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "run ping failed with HTTP status {}".format(response.status_code)
        )
| | | |
def client_ping(addr, cid, timeout=60):
    """Post ``client_id`` to the server's ``/api/pingclient/`` endpoint.

    Args:
        addr: Base URL of the server.
        cid: Sent as the ``client_id`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Raises:
        ValueError: The server answered with a status other than 200.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "client ping failed with HTTP status {}".format(response.status_code)
        )
| | | |
| | | |
| | | |
def send_error_report(addr, cid, rid, errcode, timeout=60):
    """Post an error code for a run to the server's ``/api/reporterr/`` endpoint.

    Args:
        addr: Base URL of the server.
        cid: Sent as the ``client_id`` form field.
        rid: Sent as the ``run_id`` form field.
        errcode: Sent as the ``error_code`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Raises:
        ValueError: The server answered with a status other than 200.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "error report failed with HTTP status {}".format(response.status_code)
        )
| | | |
def upload(addr, cid, rid, vtu, status, timeout=300):
    """Post VTU contents and a status value to the server's ``/api/upload/`` endpoint.

    Args:
        addr: Base URL of the server.
        cid: Sent as the ``client_id`` form field.
        rid: Sent as the ``run_id`` form field.
        vtu: Sent as the ``lastVTU`` form field.
        status: Sent as the ``status`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever. The default is
            generous because the request body carries a whole file.

    Raises:
        ValueError: The server answered with a status other than 200.
        requests.exceptions.RequestException: Connection problems and
            timeouts raised by ``requests`` are propagated unchanged.
    """
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(
            "upload failed with HTTP status {}".format(response.status_code)
        )
| | | |
def getNewVTU(directory):
    """Return the set of entry names in ``directory`` that look like timestep VTU files.

    An entry qualifies when its name starts with ``timestep_`` and ends with
    ``.vtu``. Only bare names are returned, not full paths.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
| | | |
| | | |
def removeDir(directory):
    """Recursively delete ``directory``, reporting (not raising) on failure.

    The process working directory is changed to ``/`` first.

    Args:
        directory: Path of the directory tree to delete.
    """
    # presumably so that the process is not sitting inside the tree it is
    # about to delete -- confirm against callers
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Best effort: a missing or undeletable directory is only reported.
        print("Cannot remove directory "+directory+ "\n")
    return
| | | |
| | | |
| | | |
| | | |
class ClientThread(Thread):
    """Worker thread that fetches simulation runs from a server and executes them.

    The thread registers with the server, repeatedly asks for a run, starts
    the ``trisurf`` program for it in a fresh directory under ``/tmp`` and,
    while the program is running, uploads every new ``timestep_*.vtu`` file or
    pings the server. ``stop()`` asks the thread to finish; ``join()`` waits
    for it and cleans up whatever simulation is still active.
    """

    def __init__(self, conn_address='http://localhost:8000', subid=0, update_seconds=100):
        """Prepare the worker; nothing is contacted until ``start()``.

        Args:
            conn_address: Base URL of the server.
            subid: Value sent as ``subrun`` when registering; also used as the
                ``[id]`` prefix of every message printed by this thread.
            update_seconds: Length of one monitoring cycle while a simulation
                is running (one fixed second plus ``update_seconds - 1``
                interruptible seconds).
        """
        super(ClientThread, self).__init__()
        self._stop_event = Event()  # a new Event starts out cleared
        self.p = None               # subprocess.Popen of the running simulation
        self.workingdir = None      # Directory of the running simulation
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds

    def stop(self):
        """Request the thread to finish at its next stop check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread to end, then stop the simulation and remove its files.

        Args:
            timeout: Forwarded to ``Thread.join``. If the wait times out the
                worker is still running and still owns the simulation, so no
                cleanup is done.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        self._cleanup_run()

    def sleep(self, s):
        """Wait up to ``s`` seconds, returning early when ``stop()`` is called.

        Returns:
            False if the thread was asked to stop, True if the full time passed.
        """
        # Waiting on the event wakes up immediately on stop() instead of
        # polling once per second.
        return not self._stop_event.wait(max(s, 0))

    def _cleanup_run(self):
        """Terminate the simulation process (if any) and delete its working directory."""
        if self.p is not None:
            self.p.terminate()
            try:
                # Reap the child so no zombie is left behind and the directory
                # is not deleted under a process that is still shutting down.
                self.p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.p.kill()
                self.p.wait()
            self.p = None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir = None

    def _start_run(self, rid, tape, vtu, status):
        """Create the working directory for a run and launch ``trisurf`` in it."""
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        workpath = self.workingdir.fullpath()
        # Absolute paths and Popen(cwd=...) are used instead of os.chdir():
        # the working directory is process-wide state shared by all threads.
        with open(os.path.join(workpath, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workpath, ".status"), 'w') as f:
                # str() keeps this working when the JSON status is a number.
                f.write(str(status))
            with open(os.path.join(workpath, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workpath)

    def _monitor_run(self, cid, rid, step):
        """Upload new VTU files or ping the server until the run ends.

        Returns when the thread is stopped (the simulation is left for
        ``join()`` to clean up), when the server cannot be reached, or when
        ``trisurf`` exits; in the last two cases the run is cleaned up here.

        Args:
            cid: Client id used in all server calls.
            rid: Run id used in all server calls.
            step: Status value of the run at start; incremented by one for
                every uploaded file and sent along with it.
        """
        workpath = self.workingdir.fullpath()
        while not self.isStopped():
            new_files = getNewVTU(workpath)
            if new_files:
                try:
                    for name in sorted(new_files):
                        filepath = os.path.join(workpath, name)
                        with open(filepath, 'r') as f:
                            contents = f.read()
                        step += 1
                        print('[{}] Uploading {}.'.format(self.id, name))
                        upload(self.conn_address, cid, rid, contents, step)
                        # Delete only after a successful upload.
                        os.unlink(filepath)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup_run()
                    return
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup_run()
                    return
            sleep(1)
            if self.p.poll() is not None:  # trisurf exited
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_run()
                return
            self.sleep(self.update_seconds - 1)

    def run(self):
        """Thread body: register, fetch runs and execute them until stopped."""
        while not self.isStopped():
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue

            client_ping_time_elapsed = 0
            while not self.isStopped():
                try:
                    (rid, tape, vtu, status) = get_run(self.conn_address, cid)
                except NameError:
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    client_ping_time_elapsed += 10
                    if client_ping_time_elapsed >= 250:
                        # Ping the server as a client after 250 s without a run.
                        try:
                            client_ping(self.conn_address, cid)
                        except Exception:
                            break
                        client_ping_time_elapsed = 0
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break

                self._start_run(rid, tape, vtu, status)
                self._monitor_run(cid, rid, int(status))
| | | |
| | | |
| | | |
| | | #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. |
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join(), i.e. maindir/simdir on Unix.
        '''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Method checks whether the path specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Method make() creates the directory (including missing parents). If it fails it prints an error message and exits the program with status 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit() rather than the site-provided exit(), which is not
            # guaranteed to be defined in every interpreter setup.
            sys.exit(1)

    def makeifnotexist(self):
        '''Method makeifnotexist() creates directory if it does not exist. It returns True if the directory was created and False if the path already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes the directory with os.rmdir(), which only succeeds on an empty directory. If removal fails it prints an error message and exits the program with status 1. Nothing happens if the path does not exist.'''
        if self.exists():
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). On failure only a message is printed. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
| | | |
| | | |
| | | |
| | | #--- SIGINT and SIGTERM HANDLING --- |
def signal_handler(signal, frame):
    """Stop the worker thread, wait for it, report the signal and exit.

    Intended to be installed with ``signal.signal``. The module-level name
    ``t`` must refer to the worker thread. Note that inside this function the
    parameter ``signal`` (the signal number) hides the ``signal`` module. The
    signal number is also used as the process exit status.
    """
    worker = t
    worker.stop()
    worker.join()
    message = "Process ended with signal " + str(signal)
    print(message)
    sys.exit(signal)
| | | #--- END SIGINT and SIGTERM---- |
| | | |
if __name__ == '__main__':
    # Create the worker before installing the handlers: signal_handler() looks
    # up the module-level name ``t``, so a signal delivered before this
    # assignment would fail inside the handler with NameError.
    t = ClientThread(update_seconds=100)

    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, signal_handler)

    t.start()

    # Keep the main thread alive; Python runs signal handlers in the main
    # thread, and signal_handler() ends the process via sys.exit().
    while True:
        sleep(1000)