Back to index

python3.2  3.2.2
Public Member Functions | Static Public Attributes | Private Member Functions
test.test_threading.ThreadJoinOnShutdown Class Reference
Inheritance diagram for test.test_threading.ThreadJoinOnShutdown:
Inheritance graph
[legend]
Collaboration diagram for test.test_threading.ThreadJoinOnShutdown:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def test_1_join_on_shutdown
def test_2_join_in_forked_process
def test_3_join_in_forked_from_thread
def assertScriptHasOutput
def test_4_joining_across_fork_in_worker_thread
def test_5_clear_waiter_locks_to_avoid_crash
def test_6_daemon_threads
def setUp
def tearDown

Static Public Attributes

tuple platforms_to_skip

Private Member Functions

def _run_and_join

Detailed Description

Definition at line 411 of file test_threading.py.


Member Function Documentation

def test.test_threading.ThreadJoinOnShutdown._run_and_join (   self,
  script 
) [private]

Definition at line 420 of file test_threading.py.

00420 
00421     def _run_and_join(self, script):
00422         script = """if 1:
00423             import sys, os, time, threading
00424 
00425             # a thread, which waits for the main program to terminate
00426             def joiningfunc(mainthread):
00427                 mainthread.join()
00428                 print('end of thread')
00429                 # stdout is fully buffered because not a tty, we have to flush
00430                 # before exit.
00431                 sys.stdout.flush()
00432         \n""" + script
00433 
00434         rc, out, err = assert_python_ok("-c", script)
00435         data = out.decode().replace('\r', '')
00436         self.assertEqual(data, "end of main\nend of thread\n")

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading.ThreadJoinOnShutdown.assertScriptHasOutput (   self,
  script,
  expected_output 
)

Definition at line 491 of file test_threading.py.

00491 
00492     def assertScriptHasOutput(self, script, expected_output):
00493         rc, out, err = assert_python_ok("-c", script)
00494         data = out.decode().replace('\r', '')
00495         self.assertEqual(data, expected_output)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading.BaseTestCase.setUp (   self) [inherited]

Definition at line 64 of file test_threading.py.

Here is the caller graph for this function:

def test.test_threading.BaseTestCase.tearDown (   self) [inherited]

Definition at line 67 of file test_threading.py.

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 437 of file test_threading.py.

00437 
00438     def test_1_join_on_shutdown(self):
00439         # The usual case: on exit, wait for a non-daemon thread
00440         script = """if 1:
00441             import os
00442             t = threading.Thread(target=joiningfunc,
00443                                  args=(threading.current_thread(),))
00444             t.start()
00445             time.sleep(0.1)
00446             print('end of main')
00447             """
00448         self._run_and_join(script)

Here is the call graph for this function:

Definition at line 451 of file test_threading.py.

00451 
00452     def test_2_join_in_forked_process(self):
00453         # Like the test above, but from a forked interpreter
00454         script = """if 1:
00455             childpid = os.fork()
00456             if childpid != 0:
00457                 os.waitpid(childpid, 0)
00458                 sys.exit(0)
00459 
00460             t = threading.Thread(target=joiningfunc,
00461                                  args=(threading.current_thread(),))
00462             t.start()
00463             print('end of main')
00464             """
00465         self._run_and_join(script)

Here is the call graph for this function:

Definition at line 468 of file test_threading.py.

00468 
00469     def test_3_join_in_forked_from_thread(self):
00470         # Like the test above, but fork() was called from a worker thread
00471         # In the forked process, the main Thread object must be marked as stopped.
00472 
00473         script = """if 1:
00474             main_thread = threading.current_thread()
00475             def worker():
00476                 childpid = os.fork()
00477                 if childpid != 0:
00478                     os.waitpid(childpid, 0)
00479                     sys.exit(0)
00480 
00481                 t = threading.Thread(target=joiningfunc,
00482                                      args=(main_thread,))
00483                 print('end of main')
00484                 t.start()
00485                 t.join() # Should not block: main_thread is already stopped
00486 
00487             w = threading.Thread(target=worker)
00488             w.start()
00489             """
00490         self._run_and_join(script)

Here is the call graph for this function:

Definition at line 498 of file test_threading.py.

00498 
00499     def test_4_joining_across_fork_in_worker_thread(self):
00500         # There used to be a possible deadlock when forking from a child
00501         # thread.  See http://bugs.python.org/issue6643.
00502 
00503         # The script takes the following steps:
00504         # - The main thread in the parent process starts a new thread and then
00505         #   tries to join it.
00506         # - The join operation acquires the Lock inside the thread's _block
00507         #   Condition.  (See threading.py:Thread.join().)
00508         # - We stub out the acquire method on the condition to force it to wait
00509         #   until the child thread forks.  (See LOCK ACQUIRED HERE)
00510         # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
00511         #   HERE)
00512         # - The main thread of the parent process enters Condition.wait(),
00513         #   which releases the lock on the child thread.
00514         # - The child process returns.  Without the necessary fix, when the
00515         #   main thread of the child process (which used to be the child thread
00516         #   in the parent process) attempts to exit, it will try to acquire the
00517         #   lock in the Thread._block Condition object and hang, because the
00518         #   lock was held across the fork.
00519 
00520         script = """if 1:
00521             import os, time, threading
00522 
00523             finish_join = False
00524             start_fork = False
00525 
00526             def worker():
00527                 # Wait until this thread's lock is acquired before forking to
00528                 # create the deadlock.
00529                 global finish_join
00530                 while not start_fork:
00531                     time.sleep(0.01)
00532                 # LOCK HELD: Main thread holds lock across this call.
00533                 childpid = os.fork()
00534                 finish_join = True
00535                 if childpid != 0:
00536                     # Parent process just waits for child.
00537                     os.waitpid(childpid, 0)
00538                 # Child process should just return.
00539 
00540             w = threading.Thread(target=worker)
00541 
00542             # Stub out the private condition variable's lock acquire method.
00543             # This acquires the lock and then waits until the child has forked
00544             # before returning, which will release the lock soon after.  If
00545             # someone else tries to fix this test case by acquiring this lock
00546             # before forking instead of resetting it, the test case will
00547             # deadlock when it shouldn't.
00548             condition = w._block
00549             orig_acquire = condition.acquire
00550             call_count_lock = threading.Lock()
00551             call_count = 0
00552             def my_acquire():
00553                 global call_count
00554                 global start_fork
00555                 orig_acquire()  # LOCK ACQUIRED HERE
00556                 start_fork = True
00557                 if call_count == 0:
00558                     while not finish_join:
00559                         time.sleep(0.01)  # WORKER THREAD FORKS HERE
00560                 with call_count_lock:
00561                     call_count += 1
00562             condition.acquire = my_acquire
00563 
00564             w.start()
00565             w.join()
00566             print('end of main')
00567             """
00568         self.assertScriptHasOutput(script, "end of main\n")

Here is the call graph for this function:

Definition at line 571 of file test_threading.py.

00571 
00572     def test_5_clear_waiter_locks_to_avoid_crash(self):
00573         # Check that a spawned thread that forks doesn't segfault on certain
00574         # platforms, namely OS X.  This used to happen if there was a waiter
00575         # lock in the thread's condition variable's waiters list.  Even though
00576         # we know the lock will be held across the fork, it is not safe to
00577         # release locks held across forks on all platforms, so releasing the
00578         # waiter lock caused a segfault on OS X.  Furthermore, since locks on
00579         # OS X are (as of this writing) implemented with a mutex + condition
00580         # variable instead of a semaphore, while we know that the Python-level
00581         # lock will be acquired, we can't know if the internal mutex will be
00582         # acquired at the time of the fork.
00583 
00584         script = """if True:
00585             import os, time, threading
00586 
00587             start_fork = False
00588 
00589             def worker():
00590                 # Wait until the main thread has attempted to join this thread
00591                 # before continuing.
00592                 while not start_fork:
00593                     time.sleep(0.01)
00594                 childpid = os.fork()
00595                 if childpid != 0:
00596                     # Parent process just waits for child.
00597                     (cpid, rc) = os.waitpid(childpid, 0)
00598                     assert cpid == childpid
00599                     assert rc == 0
00600                     print('end of worker thread')
00601                 else:
00602                     # Child process should just return.
00603                     pass
00604 
00605             w = threading.Thread(target=worker)
00606 
00607             # Stub out the private condition variable's _release_save method.
00608             # This releases the condition's lock and flips the global that
00609             # causes the worker to fork.  At this point, the problematic waiter
00610             # lock has been acquired once by the waiter and has been put onto
00611             # the waiters list.
00612             condition = w._block
00613             orig_release_save = condition._release_save
00614             def my_release_save():
00615                 global start_fork
00616                 orig_release_save()
00617                 # Waiter lock held here, condition lock released.
00618                 start_fork = True
00619             condition._release_save = my_release_save
00620 
00621             w.start()
00622             w.join()
00623             print('end of main thread')
00624             """
00625         output = "end of worker thread\nend of main thread\n"
00626         self.assertScriptHasOutput(script, output)

Here is the call graph for this function:

Definition at line 627 of file test_threading.py.

00627 
00628     def test_6_daemon_threads(self):
00629         # Check that a daemon thread cannot crash the interpreter on shutdown
00630         # by manipulating internal structures that are being disposed of in
00631         # the main thread.
00632         script = """if True:
00633             import os
00634             import random
00635             import sys
00636             import time
00637             import threading
00638 
00639             thread_has_run = set()
00640 
00641             def random_io():
00642                 '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
00643                 while True:
00644                     in_f = open(os.__file__, 'rb')
00645                     stuff = in_f.read(200)
00646                     null_f = open(os.devnull, 'wb')
00647                     null_f.write(stuff)
00648                     time.sleep(random.random() / 1995)
00649                     null_f.close()
00650                     in_f.close()
00651                     thread_has_run.add(threading.current_thread())
00652 
00653             def main():
00654                 count = 0
00655                 for _ in range(40):
00656                     new_thread = threading.Thread(target=random_io)
00657                     new_thread.daemon = True
00658                     new_thread.start()
00659                     count += 1
00660                 while len(thread_has_run) < count:
00661                     time.sleep(0.001)
00662                 # Trigger process shutdown
00663                 sys.exit(0)
00664 
00665             main()
00666             """
00667         rc, out, err = assert_python_ok('-c', script)
00668         self.assertFalse(err)
00669 

Here is the call graph for this function:


Member Data Documentation

Initial value:
('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                         'os2emx')

Definition at line 417 of file test_threading.py.


The documentation for this class was generated from the following file: