Samo Penic
2018-06-25 c0d5bcca06dcb6b7e3b570b739a5685d59f4ab04
Merge branch 'master' of ssh://git.penic.eu:29418/trisurf-client
1 file modified
1 file deleted
1 file added
648 lines changed in 3 files
devel.py: 23 lines changed
playground/tsclient.py: 320 lines changed
tsclient.py: 305 lines changed
devel.py
New file
@@ -0,0 +1,23 @@
#!/usr/bin/python3
'''Development entry point: run a ClientThread against a trisurf server on localhost:8000.'''
from tsclient import *
# Import explicitly what this script uses instead of relying on the star import.
import signal
import sys
from time import sleep
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signum,frame):
    '''Stop the worker thread ``t``, wait for it to shut down and exit with the signal number as status.'''
    t.stop()
    t.join()
    print("Process ended with signal " +str(signum))
    sys.exit(signum)
#--- END SIGINT and SIGTERM----
# Create the worker before installing the handlers: signal_handler() uses the
# global ``t``, so a signal delivered earlier would raise NameError.
t=ClientThread(conn_address='http://localhost:8000',update_seconds=100)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
t.start()
# Keep the main thread alive; Python runs signal handlers in the main thread.
while True:
    sleep(1000)
playground/tsclient.py
File was deleted
tsclient.py
@@ -4,34 +4,23 @@
from time import sleep
import uuid
import subprocess
from trisurf import trisurf
import os
import shutil
import signal
import sys
import socket
CONNECT_ADDR='http://localhost:8000'
from threading import Thread, Event
import re
p=None             # trisurf process (Popen) of the current run, or None
workingdir=None    # working directory object of the current run, or None
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    '''
    On SIGINT/SIGTERM terminate the running trisurf process (if any), remove
    the run's working directory (if any) and exit with status 0.
    '''
    # The module globals p and workingdir are only read here, so no
    # ``global`` declaration is needed.
    if p is not None:
        p.terminate()
    if workingdir is not None:
        removeDir(workingdir.fullpath())
    print("Process ended with signal " +str(signal))
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
#--- END SIGINT and SIGTERM----
def getTrisurfVersion(command=('trisurf', '--version')):
    '''
    Return the git revision reported by ``trisurf --version``.

    The first line of the command's output (stdout and stderr combined) is
    searched for a 7-character lowercase hex commit id, optionally followed by
    ``-dirty``. Returns "unknown version" when no such id is found, when the
    command prints nothing, or when it cannot be executed at all.

    ``command`` is the argument list to execute. It defaults to
    ``trisurf --version`` and can be overridden, e.g. for a trisurf binary
    that is not on PATH.
    '''
    try:
        # Argument list without a shell: no shell parsing of the command.
        result=subprocess.run(list(command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        # trisurf is not installed or not executable
        return "unknown version"
    # Only the first output line is inspected. An empty output gives an empty
    # first line instead of an IndexError.
    first_line=result.stdout.split(b'\n', 1)[0].decode('ascii', errors='replace')
    version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', first_line)
    if version:
        return version[0]
    return "unknown version"
def get_hostname():
    '''Return this machine's host name as reported by ``socket.gethostname()``.'''
    hostname=socket.gethostname()
    return hostname
@@ -39,8 +28,8 @@
def get_ip():
    '''
    Return a non-loopback IPv4 address of this machine as a string.

    The addresses that the local host name resolves to are tried first,
    skipping 127.x.x.x. If none qualifies, the local address of a UDP socket
    connected to 8.8.8.8:53 is used; for a UDP socket, connect() only selects
    the destination, and no data is sent. Returns "no IP found" if neither
    method yields an address.
    '''
    try:
        addresses=socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # the host name does not resolve; fall through to the UDP method
        addresses=[]
    for ip in addresses:
        if not ip.startswith("127."):
            return ip
    try:
        # ``with`` closes the socket even when connect() fails (no route, no network).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
def get_client_id(addr, my_ip, my_hostname):
    client_auth={'ip':my_ip,'hostname':my_hostname}
def get_client_id(addr, my_ip, my_hostname, subrun):
    client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':getTrisurfVersion() }
    response=requests.post(addr+"/api/register/", data=client_auth)
    if(response.status_code==200):
        client_data=json.loads(response.text)
@@ -61,7 +50,10 @@
        return (rid,tape,vtu,status)
    else:
        print(response.text)
        if(response.status_code==400):
        raise ValueError
        else:
            raise NameError
def ping_run(addr,cid, rid):
@@ -71,6 +63,18 @@
        return
    else:
        raise ValueError
def client_ping(addr,cid):
    '''
    Tell the server at ``addr`` that client ``cid`` is alive.

    Returns the ``concurrent_runs`` field of the server's JSON reply.
    Raises ValueError when the server does not answer with HTTP 200.
    '''
    response=requests.post(addr+"/api/pingclient/", data={'client_id':cid})
    if response.status_code!=200:
        raise ValueError
    reply=json.loads(response.text)
    return reply['concurrent_runs']
def send_error_report(addr,cid, rid,errcode):
    client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
@@ -105,80 +109,253 @@
    return
class ClientThread(Thread):
    '''
    Worker thread that registers this machine with the trisurf server, fetches
    runs, executes ``trisurf`` for them and uploads the produced VTU files.

    The thread created with ``subid=0`` is the master worker. Whenever it pings
    the server (``client_ping``), it starts or stops additional ClientThread
    sub-workers (``subid`` 1, 2, ...). Together with the master, the number of
    workers then equals the ``concurrent_runs`` value reported by the server.
    '''
    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        super(ClientThread,self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p=None                 # Popen of the trisurf process of the current run, or None
        self.workingdir=None        # Directory of the current run, or None
        self.conn_address=conn_address
        self.id=subid               # 0 for the master worker, >0 for sub-workers
        self.ip=get_ip()
        self.hostname=get_hostname()
        self.update_seconds=update_seconds
        self.max_client_ping_time_elapsed=250   # approximate seconds between client pings
        self.subruns=[]             # sub-workers started by the master worker

    def stop(self):
        '''Ask the thread to stop; run() notices this at its next check.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self, timeout=None):
        '''
        Wait for the thread to finish, then stop and join all sub-workers and
        clean up the trisurf process and working directory left by this worker.

        ``timeout`` is passed on to ``Thread.join``. If the thread is still
        alive when it expires, nothing is cleaned up, because the worker still
        owns its resources.
        '''
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if self.is_alive():
            return
        for sub in self.subruns:
            sub.stop()
            sub.join()
        self._cleanup_run()

    def sleep(self,s):
        '''
        Sleep ``s`` seconds in one-second steps and return early if a stop is requested.

        Returns True if the whole interval elapsed, False if the thread was stopped.
        '''
        for _ in range(int(s)):
            if(self.isStopped()):
                return False
            sleep(1)
        return True

    def subrunsStartStop(self,nr):
        '''
        Start or stop sub-workers so that, including this thread, ``nr`` workers run.

        Only the master worker (id 0) manages sub-workers; for other threads this
        is a no-op. The master never stops itself, so ``nr`` <= 1 stops all
        sub-workers.
        '''
        if self.id!=0:
            return
        while nr>len(self.subruns)+1:
            #spawning a new worker:
            print("[{}] Spawning a new worker".format(self.id))
            t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        # The emptiness check prevents an IndexError when the server allows fewer than one run.
        while self.subruns and nr<len(self.subruns)+1:
            print("[{}] Stopping a worker".format(self.id))
            worker=self.subruns[-1]
            worker.stop()
            worker.join()
            self.subruns.pop()

    def _update_subruns(self,cid):
        '''
        Ping the server as client ``cid`` and adjust the number of sub-workers.

        Returns True on success. Returns False if this fails; the error is logged
        rather than raised, so a network glitch cannot kill the worker thread.
        '''
        try:
            concurrent_runs=client_ping(self.conn_address,cid)
            self.subrunsStartStop(concurrent_runs)
        except Exception as e:
            print("[{}] Could not ping client: {}".format(self.id, e))
            return False
        return True

    def _cleanup_run(self):
        '''Terminate trisurf if it is still running and remove the working directory of the current run.'''
        if self.p is not None:
            # no-op if the process has already been reaped by poll()
            self.p.terminate()
            self.p=None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir=None

    def _start_run(self,rid,tape,vtu,status):
        '''
        Prepare a fresh working directory for run ``rid`` and launch trisurf in it.

        ``status`` is a string: "-1" starts from the tape. Any other value is
        written to .status and the run is restored from ``vtu`` (saved as
        initial.vtu), with ``status`` reported as the last timestep.
        '''
        self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        self.workingdir.goto()
        path=self.workingdir.fullpath()
        with open(os.path.join(path,"tape"), 'w') as f:
            f.write(tape)
        if(int(status)==-1):
            cmd=['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(path,".status"),'w') as f:
                f.write(status)
            with open(os.path.join(path,"initial.vtu"),'w') as f:
                f.write(vtu)
            cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
        # Pass cwd explicitly. os.chdir() in goto() is process-wide, so another
        # worker thread may change the current directory before Popen runs.
        self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=path)

    def _monitor_run(self,cid,rid,s,client_ping_time_elapsed):
        '''
        Supervise the running trisurf process of run ``rid``.

        On each iteration the loop does the following:
        - Uploads any new VTU files, numbering them consecutively after
          timestep ``s``; if there are none, it pings the run.
        - Checks whether trisurf has exited.
        - Periodically refreshes the number of sub-workers.

        The run is cleaned up when an upload or ping fails or when trisurf
        exits. If the thread is stopped instead, cleanup is left to join().
        Returns the updated ``client_ping_time_elapsed`` counter.
        '''
        while(not self.isStopped()):
            #monitor for new file. If file is present, upload it!
            newVTU=getNewVTU(self.workingdir.fullpath())
            if newVTU: #upload
                try:
                    for nvfile in sorted(newVTU):
                        nv=os.path.join(self.workingdir.fullpath(),nvfile)
                        with open(nv,'r') as f:
                            fc=f.read()
                        s=s+1
                        print('[{}] Uploading {}.'.format(self.id,nvfile))
                        upload(self.conn_address, cid, rid, fc, s)
                        os.unlink(nv)
                except Exception as e:
                    print("[{}] Could not upload".format(self.id))
                    print(e)
                    self._cleanup_run()
                    break
                else:
                    print("[{}] VTU uploaded.".format(self.id))
            else: #ping
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    #stop simulations
                    self._cleanup_run()
                    break
            #check if trisurf is still running. If not break the innermost loop.
            sleep(1)
            if(self.p.poll() is not None): # trisurf exited!
                print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                if(self.p.returncode>0):
                    try:
                        send_error_report(self.conn_address, cid, rid, self.p.returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup_run()
                break
            self.sleep(self.update_seconds-1)
            client_ping_time_elapsed+=self.update_seconds
            if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                # A failed client ping must not abort the running simulation.
                # The counter is left as-is, so the ping is retried on the next cycle.
                if self._update_subruns(cid):
                    client_ping_time_elapsed=0
        return client_ping_time_elapsed

    def run(self):
        '''Register with the server, then fetch and execute runs until stop() is called.'''
        while(not self.isStopped()): #try to register
            try:
                cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            client_ping_time_elapsed=0
            if not self._update_subruns(cid):
                self.sleep(10)
                continue
            while(not self.isStopped()): #successfully registered, now start pinging and searching for job
                try:
                    (rid,tape,vtu,status)=get_run(self.conn_address,cid)
                except NameError:
                    # get_run signals "no run available" with NameError
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    client_ping_time_elapsed+=10
                    if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                        if not self._update_subruns(cid):
                            break
                        client_ping_time_elapsed=0
                    #if you put break instead of continue, there is no need to ping client. And it is more robust...
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break
                self._start_run(rid,tape,vtu,status)
                client_ping_time_elapsed=self._monitor_run(cid,rid,int(status),client_ping_time_elapsed)
                
# Copied from the trisurf library so that this client does not depend on the library being installed.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir=maindir
        self.simdir=simdir
        return
    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix.
        '''
        return os.path.join(self.maindir,self.simdir)
    def exists(self):
        '''Method checks whether the directory specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())
    def make(self):
        '''Method make() creates the directory, including missing parents. If it fails it exits with an error message.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit rather than the site-provided exit(), which also closes sys.stdin
            sys.exit(1)
        return
    def makeifnotexist(self):
        '''Method makeifnotexist() creates the directory if it does not exist. Returns True if it was created, False otherwise.'''
        if not self.exists():
            self.make()
            return True
        return False
    def remove(self):
        '''Method remove() removes directory recursively. WARNING! No questions asked.'''
        if(self.exists()):
            try:
                # os.rmdir only removes empty directories; rmtree matches the documented recursive removal.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)
        return
    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times.
        Note that the current directory is process-wide, so this affects all threads.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
        return
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    '''
    SIGINT/SIGTERM handler: stop the module-level ClientThread ``t`` (created
    in the __main__ section), wait for it and its sub-workers to shut down,
    then exit using the signal number as the exit status.
    '''
    worker=t
    worker.stop()
    worker.join()
    print("Process ended with signal " + str(signal))
    raise SystemExit(signal)
#--- END SIGINT and SIGTERM----
if __name__ == '__main__':
    # Create the worker before installing the handlers: signal_handler() uses
    # the global ``t``, so a signal delivered earlier would raise NameError.
    t=ClientThread(update_seconds=100)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    t.start()
    # Keep the main thread alive; Python runs signal handlers in the main thread.
    while True:
        sleep(1000)