Back to index

python3.2  3.2.2
Public Member Functions
test.test_dummy_thread.ThreadTests Class Reference

List of all members.

Public Member Functions

def test_arg_passing
def test_multi_creation

Detailed Description

Test thread creation.

Definition at line 119 of file test_dummy_thread.py.


Member Function Documentation

Definition at line 122 of file test_dummy_thread.py.

00122 
00123     def test_arg_passing(self):
00124         #Make sure that parameter passing works.
00125         def arg_tester(queue, arg1=False, arg2=False):
00126             """Use to test _thread.start_new_thread() passes args properly."""
00127             queue.put((arg1, arg2))
00128 
00129         testing_queue = queue.Queue(1)
00130         _thread.start_new_thread(arg_tester, (testing_queue, True, True))
00131         result = testing_queue.get()
00132         self.assertTrue(result[0] and result[1],
00133                         "Argument passing for thread creation using tuple failed")
00134         _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
00135                                                        'arg1':True, 'arg2':True})
00136         result = testing_queue.get()
00137         self.assertTrue(result[0] and result[1],
00138                         "Argument passing for thread creation using kwargs failed")
00139         _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
00140         result = testing_queue.get()
00141         self.assertTrue(result[0] and result[1],
00142                         "Argument passing for thread creation using both tuple"
00143                         " and kwargs failed")

Here is the call graph for this function:

Definition at line 144 of file test_dummy_thread.py.

00144 
00145     def test_multi_creation(self):
00146         #Make sure multiple threads can be created.
00147         def queue_mark(queue, delay):
00148             """Wait for ``delay`` seconds and then put something into ``queue``"""
00149             time.sleep(delay)
00150             queue.put(_thread.get_ident())
00151 
00152         thread_count = 5
00153         testing_queue = queue.Queue(thread_count)
00154         if support.verbose:
00155             print()
00156             print("*** Testing multiple thread creation "\
00157             "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
00158         for count in range(thread_count):
00159             if DELAY:
00160                 local_delay = round(random.random(), 1)
00161             else:
00162                 local_delay = 0
00163             _thread.start_new_thread(queue_mark,
00164                                      (testing_queue, local_delay))
00165         time.sleep(DELAY)
00166         if support.verbose:
00167             print('done')
00168         self.assertTrue(testing_queue.qsize() == thread_count,
00169                         "Not all %s threads executed properly after %s sec." %
00170                         (thread_count, DELAY))

Here is the call graph for this function:


The documentation for this class was generated from the following file: