Back to index

python3.2  3.2.2
Public Member Functions | Public Attributes
test.test_threading.ThreadTests Class Reference
Inheritance diagram for test.test_threading.ThreadTests:
Inheritance graph
[legend]
Collaboration diagram for test.test_threading.ThreadTests:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def test_various_ops
def test_ident_of_no_threading_threads
def test_various_ops_small_stack
def test_various_ops_large_stack
def test_foreign_thread
def test_PyThreadState_SetAsyncExc
def test_limbo_cleanup
def test_finalize_runnning_thread
def test_finalize_with_trace
def test_join_nondaemon_on_shutdown
def test_enumerate_after_join
def test_no_refcycle_through_target
def test_old_threading_api
def test_repr_daemon
def setUp
def tearDown

Public Attributes

 id
 finished
 should_raise
 thread

Detailed Description

Definition at line 72 of file test_threading.py.


Member Function Documentation

def test.test_threading.BaseTestCase.setUp (   self) [inherited]

Definition at line 64 of file test_threading.py.

Here is the caller graph for this function:

def test.test_threading.BaseTestCase.tearDown (   self) [inherited]

Definition at line 67 of file test_threading.py.

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading.ThreadTests.test_enumerate_after_join (   self)

Definition at line 343 of file test_threading.py.

00343 
00344     def test_enumerate_after_join(self):
00345         # Try hard to trigger #1703448: a thread is still returned in
00346         # threading.enumerate() after it has been join()ed.
00347         enum = threading.enumerate
00348         old_interval = sys.getswitchinterval()
00349         try:
00350             for i in range(1, 100):
00351                 sys.setswitchinterval(i * 0.0002)
00352                 t = threading.Thread(target=lambda: None)
00353                 t.start()
00354                 t.join()
00355                 l = enum()
00356                 self.assertNotIn(t, l,
00357                     "#1703448 triggered after %d trials: %s" % (i, l))
00358         finally:
00359             sys.setswitchinterval(old_interval)

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_finalize_runnning_thread (   self)

Definition at line 265 of file test_threading.py.

00265 
00266     def test_finalize_runnning_thread(self):
00267         # Issue 1402: the PyGILState_Ensure / _Release functions may be called
00268         # very late on python exit: on deallocation of a running thread for
00269         # example.
00270         import_module("ctypes")
00271 
00272         rc, out, err = assert_python_failure("-c", """if 1:
00273             import ctypes, sys, time, _thread
00274 
00275             # This lock is used as a simple event variable.
00276             ready = _thread.allocate_lock()
00277             ready.acquire()
00278 
00279             # Module globals are cleared before __del__ is run
00280             # So we save the functions in class dict
00281             class C:
00282                 ensure = ctypes.pythonapi.PyGILState_Ensure
00283                 release = ctypes.pythonapi.PyGILState_Release
00284                 def __del__(self):
00285                     state = self.ensure()
00286                     self.release(state)
00287 
00288             def waitingThread():
00289                 x = C()
00290                 ready.release()
00291                 time.sleep(100)
00292 
00293             _thread.start_new_thread(waitingThread, ())
00294             ready.acquire()  # Be sure the other thread is waiting.
00295             sys.exit(42)
00296             """)
00297         self.assertEqual(rc, 42)

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_finalize_with_trace (   self)

Definition at line 298 of file test_threading.py.

00298 
00299     def test_finalize_with_trace(self):
00300         # Issue1733757
00301         # Avoid a deadlock when sys.settrace steps into threading._shutdown
00302         assert_python_ok("-c", """if 1:
00303             import sys, threading
00304 
00305             # A deadlock-killer, to prevent the
00306             # testsuite to hang forever
00307             def killer():
00308                 import os, time
00309                 time.sleep(2)
00310                 print('program blocked; aborting')
00311                 os._exit(2)
00312             t = threading.Thread(target=killer)
00313             t.daemon = True
00314             t.start()
00315 
00316             # This is the trace function
00317             def func(frame, event, arg):
00318                 threading.current_thread()
00319                 return func
00320 
00321             sys.settrace(func)
00322             """)

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_foreign_thread (   self)

Definition at line 146 of file test_threading.py.

00146 
00147     def test_foreign_thread(self):
00148         # Check that a "foreign" thread can use the threading module.
00149         def f(mutex):
00150             # Calling current_thread() forces an entry for the foreign
00151             # thread to get made in the threading._active map.
00152             threading.current_thread()
00153             mutex.release()
00154 
00155         mutex = threading.Lock()
00156         mutex.acquire()
00157         tid = _thread.start_new_thread(f, (mutex,))
00158         # Wait for the thread to finish.
00159         mutex.acquire()
00160         self.assertIn(tid, threading._active)
00161         self.assertIsInstance(threading._active[tid], threading._DummyThread)
00162         del threading._active[tid]

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_ident_of_no_threading_threads (   self)

Definition at line 108 of file test_threading.py.

00108 
00109     def test_ident_of_no_threading_threads(self):
00110         # The ident still must work for the main thread and dummy threads.
00111         self.assertFalse(threading.currentThread().ident is None)
00112         def f():
00113             ident.append(threading.currentThread().ident)
00114             done.set()
00115         done = threading.Event()
00116         ident = []
00117         _thread.start_new_thread(f, ())
00118         done.wait()
00119         self.assertFalse(ident[0] is None)
00120         # Kill the "immortal" _DummyThread
00121         del threading._active[ident[0]]

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_join_nondaemon_on_shutdown (   self)

Definition at line 323 of file test_threading.py.

00323 
00324     def test_join_nondaemon_on_shutdown(self):
00325         # Issue 1722344
00326         # Raising SystemExit skipped threading._shutdown
00327         rc, out, err = assert_python_ok("-c", """if 1:
00328                 import threading
00329                 from time import sleep
00330 
00331                 def child():
00332                     sleep(1)
00333                     # As a non-daemon thread we SHOULD wake up and nothing
00334                     # should be torn down yet
00335                     print("Woke up, sleep function is:", sleep)
00336 
00337                 threading.Thread(target=child).start()
00338                 raise SystemExit
00339             """)
00340         self.assertEqual(out.strip(),
00341             b"Woke up, sleep function is: <built-in function sleep>")
00342         self.assertEqual(err, b"")

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_limbo_cleanup (   self)

Definition at line 250 of file test_threading.py.

00250 
00251     def test_limbo_cleanup(self):
00252         # Issue 7481: Failure to start thread should cleanup the limbo map.
00253         def fail_new_thread(*args):
00254             raise threading.ThreadError()
00255         _start_new_thread = threading._start_new_thread
00256         threading._start_new_thread = fail_new_thread
00257         try:
00258             t = threading.Thread(target=lambda: None)
00259             self.assertRaises(threading.ThreadError, t.start)
00260             self.assertFalse(
00261                 t in threading._limbo,
00262                 "Failed to cleanup _limbo map on failure of Thread.start().")
00263         finally:
00264             threading._start_new_thread = _start_new_thread

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_no_refcycle_through_target (   self)

Definition at line 360 of file test_threading.py.

00360 
00361     def test_no_refcycle_through_target(self):
00362         class RunSelfFunction(object):
00363             def __init__(self, should_raise):
00364                 # The links in this refcycle from Thread back to self
00365                 # should be cleaned up when the thread completes.
00366                 self.should_raise = should_raise
00367                 self.thread = threading.Thread(target=self._run,
00368                                                args=(self,),
00369                                                kwargs={'yet_another':self})
00370                 self.thread.start()
00371 
00372             def _run(self, other_ref, yet_another):
00373                 if self.should_raise:
00374                     raise SystemExit
00375 
00376         cyclic_object = RunSelfFunction(should_raise=False)
00377         weak_cyclic_object = weakref.ref(cyclic_object)
00378         cyclic_object.thread.join()
00379         del cyclic_object
00380         self.assertIsNone(weak_cyclic_object(),
00381                          msg=('%d references still around' %
00382                               sys.getrefcount(weak_cyclic_object())))
00383 
00384         raising_cyclic_object = RunSelfFunction(should_raise=True)
00385         weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
00386         raising_cyclic_object.thread.join()
00387         del raising_cyclic_object
00388         self.assertIsNone(weak_raising_cyclic_object(),
00389                          msg=('%d references still around' %
00390                               sys.getrefcount(weak_raising_cyclic_object())))

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_old_threading_api (   self)

Definition at line 391 of file test_threading.py.

00391 
00392     def test_old_threading_api(self):
00393         # Just a quick sanity check to make sure the old method names are
00394         # still present
00395         t = threading.Thread()
00396         t.isDaemon()
00397         t.setDaemon(True)
00398         t.getName()
00399         t.setName("name")
00400         t.isAlive()
00401         e = threading.Event()
00402         e.isSet()
00403         threading.activeCount()

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_PyThreadState_SetAsyncExc (   self)

Definition at line 165 of file test_threading.py.

00165 
00166     def test_PyThreadState_SetAsyncExc(self):
00167         ctypes = import_module("ctypes")
00168 
00169         set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
00170 
00171         class AsyncExc(Exception):
00172             pass
00173 
00174         exception = ctypes.py_object(AsyncExc)
00175 
00176         # First check it works when setting the exception from the same thread.
00177         tid = _thread.get_ident()
00178 
00179         try:
00180             result = set_async_exc(ctypes.c_long(tid), exception)
00181             # The exception is async, so we might have to keep the VM busy until
00182             # it notices.
00183             while True:
00184                 pass
00185         except AsyncExc:
00186             pass
00187         else:
00188             # This code is unreachable but it reflects the intent. If we wanted
00189             # to be smarter the above loop wouldn't be infinite.
00190             self.fail("AsyncExc not raised")
00191         try:
00192             self.assertEqual(result, 1) # one thread state modified
00193         except UnboundLocalError:
00194             # The exception was raised too quickly for us to get the result.
00195             pass
00196 
00197         # `worker_started` is set by the thread when it's inside a try/except
00198         # block waiting to catch the asynchronously set AsyncExc exception.
00199         # `worker_saw_exception` is set by the thread upon catching that
00200         # exception.
00201         worker_started = threading.Event()
00202         worker_saw_exception = threading.Event()
00203 
00204         class Worker(threading.Thread):
00205             def run(self):
00206                 self.id = _thread.get_ident()
00207                 self.finished = False
00208 
00209                 try:
00210                     while True:
00211                         worker_started.set()
00212                         time.sleep(0.1)
00213                 except AsyncExc:
00214                     self.finished = True
00215                     worker_saw_exception.set()
00216 
00217         t = Worker()
00218         t.daemon = True # so if this fails, we don't hang Python at shutdown
00219         t.start()
00220         if verbose:
00221             print("    started worker thread")
00222 
00223         # Try a thread id that doesn't make sense.
00224         if verbose:
00225             print("    trying nonsensical thread id")
00226         result = set_async_exc(ctypes.c_long(-1), exception)
00227         self.assertEqual(result, 0)  # no thread states modified
00228 
00229         # Now raise an exception in the worker thread.
00230         if verbose:
00231             print("    waiting for worker thread to get started")
00232         ret = worker_started.wait()
00233         self.assertTrue(ret)
00234         if verbose:
00235             print("    verifying worker hasn't exited")
00236         self.assertTrue(not t.finished)
00237         if verbose:
00238             print("    attempting to raise asynch exception in worker")
00239         result = set_async_exc(ctypes.c_long(t.id), exception)
00240         self.assertEqual(result, 1) # one thread state modified
00241         if verbose:
00242             print("    waiting for worker to say it caught the exception")
00243         worker_saw_exception.wait(timeout=10)
00244         self.assertTrue(t.finished)
00245         if verbose:
00246             print("    all OK -- joining worker")
00247         if t.finished:
00248             t.join()
00249         # else the thread is still running, and we have no way to kill it

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_repr_daemon (   self)

Definition at line 404 of file test_threading.py.

00404 
00405     def test_repr_daemon(self):
00406         t = threading.Thread()
00407         self.assertFalse('daemon' in repr(t))
00408         t.daemon = True
00409         self.assertTrue('daemon' in repr(t))
00410 

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_various_ops (   self)

Definition at line 76 of file test_threading.py.

00076 
00077     def test_various_ops(self):
00078         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
00079         # times about 1 second per clump).
00080         NUMTASKS = 10
00081 
00082         # no more than 3 of the 10 can run at once
00083         sema = threading.BoundedSemaphore(value=3)
00084         mutex = threading.RLock()
00085         numrunning = Counter()
00086 
00087         threads = []
00088 
00089         for i in range(NUMTASKS):
00090             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
00091             threads.append(t)
00092             self.assertEqual(t.ident, None)
00093             self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
00094             t.start()
00095 
00096         if verbose:
00097             print('waiting for all tasks to complete')
00098         for t in threads:
00099             t.join(NUMTASKS)
00100             self.assertTrue(not t.is_alive())
00101             self.assertNotEqual(t.ident, 0)
00102             self.assertFalse(t.ident is None)
00103             self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
00104                                      repr(t)))
00105         if verbose:
00106             print('all tasks done')
00107         self.assertEqual(numrunning.get(), 0)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading.ThreadTests.test_various_ops_large_stack (   self)

Definition at line 135 of file test_threading.py.

00135 
00136     def test_various_ops_large_stack(self):
00137         if verbose:
00138             print('with 1MB thread stack size...')
00139         try:
00140             threading.stack_size(0x100000)
00141         except _thread.error:
00142             raise unittest.SkipTest(
00143                 'platform does not support changing thread stack size')
00144         self.test_various_ops()
00145         threading.stack_size(0)

Here is the call graph for this function:

def test.test_threading.ThreadTests.test_various_ops_small_stack (   self)

Definition at line 123 of file test_threading.py.

00123 
00124     def test_various_ops_small_stack(self):
00125         if verbose:
00126             print('with 256kB thread stack size...')
00127         try:
00128             threading.stack_size(262144)
00129         except _thread.error:
00130             raise unittest.SkipTest(
00131                 'platform does not support changing thread stack size')
00132         self.test_various_ops()
00133         threading.stack_size(0)

Here is the call graph for this function:


Member Data Documentation

test.test_threading.ThreadTests.finished

Definition at line 206 of file test_threading.py.

test.test_threading.ThreadTests.id

Definition at line 205 of file test_threading.py.

test.test_threading.ThreadTests.should_raise

Definition at line 365 of file test_threading.py.

test.test_threading.ThreadTests.thread

Definition at line 366 of file test_threading.py.


The documentation for this class was generated from the following file: