Samo Penic
2018-06-04 d357f502ab18a6c5574e7b0e573c19a01c7dfa32
Multithreading and client pinging resolved
1 file modified
40 ■■■■ changed files
tsclient.py 40 ●●●● patch | view | raw | blame | history
tsclient.py
@@ -58,8 +58,8 @@
    response=requests.post(addr+"/api/pingclient/", data=client_data)
    if(response.status_code==200):
        client_data=json.loads(response.text)
        return
        return client_data['concurrent_runs']
    else:
        raise ValueError
@@ -101,7 +101,6 @@
class ClientThread(Thread):
    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        super(ClientThread,self).__init__()
        self._stop_event = Event()
@@ -115,6 +114,8 @@
        self.update_seconds=update_seconds
        self.max_client_ping_time_elapsed=250
        self.subruns=[]
    def stop(self):
        self._stop_event.set()
@@ -124,6 +125,9 @@
    def join(self):
        print('joining threads')
        super(ClientThread, self).join()
        for sub in self.subruns:
            sub.stop()
            sub.join()
        if self.p is not None:
            self.p.terminate()
        if self.workingdir is not None:
@@ -136,6 +140,19 @@
                return False
            sleep(1)
        return True
    def subrunsStartStop(self,nr):
        while(self.id==0 and nr>len(self.subruns)+1):
            #spawning a new worker:
            print("[{}] Spawning a new worker".format(self.id))
            t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        while(self.id==0 and nr<len(self.subruns)+1):
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]
    
    def run(self):
        while(not self.isStopped()): #try to register
@@ -147,6 +164,8 @@
                continue
            #print("Got CID. getting RID.")
            client_ping_time_elapsed=0
            concurrent_runs=client_ping(self.conn_address,cid)
            self.subrunsStartStop(concurrent_runs)
            while(not self.isStopped()): #successfully registered, now start pinging and searching for job
                try:
                    (rid,tape,vtu,status)=get_run(self.conn_address,cid)
@@ -156,7 +175,8 @@
                    client_ping_time_elapsed+=10
                    if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                        try:
                            client_ping(self.conn_address,cid)
                            concurrent_runs=client_ping(self.conn_address,cid)
                            self.subrunsStartStop(concurrent_runs)
                        except:
                            break
                        client_ping_time_elapsed=0
@@ -174,6 +194,7 @@
                    self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
                    self.workingdir.makeifnotexist()
                    self.workingdir.goto()
                    #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath()))
                    with open(self.workingdir.fullpath()+"/tape", 'w') as f:
                        f.write(tape)
                    if(int(status)==-1):
@@ -193,15 +214,17 @@
                        newVTU=getNewVTU(self.workingdir.fullpath())
                        if newVTU: #upload
                            try:
                                for nv in sorted(newVTU):
                                for nvfile in sorted(newVTU):
                                    nv=os.path.join(self.workingdir.fullpath(),nvfile)
                                    with open(nv,'r') as f:
                                        fc=f.read()
                                    s=s+1
                                    print('[{}] Uploading {}.'.format(self.id,nv))
                                    print('[{}] Uploading {}.'.format(self.id,nvfile))
                                    upload(self.conn_address, cid, rid, fc, s)
                                    os.unlink(nv)
                            except:
                            except Exception as e:
                                print("[{}] Could not upload".format(self.id))
                                print(e)
                                self.p.terminate()
                                removeDir(self.workingdir.fullpath())
                                self.p=None
@@ -236,7 +259,8 @@
                        self.sleep(self.update_seconds-1)
                        client_ping_time_elapsed+=self.update_seconds
                        if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                            client_ping(self.conn_address,cid)
                            concurrent_runs=client_ping(self.conn_address,cid)
                            self.subrunsStartStop(concurrent_runs)
                            client_ping_time_elapsed=0