Samo Penic
2018-12-23 6bb7d2af3bc3c1f032c1c0c4060577a158ff5f8d
tsclient.py
@@ -10,6 +10,21 @@
import sys
import socket
from threading import Thread, Event
import re
glob_ts_version='00000'
def getTrisurfVersion():
   p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
   lines=p.stdout.readlines()
   version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii'))
   p.wait()
   if(len(version)):
      return version[0]
   else:
      return "unknown version"
def get_hostname():
   return socket.gethostname()
@@ -18,7 +33,8 @@
   return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])
def get_client_id(addr, my_ip, my_hostname, subrun):
   client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun}
   global glob_ts_version
   client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':glob_ts_version }
   response=requests.post(addr+"/api/register/", data=client_auth)
   if(response.status_code==200):
      client_data=json.loads(response.text)
@@ -38,7 +54,7 @@
      status=client_data['status']
      return (rid,tape,vtu,status)
   else:
      print(response.text)
      #print(response.text)
      if(response.status_code==400):
         raise ValueError
      else:
@@ -57,7 +73,9 @@
   client_data={'client_id':cid}
   response=requests.post(addr+"/api/pingclient/", data=client_data)
   if(response.status_code==200):
      return
      client_data=json.loads(response.text)
      return client_data['concurrent_runs']
   else:
      raise ValueError
@@ -99,7 +117,6 @@
class ClientThread(Thread):
   def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
      super(ClientThread,self).__init__()
      self._stop_event = Event()
@@ -111,7 +128,9 @@
      self.ip=get_ip()
      self.hostname=get_hostname()
      self.update_seconds=update_seconds
      self.max_client_ping_time_elapsed=250
      self.subruns=[]
   def stop(self):
      self._stop_event.set()
@@ -122,6 +141,9 @@
   def join(self):
      print('joining threads')
      super(ClientThread, self).join()
      for sub in self.subruns:
         sub.stop()
         sub.join()
      if self.p is not None:
         self.p.terminate()
      if self.workingdir is not None:
@@ -134,27 +156,47 @@
            return False
         sleep(1)
      return True
   def subrunsStartStop(self,nr):
      while(self.id==0 and nr>len(self.subruns)+1):
         #spawning a new worker:
         print("[{}] Spawning a new worker".format(self.id))
         t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
         t.start()
         self.subruns.append(t)
      while(self.id==0 and nr<len(self.subruns)+1):
         print("[{}] Stopping a worker".format(self.id))
         self.subruns[-1].stop()
         self.subruns[-1].join()
         del self.subruns[-1]
   
   def run(self):
      while(not self.isStopped()):
      while(not self.isStopped()): #try to register
         try:
            cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
         except:
            print("[{}] Could not get CID.".format(self.id))
            self.sleep(10)
            continue
         #print("Got CID. getting RID.")
         client_ping_time_elapsed=0
         while(not self.isStopped()):
         print("[{}] Connected and got client ID {}.".format(self.id, cid))
         try:
            concurrent_runs=client_ping(self.conn_address,cid)
            client_ping_time_elapsed=0
         except:
            self.sleep(10)
            continue
         self.subrunsStartStop(concurrent_runs)
         while(not self.isStopped()): #successfully registered, now start pinging and searching for job
            try:
               (rid,tape,vtu,status)=get_run(self.conn_address,cid)
            except NameError:
               print("[{}] Could not get RID.".format(self.id))
               #print("[{}] Could not get RID.".format(self.id))
               self.sleep(10)
               client_ping_time_elapsed+=10
               if(client_ping_time_elapsed>=250):
               if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                  try:
                     client_ping(self.conn_address,cid)
                     concurrent_runs=client_ping(self.conn_address,cid)
                     self.subrunsStartStop(concurrent_runs)
                  except:
                     break
                  client_ping_time_elapsed=0
@@ -172,6 +214,7 @@
               self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
               self.workingdir.makeifnotexist()
               self.workingdir.goto()
               #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath()))
               with open(self.workingdir.fullpath()+"/tape", 'w') as f:
                  f.write(tape)
               if(int(status)==-1):
@@ -191,15 +234,17 @@
                  newVTU=getNewVTU(self.workingdir.fullpath())
                  if newVTU: #upload
                     try:
                        for nv in sorted(newVTU):
                        for nvfile in sorted(newVTU):
                           nv=os.path.join(self.workingdir.fullpath(),nvfile)
                           with open(nv,'r') as f:
                              fc=f.read()
                           s=s+1
                           print('[{}] Uploading {}.'.format(self.id,nv))
                           print('[{}] Uploading {}.'.format(self.id,nvfile))
                           upload(self.conn_address, cid, rid, fc, s)
                           os.unlink(nv)
                     except:
                     except Exception as e:
                        print("[{}] Could not upload".format(self.id))
                        print(e)
                        self.p.terminate()
                        removeDir(self.workingdir.fullpath())
                        self.p=None
@@ -211,7 +256,7 @@
                     try:
                        ping_run(self.conn_address, cid, rid)
                     except:
                        print("[{}] Could not ping.".format(self.id))
                        print("[{}] Could not prolong a lease on the run.".format(self.id))
                        self.p.terminate()
                        self.p=None
                        removeDir(self.workingdir.fullpath())
@@ -232,6 +277,19 @@
                     self.p=None
                     break
                  self.sleep(self.update_seconds-1)
                  client_ping_time_elapsed+=self.update_seconds
                  if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                     try:
                        concurrent_runs=client_ping(self.conn_address,cid)
                     except:
                        print("[{}] Could not client ping.".format(self.id))
                        self.p.terminate()
                        self.p=None
                        removeDir(self.workingdir.fullpath())
                        self.workingdir=None
                        break
                     self.subrunsStartStop(concurrent_runs)
                     client_ping_time_elapsed=0
@@ -308,7 +366,8 @@
#--- END SIGINT and SIGTERM----
if __name__ == '__main__':
   #global glob_ts_version
   glob_ts_version=getTrisurfVersion()
   signal.signal(signal.SIGINT, signal_handler)
   signal.signal(signal.SIGTERM, signal_handler)