Back to index

python3.2  3.2.2
Public Member Functions | Static Public Attributes
test.test_io.CBufferedRandomTest Class Reference
Inheritance diagram for test.test_io.CBufferedRandomTest:
Inheritance graph
[legend]
Collaboration diagram for test.test_io.CBufferedRandomTest:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def test_constructor
def test_garbage_collection
def test_unseekable
def test_unseekable
def test_read_and_write
def test_seek_and_tell
def check_flush_and_read
def test_flush_and_read
def test_flush_and_readinto
def test_flush_and_peek
def test_flush_and_write
def test_threads
def test_writes_and_peek
def test_writes_and_reads
def test_writes_and_read1s
def test_writes_and_readintos
def test_write_after_readahead
def test_write_rewind_write
def test_truncate_after_read_or_write
def test_misbehaved_io
def test_read
def test_read1
def test_readinto
def test_readlines
def test_buffering
def test_read_non_blocking
def test_read_past_eof
def test_read_all
def test_no_extraneous_read
def test_detach
def test_detach
def test_fileno
def test_fileno
def test_no_fileno
def test_no_fileno
def test_invalid_args
def test_invalid_args
def test_override_destructor
def test_override_destructor
def test_context_manager
def test_context_manager
def test_error_through_destructor
def test_error_through_destructor
def test_repr
def test_repr
def test_flush_error_on_close
def test_flush_error_on_close
def test_multi_close
def test_multi_close
def test_readonly_attributes
def test_readonly_attributes
def test_detach_flush
def test_write
def test_write_overflow
def check_writes
def test_writes
def test_writes_and_flushes
def test_writes_and_seeks
def test_writes_and_truncates
def test_write_non_blocking
def test_write_and_rewind
def test_flush
def test_destructor
def test_truncate
def test_max_buffer_size_deprecation

Static Public Attributes

 tp = io.BufferedRandom
string read_mode = "rb+"
string write_mode = "wb+"
 test_unseekable = None

Detailed Description

Definition at line 1554 of file test_io.py.


Member Function Documentation

def test.test_io.BufferedRandomTest.check_flush_and_read (   self,
  read_func 
) [inherited]

Definition at line 1409 of file test_io.py.

01409 
01410     def check_flush_and_read(self, read_func):
01411         raw = self.BytesIO(b"abcdefghi")
01412         bufio = self.tp(raw)
01413 
01414         self.assertEqual(b"ab", read_func(bufio, 2))
01415         bufio.write(b"12")
01416         self.assertEqual(b"ef", read_func(bufio, 2))
01417         self.assertEqual(6, bufio.tell())
01418         bufio.flush()
01419         self.assertEqual(6, bufio.tell())
01420         self.assertEqual(b"ghi", read_func(bufio))
01421         raw.seek(0, 0)
01422         raw.write(b"XYZ")
01423         # flush() resets the read buffer
01424         bufio.flush()
01425         bufio.seek(0, 0)
01426         self.assertEqual(b"XYZ", read_func(bufio, 3))

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_io.BufferedWriterTest.check_writes (   self,
  intermediate_func 
) [inherited]

Definition at line 1031 of file test_io.py.

01031 
01032     def check_writes(self, intermediate_func):
01033         # Lots of writes, test the flushed output is as expected.
01034         contents = bytes(range(256)) * 1000
01035         n = 0
01036         writer = self.MockRawIO()
01037         bufio = self.tp(writer, 13)
01038         # Generator of write sizes: repeat each N 15 times then proceed to N+1
01039         def gen_sizes():
01040             for size in count(1):
01041                 for i in range(15):
01042                     yield size
01043         sizes = gen_sizes()
01044         while n < len(contents):
01045             size = min(next(sizes), len(contents) - n)
01046             self.assertEqual(bufio.write(contents[n:n+size]), size)
01047             intermediate_func(bufio)
01048             n += size
01049         bufio.flush()
01050         self.assertEqual(contents, b"".join(writer._write_stack))

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 823 of file test_io.py.

00823 
00824     def test_buffering(self):
00825         data = b"abcdefghi"
00826         dlen = len(data)
00827 
00828         tests = [
00829             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
00830             [ 100, [ 3, 3, 3],     [ dlen ]    ],
00831             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
00832         ]
00833 
00834         for bufsize, buf_read_sizes, raw_read_sizes in tests:
00835             rawio = self.MockFileIO(data)
00836             bufio = self.tp(rawio, buffer_size=bufsize)
00837             pos = 0
00838             for nbytes in buf_read_sizes:
00839                 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
00840                 pos += nbytes
00841             # this is mildly implementation-dependent
00842             self.assertEqual(rawio.read_history, raw_read_sizes)

Here is the call graph for this function:

Reimplemented from test.test_io.BufferedRandomTest.

Definition at line 1557 of file test_io.py.

01557 
01558     def test_constructor(self):
01559         BufferedRandomTest.test_constructor(self)
01560         # The allocation can succeed on 32-bit builds, e.g. with more
01561         # than 2GB RAM and a 64-bit kernel.
01562         if sys.maxsize > 0x7FFFFFFF:
01563             rawio = self.MockRawIO()
01564             bufio = self.tp(rawio)
01565             self.assertRaises((OverflowError, MemoryError, ValueError),
01566                 bufio.__init__, rawio, sys.maxsize)

Here is the call graph for this function:

Definition at line 692 of file test_io.py.

00692 
00693     def test_context_manager(self):
00694         # Test usability as a context manager
00695         rawio = self.MockRawIO()
00696         bufio = self.tp(rawio)
00697         def _with():
00698             with bufio:
00699                 pass
00700         _with()
00701         # bufio should now be closed, and using it a second time should raise
00702         # a ValueError.
00703         self.assertRaises(ValueError, _with)

Here is the call graph for this function:

Definition at line 692 of file test_io.py.

00692 
00693     def test_context_manager(self):
00694         # Test usability as a context manager
00695         rawio = self.MockRawIO()
00696         bufio = self.tp(rawio)
00697         def _with():
00698             with bufio:
00699                 pass
00700         _with()
00701         # bufio should now be closed, and using it a second time should raise
00702         # a ValueError.
00703         self.assertRaises(ValueError, _with)

Here is the call graph for this function:

Definition at line 1121 of file test_io.py.

01121 
01122     def test_destructor(self):
01123         writer = self.MockRawIO()
01124         bufio = self.tp(writer, 8)
01125         bufio.write(b"abc")
01126         del bufio
01127         support.gc_collect()
01128         self.assertEqual(b"abc", writer._write_stack[0])

Here is the call graph for this function:

Definition at line 640 of file test_io.py.

00640 
00641     def test_detach(self):
00642         raw = self.MockRawIO()
00643         buf = self.tp(raw)
00644         self.assertIs(buf.detach(), raw)
00645         self.assertRaises(ValueError, buf.detach)

Here is the call graph for this function:

Definition at line 640 of file test_io.py.

00640 
00641     def test_detach(self):
00642         raw = self.MockRawIO()
00643         buf = self.tp(raw)
00644         self.assertIs(buf.detach(), raw)
00645         self.assertRaises(ValueError, buf.detach)

Here is the call graph for this function:

Definition at line 1005 of file test_io.py.

01005 
01006     def test_detach_flush(self):
01007         raw = self.MockRawIO()
01008         buf = self.tp(raw)
01009         buf.write(b"howdy!")
01010         self.assertFalse(raw._write_stack)
01011         buf.detach()
01012         self.assertEqual(raw._write_stack, [b"howdy!"])

Here is the call graph for this function:

Definition at line 704 of file test_io.py.

00704 
00705     def test_error_through_destructor(self):
00706         # Test that the exception state is not modified by a destructor,
00707         # even if close() fails.
00708         rawio = self.CloseFailureIO()
00709         def f():
00710             self.tp(rawio).xyzzy
00711         with support.captured_output("stderr") as s:
00712             self.assertRaises(AttributeError, f)
00713         s = s.getvalue().strip()
00714         if s:
00715             # The destructor *may* have printed an unraisable error, check it
00716             self.assertEqual(len(s.splitlines()), 1)
00717             self.assertTrue(s.startswith("Exception IOError: "), s)
00718             self.assertTrue(s.endswith(" ignored"), s)

Here is the call graph for this function:

Definition at line 704 of file test_io.py.

00704 
00705     def test_error_through_destructor(self):
00706         # Test that the exception state is not modified by a destructor,
00707         # even if close() fails.
00708         rawio = self.CloseFailureIO()
00709         def f():
00710             self.tp(rawio).xyzzy
00711         with support.captured_output("stderr") as s:
00712             self.assertRaises(AttributeError, f)
00713         s = s.getvalue().strip()
00714         if s:
00715             # The destructor *may* have printed an unraisable error, check it
00716             self.assertEqual(len(s.splitlines()), 1)
00717             self.assertTrue(s.startswith("Exception IOError: "), s)
00718             self.assertTrue(s.endswith(" ignored"), s)

Here is the call graph for this function:

Definition at line 646 of file test_io.py.

00646 
00647     def test_fileno(self):
00648         rawio = self.MockRawIO()
00649         bufio = self.tp(rawio)
00650 
00651         self.assertEqual(42, bufio.fileno())

Here is the call graph for this function:

Definition at line 646 of file test_io.py.

00646 
00647     def test_fileno(self):
00648         rawio = self.MockRawIO()
00649         bufio = self.tp(rawio)
00650 
00651         self.assertEqual(42, bufio.fileno())

Here is the call graph for this function:

def test.test_io.BufferedWriterTest.test_flush (   self) [inherited]

Definition at line 1114 of file test_io.py.

01114 
01115     def test_flush(self):
01116         writer = self.MockRawIO()
01117         bufio = self.tp(writer, 8)
01118         bufio.write(b"abc")
01119         bufio.flush()
01120         self.assertEqual(b"abc", writer._write_stack[0])

Here is the call graph for this function:

Definition at line 1437 of file test_io.py.

01437 
01438     def test_flush_and_peek(self):
01439         def _peek(bufio, n=-1):
01440             # This relies on the fact that the buffer can contain the whole
01441             # raw stream, otherwise peek() can return less.
01442             b = bufio.peek(n)
01443             if n != -1:
01444                 b = b[:n]
01445             bufio.seek(len(b), 1)
01446             return b
01447         self.check_flush_and_read(_peek)

Here is the call graph for this function:

Definition at line 1427 of file test_io.py.

01427 
01428     def test_flush_and_read(self):
01429         self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

Here is the call graph for this function:

Definition at line 1430 of file test_io.py.

01430 
01431     def test_flush_and_readinto(self):
01432         def _readinto(bufio, n=-1):
01433             b = bytearray(n if n >= 0 else 9999)
01434             n = bufio.readinto(b)
01435             return bytes(b[:n])
01436         self.check_flush_and_read(_readinto)

Here is the call graph for this function:

Definition at line 1448 of file test_io.py.

01448 
01449     def test_flush_and_write(self):
01450         raw = self.BytesIO(b"abcdefghi")
01451         bufio = self.tp(raw)
01452 
01453         bufio.write(b"123")
01454         bufio.flush()
01455         bufio.write(b"45")
01456         bufio.flush()
01457         bufio.seek(0, 0)
01458         self.assertEqual(b"12345fghi", raw.getvalue())
01459         self.assertEqual(b"12345fghi", bufio.read())

Here is the call graph for this function:

Definition at line 729 of file test_io.py.

00729 
00730     def test_flush_error_on_close(self):
00731         raw = self.MockRawIO()
00732         def bad_flush():
00733             raise IOError()
00734         raw.flush = bad_flush
00735         b = self.tp(raw)
00736         self.assertRaises(IOError, b.close) # exception not swallowed

Here is the call graph for this function:

Definition at line 729 of file test_io.py.

00729 
00730     def test_flush_error_on_close(self):
00731         raw = self.MockRawIO()
00732         def bad_flush():
00733             raise IOError()
00734         raw.flush = bad_flush
00735         b = self.tp(raw)
00736         self.assertRaises(IOError, b.close) # exception not swallowed

Here is the call graph for this function:

Definition at line 1567 of file test_io.py.

01567 
01568     def test_garbage_collection(self):
01569         CBufferedReaderTest.test_garbage_collection(self)
01570         CBufferedWriterTest.test_garbage_collection(self)

Definition at line 657 of file test_io.py.

00657 
00658     def test_invalid_args(self):
00659         rawio = self.MockRawIO()
00660         bufio = self.tp(rawio)
00661         # Invalid whence
00662         self.assertRaises(ValueError, bufio.seek, 0, -1)
00663         self.assertRaises(ValueError, bufio.seek, 0, 3)

Here is the call graph for this function:

Definition at line 657 of file test_io.py.

00657 
00658     def test_invalid_args(self):
00659         rawio = self.MockRawIO()
00660         bufio = self.tp(rawio)
00661         # Invalid whence
00662         self.assertRaises(ValueError, bufio.seek, 0, -1)
00663         self.assertRaises(ValueError, bufio.seek, 0, 3)

Here is the call graph for this function:

Definition at line 1197 of file test_io.py.

01197 
01198     def test_max_buffer_size_deprecation(self):
01199         with support.check_warnings(("max_buffer_size is deprecated",
01200                                      DeprecationWarning)):
01201             self.tp(self.MockRawIO(), 8, 12)
01202 

Reimplemented from test.test_io.BufferedReaderTest.

Definition at line 1547 of file test_io.py.

01547 
01548     def test_misbehaved_io(self):
01549         BufferedReaderTest.test_misbehaved_io(self)
01550         BufferedWriterTest.test_misbehaved_io(self)

Definition at line 737 of file test_io.py.

00737 
00738     def test_multi_close(self):
00739         raw = self.MockRawIO()
00740         b = self.tp(raw)
00741         b.close()
00742         b.close()
00743         b.close()
00744         self.assertRaises(ValueError, b.flush)

Here is the call graph for this function:

Definition at line 737 of file test_io.py.

00737 
00738     def test_multi_close(self):
00739         raw = self.MockRawIO()
00740         b = self.tp(raw)
00741         b.close()
00742         b.close()
00743         b.close()
00744         self.assertRaises(ValueError, b.flush)

Here is the call graph for this function:

Definition at line 920 of file test_io.py.

00920 
00921     def test_no_extraneous_read(self):
00922         # Issue #9550; when the raw IO object has satisfied the read request,
00923         # we should not issue any additional reads, otherwise it may block
00924         # (e.g. socket).
00925         bufsize = 16
00926         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
00927             rawio = self.MockRawIO([b"x" * n])
00928             bufio = self.tp(rawio, bufsize)
00929             self.assertEqual(bufio.read(n), b"x" * n)
00930             # Simple case: one raw read is enough to satisfy the request.
00931             self.assertEqual(rawio._extraneous_reads, 0,
00932                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
00933             # A more complex case where two raw reads are needed to satisfy
00934             # the request.
00935             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
00936             bufio = self.tp(rawio, bufsize)
00937             self.assertEqual(bufio.read(n), b"x" * n)
00938             self.assertEqual(rawio._extraneous_reads, 0,
00939                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
00940 

Here is the call graph for this function:

Definition at line 652 of file test_io.py.

00652 
00653     def test_no_fileno(self):
00654         # XXX will we always have fileno() function? If so, kill
00655         # this test. Else, write it.
00656         pass

Definition at line 652 of file test_io.py.

00652 
00653     def test_no_fileno(self):
00654         # XXX will we always have fileno() function? If so, kill
00655         # this test. Else, write it.
00656         pass

Definition at line 664 of file test_io.py.

00664 
00665     def test_override_destructor(self):
00666         tp = self.tp
00667         record = []
00668         class MyBufferedIO(tp):
00669             def __del__(self):
00670                 record.append(1)
00671                 try:
00672                     f = super().__del__
00673                 except AttributeError:
00674                     pass
00675                 else:
00676                     f()
00677             def close(self):
00678                 record.append(2)
00679                 super().close()
00680             def flush(self):
00681                 record.append(3)
00682                 super().flush()
00683         rawio = self.MockRawIO()
00684         bufio = MyBufferedIO(rawio)
00685         writable = bufio.writable()
00686         del bufio
00687         support.gc_collect()
00688         if writable:
00689             self.assertEqual(record, [1, 2, 3])
00690         else:
00691             self.assertEqual(record, [1, 2])

Here is the call graph for this function:

Definition at line 664 of file test_io.py.

00664 
00665     def test_override_destructor(self):
00666         tp = self.tp
00667         record = []
00668         class MyBufferedIO(tp):
00669             def __del__(self):
00670                 record.append(1)
00671                 try:
00672                     f = super().__del__
00673                 except AttributeError:
00674                     pass
00675                 else:
00676                     f()
00677             def close(self):
00678                 record.append(2)
00679                 super().close()
00680             def flush(self):
00681                 record.append(3)
00682                 super().flush()
00683         rawio = self.MockRawIO()
00684         bufio = MyBufferedIO(rawio)
00685         writable = bufio.writable()
00686         del bufio
00687         support.gc_collect()
00688         if writable:
00689             self.assertEqual(record, [1, 2, 3])
00690         else:
00691             self.assertEqual(record, [1, 2])

Here is the call graph for this function:

def test.test_io.BufferedReaderTest.test_read (   self) [inherited]

Definition at line 775 of file test_io.py.

00775 
00776     def test_read(self):
00777         for arg in (None, 7):
00778             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00779             bufio = self.tp(rawio)
00780             self.assertEqual(b"abcdefg", bufio.read(arg))
00781         # Invalid args
00782         self.assertRaises(ValueError, bufio.read, -2)

Here is the call graph for this function:

def test.test_io.BufferedReaderTest.test_read1 (   self) [inherited]

Definition at line 783 of file test_io.py.

00783 
00784     def test_read1(self):
00785         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00786         bufio = self.tp(rawio)
00787         self.assertEqual(b"a", bufio.read(1))
00788         self.assertEqual(b"b", bufio.read1(1))
00789         self.assertEqual(rawio._reads, 1)
00790         self.assertEqual(b"c", bufio.read1(100))
00791         self.assertEqual(rawio._reads, 1)
00792         self.assertEqual(b"d", bufio.read1(100))
00793         self.assertEqual(rawio._reads, 2)
00794         self.assertEqual(b"efg", bufio.read1(100))
00795         self.assertEqual(rawio._reads, 3)
00796         self.assertEqual(b"", bufio.read1(100))
00797         self.assertEqual(rawio._reads, 4)
00798         # Invalid args
00799         self.assertRaises(ValueError, bufio.read1, -1)

Here is the call graph for this function:

Definition at line 864 of file test_io.py.

00864 
00865     def test_read_all(self):
00866         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00867         bufio = self.tp(rawio)
00868 
00869         self.assertEqual(b"abcdefg", bufio.read())

Here is the call graph for this function:

Definition at line 1378 of file test_io.py.

01378 
01379     def test_read_and_write(self):
01380         raw = self.MockRawIO((b"asdf", b"ghjk"))
01381         rw = self.tp(raw, 8)
01382 
01383         self.assertEqual(b"as", rw.read(2))
01384         rw.write(b"ddd")
01385         rw.write(b"eee")
01386         self.assertFalse(raw._write_stack) # Buffer writes
01387         self.assertEqual(b"ghjk", rw.read())
01388         self.assertEqual(b"dddeee", raw._write_stack[0])

Here is the call graph for this function:

Definition at line 843 of file test_io.py.

00843 
00844     def test_read_non_blocking(self):
00845         # Inject some None's in there to simulate EWOULDBLOCK
00846         rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
00847         bufio = self.tp(rawio)
00848         self.assertEqual(b"abcd", bufio.read(6))
00849         self.assertEqual(b"e", bufio.read(1))
00850         self.assertEqual(b"fg", bufio.read())
00851         self.assertEqual(b"", bufio.peek(1))
00852         self.assertIsNone(bufio.read())
00853         self.assertEqual(b"", bufio.read())
00854 
00855         rawio = self.MockRawIO((b"a", None, None))
00856         self.assertEqual(b"a", rawio.readall())
00857         self.assertIsNone(rawio.readall())

Here is the call graph for this function:

Definition at line 858 of file test_io.py.

00858 
00859     def test_read_past_eof(self):
00860         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00861         bufio = self.tp(rawio)
00862 
00863         self.assertEqual(b"abcdefg", bufio.read(9000))

Here is the call graph for this function:

Definition at line 800 of file test_io.py.

00800 
00801     def test_readinto(self):
00802         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00803         bufio = self.tp(rawio)
00804         b = bytearray(2)
00805         self.assertEqual(bufio.readinto(b), 2)
00806         self.assertEqual(b, b"ab")
00807         self.assertEqual(bufio.readinto(b), 2)
00808         self.assertEqual(b, b"cd")
00809         self.assertEqual(bufio.readinto(b), 2)
00810         self.assertEqual(b, b"ef")
00811         self.assertEqual(bufio.readinto(b), 1)
00812         self.assertEqual(b, b"gf")
00813         self.assertEqual(bufio.readinto(b), 0)
00814         self.assertEqual(b, b"gf")

Here is the call graph for this function:

Definition at line 815 of file test_io.py.

00815 
00816     def test_readlines(self):
00817         def bufio():
00818             rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
00819             return self.tp(rawio)
00820         self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
00821         self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
00822         self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

Here is the call graph for this function:

Definition at line 750 of file test_io.py.

00750 
00751     def test_readonly_attributes(self):
00752         raw = self.MockRawIO()
00753         buf = self.tp(raw)
00754         x = self.MockRawIO()
00755         with self.assertRaises(AttributeError):
00756             buf.raw = x
00757 

Here is the call graph for this function:

Definition at line 750 of file test_io.py.

00750 
00751     def test_readonly_attributes(self):
00752         raw = self.MockRawIO()
00753         buf = self.tp(raw)
00754         x = self.MockRawIO()
00755         with self.assertRaises(AttributeError):
00756             buf.raw = x
00757 

Here is the call graph for this function:

def test.test_io.CommonBufferedTests.test_repr (   self) [inherited]

Definition at line 719 of file test_io.py.

00719 
00720     def test_repr(self):
00721         raw = self.MockRawIO()
00722         b = self.tp(raw)
00723         clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
00724         self.assertEqual(repr(b), "<%s>" % clsname)
00725         raw.name = "dummy"
00726         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
00727         raw.name = b"dummy"
00728         self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

Here is the call graph for this function:

def test.test_io.CommonBufferedTests.test_repr (   self) [inherited]

Definition at line 719 of file test_io.py.

00719 
00720     def test_repr(self):
00721         raw = self.MockRawIO()
00722         b = self.tp(raw)
00723         clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
00724         self.assertEqual(repr(b), "<%s>" % clsname)
00725         raw.name = "dummy"
00726         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
00727         raw.name = b"dummy"
00728         self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

Here is the call graph for this function:

Definition at line 1389 of file test_io.py.

01389 
01390     def test_seek_and_tell(self):
01391         raw = self.BytesIO(b"asdfghjkl")
01392         rw = self.tp(raw)
01393 
01394         self.assertEqual(b"as", rw.read(2))
01395         self.assertEqual(2, rw.tell())
01396         rw.seek(0, 0)
01397         self.assertEqual(b"asdf", rw.read(4))
01398 
01399         rw.write(b"asdf")
01400         rw.seek(0, 0)
01401         self.assertEqual(b"asdfasdfl", rw.read())
01402         self.assertEqual(9, rw.tell())
01403         rw.seek(-4, 2)
01404         self.assertEqual(5, rw.tell())
01405         rw.seek(2, 1)
01406         self.assertEqual(7, rw.tell())
01407         self.assertEqual(b"fl", rw.read(11))
01408         self.assertRaises(TypeError, rw.seek, 0.0)

Here is the call graph for this function:

Reimplemented from test.test_io.BufferedReaderTest.

Definition at line 1460 of file test_io.py.

01460 
01461     def test_threads(self):
01462         BufferedReaderTest.test_threads(self)
01463         BufferedWriterTest.test_threads(self)

Definition at line 1129 of file test_io.py.

01129 
01130     def test_truncate(self):
01131         # Truncate implicitly flushes the buffer.
01132         with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
01133             bufio = self.tp(raw, 8)
01134             bufio.write(b"abcdef")
01135             self.assertEqual(bufio.truncate(3), 3)
01136             self.assertEqual(bufio.tell(), 6)
01137         with self.open(support.TESTFN, "rb", buffering=0) as f:
01138             self.assertEqual(f.read(), b"abc")

Here is the call graph for this function:

Definition at line 1539 of file test_io.py.

01539 
01540     def test_truncate_after_read_or_write(self):
01541         raw = self.BytesIO(b"A" * 10)
01542         bufio = self.tp(raw, 100)
01543         self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
01544         self.assertEqual(bufio.truncate(), 2)
01545         self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
01546         self.assertEqual(bufio.truncate(), 4)

Here is the call graph for this function:

Definition at line 745 of file test_io.py.

00745 
00746     def test_unseekable(self):
00747         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
00748         self.assertRaises(self.UnsupportedOperation, bufio.tell)
00749         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

Here is the call graph for this function:

Definition at line 745 of file test_io.py.

00745 
00746     def test_unseekable(self):
00747         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
00748         self.assertRaises(self.UnsupportedOperation, bufio.tell)
00749         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

Here is the call graph for this function:

def test.test_io.BufferedWriterTest.test_write (   self) [inherited]

Definition at line 1013 of file test_io.py.

01013 
01014     def test_write(self):
01015         # Write to the buffered IO but don't overflow the buffer.
01016         writer = self.MockRawIO()
01017         bufio = self.tp(writer, 8)
01018         bufio.write(b"abc")
01019         self.assertFalse(writer._write_stack)

Here is the call graph for this function:

Definition at line 1493 of file test_io.py.

01493 
01494     def test_write_after_readahead(self):
01495         # Issue #6629: writing after the buffer was filled by readahead should
01496         # first rewind the raw stream.
01497         for overwrite_size in [1, 5]:
01498             raw = self.BytesIO(b"A" * 10)
01499             bufio = self.tp(raw, 4)
01500             # Trigger readahead
01501             self.assertEqual(bufio.read(1), b"A")
01502             self.assertEqual(bufio.tell(), 1)
01503             # Overwriting should rewind the raw stream if it needs so
01504             bufio.write(b"B" * overwrite_size)
01505             self.assertEqual(bufio.tell(), overwrite_size + 1)
01506             # If the write size was smaller than the buffer size, flush() and
01507             # check that rewind happens.
01508             bufio.flush()
01509             self.assertEqual(bufio.tell(), overwrite_size + 1)
01510             s = raw.getvalue()
01511             self.assertEqual(s,
01512                 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

Here is the call graph for this function:

Definition at line 1101 of file test_io.py.

01101 
01102     def test_write_and_rewind(self):
01103         raw = io.BytesIO()
01104         bufio = self.tp(raw, 4)
01105         self.assertEqual(bufio.write(b"abcdef"), 6)
01106         self.assertEqual(bufio.tell(), 6)
01107         bufio.seek(0, 0)
01108         self.assertEqual(bufio.write(b"XY"), 2)
01109         bufio.seek(6, 0)
01110         self.assertEqual(raw.getvalue(), b"XYcdef")
01111         self.assertEqual(bufio.write(b"123456"), 6)
01112         bufio.flush()
01113         self.assertEqual(raw.getvalue(), b"XYcdef123456")

Here is the call graph for this function:

Definition at line 1074 of file test_io.py.

01074 
01075     def test_write_non_blocking(self):
01076         raw = self.MockNonBlockWriterIO()
01077         bufio = self.tp(raw, 8)
01078 
01079         self.assertEqual(bufio.write(b"abcd"), 4)
01080         self.assertEqual(bufio.write(b"efghi"), 5)
01081         # 1 byte will be written, the rest will be buffered
01082         raw.block_on(b"k")
01083         self.assertEqual(bufio.write(b"jklmn"), 5)
01084 
01085         # 8 bytes will be written, 8 will be buffered and the rest will be lost
01086         raw.block_on(b"0")
01087         try:
01088             bufio.write(b"opqrwxyz0123456789")
01089         except self.BlockingIOError as e:
01090             written = e.characters_written
01091         else:
01092             self.fail("BlockingIOError should have been raised")
01093         self.assertEqual(written, 16)
01094         self.assertEqual(raw.pop_written(),
01095             b"abcdefghijklmnopqrwxyz")
01096 
01097         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
01098         s = raw.pop_written()
01099         # Previously buffered bytes were flushed
01100         self.assertTrue(s.startswith(b"01234567A"), s)

Here is the call graph for this function:

Definition at line 1020 of file test_io.py.

01020 
01021     def test_write_overflow(self):
01022         writer = self.MockRawIO()
01023         bufio = self.tp(writer, 8)
01024         contents = b"abcdefghijklmnop"
01025         for n in range(0, len(contents), 3):
01026             bufio.write(contents[n:n+3])
01027         flushed = b"".join(writer._write_stack)
01028         # At least (total - 8) bytes were implicitly flushed, perhaps more
01029         # depending on the implementation.
01030         self.assertTrue(flushed.startswith(contents[:-8]), flushed)

Here is the call graph for this function:

Definition at line 1513 of file test_io.py.

01513 
01514     def test_write_rewind_write(self):
01515         # Various combinations of reading / writing / seeking backwards / writing again
01516         def mutate(bufio, pos1, pos2):
01517             assert pos2 >= pos1
01518             # Fill the buffer
01519             bufio.seek(pos1)
01520             bufio.read(pos2 - pos1)
01521             bufio.write(b'\x02')
01522             # This writes earlier than the previous write, but still inside
01523             # the buffer.
01524             bufio.seek(pos1)
01525             bufio.write(b'\x01')
01526 
01527         b = b"\x80\x81\x82\x83\x84"
01528         for i in range(0, len(b)):
01529             for j in range(i, len(b)):
01530                 raw = self.BytesIO(b)
01531                 bufio = self.tp(raw, 100)
01532                 mutate(bufio, i, j)
01533                 bufio.flush()
01534                 expected = bytearray(b)
01535                 expected[j] = 2
01536                 expected[i] = 1
01537                 self.assertEqual(raw.getvalue(), expected,
01538                                  "failed result for i=%d, j=%d" % (i, j))

Here is the call graph for this function:

def test.test_io.BufferedWriterTest.test_writes (   self) [inherited]

Definition at line 1051 of file test_io.py.

01051 
01052     def test_writes(self):
01053         self.check_writes(lambda bufio: None)

Here is the call graph for this function:

Definition at line 1054 of file test_io.py.

01054 
01055     def test_writes_and_flushes(self):
01056         self.check_writes(lambda bufio: bufio.flush())

Here is the call graph for this function:

Definition at line 1464 of file test_io.py.

01464 
01465     def test_writes_and_peek(self):
01466         def _peek(bufio):
01467             bufio.peek(1)
01468         self.check_writes(_peek)
01469         def _peek(bufio):
01470             pos = bufio.tell()
01471             bufio.seek(-1, 1)
01472             bufio.peek(1)
01473             bufio.seek(pos, 0)
01474         self.check_writes(_peek)

Here is the call graph for this function:

Definition at line 1481 of file test_io.py.

01481 
01482     def test_writes_and_read1s(self):
01483         def _read1(bufio):
01484             bufio.seek(-1, 1)
01485             bufio.read1(1)
01486         self.check_writes(_read1)

Here is the call graph for this function:

Definition at line 1487 of file test_io.py.

01487 
01488     def test_writes_and_readintos(self):
01489         def _read(bufio):
01490             bufio.seek(-1, 1)
01491             bufio.readinto(bytearray(1))
01492         self.check_writes(_read)

Here is the call graph for this function:

Definition at line 1475 of file test_io.py.

01475 
01476     def test_writes_and_reads(self):
01477         def _read(bufio):
01478             bufio.seek(-1, 1)
01479             bufio.read(1)
01480         self.check_writes(_read)

Here is the call graph for this function:

Definition at line 1057 of file test_io.py.

01057 
01058     def test_writes_and_seeks(self):
01059         def _seekabs(bufio):
01060             pos = bufio.tell()
01061             bufio.seek(pos + 1, 0)
01062             bufio.seek(pos - 1, 0)
01063             bufio.seek(pos, 0)
01064         self.check_writes(_seekabs)
01065         def _seekrel(bufio):
01066             pos = bufio.seek(0, 1)
01067             bufio.seek(+1, 1)
01068             bufio.seek(-1, 1)
01069             bufio.seek(pos, 0)
01070         self.check_writes(_seekrel)

Here is the call graph for this function:

Definition at line 1071 of file test_io.py.

01071 
01072     def test_writes_and_truncates(self):
01073         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

Here is the call graph for this function:


Member Data Documentation

Reimplemented from test.test_io.BufferedReaderTest.

Definition at line 1371 of file test_io.py.

Definition at line 1552 of file test_io.py.

test.test_io.CBufferedRandomTest.tp = io.BufferedRandom [static]

Definition at line 1555 of file test_io.py.

Reimplemented from test.test_io.BufferedWriterTest.

Definition at line 1372 of file test_io.py.


The documentation for this class was generated from the following file: