#!/usr/bin/python3
|
import requests
|
import json
|
from time import sleep
|
import uuid
|
import subprocess
|
import os
|
import shutil
|
import signal
|
import sys
|
import socket
|
from threading import Thread, Event
|
|
def get_hostname():
    '''Return this machine's host name, exactly as socket.gethostname() reports it.'''
    hostname = socket.gethostname()
    return hostname
|
|
def get_ip():
    '''
    Return a non-loopback IPv4 address of this machine as a string.

    1. The addresses the local host name resolves to are searched for the
       first one that does not start with "127.".
    2. Otherwise, the local address of a UDP socket "connected" to 8.8.8.8:53
       is used. Connecting a UDP socket only selects a route; no packet is sent.
    3. If both lookups fail, the string "no IP found" is returned instead of
       raising, so constructing a client never crashes on network setup.
    '''
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:  # socket.gaierror / socket.herror are OSError subclasses
        addresses = []
    for ip in addresses:
        if not ip.startswith("127."):
            return ip
    try:
        # The context manager closes the socket even if connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
|
|
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    '''
    Register this client with the server and return the client id it assigns.

    addr        -- base URL of the server, e.g. 'http://beti.trisurf.eu'
    my_ip       -- IP address reported to the server
    my_hostname -- host name reported to the server
    subrun      -- sub-run index of the calling client thread
    timeout     -- seconds to wait for the server. Without a timeout a hung
                   server would block the caller forever.

    Returns the 'id' field of the JSON reply.
    Raises ValueError if the server does not answer with HTTP 200. Network
    errors and timeouts propagate as requests exceptions.
    '''
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("registration failed with HTTP status {}".format(response.status_code))
    client_data = json.loads(response.text)
    return client_data['id']
|
|
|
def get_run(addr, cid, timeout=60):
    '''
    Ask the server for a run to simulate on behalf of client cid.

    timeout -- seconds to wait for the server (requests raises on expiry).

    Returns the tuple (rid, tape, vtu, status), taken from the 'id', 'tape',
    'lastVTU' and 'status' fields of the JSON reply.

    On any non-200 reply the response body is printed, and then:
      - HTTP 400 raises ValueError. ClientThread.run treats this as an
        invalid client id and re-registers.
      - Any other status raises NameError. ClientThread.run treats this as
        "no run available yet" and retries later.
    NOTE(review): NameError is an unusual exception for an HTTP condition,
    but the caller catches it explicitly, so it is kept for compatibility.
    '''
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (client_data['id'], client_data['tape'],
                client_data['lastVTU'], client_data['status'])
    print(response.text)
    if response.status_code == 400:
        raise ValueError("server rejected client id {}".format(cid))
    raise NameError("no run received (HTTP status {})".format(response.status_code))
|
|
|
def ping_run(addr, cid, rid, timeout=60):
    '''
    Tell the server that client cid is still working on run rid.

    timeout -- seconds to wait for the server (requests raises on expiry).

    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("run ping failed with HTTP status {}".format(response.status_code))
|
|
def client_ping(addr, cid, timeout=60):
    '''
    Send a keep-alive ping for client cid, independent of any run.

    timeout -- seconds to wait for the server (requests raises on expiry).

    Raises ValueError if the server does not answer with HTTP 200, or if the
    reply body is not valid JSON.
    '''
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("client ping failed with HTTP status {}".format(response.status_code))
    # The reply content is not used, but it is still validated: a malformed
    # body raises json.JSONDecodeError, which is a ValueError subclass.
    json.loads(response.text)
|
|
|
|
def send_error_report(addr, cid, rid, errcode, timeout=60):
    '''
    Report to the server that run rid failed on client cid with errcode.

    errcode -- the trisurf process return code, as sent by ClientThread
    timeout -- seconds to wait for the server (requests raises on expiry)

    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("error report rejected with HTTP status {}".format(response.status_code))
|
|
def upload(addr, cid, rid, vtu, status, timeout=60):
    '''
    Upload one VTU snapshot of run rid to the server.

    vtu     -- contents of the .vtu file, as text
    status  -- timestep number stored alongside the snapshot ('status' field)
    timeout -- seconds the connection may stall before requests raises

    Raises ValueError if the server does not answer with HTTP 200.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload rejected with HTTP status {}".format(response.status_code))
|
|
def getNewVTU(directory):
    '''
    Collect the trisurf snapshot files present in directory.

    A name matches when it begins with "timestep_" and ends with ".vtu".
    The result is a set of bare entry names, not full paths.
    '''
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
|
|
|
def removeDir(directory):
    '''
    Recursively delete directory, printing a message instead of raising if
    it cannot be removed (e.g. it does not exist).

    The process working directory is first changed to '/', so the directory
    being deleted is never the current one. This chdir affects every thread
    of the process.
    '''
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Only filesystem errors are best-effort; KeyboardInterrupt/SystemExit
        # must still propagate (a bare except used to swallow them).
        print("Cannot remove directory " + directory + "\n")
|
|
|
|
|
class ClientThread(Thread):
    '''
    Worker thread that talks to the trisurf server and runs simulations.

    The thread repeatedly:
      1. registers with the server (get_client_id) to obtain a client id,
      2. asks for a run (get_run),
      3. writes the run's tape into a fresh /tmp/ts_<uuid> directory, plus
         its last VTU snapshot and status when resuming, and starts the
         `trisurf` executable there,
      4. uploads every new timestep_*.vtu the simulation produces, or pings
         the run when there is nothing to upload, until trisurf exits or the
         server stops answering.
    Call stop() and then join() to shut it down. join() also terminates a
    still-running trisurf process and deletes its working directory.
    '''

    def __init__(self, conn_address='http://beti.trisurf.eu', subid=0, update_seconds=100):
        '''
        conn_address   -- base URL of the trisurf server
        subid          -- sub-run index, sent to the server on registration
                          and used as the prefix of log messages
        update_seconds -- period, in seconds, of one monitoring iteration
        '''
        super(ClientThread, self).__init__()
        self._stop_event = Event()  # a new Event starts cleared
        self.p = None  # subprocess.Popen of the running trisurf, or None
        self.workingdir = None  # Directory of the current run, or None
        self.conn_address = conn_address
        self.id = subid
        self.ip = get_ip()
        self.hostname = get_hostname()
        self.update_seconds = update_seconds
        # Seconds after which client_ping() is sent again (presumably keeps
        # the client registration alive on the server -- confirm server-side).
        self.max_client_ping_time_elapsed = 250

    def stop(self):
        '''Ask the thread to finish; run() checks this flag between steps.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self, timeout=None):
        '''
        Wait for the thread to finish, then clean up the simulation.

        A trisurf process left running by run() is terminated and its working
        directory is removed. timeout is passed to Thread.join(). If the
        thread is still alive afterwards, no cleanup is done, because run()
        may still be using the process and directory.
        '''
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if not self.is_alive():
            self._cleanup_simulation(terminate=True)

    def sleep(self, s):
        '''
        Sleep for s seconds, waking up immediately if stop() is called.

        Returns False if the thread has been stopped, and True if the whole
        period elapsed.
        '''
        # Event.wait() returns True only when the event (stop flag) is set.
        return not self._stop_event.wait(s)

    def _cleanup_simulation(self, terminate=True):
        '''
        Forget the current simulation: optionally stop the trisurf process,
        then delete its working directory.

        terminate -- when True, a still-attached process is sent SIGTERM and
                     given 10 s to exit before it is killed. Pass False when
                     the process is already known to have exited.
        '''
        if terminate and self.p is not None:
            self.p.terminate()
            try:
                # Reap the child so it can no longer write into the directory
                # that is deleted below, and leaves no zombie behind.
                self.p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.p.kill()
                self.p.wait()
        self.p = None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
        self.workingdir = None

    def _start_simulation(self, rid, tape, vtu, status):
        '''
        Create a fresh working directory for run rid and launch trisurf in it.

        If int(status) == -1, the run starts from the tape alone. Otherwise
        it resumes from vtu (written as initial.vtu), and status is saved to
        the .status file.
        '''
        self.workingdir = Directory('/tmp/ts_' + str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        workdir = self.workingdir.fullpath()
        with open(os.path.join(workdir, "tape"), 'w') as f:
            f.write(tape)
        if int(status) == -1:
            cmd = ['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(workdir, ".status"), 'w') as f:
                # str(): the JSON reply may carry the status as a number.
                f.write(str(status))
            with open(os.path.join(workdir, "initial.vtu"), 'w') as f:
                f.write(vtu)
            cmd = ['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id, rid, status))
        # Use cwd= instead of os.chdir(): the process-wide working directory is
        # shared by all threads and is reset to '/' by removeDir().
        self.p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=workdir)

    def _monitor_simulation(self, cid, rid, status, client_ping_time_elapsed):
        '''
        Supervise the trisurf process started by _start_simulation().

        Each iteration:
          - uploads, then deletes, any new timestep_*.vtu files in sorted
            order, numbering them int(status)+1, int(status)+2, ...;
          - if there are none, pings the run instead;
          - sends a client ping when the ping counter is due.

        The loop ends when:
          - trisurf exits (a positive return code is reported to the server);
          - an upload or ping fails (trisurf is then terminated);
          - stop() is called (cleanup is then left to join()).

        Returns the updated seconds-since-last-client-ping counter.
        '''
        s = int(status)
        workdir = self.workingdir.fullpath()
        while not self.isStopped():
            new_vtus = getNewVTU(workdir)
            if new_vtus:  # upload
                try:
                    for nv in sorted(new_vtus):
                        path = os.path.join(workdir, nv)
                        with open(path, 'r') as f:
                            fc = f.read()
                        s = s + 1
                        print('[{}] Uploading {}.'.format(self.id, nv))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(path)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup_simulation(terminate=True)
                    break
                print("[{}] VTU uploaded.".format(self.id))
            else:  # nothing new: ping the run
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup_simulation(terminate=True)
                    break
            # If trisurf is no longer running, this run is over.
            self.sleep(1)
            if self.p.poll() is not None:
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if self.p.returncode > 0:
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_simulation(terminate=False)
                break
            if not self.sleep(self.update_seconds - 1):
                break  # stop() was called; join() cleans up the simulation
            client_ping_time_elapsed += self.update_seconds
            if client_ping_time_elapsed > self.max_client_ping_time_elapsed - self.update_seconds / 2:
                try:
                    client_ping(self.conn_address, cid)
                except Exception:
                    # A transient network error must not escape run(): that
                    # would end this thread while trisurf keeps running.
                    print("[{}] Could not ping client, will retry.".format(self.id))
                else:
                    client_ping_time_elapsed = 0
        return client_ping_time_elapsed

    def run(self):
        '''
        Main loop: (re)register with the server, then fetch and execute runs
        until stop() is called.
        '''
        while not self.isStopped():  # try to register
            try:
                cid = get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            client_ping_time_elapsed = 0
            # Registered: look for a run. Every `break` below re-registers.
            while not self.isStopped():
                try:
                    (rid, tape, vtu, status) = get_run(self.conn_address, cid)
                except NameError:
                    # get_run() raises NameError for non-400 error replies.
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    client_ping_time_elapsed += 10
                    if client_ping_time_elapsed >= self.max_client_ping_time_elapsed:
                        try:
                            client_ping(self.conn_address, cid)
                        except Exception:
                            break
                        client_ping_time_elapsed = 0
                    # NOTE: `break` here would re-register instead of pinging.
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break
                self._start_simulation(rid, tape, vtu, status)
                client_ping_time_elapsed = self._monitor_simulation(
                    cid, rid, status, client_ping_time_elapsed)
|
|
|
|
# Copied from the trisurf library, so this client does not depend on that
# library being installed.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''

    def __init__(self, maindir=".", simdir=""):
        '''
        Directory() takes two optional parameters, maindir and simdir, and
        stores them as attributes. Both default to the current directory.
        '''
        self.maindir = maindir
        self.simdir = simdir

    def fullpath(self):
        '''
        Return the path where the data is stored: maindir and simdir joined
        with os.path.join (maindir/simdir on Unix).
        '''
        return os.path.join(self.maindir, self.simdir)

    def exists(self):
        '''Return True if the path given by fullpath() exists, else False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''
        Create the directory, including missing parents.

        On failure (including when it already exists), print an error
        message and raise SystemExit(1). This ends the program when called
        from the main thread, but only the current thread otherwise.
        '''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory " + self.fullpath() + "\n")
            raise SystemExit(1)

    def makeifnotexist(self):
        '''
        Create the directory if it does not exist yet.

        Returns True if it was created, False if it already existed.
        '''
        if not self.exists():
            self.make()
            return True
        return False

    def remove(self):
        '''
        Remove the directory if it exists. No questions asked.

        Uses os.rmdir, so only an EMPTY directory can be removed; this is not
        recursive. On failure, print an error message and raise SystemExit(1).
        '''
        if self.exists():
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory " + self.fullpath() + "\n")
                raise SystemExit(1)

    def goto(self):
        '''
        Change the process's current directory to fullpath(). On failure,
        print an error message and continue.

        WARNING: with relative paths, do not call this repeatedly; each call
        resolves the path against the new current directory.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory " + self.fullpath() + "\n")
|
|
|
|
#--- SIGINT and SIGTERM HANDLING ---

def signal_handler(signal, frame):
    '''
    Handle SIGINT/SIGTERM.

    Stops the worker thread `t` if it exists, waits for it (its join() also
    terminates trisurf and removes the working directory), then exits with
    the signal number as exit status.

    signal -- number of the received signal (the name shadows the signal
              module inside this function)
    frame  -- current stack frame, unused
    '''
    # The __main__ block installs the handlers before `t` is created and
    # started, so a signal may arrive before `t` exists or is running.
    worker = globals().get('t')
    if worker is not None:
        worker.stop()
        if worker.ident is not None:  # Thread.join() raises if never started
            worker.join()
    print("Process ended with signal " + str(signal))
    sys.exit(signal)

#--- END SIGINT and SIGTERM----
|
|
if __name__ == '__main__':
    # Route termination signals to signal_handler, which stops and joins the
    # worker thread before exiting.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    t = ClientThread(update_seconds=100)
    t.start()

    # All work happens in the worker thread; the main thread only idles so
    # that it can receive signals.
    while True:
        sleep(1000)
|