Back to index

apport  2.3
test_signal_crashes.py
Go to the documentation of this file.
00001 # Copyright (C) 2007 - 2009 Canonical Ltd.
00002 # Author: Martin Pitt <martin.pitt@ubuntu.com>
00003 #
00004 # This program is free software; you can redistribute it and/or modify it
00005 # under the terms of the GNU General Public License as published by the
00006 # Free Software Foundation; either version 2 of the License, or (at your
00007 # option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
00008 # the full text of the license.
00009 
00010 import tempfile, shutil, os, subprocess, signal, time, stat, sys
00011 import resource, errno, grp, unittest
00012 import apport.fileutils
00013 
# Binary that the tests spawn and crash; the two names below are presumably
# the binary and source package it belongs to (TODO confirm, unused here).
test_executable = '/usr/bin/yes'
test_package = 'coreutils'
test_source = 'coreutils'

# Keys which the tests expect to find in a generated crash report.
required_fields = [
    'ProblemType',
    'CoreDump',
    'Date',
    'ExecutablePath',
    'ProcCmdline',
    'ProcEnviron',
    'ProcMaps',
    'Signal',
    'UserGroups',
]

# Path of the ignore file (apport.report._ignore_file) with "~" expanded.
ifpath = os.path.expanduser(apport.report._ignore_file)
00023 
00024 
00025 class T(unittest.TestCase):
00026     def setUp(self):
00027         # use local report dir
00028         self.report_dir = tempfile.mkdtemp()
00029         os.environ['APPORT_REPORT_DIR'] = self.report_dir
00030 
00031         self.workdir = tempfile.mkdtemp()
00032 
00033         # move aside current ignore file
00034         if os.path.exists(ifpath):
00035             os.rename(ifpath, ifpath + '.apporttest')
00036 
00037         # do not write core files by default
00038         resource.setrlimit(resource.RLIMIT_CORE, (0, -1))
00039 
00040         # that's the place where to put core dumps, etc.
00041         os.chdir('/tmp')
00042 
00043         # expected report name for test executable report
00044         self.test_report = os.path.join(
00045             apport.fileutils.report_dir, '%s.%i.crash' %
00046             (test_executable.replace('/', '_'), os.getuid()))
00047 
00048     def tearDown(self):
00049         shutil.rmtree(self.report_dir)
00050         shutil.rmtree(self.workdir)
00051 
00052         # clean up our ignore file
00053         if os.path.exists(ifpath):
00054             os.unlink(ifpath)
00055         orig_ignore_file = ifpath + '.apporttest'
00056         if os.path.exists(orig_ignore_file):
00057             os.rename(orig_ignore_file, ifpath)
00058 
00059         # permit tests to leave behind test_report, but nothing else
00060         if os.path.exists(self.test_report):
00061             apport.fileutils.delete_report(self.test_report)
00062         self.assertEqual(apport.fileutils.get_all_reports(), [])
00063 
00064     def test_empty_core_dump(self):
00065         '''empty core dumps do not generate a report'''
00066 
00067         test_proc = self.create_test_process()
00068         try:
00069             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00070                                    stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00071             app.stdin.close()
00072             assert app.wait() == 0, app.stderr.read()
00073         finally:
00074             os.kill(test_proc, 9)
00075             os.waitpid(test_proc, 0)
00076 
00077         self.assertEqual(self.get_temp_all_reports(), [])
00078 
00079     def test_crash_apport(self):
00080         '''report generation with apport'''
00081 
00082         self.do_crash()
00083 
00084         # check crash report
00085         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00086         st = os.stat(self.test_report)
00087         self.assertEqual(stat.S_IMODE(st.st_mode), 0o640, 'report has correct permissions')
00088 
00089         # a subsequent crash does not alter unseen report
00090         self.do_crash()
00091         st2 = os.stat(self.test_report)
00092         self.assertEqual(st, st2, 'original unseen report did not get overwritten')
00093 
00094         # a subsequent crash alters seen report
00095         apport.fileutils.mark_report_seen(self.test_report)
00096         self.do_crash()
00097         st2 = os.stat(self.test_report)
00098         self.assertNotEqual(st, st2, 'original seen report gets overwritten')
00099 
00100         pr = apport.Report()
00101         with open(self.test_report, 'rb') as f:
00102             pr.load(f)
00103         self.assertTrue(set(required_fields).issubset(set(pr.keys())),
00104                         'report has required fields')
00105         self.assertEqual(pr['ExecutablePath'], test_executable)
00106         self.assertEqual(pr['ProcCmdline'], test_executable)
00107         self.assertEqual(pr['Signal'], '%i' % signal.SIGSEGV)
00108 
00109         # check safe environment subset
00110         allowed_vars = ['SHELL', 'PATH', 'LANGUAGE', 'LANG', 'LC_CTYPE',
00111                         'LC_COLLATE', 'LC_TIME', 'LC_NUMERIC', 'LC_MONETARY',
00112                         'LC_MESSAGES', 'LC_PAPER', 'LC_NAME', 'LC_ADDRESS',
00113                         'LC_TELEPHONE', 'LC_MEASUREMENT', 'LC_IDENTIFICATION',
00114                         'LOCPATH', 'TERM']
00115 
00116         for l in pr['ProcEnviron'].splitlines():
00117             (k, v) = l.split('=', 1)
00118             self.assertTrue(k in allowed_vars,
00119                             'report contains sensitive environment variable %s' % k)
00120 
00121         # UserGroups only has system groups
00122         for g in pr['UserGroups'].split():
00123             self.assertLess(grp.getgrnam(g).gr_gid, 500)
00124 
00125         self.assertFalse('root' in pr['UserGroups'],
00126                          'collected system groups are not those from root')
00127 
00128     def test_parallel_crash(self):
00129         '''only one apport instance is ran at a time'''
00130 
00131         test_proc = self.create_test_process()
00132         test_proc2 = self.create_test_process(False, '/bin/dd')
00133         try:
00134             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00135                                    stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00136 
00137             time.sleep(0.5)  # give it some time to grab the lock
00138 
00139             app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0'],
00140                                     stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00141 
00142             # app should wait indefinitely for stdin, while app2 should terminate
00143             # immediately (give it 5 seconds)
00144             timeout = 50
00145             while timeout >= 0:
00146                 if app2.poll():
00147                     break
00148 
00149                 time.sleep(0.1)
00150                 timeout -= 1
00151 
00152             self.assertGreater(timeout, 0, 'second apport instance terminates immediately')
00153             self.assertFalse(app.poll(), 'first apport instance is still running')
00154 
00155             # properly terminate app and app2
00156             app2.stdin.close()
00157             app.stdin.write(b'boo')
00158             app.stdin.close()
00159 
00160             self.assertEqual(app.wait(), 0, app.stderr.read())
00161         finally:
00162             os.kill(test_proc, 9)
00163             os.waitpid(test_proc, 0)
00164             os.kill(test_proc2, 9)
00165             os.waitpid(test_proc2, 0)
00166 
00167     def test_lock_symlink(self):
00168         '''existing .lock file as dangling symlink does not create the file
00169 
00170         This would be a vulnerability, as users could overwrite system files.
00171         '''
00172         # prepare a symlink trap
00173         lockpath = os.path.join(self.report_dir, '.lock')
00174         trappath = os.path.join(self.report_dir, '0wned')
00175         os.symlink(trappath, lockpath)
00176 
00177         # now call apport
00178         test_proc = self.create_test_process()
00179 
00180         try:
00181             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00182                                    stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00183             app.stdin.write(b'boo')
00184             app.stdin.close()
00185 
00186             self.assertNotEqual(app.wait(), 0, app.stderr.read())
00187         finally:
00188             os.kill(test_proc, 9)
00189             os.waitpid(test_proc, 0)
00190 
00191         self.assertEqual(self.get_temp_all_reports(), [])
00192         self.assertFalse(os.path.exists(trappath))
00193 
00194     def test_unpackaged_binary(self):
00195         '''unpackaged binaries do not create a report'''
00196 
00197         local_exe = os.path.join(self.workdir, 'mybin')
00198         with open(local_exe, 'wb') as dest:
00199             with open(test_executable, 'rb') as src:
00200                 dest.write(src.read())
00201         os.chmod(local_exe, 0o755)
00202         self.do_crash(command=local_exe)
00203         self.assertEqual(apport.fileutils.get_all_reports(), [])
00204 
00205     def test_unpackaged_script(self):
00206         '''unpackaged scripts do not create a report'''
00207 
00208         local_exe = os.path.join(self.workdir, 'myscript')
00209         with open(local_exe, 'w') as f:
00210             f.write('#!/bin/sh\nkill -SEGV $$')
00211         os.chmod(local_exe, 0o755)
00212         self.do_crash(command=local_exe)
00213 
00214         # absolute path
00215         self.assertEqual(apport.fileutils.get_all_reports(), [])
00216 
00217         # relative path
00218         os.chdir(self.workdir)
00219         self.do_crash(command='./myscript')
00220         self.assertEqual(apport.fileutils.get_all_reports(), [])
00221 
00222     def test_ignore_sigquit(self):
00223         '''apport ignores SIGQUIT'''
00224 
00225         self.do_crash(sig=signal.SIGQUIT)
00226         self.assertEqual(apport.fileutils.get_all_reports(), [])
00227 
00228     def test_leak_inaccessible_files(self):
00229         '''existence of user-inaccessible files does not leak'''
00230 
00231         local_exe = os.path.join(self.workdir, 'myscript')
00232         with open(local_exe, 'w') as f:
00233             f.write('#!/usr/bin/perl\nsystem("mv $0 $0.exe");\nsystem("ln -sf /etc/shadow $0");\n$0="..$0";\nsleep(10);\n')
00234         os.chmod(local_exe, 0o755)
00235         self.do_crash(check_running=False, command=local_exe, sleep=2)
00236 
00237         leak = os.path.join(apport.fileutils.report_dir, '_usr_bin_perl.%i.crash' %
00238                             (os.getuid()))
00239         pr = apport.Report()
00240         with open(leak, 'rb') as f:
00241             pr.load(f)
00242         # On a leak, no report is created since the executable path will be replaced
00243         # by the symlink path, and it doesn't belong to any package.
00244         self.assertEqual(pr['ExecutablePath'], '/usr/bin/perl')
00245         self.assertFalse('InterpreterPath' in pr)
00246         apport.fileutils.delete_report(leak)
00247 
00248     def test_flood_limit(self):
00249         '''limitation of crash report flood'''
00250 
00251         count = 0
00252         while count < 7:
00253             sys.stderr.write('%i ' % count)
00254             sys.stderr.flush()
00255             self.do_crash()
00256             reports = apport.fileutils.get_new_reports()
00257             if not reports:
00258                 break
00259             apport.fileutils.mark_report_seen(self.test_report)
00260             count += 1
00261         self.assertGreater(count, 1, 'gets at least 2 repeated crashes')
00262         self.assertLess(count, 7, 'stops flooding after less than 7 repeated crashes')
00263 
00264     def test_nonwritable_cwd(self):
00265         '''core dump works for non-writable cwd'''
00266 
00267         os.chdir('/')
00268         self.do_crash()
00269         pr = apport.Report()
00270         self.assertTrue(os.path.exists(self.test_report))
00271         with open(self.test_report, 'rb') as f:
00272             pr.load(f)
00273         assert set(required_fields).issubset(set(pr.keys()))
00274 
00275     def test_core_dump_packaged(self):
00276         '''packaged executables create core dumps on proper ulimits'''
00277 
00278         # for SIGSEGV
00279         resource.setrlimit(resource.RLIMIT_CORE, (1, -1))
00280         self.do_crash(expect_coredump=False, expect_corefile=False)
00281         self.assertEqual(apport.fileutils.get_all_reports(), [])
00282         resource.setrlimit(resource.RLIMIT_CORE, (10, -1))
00283         self.do_crash(expect_corefile=False)
00284         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00285         self.check_report_coredump(self.test_report)
00286         apport.fileutils.delete_report(self.test_report)
00287         resource.setrlimit(resource.RLIMIT_CORE, (10000, -1))
00288         self.do_crash(expect_corefile=True)
00289         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00290         self.check_report_coredump(self.test_report)
00291         apport.fileutils.delete_report(self.test_report)
00292         resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
00293         self.do_crash(expect_corefile=True)
00294         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00295         self.check_report_coredump(self.test_report)
00296         apport.fileutils.delete_report(self.test_report)
00297 
00298         # for SIGABRT
00299         resource.setrlimit(resource.RLIMIT_CORE, (1, -1))
00300         self.do_crash(expect_coredump=False, expect_corefile=False, sig=signal.SIGABRT)
00301         self.assertEqual(apport.fileutils.get_all_reports(), [])
00302         resource.setrlimit(resource.RLIMIT_CORE, (10, -1))
00303         self.do_crash(expect_corefile=False, sig=signal.SIGABRT)
00304         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00305         apport.fileutils.delete_report(self.test_report)
00306         resource.setrlimit(resource.RLIMIT_CORE, (10000, -1))
00307         self.do_crash(expect_corefile=True, sig=signal.SIGABRT)
00308         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00309         apport.fileutils.delete_report(self.test_report)
00310         resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
00311         self.do_crash(expect_corefile=True, sig=signal.SIGABRT)
00312         self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
00313 
00314         # creates core file with existing crash report, too
00315         self.do_crash(expect_corefile=True)
00316 
00317     def test_core_dump_unpackaged(self):
00318         '''unpackaged executables create core dumps on proper ulimits'''
00319 
00320         local_exe = os.path.join(self.workdir, 'mybin')
00321         with open(local_exe, 'wb') as dest:
00322             with open(test_executable, 'rb') as src:
00323                 dest.write(src.read())
00324         os.chmod(local_exe, 0o755)
00325 
00326         # for SIGSEGV
00327         resource.setrlimit(resource.RLIMIT_CORE, (1, -1))
00328         self.do_crash(expect_coredump=False, expect_corefile=False, command=local_exe)
00329         self.assertEqual(apport.fileutils.get_all_reports(), [])
00330         resource.setrlimit(resource.RLIMIT_CORE, (10, -1))
00331         self.assertEqual(apport.fileutils.get_all_reports(), [])
00332         resource.setrlimit(resource.RLIMIT_CORE, (10000, -1))
00333         self.do_crash(expect_corefile=True, command=local_exe)
00334         self.assertEqual(apport.fileutils.get_all_reports(), [])
00335         resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
00336         self.do_crash(expect_corefile=True, command=local_exe)
00337         self.assertEqual(apport.fileutils.get_all_reports(), [])
00338 
00339         # for SIGABRT
00340         resource.setrlimit(resource.RLIMIT_CORE, (1, -1))
00341         self.do_crash(expect_coredump=False, expect_corefile=False, command=local_exe, sig=signal.SIGABRT)
00342         self.assertEqual(apport.fileutils.get_all_reports(), [])
00343         resource.setrlimit(resource.RLIMIT_CORE, (10, -1))
00344         self.do_crash(expect_corefile=False, command=local_exe, sig=signal.SIGABRT)
00345         self.assertEqual(apport.fileutils.get_all_reports(), [])
00346         resource.setrlimit(resource.RLIMIT_CORE, (10000, -1))
00347         self.do_crash(expect_corefile=True, command=local_exe, sig=signal.SIGABRT)
00348         self.assertEqual(apport.fileutils.get_all_reports(), [])
00349         resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
00350         self.do_crash(expect_corefile=True, command=local_exe, sig=signal.SIGABRT)
00351         self.assertEqual(apport.fileutils.get_all_reports(), [])
00352 
00353     def test_limit_size(self):
00354         '''core dumps are capped on available memory size'''
00355 
00356         # determine how much data we have to pump into apport in order to make sure
00357         # that it will refuse the core dump
00358         r = apport.Report()
00359         with open('/proc/meminfo', 'rb') as f:
00360             r.load(f)
00361         totalmb = int(r['MemFree'].split()[0]) + int(r['Cached'].split()[0])
00362         totalmb = int(totalmb / 1024)
00363         r = None
00364 
00365         test_proc = self.create_test_process()
00366         try:
00367             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00368                                    stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00369             # pipe an entire total memory size worth of spaces into it, which must be
00370             # bigger than the 'usable' memory size. apport should digest that and the
00371             # report should not have a core dump; NB that this should error out
00372             # with a SIGPIPE when apport aborts reading from stdin
00373             onemb = b' ' * 1048576
00374             while totalmb > 0:
00375                 if totalmb & 31 == 0:
00376                     sys.stderr.write('.')
00377                     sys.stderr.flush()
00378                 try:
00379                     app.stdin.write(onemb)
00380                 except IOError as e:
00381                     if e.errno == errno.EPIPE:
00382                         break
00383                     else:
00384                         raise
00385                 totalmb -= 1
00386             app.stdin.close()
00387             self.assertEqual(app.wait(), 0, app.stderr.read())
00388             onemb = None
00389         finally:
00390             os.kill(test_proc, 9)
00391             os.waitpid(test_proc, 0)
00392 
00393         reports = self.get_temp_all_reports()
00394         self.assertEqual(len(reports), 1)
00395 
00396         pr = apport.Report()
00397         with open(reports[0], 'rb') as f:
00398             pr.load(f)
00399         os.unlink(reports[0])
00400 
00401         self.assertEqual(pr['Signal'], '42')
00402         self.assertEqual(pr['ExecutablePath'], test_executable)
00403         self.assertFalse('CoreDump' in pr)
00404 
00405     def test_ignore(self):
00406         '''ignoring executables'''
00407 
00408         self.do_crash()
00409 
00410         reports = apport.fileutils.get_all_reports()
00411         self.assertEqual(len(reports), 1)
00412 
00413         pr = apport.Report()
00414         with open(reports[0], 'rb') as f:
00415             pr.load(f)
00416         os.unlink(reports[0])
00417 
00418         pr.mark_ignore()
00419 
00420         self.do_crash()
00421         self.assertEqual(apport.fileutils.get_all_reports(), [])
00422 
00423     def test_modify_after_start(self):
00424         '''ignores executables which got modified after process started'''
00425 
00426         # create executable in a path we can modify which apport regards as
00427         # likely packaged
00428         (fd, myexe) = tempfile.mkstemp(dir='/var/tmp')
00429         try:
00430             with open(test_executable, 'rb') as f:
00431                 os.write(fd, f.read())
00432             os.close(fd)
00433             os.chmod(myexe, 0o755)
00434             time.sleep(1)
00435 
00436             try:
00437                 test_proc = self.create_test_process(command=myexe)
00438 
00439                 # bump mtime of myexe to make it more recent than process start
00440                 # time; ensure this works with file systems with only second
00441                 # resolution
00442                 time.sleep(1.1)
00443                 os.utime(myexe, None)
00444 
00445                 app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00446                                        stdin=subprocess.PIPE, stderr=subprocess.PIPE)
00447                 app.stdin.write(b'boo')
00448                 app.stdin.close()
00449                 err = app.stderr.read().decode()
00450                 self.assertNotEqual(app.wait(), 0, err)
00451             finally:
00452                 os.kill(test_proc, 9)
00453                 os.waitpid(test_proc, 0)
00454 
00455             self.assertEqual(self.get_temp_all_reports(), [])
00456         finally:
00457             os.unlink(myexe)
00458 
00459     def test_logging_file(self):
00460         '''outputs to log file, if available'''
00461 
00462         test_proc = self.create_test_process()
00463         log = os.path.join(self.workdir, 'apport.log')
00464         try:
00465             env = os.environ.copy()
00466             env['APPORT_LOG_FILE'] = log
00467             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00468                                    stdin=subprocess.PIPE, env=env,
00469                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
00470                                    universal_newlines=True)
00471             (out, err) = app.communicate(b'hel\x01lo')
00472         finally:
00473             os.kill(test_proc, 9)
00474             os.waitpid(test_proc, 0)
00475 
00476         self.assertEqual(out, '')
00477         self.assertEqual(err, '')
00478         self.assertEqual(app.returncode, 0, err)
00479         with open(log) as f:
00480             logged = f.read()
00481         self.assertTrue('called for pid' in logged, logged)
00482         self.assertTrue('wrote report' in logged, logged)
00483         self.assertFalse('Traceback' in logged, logged)
00484 
00485         reports = self.get_temp_all_reports()
00486         self.assertEqual(len(reports), 1)
00487 
00488         pr = apport.Report()
00489         with open(reports[0], 'rb') as f:
00490             pr.load(f)
00491         os.unlink(reports[0])
00492 
00493         self.assertEqual(pr['Signal'], '42')
00494         self.assertEqual(pr['ExecutablePath'], test_executable)
00495         self.assertEqual(pr['CoreDump'], b'hel\x01lo')
00496 
00497     def test_logging_stderr(self):
00498         '''outputs to stderr if log is not available'''
00499 
00500         test_proc = self.create_test_process()
00501         try:
00502             env = os.environ.copy()
00503             env['APPORT_LOG_FILE'] = '/not/existing/apport.log'
00504             app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
00505                                    stdin=subprocess.PIPE, env=env,
00506                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
00507                                    universal_newlines=True)
00508             (out, err) = app.communicate(b'hel\x01lo')
00509         finally:
00510             os.kill(test_proc, 9)
00511             os.waitpid(test_proc, 0)
00512 
00513         self.assertEqual(out, '')
00514         self.assertEqual(app.returncode, 0, err)
00515         self.assertTrue('called for pid' in err, err)
00516         self.assertTrue('wrote report' in err, err)
00517         self.assertFalse('Traceback' in err, err)
00518 
00519         reports = self.get_temp_all_reports()
00520         self.assertEqual(len(reports), 1)
00521 
00522         pr = apport.Report()
00523         with open(reports[0], 'rb') as f:
00524             pr.load(f)
00525         os.unlink(reports[0])
00526 
00527         self.assertEqual(pr['Signal'], '42')
00528         self.assertEqual(pr['ExecutablePath'], test_executable)
00529         self.assertEqual(pr['CoreDump'], b'hel\x01lo')
00530 
00531     #
00532     # Helper methods
00533     #
00534 
00535     @classmethod
00536     def create_test_process(klass, check_running=True, command=test_executable):
00537         '''Spawn test_executable.
00538 
00539         Wait until it is fully running, and return its PID.
00540         '''
00541         assert os.access(command, os.X_OK), command + ' is not executable'
00542         if check_running:
00543             assert subprocess.call(['pidof', command]) == 1, 'no running test executable processes'
00544         pid = os.fork()
00545         if pid == 0:
00546             os.dup2(os.open('/dev/null', os.O_WRONLY), sys.stdout.fileno())
00547             sys.stdin.close()
00548             os.setsid()
00549             os.execv(command, [command])
00550             assert False, 'Could not execute ' + command
00551 
00552         # wait until child process has execv()ed properly
00553         while True:
00554             with open('/proc/%i/cmdline' % pid) as f:
00555                 cmdline = f.read()
00556             if 'test_signal' in cmdline:
00557                 time.sleep(0.1)
00558             else:
00559                 break
00560 
00561         time.sleep(0.3)  # needs some more setup time
00562         return pid
00563 
00564     def do_crash(self, expect_coredump=True, expect_corefile=False,
00565                  sig=signal.SIGSEGV, check_running=True, sleep=0,
00566                  command=test_executable):
00567         '''Generate a test crash.
00568 
00569         This runs command (by default test_executable) in /tmp, lets it crash,
00570         and checks that it exits with the expected return code, leaving a core
00571         file behind if expect_corefile is set, and generating a crash report if
00572         expect_coredump is set.
00573 
00574         If check_running is set (default), this will abort if test_process is
00575         already running.
00576         '''
00577         self.assertFalse(os.path.exists('core'), '/tmp/core already exists, please clean up first')
00578         pid = self.create_test_process(check_running, command)
00579         if sleep > 0:
00580             time.sleep(sleep)
00581         os.kill(pid, sig)
00582         result = os.waitpid(pid, 0)[1]
00583         self.assertFalse(os.WIFEXITED(result), 'test process did not exit normally')
00584         self.assertTrue(os.WIFSIGNALED(result), 'test process died due to signal')
00585         self.assertEqual(os.WCOREDUMP(result), expect_coredump)
00586         self.assertEqual(os.WSTOPSIG(result), 0, 'test process was not signaled to stop')
00587         self.assertEqual(os.WTERMSIG(result), sig, 'test process died due to proper signal')
00588 
00589         # wait max 10 seconds for apport to finish
00590         timeout = 50
00591         while timeout >= 0:
00592             if subprocess.call(['pidof', '-x', 'apport'], stdout=subprocess.PIPE) != 0:
00593                 break
00594             time.sleep(0.2)
00595             timeout -= 1
00596         self.assertGreater(timeout, 0)
00597         if check_running:
00598             self.assertEqual(subprocess.call(['pidof', command]), 1,
00599                              'no running test executable processes')
00600 
00601         if expect_corefile:
00602             self.assertTrue(os.path.exists('/tmp/core'), 'leaves wanted core file')
00603             try:
00604                 # check that core file is valid
00605                 gdb = subprocess.Popen(['gdb', '--batch', '--ex', 'bt',
00606                                         command, '/tmp/core'],
00607                                        stdout=subprocess.PIPE,
00608                                        stderr=subprocess.PIPE)
00609                 (out, err) = gdb.communicate()
00610                 self.assertEqual(gdb.returncode, 0)
00611                 out = out.decode()
00612                 err = err.decode().strip()
00613                 self.assertTrue(err == '' or err.startswith('warning'), err)
00614             finally:
00615                 os.unlink('/tmp/core')
00616         else:
00617             if os.path.exists('/tmp/core'):
00618                 os.unlink('/tmp/core')
00619                 self.fail('leaves unexpected core file behind')
00620 
00621     def get_temp_all_reports(self):
00622         '''Call apport.fileutils.get_all_reports() for our temp dir'''
00623 
00624         old_dir = apport.fileutils.report_dir
00625         apport.fileutils.report_dir = self.report_dir
00626         reports = apport.fileutils.get_all_reports()
00627         apport.fileutils.report_dir = old_dir
00628         return reports
00629 
00630     def check_report_coredump(self, report_path):
00631         '''Check that given report file has a valid core dump'''
00632 
00633         r = apport.Report()
00634         with open(report_path, 'rb') as f:
00635             r.load(f)
00636         self.assertTrue('CoreDump' in r)
00637         self.assertGreater(len(r['CoreDump']), 5000)
00638         r.add_gdb_info()
00639         self.assertTrue('\n#2' in r.get('Stacktrace', ''),
00640                         r.get('Stacktrace', 'no Stacktrace field'))
00641 
00642 #
00643 # main
00644 #
00645 
with open('/proc/sys/kernel/core_pattern') as f:
    core_pattern = f.read().strip()

# A core pattern of the form "|/path/to/helper args..." names a crash dump
# helper program. Split off the helper command defensively: an empty pattern
# or a bare "|" must be reported as "not active" instead of raising
# IndexError.
if core_pattern.startswith('|'):
    helper_command = core_pattern[1:].split()
else:
    helper_command = []
if not helper_command:
    sys.stderr.write('kernel crash dump helper is not active; please enable before running this test.\n')
    # exit status 0: without an active helper the suite is skipped
    sys.exit(0)
# path of the helper program which the tests invoke directly
apport_path = helper_command[0]

# the tests assert on the complete list of reports, so refuse to run while
# reports already exist
if apport.fileutils.get_all_reports():
    sys.stderr.write('Please remove all crash reports from /var/crash/ for this test suite:\n  %s\n' %
                     '\n  '.join(os.listdir('/var/crash')))
    sys.exit(1)

unittest.main()