Back to index

apport  2.4
test_python_crashes.py
Go to the documentation of this file.
00001 # Test apport_python_hook.py
00002 #
00003 # Copyright (c) 2006 - 2011 Canonical Ltd.
00004 # Authors: Robert Collins <robert@ubuntu.com>
00005 #          Martin Pitt <martin.pitt@ubuntu.com>
00006 #
00007 # This program is free software; you can redistribute it and/or modify it
00008 # under the terms of the GNU General Public License as published by the
00009 # Free Software Foundation; either version 2 of the License, or (at your
00010 # option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
00011 # the full text of the license.
00012 
00013 import unittest, tempfile, subprocess, os, stat, shutil, atexit
00014 import dbus
00015 
# Run against a private, throw-away report directory which is removed again
# when the interpreter exits.
# NOTE(review): presumably apport.fileutils reads APPORT_REPORT_DIR when it is
# imported, so this has to stay ahead of the apport import -- confirm.
temp_report_dir = tempfile.mkdtemp()
atexit.register(shutil.rmtree, temp_report_dir)
os.environ['APPORT_REPORT_DIR'] = temp_report_dir
00019 
00020 import apport.fileutils, problem_report
00021 
00022 
class T(unittest.TestCase):
    '''Tests for the Python crash hook (apport_python_hook).

    The tests run Python programs in subprocesses and then inspect the crash
    reports which apport.fileutils finds afterwards.
    '''

    def tearDown(self):
        '''Remove all crash reports which the test left behind.'''

        for f in apport.fileutils.get_all_reports():
            os.unlink(f)

    def _test_crash(self, extracode='', scriptname=None):
        '''Create a test crash.

        Write a Python script which installs apport_python_hook and then
        raises an exception, run it with the arguments "testarg1 testarg2",
        and check that it exits with status 1 and that no OSError shows up on
        its stderr.

        extracode is inserted into the script right before the crashing call.
        If it is empty, stderr must also contain the expected exception
        message.

        scriptname is the path at which the script gets created; by default a
        fresh temporary file in /var/tmp is used.

        The script is always removed again. Return the path it was run from.
        '''
        # put the script into /var/tmp, since that isn't ignored in the
        # hook
        if scriptname:
            script = scriptname
            # truncate, so that a longer stale file at this path cannot leave
            # a trailing tail in the new script
            fd = os.open(scriptname, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
        else:
            (fd, script) = tempfile.mkstemp(dir='/var/tmp')
        try:
            # the file object owns fd and closes it even if writing fails
            with os.fdopen(fd, 'wb') as f:
                f.write(('''#!/usr/bin/env %s
import apport_python_hook
apport_python_hook.install()

def func(x):
    raise Exception(b'This should happen. \\xe2\\x99\\xa5'.decode('UTF-8'))

%s
func(42)
''' % (os.getenv('PYTHON', 'python3'), extracode)).encode())
            os.chmod(script, 0o755)

            p = subprocess.Popen([script, 'testarg1', 'testarg2'],
                                 stderr=subprocess.PIPE, env=os.environ)
            err = p.communicate()[1].decode()
            self.assertEqual(p.returncode, 1,
                             'crashing test python program exits with failure code')
            if not extracode:
                self.assertIn('This should happen.', err)
            self.assertNotIn('OSError', err)
        finally:
            os.unlink(script)

        return script

    def test_general(self):
        '''general operation of the Python crash hook.'''

        script = self._test_crash()

        # did we get a report?
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1, 'crashed Python program produced a report')
        self.assertEqual(stat.S_IMODE(os.stat(reports[0]).st_mode),
                         0o640, 'report has correct permissions')

        pr = self._load_report()

        # check report contents
        expected_keys = ['InterpreterPath', 'PythonArgs', 'Traceback',
                         'ProblemType', 'ProcEnviron', 'ProcStatus',
                         'ProcCmdline', 'Date', 'ExecutablePath', 'ProcMaps',
                         'UserGroups']
        self.assertTrue(set(expected_keys).issubset(set(pr.keys())),
                        'report has necessary fields')
        self.assertIn('bin/python', pr['InterpreterPath'])
        self.assertEqual(pr['ExecutablePath'], script)
        self.assertEqual(pr['PythonArgs'], "['%s', 'testarg1', 'testarg2']" % script)
        self.assertTrue(pr['Traceback'].startswith('Traceback'))
        self.assertIn("func\n    raise Exception(b'This should happen.",
                      pr['Traceback'])

    def test_existing(self):
        '''Python crash hook overwrites seen existing files.'''

        script = self._test_crash()

        # did we get a report?
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1, 'crashed Python program produced a report')
        self.assertEqual(stat.S_IMODE(os.stat(reports[0]).st_mode),
                         0o640, 'report has correct permissions')

        # touch report -> "seen" case
        apport.fileutils.mark_report_seen(reports[0])

        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 0)

        # crash the same script path again
        script = self._test_crash(scriptname=script)
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1)

        # "unseen" case
        script = self._test_crash(scriptname=script)
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1)

    def test_no_argv(self):
        '''with zapped sys.argv.'''

        self._test_crash('import sys\nsys.argv = None')

        # did we get a report?
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1, 'crashed Python program produced a report')
        self.assertEqual(stat.S_IMODE(os.stat(reports[0]).st_mode),
                         0o640, 'report has correct permissions')

        pr = self._load_report()

        # check report contents
        expected_keys = ['InterpreterPath', 'Traceback', 'ProblemType',
                         'ProcEnviron', 'ProcStatus', 'ProcCmdline', 'Date',
                         'ExecutablePath', 'ProcMaps', 'UserGroups']
        self.assertTrue(set(expected_keys).issubset(set(pr.keys())),
                        'report has necessary fields')
        self.assertIn('bin/python', pr['InterpreterPath'])
        self.assertTrue(pr['Traceback'].startswith('Traceback'))

    def _assert_no_reports(self):
        '''Assert that there are no crash reports.'''

        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 0,
                         'no crash reports present (cwd: %s)' % os.getcwd())

    def test_interactive(self):
        '''interactive Python sessions never generate a report.'''

        # use the same interpreter as the generated crash scripts
        interpreter = os.getenv('PYTHON', 'python3')
        orig_cwd = os.getcwd()
        try:
            for d in ('/tmp', '/usr/local', '/usr'):
                os.chdir(d)
                p = subprocess.Popen([interpreter], stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                (out, err) = p.communicate(b'raise ValueError')
                out = out.decode()
                err = err.decode()
                # unittest assertions instead of bare assert, so that the
                # checks also run under "python -O"
                self.assertNotEqual(p.returncode, 0)
                self.assertEqual(out, '')
                self.assertIn('ValueError', err)
                self._assert_no_reports()
        finally:
            os.chdir(orig_cwd)

    def test_ignoring(self):
        '''the Python crash hook respects the ignore list.'''

        # put the script into the report directory, since that isn't ignored
        # in the hook
        (fd, script) = tempfile.mkstemp(dir=apport.fileutils.report_dir)
        ifpath = os.path.expanduser(apport.report._ignore_file)
        orig_ignore_file = None
        try:
            # the file object owns fd and closes it even if writing fails
            with os.fdopen(fd, 'wb') as f:
                f.write(('''#!/usr/bin/env %s
import apport_python_hook
apport_python_hook.install()

def func(x):
    raise Exception('This should happen.')

func(42)
''' % os.getenv('PYTHON', 'python3')).encode('ascii'))
            os.chmod(script, 0o755)

            # move aside current ignore file
            if os.path.exists(ifpath):
                orig_ignore_file = ifpath + '.apporttest'
                os.rename(ifpath, orig_ignore_file)

            # ignore
            r = apport.report.Report()
            r['ExecutablePath'] = script
            r.mark_ignore()
            r = None

            p = subprocess.Popen([script, 'testarg1', 'testarg2'],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            err = p.communicate()[1].decode()
            self.assertEqual(p.returncode, 1,
                             'crashing test python program exits with failure code')
            self.assertIn('Exception: This should happen.', err)

        finally:
            os.unlink(script)
            # clean up our ignore file
            if os.path.exists(ifpath):
                os.unlink(ifpath)
            # put the original ignore file back
            if orig_ignore_file:
                os.rename(orig_ignore_file, ifpath)

        # did we get a report?
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 0)

    def test_no_flooding(self):
        '''limit successive reports'''

        count = 0
        limit = 5
        # crash the same script path over and over, marking each report as
        # seen, until no new report appears any more
        while count < limit:
            self._test_crash(scriptname='/var/tmp/pytestcrash')
            reports = apport.fileutils.get_new_reports()
            if not reports:
                break
            self.assertEqual(len(reports), 1, 'crashed Python program produced one report')
            apport.fileutils.mark_report_seen(reports[0])
            count += 1

        # more than one report got through, but reporting stopped before the
        # limit was reached
        self.assertGreater(count, 1)
        self.assertLess(count, limit)

    def test_dbus_service_unknown_invalid(self):
        '''DBus.Error.ServiceUnknown with an invalid name'''

        self._test_crash(extracode='''import dbus
obj = dbus.SessionBus().get_object('com.example.NotExisting', '/Foo')
''')

        pr = self._load_report()
        self.assertTrue(pr['Traceback'].startswith('Traceback'), pr['Traceback'])
        self.assertIn('org.freedesktop.DBus.Error.ServiceUnknown', pr['Traceback'])
        self.assertEqual(pr['DbusErrorAnalysis'], 'no service file providing com.example.NotExisting')

    def test_dbus_service_unknown_wrongbus_notrunning(self):
        '''DBus.Error.ServiceUnknown with a valid name on a different bus (not running)'''

        subprocess.call(['killall', 'gvfsd-metadata'])
        self._test_crash(extracode='''import dbus
obj = dbus.SystemBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
''')

        pr = self._load_report()
        self.assertIn('org.freedesktop.DBus.Error.ServiceUnknown', pr['Traceback'])
        self.assertTrue(pr['DbusErrorAnalysis'].startswith('provided by /usr/share/dbus-1/services/gvfs-metadata.service'),
                        pr['DbusErrorAnalysis'])
        self.assertIn('gvfsd-metadata is not running', pr['DbusErrorAnalysis'])

    def test_dbus_service_unknown_wrongbus_running(self):
        '''DBus.Error.ServiceUnknown with a valid name on a different bus (running)'''

        self._test_crash(extracode='''import dbus
# let the service be activated, to ensure it is running
obj = dbus.SessionBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
assert obj
obj = dbus.SystemBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
''')

        pr = self._load_report()
        self.assertIn('org.freedesktop.DBus.Error.ServiceUnknown', pr['Traceback'])
        self.assertTrue(pr['DbusErrorAnalysis'].startswith('provided by /usr/share/dbus-1/services/gvfs-metadata.service'),
                        pr['DbusErrorAnalysis'])
        self.assertIn('gvfsd-metadata is running', pr['DbusErrorAnalysis'])

    def test_dbus_service_timeout_running(self):
        '''DBus.Error.NoReply with a running service'''

        # ensure the service is running
        metadata_obj = dbus.SessionBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
        self.assertNotEqual(metadata_obj, None)

        # stop (SIGSTOP) the service while the script pings it with a one
        # second timeout, and always resume it afterwards
        try:
            subprocess.call(['killall', '-STOP', 'gvfsd-metadata'])
            self._test_crash(extracode='''import dbus
obj = dbus.SessionBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
assert obj
i = dbus.Interface(obj, 'org.freedesktop.DBus.Peer')
i.Ping(timeout=1)
''')
        finally:
            subprocess.call(['killall', '-CONT', 'gvfsd-metadata'])

        # check report contents
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 0, 'NoReply is an useless exception and should not create a report')

        # This is disabled for now as we cannot get the bus name from the NoReply exception
        #pr = self._load_report()
        #self.assertTrue('org.freedesktop.DBus.Error.NoReply' in pr['Traceback'], pr['Traceback'])
        #self.assertTrue(pr['DbusErrorAnalysis'].startswith('provided by /usr/share/dbus-1/services/gvfs-metadata.service'),
        #                pr['DbusErrorAnalysis'])
        #self.assertTrue('gvfsd-metadata is running' in pr['DbusErrorAnalysis'], pr['DbusErrorAnalysis'])

# This is disabled for now as we cannot get the bus name from the NoReply exception
#    def test_dbus_service_timeout_notrunning(self):
#        '''DBus.Error.NoReply with a crashing method'''
#
#        # run our own mock service with a crashing method
#        subprocess.call(['killall', 'gvfsd-metadata'])
#        service = subprocess.Popen([os.getenv('PYTHON', 'python3')],
#                                   stdin=subprocess.PIPE,
#                                   universal_newlines=True)
#        service.stdin.write('''import os
#import dbus, dbus.service, dbus.mainloop.glib
#from gi.repository import GLib
#
#class MockMetadata(dbus.service.Object):
#    @dbus.service.method('com.ubuntu.Test', in_signature='', out_signature='i')
#    def Crash(self):
#        os.kill(os.getpid(), 5)
#
#dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
#dbus_name = dbus.service.BusName('org.gtk.vfs.Metadata', dbus.SessionBus())
#svr = MockMetadata(bus_name=dbus_name, object_path='/org/gtk/vfs/metadata')
#GLib.MainLoop().run()
#''')
#        service.stdin.close()
#        self.addCleanup(service.terminate)
#        time.sleep(0.5)
#
#        self._test_crash(extracode='''import dbus
#obj = dbus.SessionBus().get_object('org.gtk.vfs.Metadata', '/org/gtk/vfs/metadata')
#assert obj
#dbus.Interface(obj, 'com.ubuntu.Test').Crash()
#''')
#
#        pr = self._load_report()
#        self.assertTrue('org.freedesktop.DBus.Error.NoReply' in pr['Traceback'], pr['Traceback'])
#        self.assertTrue(pr['DbusErrorAnalysis'].startswith('provided by /usr/share/dbus-1/services/gvfs-metadata.service'),
#                        pr['DbusErrorAnalysis'])
#        self.assertTrue('gvfsd-metadata is not running' in pr['DbusErrorAnalysis'], pr['DbusErrorAnalysis'])

    def _load_report(self):
        '''Ensure that there is exactly one crash report and load it.

        Return the loaded problem_report.ProblemReport.
        '''
        reports = apport.fileutils.get_new_reports()
        self.assertEqual(len(reports), 1, 'crashed Python program produced a report')
        pr = problem_report.ProblemReport()
        with open(reports[0], 'rb') as f:
            pr.load(f)
        return pr
00360 
# Only run the tests when executed as a script, so that merely importing this
# module (e.g. from a test loader) does not run the suite and exit.
if __name__ == '__main__':
    unittest.main()