#!/usr/bin/python3
"""Trisurf worker client.

Registers this host with a trisurf job server over HTTP, fetches runs
(a tape plus, when resuming, the last VTU snapshot and its status),
executes the ``trisurf`` binary for each run in a fresh directory under
/tmp, uploads every ``timestep_*.vtu`` file the simulation produces, and
pings the server periodically for both the run and the client.  The
ClientThread with subid 0 additionally starts extra ClientThread workers
according to the ``concurrent_runs`` value returned by the server.

NOTE(review): part of this file is corrupted -- see the NOTE(review)
inside ClientThread.subrunsStartStop.  The file cannot be parsed until the
lost text is restored, e.g. from version control.
"""
import requests
import json
from time import sleep
import uuid
import subprocess
import os
import shutil
import signal
import sys
import socket
from threading import Thread, Event
import re

# Version string of the local trisurf binary.  Overwritten with the result of
# getTrisurfVersion() in the __main__ section and sent to the server by
# get_client_id().
glob_ts_version='00000'


def getTrisurfVersion():
    """Return the version identifier printed by ``trisurf --version``.

    Searches the first line of the command's combined stdout/stderr for a
    7-hex-digit identifier (looks like a git short hash), optionally followed
    by ``-dirty``.

    Returns:
        The first match, or ``"unknown version"`` if there is none.
    """
    # NOTE(review): lines[0] raises IndexError if the command prints nothing,
    # and decode('ascii') raises UnicodeDecodeError on non-ASCII output.
    # shell=True is not needed for this fixed command string.
    p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    lines=p.stdout.readlines()
    version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii'))
    p.wait()
    if(len(version)):
        return version[0]
    else:
        return "unknown version"


def get_hostname():
    """Return this machine's host name (socket.gethostname())."""
    return socket.gethostname()


def get_ip():
    """Return an IPv4 address of this host to report to the server.

    Prefers the first address the host name resolves to that does not start
    with ``127.``.  If there is none, opens a UDP socket, calls connect() on
    it towards 8.8.8.8:53, and returns the local address the socket was
    bound to (the socket is closed afterwards).
    """
    # NOTE(review): the trailing ["no IP found"] fallback is unreachable: the
    # `or` alternative is always a one-element list, so index [0] never gets
    # to it.  If the UDP branch fails, an exception propagates instead.
    return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])


def get_client_id(addr, my_ip, my_hostname, subrun):
    """Register with the server and return the client id it assigns.

    POSTs ip, hostname, subrun index and ``glob_ts_version`` as form data to
    ``<addr>/api/register/``.

    Returns:
        The ``id`` field of the JSON response.
    Raises:
        ValueError: if the response status is not 200.
    """
    global glob_ts_version
    client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':glob_ts_version }
    # NOTE(review): no requests call in this file passes timeout=, so an
    # unresponsive server can block the calling thread indefinitely (and,
    # through ClientThread.join(), the signal handler as well).
    response=requests.post(addr+"/api/register/", data=client_auth)
    if(response.status_code==200):
        client_data=json.loads(response.text)
        client_id=client_data['id']
        return client_id
    else:
        raise ValueError


def get_run(addr,cid):
    """Ask the server (GET ``<addr>/api/getrun/<cid>/``) for a run to execute.

    Returns:
        Tuple ``(rid, tape, vtu, status)`` taken from the JSON fields ``id``,
        ``tape``, ``lastVTU`` and ``status``.
    Raises:
        ValueError: if the server answers HTTP 400.
        NameError: for any other non-200 status.
    """
    # NOTE(review): NameError is a builtin that Python itself raises for
    # undefined names; using it as a protocol signal means a genuine NameError
    # bug would be handled the same way.  A dedicated exception would be safer.
    response=requests.get(addr+"/api/getrun/"+str(cid)+"/")
    if(response.status_code==200):
        client_data=json.loads(response.text)
        rid=client_data['id']
        tape=client_data['tape']
        vtu=client_data['lastVTU']
        status=client_data['status']
        return (rid,tape,vtu,status)
    else:
        #print(response.text)
        if(response.status_code==400):
            raise ValueError
        else:
            raise NameError


def ping_run(addr,cid, rid):
    """POST a ping for run ``rid`` by client ``cid`` to ``<addr>/api/ping/``.

    The run loop calls this when no new VTU was found and abandons the run if
    it fails ("Could not prolong a lease on the run").

    Raises:
        ValueError: if the response status is not 200.
    """
    client_data={'client_id':cid, 'run_id':rid}
    response=requests.post(addr+"/api/ping/", data=client_data)
    if(response.status_code==200):
        return
    else:
        raise ValueError


def client_ping(addr,cid):
    """POST a client-level ping to ``<addr>/api/pingclient/``.

    Returns:
        The ``concurrent_runs`` field of the JSON response; callers pass it to
        ClientThread.subrunsStartStop() to size the worker pool.
    Raises:
        ValueError: if the response status is not 200.
    """
    client_data={'client_id':cid}
    response=requests.post(addr+"/api/pingclient/", data=client_data)
    if(response.status_code==200):
        client_data=json.loads(response.text)
        return client_data['concurrent_runs']
    else:
        raise ValueError


def send_error_report(addr,cid, rid,errcode):
    """Report trisurf exit code ``errcode`` for run ``rid`` to ``<addr>/api/reporterr/``.

    Raises:
        ValueError: if the response status is not 200.
    """
    client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
    response=requests.post(addr+"/api/reporterr/", data=client_data)
    if(response.status_code==200):
        return
    else:
        raise ValueError


def upload(addr,cid, rid, vtu, status):
    """Upload VTU file contents ``vtu`` and status counter ``status`` for run ``rid``.

    POSTs to ``<addr>/api/upload/``.

    Raises:
        ValueError: if the response status is not 200.
    """
    client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response=requests.post(addr+"/api/upload/", data=client_data)
    if(response.status_code==200):
        return
    else:
        raise ValueError


def getNewVTU(directory):
    """Return the set of file names (not paths) in ``directory`` matching ``timestep_*.vtu``."""
    fset=set()
    for file in os.listdir(directory):
        if file.endswith(".vtu") and file.startswith("timestep_"):
            fset.add(file)
    return fset


def removeDir(directory):
    """Recursively delete ``directory``; print a message instead of raising on failure.

    Changes the working directory to ``/`` first, presumably so the process is
    not inside the tree being removed.
    """
    # NOTE(review): os.chdir changes the working directory of the whole
    # process, i.e. of every ClientThread, not just the caller.  The bare
    # except below also catches KeyboardInterrupt/SystemExit.
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except:
        print("Cannot remove directory "+directory+ "\n")
    return


class ClientThread(Thread):
    """Worker thread that talks to the trisurf server and runs simulations.

    The instance created with ``subid`` 0 also starts extra ClientThread
    workers (kept in ``self.subruns``) via subrunsStartStop().
    """
    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        super(ClientThread,self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p=None  # subprocess.Popen of the running trisurf, or None
        self.workingdir=None  # Directory of the current run, or None
        self.conn_address=conn_address  # server base URL
        self.id=subid  # 0 for the managing thread, 1.. for spawned workers
        self.ip=get_ip()
        self.hostname=get_hostname()
        # Approximate duration in seconds of one iteration of the run
        # monitoring loop (sleep(1) plus self.sleep(update_seconds-1)).
        self.update_seconds=update_seconds
        # Threshold for the elapsed-seconds counter after which client_ping()
        # is called again.
        self.max_client_ping_time_elapsed=250
        self.subruns=[]  # workers started by subrunsStartStop() (subid 0 only)

    def stop(self):
        """Ask the thread to stop; loops polling isStopped() will exit."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def join(self):
        """Wait for this thread to finish, then shut down everything it owns.

        Blocks until the thread body returns (call stop() first, as
        signal_handler does).  Then stops and joins every spawned worker,
        terminates a still-referenced trisurf process and removes the current
        working directory.
        """
        # NOTE(review): this override drops Thread.join()'s optional
        # ``timeout`` parameter, so t.join(5) raises TypeError.
        print('joining threads')
        super(ClientThread, self).join()
        for sub in self.subruns:
            sub.stop()
            sub.join()
        if self.p is not None:
            self.p.terminate()
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())

    def sleep(self,s):
        """Sleep about ``s`` seconds in 1-second steps, aborting early after stop().

        ``s`` must be an int (it is passed to range()).

        Returns:
            False if the stop event was seen before the time elapsed, else True.
        """
        for i in range(0, s):
            if(self.isStopped()):
                return False
            sleep(1)
        return True

    def subrunsStartStop(self,nr):
        """Adjust the worker pool so that ``nr`` ClientThreads run in total.

        Only acts when ``self.id == 0``.  That thread counts itself, so it
        keeps up to ``nr - 1`` workers in ``self.subruns``; new workers get
        subids 1, 2, ...  The second loop presumably stops surplus workers,
        but it is corrupted (see the NOTE below).
        """
        while(self.id==0 and nr>len(self.subruns)+1):
            #spawning a new worker:
            print("[{}] Spawning a new worker".format(self.id))
            t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        # NOTE(review): SOURCE IS CORRUPTED HERE.  On the next line the text
        # between "nr" and "=self.max_client_ping_time_elapsed" is missing.
        # It looks like everything from a "<" up to the ">" of a ">=" was
        # stripped as if it were a markup tag.  The lost span evidently holds:
        # - the rest of this second while-loop;
        # - the header of the thread's main method (presumably run());
        # - the start of its registration / get_run() loop, which defines
        #   cid, rid, tape, vtu, status and client_ping_time_elapsed.
        # The tokens below are kept exactly as found.  Their indentation is
        # inferred from the try/except/else structure that follows and is not
        # authoritative.  Restore this region from version control.
                    while(self.id==0 and nr=self.max_client_ping_time_elapsed):
                        try:
                            concurrent_runs=client_ping(self.conn_address,cid)
                            self.subrunsStartStop(concurrent_runs)
                        except:
                            break
                        client_ping_time_elapsed=0
                    #if you put break instead of continue, there is no need to ping client. And it is more robust...
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    #self.sleep(10)
                    break
                except:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break
                else:
                    # start trisurf for this run as a separate process.
                    self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
                    self.workingdir.makeifnotexist()
                    self.workingdir.goto()
                    #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath()))
                    with open(self.workingdir.fullpath()+"/tape", 'w') as f:
                        f.write(tape)
                    # status == -1 means "start fresh from the tape"; anything
                    # else resumes from the uploaded VTU snapshot.
                    if(int(status)==-1):
                        cmd=['trisurf', '--force-from-tape']
                        print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
                    else:
                        # NOTE(review): f.write() requires str; confirm the
                        # server sends status as a string (int(status) is used
                        # elsewhere).
                        with open(self.workingdir.fullpath()+"/.status",'w') as f:
                            f.write(status)
                        with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f:
                            f.write(vtu)
                        cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
                        print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
                    # NOTE(review): Popen inherits the process-wide working
                    # directory set by goto() above, with no cwd= argument.
                    # Another ClientThread calling goto()/removeDir() in
                    # between can make trisurf start in the wrong directory;
                    # the relative 'initial.vtu' depends on it.  Passing
                    # cwd=self.workingdir.fullpath() would avoid the race.
                    self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL)
                    s=int(status)  # status counter sent with each upload
                    while(not self.isStopped()):
                        #monitor for new file. If file is present, upload it!
                        newVTU=getNewVTU(self.workingdir.fullpath())
                        if newVTU: #upload
                            # NOTE(review): sorted() is lexicographic, so this
                            # is timestep order only if trisurf zero-pads the
                            # numbers; a file still being written may also be
                            # picked up -- confirm.
                            try:
                                for nvfile in sorted(newVTU):
                                    nv=os.path.join(self.workingdir.fullpath(),nvfile)
                                    with open(nv,'r') as f:
                                        fc=f.read()
                                    s=s+1
                                    print('[{}] Uploading {}.'.format(self.id,nvfile))
                                    upload(self.conn_address, cid, rid, fc, s)
                                    os.unlink(nv)
                            except Exception as e:
                                print("[{}] Could not upload".format(self.id))
                                print(e)
                                self.p.terminate()
                                removeDir(self.workingdir.fullpath())
                                self.p=None
                                self.workingdir=None
                                break
                            else:
                                print("[{}] VTU uploaded.".format(self.id))
                        else: #ping
                            try:
                                ping_run(self.conn_address, cid, rid)
                            except:
                                print("[{}] Could not prolong a lease on the run.".format(self.id))
                                self.p.terminate()
                                self.p=None
                                removeDir(self.workingdir.fullpath())
                                self.workingdir=None
                                #stop simulations
                                break
                        #check if trisurf is still running. If not break the innermost loop.
                        sleep(1)
                        if(self.p.poll() is not None): # trisurf exited!
                            print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                            # NOTE(review): negative return codes (process
                            # killed by a signal) are not reported -- confirm
                            # that this is intended.
                            if(self.p.returncode>0):
                                try:
                                    send_error_report(self.conn_address, cid, rid, self.p.returncode)
                                except:
                                    print("[{}] Server didn't accept error report".format(self.id))
                            removeDir(self.workingdir.fullpath())
                            self.workingdir=None
                            self.p=None
                            break
                        self.sleep(self.update_seconds-1)
                        client_ping_time_elapsed+=self.update_seconds
                        # Refresh the client-level ping roughly every
                        # max_client_ping_time_elapsed seconds.
                        if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                            try:
                                concurrent_runs=client_ping(self.conn_address,cid)
                            except:
                                print("[{}] Could not client ping.".format(self.id))
                                self.p.terminate()
                                self.p=None
                                removeDir(self.workingdir.fullpath())
                                self.workingdir=None
                                break
                            self.subrunsStartStop(concurrent_runs)
                            client_ping_time_elapsed=0


#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''Path helper for the directory where a simulation runs and stores data.

    The path is ``os.path.join(maindir, simdir)``.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Store ``maindir`` and ``simdir``.

        Both are optional.  With the defaults, the path is the current
        directory (".").
        '''
        self.maindir=maindir
        self.simdir=simdir
        return

    def fullpath(self):
        '''Return ``os.path.join(maindir, simdir)`` (maindir/simdir on Unix).'''
        return os.path.join(self.maindir,self.simdir)

    def exists(self):
        '''Return True if fullpath() exists, otherwise False.

        Uses os.path.exists, so a regular file at that path also counts.
        '''
        path=self.fullpath()
        if(os.path.exists(path)):
            return True
        else:
            return False

    def make(self):
        '''Create fullpath(), including missing parents (os.makedirs).

        On failure, prints an error and calls exit(1).
        '''
        # NOTE(review): exit(1) raises SystemExit.  When this runs inside a
        # ClientThread (via makeifnotexist() in the run loop), it ends only that
        # thread, not the program -- threading silently ignores SystemExit.
        # The bare except also hides the actual OSError.
        try:
            os.makedirs(self.fullpath())
        except:
            print("Cannot make directory "+self.fullpath()+"\n")
            exit(1)
        return

    def makeifnotexist(self):
        '''Create the directory if it does not exist.

        Returns:
            True if make() was called, False if the path already existed.
        '''
        if(self.exists()==0):
            self.make()
            return True
        else:
            return False

    def remove(self):
        '''Remove the directory if it exists.  WARNING: no questions asked.

        Uses os.rmdir, which removes only an EMPTY directory (it is not
        recursive; removeDir() is the recursive helper).  On failure, prints an
        error and calls exit(1).
        '''
        if(self.exists()):
            try:
                os.rmdir(self.fullpath())
            except:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                exit(1)
        return

    def goto(self):
        '''Change the current working directory to fullpath().

        On failure, prints a message and returns without raising.  WARNING:
        with a relative path, do not call this more than once, because each
        call resolves the path against the directory the previous call moved
        to.  The working directory is shared by all threads of the process.
        '''
        try:
            os.chdir(self.fullpath())
        except:
            print("Cannot go to directory "+self.fullpath()+"\n")
        return


#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    """Stop the global client thread ``t``, wait for its cleanup, then exit.

    Installed for SIGINT and SIGTERM in the __main__ section.  The process
    exit status is the received signal number.
    """
    # NOTE(review): the parameter name ``signal`` shadows the signal module
    # inside this function.  ``t`` is the module-level global set in __main__.
    # t.join() can block for as long as an HTTP request without timeout hangs.
    t.stop()
    t.join()
    print("Process ended with signal " +str(signal))
    sys.exit(signal)
#--- END SIGINT and SIGTERM----


if __name__ == '__main__':
    #global glob_ts_version
    glob_ts_version=getTrisurfVersion()
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    t=ClientThread(update_seconds=100)
    t.start()
    #t.join()
    #print("main")
    # All work happens in ``t``.  The main thread just idles here; Python runs
    # signal handlers only in the main thread, so signal_handler runs here.
    while(True):
        sleep(1000)