From 6bb7d2af3bc3c1f032c1c0c4060577a158ff5f8d Mon Sep 17 00:00:00 2001
From: Samo Penic <samo.penic@gmail.com>
Date: Sun, 23 Dec 2018 17:02:49 +0000
Subject: [PATCH] Fix VTU upload path; report trisurf version and manage concurrent worker threads

---
 tsclient.py |   83 ++++++++++++++++++++++++++++++++++-------
 1 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/tsclient.py b/tsclient.py
index ed061b7..8435467 100755
--- a/tsclient.py
+++ b/tsclient.py
@@ -10,6 +10,21 @@
 import sys
 import socket
 from threading import Thread, Event
+import re
+
+
+
+glob_ts_version='00000'
+
+def getTrisurfVersion():
+	"""Return the first 7-character lowercase-hex token (optionally suffixed "-dirty") on the first output line of `trisurf --version`, else "unknown version"."""
+	p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+	out=p.communicate()[0] # reads all output, closes the pipe and waits for the process to exit
+	lines=out.splitlines()
+	version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii','replace')) if lines else [] # empty or non-ASCII output must not raise
+	if version:
+		return version[0]
+	return "unknown version"
 
 def get_hostname():
 	return socket.gethostname()
@@ -18,7 +33,8 @@
 	return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])
 
 def get_client_id(addr, my_ip, my_hostname, subrun):
-	client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun}
+	global glob_ts_version
+	client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':glob_ts_version }
 	response=requests.post(addr+"/api/register/", data=client_auth)
 	if(response.status_code==200):
 		client_data=json.loads(response.text)
@@ -38,7 +54,7 @@
 		status=client_data['status']
 		return (rid,tape,vtu,status)
 	else:
-		print(response.text)
+		#print(response.text)
 		if(response.status_code==400):
 			raise ValueError
 		else:
@@ -58,8 +74,8 @@
 	response=requests.post(addr+"/api/pingclient/", data=client_data)
 	if(response.status_code==200):
 		client_data=json.loads(response.text)
-		
-		return
+			
+		return client_data['concurrent_runs']
 	else:
 		raise ValueError
 
@@ -101,7 +117,6 @@
 
 
 class ClientThread(Thread):
-
 	def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
 		super(ClientThread,self).__init__()
 		self._stop_event = Event()
@@ -115,6 +130,8 @@
 		self.update_seconds=update_seconds
 		self.max_client_ping_time_elapsed=250
 
+		self.subruns=[]
+
 	def stop(self):
 		self._stop_event.set()
 
@@ -124,6 +141,9 @@
 	def join(self):
 		print('joining threads')
 		super(ClientThread, self).join()
+		for sub in self.subruns:
+			sub.stop()
+			sub.join()
 		if self.p is not None:
 			self.p.terminate()
 		if self.workingdir is not None:
@@ -136,6 +156,19 @@
 				return False
 			sleep(1)
 		return True
+
+	def subrunsStartStop(self,nr):
+		"""Spawn or stop worker threads until this thread plus len(self.subruns) equals nr; does nothing unless self.id is 0."""
+		while(self.id==0 and len(self.subruns)+1<nr):
+			print("[{}] Spawning a new worker".format(self.id))
+			worker=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
+			worker.start()
+			self.subruns.append(worker)
+		while(self.id==0 and len(self.subruns)+1>nr):
+			print("[{}] Stopping a worker".format(self.id))
+			self.subruns[-1].stop()
+			self.subruns[-1].join()
+			self.subruns.pop()
 	
 	def run(self):
 		while(not self.isStopped()): #try to register
@@ -145,18 +178,25 @@
 				print("[{}] Could not get CID.".format(self.id))
 				self.sleep(10)
 				continue
-			#print("Got CID. getting RID.")
-			client_ping_time_elapsed=0
+			print("[{}] Connected and got client ID {}.".format(self.id, cid))
+			try:
+				concurrent_runs=client_ping(self.conn_address,cid)
+				client_ping_time_elapsed=0
+			except:
+				self.sleep(10)
+				continue
+			self.subrunsStartStop(concurrent_runs)
 			while(not self.isStopped()): #successfully registered, now start pinging and searching for job
 				try:
 					(rid,tape,vtu,status)=get_run(self.conn_address,cid)
 				except NameError:
-					print("[{}] Could not get RID.".format(self.id))
+					#print("[{}] Could not get RID.".format(self.id))
 					self.sleep(10)
 					client_ping_time_elapsed+=10
 					if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
 						try:
-							client_ping(self.conn_address,cid)
+							concurrent_runs=client_ping(self.conn_address,cid)
+							self.subrunsStartStop(concurrent_runs)
 						except:
 							break
 						client_ping_time_elapsed=0
@@ -174,6 +214,7 @@
 					self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
 					self.workingdir.makeifnotexist()
 					self.workingdir.goto()
+					#print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath()))
 					with open(self.workingdir.fullpath()+"/tape", 'w') as f:
 						f.write(tape)
 					if(int(status)==-1):
@@ -193,15 +234,17 @@
 						newVTU=getNewVTU(self.workingdir.fullpath())
 						if newVTU: #upload
 							try:
-								for nv in sorted(newVTU):
+								for nvfile in sorted(newVTU):
+									nv=os.path.join(self.workingdir.fullpath(),nvfile)
 									with open(nv,'r') as f:
 										fc=f.read()
 									s=s+1
-									print('[{}] Uploading {}.'.format(self.id,nv))
+									print('[{}] Uploading {}.'.format(self.id,nvfile))
 									upload(self.conn_address, cid, rid, fc, s)
 									os.unlink(nv)
-							except:
+							except Exception as e:
 								print("[{}] Could not upload".format(self.id))
+								print(e)
 								self.p.terminate()
 								removeDir(self.workingdir.fullpath())
 								self.p=None
@@ -213,7 +256,7 @@
 							try:
 								ping_run(self.conn_address, cid, rid)
 							except:
-								print("[{}] Could not ping.".format(self.id))
+								print("[{}] Could not prolong a lease on the run.".format(self.id))
 								self.p.terminate()
 								self.p=None
 								removeDir(self.workingdir.fullpath())
@@ -236,7 +279,16 @@
 						self.sleep(self.update_seconds-1)
 						client_ping_time_elapsed+=self.update_seconds
 						if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
-							client_ping(self.conn_address,cid)
+							try:
+								concurrent_runs=client_ping(self.conn_address,cid)
+							except:
+								print("[{}] Could not client ping.".format(self.id))
+								self.p.terminate()
+								self.p=None
+								removeDir(self.workingdir.fullpath())
+								self.workingdir=None
+								break
+							self.subrunsStartStop(concurrent_runs)
 							client_ping_time_elapsed=0
 
 
@@ -314,7 +366,8 @@
 #--- END SIGINT and SIGTERM----
 
 if __name__ == '__main__':
-
+	#global glob_ts_version
+	glob_ts_version=getTrisurfVersion()
 	signal.signal(signal.SIGINT, signal_handler)
 	signal.signal(signal.SIGTERM, signal_handler)
 

--
Gitblit v1.9.3