Samo Penic
2018-06-22 4c9734b25313f93c6277189bcf792151370c7225
tsclient.py
@@ -4,34 +4,23 @@
from time import sleep
import uuid
import subprocess
from trisurf import trisurf
import os
import shutil
import signal
import sys
import socket
CONNECT_ADDR='http://localhost:8000'
from threading import Thread, Event
import re
p=None
workingdir=None
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signum,frame):
   '''
   Terminate the running trisurf process (if any), remove the current run
   directory (if any), print the received signal number and exit with status 0.

   signum: number of the received signal.
   frame: current stack frame (unused).
   '''
   # The module-level ``p`` and ``workingdir`` are only read here, so no
   # ``global`` statement is needed.
   if p is not None:
      p.terminate()
   if(workingdir is not None):
      removeDir(workingdir.fullpath())
   print("Process ended with signal " +str(signum))
   sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
#--- END SIGINT and SIGTERM----
def getTrisurfVersion():
   '''
   Return the version hash reported by ``trisurf --version``.

   The first line of the command's output (stdout and stderr combined) is
   searched for a 7-character lowercase hex hash, optionally followed by
   ``-dirty``, and the first match is returned. Returns ``"unknown version"``
   when the binary cannot be started, prints nothing, or prints no hash.
   '''
   try:
      # argv list instead of a shell string: a fixed command needs no shell.
      proc = subprocess.Popen(['trisurf', '--version'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
   except OSError:
      # trisurf is not installed or not executable
      return "unknown version"
   output, _ = proc.communicate()
   lines = output.splitlines()
   if not lines:
      return "unknown version"
   # errors='replace' so unexpected non-ASCII output cannot raise UnicodeDecodeError
   version = re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii', errors='replace'))
   if version:
      return version[0]
   return "unknown version"
def get_hostname():
   '''Return the host name of this machine, as given by socket.gethostname().'''
   hostname = socket.gethostname()
   return hostname
@@ -39,8 +28,8 @@
def get_ip():
   '''
   Return an IPv4 address of this host as a string.

   First returns the first address the local hostname resolves to that does
   not start with "127.". If there is none, a UDP socket is connected to
   8.8.8.8:53 and the local address it was bound to is returned. If both
   lookups fail, returns "no IP found" instead of raising.
   '''
   try:
      addresses = socket.gethostbyname_ex(socket.gethostname())[2]
   except OSError:
      # hostname does not resolve (socket.gaierror / herror are OSError subclasses)
      addresses = []
   for ip in addresses:
      if not ip.startswith("127."):
         return ip
   # The context manager closes the socket even when connect() fails.
   with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
      try:
         s.connect(("8.8.8.8", 53))
         return s.getsockname()[0]
      except OSError:
         return "no IP found"
def get_client_id(addr, my_ip, my_hostname):
   client_auth={'ip':my_ip,'hostname':my_hostname}
def get_client_id(addr, my_ip, my_hostname, subrun):
   client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':getTrisurfVersion() }
   response=requests.post(addr+"/api/register/", data=client_auth)
   if(response.status_code==200):
      client_data=json.loads(response.text)
@@ -60,7 +49,11 @@
      status=client_data['status']
      return (rid,tape,vtu,status)
   else:
      raise ValueError
      print(response.text)
      if(response.status_code==400):
         raise ValueError
      else:
         raise NameError
def ping_run(addr,cid, rid):
@@ -70,6 +63,18 @@
      return
   else:
      raise ValueError
def client_ping(addr,cid):
   '''
   Ping the server's /api/pingclient/ endpoint for client ``cid``.

   Returns the ``concurrent_runs`` field of the JSON reply.
   Raises ValueError when the server answers with a status other than 200.
   '''
   response=requests.post(addr+"/api/pingclient/", data={'client_id':cid})
   if(response.status_code!=200):
      raise ValueError
   reply=json.loads(response.text)
   return reply['concurrent_runs']
def send_error_report(addr,cid, rid,errcode):
   client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
@@ -103,81 +108,254 @@
      print("Cannot remove directory "+directory+ "\n")
   return
# NOTE(review): this looks like the pre-ClientThread main loop (it calls
# get_client_id with three arguments). It registers for a client id, then
# repeatedly fetches a run, starts trisurf for it in a fresh /tmp/ts_<uuid>
# directory and uploads new VTU files / pings the server while trisurf runs.
while(True):
   try:
      cid=get_client_id(CONNECT_ADDR, get_ip(),get_hostname())
   except:
      print("Cannot get CID.")
      sleep(10)
      continue
   print("Got CID. getting RID.")
   while(True):
      try:
         (rid,tape,vtu,status)=get_run(CONNECT_ADDR,cid)
      except:
         print("Could not get RID.")
         sleep(10)
         # leave the run loop; the outer loop then registers for a new CID
         break
      else:
         # start trisurf as a separate process (subprocess.Popen below), not a thread
         workingdir=trisurf.Directory('/tmp/ts_'+str(uuid.uuid4()))
         workingdir.makeifnotexist()
         # chdir into the run directory: trisurf inherits this cwd and
         # 'initial.vtu' below is a relative path
         workingdir.goto()
         with open(workingdir.fullpath()+"/tape", 'w') as f:
            f.write(tape)
         # status == -1: fresh run from the tape; otherwise restore from the
         # last uploaded VTU (status is the last timestep)
         if(int(status)==-1):
            cmd=['trisurf', '--force-from-tape']
            print("Run id="+str(rid)+ " :: Starting from tape")
         else:
            with open(workingdir.fullpath()+"/.status",'w') as f:
               f.write(status)
            with open(workingdir.fullpath()+"/initial.vtu",'w') as f:
               f.write(vtu)
            cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
            print("Run id="+str(rid)+ " :: Restoring from vtk, last timestep "+status)
         p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL)
         # s is the timestep number sent with each uploaded file
         s=int(status)
         while(True):
            #monitor for new file. If file is present, upload it!
            newVTU=getNewVTU(workingdir.fullpath())
            if newVTU: #upload
               try:
                  # NOTE(review): nv is opened relative to the cwd set by goto();
                  # presumably getNewVTU returns bare file names — confirm
                  for nv in sorted(newVTU):
                     with open(nv,'r') as f:
                        fc=f.read()
                     s=s+1
                     print('Uploading '+nv)
                     upload(CONNECT_ADDR, cid, rid, fc, s)
                     os.unlink(nv)
               except:
                  print("Could not upload")
                  p.terminate()
                  removeDir(workingdir.fullpath())
                  break
               else:
                  print("VTU uploaded")
            else: #ping
               try:
                  ping_run(CONNECT_ADDR, cid, rid)
               except:
                  print("Could not ping")
                  p.terminate()
                  removeDir(workingdir.fullpath())
                  #stop simulations
                  break
            # check whether trisurf is still running
            sleep(1)
            if(p.poll() is not None): # trisurf exited!
               print("Trisurf was stopped with return code {}".format(p.returncode))
               # NOTE(review): the body of this branch is not contiguous with
               # this line in this view
               if(p.returncode!=0):
class ClientThread(Thread):
   '''
   Thread that registers with the trisurf server, fetches runs, executes the
   ``trisurf`` binary for each run and uploads the VTU files it produces.

   The thread created with ``subid=0`` also manages sibling workers. Whenever
   it pings the server, it spawns or stops additional ClientThread instances
   (``subid`` 1, 2, ...) so that the total number of threads equals the
   ``concurrent_runs`` value returned by the server.
   '''
   def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
      super(ClientThread,self).__init__()
      self._stop_event = Event()
      self._stop_event.clear()
      self.p=None                        # Popen of the running trisurf process, or None
      self.workingdir=None               # Directory of the current run, or None
      self.conn_address=conn_address     # base URL of the server API
      self.id=subid                      # 0 = managing thread, >0 = spawned worker
      self.ip=get_ip()
      self.hostname=get_hostname()
      self.update_seconds=update_seconds # seconds between upload/ping rounds
      self.max_client_ping_time_elapsed=250  # approx. seconds between client pings
      self.subruns=[]                    # workers spawned by thread 0
   def stop(self):
      '''Ask the thread to stop; run() checks this flag between steps.'''
      self._stop_event.set()
   def isStopped(self):
      '''Return True once stop() has been called.'''
      return self._stop_event.is_set()
   def join(self):
      '''
      Wait for run() to return, then stop and join all spawned workers,
      terminate a still-running trisurf process and delete the run directory.
      '''
      print('joining threads')
      super(ClientThread, self).join()
      for sub in self.subruns:
         sub.stop()
         sub.join()
      self._cleanupRun()
   def sleep(self,s):
      '''
      Sleep for ``s`` seconds in one-second steps.

      Returns False as soon as the thread has been stopped, or True if the
      full interval elapsed.
      '''
      for i in range(0, s):
         if(self.isStopped()):
            return False
         sleep(1)
      return True
   def subrunsStartStop(self,nr):
      '''
      Adjust the worker pool so that ``nr`` threads run in total (this thread
      plus nr-1 workers). Only thread 0 manages workers; other threads do nothing.
      '''
      while(self.id==0 and nr>len(self.subruns)+1):
         #spawning a new worker:
         print("[{}] Spawning a new worker".format(self.id))
         t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
         t.start()
         self.subruns.append(t)
      while(self.id==0 and nr<len(self.subruns)+1):
         print("[{}] Stopping a worker".format(self.id))
         self.subruns[-1].stop()
         self.subruns[-1].join()
         del self.subruns[-1]
   def _cleanupRun(self):
      '''Terminate the trisurf process (if still running) and delete the run directory (if any).'''
      if self.p is not None:
         if self.p.poll() is None:
            self.p.terminate()
         self.p=None
      if self.workingdir is not None:
         removeDir(self.workingdir.fullpath())
         self.workingdir=None
   def _pingClient(self,cid):
      '''
      Ping the server as client ``cid`` and resize the worker pool to the
      returned number of concurrent runs. Returns False (after logging) if
      the ping or the resize failed, True otherwise.
      '''
      try:
         concurrent_runs=client_ping(self.conn_address,cid)
         self.subrunsStartStop(concurrent_runs)
      except Exception as e:
         print("[{}] Could not ping client: {}".format(self.id, e))
         return False
      return True
   def _runSimulation(self,cid,rid,tape,vtu,status,client_ping_time_elapsed):
      '''
      Execute run ``rid`` with trisurf and stream its output to the server.

      Writes the tape (and, unless ``status`` is -1, the .status file and
      initial.vtu) into a fresh /tmp/ts_<uuid> directory. It then starts
      trisurf there and, every ``update_seconds``, either uploads new VTU
      files or pings the run. On upload or ping failure, or when trisurf
      exits, the process is stopped and the directory deleted. If the thread
      is stopped, the process is left for join() to clean up.
      Returns the updated client-ping timer.
      '''
      self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
      self.workingdir.makeifnotexist()
      path=self.workingdir.fullpath()
      with open(os.path.join(path,"tape"), 'w') as f:
         f.write(tape)
      if(int(status)==-1):
         cmd=['trisurf', '--force-from-tape']
         print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
      else:
         with open(os.path.join(path,".status"),'w') as f:
            f.write(status)
         with open(os.path.join(path,"initial.vtu"),'w') as f:
            f.write(vtu)
         cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
         print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
      # cwd= instead of chdir(): os.chdir is process-wide, so with several
      # worker threads another thread could change directory before Popen runs.
      self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=path)
      s=int(status)  # timestep number sent with each uploaded file
      while(not self.isStopped()):
         #monitor for new file. If file is present, upload it!
         newVTU=getNewVTU(path)
         if newVTU: #upload
            try:
               for nvfile in sorted(newVTU):
                  nv=os.path.join(path,nvfile)
                  with open(nv,'r') as f:
                     fc=f.read()
                  s=s+1
                  print('[{}] Uploading {}.'.format(self.id,nvfile))
                  upload(self.conn_address, cid, rid, fc, s)
                  os.unlink(nv)
            except Exception as e:
               print("[{}] Could not upload".format(self.id))
               print(e)
               self._cleanupRun()
               break
            else:
               print("[{}] VTU uploaded.".format(self.id))
         else: #ping
            try:
               ping_run(self.conn_address, cid, rid)
            except Exception:
               print("[{}] Could not ping.".format(self.id))
               self._cleanupRun()
               break
         #check if trisurf is still running. If not break the innermost loop.
         sleep(1)
         if(self.p.poll() is not None): # trisurf exited!
            print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
            if(self.p.returncode>0):
               try:
                  send_error_report(self.conn_address, cid, rid, self.p.returncode)
               except Exception:
                  print("[{}] Server didn't accept error report".format(self.id))
            self._cleanupRun()
            break
         self.sleep(self.update_seconds-1)
         client_ping_time_elapsed+=self.update_seconds
         if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
            # A failed ping is logged but does not abort the running simulation.
            self._pingClient(cid)
            client_ping_time_elapsed=0
      return client_ping_time_elapsed
   def run(self):
      '''Register with the server and process runs until stop() is called.'''
      while(not self.isStopped()): #try to register
         try:
            cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
         except Exception:
            print("[{}] Could not get CID.".format(self.id))
            self.sleep(10)
            continue
         client_ping_time_elapsed=0
         if not self._pingClient(cid):
            # server unreachable right after registering: register again later
            self.sleep(10)
            continue
         while(not self.isStopped()): #successfully registered, now start pinging and searching for job
            try:
               (rid,tape,vtu,status)=get_run(self.conn_address,cid)
            except NameError:
               # get_run signals "no run available" with NameError
               print("[{}] Could not get RID.".format(self.id))
               self.sleep(10)
               client_ping_time_elapsed+=10
               if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                  if not self._pingClient(cid):
                     break
                  client_ping_time_elapsed=0
               continue
            except ValueError:
               print("[{}] Wrong CID? Getting new CID.".format(self.id))
               break
            except Exception:
               print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
               break
            client_ping_time_elapsed=self._runSimulation(cid,rid,tape,vtu,status,client_ping_time_elapsed)
#Copied from the trisurf library so that this client does not depend on the library being installed.
class Directory:
   '''
   Class deals with the paths where the simulation is run and data is stored.
   '''
   def __init__(self, maindir=".", simdir=""):
      '''
      Store the path components. ``maindir`` defaults to the current
      directory and ``simdir`` to an empty subdirectory name.
      '''
      self.maindir=maindir
      self.simdir=simdir
   def fullpath(self):
      '''Return os.path.join(maindir, simdir), i.e. maindir/simdir on Unix.'''
      return os.path.join(self.maindir,self.simdir)
   def exists(self):
      '''Return True if fullpath() exists, False otherwise.'''
      return os.path.exists(self.fullpath())
   def make(self):
      '''
      Create fullpath(), including missing parents.

      On failure, prints an error message and raises SystemExit(1).
      '''
      try:
         os.makedirs(self.fullpath())
      except OSError:
         print("Cannot make directory "+self.fullpath()+"\n")
         sys.exit(1)
   def makeifnotexist(self):
      '''Create the directory if it does not exist; return True if it was created, False if it already existed.'''
      if not self.exists():
         self.make()
         return True
      return False
   def remove(self):
      '''
      Remove the directory if it exists.

      Uses os.rmdir, so only an empty directory can be removed. On failure
      (e.g. the directory is not empty), prints an error message and raises
      SystemExit(1).
      '''
      if self.exists():
         try:
            os.rmdir(self.fullpath())
         except OSError:
            print("Cannot remove directory "+self.fullpath()+ "\n")
            sys.exit(1)
   def goto(self):
      '''
      Change the current directory to fullpath(); on failure, only print an error.

      os.chdir changes the working directory of the whole process. When using
      relative paths, do not call this method multiple times.
      '''
      try:
         os.chdir(self.fullpath())
      except OSError:
         print("Cannot go to directory "+self.fullpath()+"\n")
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signum,frame):
   '''
   Stop and join the client thread ``t``, print the signal number and exit
   with the signal number as the exit status.

   signum: number of the received signal.
   frame: current stack frame (unused).
   '''
   # ``t`` is the module-level client created in the __main__ section. Look it
   # up defensively so a signal that arrives before it exists does not raise
   # NameError inside the handler.
   client=globals().get('t')
   if client is not None:
      client.stop()
      client.join()
   print("Process ended with signal " +str(signum))
   sys.exit(signum)
#--- END SIGINT and SIGTERM----
if __name__ == '__main__':
   # Create the client before installing the handlers: signal_handler() stops
   # and joins the global ``t``, so it must exist when a signal can arrive.
   t=ClientThread(update_seconds=100)
   signal.signal(signal.SIGINT, signal_handler)
   signal.signal(signal.SIGTERM, signal_handler)
   t.start()
   # The main thread only idles here; the work happens in ``t`` and its
   # workers, and signal_handler() shuts them down.
   while(True):
      sleep(1000)