Back to index

python3.2  3.2.2
test_io.py
Go to the documentation of this file.
00001 """Unit tests for the io module."""
00002 
00003 # Tests of io are scattered over the test suite:
00004 # * test_bufio - tests file buffering
00005 # * test_memoryio - tests BytesIO and StringIO
00006 # * test_fileio - tests FileIO
00007 # * test_file - tests the file interface
00008 # * test_io - tests everything else in the io module
00009 # * test_univnewlines - tests universal newline support
00010 # * test_largefile - tests operations on a file greater than 2**32 bytes
00011 #     (only enabled with -ulargefile)
00012 
00013 ################################################################################
00014 # ATTENTION TEST WRITERS!!!
00015 ################################################################################
00016 # When writing tests for io, it's important to test both the C and Python
00017 # implementations. This is usually done by writing a base test that refers to
00018 # the type it is testing as a attribute. Then it provides custom subclasses to
00019 # test both implementations. This file has lots of examples.
00020 ################################################################################
00021 
00022 import os
00023 import sys
00024 import time
00025 import array
00026 import random
00027 import unittest
00028 import weakref
00029 import abc
00030 import signal
00031 import errno
00032 import warnings
00033 import pickle
00034 from itertools import cycle, count
00035 from collections import deque
00036 from test import support
00037 
00038 import codecs
00039 import io  # C implementation of io
00040 import _pyio as pyio # Python implementation of io
00041 try:
00042     import threading
00043 except ImportError:
00044     threading = None
00045 
00046 
00047 def _default_chunk_size():
00048     """Get the default TextIOWrapper chunk size"""
00049     with open(__file__, "r", encoding="latin1") as f:
00050         return f._CHUNK_SIZE
00051 
00052 
00053 class MockRawIOWithoutRead:
00054     """A RawIO implementation without read(), so as to exercise the default
00055     RawIO.read() which calls readinto()."""
00056 
00057     def __init__(self, read_stack=()):
00058         self._read_stack = list(read_stack)
00059         self._write_stack = []
00060         self._reads = 0
00061         self._extraneous_reads = 0
00062 
00063     def write(self, b):
00064         self._write_stack.append(bytes(b))
00065         return len(b)
00066 
00067     def writable(self):
00068         return True
00069 
00070     def fileno(self):
00071         return 42
00072 
00073     def readable(self):
00074         return True
00075 
00076     def seekable(self):
00077         return True
00078 
00079     def seek(self, pos, whence):
00080         return 0   # wrong but we gotta return something
00081 
00082     def tell(self):
00083         return 0   # same comment as above
00084 
00085     def readinto(self, buf):
00086         self._reads += 1
00087         max_len = len(buf)
00088         try:
00089             data = self._read_stack[0]
00090         except IndexError:
00091             self._extraneous_reads += 1
00092             return 0
00093         if data is None:
00094             del self._read_stack[0]
00095             return None
00096         n = len(data)
00097         if len(data) <= max_len:
00098             del self._read_stack[0]
00099             buf[:n] = data
00100             return n
00101         else:
00102             buf[:] = data[:max_len]
00103             self._read_stack[0] = data[max_len:]
00104             return max_len
00105 
00106     def truncate(self, pos=None):
00107         return pos
00108 
00109 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
00110     pass
00111 
00112 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
00113     pass
00114 
00115 
00116 class MockRawIO(MockRawIOWithoutRead):
00117 
00118     def read(self, n=None):
00119         self._reads += 1
00120         try:
00121             return self._read_stack.pop(0)
00122         except:
00123             self._extraneous_reads += 1
00124             return b""
00125 
00126 class CMockRawIO(MockRawIO, io.RawIOBase):
00127     pass
00128 
00129 class PyMockRawIO(MockRawIO, pyio.RawIOBase):
00130     pass
00131 
00132 
00133 class MisbehavedRawIO(MockRawIO):
00134     def write(self, b):
00135         return super().write(b) * 2
00136 
00137     def read(self, n=None):
00138         return super().read(n) * 2
00139 
00140     def seek(self, pos, whence):
00141         return -123
00142 
00143     def tell(self):
00144         return -456
00145 
00146     def readinto(self, buf):
00147         super().readinto(buf)
00148         return len(buf) * 5
00149 
00150 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
00151     pass
00152 
00153 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
00154     pass
00155 
00156 
00157 class CloseFailureIO(MockRawIO):
00158     closed = 0
00159 
00160     def close(self):
00161         if not self.closed:
00162             self.closed = 1
00163             raise IOError
00164 
00165 class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
00166     pass
00167 
00168 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
00169     pass
00170 
00171 
00172 class MockFileIO:
00173 
00174     def __init__(self, data):
00175         self.read_history = []
00176         super().__init__(data)
00177 
00178     def read(self, n=None):
00179         res = super().read(n)
00180         self.read_history.append(None if res is None else len(res))
00181         return res
00182 
00183     def readinto(self, b):
00184         res = super().readinto(b)
00185         self.read_history.append(res)
00186         return res
00187 
00188 class CMockFileIO(MockFileIO, io.BytesIO):
00189     pass
00190 
00191 class PyMockFileIO(MockFileIO, pyio.BytesIO):
00192     pass
00193 
00194 
00195 class MockUnseekableIO:
00196     def seekable(self):
00197         return False
00198 
00199     def seek(self, *args):
00200         raise self.UnsupportedOperation("not seekable")
00201 
00202     def tell(self, *args):
00203         raise self.UnsupportedOperation("not seekable")
00204 
00205 class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
00206     UnsupportedOperation = io.UnsupportedOperation
00207 
00208 class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
00209     UnsupportedOperation = pyio.UnsupportedOperation
00210 
00211 
00212 class MockNonBlockWriterIO:
00213 
00214     def __init__(self):
00215         self._write_stack = []
00216         self._blocker_char = None
00217 
00218     def pop_written(self):
00219         s = b"".join(self._write_stack)
00220         self._write_stack[:] = []
00221         return s
00222 
00223     def block_on(self, char):
00224         """Block when a given char is encountered."""
00225         self._blocker_char = char
00226 
00227     def readable(self):
00228         return True
00229 
00230     def seekable(self):
00231         return True
00232 
00233     def writable(self):
00234         return True
00235 
00236     def write(self, b):
00237         b = bytes(b)
00238         n = -1
00239         if self._blocker_char:
00240             try:
00241                 n = b.index(self._blocker_char)
00242             except ValueError:
00243                 pass
00244             else:
00245                 self._blocker_char = None
00246                 self._write_stack.append(b[:n])
00247                 raise self.BlockingIOError(0, "test blocking", n)
00248         self._write_stack.append(b)
00249         return len(b)
00250 
00251 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
00252     BlockingIOError = io.BlockingIOError
00253 
00254 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
00255     BlockingIOError = pyio.BlockingIOError
00256 
00257 
00258 class IOTest(unittest.TestCase):
00259 
00260     def setUp(self):
00261         support.unlink(support.TESTFN)
00262 
00263     def tearDown(self):
00264         support.unlink(support.TESTFN)
00265 
00266     def write_ops(self, f):
00267         self.assertEqual(f.write(b"blah."), 5)
00268         f.truncate(0)
00269         self.assertEqual(f.tell(), 5)
00270         f.seek(0)
00271 
00272         self.assertEqual(f.write(b"blah."), 5)
00273         self.assertEqual(f.seek(0), 0)
00274         self.assertEqual(f.write(b"Hello."), 6)
00275         self.assertEqual(f.tell(), 6)
00276         self.assertEqual(f.seek(-1, 1), 5)
00277         self.assertEqual(f.tell(), 5)
00278         self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
00279         self.assertEqual(f.seek(0), 0)
00280         self.assertEqual(f.write(b"h"), 1)
00281         self.assertEqual(f.seek(-1, 2), 13)
00282         self.assertEqual(f.tell(), 13)
00283 
00284         self.assertEqual(f.truncate(12), 12)
00285         self.assertEqual(f.tell(), 13)
00286         self.assertRaises(TypeError, f.seek, 0.0)
00287 
00288     def read_ops(self, f, buffered=False):
00289         data = f.read(5)
00290         self.assertEqual(data, b"hello")
00291         data = bytearray(data)
00292         self.assertEqual(f.readinto(data), 5)
00293         self.assertEqual(data, b" worl")
00294         self.assertEqual(f.readinto(data), 2)
00295         self.assertEqual(len(data), 5)
00296         self.assertEqual(data[:2], b"d\n")
00297         self.assertEqual(f.seek(0), 0)
00298         self.assertEqual(f.read(20), b"hello world\n")
00299         self.assertEqual(f.read(1), b"")
00300         self.assertEqual(f.readinto(bytearray(b"x")), 0)
00301         self.assertEqual(f.seek(-6, 2), 6)
00302         self.assertEqual(f.read(5), b"world")
00303         self.assertEqual(f.read(0), b"")
00304         self.assertEqual(f.readinto(bytearray()), 0)
00305         self.assertEqual(f.seek(-6, 1), 5)
00306         self.assertEqual(f.read(5), b" worl")
00307         self.assertEqual(f.tell(), 10)
00308         self.assertRaises(TypeError, f.seek, 0.0)
00309         if buffered:
00310             f.seek(0)
00311             self.assertEqual(f.read(), b"hello world\n")
00312             f.seek(6)
00313             self.assertEqual(f.read(), b"world\n")
00314             self.assertEqual(f.read(), b"")
00315 
00316     LARGE = 2**31
00317 
00318     def large_file_ops(self, f):
00319         assert f.readable()
00320         assert f.writable()
00321         self.assertEqual(f.seek(self.LARGE), self.LARGE)
00322         self.assertEqual(f.tell(), self.LARGE)
00323         self.assertEqual(f.write(b"xxx"), 3)
00324         self.assertEqual(f.tell(), self.LARGE + 3)
00325         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
00326         self.assertEqual(f.truncate(), self.LARGE + 2)
00327         self.assertEqual(f.tell(), self.LARGE + 2)
00328         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
00329         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
00330         self.assertEqual(f.tell(), self.LARGE + 2)
00331         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
00332         self.assertEqual(f.seek(-1, 2), self.LARGE)
00333         self.assertEqual(f.read(2), b"x")
00334 
00335     def test_invalid_operations(self):
00336         # Try writing on a file opened in read mode and vice-versa.
00337         exc = self.UnsupportedOperation
00338         for mode in ("w", "wb"):
00339             with self.open(support.TESTFN, mode) as fp:
00340                 self.assertRaises(exc, fp.read)
00341                 self.assertRaises(exc, fp.readline)
00342         with self.open(support.TESTFN, "wb", buffering=0) as fp:
00343             self.assertRaises(exc, fp.read)
00344             self.assertRaises(exc, fp.readline)
00345         with self.open(support.TESTFN, "rb", buffering=0) as fp:
00346             self.assertRaises(exc, fp.write, b"blah")
00347             self.assertRaises(exc, fp.writelines, [b"blah\n"])
00348         with self.open(support.TESTFN, "rb") as fp:
00349             self.assertRaises(exc, fp.write, b"blah")
00350             self.assertRaises(exc, fp.writelines, [b"blah\n"])
00351         with self.open(support.TESTFN, "r") as fp:
00352             self.assertRaises(exc, fp.write, "blah")
00353             self.assertRaises(exc, fp.writelines, ["blah\n"])
00354             # Non-zero seeking from current or end pos
00355             self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
00356             self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
00357 
00358     def test_raw_file_io(self):
00359         with self.open(support.TESTFN, "wb", buffering=0) as f:
00360             self.assertEqual(f.readable(), False)
00361             self.assertEqual(f.writable(), True)
00362             self.assertEqual(f.seekable(), True)
00363             self.write_ops(f)
00364         with self.open(support.TESTFN, "rb", buffering=0) as f:
00365             self.assertEqual(f.readable(), True)
00366             self.assertEqual(f.writable(), False)
00367             self.assertEqual(f.seekable(), True)
00368             self.read_ops(f)
00369 
00370     def test_buffered_file_io(self):
00371         with self.open(support.TESTFN, "wb") as f:
00372             self.assertEqual(f.readable(), False)
00373             self.assertEqual(f.writable(), True)
00374             self.assertEqual(f.seekable(), True)
00375             self.write_ops(f)
00376         with self.open(support.TESTFN, "rb") as f:
00377             self.assertEqual(f.readable(), True)
00378             self.assertEqual(f.writable(), False)
00379             self.assertEqual(f.seekable(), True)
00380             self.read_ops(f, True)
00381 
00382     def test_readline(self):
00383         with self.open(support.TESTFN, "wb") as f:
00384             f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
00385         with self.open(support.TESTFN, "rb") as f:
00386             self.assertEqual(f.readline(), b"abc\n")
00387             self.assertEqual(f.readline(10), b"def\n")
00388             self.assertEqual(f.readline(2), b"xy")
00389             self.assertEqual(f.readline(4), b"zzy\n")
00390             self.assertEqual(f.readline(), b"foo\x00bar\n")
00391             self.assertEqual(f.readline(None), b"another line")
00392             self.assertRaises(TypeError, f.readline, 5.3)
00393         with self.open(support.TESTFN, "r") as f:
00394             self.assertRaises(TypeError, f.readline, 5.3)
00395 
00396     def test_raw_bytes_io(self):
00397         f = self.BytesIO()
00398         self.write_ops(f)
00399         data = f.getvalue()
00400         self.assertEqual(data, b"hello world\n")
00401         f = self.BytesIO(data)
00402         self.read_ops(f, True)
00403 
00404     def test_large_file_ops(self):
00405         # On Windows and Mac OSX this test comsumes large resources; It takes
00406         # a long time to build the >2GB file and takes >2GB of disk space
00407         # therefore the resource must be enabled to run this test.
00408         if sys.platform[:3] == 'win' or sys.platform == 'darwin':
00409             if not support.is_resource_enabled("largefile"):
00410                 print("\nTesting large file ops skipped on %s." % sys.platform,
00411                       file=sys.stderr)
00412                 print("It requires %d bytes and a long time." % self.LARGE,
00413                       file=sys.stderr)
00414                 print("Use 'regrtest.py -u largefile test_io' to run it.",
00415                       file=sys.stderr)
00416                 return
00417         with self.open(support.TESTFN, "w+b", 0) as f:
00418             self.large_file_ops(f)
00419         with self.open(support.TESTFN, "w+b") as f:
00420             self.large_file_ops(f)
00421 
00422     def test_with_open(self):
00423         for bufsize in (0, 1, 100):
00424             f = None
00425             with self.open(support.TESTFN, "wb", bufsize) as f:
00426                 f.write(b"xxx")
00427             self.assertEqual(f.closed, True)
00428             f = None
00429             try:
00430                 with self.open(support.TESTFN, "wb", bufsize) as f:
00431                     1/0
00432             except ZeroDivisionError:
00433                 self.assertEqual(f.closed, True)
00434             else:
00435                 self.fail("1/0 didn't raise an exception")
00436 
00437     # issue 5008
00438     def test_append_mode_tell(self):
00439         with self.open(support.TESTFN, "wb") as f:
00440             f.write(b"xxx")
00441         with self.open(support.TESTFN, "ab", buffering=0) as f:
00442             self.assertEqual(f.tell(), 3)
00443         with self.open(support.TESTFN, "ab") as f:
00444             self.assertEqual(f.tell(), 3)
00445         with self.open(support.TESTFN, "a") as f:
00446             self.assertTrue(f.tell() > 0)
00447 
00448     def test_destructor(self):
00449         record = []
00450         class MyFileIO(self.FileIO):
00451             def __del__(self):
00452                 record.append(1)
00453                 try:
00454                     f = super().__del__
00455                 except AttributeError:
00456                     pass
00457                 else:
00458                     f()
00459             def close(self):
00460                 record.append(2)
00461                 super().close()
00462             def flush(self):
00463                 record.append(3)
00464                 super().flush()
00465         with support.check_warnings(('', ResourceWarning)):
00466             f = MyFileIO(support.TESTFN, "wb")
00467             f.write(b"xxx")
00468             del f
00469             support.gc_collect()
00470             self.assertEqual(record, [1, 2, 3])
00471             with self.open(support.TESTFN, "rb") as f:
00472                 self.assertEqual(f.read(), b"xxx")
00473 
00474     def _check_base_destructor(self, base):
00475         record = []
00476         class MyIO(base):
00477             def __init__(self):
00478                 # This exercises the availability of attributes on object
00479                 # destruction.
00480                 # (in the C version, close() is called by the tp_dealloc
00481                 # function, not by __del__)
00482                 self.on_del = 1
00483                 self.on_close = 2
00484                 self.on_flush = 3
00485             def __del__(self):
00486                 record.append(self.on_del)
00487                 try:
00488                     f = super().__del__
00489                 except AttributeError:
00490                     pass
00491                 else:
00492                     f()
00493             def close(self):
00494                 record.append(self.on_close)
00495                 super().close()
00496             def flush(self):
00497                 record.append(self.on_flush)
00498                 super().flush()
00499         f = MyIO()
00500         del f
00501         support.gc_collect()
00502         self.assertEqual(record, [1, 2, 3])
00503 
00504     def test_IOBase_destructor(self):
00505         self._check_base_destructor(self.IOBase)
00506 
00507     def test_RawIOBase_destructor(self):
00508         self._check_base_destructor(self.RawIOBase)
00509 
00510     def test_BufferedIOBase_destructor(self):
00511         self._check_base_destructor(self.BufferedIOBase)
00512 
00513     def test_TextIOBase_destructor(self):
00514         self._check_base_destructor(self.TextIOBase)
00515 
00516     def test_close_flushes(self):
00517         with self.open(support.TESTFN, "wb") as f:
00518             f.write(b"xxx")
00519         with self.open(support.TESTFN, "rb") as f:
00520             self.assertEqual(f.read(), b"xxx")
00521 
00522     def test_array_writes(self):
00523         a = array.array('i', range(10))
00524         n = len(a.tobytes())
00525         with self.open(support.TESTFN, "wb", 0) as f:
00526             self.assertEqual(f.write(a), n)
00527         with self.open(support.TESTFN, "wb") as f:
00528             self.assertEqual(f.write(a), n)
00529 
00530     def test_closefd(self):
00531         self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
00532                           closefd=False)
00533 
00534     def test_read_closed(self):
00535         with self.open(support.TESTFN, "w") as f:
00536             f.write("egg\n")
00537         with self.open(support.TESTFN, "r") as f:
00538             file = self.open(f.fileno(), "r", closefd=False)
00539             self.assertEqual(file.read(), "egg\n")
00540             file.seek(0)
00541             file.close()
00542             self.assertRaises(ValueError, file.read)
00543 
00544     def test_no_closefd_with_filename(self):
00545         # can't use closefd in combination with a file name
00546         self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
00547 
00548     def test_closefd_attr(self):
00549         with self.open(support.TESTFN, "wb") as f:
00550             f.write(b"egg\n")
00551         with self.open(support.TESTFN, "r") as f:
00552             self.assertEqual(f.buffer.raw.closefd, True)
00553             file = self.open(f.fileno(), "r", closefd=False)
00554             self.assertEqual(file.buffer.raw.closefd, False)
00555 
00556     def test_garbage_collection(self):
00557         # FileIO objects are collected, and collecting them flushes
00558         # all data to disk.
00559         with support.check_warnings(('', ResourceWarning)):
00560             f = self.FileIO(support.TESTFN, "wb")
00561             f.write(b"abcxxx")
00562             f.f = f
00563             wr = weakref.ref(f)
00564             del f
00565             support.gc_collect()
00566         self.assertTrue(wr() is None, wr)
00567         with self.open(support.TESTFN, "rb") as f:
00568             self.assertEqual(f.read(), b"abcxxx")
00569 
00570     def test_unbounded_file(self):
00571         # Issue #1174606: reading from an unbounded stream such as /dev/zero.
00572         zero = "/dev/zero"
00573         if not os.path.exists(zero):
00574             self.skipTest("{0} does not exist".format(zero))
00575         if sys.maxsize > 0x7FFFFFFF:
00576             self.skipTest("test can only run in a 32-bit address space")
00577         if support.real_max_memuse < support._2G:
00578             self.skipTest("test requires at least 2GB of memory")
00579         with self.open(zero, "rb", buffering=0) as f:
00580             self.assertRaises(OverflowError, f.read)
00581         with self.open(zero, "rb") as f:
00582             self.assertRaises(OverflowError, f.read)
00583         with self.open(zero, "r") as f:
00584             self.assertRaises(OverflowError, f.read)
00585 
00586     def test_flush_error_on_close(self):
00587         f = self.open(support.TESTFN, "wb", buffering=0)
00588         def bad_flush():
00589             raise IOError()
00590         f.flush = bad_flush
00591         self.assertRaises(IOError, f.close) # exception not swallowed
00592 
00593     def test_multi_close(self):
00594         f = self.open(support.TESTFN, "wb", buffering=0)
00595         f.close()
00596         f.close()
00597         f.close()
00598         self.assertRaises(ValueError, f.flush)
00599 
00600     def test_RawIOBase_read(self):
00601         # Exercise the default RawIOBase.read() implementation (which calls
00602         # readinto() internally).
00603         rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
00604         self.assertEqual(rawio.read(2), b"ab")
00605         self.assertEqual(rawio.read(2), b"c")
00606         self.assertEqual(rawio.read(2), b"d")
00607         self.assertEqual(rawio.read(2), None)
00608         self.assertEqual(rawio.read(2), b"ef")
00609         self.assertEqual(rawio.read(2), b"g")
00610         self.assertEqual(rawio.read(2), None)
00611         self.assertEqual(rawio.read(2), b"")
00612 
00613 class CIOTest(IOTest):
00614 
00615     def test_IOBase_finalize(self):
00616         # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
00617         # class which inherits IOBase and an object of this class are caught
00618         # in a reference cycle and close() is already in the method cache.
00619         class MyIO(self.IOBase):
00620             def close(self):
00621                 pass
00622 
00623         # create an instance to populate the method cache
00624         MyIO()
00625         obj = MyIO()
00626         obj.obj = obj
00627         wr = weakref.ref(obj)
00628         del MyIO
00629         del obj
00630         support.gc_collect()
00631         self.assertTrue(wr() is None, wr)
00632 
00633 class PyIOTest(IOTest):
00634     pass
00635 
00636 
00637 class CommonBufferedTests:
00638     # Tests common to BufferedReader, BufferedWriter and BufferedRandom
00639 
00640     def test_detach(self):
00641         raw = self.MockRawIO()
00642         buf = self.tp(raw)
00643         self.assertIs(buf.detach(), raw)
00644         self.assertRaises(ValueError, buf.detach)
00645 
00646     def test_fileno(self):
00647         rawio = self.MockRawIO()
00648         bufio = self.tp(rawio)
00649 
00650         self.assertEqual(42, bufio.fileno())
00651 
00652     def test_no_fileno(self):
00653         # XXX will we always have fileno() function? If so, kill
00654         # this test. Else, write it.
00655         pass
00656 
00657     def test_invalid_args(self):
00658         rawio = self.MockRawIO()
00659         bufio = self.tp(rawio)
00660         # Invalid whence
00661         self.assertRaises(ValueError, bufio.seek, 0, -1)
00662         self.assertRaises(ValueError, bufio.seek, 0, 3)
00663 
00664     def test_override_destructor(self):
00665         tp = self.tp
00666         record = []
00667         class MyBufferedIO(tp):
00668             def __del__(self):
00669                 record.append(1)
00670                 try:
00671                     f = super().__del__
00672                 except AttributeError:
00673                     pass
00674                 else:
00675                     f()
00676             def close(self):
00677                 record.append(2)
00678                 super().close()
00679             def flush(self):
00680                 record.append(3)
00681                 super().flush()
00682         rawio = self.MockRawIO()
00683         bufio = MyBufferedIO(rawio)
00684         writable = bufio.writable()
00685         del bufio
00686         support.gc_collect()
00687         if writable:
00688             self.assertEqual(record, [1, 2, 3])
00689         else:
00690             self.assertEqual(record, [1, 2])
00691 
00692     def test_context_manager(self):
00693         # Test usability as a context manager
00694         rawio = self.MockRawIO()
00695         bufio = self.tp(rawio)
00696         def _with():
00697             with bufio:
00698                 pass
00699         _with()
00700         # bufio should now be closed, and using it a second time should raise
00701         # a ValueError.
00702         self.assertRaises(ValueError, _with)
00703 
00704     def test_error_through_destructor(self):
00705         # Test that the exception state is not modified by a destructor,
00706         # even if close() fails.
00707         rawio = self.CloseFailureIO()
00708         def f():
00709             self.tp(rawio).xyzzy
00710         with support.captured_output("stderr") as s:
00711             self.assertRaises(AttributeError, f)
00712         s = s.getvalue().strip()
00713         if s:
00714             # The destructor *may* have printed an unraisable error, check it
00715             self.assertEqual(len(s.splitlines()), 1)
00716             self.assertTrue(s.startswith("Exception IOError: "), s)
00717             self.assertTrue(s.endswith(" ignored"), s)
00718 
00719     def test_repr(self):
00720         raw = self.MockRawIO()
00721         b = self.tp(raw)
00722         clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
00723         self.assertEqual(repr(b), "<%s>" % clsname)
00724         raw.name = "dummy"
00725         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
00726         raw.name = b"dummy"
00727         self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
00728 
00729     def test_flush_error_on_close(self):
00730         raw = self.MockRawIO()
00731         def bad_flush():
00732             raise IOError()
00733         raw.flush = bad_flush
00734         b = self.tp(raw)
00735         self.assertRaises(IOError, b.close) # exception not swallowed
00736 
00737     def test_multi_close(self):
00738         raw = self.MockRawIO()
00739         b = self.tp(raw)
00740         b.close()
00741         b.close()
00742         b.close()
00743         self.assertRaises(ValueError, b.flush)
00744 
00745     def test_unseekable(self):
00746         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
00747         self.assertRaises(self.UnsupportedOperation, bufio.tell)
00748         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
00749 
00750     def test_readonly_attributes(self):
00751         raw = self.MockRawIO()
00752         buf = self.tp(raw)
00753         x = self.MockRawIO()
00754         with self.assertRaises(AttributeError):
00755             buf.raw = x
00756 
00757 
00758 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
00759     read_mode = "rb"
00760 
00761     def test_constructor(self):
00762         rawio = self.MockRawIO([b"abc"])
00763         bufio = self.tp(rawio)
00764         bufio.__init__(rawio)
00765         bufio.__init__(rawio, buffer_size=1024)
00766         bufio.__init__(rawio, buffer_size=16)
00767         self.assertEqual(b"abc", bufio.read())
00768         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
00769         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
00770         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
00771         rawio = self.MockRawIO([b"abc"])
00772         bufio.__init__(rawio)
00773         self.assertEqual(b"abc", bufio.read())
00774 
00775     def test_read(self):
00776         for arg in (None, 7):
00777             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00778             bufio = self.tp(rawio)
00779             self.assertEqual(b"abcdefg", bufio.read(arg))
00780         # Invalid args
00781         self.assertRaises(ValueError, bufio.read, -2)
00782 
00783     def test_read1(self):
00784         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00785         bufio = self.tp(rawio)
00786         self.assertEqual(b"a", bufio.read(1))
00787         self.assertEqual(b"b", bufio.read1(1))
00788         self.assertEqual(rawio._reads, 1)
00789         self.assertEqual(b"c", bufio.read1(100))
00790         self.assertEqual(rawio._reads, 1)
00791         self.assertEqual(b"d", bufio.read1(100))
00792         self.assertEqual(rawio._reads, 2)
00793         self.assertEqual(b"efg", bufio.read1(100))
00794         self.assertEqual(rawio._reads, 3)
00795         self.assertEqual(b"", bufio.read1(100))
00796         self.assertEqual(rawio._reads, 4)
00797         # Invalid args
00798         self.assertRaises(ValueError, bufio.read1, -1)
00799 
00800     def test_readinto(self):
00801         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00802         bufio = self.tp(rawio)
00803         b = bytearray(2)
00804         self.assertEqual(bufio.readinto(b), 2)
00805         self.assertEqual(b, b"ab")
00806         self.assertEqual(bufio.readinto(b), 2)
00807         self.assertEqual(b, b"cd")
00808         self.assertEqual(bufio.readinto(b), 2)
00809         self.assertEqual(b, b"ef")
00810         self.assertEqual(bufio.readinto(b), 1)
00811         self.assertEqual(b, b"gf")
00812         self.assertEqual(bufio.readinto(b), 0)
00813         self.assertEqual(b, b"gf")
00814 
00815     def test_readlines(self):
00816         def bufio():
00817             rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
00818             return self.tp(rawio)
00819         self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
00820         self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
00821         self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
00822 
00823     def test_buffering(self):
00824         data = b"abcdefghi"
00825         dlen = len(data)
00826 
00827         tests = [
00828             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
00829             [ 100, [ 3, 3, 3],     [ dlen ]    ],
00830             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
00831         ]
00832 
00833         for bufsize, buf_read_sizes, raw_read_sizes in tests:
00834             rawio = self.MockFileIO(data)
00835             bufio = self.tp(rawio, buffer_size=bufsize)
00836             pos = 0
00837             for nbytes in buf_read_sizes:
00838                 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
00839                 pos += nbytes
00840             # this is mildly implementation-dependent
00841             self.assertEqual(rawio.read_history, raw_read_sizes)
00842 
00843     def test_read_non_blocking(self):
00844         # Inject some None's in there to simulate EWOULDBLOCK
00845         rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
00846         bufio = self.tp(rawio)
00847         self.assertEqual(b"abcd", bufio.read(6))
00848         self.assertEqual(b"e", bufio.read(1))
00849         self.assertEqual(b"fg", bufio.read())
00850         self.assertEqual(b"", bufio.peek(1))
00851         self.assertIsNone(bufio.read())
00852         self.assertEqual(b"", bufio.read())
00853 
00854         rawio = self.MockRawIO((b"a", None, None))
00855         self.assertEqual(b"a", rawio.readall())
00856         self.assertIsNone(rawio.readall())
00857 
00858     def test_read_past_eof(self):
00859         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00860         bufio = self.tp(rawio)
00861 
00862         self.assertEqual(b"abcdefg", bufio.read(9000))
00863 
00864     def test_read_all(self):
00865         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
00866         bufio = self.tp(rawio)
00867 
00868         self.assertEqual(b"abcdefg", bufio.read())
00869 
00870     @unittest.skipUnless(threading, 'Threading required for this test.')
00871     @support.requires_resource('cpu')
00872     def test_threads(self):
00873         try:
00874             # Write out many bytes with exactly the same number of 0's,
00875             # 1's... 255's. This will help us check that concurrent reading
00876             # doesn't duplicate or forget contents.
00877             N = 1000
00878             l = list(range(256)) * N
00879             random.shuffle(l)
00880             s = bytes(bytearray(l))
00881             with self.open(support.TESTFN, "wb") as f:
00882                 f.write(s)
00883             with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
00884                 bufio = self.tp(raw, 8)
00885                 errors = []
00886                 results = []
00887                 def f():
00888                     try:
00889                         # Intra-buffer read then buffer-flushing read
00890                         for n in cycle([1, 19]):
00891                             s = bufio.read(n)
00892                             if not s:
00893                                 break
00894                             # list.append() is atomic
00895                             results.append(s)
00896                     except Exception as e:
00897                         errors.append(e)
00898                         raise
00899                 threads = [threading.Thread(target=f) for x in range(20)]
00900                 for t in threads:
00901                     t.start()
00902                 time.sleep(0.02) # yield
00903                 for t in threads:
00904                     t.join()
00905                 self.assertFalse(errors,
00906                     "the following exceptions were caught: %r" % errors)
00907                 s = b''.join(results)
00908                 for i in range(256):
00909                     c = bytes(bytearray([i]))
00910                     self.assertEqual(s.count(c), N)
00911         finally:
00912             support.unlink(support.TESTFN)
00913 
00914     def test_misbehaved_io(self):
00915         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
00916         bufio = self.tp(rawio)
00917         self.assertRaises(IOError, bufio.seek, 0)
00918         self.assertRaises(IOError, bufio.tell)
00919 
00920     def test_no_extraneous_read(self):
00921         # Issue #9550; when the raw IO object has satisfied the read request,
00922         # we should not issue any additional reads, otherwise it may block
00923         # (e.g. socket).
00924         bufsize = 16
00925         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
00926             rawio = self.MockRawIO([b"x" * n])
00927             bufio = self.tp(rawio, bufsize)
00928             self.assertEqual(bufio.read(n), b"x" * n)
00929             # Simple case: one raw read is enough to satisfy the request.
00930             self.assertEqual(rawio._extraneous_reads, 0,
00931                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
00932             # A more complex case where two raw reads are needed to satisfy
00933             # the request.
00934             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
00935             bufio = self.tp(rawio, bufsize)
00936             self.assertEqual(bufio.read(n), b"x" * n)
00937             self.assertEqual(rawio._extraneous_reads, 0,
00938                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
00939 
00940 
00941 class CBufferedReaderTest(BufferedReaderTest):
00942     tp = io.BufferedReader
00943 
00944     def test_constructor(self):
00945         BufferedReaderTest.test_constructor(self)
00946         # The allocation can succeed on 32-bit builds, e.g. with more
00947         # than 2GB RAM and a 64-bit kernel.
00948         if sys.maxsize > 0x7FFFFFFF:
00949             rawio = self.MockRawIO()
00950             bufio = self.tp(rawio)
00951             self.assertRaises((OverflowError, MemoryError, ValueError),
00952                 bufio.__init__, rawio, sys.maxsize)
00953 
00954     def test_initialization(self):
00955         rawio = self.MockRawIO([b"abc"])
00956         bufio = self.tp(rawio)
00957         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
00958         self.assertRaises(ValueError, bufio.read)
00959         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
00960         self.assertRaises(ValueError, bufio.read)
00961         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
00962         self.assertRaises(ValueError, bufio.read)
00963 
00964     def test_misbehaved_io_read(self):
00965         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
00966         bufio = self.tp(rawio)
00967         # _pyio.BufferedReader seems to implement reading different, so that
00968         # checking this is not so easy.
00969         self.assertRaises(IOError, bufio.read, 10)
00970 
00971     def test_garbage_collection(self):
00972         # C BufferedReader objects are collected.
00973         # The Python version has __del__, so it ends into gc.garbage instead
00974         rawio = self.FileIO(support.TESTFN, "w+b")
00975         f = self.tp(rawio)
00976         f.f = f
00977         wr = weakref.ref(f)
00978         del f
00979         support.gc_collect()
00980         self.assertTrue(wr() is None, wr)
00981 
00982 class PyBufferedReaderTest(BufferedReaderTest):
00983     tp = pyio.BufferedReader
00984 
00985 
00986 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
00987     write_mode = "wb"
00988 
00989     def test_constructor(self):
00990         rawio = self.MockRawIO()
00991         bufio = self.tp(rawio)
00992         bufio.__init__(rawio)
00993         bufio.__init__(rawio, buffer_size=1024)
00994         bufio.__init__(rawio, buffer_size=16)
00995         self.assertEqual(3, bufio.write(b"abc"))
00996         bufio.flush()
00997         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
00998         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
00999         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
01000         bufio.__init__(rawio)
01001         self.assertEqual(3, bufio.write(b"ghi"))
01002         bufio.flush()
01003         self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
01004 
01005     def test_detach_flush(self):
01006         raw = self.MockRawIO()
01007         buf = self.tp(raw)
01008         buf.write(b"howdy!")
01009         self.assertFalse(raw._write_stack)
01010         buf.detach()
01011         self.assertEqual(raw._write_stack, [b"howdy!"])
01012 
01013     def test_write(self):
01014         # Write to the buffered IO but don't overflow the buffer.
01015         writer = self.MockRawIO()
01016         bufio = self.tp(writer, 8)
01017         bufio.write(b"abc")
01018         self.assertFalse(writer._write_stack)
01019 
01020     def test_write_overflow(self):
01021         writer = self.MockRawIO()
01022         bufio = self.tp(writer, 8)
01023         contents = b"abcdefghijklmnop"
01024         for n in range(0, len(contents), 3):
01025             bufio.write(contents[n:n+3])
01026         flushed = b"".join(writer._write_stack)
01027         # At least (total - 8) bytes were implicitly flushed, perhaps more
01028         # depending on the implementation.
01029         self.assertTrue(flushed.startswith(contents[:-8]), flushed)
01030 
01031     def check_writes(self, intermediate_func):
01032         # Lots of writes, test the flushed output is as expected.
01033         contents = bytes(range(256)) * 1000
01034         n = 0
01035         writer = self.MockRawIO()
01036         bufio = self.tp(writer, 13)
01037         # Generator of write sizes: repeat each N 15 times then proceed to N+1
01038         def gen_sizes():
01039             for size in count(1):
01040                 for i in range(15):
01041                     yield size
01042         sizes = gen_sizes()
01043         while n < len(contents):
01044             size = min(next(sizes), len(contents) - n)
01045             self.assertEqual(bufio.write(contents[n:n+size]), size)
01046             intermediate_func(bufio)
01047             n += size
01048         bufio.flush()
01049         self.assertEqual(contents, b"".join(writer._write_stack))
01050 
01051     def test_writes(self):
01052         self.check_writes(lambda bufio: None)
01053 
01054     def test_writes_and_flushes(self):
01055         self.check_writes(lambda bufio: bufio.flush())
01056 
01057     def test_writes_and_seeks(self):
01058         def _seekabs(bufio):
01059             pos = bufio.tell()
01060             bufio.seek(pos + 1, 0)
01061             bufio.seek(pos - 1, 0)
01062             bufio.seek(pos, 0)
01063         self.check_writes(_seekabs)
01064         def _seekrel(bufio):
01065             pos = bufio.seek(0, 1)
01066             bufio.seek(+1, 1)
01067             bufio.seek(-1, 1)
01068             bufio.seek(pos, 0)
01069         self.check_writes(_seekrel)
01070 
01071     def test_writes_and_truncates(self):
01072         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
01073 
01074     def test_write_non_blocking(self):
01075         raw = self.MockNonBlockWriterIO()
01076         bufio = self.tp(raw, 8)
01077 
01078         self.assertEqual(bufio.write(b"abcd"), 4)
01079         self.assertEqual(bufio.write(b"efghi"), 5)
01080         # 1 byte will be written, the rest will be buffered
01081         raw.block_on(b"k")
01082         self.assertEqual(bufio.write(b"jklmn"), 5)
01083 
01084         # 8 bytes will be written, 8 will be buffered and the rest will be lost
01085         raw.block_on(b"0")
01086         try:
01087             bufio.write(b"opqrwxyz0123456789")
01088         except self.BlockingIOError as e:
01089             written = e.characters_written
01090         else:
01091             self.fail("BlockingIOError should have been raised")
01092         self.assertEqual(written, 16)
01093         self.assertEqual(raw.pop_written(),
01094             b"abcdefghijklmnopqrwxyz")
01095 
01096         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
01097         s = raw.pop_written()
01098         # Previously buffered bytes were flushed
01099         self.assertTrue(s.startswith(b"01234567A"), s)
01100 
01101     def test_write_and_rewind(self):
01102         raw = io.BytesIO()
01103         bufio = self.tp(raw, 4)
01104         self.assertEqual(bufio.write(b"abcdef"), 6)
01105         self.assertEqual(bufio.tell(), 6)
01106         bufio.seek(0, 0)
01107         self.assertEqual(bufio.write(b"XY"), 2)
01108         bufio.seek(6, 0)
01109         self.assertEqual(raw.getvalue(), b"XYcdef")
01110         self.assertEqual(bufio.write(b"123456"), 6)
01111         bufio.flush()
01112         self.assertEqual(raw.getvalue(), b"XYcdef123456")
01113 
01114     def test_flush(self):
01115         writer = self.MockRawIO()
01116         bufio = self.tp(writer, 8)
01117         bufio.write(b"abc")
01118         bufio.flush()
01119         self.assertEqual(b"abc", writer._write_stack[0])
01120 
01121     def test_destructor(self):
01122         writer = self.MockRawIO()
01123         bufio = self.tp(writer, 8)
01124         bufio.write(b"abc")
01125         del bufio
01126         support.gc_collect()
01127         self.assertEqual(b"abc", writer._write_stack[0])
01128 
01129     def test_truncate(self):
01130         # Truncate implicitly flushes the buffer.
01131         with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
01132             bufio = self.tp(raw, 8)
01133             bufio.write(b"abcdef")
01134             self.assertEqual(bufio.truncate(3), 3)
01135             self.assertEqual(bufio.tell(), 6)
01136         with self.open(support.TESTFN, "rb", buffering=0) as f:
01137             self.assertEqual(f.read(), b"abc")
01138 
01139     @unittest.skipUnless(threading, 'Threading required for this test.')
01140     @support.requires_resource('cpu')
01141     def test_threads(self):
01142         try:
01143             # Write out many bytes from many threads and test they were
01144             # all flushed.
01145             N = 1000
01146             contents = bytes(range(256)) * N
01147             sizes = cycle([1, 19])
01148             n = 0
01149             queue = deque()
01150             while n < len(contents):
01151                 size = next(sizes)
01152                 queue.append(contents[n:n+size])
01153                 n += size
01154             del contents
01155             # We use a real file object because it allows us to
01156             # exercise situations where the GIL is released before
01157             # writing the buffer to the raw streams. This is in addition
01158             # to concurrency issues due to switching threads in the middle
01159             # of Python code.
01160             with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
01161                 bufio = self.tp(raw, 8)
01162                 errors = []
01163                 def f():
01164                     try:
01165                         while True:
01166                             try:
01167                                 s = queue.popleft()
01168                             except IndexError:
01169                                 return
01170                             bufio.write(s)
01171                     except Exception as e:
01172                         errors.append(e)
01173                         raise
01174                 threads = [threading.Thread(target=f) for x in range(20)]
01175                 for t in threads:
01176                     t.start()
01177                 time.sleep(0.02) # yield
01178                 for t in threads:
01179                     t.join()
01180                 self.assertFalse(errors,
01181                     "the following exceptions were caught: %r" % errors)
01182                 bufio.close()
01183             with self.open(support.TESTFN, "rb") as f:
01184                 s = f.read()
01185             for i in range(256):
01186                 self.assertEqual(s.count(bytes([i])), N)
01187         finally:
01188             support.unlink(support.TESTFN)
01189 
01190     def test_misbehaved_io(self):
01191         rawio = self.MisbehavedRawIO()
01192         bufio = self.tp(rawio, 5)
01193         self.assertRaises(IOError, bufio.seek, 0)
01194         self.assertRaises(IOError, bufio.tell)
01195         self.assertRaises(IOError, bufio.write, b"abcdef")
01196 
01197     def test_max_buffer_size_deprecation(self):
01198         with support.check_warnings(("max_buffer_size is deprecated",
01199                                      DeprecationWarning)):
01200             self.tp(self.MockRawIO(), 8, 12)
01201 
01202 
01203 class CBufferedWriterTest(BufferedWriterTest):
01204     tp = io.BufferedWriter
01205 
01206     def test_constructor(self):
01207         BufferedWriterTest.test_constructor(self)
01208         # The allocation can succeed on 32-bit builds, e.g. with more
01209         # than 2GB RAM and a 64-bit kernel.
01210         if sys.maxsize > 0x7FFFFFFF:
01211             rawio = self.MockRawIO()
01212             bufio = self.tp(rawio)
01213             self.assertRaises((OverflowError, MemoryError, ValueError),
01214                 bufio.__init__, rawio, sys.maxsize)
01215 
01216     def test_initialization(self):
01217         rawio = self.MockRawIO()
01218         bufio = self.tp(rawio)
01219         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
01220         self.assertRaises(ValueError, bufio.write, b"def")
01221         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
01222         self.assertRaises(ValueError, bufio.write, b"def")
01223         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
01224         self.assertRaises(ValueError, bufio.write, b"def")
01225 
01226     def test_garbage_collection(self):
01227         # C BufferedWriter objects are collected, and collecting them flushes
01228         # all data to disk.
01229         # The Python version has __del__, so it ends into gc.garbage instead
01230         rawio = self.FileIO(support.TESTFN, "w+b")
01231         f = self.tp(rawio)
01232         f.write(b"123xxx")
01233         f.x = f
01234         wr = weakref.ref(f)
01235         del f
01236         support.gc_collect()
01237         self.assertTrue(wr() is None, wr)
01238         with self.open(support.TESTFN, "rb") as f:
01239             self.assertEqual(f.read(), b"123xxx")
01240 
01241 
01242 class PyBufferedWriterTest(BufferedWriterTest):
01243     tp = pyio.BufferedWriter
01244 
01245 class BufferedRWPairTest(unittest.TestCase):
01246 
01247     def test_constructor(self):
01248         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01249         self.assertFalse(pair.closed)
01250 
01251     def test_detach(self):
01252         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01253         self.assertRaises(self.UnsupportedOperation, pair.detach)
01254 
01255     def test_constructor_max_buffer_size_deprecation(self):
01256         with support.check_warnings(("max_buffer_size is deprecated",
01257                                      DeprecationWarning)):
01258             self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
01259 
01260     def test_constructor_with_not_readable(self):
01261         class NotReadable(MockRawIO):
01262             def readable(self):
01263                 return False
01264 
01265         self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
01266 
01267     def test_constructor_with_not_writeable(self):
01268         class NotWriteable(MockRawIO):
01269             def writable(self):
01270                 return False
01271 
01272         self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
01273 
01274     def test_read(self):
01275         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
01276 
01277         self.assertEqual(pair.read(3), b"abc")
01278         self.assertEqual(pair.read(1), b"d")
01279         self.assertEqual(pair.read(), b"ef")
01280         pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
01281         self.assertEqual(pair.read(None), b"abc")
01282 
01283     def test_readlines(self):
01284         pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
01285         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
01286         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
01287         self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
01288 
01289     def test_read1(self):
01290         # .read1() is delegated to the underlying reader object, so this test
01291         # can be shallow.
01292         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
01293 
01294         self.assertEqual(pair.read1(3), b"abc")
01295 
01296     def test_readinto(self):
01297         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
01298 
01299         data = bytearray(5)
01300         self.assertEqual(pair.readinto(data), 5)
01301         self.assertEqual(data, b"abcde")
01302 
01303     def test_write(self):
01304         w = self.MockRawIO()
01305         pair = self.tp(self.MockRawIO(), w)
01306 
01307         pair.write(b"abc")
01308         pair.flush()
01309         pair.write(b"def")
01310         pair.flush()
01311         self.assertEqual(w._write_stack, [b"abc", b"def"])
01312 
01313     def test_peek(self):
01314         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
01315 
01316         self.assertTrue(pair.peek(3).startswith(b"abc"))
01317         self.assertEqual(pair.read(3), b"abc")
01318 
01319     def test_readable(self):
01320         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01321         self.assertTrue(pair.readable())
01322 
01323     def test_writeable(self):
01324         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01325         self.assertTrue(pair.writable())
01326 
01327     def test_seekable(self):
01328         # BufferedRWPairs are never seekable, even if their readers and writers
01329         # are.
01330         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01331         self.assertFalse(pair.seekable())
01332 
01333     # .flush() is delegated to the underlying writer object and has been
01334     # tested in the test_write method.
01335 
01336     def test_close_and_closed(self):
01337         pair = self.tp(self.MockRawIO(), self.MockRawIO())
01338         self.assertFalse(pair.closed)
01339         pair.close()
01340         self.assertTrue(pair.closed)
01341 
01342     def test_isatty(self):
01343         class SelectableIsAtty(MockRawIO):
01344             def __init__(self, isatty):
01345                 MockRawIO.__init__(self)
01346                 self._isatty = isatty
01347 
01348             def isatty(self):
01349                 return self._isatty
01350 
01351         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
01352         self.assertFalse(pair.isatty())
01353 
01354         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
01355         self.assertTrue(pair.isatty())
01356 
01357         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
01358         self.assertTrue(pair.isatty())
01359 
01360         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
01361         self.assertTrue(pair.isatty())
01362 
01363 class CBufferedRWPairTest(BufferedRWPairTest):
01364     tp = io.BufferedRWPair
01365 
01366 class PyBufferedRWPairTest(BufferedRWPairTest):
01367     tp = pyio.BufferedRWPair
01368 
01369 
01370 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
01371     read_mode = "rb+"
01372     write_mode = "wb+"
01373 
01374     def test_constructor(self):
01375         BufferedReaderTest.test_constructor(self)
01376         BufferedWriterTest.test_constructor(self)
01377 
01378     def test_read_and_write(self):
01379         raw = self.MockRawIO((b"asdf", b"ghjk"))
01380         rw = self.tp(raw, 8)
01381 
01382         self.assertEqual(b"as", rw.read(2))
01383         rw.write(b"ddd")
01384         rw.write(b"eee")
01385         self.assertFalse(raw._write_stack) # Buffer writes
01386         self.assertEqual(b"ghjk", rw.read())
01387         self.assertEqual(b"dddeee", raw._write_stack[0])
01388 
01389     def test_seek_and_tell(self):
01390         raw = self.BytesIO(b"asdfghjkl")
01391         rw = self.tp(raw)
01392 
01393         self.assertEqual(b"as", rw.read(2))
01394         self.assertEqual(2, rw.tell())
01395         rw.seek(0, 0)
01396         self.assertEqual(b"asdf", rw.read(4))
01397 
01398         rw.write(b"asdf")
01399         rw.seek(0, 0)
01400         self.assertEqual(b"asdfasdfl", rw.read())
01401         self.assertEqual(9, rw.tell())
01402         rw.seek(-4, 2)
01403         self.assertEqual(5, rw.tell())
01404         rw.seek(2, 1)
01405         self.assertEqual(7, rw.tell())
01406         self.assertEqual(b"fl", rw.read(11))
01407         self.assertRaises(TypeError, rw.seek, 0.0)
01408 
01409     def check_flush_and_read(self, read_func):
01410         raw = self.BytesIO(b"abcdefghi")
01411         bufio = self.tp(raw)
01412 
01413         self.assertEqual(b"ab", read_func(bufio, 2))
01414         bufio.write(b"12")
01415         self.assertEqual(b"ef", read_func(bufio, 2))
01416         self.assertEqual(6, bufio.tell())
01417         bufio.flush()
01418         self.assertEqual(6, bufio.tell())
01419         self.assertEqual(b"ghi", read_func(bufio))
01420         raw.seek(0, 0)
01421         raw.write(b"XYZ")
01422         # flush() resets the read buffer
01423         bufio.flush()
01424         bufio.seek(0, 0)
01425         self.assertEqual(b"XYZ", read_func(bufio, 3))
01426 
01427     def test_flush_and_read(self):
01428         self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
01429 
01430     def test_flush_and_readinto(self):
01431         def _readinto(bufio, n=-1):
01432             b = bytearray(n if n >= 0 else 9999)
01433             n = bufio.readinto(b)
01434             return bytes(b[:n])
01435         self.check_flush_and_read(_readinto)
01436 
01437     def test_flush_and_peek(self):
01438         def _peek(bufio, n=-1):
01439             # This relies on the fact that the buffer can contain the whole
01440             # raw stream, otherwise peek() can return less.
01441             b = bufio.peek(n)
01442             if n != -1:
01443                 b = b[:n]
01444             bufio.seek(len(b), 1)
01445             return b
01446         self.check_flush_and_read(_peek)
01447 
01448     def test_flush_and_write(self):
01449         raw = self.BytesIO(b"abcdefghi")
01450         bufio = self.tp(raw)
01451 
01452         bufio.write(b"123")
01453         bufio.flush()
01454         bufio.write(b"45")
01455         bufio.flush()
01456         bufio.seek(0, 0)
01457         self.assertEqual(b"12345fghi", raw.getvalue())
01458         self.assertEqual(b"12345fghi", bufio.read())
01459 
01460     def test_threads(self):
01461         BufferedReaderTest.test_threads(self)
01462         BufferedWriterTest.test_threads(self)
01463 
01464     def test_writes_and_peek(self):
01465         def _peek(bufio):
01466             bufio.peek(1)
01467         self.check_writes(_peek)
01468         def _peek(bufio):
01469             pos = bufio.tell()
01470             bufio.seek(-1, 1)
01471             bufio.peek(1)
01472             bufio.seek(pos, 0)
01473         self.check_writes(_peek)
01474 
01475     def test_writes_and_reads(self):
01476         def _read(bufio):
01477             bufio.seek(-1, 1)
01478             bufio.read(1)
01479         self.check_writes(_read)
01480 
01481     def test_writes_and_read1s(self):
01482         def _read1(bufio):
01483             bufio.seek(-1, 1)
01484             bufio.read1(1)
01485         self.check_writes(_read1)
01486 
01487     def test_writes_and_readintos(self):
01488         def _read(bufio):
01489             bufio.seek(-1, 1)
01490             bufio.readinto(bytearray(1))
01491         self.check_writes(_read)
01492 
01493     def test_write_after_readahead(self):
01494         # Issue #6629: writing after the buffer was filled by readahead should
01495         # first rewind the raw stream.
01496         for overwrite_size in [1, 5]:
01497             raw = self.BytesIO(b"A" * 10)
01498             bufio = self.tp(raw, 4)
01499             # Trigger readahead
01500             self.assertEqual(bufio.read(1), b"A")
01501             self.assertEqual(bufio.tell(), 1)
01502             # Overwriting should rewind the raw stream if it needs so
01503             bufio.write(b"B" * overwrite_size)
01504             self.assertEqual(bufio.tell(), overwrite_size + 1)
01505             # If the write size was smaller than the buffer size, flush() and
01506             # check that rewind happens.
01507             bufio.flush()
01508             self.assertEqual(bufio.tell(), overwrite_size + 1)
01509             s = raw.getvalue()
01510             self.assertEqual(s,
01511                 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
01512 
01513     def test_write_rewind_write(self):
01514         # Various combinations of reading / writing / seeking backwards / writing again
01515         def mutate(bufio, pos1, pos2):
01516             assert pos2 >= pos1
01517             # Fill the buffer
01518             bufio.seek(pos1)
01519             bufio.read(pos2 - pos1)
01520             bufio.write(b'\x02')
01521             # This writes earlier than the previous write, but still inside
01522             # the buffer.
01523             bufio.seek(pos1)
01524             bufio.write(b'\x01')
01525 
01526         b = b"\x80\x81\x82\x83\x84"
01527         for i in range(0, len(b)):
01528             for j in range(i, len(b)):
01529                 raw = self.BytesIO(b)
01530                 bufio = self.tp(raw, 100)
01531                 mutate(bufio, i, j)
01532                 bufio.flush()
01533                 expected = bytearray(b)
01534                 expected[j] = 2
01535                 expected[i] = 1
01536                 self.assertEqual(raw.getvalue(), expected,
01537                                  "failed result for i=%d, j=%d" % (i, j))
01538 
01539     def test_truncate_after_read_or_write(self):
01540         raw = self.BytesIO(b"A" * 10)
01541         bufio = self.tp(raw, 100)
01542         self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
01543         self.assertEqual(bufio.truncate(), 2)
01544         self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
01545         self.assertEqual(bufio.truncate(), 4)
01546 
01547     def test_misbehaved_io(self):
01548         BufferedReaderTest.test_misbehaved_io(self)
01549         BufferedWriterTest.test_misbehaved_io(self)
01550 
01551     # You can't construct a BufferedRandom over a non-seekable stream.
01552     test_unseekable = None
01553 
01554 class CBufferedRandomTest(BufferedRandomTest):
01555     tp = io.BufferedRandom
01556 
01557     def test_constructor(self):
01558         BufferedRandomTest.test_constructor(self)
01559         # The allocation can succeed on 32-bit builds, e.g. with more
01560         # than 2GB RAM and a 64-bit kernel.
01561         if sys.maxsize > 0x7FFFFFFF:
01562             rawio = self.MockRawIO()
01563             bufio = self.tp(rawio)
01564             self.assertRaises((OverflowError, MemoryError, ValueError),
01565                 bufio.__init__, rawio, sys.maxsize)
01566 
01567     def test_garbage_collection(self):
01568         CBufferedReaderTest.test_garbage_collection(self)
01569         CBufferedWriterTest.test_garbage_collection(self)
01570 
01571 class PyBufferedRandomTest(BufferedRandomTest):
01572     tp = pyio.BufferedRandom
01573 
01574 
01575 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
01576 # properties:
01577 #   - A single output character can correspond to many bytes of input.
01578 #   - The number of input bytes to complete the character can be
01579 #     undetermined until the last input byte is received.
01580 #   - The number of input bytes can vary depending on previous input.
01581 #   - A single input byte can correspond to many characters of output.
01582 #   - The number of output characters can be undetermined until the
01583 #     last input byte is received.
01584 #   - The number of output characters can vary depending on previous input.
01585 
01586 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
01587     """
01588     For testing seek/tell behavior with a stateful, buffering decoder.
01589 
01590     Input is a sequence of words.  Words may be fixed-length (length set
01591     by input) or variable-length (period-terminated).  In variable-length
01592     mode, extra periods are ignored.  Possible words are:
01593       - 'i' followed by a number sets the input length, I (maximum 99).
01594         When I is set to 0, words are space-terminated.
01595       - 'o' followed by a number sets the output length, O (maximum 99).
01596       - Any other word is converted into a word followed by a period on
01597         the output.  The output word consists of the input word truncated
01598         or padded out with hyphens to make its length equal to O.  If O
01599         is 0, the word is output verbatim without truncating or padding.
01600     I and O are initially set to 1.  When I changes, any buffered input is
01601     re-scanned according to the new I.  EOF also terminates the last word.
01602     """
01603 
01604     def __init__(self, errors='strict'):
01605         codecs.IncrementalDecoder.__init__(self, errors)
01606         self.reset()
01607 
01608     def __repr__(self):
01609         return '<SID %x>' % id(self)
01610 
01611     def reset(self):
01612         self.i = 1
01613         self.o = 1
01614         self.buffer = bytearray()
01615 
01616     def getstate(self):
01617         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
01618         return bytes(self.buffer), i*100 + o
01619 
01620     def setstate(self, state):
01621         buffer, io = state
01622         self.buffer = bytearray(buffer)
01623         i, o = divmod(io, 100)
01624         self.i, self.o = i ^ 1, o ^ 1
01625 
01626     def decode(self, input, final=False):
01627         output = ''
01628         for b in input:
01629             if self.i == 0: # variable-length, terminated with period
01630                 if b == ord('.'):
01631                     if self.buffer:
01632                         output += self.process_word()
01633                 else:
01634                     self.buffer.append(b)
01635             else: # fixed-length, terminate after self.i bytes
01636                 self.buffer.append(b)
01637                 if len(self.buffer) == self.i:
01638                     output += self.process_word()
01639         if final and self.buffer: # EOF terminates the last word
01640             output += self.process_word()
01641         return output
01642 
01643     def process_word(self):
01644         output = ''
01645         if self.buffer[0] == ord('i'):
01646             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
01647         elif self.buffer[0] == ord('o'):
01648             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
01649         else:
01650             output = self.buffer.decode('ascii')
01651             if len(output) < self.o:
01652                 output += '-'*self.o # pad out with hyphens
01653             if self.o:
01654                 output = output[:self.o] # truncate to output length
01655             output += '.'
01656         self.buffer = bytearray()
01657         return output
01658 
01659     codecEnabled = False
01660 
01661     @classmethod
01662     def lookupTestDecoder(cls, name):
01663         if cls.codecEnabled and name == 'test_decoder':
01664             latin1 = codecs.lookup('latin-1')
01665             return codecs.CodecInfo(
01666                 name='test_decoder', encode=latin1.encode, decode=None,
01667                 incrementalencoder=None,
01668                 streamreader=None, streamwriter=None,
01669                 incrementaldecoder=cls)
01670 
01671 # Register the previous decoder for testing.
01672 # Disabled by default, tests will enable it.
01673 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
01674 
01675 
01676 class StatefulIncrementalDecoderTest(unittest.TestCase):
01677     """
01678     Make sure the StatefulIncrementalDecoder actually works.
01679     """
01680 
01681     test_cases = [
01682         # I=1, O=1 (fixed-length input == fixed-length output)
01683         (b'abcd', False, 'a.b.c.d.'),
01684         # I=0, O=0 (variable-length input, variable-length output)
01685         (b'oiabcd', True, 'abcd.'),
01686         # I=0, O=0 (should ignore extra periods)
01687         (b'oi...abcd...', True, 'abcd.'),
01688         # I=0, O=6 (variable-length input, fixed-length output)
01689         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
01690         # I=2, O=6 (fixed-length input < fixed-length output)
01691         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
01692         # I=6, O=3 (fixed-length input > fixed-length output)
01693         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
01694         # I=0, then 3; O=29, then 15 (with longer output)
01695         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
01696          'a----------------------------.' +
01697          'b----------------------------.' +
01698          'cde--------------------------.' +
01699          'abcdefghijabcde.' +
01700          'a.b------------.' +
01701          '.c.------------.' +
01702          'd.e------------.' +
01703          'k--------------.' +
01704          'l--------------.' +
01705          'm--------------.')
01706     ]
01707 
01708     def test_decoder(self):
01709         # Try a few one-shot test cases.
01710         for input, eof, output in self.test_cases:
01711             d = StatefulIncrementalDecoder()
01712             self.assertEqual(d.decode(input, eof), output)
01713 
01714         # Also test an unfinished decode, followed by forcing EOF.
01715         d = StatefulIncrementalDecoder()
01716         self.assertEqual(d.decode(b'oiabcd'), '')
01717         self.assertEqual(d.decode(b'', 1), 'abcd.')
01718 
01719 class TextIOWrapperTest(unittest.TestCase):
01720 
01721     def setUp(self):
01722         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
01723         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
01724         support.unlink(support.TESTFN)
01725 
01726     def tearDown(self):
01727         support.unlink(support.TESTFN)
01728 
01729     def test_constructor(self):
01730         r = self.BytesIO(b"\xc3\xa9\n\n")
01731         b = self.BufferedReader(r, 1000)
01732         t = self.TextIOWrapper(b)
01733         t.__init__(b, encoding="latin1", newline="\r\n")
01734         self.assertEqual(t.encoding, "latin1")
01735         self.assertEqual(t.line_buffering, False)
01736         t.__init__(b, encoding="utf8", line_buffering=True)
01737         self.assertEqual(t.encoding, "utf8")
01738         self.assertEqual(t.line_buffering, True)
01739         self.assertEqual("\xe9\n", t.readline())
01740         self.assertRaises(TypeError, t.__init__, b, newline=42)
01741         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
01742 
01743     def test_detach(self):
01744         r = self.BytesIO()
01745         b = self.BufferedWriter(r)
01746         t = self.TextIOWrapper(b)
01747         self.assertIs(t.detach(), b)
01748 
01749         t = self.TextIOWrapper(b, encoding="ascii")
01750         t.write("howdy")
01751         self.assertFalse(r.getvalue())
01752         t.detach()
01753         self.assertEqual(r.getvalue(), b"howdy")
01754         self.assertRaises(ValueError, t.detach)
01755 
01756     def test_repr(self):
01757         raw = self.BytesIO("hello".encode("utf-8"))
01758         b = self.BufferedReader(raw)
01759         t = self.TextIOWrapper(b, encoding="utf-8")
01760         modname = self.TextIOWrapper.__module__
01761         self.assertEqual(repr(t),
01762                          "<%s.TextIOWrapper encoding='utf-8'>" % modname)
01763         raw.name = "dummy"
01764         self.assertEqual(repr(t),
01765                          "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
01766         t.mode = "r"
01767         self.assertEqual(repr(t),
01768                          "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
01769         raw.name = b"dummy"
01770         self.assertEqual(repr(t),
01771                          "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
01772 
01773     def test_line_buffering(self):
01774         r = self.BytesIO()
01775         b = self.BufferedWriter(r, 1000)
01776         t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
01777         t.write("X")
01778         self.assertEqual(r.getvalue(), b"")  # No flush happened
01779         t.write("Y\nZ")
01780         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
01781         t.write("A\rB")
01782         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
01783 
01784     def test_encoding(self):
01785         # Check the encoding attribute is always set, and valid
01786         b = self.BytesIO()
01787         t = self.TextIOWrapper(b, encoding="utf8")
01788         self.assertEqual(t.encoding, "utf8")
01789         t = self.TextIOWrapper(b)
01790         self.assertTrue(t.encoding is not None)
01791         codecs.lookup(t.encoding)
01792 
01793     def test_encoding_errors_reading(self):
01794         # (1) default
01795         b = self.BytesIO(b"abc\n\xff\n")
01796         t = self.TextIOWrapper(b, encoding="ascii")
01797         self.assertRaises(UnicodeError, t.read)
01798         # (2) explicit strict
01799         b = self.BytesIO(b"abc\n\xff\n")
01800         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
01801         self.assertRaises(UnicodeError, t.read)
01802         # (3) ignore
01803         b = self.BytesIO(b"abc\n\xff\n")
01804         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
01805         self.assertEqual(t.read(), "abc\n\n")
01806         # (4) replace
01807         b = self.BytesIO(b"abc\n\xff\n")
01808         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
01809         self.assertEqual(t.read(), "abc\n\ufffd\n")
01810 
01811     def test_encoding_errors_writing(self):
01812         # (1) default
01813         b = self.BytesIO()
01814         t = self.TextIOWrapper(b, encoding="ascii")
01815         self.assertRaises(UnicodeError, t.write, "\xff")
01816         # (2) explicit strict
01817         b = self.BytesIO()
01818         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
01819         self.assertRaises(UnicodeError, t.write, "\xff")
01820         # (3) ignore
01821         b = self.BytesIO()
01822         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
01823                              newline="\n")
01824         t.write("abc\xffdef\n")
01825         t.flush()
01826         self.assertEqual(b.getvalue(), b"abcdef\n")
01827         # (4) replace
01828         b = self.BytesIO()
01829         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
01830                              newline="\n")
01831         t.write("abc\xffdef\n")
01832         t.flush()
01833         self.assertEqual(b.getvalue(), b"abc?def\n")
01834 
01835     def test_newlines(self):
01836         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
01837 
01838         tests = [
01839             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
01840             [ '', input_lines ],
01841             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
01842             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
01843             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
01844         ]
01845         encodings = (
01846             'utf-8', 'latin-1',
01847             'utf-16', 'utf-16-le', 'utf-16-be',
01848             'utf-32', 'utf-32-le', 'utf-32-be',
01849         )
01850 
01851         # Try a range of buffer sizes to test the case where \r is the last
01852         # character in TextIOWrapper._pending_line.
01853         for encoding in encodings:
01854             # XXX: str.encode() should return bytes
01855             data = bytes(''.join(input_lines).encode(encoding))
01856             for do_reads in (False, True):
01857                 for bufsize in range(1, 10):
01858                     for newline, exp_lines in tests:
01859                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
01860                         textio = self.TextIOWrapper(bufio, newline=newline,
01861                                                   encoding=encoding)
01862                         if do_reads:
01863                             got_lines = []
01864                             while True:
01865                                 c2 = textio.read(2)
01866                                 if c2 == '':
01867                                     break
01868                                 self.assertEqual(len(c2), 2)
01869                                 got_lines.append(c2 + textio.readline())
01870                         else:
01871                             got_lines = list(textio)
01872 
01873                         for got_line, exp_line in zip(got_lines, exp_lines):
01874                             self.assertEqual(got_line, exp_line)
01875                         self.assertEqual(len(got_lines), len(exp_lines))
01876 
01877     def test_newlines_input(self):
01878         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
01879         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
01880         for newline, expected in [
01881             (None, normalized.decode("ascii").splitlines(True)),
01882             ("", testdata.decode("ascii").splitlines(True)),
01883             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
01884             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
01885             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
01886             ]:
01887             buf = self.BytesIO(testdata)
01888             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
01889             self.assertEqual(txt.readlines(), expected)
01890             txt.seek(0)
01891             self.assertEqual(txt.read(), "".join(expected))
01892 
01893     def test_newlines_output(self):
01894         testdict = {
01895             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
01896             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
01897             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
01898             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
01899             }
01900         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
01901         for newline, expected in tests:
01902             buf = self.BytesIO()
01903             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
01904             txt.write("AAA\nB")
01905             txt.write("BB\nCCC\n")
01906             txt.write("X\rY\r\nZ")
01907             txt.flush()
01908             self.assertEqual(buf.closed, False)
01909             self.assertEqual(buf.getvalue(), expected)
01910 
01911     def test_destructor(self):
01912         l = []
01913         base = self.BytesIO
01914         class MyBytesIO(base):
01915             def close(self):
01916                 l.append(self.getvalue())
01917                 base.close(self)
01918         b = MyBytesIO()
01919         t = self.TextIOWrapper(b, encoding="ascii")
01920         t.write("abc")
01921         del t
01922         support.gc_collect()
01923         self.assertEqual([b"abc"], l)
01924 
01925     def test_override_destructor(self):
01926         record = []
01927         class MyTextIO(self.TextIOWrapper):
01928             def __del__(self):
01929                 record.append(1)
01930                 try:
01931                     f = super().__del__
01932                 except AttributeError:
01933                     pass
01934                 else:
01935                     f()
01936             def close(self):
01937                 record.append(2)
01938                 super().close()
01939             def flush(self):
01940                 record.append(3)
01941                 super().flush()
01942         b = self.BytesIO()
01943         t = MyTextIO(b, encoding="ascii")
01944         del t
01945         support.gc_collect()
01946         self.assertEqual(record, [1, 2, 3])
01947 
01948     def test_error_through_destructor(self):
01949         # Test that the exception state is not modified by a destructor,
01950         # even if close() fails.
01951         rawio = self.CloseFailureIO()
01952         def f():
01953             self.TextIOWrapper(rawio).xyzzy
01954         with support.captured_output("stderr") as s:
01955             self.assertRaises(AttributeError, f)
01956         s = s.getvalue().strip()
01957         if s:
01958             # The destructor *may* have printed an unraisable error, check it
01959             self.assertEqual(len(s.splitlines()), 1)
01960             self.assertTrue(s.startswith("Exception IOError: "), s)
01961             self.assertTrue(s.endswith(" ignored"), s)
01962 
01963     # Systematic tests of the text I/O API
01964 
01965     def test_basic_io(self):
01966         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
01967             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
01968                 f = self.open(support.TESTFN, "w+", encoding=enc)
01969                 f._CHUNK_SIZE = chunksize
01970                 self.assertEqual(f.write("abc"), 3)
01971                 f.close()
01972                 f = self.open(support.TESTFN, "r+", encoding=enc)
01973                 f._CHUNK_SIZE = chunksize
01974                 self.assertEqual(f.tell(), 0)
01975                 self.assertEqual(f.read(), "abc")
01976                 cookie = f.tell()
01977                 self.assertEqual(f.seek(0), 0)
01978                 self.assertEqual(f.read(None), "abc")
01979                 f.seek(0)
01980                 self.assertEqual(f.read(2), "ab")
01981                 self.assertEqual(f.read(1), "c")
01982                 self.assertEqual(f.read(1), "")
01983                 self.assertEqual(f.read(), "")
01984                 self.assertEqual(f.tell(), cookie)
01985                 self.assertEqual(f.seek(0), 0)
01986                 self.assertEqual(f.seek(0, 2), cookie)
01987                 self.assertEqual(f.write("def"), 3)
01988                 self.assertEqual(f.seek(cookie), cookie)
01989                 self.assertEqual(f.read(), "def")
01990                 if enc.startswith("utf"):
01991                     self.multi_line_test(f, enc)
01992                 f.close()
01993 
01994     def multi_line_test(self, f, enc):
01995         f.seek(0)
01996         f.truncate()
01997         sample = "s\xff\u0fff\uffff"
01998         wlines = []
01999         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
02000             chars = []
02001             for i in range(size):
02002                 chars.append(sample[i % len(sample)])
02003             line = "".join(chars) + "\n"
02004             wlines.append((f.tell(), line))
02005             f.write(line)
02006         f.seek(0)
02007         rlines = []
02008         while True:
02009             pos = f.tell()
02010             line = f.readline()
02011             if not line:
02012                 break
02013             rlines.append((pos, line))
02014         self.assertEqual(rlines, wlines)
02015 
02016     def test_telling(self):
02017         f = self.open(support.TESTFN, "w+", encoding="utf8")
02018         p0 = f.tell()
02019         f.write("\xff\n")
02020         p1 = f.tell()
02021         f.write("\xff\n")
02022         p2 = f.tell()
02023         f.seek(0)
02024         self.assertEqual(f.tell(), p0)
02025         self.assertEqual(f.readline(), "\xff\n")
02026         self.assertEqual(f.tell(), p1)
02027         self.assertEqual(f.readline(), "\xff\n")
02028         self.assertEqual(f.tell(), p2)
02029         f.seek(0)
02030         for line in f:
02031             self.assertEqual(line, "\xff\n")
02032             self.assertRaises(IOError, f.tell)
02033         self.assertEqual(f.tell(), p2)
02034         f.close()
02035 
02036     def test_seeking(self):
02037         chunk_size = _default_chunk_size()
02038         prefix_size = chunk_size - 2
02039         u_prefix = "a" * prefix_size
02040         prefix = bytes(u_prefix.encode("utf-8"))
02041         self.assertEqual(len(u_prefix), len(prefix))
02042         u_suffix = "\u8888\n"
02043         suffix = bytes(u_suffix.encode("utf-8"))
02044         line = prefix + suffix
02045         with self.open(support.TESTFN, "wb") as f:
02046             f.write(line*2)
02047         with self.open(support.TESTFN, "r", encoding="utf-8") as f:
02048             s = f.read(prefix_size)
02049             self.assertEqual(s, str(prefix, "ascii"))
02050             self.assertEqual(f.tell(), prefix_size)
02051             self.assertEqual(f.readline(), u_suffix)
02052 
02053     def test_seeking_too(self):
02054         # Regression test for a specific bug
02055         data = b'\xe0\xbf\xbf\n'
02056         with self.open(support.TESTFN, "wb") as f:
02057             f.write(data)
02058         with self.open(support.TESTFN, "r", encoding="utf-8") as f:
02059             f._CHUNK_SIZE  # Just test that it exists
02060             f._CHUNK_SIZE = 2
02061             f.readline()
02062             f.tell()
02063 
02064     def test_seek_and_tell(self):
02065         #Test seek/tell using the StatefulIncrementalDecoder.
02066         # Make test faster by doing smaller seeks
02067         CHUNK_SIZE = 128
02068 
02069         def test_seek_and_tell_with_data(data, min_pos=0):
02070             """Tell/seek to various points within a data stream and ensure
02071             that the decoded data returned by read() is consistent."""
02072             f = self.open(support.TESTFN, 'wb')
02073             f.write(data)
02074             f.close()
02075             f = self.open(support.TESTFN, encoding='test_decoder')
02076             f._CHUNK_SIZE = CHUNK_SIZE
02077             decoded = f.read()
02078             f.close()
02079 
02080             for i in range(min_pos, len(decoded) + 1): # seek positions
02081                 for j in [1, 5, len(decoded) - i]: # read lengths
02082                     f = self.open(support.TESTFN, encoding='test_decoder')
02083                     self.assertEqual(f.read(i), decoded[:i])
02084                     cookie = f.tell()
02085                     self.assertEqual(f.read(j), decoded[i:i + j])
02086                     f.seek(cookie)
02087                     self.assertEqual(f.read(), decoded[i:])
02088                     f.close()
02089 
02090         # Enable the test decoder.
02091         StatefulIncrementalDecoder.codecEnabled = 1
02092 
02093         # Run the tests.
02094         try:
02095             # Try each test case.
02096             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
02097                 test_seek_and_tell_with_data(input)
02098 
02099             # Position each test case so that it crosses a chunk boundary.
02100             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
02101                 offset = CHUNK_SIZE - len(input)//2
02102                 prefix = b'.'*offset
02103                 # Don't bother seeking into the prefix (takes too long).
02104                 min_pos = offset*2
02105                 test_seek_and_tell_with_data(prefix + input, min_pos)
02106 
02107         # Ensure our test decoder won't interfere with subsequent tests.
02108         finally:
02109             StatefulIncrementalDecoder.codecEnabled = 0
02110 
02111     def test_encoded_writes(self):
02112         data = "1234567890"
02113         tests = ("utf-16",
02114                  "utf-16-le",
02115                  "utf-16-be",
02116                  "utf-32",
02117                  "utf-32-le",
02118                  "utf-32-be")
02119         for encoding in tests:
02120             buf = self.BytesIO()
02121             f = self.TextIOWrapper(buf, encoding=encoding)
02122             # Check if the BOM is written only once (see issue1753).
02123             f.write(data)
02124             f.write(data)
02125             f.seek(0)
02126             self.assertEqual(f.read(), data * 2)
02127             f.seek(0)
02128             self.assertEqual(f.read(), data * 2)
02129             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
02130 
02131     def test_unreadable(self):
02132         class UnReadable(self.BytesIO):
02133             def readable(self):
02134                 return False
02135         txt = self.TextIOWrapper(UnReadable())
02136         self.assertRaises(IOError, txt.read)
02137 
02138     def test_read_one_by_one(self):
02139         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
02140         reads = ""
02141         while True:
02142             c = txt.read(1)
02143             if not c:
02144                 break
02145             reads += c
02146         self.assertEqual(reads, "AA\nBB")
02147 
02148     def test_readlines(self):
02149         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
02150         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
02151         txt.seek(0)
02152         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
02153         txt.seek(0)
02154         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
02155 
02156     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
02157     def test_read_by_chunk(self):
02158         # make sure "\r\n" straddles 128 char boundary.
02159         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
02160         reads = ""
02161         while True:
02162             c = txt.read(128)
02163             if not c:
02164                 break
02165             reads += c
02166         self.assertEqual(reads, "A"*127+"\nB")
02167 
02168     def test_issue1395_1(self):
02169         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02170 
02171         # read one char at a time
02172         reads = ""
02173         while True:
02174             c = txt.read(1)
02175             if not c:
02176                 break
02177             reads += c
02178         self.assertEqual(reads, self.normalized)
02179 
02180     def test_issue1395_2(self):
02181         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02182         txt._CHUNK_SIZE = 4
02183 
02184         reads = ""
02185         while True:
02186             c = txt.read(4)
02187             if not c:
02188                 break
02189             reads += c
02190         self.assertEqual(reads, self.normalized)
02191 
02192     def test_issue1395_3(self):
02193         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02194         txt._CHUNK_SIZE = 4
02195 
02196         reads = txt.read(4)
02197         reads += txt.read(4)
02198         reads += txt.readline()
02199         reads += txt.readline()
02200         reads += txt.readline()
02201         self.assertEqual(reads, self.normalized)
02202 
02203     def test_issue1395_4(self):
02204         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02205         txt._CHUNK_SIZE = 4
02206 
02207         reads = txt.read(4)
02208         reads += txt.read()
02209         self.assertEqual(reads, self.normalized)
02210 
02211     def test_issue1395_5(self):
02212         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02213         txt._CHUNK_SIZE = 4
02214 
02215         reads = txt.read(4)
02216         pos = txt.tell()
02217         txt.seek(0)
02218         txt.seek(pos)
02219         self.assertEqual(txt.read(4), "BBB\n")
02220 
02221     def test_issue2282(self):
02222         buffer = self.BytesIO(self.testdata)
02223         txt = self.TextIOWrapper(buffer, encoding="ascii")
02224 
02225         self.assertEqual(buffer.seekable(), txt.seekable())
02226 
02227     def test_append_bom(self):
02228         # The BOM is not written again when appending to a non-empty file
02229         filename = support.TESTFN
02230         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
02231             with self.open(filename, 'w', encoding=charset) as f:
02232                 f.write('aaa')
02233                 pos = f.tell()
02234             with self.open(filename, 'rb') as f:
02235                 self.assertEqual(f.read(), 'aaa'.encode(charset))
02236 
02237             with self.open(filename, 'a', encoding=charset) as f:
02238                 f.write('xxx')
02239             with self.open(filename, 'rb') as f:
02240                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
02241 
02242     def test_seek_bom(self):
02243         # Same test, but when seeking manually
02244         filename = support.TESTFN
02245         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
02246             with self.open(filename, 'w', encoding=charset) as f:
02247                 f.write('aaa')
02248                 pos = f.tell()
02249             with self.open(filename, 'r+', encoding=charset) as f:
02250                 f.seek(pos)
02251                 f.write('zzz')
02252                 f.seek(0)
02253                 f.write('bbb')
02254             with self.open(filename, 'rb') as f:
02255                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
02256 
02257     def test_errors_property(self):
02258         with self.open(support.TESTFN, "w") as f:
02259             self.assertEqual(f.errors, "strict")
02260         with self.open(support.TESTFN, "w", errors="replace") as f:
02261             self.assertEqual(f.errors, "replace")
02262 
02263     @unittest.skipUnless(threading, 'Threading required for this test.')
02264     def test_threads_write(self):
02265         # Issue6750: concurrent writes could duplicate data
02266         event = threading.Event()
02267         with self.open(support.TESTFN, "w", buffering=1) as f:
02268             def run(n):
02269                 text = "Thread%03d\n" % n
02270                 event.wait()
02271                 f.write(text)
02272             threads = [threading.Thread(target=lambda n=x: run(n))
02273                        for x in range(20)]
02274             for t in threads:
02275                 t.start()
02276             time.sleep(0.02)
02277             event.set()
02278             for t in threads:
02279                 t.join()
02280         with self.open(support.TESTFN) as f:
02281             content = f.read()
02282             for n in range(20):
02283                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
02284 
02285     def test_flush_error_on_close(self):
02286         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02287         def bad_flush():
02288             raise IOError()
02289         txt.flush = bad_flush
02290         self.assertRaises(IOError, txt.close) # exception not swallowed
02291 
02292     def test_multi_close(self):
02293         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02294         txt.close()
02295         txt.close()
02296         txt.close()
02297         self.assertRaises(ValueError, txt.flush)
02298 
02299     def test_unseekable(self):
02300         txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
02301         self.assertRaises(self.UnsupportedOperation, txt.tell)
02302         self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
02303 
02304     def test_readonly_attributes(self):
02305         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
02306         buf = self.BytesIO(self.testdata)
02307         with self.assertRaises(AttributeError):
02308             txt.buffer = buf
02309 
02310     def test_rawio(self):
02311         # Issue #12591: TextIOWrapper must work with raw I/O objects, so
02312         # that subprocess.Popen() can have the required unbuffered
02313         # semantics with universal_newlines=True.
02314         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
02315         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
02316         # Reads
02317         self.assertEqual(txt.read(4), 'abcd')
02318         self.assertEqual(txt.readline(), 'efghi\n')
02319         self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
02320 
02321     def test_rawio_write_through(self):
02322         # Issue #12591: with write_through=True, writes don't need a flush
02323         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
02324         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
02325                                  write_through=True)
02326         txt.write('1')
02327         txt.write('23\n4')
02328         txt.write('5')
02329         self.assertEqual(b''.join(raw._write_stack), b'123\n45')
02330 
02331 class CTextIOWrapperTest(TextIOWrapperTest):
02332 
02333     def test_initialization(self):
02334         r = self.BytesIO(b"\xc3\xa9\n\n")
02335         b = self.BufferedReader(r, 1000)
02336         t = self.TextIOWrapper(b)
02337         self.assertRaises(TypeError, t.__init__, b, newline=42)
02338         self.assertRaises(ValueError, t.read)
02339         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
02340         self.assertRaises(ValueError, t.read)
02341 
02342     def test_garbage_collection(self):
02343         # C TextIOWrapper objects are collected, and collecting them flushes
02344         # all data to disk.
02345         # The Python version has __del__, so it ends in gc.garbage instead.
02346         rawio = io.FileIO(support.TESTFN, "wb")
02347         b = self.BufferedWriter(rawio)
02348         t = self.TextIOWrapper(b, encoding="ascii")
02349         t.write("456def")
02350         t.x = t
02351         wr = weakref.ref(t)
02352         del t
02353         support.gc_collect()
02354         self.assertTrue(wr() is None, wr)
02355         with self.open(support.TESTFN, "rb") as f:
02356             self.assertEqual(f.read(), b"456def")
02357 
02358 class PyTextIOWrapperTest(TextIOWrapperTest):
02359     pass
02360 
02361 
02362 class IncrementalNewlineDecoderTest(unittest.TestCase):
02363 
02364     def check_newline_decoding_utf8(self, decoder):
02365         # UTF-8 specific tests for a newline decoder
02366         def _check_decode(b, s, **kwargs):
02367             # We exercise getstate() / setstate() as well as decode()
02368             state = decoder.getstate()
02369             self.assertEqual(decoder.decode(b, **kwargs), s)
02370             decoder.setstate(state)
02371             self.assertEqual(decoder.decode(b, **kwargs), s)
02372 
02373         _check_decode(b'\xe8\xa2\x88', "\u8888")
02374 
02375         _check_decode(b'\xe8', "")
02376         _check_decode(b'\xa2', "")
02377         _check_decode(b'\x88', "\u8888")
02378 
02379         _check_decode(b'\xe8', "")
02380         _check_decode(b'\xa2', "")
02381         _check_decode(b'\x88', "\u8888")
02382 
02383         _check_decode(b'\xe8', "")
02384         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
02385 
02386         decoder.reset()
02387         _check_decode(b'\n', "\n")
02388         _check_decode(b'\r', "")
02389         _check_decode(b'', "\n", final=True)
02390         _check_decode(b'\r', "\n", final=True)
02391 
02392         _check_decode(b'\r', "")
02393         _check_decode(b'a', "\na")
02394 
02395         _check_decode(b'\r\r\n', "\n\n")
02396         _check_decode(b'\r', "")
02397         _check_decode(b'\r', "\n")
02398         _check_decode(b'\na', "\na")
02399 
02400         _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
02401         _check_decode(b'\xe8\xa2\x88', "\u8888")
02402         _check_decode(b'\n', "\n")
02403         _check_decode(b'\xe8\xa2\x88\r', "\u8888")
02404         _check_decode(b'\n', "\n")
02405 
02406     def check_newline_decoding(self, decoder, encoding):
02407         result = []
02408         if encoding is not None:
02409             encoder = codecs.getincrementalencoder(encoding)()
02410             def _decode_bytewise(s):
02411                 # Decode one byte at a time
02412                 for b in encoder.encode(s):
02413                     result.append(decoder.decode(bytes([b])))
02414         else:
02415             encoder = None
02416             def _decode_bytewise(s):
02417                 # Decode one char at a time
02418                 for c in s:
02419                     result.append(decoder.decode(c))
02420         self.assertEqual(decoder.newlines, None)
02421         _decode_bytewise("abc\n\r")
02422         self.assertEqual(decoder.newlines, '\n')
02423         _decode_bytewise("\nabc")
02424         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
02425         _decode_bytewise("abc\r")
02426         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
02427         _decode_bytewise("abc")
02428         self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
02429         _decode_bytewise("abc\r")
02430         self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
02431         decoder.reset()
02432         input = "abc"
02433         if encoder is not None:
02434             encoder.reset()
02435             input = encoder.encode(input)
02436         self.assertEqual(decoder.decode(input), "abc")
02437         self.assertEqual(decoder.newlines, None)
02438 
02439     def test_newline_decoder(self):
02440         encodings = (
02441             # None meaning the IncrementalNewlineDecoder takes unicode input
02442             # rather than bytes input
02443             None, 'utf-8', 'latin-1',
02444             'utf-16', 'utf-16-le', 'utf-16-be',
02445             'utf-32', 'utf-32-le', 'utf-32-be',
02446         )
02447         for enc in encodings:
02448             decoder = enc and codecs.getincrementaldecoder(enc)()
02449             decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
02450             self.check_newline_decoding(decoder, enc)
02451         decoder = codecs.getincrementaldecoder("utf-8")()
02452         decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
02453         self.check_newline_decoding_utf8(decoder)
02454 
02455     def test_newline_bytes(self):
02456         # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
02457         def _check(dec):
02458             self.assertEqual(dec.newlines, None)
02459             self.assertEqual(dec.decode("\u0D00"), "\u0D00")
02460             self.assertEqual(dec.newlines, None)
02461             self.assertEqual(dec.decode("\u0A00"), "\u0A00")
02462             self.assertEqual(dec.newlines, None)
02463         dec = self.IncrementalNewlineDecoder(None, translate=False)
02464         _check(dec)
02465         dec = self.IncrementalNewlineDecoder(None, translate=True)
02466         _check(dec)
02467 
02468 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
02469     pass
02470 
02471 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
02472     pass
02473 
02474 
02475 # XXX Tests for open()
02476 
02477 class MiscIOTest(unittest.TestCase):
02478 
02479     def tearDown(self):
02480         support.unlink(support.TESTFN)
02481 
02482     def test___all__(self):
02483         for name in self.io.__all__:
02484             obj = getattr(self.io, name, None)
02485             self.assertTrue(obj is not None, name)
02486             if name == "open":
02487                 continue
02488             elif "error" in name.lower() or name == "UnsupportedOperation":
02489                 self.assertTrue(issubclass(obj, Exception), name)
02490             elif not name.startswith("SEEK_"):
02491                 self.assertTrue(issubclass(obj, self.IOBase))
02492 
02493     def test_attributes(self):
02494         f = self.open(support.TESTFN, "wb", buffering=0)
02495         self.assertEqual(f.mode, "wb")
02496         f.close()
02497 
02498         f = self.open(support.TESTFN, "U")
02499         self.assertEqual(f.name,            support.TESTFN)
02500         self.assertEqual(f.buffer.name,     support.TESTFN)
02501         self.assertEqual(f.buffer.raw.name, support.TESTFN)
02502         self.assertEqual(f.mode,            "U")
02503         self.assertEqual(f.buffer.mode,     "rb")
02504         self.assertEqual(f.buffer.raw.mode, "rb")
02505         f.close()
02506 
02507         f = self.open(support.TESTFN, "w+")
02508         self.assertEqual(f.mode,            "w+")
02509         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
02510         self.assertEqual(f.buffer.raw.mode, "rb+")
02511 
02512         g = self.open(f.fileno(), "wb", closefd=False)
02513         self.assertEqual(g.mode,     "wb")
02514         self.assertEqual(g.raw.mode, "wb")
02515         self.assertEqual(g.name,     f.fileno())
02516         self.assertEqual(g.raw.name, f.fileno())
02517         f.close()
02518         g.close()
02519 
02520     def test_io_after_close(self):
02521         for kwargs in [
02522                 {"mode": "w"},
02523                 {"mode": "wb"},
02524                 {"mode": "w", "buffering": 1},
02525                 {"mode": "w", "buffering": 2},
02526                 {"mode": "wb", "buffering": 0},
02527                 {"mode": "r"},
02528                 {"mode": "rb"},
02529                 {"mode": "r", "buffering": 1},
02530                 {"mode": "r", "buffering": 2},
02531                 {"mode": "rb", "buffering": 0},
02532                 {"mode": "w+"},
02533                 {"mode": "w+b"},
02534                 {"mode": "w+", "buffering": 1},
02535                 {"mode": "w+", "buffering": 2},
02536                 {"mode": "w+b", "buffering": 0},
02537             ]:
02538             f = self.open(support.TESTFN, **kwargs)
02539             f.close()
02540             self.assertRaises(ValueError, f.flush)
02541             self.assertRaises(ValueError, f.fileno)
02542             self.assertRaises(ValueError, f.isatty)
02543             self.assertRaises(ValueError, f.__iter__)
02544             if hasattr(f, "peek"):
02545                 self.assertRaises(ValueError, f.peek, 1)
02546             self.assertRaises(ValueError, f.read)
02547             if hasattr(f, "read1"):
02548                 self.assertRaises(ValueError, f.read1, 1024)
02549             if hasattr(f, "readall"):
02550                 self.assertRaises(ValueError, f.readall)
02551             if hasattr(f, "readinto"):
02552                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
02553             self.assertRaises(ValueError, f.readline)
02554             self.assertRaises(ValueError, f.readlines)
02555             self.assertRaises(ValueError, f.seek, 0)
02556             self.assertRaises(ValueError, f.tell)
02557             self.assertRaises(ValueError, f.truncate)
02558             self.assertRaises(ValueError, f.write,
02559                               b"" if "b" in kwargs['mode'] else "")
02560             self.assertRaises(ValueError, f.writelines, [])
02561             self.assertRaises(ValueError, next, f)
02562 
02563     def test_blockingioerror(self):
02564         # Various BlockingIOError issues
02565         self.assertRaises(TypeError, self.BlockingIOError)
02566         self.assertRaises(TypeError, self.BlockingIOError, 1)
02567         self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
02568         self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
02569         b = self.BlockingIOError(1, "")
02570         self.assertEqual(b.characters_written, 0)
02571         class C(str):
02572             pass
02573         c = C("")
02574         b = self.BlockingIOError(1, c)
02575         c.b = b
02576         b.c = c
02577         wr = weakref.ref(c)
02578         del c, b
02579         support.gc_collect()
02580         self.assertTrue(wr() is None, wr)
02581 
02582     def test_abcs(self):
02583         # Test the visible base classes are ABCs.
02584         self.assertIsInstance(self.IOBase, abc.ABCMeta)
02585         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
02586         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
02587         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
02588 
02589     def _check_abc_inheritance(self, abcmodule):
02590         with self.open(support.TESTFN, "wb", buffering=0) as f:
02591             self.assertIsInstance(f, abcmodule.IOBase)
02592             self.assertIsInstance(f, abcmodule.RawIOBase)
02593             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
02594             self.assertNotIsInstance(f, abcmodule.TextIOBase)
02595         with self.open(support.TESTFN, "wb") as f:
02596             self.assertIsInstance(f, abcmodule.IOBase)
02597             self.assertNotIsInstance(f, abcmodule.RawIOBase)
02598             self.assertIsInstance(f, abcmodule.BufferedIOBase)
02599             self.assertNotIsInstance(f, abcmodule.TextIOBase)
02600         with self.open(support.TESTFN, "w") as f:
02601             self.assertIsInstance(f, abcmodule.IOBase)
02602             self.assertNotIsInstance(f, abcmodule.RawIOBase)
02603             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
02604             self.assertIsInstance(f, abcmodule.TextIOBase)
02605 
02606     def test_abc_inheritance(self):
02607         # Test implementations inherit from their respective ABCs
02608         self._check_abc_inheritance(self)
02609 
02610     def test_abc_inheritance_official(self):
02611         # Test implementations inherit from the official ABCs of the
02612         # baseline "io" module.
02613         self._check_abc_inheritance(io)
02614 
02615     def _check_warn_on_dealloc(self, *args, **kwargs):
02616         f = open(*args, **kwargs)
02617         r = repr(f)
02618         with self.assertWarns(ResourceWarning) as cm:
02619             f = None
02620             support.gc_collect()
02621         self.assertIn(r, str(cm.warning.args[0]))
02622 
02623     def test_warn_on_dealloc(self):
02624         self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
02625         self._check_warn_on_dealloc(support.TESTFN, "wb")
02626         self._check_warn_on_dealloc(support.TESTFN, "w")
02627 
02628     def _check_warn_on_dealloc_fd(self, *args, **kwargs):
02629         fds = []
02630         def cleanup_fds():
02631             for fd in fds:
02632                 try:
02633                     os.close(fd)
02634                 except EnvironmentError as e:
02635                     if e.errno != errno.EBADF:
02636                         raise
02637         self.addCleanup(cleanup_fds)
02638         r, w = os.pipe()
02639         fds += r, w
02640         self._check_warn_on_dealloc(r, *args, **kwargs)
02641         # When using closefd=False, there's no warning
02642         r, w = os.pipe()
02643         fds += r, w
02644         with warnings.catch_warnings(record=True) as recorded:
02645             open(r, *args, closefd=False, **kwargs)
02646             support.gc_collect()
02647         self.assertEqual(recorded, [])
02648 
02649     def test_warn_on_dealloc_fd(self):
02650         self._check_warn_on_dealloc_fd("rb", buffering=0)
02651         self._check_warn_on_dealloc_fd("rb")
02652         self._check_warn_on_dealloc_fd("r")
02653 
02654 
02655     def test_pickling(self):
02656         # Pickling file objects is forbidden
02657         for kwargs in [
02658                 {"mode": "w"},
02659                 {"mode": "wb"},
02660                 {"mode": "wb", "buffering": 0},
02661                 {"mode": "r"},
02662                 {"mode": "rb"},
02663                 {"mode": "rb", "buffering": 0},
02664                 {"mode": "w+"},
02665                 {"mode": "w+b"},
02666                 {"mode": "w+b", "buffering": 0},
02667             ]:
02668             for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
02669                 with self.open(support.TESTFN, **kwargs) as f:
02670                     self.assertRaises(TypeError, pickle.dumps, f, protocol)
02671 
02672 class CMiscIOTest(MiscIOTest):
02673     io = io
02674 
02675 class PyMiscIOTest(MiscIOTest):
02676     io = pyio
02677 
02678 
02679 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
02680 class SignalsTest(unittest.TestCase):
02681 
02682     def setUp(self):
02683         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
02684 
02685     def tearDown(self):
02686         signal.signal(signal.SIGALRM, self.oldalrm)
02687 
02688     def alarm_interrupt(self, sig, frame):
02689         1/0
02690 
02691     @unittest.skipUnless(threading, 'Threading required for this test.')
02692     @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
02693                      'issue #12429: skip test on FreeBSD <= 7')
02694     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
02695         """Check that a partial write, when it gets interrupted, properly
02696         invokes the signal handler, and bubbles up the exception raised
02697         in the latter."""
02698         read_results = []
02699         def _read():
02700             s = os.read(r, 1)
02701             read_results.append(s)
02702         t = threading.Thread(target=_read)
02703         t.daemon = True
02704         r, w = os.pipe()
02705         fdopen_kwargs["closefd"] = False
02706         try:
02707             wio = self.io.open(w, **fdopen_kwargs)
02708             t.start()
02709             signal.alarm(1)
02710             # Fill the pipe enough that the write will be blocking.
02711             # It will be interrupted by the timer armed above.  Since the
02712             # other thread has read one byte, the low-level write will
02713             # return with a successful (partial) result rather than an EINTR.
02714             # The buffered IO layer must check for pending signal
02715             # handlers, which in this case will invoke alarm_interrupt().
02716             self.assertRaises(ZeroDivisionError,
02717                               wio.write, item * (1024 * 1024))
02718             t.join()
02719             # We got one byte, get another one and check that it isn't a
02720             # repeat of the first one.
02721             read_results.append(os.read(r, 1))
02722             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
02723         finally:
02724             os.close(w)
02725             os.close(r)
02726             # This is deliberate. If we didn't close the file descriptor
02727             # before closing wio, wio would try to flush its internal
02728             # buffer, and block again.
02729             try:
02730                 wio.close()
02731             except IOError as e:
02732                 if e.errno != errno.EBADF:
02733                     raise
02734 
02735     def test_interrupted_write_unbuffered(self):
02736         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
02737 
02738     def test_interrupted_write_buffered(self):
02739         self.check_interrupted_write(b"xy", b"xy", mode="wb")
02740 
02741     def test_interrupted_write_text(self):
02742         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
02743 
02744     def check_reentrant_write(self, data, **fdopen_kwargs):
02745         def on_alarm(*args):
02746             # Will be called reentrantly from the same thread
02747             wio.write(data)
02748             1/0
02749         signal.signal(signal.SIGALRM, on_alarm)
02750         r, w = os.pipe()
02751         wio = self.io.open(w, **fdopen_kwargs)
02752         try:
02753             signal.alarm(1)
02754             # Either the reentrant call to wio.write() fails with RuntimeError,
02755             # or the signal handler raises ZeroDivisionError.
02756             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
02757                 while 1:
02758                     for i in range(100):
02759                         wio.write(data)
02760                         wio.flush()
02761                     # Make sure the buffer doesn't fill up and block further writes
02762                     os.read(r, len(data) * 100)
02763             exc = cm.exception
02764             if isinstance(exc, RuntimeError):
02765                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
02766         finally:
02767             wio.close()
02768             os.close(r)
02769 
02770     def test_reentrant_write_buffered(self):
02771         self.check_reentrant_write(b"xy", mode="wb")
02772 
02773     def test_reentrant_write_text(self):
02774         self.check_reentrant_write("xy", mode="w", encoding="ascii")
02775 
02776     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
02777         """Check that a buffered read, when it gets interrupted (either
02778         returning a partial result or EINTR), properly invokes the signal
02779         handler and retries if the latter returned successfully."""
02780         r, w = os.pipe()
02781         fdopen_kwargs["closefd"] = False
02782         def alarm_handler(sig, frame):
02783             os.write(w, b"bar")
02784         signal.signal(signal.SIGALRM, alarm_handler)
02785         try:
02786             rio = self.io.open(r, **fdopen_kwargs)
02787             os.write(w, b"foo")
02788             signal.alarm(1)
02789             # Expected behaviour:
02790             # - first raw read() returns partial b"foo"
02791             # - second raw read() returns EINTR
02792             # - third raw read() returns b"bar"
02793             self.assertEqual(decode(rio.read(6)), "foobar")
02794         finally:
02795             rio.close()
02796             os.close(w)
02797             os.close(r)
02798 
02799     def test_interrupterd_read_retry_buffered(self):
02800         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
02801                                           mode="rb")
02802 
02803     def test_interrupterd_read_retry_text(self):
02804         self.check_interrupted_read_retry(lambda x: x,
02805                                           mode="r")
02806 
02807     @unittest.skipUnless(threading, 'Threading required for this test.')
02808     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
02809         """Check that a buffered write, when it gets interrupted (either
02810         returning a partial result or EINTR), properly invokes the signal
02811         handler and retries if the latter returned successfully."""
02812         select = support.import_module("select")
02813         # A quantity that exceeds the buffer size of an anonymous pipe's
02814         # write end.
02815         N = 1024 * 1024
02816         r, w = os.pipe()
02817         fdopen_kwargs["closefd"] = False
02818         # We need a separate thread to read from the pipe and allow the
02819         # write() to finish.  This thread is started after the SIGALRM is
02820         # received (forcing a first EINTR in write()).
02821         read_results = []
02822         write_finished = False
02823         def _read():
02824             while not write_finished:
02825                 while r in select.select([r], [], [], 1.0)[0]:
02826                     s = os.read(r, 1024)
02827                     read_results.append(s)
02828         t = threading.Thread(target=_read)
02829         t.daemon = True
02830         def alarm1(sig, frame):
02831             signal.signal(signal.SIGALRM, alarm2)
02832             signal.alarm(1)
02833         def alarm2(sig, frame):
02834             t.start()
02835         signal.signal(signal.SIGALRM, alarm1)
02836         try:
02837             wio = self.io.open(w, **fdopen_kwargs)
02838             signal.alarm(1)
02839             # Expected behaviour:
02840             # - first raw write() is partial (because of the limited pipe buffer
02841             #   and the first alarm)
02842             # - second raw write() returns EINTR (because of the second alarm)
02843             # - subsequent write()s are successful (either partial or complete)
02844             self.assertEqual(N, wio.write(item * N))
02845             wio.flush()
02846             write_finished = True
02847             t.join()
02848             self.assertEqual(N, sum(len(x) for x in read_results))
02849         finally:
02850             write_finished = True
02851             os.close(w)
02852             os.close(r)
02853             # This is deliberate. If we didn't close the file descriptor
02854             # before closing wio, wio would try to flush its internal
02855             # buffer, and could block (in case of failure).
02856             try:
02857                 wio.close()
02858             except IOError as e:
02859                 if e.errno != errno.EBADF:
02860                     raise
02861 
02862     def test_interrupterd_write_retry_buffered(self):
02863         self.check_interrupted_write_retry(b"x", mode="wb")
02864 
02865     def test_interrupterd_write_retry_text(self):
02866         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
02867 
02868 
02869 class CSignalsTest(SignalsTest):
02870     io = io
02871 
02872 class PySignalsTest(SignalsTest):
02873     io = pyio
02874 
02875     # Handling reentrancy issues would slow down _pyio even more, so the
02876     # tests are disabled.
02877     test_reentrant_write_buffered = None
02878     test_reentrant_write_text = None
02879 
02880 
02881 def test_main():
02882     tests = (CIOTest, PyIOTest,
02883              CBufferedReaderTest, PyBufferedReaderTest,
02884              CBufferedWriterTest, PyBufferedWriterTest,
02885              CBufferedRWPairTest, PyBufferedRWPairTest,
02886              CBufferedRandomTest, PyBufferedRandomTest,
02887              StatefulIncrementalDecoderTest,
02888              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
02889              CTextIOWrapperTest, PyTextIOWrapperTest,
02890              CMiscIOTest, PyMiscIOTest,
02891              CSignalsTest, PySignalsTest,
02892              )
02893 
02894     # Put the namespaces of the IO module we are testing and some useful mock
02895     # classes in the __dict__ of each test.
02896     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
02897              MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
02898     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
02899     c_io_ns = {name : getattr(io, name) for name in all_members}
02900     py_io_ns = {name : getattr(pyio, name) for name in all_members}
02901     globs = globals()
02902     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
02903     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
02904     # Avoid turning open into a bound method.
02905     py_io_ns["open"] = pyio.OpenWrapper
02906     for test in tests:
02907         if test.__name__.startswith("C"):
02908             for name, obj in c_io_ns.items():
02909                 setattr(test, name, obj)
02910         elif test.__name__.startswith("Py"):
02911             for name, obj in py_io_ns.items():
02912                 setattr(test, name, obj)
02913 
02914     support.run_unittest(*tests)
02915 
02916 if __name__ == "__main__":
02917     test_main()