From 6bb7d2af3bc3c1f032c1c0c4060577a158ff5f8d Mon Sep 17 00:00:00 2001 From: Samo Penic <samo.penic@gmail.com> Date: Sun, 23 Dec 2018 17:02:49 +0000 Subject: [PATCH] Fix in a bug --- tsclient.py | 83 ++++++++++++++++++++++++++++++++++------- 1 files changed, 68 insertions(+), 15 deletions(-) diff --git a/tsclient.py b/tsclient.py index ed061b7..8435467 100755 --- a/tsclient.py +++ b/tsclient.py @@ -10,6 +10,21 @@ import sys import socket from threading import Thread, Event +import re + + + +glob_ts_version='00000' + +def getTrisurfVersion(): + p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + lines=p.stdout.readlines() + version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii')) + p.wait() + if(len(version)): + return version[0] + else: + return "unknown version" def get_hostname(): return socket.gethostname() @@ -18,7 +33,8 @@ return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) def get_client_id(addr, my_ip, my_hostname, subrun): - client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} + global glob_ts_version + client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':glob_ts_version } response=requests.post(addr+"/api/register/", data=client_auth) if(response.status_code==200): client_data=json.loads(response.text) @@ -38,7 +54,7 @@ status=client_data['status'] return (rid,tape,vtu,status) else: - print(response.text) + #print(response.text) if(response.status_code==400): raise ValueError else: @@ -58,8 +74,8 @@ response=requests.post(addr+"/api/pingclient/", data=client_data) if(response.status_code==200): client_data=json.loads(response.text) - - return + + return client_data['concurrent_runs'] else: raise ValueError @@ -101,7 +117,6 @@ class ClientThread(Thread): - def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): super(ClientThread,self).__init__() self._stop_event = Event() @@ -115,6 +130,8 @@ self.update_seconds=update_seconds self.max_client_ping_time_elapsed=250 + self.subruns=[] + def stop(self): self._stop_event.set() @@ -124,6 +141,9 @@ def join(self): print('joining threads') super(ClientThread, self).join() + for sub in self.subruns: + sub.stop() + sub.join() if self.p is not None: self.p.terminate() if self.workingdir is not None: @@ -136,6 +156,19 @@ return False sleep(1) return True + + def subrunsStartStop(self,nr): + while(self.id==0 and nr>len(self.subruns)+1): + #spawning a new worker: + print("[{}] Spawning a new worker".format(self.id)) + t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds) + t.start() + self.subruns.append(t) + while(self.id==0 and nr<len(self.subruns)+1): + print("[{}] Stopping a worker".format(self.id)) + self.subruns[-1].stop() + self.subruns[-1].join() + del self.subruns[-1] def run(self): while(not self.isStopped()): #try to register @@ -145,18 +178,25 @@ print("[{}] Could not get CID.".format(self.id)) self.sleep(10) continue - #print("Got CID. getting RID.") - client_ping_time_elapsed=0 + print("[{}] Connected and got client ID {}.".format(self.id, cid)) + try: + concurrent_runs=client_ping(self.conn_address,cid) + client_ping_time_elapsed=0 + except: + self.sleep(10) + continue + self.subrunsStartStop(concurrent_runs) while(not self.isStopped()): #successfully registered, now start pinging and searching for job try: (rid,tape,vtu,status)=get_run(self.conn_address,cid) except NameError: - print("[{}] Could not get RID.".format(self.id)) + #print("[{}] Could not get RID.".format(self.id)) self.sleep(10) client_ping_time_elapsed+=10 if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed): try: - client_ping(self.conn_address,cid) + concurrent_runs=client_ping(self.conn_address,cid) + self.subrunsStartStop(concurrent_runs) except: break client_ping_time_elapsed=0 @@ -174,6 +214,7 @@ self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) self.workingdir.makeifnotexist() self.workingdir.goto() + #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath())) with open(self.workingdir.fullpath()+"/tape", 'w') as f: f.write(tape) if(int(status)==-1): @@ -193,15 +234,17 @@ newVTU=getNewVTU(self.workingdir.fullpath()) if newVTU: #upload try: - for nv in sorted(newVTU): + for nvfile in sorted(newVTU): + nv=os.path.join(self.workingdir.fullpath(),nvfile) with open(nv,'r') as f: fc=f.read() s=s+1 - print('[{}] Uploading {}.'.format(self.id,nv)) + print('[{}] Uploading {}.'.format(self.id,nvfile)) upload(self.conn_address, cid, rid, fc, s) os.unlink(nv) - except: + except Exception as e: print("[{}] Could not upload".format(self.id)) + print(e) self.p.terminate() removeDir(self.workingdir.fullpath()) self.p=None @@ -213,7 +256,7 @@ try: ping_run(self.conn_address, cid, rid) except: - print("[{}] Could not ping.".format(self.id)) + print("[{}] Could not prolong a lease on the run.".format(self.id)) self.p.terminate() self.p=None removeDir(self.workingdir.fullpath()) @@ -236,7 +279,16 @@ self.sleep(self.update_seconds-1) client_ping_time_elapsed+=self.update_seconds if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2): - client_ping(self.conn_address,cid) + try: + concurrent_runs=client_ping(self.conn_address,cid) + except: + print("[{}] Could not client ping.".format(self.id)) + self.p.terminate() + self.p=None + removeDir(self.workingdir.fullpath()) + self.workingdir=None + break + self.subrunsStartStop(concurrent_runs) client_ping_time_elapsed=0 @@ -314,7 +366,8 @@ #--- END SIGINT and SIGTERM---- if __name__ == '__main__': - + #global glob_ts_version + glob_ts_version=getTrisurfVersion() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) -- Gitblit v1.9.3