#!/usr/bin/env python
"""
Name : qconcurrency.threading_..py
Created : Apr 08, 2017
Author : Will Pittman
Contact : willjpittman@gmail.com
________________________________________________________________________________
Description :
________________________________________________________________________________
"""
#builtin
from __future__ import unicode_literals
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import MutableMapping, Iterable, OrderedDict
from numbers import Number
import logging
import sys
import os
import uuid
import time
import importlib
import functools
import traceback
#package
#external
from Qt import QtCore, QtWidgets
import six
#internal
from qconcurrency.exceptions_ import *
logger = logging.getLogger(__name__)
loc = locals
__all__ = [
'SignalManagerFactory',
'ThreadedTask',
'SoloThreadedTask',
'QSemaphoreLocker',
]
[docs]def SignalManagerFactory( signals, queue_stop=None ):
"""
Dynamically creates a :py:obj:`SignalManager` class
with all requested signals.
:py:obj:`SignalManager` objects are dynamically
created :py:obj:`QtCore.QObject` designed to be handed
off to a separate thread. They contain a variable
number of signals, and the method `handle_if_abort`
which checks the value of `self._abort_requested`
Example:
.. code-block:: python
class SignalManager( QtCore.QObject ):
returned = QtCore.Signal()
exception = QtCore.Signal()
def handle_if_abort(self):
if self._abort_requested:
raise UserCancelledOperation()
Args:
signals (dict, optional): ``(ex: {signal_name:emitted datatype(s)} )``
A dictionary of signal-names, and the datatypes
they will emit.
.. code-block:: python
{
'update_status': None, # update_status = QtCore.Signal()
'log_message': str, # log_message = QtCore.Signal(str)
'add_item': (int, str), # add_item = QtCore.Signal(int, str)
}
queue_stop (queue.Queue, optional):
A queue that handles request-aborts. When
:py:obj:`SignalManager.handle_if_abort` is run,
if the queue contains this thread's assigned id,
then this thread will be stopped.
"""
if not isinstance( signals, MutableMapping ):
raise RuntimeError(
'Expected `signals` argument to be a dictionary of \n'
'signal-names, and emit-datatypes \n'
)
# extend globals to include all types needed for signals
def import_signal_datatype( _globals, datatype ):
"""
Args:
_globals (dict):
A modified version of `globals` builtin, that will be used
when defining class
datatype (object):
A python object, emitted by a signal.
Returns:
.. code-block:: python
(
_globals, # globals() with datatype pkg imported
pkgname , # full importpath of datatype
)
"""
str_datatype = datatype.__name__
if datatype.__module__ != '__builtin__':
_globals[ datatype.__module__.split('.')[-1] ] = (
importlib.import_module( datatype.__module__ )
)
str_datatype = '%s.%s' % (
datatype.__module__.split('.')[-1],
datatype.__name__,
)
return _globals, str_datatype
_globals = globals()
class_ = (
'from Qt import QtCore \n'
'class SignalManager( QtCore.QObject ):\n'
)
for signal in signals:
# signals without a returntype
if not signals[signal]:
class_ += ' {signal} = QtCore.Signal() \n'.format(signal=signal)
# signals with multiple returns
elif isinstance( signals[signal], Iterable ):
str_datatypes = []
for datatype in signals[signal]:
(_globals, str_datatype) = import_signal_datatype( _globals, datatype )
str_datatypes.append( str_datatype )
class_ += ' {signal} = QtCore.Signal({signal_datatypes})\n'.format(
signal = signal,
signal_datatypes = ','.join(str_datatypes)
)
# single signal datatype
else:
(_globals, str_datatype) = import_signal_datatype( _globals, signals[signal] )
class_ += ' {signal} = QtCore.Signal({datatype})\n'.format(
signal = signal,
datatype = str_datatype,
)
# self._signals
class_ += (
' def __init__(self, _id=None, signals=None, queue_stop=None ): \n'
' QtCore.QObject.__init__(self) \n'
' \n'
' self._id = _id \n'
' self._queue_stop = queue_stop \n'
' self._abort_requested = False \n'
' self._signals_arg = signals \n'
' self._signals = { \n'
)
for signal in signals:
class_ += ' "{signal}": self.{signal},\n'.format(signal=signal)
class_ += ' }\n'
# methods
class_ += (
' def _request_abort(self): \n'
' """ \n'
' Private method that sets attr :py:attr:`_abort_requested`. \n'
' Designed to be connected to a signal. \n'
' """ \n'
' self._abort_requested = True \n'
' \n'
' def handle_if_abort(self, msg=None): \n'
' """ \n'
' Checks if an abort has been requested. If so, \n'
' raises :py:obj:`UserCancelledOperation` \n'
' \n'
' Raises: \n'
' :py:obj:`UserCancelledOperation` \n'
' """ \n'
' if not msg: \n'
' msg = "" \n'
' \n'
' # if self._request_abort() has been called \n'
' if self._abort_requested: \n'
' raise UserCancelledOperation( msg ) \n'
' \n'
' \n'
' def signals(self): \n'
' """ \n'
' Returns a dictionary of signal-names, and the signal \n'
' they represent. \n'
' \n'
' Returns: \n'
' \n'
' .. code-block:: python \n'
' \n'
' { \n'
' "returned": QtCore.Signal(), \n'
' "exception": QtCore.Signal(tuple) \n'
' ... \n'
' } \n'
' """ \n'
' return self._signals \n'
)
_locals = locals()
exec( class_, _globals, _locals )
return _locals['SignalManager']()
[docs]class QSemaphoreLocker( QtCore.QObject ):
"""
Mirrors the behaviour of :py:obj:`QtCore.QMutexLocker`, but instead of a mutex
manages a :py:obj:`QtCore.QSemaphore` .
Example:
.. code-block:: python
class MyClass( QtCore.QObject ):
def __init__(self):
self._semaphore = QtCore.QSemaphore(5)
def load(self):
locked = QSemaphoreLocker( self._semaphore, 3, 500 )
# ...
# when locked goes out of scope, 3x resource is released
# ...
def load_again(self):
with QSemaphoreLocker( self._semaphore ):
# ...
# do these actions while 1x semaphore is locked
# ...
# .. semaphore resources no longer in use ..
"""
[docs] def __init__(self, semaphore, n=1, timeout=-1 ):
"""
Args:
semaphore (QtCore.QSemaphore):
The semaphore that you'd like to lock
n (int, optional):
The number of resources you would like to lock
in the semaphore.
timeout (int, optional):
milliseconds you would like to wait for `n` resources
to become available before failing :py:meth:`QtCore.QSemaphore.acquire`
A :py:obj:`RuntimeError` will be raised.
"""
QtCore.QObject.__init__(self)
if not isinstance( semaphore, QtCore.QSemaphore ):
raise TypeError((
'expected `semaphore` argument to be of type `QtCore.QSemaphore` \n'
'received type: %s'
) % str(type(semaphore))
)
self._semaphore = semaphore
self._resources = n
self.destroyed.connect(
functools.partial(
self._semaphore.release,
self._resources,
)
)
success = self._semaphore.tryAcquire( n, timeout )
if not success:
raise TimedOut(
'waited timeout of %sms to acquire %s QSemaphore resources' % (
timeout, n)
)
def __enter__(self):
pass
def __exit__(self, err_type, err_msg, err_tb ):
"""
On exit, releases all resources that were acquired
"""
if all([ err_type, err_msg, err_tb ]):
self._semaphore.release( self._resources )
six.reraise( err_type, err_msg, err_tb )
self._semaphore.release( self._resources )
[docs]class ThreadedTask( QtCore.QRunnable ):
"""
Bundles a callback method, it's arguments, and a variable
number of signals (with variable return-types) into a :py:obj:`QtCore.QRunnable`
that can be safely queued in a :py:obj:`QtCore.QThreadPool`.
Every callback method must accept the keyword argument
`signalmgr`. `signalmgr` is a :py:obj:`QtCore.QObject`
that is instantiated with signals to communicate back with the UI
thread, and the method :py:meth:`SignalManager.handle_if_abort`
which should be run periodically in your `callback` method
to handle user-abort requests (issued by :py:meth:`request_abort` ).
Example:
*Run function in QCoreApplication's :py:obj:`QtCore.QThreadPool`*
Handling early-exit by periodically (at safe points)
checking if :py:meth:`task.request_abort` has been run.
.. code-block:: python
def long_running_job( jobid, signalmgr=None ):
for i in range(5):
signalmgr.handle_if_abort() # exit early, if user-abort requested
time.sleep(1)
print('finished job %s' % jobid )
task = ThreadedTask(
callable = long_running_job,
jobid = 1
)
task.start()
*Create signals that can be used within the thread.*
:py:obj:`QtCore.QRunnable` objects (which get used in a
:py:obj:`QtCore.QThreadPool` ) cannot have signals attached to them.
In order for this to work you must create a :py:obj:`QtCore.QObject`
with the signals that can be passed to the thread. This all gets
done behind the scenes with a :py:obj:`ThreadedTask` .
.. code-block:: python
def long_running_job( signalmgr=None ):
signalmgr.log_message.emit('started job...')
signalmgr.set_title.emit('My Title', 'my description')
signalmgr.status_changed.emit()
def printargs(*args):
print( args )
task = ThreadedTask( ### Roughly Equivalent to:
callback = long_running_job, #
signals = { # class SignalManager( QtCore.QObject ):
'status_changed': None, # status_changed = QtCore.Signal()
'log_message': str, # log_message = QtCore.Signal(str)
'set_title': (str,str), # set_title = QtCore.Signal(str,str)
}, #
) #
task.signal('set_title').connect( printargs )
task.signal('log_message').connect( printargs )
task.start()
*Handle successful returns, and unhandled exceptions*
:py:obj:`ThreadedTask` have builtin signals
`returned`, and `exception`, that are emitted
automatically. You may emit the output of your callback
in `returned`, only if you override the `returned`
signal in the `signals` argument.
.. code-blocK:: python
def long_running_job( signalmgr=None ):
pass
def run_on_exit(*args,**kwds):
pass
task = ThreadedTask(
callback = long_running_job,
)
task.signal('returned').connect(run_on_exit)
task.signal('exception').connect(run_on_exit)
task.start()
See Also:
* :py:obj:`qconcurrency.threading_.SignalManagerFactory`
* :py:obj:`qconcurrency.threading_.SoloThreadedTask`
"""
[docs] def __init__(self, callback, signals=None, *args, **kwds ):
"""
Args:
callback (callable):
A function, method, or class that you would like to run in
a separate thread.
signals (dict, optional):
Dictionary of signal-names, and the datatypes they will emit.
Signals defined here will override any default signals.
.. code-block:: python
{
# signal-name # # datatype # # equivalent-to #
'add_item': (int,str), #: QtCore.Signal(int,str)
'add_progress': int, #: QtCore.Signal(int)
'returned': None, #: QtCore.Signal()
}
*args/**kwds:
Any additional arguments/keyword-arguments are passed
to the callback in :py:meth:`run`
"""
QtCore.QRunnable.__init__(self)
# Arguments
self._callback = callback
self._args = args
self._kwds = kwds
self._id = None # used by SoloThreadedTask
# Attributes
self._signals = {
'returned': None,
'exception': None,
'abort_requested': None,
}
if signals:
self._signals.update( signals )
self._signalmgr = SignalManagerFactory( self._signals )
[docs] def run(self):
"""
Runs ``callback( *args, **kwds )`` in a separate thread. This method
will be called automatically by the :py:obj:`QtCore.QThreadPool`
when queued (see :py:meth:`start`)
If an unhandled exception is raised during the callback's execution,
the signal *exception* is called. Otherwise the signal *returned*
is emitted when the method completes.
If you desire to catch the return-value of the callback, simply define
the expected return-value for it in the argument `signals`.
.. code-block:: python
task = ThreadedTask(
callback = mycallback,
signals = {'returned': (int,int)}, #: mycallback is now expected to return 2x integers
)
"""
try:
retval = self._callback( signalmgr=self._signalmgr, *self._args, **self._kwds )
if not self._signals['returned']:
self._signalmgr.returned.emit()
else:
self._signalmgr.returned.emit( retval )
except( UserCancelledOperation ):
logger.debug('Responding to user-cancelled-operation. Exiting thread: %s' % repr(self) )
exc_info = sys.exc_info()
self._signalmgr.exception.emit()
except:
logger.error( 'called with %s( %s, %s )' % (repr(self._callback), repr(self._args), repr(self._kwds) ) )
exc_info = sys.exc_info()
logger.error( '%s\n\nUnhandled Exception occurred in thread: %s' % (traceback.format_exc(exc_info), repr(exc_info)) )
self._signalmgr.exception.emit()
[docs] def start(self, expiryTimeout=-1, threadpool=None ):
"""
Queues this thread in a :py:obj:`QtCore.QThreadPool`
(by default :py:obj:`QtCore.QThreadPool.globalInstance()` )
Args:
expiryTimeout (int, optional):
Thread that unused for N milliseconds are considered expired
and will exit. By default, no exipiryTimeout is set ``(-1)``.
threadpool (QtCore.QThreadPool, optional):
By default, this :py:obj:`ThreadedTask` will be queued in the
QCoreApplication's global threadpool. If you would prefer to assign
another, you may specify it here.
"""
if not threadpool:
threadpool = QtCore.QThreadPool.globalInstance()
threadpool.start( self, expiryTimeout )
[docs] def signalmgr(self):
"""
Returns :py:obj:`SignalManager` instance (QObject that will be
passed to separate thread, and stores all signals the thread will
use to communicate back to the UI thread.)
"""
return self._signalmgr
[docs] def signal(self, signal_name):
"""
Returns one of the :py:obj:`QtCore.Signal` s defined in `signals`.
See documentation in :py:meth:`__init__`.
"""
return getattr( self._signalmgr, signal_name )
[docs] def request_abort(self,*args,**kwds):
"""
Runs :py:meth:`SignalManager._request_abort` .
(your callback will still need to periodically run
:py:meth:`SignalManager.handle_if_abort` at safe points
to exit).
"""
logger.warning('Abort Requested for `ThreadedTask`: %s' % repr(self))
self._signalmgr._request_abort()
[docs]class SoloThreadedTask( object ):
"""
:py:obj:`ThreadedTask` that cancels all of it's running/pending threads (started by
this :py:obj:`SoloThreadedTask`) whenever a new thread is requested (and all must exit
before the latest requested task is allowed to start ).
This is designed for methods that load or filter the contents
of a widget.
.. warning::
If slot connected to one of this task's signals does not update
UI until this thread exits, add the following line to the end of your slot.
.. code-block:: python
QtCore.QCoreApplication.instance().processEvents()
Example:
.. code-block:: python
class MyList( QtWidgets.QListWidget ):
def __init__(self):
self._thread_loading = SoloThreadedTask(
callback = self._find_list_items,
signals = {'add_item': str},
connections = {'add_item': [self.addItem] },
)
def load(self):
#
# whenever `self.load` is called
# the last load will be cancelled,
# after which a new load process will start
#
self._thread_loading.start()
def _find_list_items(self, signalmgr=None ):
for i in range(100):
signalmgr.handle_if_abort() # check for a request-abort, and exit early
time.sleep(1)
signalmgr.add_item.emit( i ) # add an item to the list
See Also:
* :py:obj:`qconcurrency.threading_.ThreadedTask`
"""
[docs] def __init__(self, callback, signals=None, connections=None, mutex_expiry=5000 ):
"""
Args:
callback (callable):
A function, method, or class that you would like to run in
a separate thread.
signals (dict, optional):
Dictionary of signal-names, and the datatypes they will emit.
Signals defined here will override any default signals.
.. code-block:: python
{
# signal-name # # datatype # # equivalent-to #
'add_item': (int,str), #: QtCore.Signal(int,str)
'add_progress': int, #: QtCore.Signal(int)
'returned': None, #: QtCore.Signal()
}
connections (dict, optional):
Dictionary of signal-names, and a python-callable, or list
of python-callables to connect to the signal.
.. code-block:: python
{
'add_item': [ printargs, mylist.addItem ],
'add_progress': progbar.add_progress,
...
}
*args/**kwds:
Any additional arguments/keyword-arguments are passed
to the callback in :py:meth:`run`
"""
if signals:
if not isinstance( signals, MutableMapping ):
raise TypeError((
'Expected dictionary for `signals` argument, '
'received type: %s' ) % repr(type(signals))
)
if connections:
if not isinstance( connections, MutableMapping ):
raise TypeError((
'Expected dictionary for `connections` argument '
'received type: %s' ) % repr(type(connections))
)
# Args
self._callback = callback
self._mutex_expiry = mutex_expiry
self._active_threads = OrderedDict() # { uuid : request_abort(method) }
self._thread_with_mutex = None # uuid.uuid4().hex of thread holding `self._mutex_loading`
# ( continues to hold Id after thread exits )
# ( to prevent race-conditions )
# ( (who handled `returned/exception` signal first) )
self._signals = {
'thread_acquired_mutex': str, # emits uuid assigned to thread holding mutex
'_thread_exit_' : str, # uuid
}
if signals:
self._signals.update( signals )
self._connections = connections
# locks
self._mutex_loading = QtCore.QMutex()
[docs] def start(self, expiryTimeout=-1, threadpool=None, wait=False, _connections=None, *args,**kwds):
"""
Creates/starts a new :py:obj:`ThreadedTask`, and cancels
all other pending/running threads started by this
:py:obj:`SoloThreadedTask` instance.
Args:
expiryTimeout (int, optional):
Thread that unused for N milliseconds are considered expired
and will exit. By default, no exipiryTimeout is set ``(-1)``.
threadpool (QtCore.QThreadPool, optional):
By default, this :py:obj:`ThreadedTask` will be queued in the
QCoreApplication's global threadpool. If you would prefer to assign
another, you may specify it here.
_connections(dict, optional):
Entirely replaces the connections defined in ``__init__`` .
Only use this if you are absolutely certain you know what
you are doing.
wait (int, bool, optional): ``(ex: True, False, 1.5 )``
Similar to :py:meth:`threading.Thread.join` this allows
you to wait for the thread to complete, or wait N seconds
for it to complete. If the timeout expires, the exception
:py:obj:`TimedOut` is raised.
*args/**kwds:
Any additional arguments/keyword-arguments will be passed
to the `callback` defined in :py:meth:`__init__` when it is
run from it's separate thread.
"""
threadId = uuid.uuid4().hex
task = ThreadedTask(
callback = self._run,
signals = self._signals,
# args/kwds
threadId = threadId,
*args, **kwds
)
self._active_threads[ threadId ] = task.request_abort
if not _connections:
_connections = self._connections
# setup all user-defined connections
if _connections:
for signal_name in _connections:
if isinstance( _connections[ signal_name ], Iterable ):
for callback in _connections[ signal_name ]:
task.signal( signal_name ).connect( callback )
else:
task.signal( signal_name ).connect(
_connections[signal_name]
)
task.signal('thread_acquired_mutex').connect(
self._set_active_threadId
)
task.signal('_thread_exit_').connect(
self._set_complete_threadId,
QtCore.Qt.DirectConnection
)
if not wait:
task.start( expiryTimeout=expiryTimeout, threadpool=threadpool )
logger.debug('created threadId: %s' % threadId)
else:
elapsed = 0
# wait for thread to lock
while self._mutex_loading.tryLock(0) and threadId in self._active_threads:
if elapsed == 0:
task.start( expiryTimeout=expiryTimeout, threadpool=threadpool )
logger.debug('created threadId: %s' % threadId)
self._mutex_loading.unlock()
time.sleep(0.05)
elapsed += 0.05
logger.debug( 'locked by thread' )
# wait for thread to unlock
while threadId in self._active_threads and self._active_threads:
if wait not in (True,False):
if elapsed >= wait:
raise TimedOut(
'waited %ss for job to complete without success' % elapsed
)
time.sleep(0.05)
elapsed += 0.05
QtCore.QCoreApplication.instance().processEvents()
self._mutex_loading.unlock()
def _run(self, threadId=None, signalmgr=None, *args, **kwds ):
"""
This is the method that is run in a separate thread.
* manages/waits for `self._mutex_loading`
* cancels all pending threads
* calls your callback method
"""
if not self._mutex_loading.tryLock():
logger.debug('Waiting for loading mutex to be released: %s' % threadId )
self.stop( until_threadId=threadId )
self._mutex_loading.tryLock( self._mutex_expiry )
logger.debug('mutex acquired by threadId: %s' % threadId)
signalmgr.thread_acquired_mutex.emit( threadId )
retval = None
try:
retval = self._callback(
signalmgr = signalmgr,
*args, **kwds
)
except( UserCancelledOperation ):
exc_info = sys.exc_info()
logger.debug('Responding to user-cancelled-operation. Exiting thread: %s' % repr(self) )
signalmgr._thread_exit_.emit( threadId )
self._mutex_loading.unlock()
except:
exc_info = sys.exc_info()
logger.error( '%s\n\nUnhandled Exception occurred in thread: %s' % (traceback.format_exc(exc_info), repr(exc_info)) )
signalmgr._thread_exit_.emit( threadId )
self._mutex_loading.unlock()
signalmgr._thread_exit_.emit( threadId )
logger.debug('mutex released by threadId: %s' % threadId)
self._mutex_loading.unlock()
return retval
[docs] def stop(self, until_threadId=None, wait=None ):
"""
Emits `request_abort` signal on all threads up-to (but not including)
the target `until_threadId`. If `until_threadId` is not provided, all
pending threads are killed.
(threadIds will be automatically removed from attr :py:attr:`_active_threads`
as they return, or raise unhandled exceptions).
Args:
until_threadId:
Requests abort on all running threads until (and not including)
the thread with a threadId matching `until_threadId`.
wait (numbers.Number, optional): ``(ex: None, -1, 100)``
Optionally, wait *N* seconds for job to complete
after requesting abort. If negative number, waits
indefinitely. Waiting in the main UI thread will
result in a deadlock.
Raises
* :py:obj:`TimedOut` if user set a wait time.
"""
for active_threadId in self._active_threads:
if active_threadId == until_threadId:
return
else:
logger.debug('requesting abort on threadId: %s' % active_threadId )
self._active_threads[ active_threadId ]()
if wait:
elapsed = 0
while True:
locked = self._mutex_loading.tryLock(0)
# if lock is free, exit
if locked:
self._mutex_loading.unlock()
break
# if waited user-requested time, exit
if elapsed > 0 and elapsed >= wait:
raise TimedOut('waited %ss for thread to end' % elapsed)
break
time.sleep(0.05)
elapsed += 0.05
def _set_active_threadId(self, threadId):
"""
Reports back the UI thread, informing it which thread was
last to hold the :py:attr:`_mutex_loading`.
Args:
threadId (str)
A string containing a UUID (:py:attr:`uuid.uuid4.hex`)
"""
self._thread_with_mutex = threadId
def _set_complete_threadId(self, threadId):
"""
Remove all threadIds from `active_threadIds` up until
the thread that signaled indicating it was finished.
(removing discontinued will remove a lot of clutter)
"""
if threadId in self._active_threads:
self._active_threads.pop( threadId )
[docs] def is_active(self):
"""
Returns ``True`` if this :py:obj:`SoloThreadedTask` is
overseeing an active thread.
"""
if self._active_threads:
return True
return False
if __name__ == '__main__':
from qconcurrency import QApplication
from Qt import QtWidgets
import time
import supercli.logging
supercli.logging.SetLog(lv=10)
def test_threadedtask():
def long_running_job( thread_num, signalmgr=None ):
print( 'thread started (%s)' % thread_num )
for i in range(3):
signalmgr.handle_if_abort()
time.sleep(1)
print( 'job finished (%s)' % thread_num )
with QApplication():
label = QtWidgets.QLabel('watch progress in terminal window..')
label.show()
for i in range(5):
task = ThreadedTask(
callback = long_running_job,
# args/kwds
thread_num = i,
)
task.start()
def test_solo_threadedtask():
def long_running_job( thread_num, signalmgr=None ):
print( '[%s] thread started' % thread_num )
signalmgr.print_txt.emit('test signal')
for i in range(3):
print( '[%s] thread step %s/3' % (thread_num, i+1) )
signalmgr.handle_if_abort()
time.sleep(1)
print( '[%s] thread finished' % thread_num )
def printtxt(msg):
print(msg)
with QApplication():
label = QtWidgets.QLabel('watch progress in terminal window..')
label.show()
solotask = SoloThreadedTask(
callback = long_running_job,
signals = {'print_txt':str},
connections = {'print_txt':[printtxt]},
)
# every 1s, cancel current job with
# a new job.
for i in range(5):
solotask.start( thread_num=i+1 )
time.sleep(1)
def runtests():
#test_threadedtask()
test_solo_threadedtask()
runtests()