Back to index

python3.2  3.2.2
Public Member Functions | Private Member Functions | Private Attributes
test.test_threading_local.BaseLocalTest Class Reference
Inheritance diagram for test.test_threading_local.BaseLocalTest:
Inheritance graph
[legend]

List of all members.

Public Member Functions

def test_local_refs
def test_derived
def test_derived_cycle_dealloc
def test_arguments
def test_threading_local
def test_threading_local_subclass
def test_dict_attribute
def test_dict_attribute_subclass
def test_cycle_collection

Private Member Functions

def _local_refs
def _test_one_class
def _test_dict_attribute

Private Attributes

 _failed

Detailed Description

Definition at line 22 of file test_threading_local.py.


Member Function Documentation

def test.test_threading_local.BaseLocalTest._local_refs (self, n) [private]

Definition at line 29 of file test_threading_local.py.

00029 
00030     def _local_refs(self, n):
00031         local = self._local()
00032         weaklist = []
00033         for i in range(n):
00034             t = threading.Thread(target=target, args=(local, weaklist))
00035             t.start()
00036             t.join()
00037         del t
00038 
00039         gc.collect()
00040         self.assertEqual(len(weaklist), n)
00041 
00042         # XXX _threading_local keeps the local of the last stopped thread alive.
00043         deadlist = [weak for weak in weaklist if weak() is None]
00044         self.assertIn(len(deadlist), (n-1, n))
00045 
00046         # Assignment to the same thread local frees it sometimes (!)
00047         local.someothervar = None
00048         gc.collect()
00049         deadlist = [weak for weak in weaklist if weak() is None]
00050         self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading_local.BaseLocalTest._test_dict_attribute (self, cls) [private]

Definition at line 170 of file test_threading_local.py.

00170 
00171     def _test_dict_attribute(self, cls):
00172         obj = cls()
00173         obj.x = 5
00174         self.assertEqual(obj.__dict__, {'x': 5})
00175         with self.assertRaises(AttributeError):
00176             obj.__dict__ = {}
00177         with self.assertRaises(AttributeError):
00178             del obj.__dict__

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_threading_local.BaseLocalTest._test_one_class (self, c) [private]

Definition at line 126 of file test_threading_local.py.

00126 
00127     def _test_one_class(self, c):
00128         self._failed = "No error message set or cleared."
00129         obj = c()
00130         e1 = threading.Event()
00131         e2 = threading.Event()
00132 
00133         def f1():
00134             obj.x = 'foo'
00135             obj.y = 'bar'
00136             del obj.y
00137             e1.set()
00138             e2.wait()
00139 
00140         def f2():
00141             try:
00142                 foo = obj.x
00143             except AttributeError:
00144                 # This is expected -- we haven't set obj.x in this thread yet!
00145                 self._failed = ""  # passed
00146             else:
00147                 self._failed = ('Incorrectly got value %r from class %r\n' %
00148                                 (foo, c))
00149                 sys.stderr.write(self._failed)
00150 
00151         t1 = threading.Thread(target=f1)
00152         t1.start()
00153         e1.wait()
00154         t2 = threading.Thread(target=f2)
00155         t2.start()
00156         t2.join()
00157         # The test is done; just let t1 know it can exit, and wait for it.
00158         e2.set()
00159         t1.join()
00160 
00161         self.assertFalse(self._failed, self._failed)

Here is the caller graph for this function:

def test.test_threading_local.BaseLocalTest.test_arguments (self)

Definition at line 115 of file test_threading_local.py.

00115 
00116     def test_arguments(self):
00117         # Issue 1522237
00118         class MyLocal(self._local):
00119             def __init__(self, *args, **kwargs):
00120                 pass
00121 
00122         MyLocal(a=1)
00123         MyLocal(1)
00124         self.assertRaises(TypeError, self._local, a=1)
00125         self.assertRaises(TypeError, self._local, 1)

Here is the call graph for this function:

def test.test_threading_local.BaseLocalTest.test_cycle_collection (self)

Definition at line 187 of file test_threading_local.py.

00187 
00188     def test_cycle_collection(self):
00189         class X:
00190             pass
00191 
00192         x = X()
00193         x.local = self._local()
00194         x.local.x = x
00195         wr = weakref.ref(x)
00196         del x
00197         gc.collect()
00198         self.assertIs(wr(), None)
00199 

def test.test_threading_local.BaseLocalTest.test_derived (self)

Definition at line 51 of file test_threading_local.py.

00051 
00052     def test_derived(self):
00053         # Issue 3088: if there is a threads switch inside the __init__
00054         # of a threading.local derived class, the per-thread dictionary
00055         # is created but not correctly set on the object.
00056         # The first member set may be bogus.
00057         import time
00058         class Local(self._local):
00059             def __init__(self):
00060                 time.sleep(0.01)
00061         local = Local()
00062 
00063         def f(i):
00064             local.x = i
00065             # Simply check that the variable is correctly set
00066             self.assertEqual(local.x, i)
00067 
00068         threads= []
00069         for i in range(10):
00070             t = threading.Thread(target=f, args=(i,))
00071             t.start()
00072             threads.append(t)
00073 
00074         for t in threads:
00075             t.join()

Here is the call graph for this function:

def test.test_threading_local.BaseLocalTest.test_derived_cycle_dealloc (self)

Definition at line 76 of file test_threading_local.py.

00076 
00077     def test_derived_cycle_dealloc(self):
00078         # http://bugs.python.org/issue6990
00079         class Local(self._local):
00080             pass
00081         locals = None
00082         passed = False
00083         e1 = threading.Event()
00084         e2 = threading.Event()
00085 
00086         def f():
00087             nonlocal passed
00088             # 1) Involve Local in a cycle
00089             cycle = [Local()]
00090             cycle.append(cycle)
00091             cycle[0].foo = 'bar'
00092 
00093             # 2) GC the cycle (triggers threadmodule.c::local_clear
00094             # before local_dealloc)
00095             del cycle
00096             gc.collect()
00097             e1.set()
00098             e2.wait()
00099 
00100             # 4) New Locals should be empty
00101             passed = all(not hasattr(local, 'foo') for local in locals)
00102 
00103         t = threading.Thread(target=f)
00104         t.start()
00105         e1.wait()
00106 
00107         # 3) New Locals should recycle the original's address. Creating
00108         # them in the thread overwrites the thread state and avoids the
00109         # bug
00110         locals = [Local() for i in range(10)]
00111         e2.set()
00112         t.join()
00113 
00114         self.assertTrue(passed)

def test.test_threading_local.BaseLocalTest.test_dict_attribute (self)

Definition at line 179 of file test_threading_local.py.

00179 
00180     def test_dict_attribute(self):
00181         self._test_dict_attribute(self._local)

Here is the call graph for this function:

def test.test_threading_local.BaseLocalTest.test_dict_attribute_subclass (self)

Definition at line 182 of file test_threading_local.py.

00182 
00183     def test_dict_attribute_subclass(self):
00184         class LocalSubclass(self._local):
00185             """To test that subclasses behave properly."""
00186         self._test_dict_attribute(LocalSubclass)

def test.test_threading_local.BaseLocalTest.test_local_refs (self)

Definition at line 24 of file test_threading_local.py.

00024 
00025     def test_local_refs(self):
00026         self._local_refs(20)
00027         self._local_refs(50)
00028         self._local_refs(100)

Here is the call graph for this function:

def test.test_threading_local.BaseLocalTest.test_threading_local (self)

Definition at line 162 of file test_threading_local.py.

00162 
00163     def test_threading_local(self):
00164         self._test_one_class(self._local)

Here is the call graph for this function:

def test.test_threading_local.BaseLocalTest.test_threading_local_subclass (self)

Definition at line 165 of file test_threading_local.py.

00165 
00166     def test_threading_local_subclass(self):
00167         class LocalSubclass(self._local):
00168             """To test that subclasses behave properly."""
00169         self._test_one_class(LocalSubclass)


Member Data Documentation

test.test_threading_local.BaseLocalTest._failed [private]

Definition at line 127 of file test_threading_local.py.


The documentation for this class was generated from the following file: