#!/usr/bin/python3 import requests import json from time import sleep import uuid import subprocess import os import shutil import signal import sys import socket from threading import Thread, Event def get_hostname(): return socket.gethostname() def get_ip(): return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) def get_client_id(addr, my_ip, my_hostname, subrun): client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) client_id=client_data['id'] return client_id else: raise ValueError def get_run(addr,cid): response=requests.get(addr+"/api/getrun/"+str(cid)+"/") if(response.status_code==200): client_data=json.loads(response.text) rid=client_data['id'] tape=client_data['tape'] vtu=client_data['lastVTU'] status=client_data['status'] return (rid,tape,vtu,status) else: print(response.text) if(response.status_code==400): raise ValueError else: raise NameError def ping_run(addr,cid, rid): client_data={'client_id':cid, 'run_id':rid} response=requests.post(addr+"/api/ping/", data=client_data) if(response.status_code==200): return else: raise ValueError def client_ping(addr,cid): client_data={'client_id':cid} response=requests.post(addr+"/api/pingclient/", data=client_data) if(response.status_code==200): client_data=json.loads(response.text) return client_data['concurrent_runs'] else: raise ValueError def send_error_report(addr,cid, rid,errcode): client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode} response=requests.post(addr+"/api/reporterr/", data=client_data) if(response.status_code==200): return else: raise ValueError def upload(addr,cid, rid, vtu, status): client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status} response=requests.post(addr+"/api/upload/", data=client_data) if(response.status_code==200): return else: raise ValueError def getNewVTU(directory): fset=set() for file in os.listdir(directory): if file.endswith(".vtu") and file.startswith("timestep_"): fset.add(file) return fset def removeDir(directory): os.chdir('/') try: shutil.rmtree(directory) except: print("Cannot remove directory "+directory+ "\n") return class ClientThread(Thread): def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): super(ClientThread,self).__init__() self._stop_event = Event() self._stop_event.clear() self.p=None self.workingdir=None self.conn_address=conn_address self.id=subid self.ip=get_ip() self.hostname=get_hostname() self.update_seconds=update_seconds self.max_client_ping_time_elapsed=250 self.subruns=[] def stop(self): self._stop_event.set() def isStopped(self): return self._stop_event.is_set() def join(self): print('joining threads') super(ClientThread, self).join() for sub in self.subruns: sub.stop() sub.join() if self.p is not None: self.p.terminate() if self.workingdir is not None: removeDir(self.workingdir.fullpath()) def sleep(self,s): for i in range(0, s): if(self.isStopped()): return False sleep(1) return True def subrunsStartStop(self,nr): while(self.id==0 and nr>len(self.subruns)+1): #spawning a new worker: print("[{}] Spawning a new worker".format(self.id)) t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds) t.start() self.subruns.append(t) while(self.id==0 and nr=self.max_client_ping_time_elapsed): try: concurrent_runs=client_ping(self.conn_address,cid) self.subrunsStartStop(concurrent_runs) except: break client_ping_time_elapsed=0 #if you put break instead of continue, there is no need to ping client. And it is more robust... continue except ValueError: print("[{}] Wrong CID? Getting new CID.".format(self.id)) #self.sleep(10) break except: print("[{}] Cannot connect. Server down? Retrying....".format(self.id)) break else: #start separate thread with simulations. self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) self.workingdir.makeifnotexist() self.workingdir.goto() #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath())) with open(self.workingdir.fullpath()+"/tape", 'w') as f: f.write(tape) if(int(status)==-1): cmd=['trisurf', '--force-from-tape'] print("[{}] Run id={} :: Starting from tape.".format(self.id, rid)) else: with open(self.workingdir.fullpath()+"/.status",'w') as f: f.write(status) with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f: f.write(vtu) cmd=['trisurf', '--restore-from-vtk', 'initial.vtu'] print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status)) self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL) s=int(status) while(not self.isStopped()): #monitor for new file. If file is present, upload it! newVTU=getNewVTU(self.workingdir.fullpath()) if newVTU: #upload try: for nvfile in sorted(newVTU): nv=os.path.join(self.workingdir.fullpath(),nvfile) with open(nv,'r') as f: fc=f.read() s=s+1 print('[{}] Uploading {}.'.format(self.id,nvfile)) upload(self.conn_address, cid, rid, fc, s) os.unlink(nv) except Exception as e: print("[{}] Could not upload".format(self.id)) print(e) self.p.terminate() removeDir(self.workingdir.fullpath()) self.p=None self.workingdir=None break else: print("[{}] VTU uploaded.".format(self.id)) else: #ping try: ping_run(self.conn_address, cid, rid) except: print("[{}] Could not ping.".format(self.id)) self.p.terminate() self.p=None removeDir(self.workingdir.fullpath()) self.workingdir=None #stop simulations break #check if trisurf is still running. If not break the innermost loop. sleep(1) if(self.p.poll() is not None): # trisurf exited! print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode)) if(self.p.returncode>0): try: send_error_report(self.conn_address, cid, rid, self.p.returncode) except: print("[{}] Server didn't accept error report".format(self.id)) removeDir(self.workingdir.fullpath()) self.workingdir=None self.p=None break self.sleep(self.update_seconds-1) client_ping_time_elapsed+=self.update_seconds if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2): concurrent_runs=client_ping(self.conn_address,cid) self.subrunsStartStop(concurrent_runs) client_ping_time_elapsed=0 #Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library. class Directory: ''' Class deals with the paths where the simulation is run and data is stored. ''' def __init__(self, maindir=".", simdir=""): '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.''' self.maindir=maindir self.simdir=simdir return def fullpath(self): ''' Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix. ''' return os.path.join(self.maindir,self.simdir) def exists(self): ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.''' path=self.fullpath() if(os.path.exists(path)): return True else: return False def make(self): ''' Method make() creates directory. If it fails it exits the program with error message.''' try: os.makedirs(self.fullpath()) except: print("Cannot make directory "+self.fullpath()+"\n") exit(1) return def makeifnotexist(self): '''Method makeifnotexist() creates directory if it does not exist.''' if(self.exists()==0): self.make() return True else: return False def remove(self): '''Method remove() removes directory recursively. WARNING! No questions asked.''' if(self.exists()): try: os.rmdir(self.fullpath()) except: print("Cannot remove directory "+self.fullpath()+ "\n") exit(1) return def goto(self): ''' Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times. ''' try: os.chdir(self.fullpath()) except: print("Cannot go to directory "+self.fullpath()+"\n") return #--- SIGINT and SIGTERM HANDLING --- def signal_handler(signal,frame): t.stop() t.join() print("Process ended with signal " +str(signal)) sys.exit(signal) #--- END SIGINT and SIGTERM---- if __name__ == '__main__': signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) t=ClientThread(update_seconds=100) t.start() #t.join() #print("main") while(True): sleep(1000)