Source code for qconcurrency.widgets._progressbar_

#!/usr/bin/env python
"""
Name :          qconcurrency.widgets._progressbar_.py
Created :       Apr 17, 2017
Author :        Will Pittman
Contact :       willjpittman@gmail.com
________________________________________________________________________________
Description :   ProgressBar, that can be updated from a separate thread.
________________________________________________________________________________
"""
#builtin
from __future__    import unicode_literals
from __future__    import absolute_import
from __future__    import division
from __future__    import print_function
from   collections import Iterable
import functools
import uuid
import copy
#external
from Qt            import QtWidgets
#internal
from qconcurrency.threading_  import ThreadedTask, SoloThreadedTask

#!TODO: http://stackoverflow.com/questions/7108715/how-to-show-hide-a-child-qwidget-with-a-motion-animation


class _ProgressSoloThreadedTask( SoloThreadedTask ):
    """
    customized :py:obj:`qconcurrency.threading_.SoloThreadedTask` that
    assigns a new jobid for each thread it starts, and each thread's progress
    is measured entirely independently.

    This way, there is no race-condition where cancelling one thread can knock out
    progress on *all* threads.
    """
    def __init__(self, progressbar, callback, signals=None, connections=None, mutex_expiry=5000 ):
        self._progressbar = progressbar
        SoloThreadedTask.__init__(self,
            callback     = callback,
            signals      = signals,
            connections  = connections,
            mutex_expiry = mutex_expiry,
        )

    def start(self, expiryTimeout=-1, threadpool=None, wait=False, *args, **kwds ):
        """
        Wraps :py:meth:`qconcurrency.threading_.SoloThreadedTask.start` ,
        adding signal-connections so that they update a progressbar.

        See Also:

            * :py:meth:`qconcurrency.threading_.SoloThreadedTask.start`
        """

        jobid = uuid.uuid4().hex

        connections = self._connections.copy()
        if not connections:
            connections = {}

        progbar_connections = {
            'incr_progress' : functools.partial( self._progressbar.incr_progress, jobid=jobid ),
            'add_progress'  : functools.partial( self._progressbar.add_progress,  jobid=jobid ),
            'returned'      : functools.partial( self._progressbar._handle_return_or_abort, jobid=jobid ),
            'exception'     : functools.partial( self._progressbar._handle_return_or_abort, jobid=jobid ),
        }

        for signal in progbar_connections:
            if signal in connections:
                connections[ signal ].append( progbar_connections[signal] )
            else:
                connections[ signal ] = [ progbar_connections[signal] ]


        SoloThreadedTask.start(self,
                           expiryTimeout = expiryTimeout,
                           threadpool    = threadpool,
                           wait          = wait,
                           _connections  = connections,
                           *args,**kwds
                       )



[docs]class ProgressBar( QtWidgets.QWidget ): """ A ProgressBar designed to track progress of several threads, that is hidden automatically whenever all threads have exited (by unhandled exception, or return). Each thread is assigned it's own `jobid`, so that it's progress is tracked entirely independently of all other pending tasks. (my hope is that if errors appear in total-calculated progress in the codebase, this will lessen the appearance of an error for the user - each thread's progress being set to 100% once it exits). """ def __init__(self): QtWidgets.QWidget.__init__(self) self._progress = {} # { jobid: {'total':10, 'current':3} } self._cancelled_jobids = [] # rolling log of cancelled jobids (so later unhandled progress is ignored) self._progressbar = None # the ProgressBar Widget self.setHidden(True) self._initui() def _initui(self): # Create Widgets layout = QtWidgets.QHBoxLayout() self._progressbar = QtWidgets.QProgressBar() self._reset_btn = QtWidgets.QPushButton('reset progress') # Position Widgets self.setLayout( layout ) layout.addWidget( self._progressbar ) layout.addWidget( self._reset_btn ) # Connections self._reset_btn.clicked.connect( self.reset )
[docs] def add_progress(self, amount, jobid=None): """ Adds to the total number of required steps required to complete. """ if jobid in self._cancelled_jobids: return if jobid not in self._progress: self._progress[ jobid ] = {'total':amount, 'current':0} else: self._progress[ jobid ]['total'] += amount self.refresh_progress() self.setHidden(False)
[docs] def refresh_progress(self): """ ReCalculates current/total progress, and updates the progressbar """ total_progress = 0 current_progress = 0 for jobid in list(self._progress.keys()): total_progress += self._progress[ jobid ]['total'] current_progress += self._progress[ jobid ]['current'] if jobid in self._cancelled_jobids: self._progress.pop(jobid) if current_progress == total_progress: self._progress.pop(jobid) self._progressbar.setMaximum( total_progress ) self._progressbar.setValue( current_progress ) if total_progress <= current_progress: self.setHidden(True)
[docs] def incr_progress(self, amount, jobid): """ Completes a particular number of steps for a particular jobid. """ if jobid in self._cancelled_jobids: return if amount == None: amount = 1 if jobid not in self._progress: return self._progress[ jobid ]['current'] += amount self.refresh_progress()
[docs] def reset(self, jobid=None ): """ If not `jobid`, sets all required progress-steps back to 0. Otherwise, removes all required progress associated with that jobid. """ if not jobid: self._progress = {} self._progressbar.reset() self.refresh_progress() elif jobid in self._progress: self._progress.pop( jobid ) self.refresh_progress()
[docs] def new_task(self, callback, signals=None, *args, **kwds ): """ Creates a new :py:obj:`ThreadedTask` object, adding signals to it so that it can update this :py:obj:`ProgressBar`. :py:obj:`ThreadedTask` objects are most suitable for producer/consumer patterns (multiple threads running at once), and no thread depends on another. See also: * :py:obj:`ThreadedTask` * :py:obj:`SoloThreadedTask` * :py:meth:`ProgressBar.new_solotask` """ jobid = uuid.uuid4().hex # assign signals default_signals = { 'incr_progress': int, 'add_progress': int, } if signals: default_signals.update( signals ) # create task task = ThreadedTask( callback = callback, signals = default_signals, *args, **kwds ) # Connections task.signal('incr_progress').connect( functools.partial( self.incr_progress, jobid=jobid ) ) task.signal('add_progress').connect( functools.partial( self.add_progress, jobid=jobid ) ) task.signal('returned').connect( functools.partial( self._handle_return_or_abort, jobid=jobid ) ) task.signal('exception').connect( functools.partial( self._handle_return_or_abort, jobid=jobid ) ) return task
[docs] def new_solotask(self, callback, signals=None, connections=None, mutex_expiry=5000 ): """ Creates a new :py:obj:`SoloThreadedTask` object, adding signals to it so that it can update this :py:obj:`ProgressBar`. """ jobid = uuid.uuid4().hex # assign signals default_signals = { 'incr_progress': int, 'add_progress': int, } if signals: default_signals.update( signals ) # assign connections default_connections = {} if connections: for signal in connections: if signal in default_connections: if isinstance( connections[ signal ], Iterable ): for _callable in connections[ signal ]: default_connections[ signal ].append( _callable ) else: _callable = connections[ signal ] default_connections[ signal ].append( _callable ) else: default_connections[ signal ] = connections[ signal ] # create task solotask = _ProgressSoloThreadedTask( progressbar = self, callback = callback, signals = default_signals, connections = default_connections, mutex_expiry = mutex_expiry, ) return solotask
def _handle_return_or_abort(self, *args,**kwds): if 'jobid' in kwds: if 'jobid' in kwds: self.reset( jobid=kwds['jobid'] ) self._cancelled_jobids.append( kwds['jobid'] ) # when there are more than 50x entries, # prune the tracked jobids if len(self._cancelled_jobids) > 50: self._cancelled_jobids.pop(0) self.refresh_progress()
if __name__ == '__main__': from qconcurrency import QApplication, Fake from Qt import QtWidgets, QtCore, QtGui import supercli.logging import time supercli.logging.SetLog(lv=20) def update_progbar( start_wait=0, signalmgr=None ): if not signalmgr: signalmgr = Fake() signalmgr.add_progress.emit(5) time.sleep( start_wait ) for i in range(5): signalmgr.handle_if_abort() time.sleep(0.2) signalmgr.incr_progress.emit(1) print('done') with QApplication(): win = QtWidgets.QWidget() lyt = QtWidgets.QVBoxLayout() bar = ProgressBar() win.setLayout(lyt) lyt.addWidget(bar) win.show() solotask = bar.new_solotask( callback = update_progbar, ) solotask.start( start_wait=0 ) # wait before starting next job, # keeping eventloop alive so progress can be seen for i in range(2): time.sleep(0.2) QtCore.QCoreApplication.instance().processEvents() solotask.start( start_wait=0.5 ) solotask.start() solotask.start()