Samo Penic
2018-05-05 4db06c65975abc8aa75f6b82367d564ad7727a12
Playground with threaded run
1 files added
1 files modified
314 ■■■■■ changed files
playground/tsclient.py 312 ●●●●● patch | view | raw | blame | history
tsclient.py 2 ●●● patch | view | raw | blame | history
playground/tsclient.py
New file
@@ -0,0 +1,312 @@
#!/usr/bin/python3
import requests
import json
from time import sleep
import uuid
import subprocess
#from trisurf import trisurf
import os
import shutil
import signal
import sys
import socket
from threading import Thread, Event
"""
p=None
workingdir=None
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    global p
    global workingdir
    if p is not None:
        p.terminate()
    if(workingdir is not None):
        removeDir(workingdir.fullpath())
    print("Process ended with signal " +str(signal))
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
#--- END SIGINT and SIGTERM----
"""
def get_hostname():
    """Return this machine's host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
def get_ip():
    """Return an IPv4 address of this machine as a string.

    The addresses the resolver reports for the local host name are tried
    first; the first one that is not in 127.0.0.0/8 is returned.  If there is
    none, a UDP socket is connected to 8.8.8.8:53 and the local address that
    the socket got bound to is returned instead.

    Returns:
        The address string, or the literal "no IP found" when neither
        approach succeeds.
    """
    try:
        candidates = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Host name cannot be resolved; fall through to the socket probe.
        candidates = []
    for ip in candidates:
        if not ip.startswith("127."):
            return ip
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        return "no IP found"
def get_client_id(addr, my_ip, my_hostname, timeout=None):
    """Register this client with the server and return the id it assigns.

    Args:
        addr: Base URL of the server; "/api/register/" is appended to it.
        my_ip: Value sent as the 'ip' form field.
        my_hostname: Value sent as the 'hostname' form field.
        timeout: Optional timeout forwarded to ``requests.post``.  ``None``
            (the default) keeps the previous behaviour of no time limit.

    Returns:
        The value of the 'id' key of the JSON document in the response.

    Raises:
        ValueError: The server answered with a status other than 200, or the
            response body is not valid JSON.
        KeyError: The JSON response has no 'id' key.
    """
    client_auth={'ip':my_ip,'hostname':my_hostname}
    response=requests.post(addr+"/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        # Keep the status in the message so the failure can be diagnosed.
        raise ValueError("Client registration failed with HTTP status {}".format(response.status_code))
    client_data=json.loads(response.text)
    return client_data['id']
def get_run(addr,cid, timeout=None):
    """Ask the server for a run to execute on behalf of client *cid*.

    Args:
        addr: Base URL of the server; "/api/getrun/<cid>/" is appended to it.
        cid: Client id, converted with ``str()`` when building the URL.
        timeout: Optional timeout forwarded to ``requests.get``.  ``None``
            (the default) keeps the previous behaviour of no time limit.

    Returns:
        A tuple ``(rid, tape, vtu, status)`` taken from the 'id', 'tape',
        'lastVTU' and 'status' keys of the JSON response.

    Raises:
        ValueError: The server answered with a status other than 200 (the
            response body is printed first), or the body is not valid JSON.
        KeyError: One of the expected keys is missing from the response.
    """
    response=requests.get(addr+"/api/getrun/"+str(cid)+"/", timeout=timeout)
    if response.status_code != 200:
        print(response.text)
        # Keep the status in the message so the failure can be diagnosed.
        raise ValueError("Fetching a run failed with HTTP status {}".format(response.status_code))
    client_data=json.loads(response.text)
    rid=client_data['id']
    tape=client_data['tape']
    vtu=client_data['lastVTU']
    status=client_data['status']
    return (rid,tape,vtu,status)
def ping_run(addr,cid, rid, timeout=None):
    """Send a ping for run *rid* of client *cid* to the server.

    Args:
        addr: Base URL of the server; "/api/ping/" is appended to it.
        cid: Value sent as the 'client_id' form field.
        rid: Value sent as the 'run_id' form field.
        timeout: Optional timeout forwarded to ``requests.post``.  ``None``
            (the default) keeps the previous behaviour of no time limit.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data={'client_id':cid, 'run_id':rid}
    response=requests.post(addr+"/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        # Keep the status in the message so the failure can be diagnosed.
        raise ValueError("Ping failed with HTTP status {}".format(response.status_code))
def send_error_report(addr,cid, rid,errcode, timeout=None):
    """Report error code *errcode* for run *rid* of client *cid* to the server.

    Args:
        addr: Base URL of the server; "/api/reporterr/" is appended to it.
        cid: Value sent as the 'client_id' form field.
        rid: Value sent as the 'run_id' form field.
        errcode: Value sent as the 'error_code' form field.
        timeout: Optional timeout forwarded to ``requests.post``.  ``None``
            (the default) keeps the previous behaviour of no time limit.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
    response=requests.post(addr+"/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        # Keep the status in the message so the failure can be diagnosed.
        raise ValueError("Error report failed with HTTP status {}".format(response.status_code))
def upload(addr,cid, rid, vtu, status, timeout=None):
    """Upload the content of a VTU file together with its status to the server.

    Args:
        addr: Base URL of the server; "/api/upload/" is appended to it.
        cid: Value sent as the 'client_id' form field.
        rid: Value sent as the 'run_id' form field.
        vtu: Value sent as the 'lastVTU' form field.
        status: Value sent as the 'status' form field.
        timeout: Optional timeout forwarded to ``requests.post``.  ``None``
            (the default) keeps the previous behaviour of no time limit.

    Raises:
        ValueError: The server answered with a status other than 200.
    """
    client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response=requests.post(addr+"/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        # Keep the status in the message so the failure can be diagnosed.
        raise ValueError("Upload failed with HTTP status {}".format(response.status_code))
def getNewVTU(directory):
    """Return the set of entries in *directory* named ``timestep_*.vtu``.

    Only the entry names are returned, not full paths.  An empty set means no
    matching entry is present.
    """
    return {
        entry
        for entry in os.listdir(directory)
        if entry.startswith("timestep_") and entry.endswith(".vtu")
    }
def removeDir(directory):
    """Recursively delete *directory*, printing a message if that fails.

    The process working directory is changed to '/' first, so the directory
    being removed is never the current one.  Failures are reported on stdout
    and otherwise ignored (best effort).

    Args:
        directory: Path of the directory tree to delete.
    """
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Only file-system errors are expected here; anything else (for
        # example SystemExit raised by a signal handler) must propagate.
        print("Cannot remove directory {}\n".format(directory))
class ClientThread(Thread):
    """Worker thread that fetches runs from the server and executes trisurf.

    The thread registers with the server, repeatedly asks for a run, starts
    ``trisurf`` in a fresh directory under /tmp, uploads every
    ``timestep_*.vtu`` file the simulation produces and pings the server when
    there is nothing to upload.  It keeps doing so until ``stop()`` is called.

    Attributes set by the constructor:
        p: The running ``subprocess.Popen`` object, or None.
        workingdir: ``Directory`` of the current run, or None.
        conn_address, id, ip, hostname, update_seconds: see ``__init__``.
    """

    def __init__(self,conn_address='http://localhost:8000',subid=0, update_seconds=100):
        """Create the thread (it is not started).

        Args:
            conn_address: Base URL of the server.
            subid: Identifier stored as ``self.id`` and used as prefix of the
                messages this thread prints.
            update_seconds: Length in seconds of one monitoring round (one
                upload-or-ping followed by a check of the trisurf process).
        """
        super(ClientThread,self).__init__()
        self._stop_event = Event()
        self.p=None
        self.workingdir=None
        self.conn_address=conn_address
        self.id=subid
        self.ip=get_ip()
        self.hostname=get_hostname()
        self.update_seconds=update_seconds

    def stop(self):
        """Ask the thread to finish; ``run()`` returns at its next check."""
        self._stop_event.set()

    def isStopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def join(self, timeout=None):
        """Wait for the thread to finish, then clean up its run.

        Args:
            timeout: Passed to ``Thread.join``; None waits without limit.

        After the thread has finished, a still running trisurf process is
        terminated and the working directory is removed.  If *timeout* expires
        while the thread is still alive nothing is cleaned up, because the
        thread is still using both.
        """
        print('joining threads')
        super(ClientThread, self).join(timeout)
        if not self.is_alive():
            self._cleanup()

    def sleep(self,s):
        """Wait up to *s* seconds, returning early when the thread is stopped.

        Returns:
            False if the thread has been stopped, True if the full time
            elapsed.
        """
        # Event.wait returns True as soon as the stop flag is set, so a stop
        # request interrupts the wait immediately.
        return not self._stop_event.wait(s)

    def _cleanup(self):
        """Terminate the trisurf process (if any) and remove the working directory."""
        proc, self.p = self.p, None
        if proc is not None:
            proc.terminate()
            try:
                # Reap the child so it does not stay around as a zombie and so
                # that it no longer writes into the directory removed below.
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        workdir, self.workingdir = self.workingdir, None
        if workdir is not None:
            removeDir(workdir.fullpath())

    def _start_simulation(self, rid, tape, vtu, status):
        """Create the working directory for run *rid* and launch trisurf in it."""
        self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        path=self.workingdir.fullpath()
        with open(os.path.join(path, "tape"), 'w') as f:
            f.write(tape)
        if(int(status)==-1):
            cmd=['trisurf', '--force-from-tape']
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
        else:
            with open(os.path.join(path, ".status"),'w') as f:
                # status may arrive as a number; write() accepts only str.
                f.write(str(status))
            with open(os.path.join(path, "initial.vtu"),'w') as f:
                f.write(vtu)
            cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
        # cwd is given to the child explicitly instead of changing the cwd of
        # this process, which is shared by all threads.
        self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=path)

    def _upload_new_files(self, cid, rid, path, names, last_step):
        """Upload the files *names* found in *path* in sorted order and delete them.

        Returns the step number sent with the last uploaded file.
        """
        for name in sorted(names):
            full_name=os.path.join(path, name)
            with open(full_name,'r') as f:
                content=f.read()
            last_step=last_step+1
            print('[{}] Uploading {}.'.format(self.id,name))
            upload(self.conn_address, cid, rid, content, last_step)
            os.unlink(full_name)
        return last_step

    def _monitor(self, cid, rid, last_step):
        """Upload results and ping the server until the run ends or the thread is stopped.

        Returns after cleaning up when an upload or ping fails or when trisurf
        exits.  When the thread is stopped it returns without cleaning up;
        ``join()`` does that.
        """
        path=self.workingdir.fullpath()
        while(not self.isStopped()):
            #monitor for new file. If file is present, upload it!
            new_files=getNewVTU(path)
            if new_files:
                try:
                    last_step=self._upload_new_files(cid, rid, path, new_files, last_step)
                except Exception:
                    print("[{}] Could not upload".format(self.id))
                    self._cleanup()
                    return
                print("[{}] VTU uploaded.".format(self.id))
            else:
                try:
                    ping_run(self.conn_address, cid, rid)
                except Exception:
                    print("[{}] Could not ping.".format(self.id))
                    self._cleanup()
                    return
            #check if trisurf is still running. If not, leave the monitoring loop.
            self.sleep(1)
            returncode=self.p.poll()
            if(returncode is not None): # trisurf exited!
                print("[{}] Trisurf was stopped with return code {}".format(self.id, returncode))
                if(returncode>0):
                    try:
                        send_error_report(self.conn_address, cid, rid, returncode)
                    except Exception:
                        print("[{}] Server didn't accept error report".format(self.id))
                self._cleanup()
                return
            self.sleep(self.update_seconds-1)

    def run(self):
        """Thread body: register, then fetch and execute runs until stopped.

        A failed registration is retried after 10 seconds.  A failed request
        for a run makes the thread register again after 10 seconds.
        """
        while(not self.isStopped()):
            try:
                cid=get_client_id(self.conn_address, self.ip, self.hostname)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            while(not self.isStopped()):
                try:
                    (rid,tape,vtu,status)=get_run(self.conn_address,cid)
                except Exception:
                    print("[{}] Could not get RID.".format(self.id))
                    self.sleep(10)
                    break
                self._start_simulation(rid, tape, vtu, status)
                self._monitor(cid, rid, int(status))
#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
        self.maindir=maindir
        self.simdir=simdir

    def fullpath(self):
        '''
        Method returns string of path where the data is stored. It combines values of maindir and simdir with os.path.join (maindir/simdir on Unix).
        '''
        return os.path.join(self.maindir,self.simdir)

    def exists(self):
        '''Method checks whether the path specified by fullpath() exists. It returns True/False.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''Method make() creates the directory, including missing parents. If it fails it prints an error message and exits with status 1.'''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            # sys.exit is always available; the bare exit() helper is
            # installed by the site module and may be missing.
            sys.exit(1)

    def makeifnotexist(self):
        '''Method makeifnotexist() creates directory if it does not exist. It returns True if the directory was created and False if the path already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''Method remove() removes directory recursively. WARNING! No questions asked. If removal fails it prints an error message and exits with status 1. Nothing happens when the path does not exist.'''
        if self.exists():
            try:
                # rmtree also deletes the contents, as documented; os.rmdir
                # could only remove an empty directory.
                shutil.rmtree(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Method goto() moves current directory to the one specified by fullpath(). On failure it only prints a message. WARNING: when using the relative paths, do not call this function multiple times.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    """Stop the worker thread ``t``, wait for it to finish and exit with status 0.

    Args:
        signal: The value passed as first handler argument; it is only printed.
        frame: Unused.
    """
    worker = t
    worker.stop()
    worker.join()
    print("Process ended with signal %s" % (signal,))
    sys.exit(0)
#--- END SIGINT and SIGTERM----
if __name__ == '__main__':
    # Route both SIGINT and SIGTERM to the same shutdown handler.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, signal_handler)
    # The handler refers to the worker through the module-level name t.
    t = ClientThread(update_seconds=100)
    t.start()
    print("main")
    # Keep the main thread alive; the process ends through signal_handler.
    while True:
        sleep(1000)
tsclient.py
@@ -171,7 +171,7 @@
                sleep(1)
                if(p.poll() is not None): # trisurf exited!
                    print("Trisurf was stopped with return code {}".format(p.returncode))
                    if(p.returncode!=0):
                    if(p.returncode>0):
                        try:
                            send_error_report(CONNECT_ADDR, cid, rid, p.returncode)
                        except: