Back to index

apport  2.4
test_problem_report.py
Go to the documentation of this file.
00001 # vim: set encoding=UTF-8 fileencoding=UTF-8 :
00002 import unittest, tempfile, os, email, gzip, time, sys
00003 
00004 from io import BytesIO
00005 import problem_report
00006 
# Binary sample payload shared by the tests; it deliberately contains NUL
# and other control bytes.
bin_data = b'ABABABABAB\0\0\0Z\x01\x02'

# Compare the numeric major version: comparing the sys.version *string*
# is lexicographic and would misclassify e.g. a '10.x' interpreter.
if sys.version_info[0] < 3:
    # backwards compatibility shim: alias the Python 3 name used by the
    # tests to the function that is available on Python 2
    email.message_from_binary_file = email.message_from_file
00012 
00013 
00014 class T(unittest.TestCase):
00015     def test_basic_operations(self):
00016         '''basic creation and operation.'''
00017 
00018         pr = problem_report.ProblemReport()
00019         pr['foo'] = 'bar'
00020         pr['bar'] = ' foo   bar\nbaz\n   blip  '
00021         pr['dash-key'] = '1'
00022         pr['dot.key'] = '1'
00023         pr['underscore_key'] = '1'
00024         self.assertEqual(pr['foo'], 'bar')
00025         self.assertEqual(pr['bar'], ' foo   bar\nbaz\n   blip  ')
00026         self.assertEqual(pr['ProblemType'], 'Crash')
00027         self.assertTrue(time.strptime(pr['Date']))
00028         self.assertEqual(pr['dash-key'], '1')
00029         self.assertEqual(pr['dot.key'], '1')
00030         self.assertEqual(pr['underscore_key'], '1')
00031 
00032     def test_ctor_arguments(self):
00033         '''non-default constructor arguments.'''
00034 
00035         pr = problem_report.ProblemReport('KernelCrash')
00036         self.assertEqual(pr['ProblemType'], 'KernelCrash')
00037         pr = problem_report.ProblemReport(date='19801224 12:34')
00038         self.assertEqual(pr['Date'], '19801224 12:34')
00039 
00040     def test_sanity_checks(self):
00041         '''various error conditions.'''
00042 
00043         pr = problem_report.ProblemReport()
00044         self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
00045         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
00046         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
00047         self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
00048         self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
00049         self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
00050 
00051     def test_compressed_values(self):
00052         '''handling of CompressedValue values.'''
00053 
00054         large_val = b'A' * 5000000
00055 
00056         pr = problem_report.ProblemReport()
00057         pr['Foo'] = problem_report.CompressedValue(b'FooFoo!')
00058         pr['Bin'] = problem_report.CompressedValue()
00059         pr['Bin'].set_value(bin_data)
00060         pr['Large'] = problem_report.CompressedValue(large_val)
00061 
00062         self.assertTrue(isinstance(pr['Foo'], problem_report.CompressedValue))
00063         self.assertTrue(isinstance(pr['Bin'], problem_report.CompressedValue))
00064         self.assertEqual(pr['Foo'].get_value(), b'FooFoo!')
00065         self.assertEqual(pr['Bin'].get_value(), bin_data)
00066         self.assertEqual(pr['Large'].get_value(), large_val)
00067         self.assertEqual(len(pr['Foo']), 7)
00068         self.assertEqual(len(pr['Bin']), len(bin_data))
00069         self.assertEqual(len(pr['Large']), len(large_val))
00070 
00071         io = BytesIO()
00072         pr['Bin'].write(io)
00073         self.assertEqual(io.getvalue(), bin_data)
00074         io = BytesIO()
00075         pr['Large'].write(io)
00076         self.assertEqual(io.getvalue(), large_val)
00077 
00078         pr['Multiline'] = problem_report.CompressedValue(b'\1\1\1\n\2\2\n\3\3\3')
00079         self.assertEqual(pr['Multiline'].splitlines(),
00080                          [b'\1\1\1', b'\2\2', b'\3\3\3'])
00081 
00082         # test writing of reports with CompressedValues
00083         io = BytesIO()
00084         pr.write(io)
00085         io.seek(0)
00086         pr = problem_report.ProblemReport()
00087         pr.load(io)
00088         self.assertEqual(pr['Foo'], 'FooFoo!')
00089         self.assertEqual(pr['Bin'], bin_data)
00090         self.assertEqual(pr['Large'], large_val.decode('ASCII'))
00091 
00092     def test_write(self):
00093         '''write() and proper formatting.'''
00094 
00095         pr = problem_report.ProblemReport(date='now!')
00096         pr['Simple'] = 'bar'
00097         if sys.version.startswith('2'):
00098             pr['SimpleUTF8'] = '1äö2Φ3'
00099             pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
00100             pr['TwoLineUnicode'] = 'pi-π\nnu-η'.decode('UTF-8')
00101             pr['TwoLineUTF8'] = 'pi-π\nnu-η'
00102         else:
00103             pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
00104             pr['SimpleUnicode'] = '1äö2Φ3'
00105             pr['TwoLineUnicode'] = 'pi-π\nnu-η'
00106             pr['TwoLineUTF8'] = 'pi-π\nnu-η'.encode('UTF-8')
00107         pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  \n\nafteremptyline'
00108         io = BytesIO()
00109         pr.write(io)
00110         expected = '''ProblemType: Crash
00111 Date: now!
00112 Simple: bar
00113 SimpleUTF8: 1äö2Φ3
00114 SimpleUnicode: 1äö2Φ3
00115 TwoLineUTF8:
00116  pi-π
00117  nu-η
00118 TwoLineUnicode:
00119  pi-π
00120  nu-η
00121 WhiteSpace:
00122   foo   bar
00123  baz
00124    blip  
00125  
00126  afteremptyline
00127 '''
00128         if sys.version > '3':
00129             expected = expected.encode('UTF-8')
00130         self.assertEqual(io.getvalue(), expected)
00131 
00132     def test_write_append(self):
00133         '''write() with appending to an existing file.'''
00134 
00135         pr = problem_report.ProblemReport(date='now!')
00136         pr['Simple'] = 'bar'
00137         pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  '
00138         io = BytesIO()
00139         pr.write(io)
00140 
00141         pr.clear()
00142         pr['Extra'] = 'appended'
00143         pr.write(io)
00144 
00145         self.assertEqual(io.getvalue(),
00146                          b'''ProblemType: Crash
00147 Date: now!
00148 Simple: bar
00149 WhiteSpace:
00150   foo   bar
00151  baz
00152    blip  
00153 Extra: appended
00154 ''')
00155 
00156         temp = tempfile.NamedTemporaryFile()
00157         temp.write(bin_data)
00158         temp.flush()
00159 
00160         pr = problem_report.ProblemReport(date='now!')
00161         pr['File'] = (temp.name,)
00162         io = BytesIO()
00163         pr.write(io)
00164         temp.close()
00165 
00166         pr.clear()
00167         pr['Extra'] = 'appended'
00168         pr.write(io)
00169 
00170         io.seek(0)
00171         pr = problem_report.ProblemReport()
00172         pr.load(io)
00173 
00174         self.assertEqual(pr['Date'], 'now!')
00175         self.assertEqual(pr['File'], bin_data)
00176         self.assertEqual(pr['Extra'], 'appended')
00177 
00178     def test_load(self):
00179         '''load() with various formatting.'''
00180 
00181         pr = problem_report.ProblemReport()
00182         pr.load(BytesIO(b'''ProblemType: Crash
00183 Date: now!
00184 Simple: bar
00185 WhiteSpace:
00186   foo   bar
00187  baz
00188    blip  
00189 '''))
00190         self.assertEqual(pr['ProblemType'], 'Crash')
00191         self.assertEqual(pr['Date'], 'now!')
00192         self.assertEqual(pr['Simple'], 'bar')
00193         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
00194 
00195         # test last field a bit more
00196         pr.load(BytesIO(b'''ProblemType: Crash
00197 Date: now!
00198 Simple: bar
00199 WhiteSpace:
00200   foo   bar
00201  baz
00202    blip  
00203  
00204 '''))
00205         self.assertEqual(pr['ProblemType'], 'Crash')
00206         self.assertEqual(pr['Date'], 'now!')
00207         self.assertEqual(pr['Simple'], 'bar')
00208         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  \n')
00209 
00210         # last field might not be \n terminated
00211         pr.load(BytesIO(b'''ProblemType: Crash
00212 Date: now!
00213 Simple: bar
00214 WhiteSpace:
00215  foo
00216  bar'''))
00217         self.assertEqual(pr['ProblemType'], 'Crash')
00218         self.assertEqual(pr['Date'], 'now!')
00219         self.assertEqual(pr['Simple'], 'bar')
00220         self.assertEqual(pr['WhiteSpace'], 'foo\nbar')
00221 
00222         pr = problem_report.ProblemReport()
00223         pr.load(BytesIO(b'''ProblemType: Crash
00224 WhiteSpace:
00225   foo   bar
00226  baz
00227  
00228    blip  
00229 Last: foo
00230 '''))
00231         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n\n  blip  ')
00232         self.assertEqual(pr['Last'], 'foo')
00233 
00234         pr.load(BytesIO(b'''ProblemType: Crash
00235 WhiteSpace:
00236   foo   bar
00237  baz
00238    blip  
00239 Last: foo
00240  
00241 '''))
00242         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
00243         self.assertEqual(pr['Last'], 'foo\n')
00244 
00245         # empty lines in values must have a leading space in coding
00246         invalid_spacing = BytesIO(b'''WhiteSpace:
00247  first
00248 
00249  second
00250 ''')
00251         pr = problem_report.ProblemReport()
00252         self.assertRaises(ValueError, pr.load, invalid_spacing)
00253 
00254         # test that load() cleans up properly
00255         pr.load(BytesIO(b'ProblemType: Crash'))
00256         self.assertEqual(list(pr.keys()), ['ProblemType'])
00257 
00258     def test_write_file(self):
00259         '''writing a report with binary file data.'''
00260 
00261         temp = tempfile.NamedTemporaryFile()
00262         temp.write(bin_data)
00263         temp.flush()
00264 
00265         pr = problem_report.ProblemReport(date='now!')
00266         pr['File'] = (temp.name,)
00267         pr['Afile'] = (temp.name,)
00268         io = BytesIO()
00269         pr.write(io)
00270         temp.close()
00271 
00272         self.assertEqual(io.getvalue(),
00273                          b'''ProblemType: Crash
00274 Date: now!
00275 Afile: base64
00276  H4sICAAAAAAC/0FmaWxlAA==
00277  c3RyhEIGBoYoRiYAM5XUCxAAAAA=
00278 File: base64
00279  H4sICAAAAAAC/0ZpbGUA
00280  c3RyhEIGBoYoRiYAM5XUCxAAAAA=
00281 ''')
00282 
00283         # force compression/encoding bool
00284         temp = tempfile.NamedTemporaryFile()
00285         temp.write(b'foo\0bar')
00286         temp.flush()
00287         pr = problem_report.ProblemReport(date='now!')
00288         pr['File'] = (temp.name, False)
00289         io = BytesIO()
00290         pr.write(io)
00291 
00292         self.assertEqual(io.getvalue(),
00293                          b'''ProblemType: Crash
00294 Date: now!
00295 File: foo\0bar
00296 ''')
00297 
00298         pr['File'] = (temp.name, True)
00299         io = BytesIO()
00300         pr.write(io)
00301 
00302         self.assertEqual(io.getvalue(),
00303                          b'''ProblemType: Crash
00304 Date: now!
00305 File: base64
00306  H4sICAAAAAAC/0ZpbGUA
00307  S8vPZ0hKLAIACq50HgcAAAA=
00308 ''')
00309         temp.close()
00310 
00311     def test_write_fileobj(self):
00312         '''writing a report with a pointer to a file-like object.'''
00313 
00314         tempbin = BytesIO(bin_data)
00315         tempasc = BytesIO(b'Hello World')
00316 
00317         pr = problem_report.ProblemReport(date='now!')
00318         pr['BinFile'] = (tempbin,)
00319         pr['AscFile'] = (tempasc, False)
00320         io = BytesIO()
00321         pr.write(io)
00322         io.seek(0)
00323 
00324         pr = problem_report.ProblemReport()
00325         pr.load(io)
00326         self.assertEqual(pr['BinFile'], tempbin.getvalue())
00327         self.assertEqual(pr['AscFile'], tempasc.getvalue().decode())
00328 
00329     def test_write_empty_fileobj(self):
00330         '''writing a report with a pointer to a file-like object with enforcing non-emptyness.'''
00331 
00332         tempbin = BytesIO(b'')
00333         tempasc = BytesIO(b'')
00334 
00335         pr = problem_report.ProblemReport(date='now!')
00336         pr['BinFile'] = (tempbin, True, None, True)
00337         io = BytesIO()
00338         self.assertRaises(IOError, pr.write, io)
00339 
00340         pr = problem_report.ProblemReport(date='now!')
00341         pr['AscFile'] = (tempasc, False, None, True)
00342         io = BytesIO()
00343         self.assertRaises(IOError, pr.write, io)
00344 
00345     def test_write_delayed_fileobj(self):
00346         '''writing a report with file pointers and delayed data.'''
00347 
00348         (fout, fin) = os.pipe()
00349 
00350         if os.fork() == 0:
00351             os.close(fout)
00352             time.sleep(0.3)
00353             os.write(fin, b'ab' * 512 * 1024)
00354             time.sleep(0.3)
00355             os.write(fin, b'hello')
00356             time.sleep(0.3)
00357             os.write(fin, b' world')
00358             os.close(fin)
00359             os._exit(0)
00360 
00361         os.close(fin)
00362 
00363         pr = problem_report.ProblemReport(date='now!')
00364         io = BytesIO()
00365         with os.fdopen(fout, 'rb') as f:
00366             pr['BinFile'] = (f,)
00367             pr.write(io)
00368         assert os.wait()[1] == 0
00369 
00370         io.seek(0)
00371 
00372         pr2 = problem_report.ProblemReport()
00373         pr2.load(io)
00374         self.assertEqual(pr2['BinFile'], 'ab' * 512 * 1024 + 'hello world')
00375 
00376     def test_read_file(self):
00377         '''reading a report with binary data.'''
00378 
00379         bin_report = b'''ProblemType: Crash
00380 Date: now!
00381 File: base64
00382  H4sICAAAAAAC/0ZpbGUA
00383  c3RyhEIGBoYoRiYAM5XUCxAAAAA=
00384 Foo: Bar
00385 '''
00386 
00387         # test with reading everything
00388         pr = problem_report.ProblemReport()
00389         pr.load(BytesIO(bin_report))
00390         self.assertEqual(pr['File'], bin_data)
00391         self.assertEqual(pr.has_removed_fields(), False)
00392 
00393         # test with skipping binary data
00394         pr.load(BytesIO(bin_report), binary=False)
00395         self.assertEqual(pr['File'], '')
00396         self.assertEqual(pr.has_removed_fields(), True)
00397 
00398         # test with keeping compressed binary data
00399         pr.load(BytesIO(bin_report), binary='compressed')
00400         self.assertEqual(pr['Foo'], 'Bar')
00401         self.assertEqual(pr.has_removed_fields(), False)
00402         self.assertTrue(isinstance(pr['File'], problem_report.CompressedValue))
00403         self.assertEqual(len(pr['File']), len(bin_data))
00404         self.assertEqual(pr['File'].get_value(), bin_data)
00405 
00406     def test_read_file_legacy(self):
00407         '''reading a report with binary data in legacy format without gzip
00408         header.'''
00409 
00410         bin_report = b'''ProblemType: Crash
00411 Date: now!
00412 File: base64
00413  eJw=
00414  c3RyxIAMcBAFAG55BXk=
00415 Foo: Bar
00416 '''
00417 
00418         # test with reading everything
00419         pr = problem_report.ProblemReport()
00420         pr.load(BytesIO(bin_report))
00421         self.assertEqual(pr['File'], b'AB' * 10 + b'\0' * 10 + b'Z')
00422         self.assertEqual(pr.has_removed_fields(), False)
00423 
00424         # test with skipping binary data
00425         pr.load(BytesIO(bin_report), binary=False)
00426         self.assertEqual(pr['File'], '')
00427         self.assertEqual(pr.has_removed_fields(), True)
00428 
00429         # test with keeping CompressedValues
00430         pr.load(BytesIO(bin_report), binary='compressed')
00431         self.assertEqual(pr.has_removed_fields(), False)
00432         self.assertEqual(len(pr['File']), 31)
00433         self.assertEqual(pr['File'].get_value(), b'AB' * 10 + b'\0' * 10 + b'Z')
00434         io = BytesIO()
00435         pr['File'].write(io)
00436         io.seek(0)
00437         self.assertEqual(io.read(), b'AB' * 10 + b'\0' * 10 + b'Z')
00438 
00439     def test_big_file(self):
00440         '''writing and re-decoding a big random file.'''
00441 
00442         # create 1 MB random file
00443         temp = tempfile.NamedTemporaryFile()
00444         data = os.urandom(1048576)
00445         temp.write(data)
00446         temp.flush()
00447 
00448         # write it into problem report
00449         pr = problem_report.ProblemReport()
00450         pr['File'] = (temp.name,)
00451         pr['Before'] = 'xtestx'
00452         pr['ZAfter'] = 'ytesty'
00453         io = BytesIO()
00454         pr.write(io)
00455         temp.close()
00456 
00457         # read it again
00458         io.seek(0)
00459         pr = problem_report.ProblemReport()
00460         pr.load(io)
00461 
00462         self.assertTrue(pr['File'] == data)
00463         self.assertEqual(pr['Before'], 'xtestx')
00464         self.assertEqual(pr['ZAfter'], 'ytesty')
00465 
00466         # write it again
00467         io2 = BytesIO()
00468         pr.write(io2)
00469         self.assertTrue(io.getvalue() == io2.getvalue())
00470 
00471         # check gzip compatibility
00472         io.seek(0)
00473         pr = problem_report.ProblemReport()
00474         pr.load(io, binary='compressed')
00475         self.assertEqual(pr['File'].get_value(), data)
00476 
00477     def test_size_limit(self):
00478         '''writing and a big random file with a size limit key.'''
00479 
00480         # create 1 MB random file
00481         temp = tempfile.NamedTemporaryFile()
00482         data = os.urandom(1048576)
00483         temp.write(data)
00484         temp.flush()
00485 
00486         # write it into problem report
00487         pr = problem_report.ProblemReport()
00488         pr['FileSmallLimit'] = (temp.name, True, 100)
00489         pr['FileLimitMinus1'] = (temp.name, True, 1048575)
00490         pr['FileExactLimit'] = (temp.name, True, 1048576)
00491         pr['FileLimitPlus1'] = (temp.name, True, 1048577)
00492         pr['FileLimitNone'] = (temp.name, True, None)
00493         pr['Before'] = 'xtestx'
00494         pr['ZAfter'] = 'ytesty'
00495         io = BytesIO()
00496         pr.write(io)
00497         temp.close()
00498 
00499         # read it again
00500         io.seek(0)
00501         pr = problem_report.ProblemReport()
00502         pr.load(io)
00503 
00504         self.assertFalse('FileSmallLimit' in pr)
00505         self.assertFalse('FileLimitMinus1' in pr)
00506         self.assertTrue(pr['FileExactLimit'] == data)
00507         self.assertTrue(pr['FileLimitPlus1'] == data)
00508         self.assertTrue(pr['FileLimitNone'] == data)
00509         self.assertEqual(pr['Before'], 'xtestx')
00510         self.assertEqual(pr['ZAfter'], 'ytesty')
00511 
00512     def test_iter(self):
00513         '''problem_report.ProblemReport iteration.'''
00514 
00515         pr = problem_report.ProblemReport()
00516         pr['foo'] = 'bar'
00517 
00518         keys = []
00519         for k in pr:
00520             keys.append(k)
00521         keys.sort()
00522         self.assertEqual(' '.join(keys), 'Date ProblemType foo')
00523 
00524         self.assertEqual(len([k for k in pr if k != 'foo']), 2)
00525 
00526     def test_modify(self):
00527         '''reading, modifying fields, and writing back.'''
00528 
00529         report = b'''ProblemType: Crash
00530 Date: now!
00531 Long:
00532  xxx
00533  .
00534  yyy
00535 Short: Bar
00536 File: base64
00537  H4sICAAAAAAC/0ZpbGUA
00538  c3RyxIAMcBAFAK/2p9MfAAAA
00539 '''
00540 
00541         pr = problem_report.ProblemReport()
00542         pr.load(BytesIO(report))
00543 
00544         self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
00545 
00546         # write back unmodified
00547         io = BytesIO()
00548         pr.write(io)
00549         self.assertEqual(io.getvalue(), report)
00550 
00551         pr['Short'] = 'aaa\nbbb'
00552         pr['Long'] = '123'
00553         io = BytesIO()
00554         pr.write(io)
00555         self.assertEqual(io.getvalue(),
00556                          b'''ProblemType: Crash
00557 Date: now!
00558 Long: 123
00559 Short:
00560  aaa
00561  bbb
00562 File: base64
00563  H4sICAAAAAAC/0ZpbGUA
00564  c3RyxIAMcBAFAK/2p9MfAAAA
00565 ''')
00566 
00567     def test_add_to_existing(self):
00568         '''adding information to an existing report.'''
00569 
00570         # original report
00571         pr = problem_report.ProblemReport()
00572         pr['old1'] = '11'
00573         pr['old2'] = '22'
00574 
00575         (fd, rep) = tempfile.mkstemp()
00576         os.close(fd)
00577         with open(rep, 'wb') as f:
00578             pr.write(f)
00579 
00580         origstat = os.stat(rep)
00581 
00582         # create a new one and add it
00583         pr = problem_report.ProblemReport()
00584         pr.clear()
00585         pr['new1'] = '33'
00586 
00587         pr.add_to_existing(rep, keep_times=True)
00588 
00589         # check keep_times
00590         newstat = os.stat(rep)
00591         self.assertEqual(origstat.st_mode, newstat.st_mode)
00592         self.assertAlmostEqual(origstat.st_atime, newstat.st_atime, 1)
00593         self.assertAlmostEqual(origstat.st_mtime, newstat.st_mtime, 1)
00594 
00595         # check report contents
00596         newpr = problem_report.ProblemReport()
00597         with open(rep, 'rb') as f:
00598             newpr.load(f)
00599         self.assertEqual(newpr['old1'], '11')
00600         self.assertEqual(newpr['old2'], '22')
00601         self.assertEqual(newpr['new1'], '33')
00602 
00603         # create a another new one and add it, but make sure mtime must be
00604         # different
00605         time.sleep(1)
00606         with open(rep) as f:
00607             f.read()  # bump atime
00608         time.sleep(1)
00609 
00610         pr = problem_report.ProblemReport()
00611         pr.clear()
00612         pr['new2'] = '44'
00613 
00614         pr.add_to_existing(rep)
00615 
00616         # check that timestamps have been updates
00617         newstat = os.stat(rep)
00618         self.assertEqual(origstat.st_mode, newstat.st_mode)
00619         self.assertNotEqual(origstat.st_mtime, newstat.st_mtime)
00620         # skip atime check if filesystem is mounted noatime
00621         skip_atime = False
00622         dir = rep
00623         while len(dir) > 1:
00624             dir, filename = os.path.split(dir)
00625             if os.path.ismount(dir):
00626                 with open('/proc/mounts') as f:
00627                     for line in f:
00628                         mount, fs, options = line.split(' ')[1:4]
00629                         if mount == dir and 'noatime' in options.split(','):
00630                             skip_atime = True
00631                             break
00632                 break
00633         if not skip_atime:
00634             self.assertNotEqual(origstat.st_atime, newstat.st_atime)
00635 
00636         # check report contents
00637         newpr = problem_report.ProblemReport()
00638         with open(rep, 'rb') as f:
00639             newpr.load(f)
00640         self.assertEqual(newpr['old1'], '11')
00641         self.assertEqual(newpr['old2'], '22')
00642         self.assertEqual(newpr['new1'], '33')
00643         self.assertEqual(newpr['new2'], '44')
00644 
00645         os.unlink(rep)
00646 
00647     def test_write_mime_text(self):
00648         '''write_mime() for text values.'''
00649 
00650         pr = problem_report.ProblemReport(date='now!')
00651         pr['Simple'] = 'bar'
00652         if sys.version.startswith('2'):
00653             pr['SimpleUTF8'] = '1äö2Φ3'
00654             pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
00655             pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'.decode('UTF-8')
00656             pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'
00657         else:
00658             pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
00659             pr['SimpleUnicode'] = '1äö2Φ3'
00660             pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'
00661             pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'.encode('UTF-8')
00662         pr['SimpleLineEnd'] = 'bar\n'
00663         pr['TwoLine'] = 'first\nsecond\n'
00664         pr['InlineMargin'] = 'first\nsecond\nthird\nfourth\nfifth\n'
00665         pr['Multiline'] = ' foo   bar\nbaz\n  blip  \nline4\nline♥5!!\nłıµ€ ⅝\n'
00666 
00667         # still small enough for inline text
00668         pr['Largeline'] = 'A' * 999
00669         pr['LargeMultiline'] = 'A' * 120 + '\n' + 'B' * 90
00670 
00671         # too big for inline text, these become attachments
00672         pr['Hugeline'] = 'A' * 10000
00673         pr['HugeMultiline'] = 'A' * 900 + '\n' + 'B' * 900 + '\n' + 'C' * 900
00674         io = BytesIO()
00675         pr.write_mime(io)
00676         io.seek(0)
00677 
00678         msg = email.message_from_binary_file(io)
00679         parts = [p for p in msg.walk()]
00680         self.assertEqual(len(parts), 5)
00681 
00682         # first part is the multipart container
00683         self.assertTrue(parts[0].is_multipart())
00684 
00685         # second part should be an inline text/plain attachments with all short
00686         # fields
00687         self.assertTrue(not parts[1].is_multipart())
00688         self.assertEqual(parts[1].get_content_type(), 'text/plain')
00689         self.assertEqual(parts[1].get_content_charset(), 'utf-8')
00690         self.assertEqual(parts[1].get_filename(), None)
00691         expected = '''ProblemType: Crash
00692 Date: now!
00693 InlineMargin:
00694  first
00695  second
00696  third
00697  fourth
00698  fifth
00699 LargeMultiline:
00700  AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
00701  BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
00702 Largeline: %s
00703 Simple: bar
00704 SimpleLineEnd: bar
00705 SimpleUTF8: 1äö2Φ3
00706 SimpleUnicode: 1äö2Φ3
00707 TwoLine:
00708  first
00709  second
00710 TwoLineUTF8:
00711  pi-π
00712  nu-η
00713 TwoLineUnicode:
00714  pi-π
00715  nu-η
00716 ''' % pr['Largeline']
00717         if sys.version >= '3':
00718             expected = expected.encode('UTF-8')
00719         self.assertEqual(parts[1].get_payload(decode=True), expected)
00720 
00721         # third part should be the HugeMultiline: field as attachment
00722         self.assertTrue(not parts[2].is_multipart())
00723         self.assertEqual(parts[2].get_content_type(), 'text/plain')
00724         self.assertEqual(parts[2].get_content_charset(), 'utf-8')
00725         self.assertEqual(parts[2].get_filename(), 'HugeMultiline.txt')
00726         self.assertEqual(parts[2].get_payload(decode=True), pr['HugeMultiline'].encode())
00727 
00728         # fourth part should be the Hugeline: field as attachment
00729         self.assertTrue(not parts[3].is_multipart())
00730         self.assertEqual(parts[3].get_content_type(), 'text/plain')
00731         self.assertEqual(parts[3].get_content_charset(), 'utf-8')
00732         self.assertEqual(parts[3].get_filename(), 'Hugeline.txt')
00733         self.assertEqual(parts[3].get_payload(decode=True), pr['Hugeline'].encode())
00734 
00735         # fifth part should be the Multiline: field as attachment
00736         self.assertTrue(not parts[4].is_multipart())
00737         self.assertEqual(parts[4].get_content_type(), 'text/plain')
00738         self.assertEqual(parts[4].get_content_charset(), 'utf-8')
00739         self.assertEqual(parts[4].get_filename(), 'Multiline.txt')
00740         expected = ''' foo   bar
00741 baz
00742   blip  
00743 line4
00744 line♥5!!
00745 łıµ€ ⅝
00746 '''
00747         if sys.version >= '3':
00748             expected = expected.encode('UTF-8')
00749         self.assertEqual(parts[4].get_payload(decode=True), expected)
00750 
00751     def test_write_mime_binary(self):
00752         '''write_mime() for binary values and file references.'''
00753 
00754         temp = tempfile.NamedTemporaryFile()
00755         temp.write(bin_data)
00756         temp.flush()
00757 
00758         tempgz = tempfile.NamedTemporaryFile()
00759         gz = gzip.GzipFile('File1', 'w', fileobj=tempgz)
00760         gz.write(bin_data)
00761         gz.close()
00762         tempgz.flush()
00763 
00764         pr = problem_report.ProblemReport(date='now!')
00765         pr['Context'] = 'Test suite'
00766         pr['File1'] = (temp.name,)
00767         pr['File1.gz'] = (tempgz.name,)
00768         pr['Value1'] = bin_data
00769         with open(tempgz.name, 'rb') as f:
00770             pr['Value1.gz'] = f.read()
00771         pr['ZValue'] = problem_report.CompressedValue(bin_data)
00772         io = BytesIO()
00773         pr.write_mime(io)
00774         io.seek(0)
00775 
00776         msg = email.message_from_binary_file(io)
00777         parts = [p for p in msg.walk()]
00778         self.assertEqual(len(parts), 7)
00779 
00780         # first part is the multipart container
00781         self.assertTrue(parts[0].is_multipart())
00782 
00783         # second part should be an inline text/plain attachments with all short
00784         # fields
00785         self.assertTrue(not parts[1].is_multipart())
00786         self.assertEqual(parts[1].get_content_type(), 'text/plain')
00787         self.assertEqual(parts[1].get_content_charset(), 'utf-8')
00788         self.assertEqual(parts[1].get_filename(), None)
00789         self.assertEqual(parts[1].get_payload(decode=True),
00790                          b'ProblemType: Crash\nContext: Test suite\nDate: now!\n')
00791 
00792         # third part should be the File1: file contents as gzip'ed attachment
00793         self.assertTrue(not parts[2].is_multipart())
00794         self.assertEqual(parts[2].get_content_type(), 'application/x-gzip')
00795         self.assertEqual(parts[2].get_filename(), 'File1.gz')
00796         f = tempfile.TemporaryFile()
00797         f.write(parts[2].get_payload(decode=True))
00798         f.seek(0)
00799         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00800         f.close()
00801 
00802         # fourth part should be the File1.gz: file contents as gzip'ed
00803         # attachment; write_mime() should not compress it again
00804         self.assertTrue(not parts[3].is_multipart())
00805         self.assertEqual(parts[3].get_content_type(), 'application/x-gzip')
00806         self.assertEqual(parts[3].get_filename(), 'File1.gz')
00807         f = tempfile.TemporaryFile()
00808         f.write(parts[3].get_payload(decode=True))
00809         f.seek(0)
00810         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00811         f.close()
00812 
00813         # fifth part should be the Value1: value as gzip'ed attachment
00814         self.assertTrue(not parts[4].is_multipart())
00815         self.assertEqual(parts[4].get_content_type(), 'application/x-gzip')
00816         self.assertEqual(parts[4].get_filename(), 'Value1.gz')
00817         f = tempfile.TemporaryFile()
00818         f.write(parts[4].get_payload(decode=True))
00819         f.seek(0)
00820         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00821         f.close()
00822 
00823         # sixth part should be the Value1: value as gzip'ed attachment;
00824         # write_mime should not compress it again
00825         self.assertTrue(not parts[5].is_multipart())
00826         self.assertEqual(parts[5].get_content_type(), 'application/x-gzip')
00827         self.assertEqual(parts[5].get_filename(), 'Value1.gz')
00828         f = tempfile.TemporaryFile()
00829         f.write(parts[5].get_payload(decode=True))
00830         f.seek(0)
00831         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00832         f.close()
00833 
00834         # seventh part should be the ZValue: value as gzip'ed attachment;
00835         # write_mime should not compress it again
00836         self.assertTrue(not parts[6].is_multipart())
00837         self.assertEqual(parts[6].get_content_type(), 'application/x-gzip')
00838         self.assertEqual(parts[6].get_filename(), 'ZValue.gz')
00839         f = tempfile.TemporaryFile()
00840         f.write(parts[6].get_payload(decode=True))
00841         f.seek(0)
00842         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00843         f.close()
00844 
00845     def test_write_mime_extra_headers(self):
00846         '''write_mime() with extra headers.'''
00847 
00848         pr = problem_report.ProblemReport(date='now!')
00849         pr['Simple'] = 'bar'
00850         pr['TwoLine'] = 'first\nsecond\n'
00851         io = BytesIO()
00852         pr.write_mime(io, extra_headers={'Greeting': 'hello world',
00853                                          'Foo': 'Bar'})
00854         io.seek(0)
00855 
00856         msg = email.message_from_binary_file(io)
00857         self.assertEqual(msg['Greeting'], 'hello world')
00858         self.assertEqual(msg['Foo'], 'Bar')
00859         parts = [p for p in msg.walk()]
00860         self.assertEqual(len(parts), 2)
00861 
00862         # first part is the multipart container
00863         self.assertTrue(parts[0].is_multipart())
00864 
00865         # second part should be an inline text/plain attachments with all short
00866         # fields
00867         self.assertTrue(not parts[1].is_multipart())
00868         self.assertEqual(parts[1].get_content_type(), 'text/plain')
00869         self.assertTrue(b'Simple: bar' in parts[1].get_payload(decode=True))
00870 
00871     def test_write_mime_filter(self):
00872         '''write_mime() with key filters.'''
00873 
00874         pr = problem_report.ProblemReport(date='now!')
00875         pr['GoodText'] = 'Hi'
00876         pr['BadText'] = 'YouDontSeeMe'
00877         pr['GoodBin'] = bin_data
00878         pr['BadBin'] = 'Y' + '\x05' * 10 + '-'
00879         io = BytesIO()
00880         pr.write_mime(io, skip_keys=['BadText', 'BadBin'])
00881         io.seek(0)
00882 
00883         msg = email.message_from_binary_file(io)
00884         parts = [p for p in msg.walk()]
00885         self.assertEqual(len(parts), 3)
00886 
00887         # first part is the multipart container
00888         self.assertTrue(parts[0].is_multipart())
00889 
00890         # second part should be an inline text/plain attachments with all short
00891         # fields
00892         self.assertTrue(not parts[1].is_multipart())
00893         self.assertEqual(parts[1].get_content_type(), 'text/plain')
00894         self.assertEqual(parts[1].get_content_charset(), 'utf-8')
00895         self.assertEqual(parts[1].get_filename(), None)
00896         self.assertEqual(parts[1].get_payload(decode=True), b'''ProblemType: Crash
00897 Date: now!
00898 GoodText: Hi
00899 ''')
00900 
00901         # third part should be the GoodBin: field as attachment
00902         self.assertTrue(not parts[2].is_multipart())
00903         f = tempfile.TemporaryFile()
00904         f.write(parts[2].get_payload(decode=True))
00905         f.seek(0)
00906         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_data)
00907         f.close()
00908 
00909     def test_write_mime_order(self):
00910         '''write_mime() with keys ordered.'''
00911 
00912         pr = problem_report.ProblemReport(date='now!')
00913         pr['SecondText'] = 'What'
00914         pr['FirstText'] = 'Who'
00915         pr['FourthText'] = 'Today'
00916         pr['ThirdText'] = "I Don't Know"
00917         io = BytesIO()
00918         pr.write_mime(io, priority_fields=['FirstText', 'SecondText',
00919                                            'ThirdText', 'Unknown', 'FourthText'])
00920         io.seek(0)
00921 
00922         msg = email.message_from_binary_file(io)
00923         parts = [p for p in msg.walk()]
00924         self.assertEqual(len(parts), 2)
00925 
00926         # first part is the multipart container
00927         self.assertTrue(parts[0].is_multipart())
00928 
00929         # second part should be an inline text/plain attachments with all short
00930         # fields
00931         self.assertTrue(not parts[1].is_multipart())
00932         self.assertEqual(parts[1].get_content_type(), 'text/plain')
00933         self.assertEqual(parts[1].get_content_charset(), 'utf-8')
00934         self.assertEqual(parts[1].get_filename(), None)
00935         self.assertEqual(parts[1].get_payload(decode=True), b'''FirstText: Who
00936 SecondText: What
00937 ThirdText: I Don't Know
00938 FourthText: Today
00939 ProblemType: Crash
00940 Date: now!
00941 ''')
00942 
00943     def test_updating(self):
00944         '''new_keys() and write() with only_new=True.'''
00945 
00946         pr = problem_report.ProblemReport()
00947         self.assertEqual(pr.new_keys(), set(['ProblemType', 'Date']))
00948         pr.load(BytesIO(b'''ProblemType: Crash
00949 Date: now!
00950 Foo: bar
00951 Baz: blob
00952 '''))
00953 
00954         self.assertEqual(pr.new_keys(), set())
00955 
00956         pr['Foo'] = 'changed'
00957         pr['NewKey'] = 'new new'
00958         self.assertEqual(pr.new_keys(), set(['NewKey']))
00959 
00960         out = BytesIO()
00961         pr.write(out, only_new=True)
00962         self.assertEqual(out.getvalue(), b'NewKey: new new\n')
00963 
00964     def test_import_dict(self):
00965         '''importing a dictionary with update().'''
00966 
00967         pr = problem_report.ProblemReport()
00968         pr['oldtext'] = 'Hello world'
00969         pr['oldbin'] = bin_data
00970         pr['overwrite'] = 'I am crap'
00971 
00972         d = {}
00973         d['newtext'] = 'Goodbye world'
00974         d['newbin'] = '11\000\001\002\xFFZZ'
00975         d['overwrite'] = 'I am good'
00976 
00977         pr.update(d)
00978         self.assertEqual(pr['oldtext'], 'Hello world')
00979         self.assertEqual(pr['oldbin'], bin_data)
00980         self.assertEqual(pr['newtext'], 'Goodbye world')
00981         self.assertEqual(pr['newbin'], '11\000\001\002\xFFZZ')
00982         self.assertEqual(pr['overwrite'], 'I am good')
00983 
00984 if __name__ == '__main__':  # run the tests when executed as a script
00985     unittest.main()