Back to index

lightning-sunbird  0.9+nobinonly
file.py
Go to the documentation of this file.
00001 # ***** BEGIN LICENSE BLOCK *****
00002 # Version: MPL 1.1/GPL 2.0/LGPL 2.1
00003 #
00004 # The contents of this file are subject to the Mozilla Public License Version
00005 # 1.1 (the "License"); you may not use this file except in compliance with
00006 # the License. You may obtain a copy of the License at
00007 # http://www.mozilla.org/MPL/
00008 #
00009 # Software distributed under the License is distributed on an "AS IS" basis,
00010 # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00011 # for the specific language governing rights and limitations under the
00012 # License.
00013 #
00014 # The Original Code is the Python XPCOM language bindings.
00015 #
00016 # The Initial Developer of the Original Code is
00017 # ActiveState Tool Corp.
00018 # Portions created by the Initial Developer are Copyright (C) 2000, 2001
00019 # the Initial Developer. All Rights Reserved.
00020 #
00021 # Contributor(s):
00022 #   Mark Hammond <MarkH@ActiveState.com> (original author)
00023 #
00024 # Alternatively, the contents of this file may be used under the terms of
00025 # either the GNU General Public License Version 2 or later (the "GPL"), or
00026 # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027 # in which case the provisions of the GPL or the LGPL are applicable instead
00028 # of those above. If you wish to allow use of your version of this file only
00029 # under the terms of either the GPL or the LGPL, and not to allow others to
00030 # use your version of this file under the terms of the MPL, indicate your
00031 # decision by deleting the provisions above and replace them with the notice
00032 # and other provisions required by the GPL or the LGPL. If you do not delete
00033 # the provisions above, a recipient may use your version of this file under
00034 # the terms of any one of the MPL, the GPL or the LGPL.
00035 #
00036 # ***** END LICENSE BLOCK *****
00037 
00038 """Implementation of Python file objects for Mozilla/xpcom.
00039 
00040 Introduction:
  This module defines several classes that are implemented using
  Mozilla streams.  This allows you to open Mozilla URIs and
  treat them as Python file objects.
00044   
00045 Example:
00046 >>> file = URIFile("chrome://whatever")
00047 >>> data = file.read(5) # Pass no arg to read everything.
00048 
00049 Known Limitations:
 * Not all URL schemes will work from "python.exe" - most notably
   "chrome://" and "http://" URLs - because a simple initialization of
   xpcom by Python does not load up the full set of Mozilla URL handlers.
   Supporting them would require correctly initializing the chrome
   registry and setting up a message queue.
00055 
00056 Known Bugs:
  * Only read ("r") mode is supported.  Although write ("w") mode doesn't make
    sense for HTTP type URLs, it potentially does for file:// etc. type ones.
00059   * No concept of text mode vs binary mode.  It appears Mozilla takes care of
00060     this internally (ie, all "text/???" mime types are text, rest are binary)
00061 
00062 """
00063 
00064 from xpcom import components, Exception, _xpcom
00065 import os
00066 import threading # for locks.
00067 
# Bit flags combined into the "mode" argument that LocalFile.init() hands to
# the Mozilla file streams' init() method.
# NOTE(review): the values look like NSPR's PR_RDONLY .. PR_EXCL open flags -
# confirm against NSPR's prio.h before relying on that.
NS_RDONLY        = 1 << 0  # 0x01
NS_WRONLY        = 1 << 1  # 0x02
NS_RDWR          = 1 << 2  # 0x04
NS_CREATE_FILE   = 1 << 3  # 0x08
NS_APPEND        = 1 << 4  # 0x10
NS_TRUNCATE      = 1 << 5  # 0x20
NS_SYNC          = 1 << 6  # 0x40
NS_EXCL          = 1 << 7  # 0x80
00076 
00077 # A helper function that may come in useful
def LocalFileToURL(localFileName):
    """Convert a filename to an XPCOM nsIFileURL object.

    localFileName is passed unchanged to nsILocalFile.initWithPath().
    The result is the URI the IO service builds for that file, queried
    for the nsIFileURL interface.
    """
    klasses = components.classes
    ifaces = components.interfaces

    # An nsILocalFile describing the path.
    local_file = klasses["@mozilla.org/file/local;1"].createInstance(
        ifaces.nsILocalFile)
    local_file.initWithPath(localFileName)

    # The IO service creates the URI; QI then gives us its FileURL side.
    io_service = klasses["@mozilla.org/network/io-service;1"].getService(
        ifaces.nsIIOService)
    file_uri = io_service.newFileURI(local_file)
    file_url = file_uri.queryInterface(ifaces.nsIFileURL)
    # Setting the "file" attribute causes initialization...
    file_url.file = local_file
    return file_url
00092 
00093 # A base class for file objects.
class _File:
    """Base class for file-like objects layered over Mozilla streams.

    Subclasses supply init(name_thingy, mode), which is expected to set
    self.inputStream and/or self.outputStream.  Every stream operation is
    serialized through self.lockob because the Moz file streams are not
    thread safe.
    """
    def __init__(self, name_thingy = None, mode="r"):
        """Create the object and, if name_thingy is given, open it at once
        by calling self.init(name_thingy, mode)."""
        self.lockob = threading.Lock()
        self.inputStream = self.outputStream = None
        if name_thingy is not None:
            self.init(name_thingy, mode)

    def __del__(self):
        self.close()

    # The Moz file streams are not thread safe.
    def _lock(self):
        self.lockob.acquire()
    def _release(self):
        self.lockob.release()

    def read(self, n = -1):
        """Read from the input stream and return the data as a str.

        n is passed straight to the stream's read(); omit it to read
        everything.  Asserts that an input stream has been set up.
        """
        assert self.inputStream is not None, "Not setup for read!"
        self._lock()
        try:
            return str(self.inputStream.read(n))
        finally:
            self._release()

    def readlines(self):
        """Read all remaining data and return it as a list of lines.

        Lines are split on the newline character only and keep their
        terminator.  A final line that is not newline-terminated is returned
        without one, so joining the result reproduces exactly what read()
        returned.
        """
        # Not part of the xpcom interface, but handy for direct Python users.
        pieces = self.read().split("\n")
        # split() always leaves one trailing element: "" when the data ended
        # with a newline (or was empty), otherwise the unterminated last line.
        tail = pieces.pop()
        lines = [s + "\n" for s in pieces]
        if tail:
            lines.append(tail)
        return lines

    def write(self, data):
        """Write data to the output stream.

        Asserts that an output stream has been set up.
        """
        assert self.outputStream is not None, "Not setup for write!"
        self._lock()
        try:
            self.outputStream.write(data, len(data))
        finally:
            self._release()

    def close(self):
        """Close whichever streams are open and forget them.

        Safe to call repeatedly; also run from __del__ and at the start of
        the subclasses' init().
        """
        self._lock()
        try:
            if self.inputStream is not None:
                self.inputStream.close()
                self.inputStream = None
            if self.outputStream is not None:
                self.outputStream.close()
                self.outputStream = None
            # Only URIFile creates a channel, but resetting it here keeps
            # close() uniform for every subclass.
            self.channel = None
        finally:
            self._release()

    def flush(self):
        """Flush the output stream, if there is one."""
        self._lock()
        try:
            if self.outputStream is not None:
                self.outputStream.flush()
        finally:
            self._release()
00152 
00153 # A synchronous "file object" used to open a URI.
class URIFile(_File):
    """A synchronous, read-only "file object" opened from a URI."""

    def init(self, url, mode="r"):
        """(Re)open this object on url.

        url is either a URI string or an object that already has a
        queryInterface attribute, in which case it is used as the URI
        object directly.  Any stream that is currently open is closed first.

        Raises ValueError if mode is not "r" or if the URI has no scheme.
        """
        self.close()
        if mode != "r":
            raise ValueError("only 'r' mode supported'")
        io_service = components.classes["@mozilla.org/network/io-service;1"] \
                        .getService(components.interfaces.nsIIOService)
        if hasattr(url, "queryInterface"):
            url_ob = url
        else:
            url_ob = io_service.newURI(url, None, None)
        # Mozilla asserts and starts saying "NULL POINTER" if this is wrong!
        if not url_ob.scheme:
            raise ValueError("The URI '%s' is invalid (no scheme)"
                             % (url_ob.spec,))
        self.channel = io_service.newChannelFromURI(url_ob)
        self.inputStream = self.channel.open()
00171 
00172 # A "file object" implemented using Netscape's native file support.
00173 # Based on io.js - http://lxr.mozilla.org/seamonkey/source/xpcom/tests/utils/io.js
00174 # You open this file using a local file name (as a string) so it really is pointless - 
00175 # you may as well be using a standard Python file object!
class LocalFile(_File):
    """A "file object" built on Mozilla's native file streams.

    Opened by local file name; supports the modes "r", "w" and "a".
    """
    def __init__(self, *args, **kwargs):
        """Accept the same arguments as _File.__init__ (name, mode).

        Keyword arguments are forwarded too, so LocalFile(name, mode="w")
        works as well as the positional form.
        """
        # Must exist before _File.__init__ runs: that may call init(),
        # which calls close(), which looks at self.fileIO.
        self.fileIO = None
        _File.__init__(self, *args, **kwargs)

    def init(self, name, mode = "r"):
        """(Re)open this object on the local file `name`.

        mode "w" replaces any existing file, "a" appends (creating the file
        if necessary) and "r" reads.  Any stream that is currently open is
        closed first.  Raises ValueError for any other mode.
        """
        name = os.path.abspath(name) # Moz libraries under Linux fail with relative paths.
        self.close()
        local_file = components.classes['@mozilla.org/file/local;1'].createInstance("nsILocalFile")
        local_file.initWithPath(name)
        if mode in ("w", "a"):
            self.fileIO = components.classes["@mozilla.org/network/file-output-stream;1"].createInstance("nsIFileOutputStream")
            if mode == "w":
                if local_file.exists():
                    local_file.remove(0)
                moz_mode = NS_CREATE_FILE | NS_WRONLY
            else:
                # NS_APPEND by itself requests neither write access nor
                # creation of a missing file, so combine it with the flags
                # that give Python's "a" semantics.
                moz_mode = NS_CREATE_FILE | NS_WRONLY | NS_APPEND
            self.fileIO.init(local_file, moz_mode, -1, 0)
            self.outputStream = self.fileIO
        elif mode == "r":
            self.fileIO = components.classes["@mozilla.org/network/file-input-stream;1"].createInstance("nsIFileInputStream")
            self.fileIO.init(local_file, NS_RDONLY, -1, 0)
            # Reads go through a scriptable wrapper around the raw stream.
            self.inputStream = components.classes["@mozilla.org/scriptableinputstream;1"].createInstance("nsIScriptableInputStream")
            self.inputStream.init(self.fileIO)
        else:
            raise ValueError("Unknown mode")

    def close(self):
        """Close the native file stream, then the streams _File tracks."""
        if self.fileIO is not None:
            self.fileIO.close()
            self.fileIO = None
        _File.close(self)

    def read(self, n = -1):
        """Read from the file; simply delegates to _File.read()."""
        return _File.read(self, n)
00214 
00215 
00216 ##########################################################
00217 ##
00218 ## Test Code
00219 ##
00220 ##########################################################
def _DoTestRead(file, expected):
    """Read `file` in several chunks and check the total against `expected`.

    Raises RuntimeError if the concatenated data differs from `expected`.
    """
    # read in a couple of chunks, just to test that our various arg combinations work.
    got = file.read(3)
    got = got + file.read(300)
    got = got + file.read(0)
    got = got + file.read()
    if got != expected:
        raise RuntimeError("Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected)))
00229 
def _DoTestBufferRead(file, expected):
    """Read `file` through a native xpcom buffer and check the result.

    Data is pulled in chunks into a 50-byte buffer until a read reports
    zero bytes.  Raises RuntimeError if the total differs from `expected`.
    """
    native_buffer = _xpcom.AllocateBuffer(50)
    chunks = []
    while 1:
        # Note - we need to reach into the file object so we
        # can get at the native buffer supported function.
        num = file.inputStream.read(native_buffer)
        if num == 0:
            break
        chunks.append(str(native_buffer[:num]))
    got = ''.join(chunks)
    if got != expected:
        raise RuntimeError("Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected)))
00243 
def _TestLocalFile():
    """Round-trip a temporary file through LocalFile and Python's open().

    Writes with LocalFile, reads back with both Python and LocalFile (whole
    and in chunks), then rewrites the existing file.  Mismatches are printed;
    the temp file is removed at the end.
    """
    import os
    import sys
    import tempfile
    # NOTE(review): tempfile.mktemp() is race-prone, but the first open below
    # deliberately exercises creating a file that does not exist yet.
    fname = tempfile.mktemp()
    data = "Hello from Python"
    test_file = LocalFile(fname, "w")
    try:
        test_file.write(data)
        test_file.close()
        # Make sure Python can read it OK.
        f = open(fname, "r")
        try:
            python_got = f.read()
        finally:
            f.close()
        if python_got != data:
            print("Eeek - Python could not read the data back correctly!")
        # For the sake of the test, try a re-init.
        test_file.init(fname, "r")
        got = str(test_file.read())
        if got != data:
            print("Read the wrong data back - %r" % (got,))
        else:
            print("Read the correct data.")
        test_file.close()
        # Try reading in chunks.
        test_file = LocalFile(fname, "r")
        got = test_file.read(10) + test_file.read()
        if got != data:
            print("Chunks the wrong data back - %r" % (got,))
        else:
            print("Chunks read the correct data.")
        test_file.close()
        # Open the same file again for writing - this should delete the old one.
        if not os.path.isfile(fname):
            raise RuntimeError("The file '%s' does not exist, but we are explicitly testing create semantics when it does" % (fname,))
        test_file = LocalFile(fname, "w")
        test_file.write(data)
        test_file.close()
        # Make sure Python can read it OK.
        f = open(fname, "r")
        try:
            python_got = f.read()
        finally:
            f.close()
        if python_got != data:
            print("Eeek - Python could not read the data back correctly after recreating an existing file!")

        # XXX - todo - test "a" mode!
    finally:
        try:
            os.unlink(fname)
        except OSError:
            # sys.exc_info() keeps this valid on every Python version.
            details = sys.exc_info()[1]
            print("Error removing temp test file: %s" % (details,))
00291 
def _TestAll():
    """A mini test suite.

    Takes the source file of the xpcom `components` module, reads it
    "normally" with Python, and checks that URIFile (opened from a string
    and from a URL object, with plain and buffer reads) and LocalFile all
    return the same data.  Finishes with _TestLocalFile().
    """
    # Get a test file, and convert it to a file:// URI.
    fname = components.__file__
    if fname[-1] in "cCoO": # fix .pyc/.pyo
        fname = fname[:-1]
    reference = open(fname, "rb")
    try:
        expected = reference.read()
    finally:
        reference.close()
    # convert the fname to a URI.
    url = LocalFileToURL(fname)
    # First try passing a URL as a string.
    _DoTestRead( URIFile( url.spec), expected)
    print("Open as string test worked.")
    # Now with a URL object.
    _DoTestRead( URIFile( url ), expected)
    print("Open as URL test worked.")

    _DoTestBufferRead( URIFile( url ), expected)
    print("File test using buffers worked.")

    # For the sake of testing, do our pointless, demo object!
    _DoTestRead( LocalFile(fname), expected )
    print("Local file read test worked.")

    # Now do the full test of our pointless, demo object!
    _TestLocalFile()
00319 
def _TestURI(url):
    """Open `url` with URIFile, read everything and report the byte count."""
    test_file = URIFile(url)
    print("Opened file is %s" % (test_file,))
    got = test_file.read()
    print("Read %d bytes of data from %r" % (len(got), url))
    test_file.close()
00326 
# Script entry point: with no argument run the self-test, otherwise read the
# URL given as the first command-line argument.
if __name__=='__main__':
    import sys
    if len(sys.argv) < 2:
        print("No URL specified on command line - performing self-test")
        _TestAll()
    else:
        _TestURI(sys.argv[1])