Back to index

system-config-printer  1.3.9+20120706
PrintTestPage.py
Go to the documentation of this file.
00001 #!/usr/bin/python
00002 
00003 ## Printing troubleshooter
00004 
00005 ## Copyright (C) 2008, 2009, 2010 Red Hat, Inc.
00006 ## Authors:
00007 ##  Tim Waugh <twaugh@redhat.com>
00008 
00009 ## This program is free software; you can redistribute it and/or modify
00010 ## it under the terms of the GNU General Public License as published by
00011 ## the Free Software Foundation; either version 2 of the License, or
00012 ## (at your option) any later version.
00013 
00014 ## This program is distributed in the hope that it will be useful,
00015 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 ## GNU General Public License for more details.
00018 
00019 ## You should have received a copy of the GNU General Public License
00020 ## along with this program; if not, write to the Free Software
00021 ## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00022 
00023 import cups
00024 import dbus
00025 import dbus.glib
00026 import gobject
00027 import os
00028 import pango
00029 import tempfile
00030 import time
00031 from timedops import TimedOperation, OperationCanceled
00032 
00033 from base import *
00034 
00035 import errordialogs
00036 from errordialogs import *
00037 
00038 DBUS_PATH="/com/redhat/PrinterSpooler"
00039 DBUS_IFACE="com.redhat.PrinterSpooler"
00040 class PrintTestPage(Question):
00041     STATE = { cups.IPP_JOB_PENDING: _("Pending"),
00042               cups.IPP_JOB_HELD: _("Held"),
00043               cups.IPP_JOB_PROCESSING: _("Processing"),
00044               cups.IPP_JOB_STOPPED: _("Stopped"),
00045               cups.IPP_JOB_CANCELED: _("Canceled"),
00046               cups.IPP_JOB_ABORTED: _("Aborted"),
00047               cups.IPP_JOB_COMPLETED: _("Completed") }
00048 
00049     def __init__ (self, troubleshooter):
00050         Question.__init__ (self, troubleshooter, "Print test page")
00051         page = gtk.VBox ()
00052         page.set_spacing (12)
00053         page.set_border_width (12)
00054 
00055         label = gtk.Label ()
00056         label.set_alignment (0, 0)
00057         label.set_use_markup (True)
00058         label.set_line_wrap (True)
00059         page.pack_start (label, False, False, 0)
00060         self.main_label = label
00061         self.main_label_text = ('<span weight="bold" size="larger">' +
00062                                 _("Test Page") + '</span>\n\n' +
00063                                 _("Now print a test page.  If you are having "
00064                                   "problems printing a specific document, "
00065                                   "print that document now and mark the print "
00066                                   "job below."))
00067 
00068         hbox = gtk.HButtonBox ()
00069         hbox.set_border_width (0)
00070         hbox.set_spacing (3)
00071         hbox.set_layout (gtk.BUTTONBOX_START)
00072         self.print_button = gtk.Button (_("Print Test Page"))
00073         hbox.pack_start (self.print_button, False, False, 0)
00074 
00075         self.cancel_button = gtk.Button (_("Cancel All Jobs"))
00076         hbox.pack_start (self.cancel_button, False, False, 0)
00077         page.pack_start (hbox, False, False, 0)
00078 
00079         tv = gtk.TreeView ()
00080         test_cell = gtk.CellRendererToggle ()
00081         test = gtk.TreeViewColumn (_("Test"), test_cell, active=0)
00082         job = gtk.TreeViewColumn (_("Job"), gtk.CellRendererText (), text=1)
00083         printer_cell = gtk.CellRendererText ()
00084         printer = gtk.TreeViewColumn (_("Printer"), printer_cell, text=2)
00085         name_cell = gtk.CellRendererText ()
00086         name = gtk.TreeViewColumn (_("Document"), name_cell, text=3)
00087         status = gtk.TreeViewColumn (_("Status"), gtk.CellRendererText (),
00088                                      text=4)
00089         test_cell.set_radio (False)
00090         self.test_cell = test_cell
00091         printer.set_resizable (True)
00092         printer_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
00093         printer_cell.set_property ("width-chars", 20)
00094         name.set_resizable (True)
00095         name_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
00096         name_cell.set_property ("width-chars", 20)
00097         status.set_resizable (True)
00098         tv.append_column (test)
00099         tv.append_column (job)
00100         tv.append_column (printer)
00101         tv.append_column (name)
00102         tv.append_column (status)
00103         tv.set_rules_hint (True)
00104         sw = gtk.ScrolledWindow ()
00105         sw.set_policy (gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
00106         sw.set_shadow_type (gtk.SHADOW_IN)
00107         sw.add (tv)
00108         self.treeview = tv
00109         page.pack_start (sw)
00110 
00111         label = gtk.Label (_("Did the marked print jobs print correctly?"))
00112         label.set_line_wrap (True)
00113         label.set_alignment (0, 0)
00114         page.pack_start (label, False, False, 0)
00115 
00116         vbox = gtk.VBox ()
00117         vbox.set_spacing (6)
00118         self.yes = gtk.RadioButton (label=_("Yes"))
00119         no = gtk.RadioButton (label=_("No"))
00120         no.set_group (self.yes)
00121         vbox.pack_start (self.yes, False, False, 0)
00122         vbox.pack_start (no, False, False, 0)
00123         page.pack_start (vbox, False, False, 0)
00124         self.persistent_answers = {}
00125         troubleshooter.new_page (page, self)
00126 
00127     def display (self):
00128         answers = self.troubleshooter.answers
00129         if not answers.has_key ('cups_queue'):
00130             return False
00131 
00132         parent = self.troubleshooter.get_window ()
00133         self.authconn = answers['_authenticated_connection']
00134         mediatype = None
00135         defaults = answers.get ('cups_printer_ppd_defaults', {})
00136         for opts in defaults.values ():
00137             for opt, value in opts.iteritems ():
00138                 if opt == "MediaType":
00139                     mediatype = value
00140                     break
00141 
00142         if mediatype != None:
00143             mediatype_string = '\n\n' + (_("Remember to load paper of type "
00144                                            "'%s' into the printer first.") %
00145                                          mediatype)
00146         else:
00147             mediatype_string = ""
00148 
00149         label_text = self.main_label_text + mediatype_string
00150         self.main_label.set_markup (label_text)
00151 
00152         model = gtk.ListStore (gobject.TYPE_BOOLEAN,
00153                                gobject.TYPE_INT,
00154                                gobject.TYPE_STRING,
00155                                gobject.TYPE_STRING,
00156                                gobject.TYPE_STRING)
00157         self.treeview.set_model (model)
00158         self.job_to_iter = {}
00159 
00160         test_jobs = self.persistent_answers.get ('test_page_job_id', [])
00161         def get_jobs ():
00162             c = self.authconn
00163             try:
00164                 r = ["job-id",
00165                      "job-name",
00166                      "job-state",
00167                      "job-printer-uri",
00168                      "printer-name"]
00169                 jobs_dict = c.getJobs (which_jobs='not-completed',
00170                                        my_jobs=False,
00171                                        requested_attributes=r)
00172                 completed_jobs_dict = c.getJobs (which_jobs='completed',
00173                                                  requested_attributes=r)
00174             except TypeError:
00175                 # requested_attributes requires pycups 1.9.50
00176                 jobs_dict = c.getJobs (which_jobs='not-completed',
00177                                        my_jobs=False)
00178                 completed_jobs_dict = c.getJobs (which_jobs='completed')
00179             return (jobs_dict, completed_jobs_dict)
00180 
00181         self.op = TimedOperation (get_jobs, parent=parent)
00182         try:
00183             (jobs_dict, completed_jobs_dict) = self.op.run ()
00184         except (OperationCanceled, cups.IPPError):
00185             return False
00186 
00187         # We want to display the jobs in the queue for this printer...
00188         try:
00189             queue_uri_ending = "/" + self.troubleshooter.answers['cups_queue']
00190             jobs_on_this_printer = filter (lambda x:
00191                                                jobs_dict[x]['job-printer-uri'].\
00192                                                endswith (queue_uri_ending),
00193                                            jobs_dict.keys ())
00194         except:
00195             jobs_on_this_printer = []
00196 
00197         # ...as well as any other jobs we've previous submitted as test pages.
00198         jobs = list (set(test_jobs).union (set (jobs_on_this_printer)))
00199 
00200         completed_jobs_dict = None
00201         for job in jobs:
00202             try:
00203                 j = jobs_dict[job]
00204             except KeyError:
00205                 try:
00206                     j = completed_jobs_dict[job]
00207                 except KeyError:
00208                     continue
00209 
00210             iter = model.append (None)
00211             self.job_to_iter[job] = iter
00212             model.set_value (iter, 0, job in test_jobs)
00213             model.set_value (iter, 1, job)
00214             self.update_job (job, j)
00215 
00216         return True
00217 
00218     def connect_signals (self, handler):
00219         self.print_sigid = self.print_button.connect ("clicked",
00220                                                       self.print_clicked)
00221         self.cancel_sigid = self.cancel_button.connect ("clicked",
00222                                                         self.cancel_clicked)
00223         self.test_sigid = self.test_cell.connect ('toggled',
00224                                                   self.test_toggled)
00225 
00226         def create_subscription ():
00227             c = self.authconn
00228             sub_id = c.createSubscription ("/",
00229                                            events=["job-created",
00230                                                    "job-completed",
00231                                                    "job-stopped",
00232                                                    "job-progress",
00233                                                    "job-state-changed"])
00234             return sub_id
00235 
00236         parent = self.troubleshooter.get_window ()
00237         self.op = TimedOperation (create_subscription, parent=parent)
00238         try:
00239             self.sub_id = self.op.run ()
00240         except (OperationCanceled, cups.IPPError):
00241             pass
00242 
00243         try:
00244             bus = dbus.SystemBus ()
00245         except:
00246             bus = None
00247 
00248         self.bus = bus
00249         if bus:
00250             bus.add_signal_receiver (self.handle_dbus_signal,
00251                                      path=DBUS_PATH,
00252                                      dbus_interface=DBUS_IFACE)
00253 
00254         self.timer = gobject.timeout_add_seconds (1, self.update_jobs_list)
00255 
00256     def disconnect_signals (self):
00257         if self.bus:
00258             self.bus.remove_signal_receiver (self.handle_dbus_signal,
00259                                              path=DBUS_PATH,
00260                                              dbus_interface=DBUS_IFACE)
00261                                              
00262         self.print_button.disconnect (self.print_sigid)
00263         self.cancel_button.disconnect (self.cancel_sigid)
00264         self.test_cell.disconnect (self.test_sigid)
00265 
00266         def cancel_subscription (sub_id):
00267             c = self.authconn
00268             c.cancelSubscription (sub_id)
00269 
00270         parent = self.troubleshooter.get_window ()
00271         self.op = TimedOperation (cancel_subscription,
00272                                   (self.sub_id,),
00273                                   parent=parent)
00274         try:
00275             self.op.run ()
00276         except (OperationCanceled, cups.IPPError):
00277             pass
00278 
00279         try:
00280             del self.sub_seq
00281         except:
00282             pass
00283 
00284         gobject.source_remove (self.timer)
00285 
00286     def collect_answer (self):
00287         if not self.displayed:
00288             return {}
00289 
00290         self.answers = self.persistent_answers.copy ()
00291         parent = self.troubleshooter.get_window ()
00292         success = self.yes.get_active ()
00293         self.answers['test_page_successful'] = success
00294 
00295         class collect_jobs:
00296             def __init__ (self, model):
00297                 self.jobs = []
00298                 model.foreach (self.each, None)
00299 
00300             def each (self, model, path, iter, user_data):
00301                 self.jobs.append (model.get (iter, 0, 1, 2, 3, 4))
00302 
00303         model = self.treeview.get_model ()
00304         jobs = collect_jobs (model).jobs
00305         def collect_attributes (jobs):
00306             job_attrs = None
00307             c = self.authconn
00308             with_attrs = []
00309             for (test, jobid, printer, doc, status) in jobs:
00310                 attrs = None
00311                 if test:
00312                     try:
00313                         attrs = c.getJobAttributes (jobid)
00314                     except AttributeError:
00315                         # getJobAttributes was introduced in pycups 1.9.35.
00316                         if job_attrs == None:
00317                             job_attrs = c.getJobs (which_jobs='all')
00318 
00319                         attrs = self.job_attrs[jobid]
00320 
00321                 with_attrs.append ((test, jobid, printer, doc, status, attrs))
00322 
00323             return with_attrs
00324 
00325         self.op = TimedOperation (collect_attributes,
00326                                   (jobs,),
00327                                   parent=parent)
00328         try:
00329             with_attrs = self.op.run ()
00330             self.answers['test_page_job_status'] = with_attrs
00331         except (OperationCanceled, cups.IPPError):
00332             pass
00333 
00334         return self.answers
00335 
00336     def cancel_operation (self):
00337         self.op.cancel ()
00338 
00339         # Abandon the CUPS connection and make another.
00340         answers = self.troubleshooter.answers
00341         factory = answers['_authenticated_connection_factory']
00342         self.authconn = factory.get_connection ()
00343         self.answers['_authenticated_connection'] = self.authconn
00344 
00345     def handle_dbus_signal (self, *args):
00346         debugprint ("D-Bus signal caught: updating jobs list soon")
00347         gobject.source_remove (self.timer)
00348         self.timer = gobject.timeout_add (200, self.update_jobs_list)
00349 
00350     def update_job (self, jobid, job_dict):
00351         iter = self.job_to_iter[jobid]
00352         model = self.treeview.get_model ()
00353         try:
00354             printer_name = job_dict['printer-name']
00355         except KeyError:
00356             try:
00357                 uri = job_dict['job-printer-uri']
00358                 r = uri.rfind ('/')
00359                 printer_name = uri[r + 1:]
00360             except KeyError:
00361                 printer_name = None
00362 
00363         if printer_name != None:
00364             model.set_value (iter, 2, printer_name)
00365 
00366         model.set_value (iter, 3, job_dict['job-name'])
00367         model.set_value (iter, 4, self.STATE[job_dict['job-state']])
00368 
00369     def print_clicked (self, widget):
00370         now = time.time ()
00371         tt = time.localtime (now)
00372         when = time.strftime ("%d/%b/%Y:%T %z", tt)
00373         self.persistent_answers['test_page_attempted'] = when
00374         answers = self.troubleshooter.answers
00375         parent = self.troubleshooter.get_window ()
00376 
00377         def print_test_page (*args, **kwargs):
00378             factory = answers['_authenticated_connection_factory']
00379             c = factory.get_connection ()
00380             return c.printTestPage (*args, **kwargs)
00381 
00382         tmpfname = None
00383         mimetypes = [None, 'text/plain']
00384         for mimetype in mimetypes:
00385             try:
00386                 if mimetype == None:
00387                     # Default test page.
00388                     self.op = TimedOperation (print_test_page,
00389                                               (answers['cups_queue'],),
00390                                               parent=parent)
00391                     jobid = self.op.run ()
00392                 elif mimetype == 'text/plain':
00393                     (tmpfd, tmpfname) = tempfile.mkstemp ()
00394                     os.write (tmpfd, "This is a test page.\n")
00395                     os.close (tmpfd)
00396                     self.op = TimedOperation (print_test_page,
00397                                               (answers['cups_queue'],),
00398                                               kwargs={'file': tmpfname,
00399                                                       'format': mimetype},
00400                                               parent=parent)
00401                     jobid = self.op.run ()
00402                     try:
00403                         os.unlink (tmpfname)
00404                     except OSError:
00405                         pass
00406 
00407                     tmpfname = None
00408 
00409                 jobs = self.persistent_answers.get ('test_page_job_id', [])
00410                 jobs.append (jobid)
00411                 self.persistent_answers['test_page_job_id'] = jobs
00412                 break
00413             except OperationCanceled:
00414                 self.persistent_answers['test_page_submit_failure'] = 'cancel'
00415                 break
00416             except RuntimeError:
00417                 self.persistent_answers['test_page_submit_failure'] = 'connect'
00418                 break
00419             except cups.IPPError, (e, s):
00420                 if (e == cups.IPP_DOCUMENT_FORMAT and
00421                     mimetypes.index (mimetype) < (len (mimetypes) - 1)):
00422                     # Try next format.
00423                     if tmpfname != None:
00424                         os.unlink (tmpfname)
00425                         tmpfname = None
00426                     continue
00427 
00428                 self.persistent_answers['test_page_submit_failure'] = (e, s)
00429                 show_error_dialog (_("Error submitting test page"),
00430                                    _("There was an error during the CUPS "
00431                                      "operation: '%s'.") % s,
00432                                    self.troubleshooter.get_window ())
00433                 break
00434 
00435     def cancel_clicked (self, widget):
00436         self.persistent_answers['test_page_jobs_cancelled'] = True
00437         jobids = []
00438         for jobid, iter in self.job_to_iter.iteritems ():
00439             jobids.append (jobid)
00440 
00441         def cancel_jobs (jobids):
00442             c = self.authconn
00443             for jobid in jobids:
00444                 try:
00445                     c.cancelJob (jobid)
00446                 except cups.IPPError, (e, s):
00447                     if e != cups.IPP_NOT_POSSIBLE:
00448                         self.persistent_answers['test_page_cancel_failure'] = (e, s)
00449 
00450         self.op = TimedOperation (cancel_jobs,
00451                                   (jobids,),
00452                                   parent=self.troubleshooter.get_window ())
00453         try:
00454             self.op.run ()
00455         except (OperationCanceled, cups.IPPError):
00456             pass
00457 
00458     def test_toggled (self, cell, path):
00459         model = self.treeview.get_model ()
00460         iter = model.get_iter (path)
00461         active = model.get_value (iter, 0)
00462         model.set_value (iter, 0, not active)
00463 
00464     def update_jobs_list (self):
00465         def get_notifications (self):
00466             c = self.authconn
00467             try:
00468                 notifications = c.getNotifications ([self.sub_id],
00469                                                     [self.sub_seq + 1])
00470             except AttributeError:
00471                 notifications = c.getNotifications ([self.sub_id])
00472 
00473             return notifications
00474 
00475         # Enter the GDK lock.  We need to do this because we were
00476         # called from a timeout.
00477         gtk.gdk.threads_enter ()
00478 
00479         parent = self.troubleshooter.get_window ()
00480         self.op = TimedOperation (get_notifications,
00481                                   (self,),
00482                                   parent=parent)
00483         try:
00484             notifications = self.op.run ()
00485         except (OperationCanceled, cups.IPPError):
00486             gtk.gdk.threads_leave ()
00487             return True
00488 
00489         answers = self.troubleshooter.answers
00490         model = self.treeview.get_model ()
00491         queue = answers['cups_queue']
00492         test_jobs = self.persistent_answers.get('test_page_job_id', [])
00493         for event in notifications['events']:
00494             seq = event['notify-sequence-number']
00495             try:
00496                 if seq <= self.sub_seq:
00497                     # Work around a bug in pycups < 1.9.34
00498                     continue
00499             except AttributeError:
00500                 pass
00501             self.sub_seq = seq
00502             job = event['notify-job-id']
00503 
00504             nse = event['notify-subscribed-event']
00505             if nse == 'job-created':
00506                 if (job in test_jobs or
00507                     event['printer-name'] == queue):
00508                     iter = model.append (None)
00509                     self.job_to_iter[job] = iter
00510                     model.set_value (iter, 0, True)
00511                     model.set_value (iter, 1, job)
00512                 else:
00513                     continue
00514             elif not self.job_to_iter.has_key (job):
00515                 continue
00516 
00517             if (job in test_jobs and
00518                 nse in ["job-stopped", "job-completed"]):
00519                 comp = self.persistent_answers.get ('test_page_completions', [])
00520                 comp.append ((job, event['notify-text']))
00521                 self.persistent_answers['test_page_completions'] = comp
00522 
00523             self.update_job (job, event)
00524 
00525         # Update again when we're told to. (But we might update sooner if
00526         # there is a D-Bus signal.)
00527         gobject.source_remove (self.timer)
00528         self.timer = gobject.timeout_add_seconds (
00529             notifications['notify-get-interval'],
00530             self.update_jobs_list)
00531         debugprint ("Update again in %ds" %
00532                     notifications['notify-get-interval'])
00533         gtk.gdk.threads_leave ()
00534         return False