| | |
| | | import sys |
| | | import socket |
| | | from threading import Thread, Event |
| | | import re |
| | | |
| | | |
| | | |
# Placeholder trisurf version string. It is overwritten with the output of
# getTrisurfVersion() when the module runs as a script, and it is included in
# the payload sent to the server when a client registers.
glob_ts_version = "00000"
| | | |
| | | def getTrisurfVersion(): |
| | | p = subprocess.Popen('trisurf --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| | | lines=p.stdout.readlines() |
| | | version=re.findall(r'[0-9a-f]{7}(?:-dirty)?', lines[0].decode('ascii')) |
| | | p.wait() |
| | | if(len(version)): |
| | | return version[0] |
| | | else: |
| | | return "unknown version" |
| | | |
def get_hostname():
    """Return this machine's host name, as given by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
| | |
| | | return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) |
| | | |
| | | def get_client_id(addr, my_ip, my_hostname, subrun): |
| | | client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun} |
| | | global glob_ts_version |
| | | client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun, 'trisurf_version':glob_ts_version } |
| | | response=requests.post(addr+"/api/register/", data=client_auth) |
| | | if(response.status_code==200): |
| | | client_data=json.loads(response.text) |
| | |
| | | status=client_data['status'] |
| | | return (rid,tape,vtu,status) |
| | | else: |
| | | print(response.text) |
| | | #print(response.text) |
| | | if(response.status_code==400): |
| | | raise ValueError |
| | | else: |
| | |
| | | client_data={'client_id':cid} |
| | | response=requests.post(addr+"/api/pingclient/", data=client_data) |
| | | if(response.status_code==200): |
| | | return |
| | | client_data=json.loads(response.text) |
| | | |
| | | return client_data['concurrent_runs'] |
| | | else: |
| | | raise ValueError |
| | | |
| | |
| | | |
| | | |
| | | class ClientThread(Thread): |
| | | |
| | | def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100): |
| | | super(ClientThread,self).__init__() |
| | | self._stop_event = Event() |
| | |
| | | self.ip=get_ip() |
| | | self.hostname=get_hostname() |
| | | self.update_seconds=update_seconds |
| | | self.max_client_ping_time_elapsed=250 |
| | | |
| | | self.subruns=[] |
| | | |
| | | def stop(self): |
| | | self._stop_event.set() |
| | |
| | | def join(self): |
| | | print('joining threads') |
| | | super(ClientThread, self).join() |
| | | for sub in self.subruns: |
| | | sub.stop() |
| | | sub.join() |
| | | if self.p is not None: |
| | | self.p.terminate() |
| | | if self.workingdir is not None: |
| | |
| | | return False |
| | | sleep(1) |
| | | return True |
| | | |
| | | def subrunsStartStop(self,nr): |
| | | while(self.id==0 and nr>len(self.subruns)+1): |
| | | #spawning a new worker: |
| | | print("[{}] Spawning a new worker".format(self.id)) |
| | | t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds) |
| | | t.start() |
| | | self.subruns.append(t) |
| | | while(self.id==0 and nr<len(self.subruns)+1): |
| | | print("[{}] Stopping a worker".format(self.id)) |
| | | self.subruns[-1].stop() |
| | | self.subruns[-1].join() |
| | | del self.subruns[-1] |
| | | |
| | | def run(self): |
| | | while(not self.isStopped()): |
| | | while(not self.isStopped()): #try to register |
| | | try: |
| | | cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id) |
| | | except: |
| | | print("[{}] Could not get CID.".format(self.id)) |
| | | self.sleep(10) |
| | | continue |
| | | #print("Got CID. getting RID.") |
| | | client_ping_time_elapsed=0 |
| | | while(not self.isStopped()): |
| | | print("[{}] Connected and got client ID {}.".format(self.id, cid)) |
| | | try: |
| | | concurrent_runs=client_ping(self.conn_address,cid) |
| | | client_ping_time_elapsed=0 |
| | | except: |
| | | self.sleep(10) |
| | | continue |
| | | self.subrunsStartStop(concurrent_runs) |
| | | while(not self.isStopped()): #successfully registered, now start pinging and searching for job |
| | | try: |
| | | (rid,tape,vtu,status)=get_run(self.conn_address,cid) |
| | | except NameError: |
| | | print("[{}] Could not get RID.".format(self.id)) |
| | | #print("[{}] Could not get RID.".format(self.id)) |
| | | self.sleep(10) |
| | | client_ping_time_elapsed+=10 |
| | | if(client_ping_time_elapsed>=250): |
| | | if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed): |
| | | try: |
| | | client_ping(self.conn_address,cid) |
| | | concurrent_runs=client_ping(self.conn_address,cid) |
| | | self.subrunsStartStop(concurrent_runs) |
| | | except: |
| | | break |
| | | client_ping_time_elapsed=0 |
| | |
| | | self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4())) |
| | | self.workingdir.makeifnotexist() |
| | | self.workingdir.goto() |
| | | #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath())) |
| | | with open(self.workingdir.fullpath()+"/tape", 'w') as f: |
| | | f.write(tape) |
| | | if(int(status)==-1): |
| | |
| | | newVTU=getNewVTU(self.workingdir.fullpath()) |
| | | if newVTU: #upload |
| | | try: |
| | | for nv in sorted(newVTU): |
| | | for nvfile in sorted(newVTU): |
| | | nv=os.path.join(self.workingdir.fullpath(),nvfile) |
| | | with open(nv,'r') as f: |
| | | fc=f.read() |
| | | s=s+1 |
| | | print('[{}] Uploading {}.'.format(self.id,nv)) |
| | | print('[{}] Uploading {}.'.format(self.id,nvfile)) |
| | | upload(self.conn_address, cid, rid, fc, s) |
| | | os.unlink(nv) |
| | | except: |
| | | except Exception as e: |
| | | print("[{}] Could not upload".format(self.id)) |
| | | print(e) |
| | | self.p.terminate() |
| | | removeDir(self.workingdir.fullpath()) |
| | | self.p=None |
| | |
| | | try: |
| | | ping_run(self.conn_address, cid, rid) |
| | | except: |
| | | print("[{}] Could not ping.".format(self.id)) |
| | | print("[{}] Could not prolong a lease on the run.".format(self.id)) |
| | | self.p.terminate() |
| | | self.p=None |
| | | removeDir(self.workingdir.fullpath()) |
| | |
| | | self.p=None |
| | | break |
| | | self.sleep(self.update_seconds-1) |
| | | client_ping_time_elapsed+=self.update_seconds |
| | | if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2): |
| | | try: |
| | | concurrent_runs=client_ping(self.conn_address,cid) |
| | | except: |
| | | print("[{}] Could not client ping.".format(self.id)) |
| | | self.p.terminate() |
| | | self.p=None |
| | | removeDir(self.workingdir.fullpath()) |
| | | self.workingdir=None |
| | | break |
| | | self.subrunsStartStop(concurrent_runs) |
| | | client_ping_time_elapsed=0 |
| | | |
| | | |
| | | |
| | |
| | | #--- END SIGINT and SIGTERM---- |
| | | |
if __name__ == '__main__':
    # Record the installed trisurf version (module-level global read during
    # client registration), then route both SIGINT and SIGTERM to
    # signal_handler.
    glob_ts_version = getTrisurfVersion()
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)
| | | |