Back to index

lightning-sunbird  0.9+nobinonly
nsInputStreamPump.cpp
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is mozilla.org code.
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1998
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 #include "nsInputStreamPump.h"
00039 #include "nsIServiceManager.h"
00040 #include "nsIStreamTransportService.h"
00041 #include "nsIEventQueueService.h"
00042 #include "nsIInterfaceRequestorUtils.h"
00043 #include "nsISeekableStream.h"
00044 #include "nsITransport.h"
00045 #include "nsNetSegmentUtils.h"
00046 #include "nsNetUtil.h"
00047 #include "nsCOMPtr.h"
00048 #include "prlog.h"
00049 #include "nsInt64.h"
00050 
00051 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
00052 static NS_DEFINE_CID(kEventQueueServiceCID, NS_EVENTQUEUESERVICE_CID);
00053 
00054 #if defined(PR_LOGGING)
00055 //
00056 // NSPR_LOG_MODULES=nsStreamPump:5
00057 //
00058 static PRLogModuleInfo *gStreamPumpLog = nsnull;
00059 #endif
00060 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
00061 
00062 //-----------------------------------------------------------------------------
00063 // nsInputStreamPump methods
00064 //-----------------------------------------------------------------------------
00065 
00066 nsInputStreamPump::nsInputStreamPump()
00067     : mState(STATE_IDLE)
00068     , mStreamOffset(0)
00069     , mStreamLength(LL_MaxUint())
00070     , mStatus(NS_OK)
00071     , mSuspendCount(0)
00072     , mLoadFlags(LOAD_NORMAL)
00073     , mWaiting(PR_FALSE)
00074     , mCloseWhenDone(PR_FALSE)
00075 {
00076 #if defined(PR_LOGGING)
00077     if (!gStreamPumpLog)
00078         gStreamPumpLog = PR_NewLogModule("nsStreamPump");
00079 #endif
00080 }
00081 
00082 nsInputStreamPump::~nsInputStreamPump()
00083 {
00084 }
00085 
00086 nsresult
00087 nsInputStreamPump::Create(nsInputStreamPump  **result,
00088                           nsIInputStream      *stream,
00089                           PRInt64              streamPos,
00090                           PRInt64              streamLen,
00091                           PRUint32             segsize,
00092                           PRUint32             segcount,
00093                           PRBool               closeWhenDone)
00094 {
00095     nsresult rv = NS_ERROR_OUT_OF_MEMORY;
00096     nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
00097     if (pump) {
00098         rv = pump->Init(stream, streamPos, streamLen,
00099                         segsize, segcount, closeWhenDone);
00100         if (NS_SUCCEEDED(rv)) {
00101             *result = nsnull;
00102             pump.swap(*result);
00103         }
00104     }
00105     return rv;
00106 }
00107 
00108 
00109 
00110 struct PeekData {
00111   PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
00112     : mFunc(fun), mClosure(closure) {}
00113 
00114   nsInputStreamPump::PeekSegmentFun mFunc;
00115   void* mClosure;
00116 };
00117 
00118 static NS_METHOD
00119 CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
00120              const char *aFromSegment, PRUint32 aToOffset, PRUint32 aCount,
00121              PRUint32 *aWriteCount)
00122 {
00123   NS_ASSERTION(aToOffset == 0, "Called more than once?");
00124   NS_ASSERTION(aCount > 0, "Called without data?");
00125 
00126   PeekData* data = NS_STATIC_CAST(PeekData*, aClosure);
00127   data->mFunc(data->mClosure,
00128               NS_REINTERPRET_CAST(const PRUint8*, aFromSegment), aCount);
00129   return NS_BINDING_ABORTED;
00130 }
00131 
00132 void
00133 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
00134 {
00135   NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
00136   PeekData data(callback, closure);
00137   PRUint32 read;
00138   mAsyncStream->ReadSegments(CallPeekFunc, &data, NET_DEFAULT_SEGMENT_SIZE,
00139                              &read);
00140 }
00141 
00142 nsresult
00143 nsInputStreamPump::EnsureWaiting()
00144 {
00145     // no need to worry about multiple threads... an input stream pump lives
00146     // on only one thread.
00147 
00148     if (!mWaiting) {
00149         nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mEventQ);
00150         if (NS_FAILED(rv)) {
00151             NS_ERROR("AsyncWait failed");
00152             return rv;
00153         }
00154         mWaiting = PR_TRUE;
00155     }
00156     return NS_OK;
00157 }
00158 
00159 //-----------------------------------------------------------------------------
00160 // nsInputStreamPump::nsISupports
00161 //-----------------------------------------------------------------------------
00162 
00163 // although this class can only be accessed from one thread at a time, we do
00164 // allow its ownership to move from thread to thread, assuming the consumer
00165 // understands the limitations of this.
00166 NS_IMPL_THREADSAFE_ISUPPORTS3(nsInputStreamPump,
00167                               nsIRequest,
00168                               nsIInputStreamCallback,
00169                               nsIInputStreamPump)
00170 
00171 //-----------------------------------------------------------------------------
00172 // nsInputStreamPump::nsIRequest
00173 //-----------------------------------------------------------------------------
00174 
00175 NS_IMETHODIMP
00176 nsInputStreamPump::GetName(nsACString &result)
00177 {
00178     result.Truncate();
00179     return NS_OK;
00180 }
00181 
00182 NS_IMETHODIMP
00183 nsInputStreamPump::IsPending(PRBool *result)
00184 {
00185     *result = (mState != STATE_IDLE);
00186     return NS_OK;
00187 }
00188 
00189 NS_IMETHODIMP
00190 nsInputStreamPump::GetStatus(nsresult *status)
00191 {
00192     *status = mStatus;
00193     return NS_OK;
00194 }
00195 
00196 NS_IMETHODIMP
00197 nsInputStreamPump::Cancel(nsresult status)
00198 {
00199     LOG(("nsInputStreamPump::Cancel [this=%x status=%x]\n",
00200         this, status));
00201 
00202     if (NS_FAILED(mStatus)) {
00203         LOG(("  already canceled\n"));
00204         return NS_OK;
00205     }
00206 
00207     NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
00208     mStatus = status;
00209 
00210     // close input stream
00211     if (mAsyncStream) {
00212         mAsyncStream->CloseWithStatus(status);
00213         mSuspendCount = 0; // un-suspend
00214         EnsureWaiting();
00215     }
00216     return NS_OK;
00217 }
00218 
00219 NS_IMETHODIMP
00220 nsInputStreamPump::Suspend()
00221 {
00222     LOG(("nsInputStreamPump::Suspend [this=%x]\n", this));
00223     NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
00224     ++mSuspendCount;
00225     return NS_OK;
00226 }
00227 
00228 NS_IMETHODIMP
00229 nsInputStreamPump::Resume()
00230 {
00231     LOG(("nsInputStreamPump::Resume [this=%x]\n", this));
00232     NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
00233     NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
00234 
00235     if (--mSuspendCount == 0)
00236         EnsureWaiting();
00237     return NS_OK;
00238 }
00239 
00240 NS_IMETHODIMP
00241 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
00242 {
00243     *aLoadFlags = mLoadFlags;
00244     return NS_OK;
00245 }
00246 
00247 NS_IMETHODIMP
00248 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
00249 {
00250     mLoadFlags = aLoadFlags;
00251     return NS_OK;
00252 }
00253 
00254 NS_IMETHODIMP
00255 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
00256 {
00257     NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
00258     return NS_OK;
00259 }
00260 
00261 NS_IMETHODIMP
00262 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
00263 {
00264     mLoadGroup = aLoadGroup;
00265     return NS_OK;
00266 }
00267 
00268 //-----------------------------------------------------------------------------
00269 // nsInputStreamPump::nsIInputStreamPump implementation
00270 //-----------------------------------------------------------------------------
00271 
00272 NS_IMETHODIMP
00273 nsInputStreamPump::Init(nsIInputStream *stream,
00274                         PRInt64 streamPos, PRInt64 streamLen,
00275                         PRUint32 segsize, PRUint32 segcount,
00276                         PRBool closeWhenDone)
00277 {
00278     NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
00279 
00280     mStreamOffset = PRUint64(streamPos);
00281     if (nsInt64(streamLen) >= nsInt64(0))
00282         mStreamLength = PRUint64(streamLen);
00283     mStream = stream;
00284     mSegSize = segsize;
00285     mSegCount = segcount;
00286     mCloseWhenDone = closeWhenDone;
00287 
00288     return NS_OK;
00289 }
00290 
00291 NS_IMETHODIMP
00292 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
00293 {
00294     NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
00295 
00296     nsresult rv;
00297 
00298     //
00299     // OK, we need to use the stream transport service if
00300     //
00301     // (1) the stream is blocking
00302     // (2) the stream does not support nsIAsyncInputStream
00303     //
00304 
00305     PRBool nonBlocking;
00306     rv = mStream->IsNonBlocking(&nonBlocking);
00307     if (NS_FAILED(rv)) return rv;
00308 
00309     if (nonBlocking) {
00310         mAsyncStream = do_QueryInterface(mStream);
00311         //
00312         // if the stream supports nsIAsyncInputStream, and if we need to seek
00313         // to a starting offset, then we must do so here.  in the non-async
00314         // stream case, the stream transport service will take care of seeking
00315         // for us.
00316         // 
00317         if (mAsyncStream && (mStreamOffset != nsUint64(LL_MaxUint()))) {
00318             nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
00319             if (seekable)
00320                 seekable->Seek(nsISeekableStream::NS_SEEK_SET, PRInt64(PRUint64(mStreamOffset)));
00321         }
00322     }
00323 
00324     if (!mAsyncStream) {
00325         // ok, let's use the stream transport service to read this stream.
00326         nsCOMPtr<nsIStreamTransportService> sts =
00327             do_GetService(kStreamTransportServiceCID, &rv);
00328         if (NS_FAILED(rv)) return rv;
00329 
00330         nsCOMPtr<nsITransport> transport;
00331         // Note: The casts to PRUint64 are needed to cast to PRInt64, as
00332         // nsUint64 can't directly be cast to PRInt64
00333         rv = sts->CreateInputTransport(mStream, PRUint64(mStreamOffset), PRUint64(mStreamLength),
00334                                        mCloseWhenDone, getter_AddRefs(transport));
00335         if (NS_FAILED(rv)) return rv;
00336 
00337         nsCOMPtr<nsIInputStream> wrapper;
00338         rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
00339         if (NS_FAILED(rv)) return rv;
00340 
00341         mAsyncStream = do_QueryInterface(wrapper, &rv);
00342         if (NS_FAILED(rv)) return rv;
00343     }
00344 
00345     // release our reference to the original stream.  from this point forward,
00346     // we only reference the "stream" via mAsyncStream.
00347     mStream = 0;
00348 
00349     // mStreamOffset now holds the number of bytes currently read.  we use this
00350     // to enforce the mStreamLength restriction.
00351     mStreamOffset = 0;
00352 
00353     // grab event queue (we must do this here by contract, since all notifications
00354     // must go to the thread which called AsyncRead)
00355     nsCOMPtr<nsIEventQueueService> eqs = do_GetService(kEventQueueServiceCID, &rv);
00356     if (NS_FAILED(rv)) return rv;
00357 
00358     rv = eqs->ResolveEventQueue(NS_CURRENT_EVENTQ, getter_AddRefs(mEventQ));
00359     if (NS_FAILED(rv)) return rv;
00360 
00361     rv = EnsureWaiting();
00362     if (NS_FAILED(rv)) return rv;
00363 
00364     if (mLoadGroup)
00365         mLoadGroup->AddRequest(this, nsnull);
00366 
00367     mState = STATE_START;
00368     mListener = listener;
00369     mListenerContext = ctxt;
00370     return NS_OK;
00371 }
00372 
00373 //-----------------------------------------------------------------------------
00374 // nsInputStreamPump::nsIInputStreamCallback implementation
00375 //-----------------------------------------------------------------------------
00376 
00377 NS_IMETHODIMP
00378 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
00379 {
00380     LOG(("nsInputStreamPump::OnInputStreamReady [this=%x]\n", this));
00381 
00382     // this function has been called from a PLEvent, so we can safely call
00383     // any listener or progress sink methods directly from here.
00384 
00385     for (;;) {
00386         if (mSuspendCount || mState == STATE_IDLE) {
00387             mWaiting = PR_FALSE;
00388             break;
00389         }
00390 
00391         PRUint32 nextState;
00392         switch (mState) {
00393         case STATE_START:
00394             nextState = OnStateStart();
00395             break;
00396         case STATE_TRANSFER:
00397             nextState = OnStateTransfer();
00398             break;
00399         case STATE_STOP:
00400             nextState = OnStateStop();
00401             break;
00402         }
00403 
00404         if (mState == nextState && !mSuspendCount) {
00405             NS_ASSERTION(mState == STATE_TRANSFER, "unexpected state");
00406             NS_ASSERTION(NS_SUCCEEDED(mStatus), "unexpected status");
00407 
00408             mWaiting = PR_FALSE;
00409             mStatus = EnsureWaiting();
00410             if (NS_SUCCEEDED(mStatus))
00411                 break;
00412             
00413             nextState = STATE_STOP;
00414         }
00415 
00416         mState = nextState;
00417     }
00418     return NS_OK;
00419 }
00420 
00421 PRUint32
00422 nsInputStreamPump::OnStateStart()
00423 {
00424     LOG(("  OnStateStart [this=%x]\n", this));
00425 
00426     nsresult rv;
00427 
00428     // need to check the reason why the stream is ready.  this is required
00429     // so our listener can check our status from OnStartRequest.
00430     // XXX async streams should have a GetStatus method!
00431     if (NS_SUCCEEDED(mStatus)) {
00432         PRUint32 avail;
00433         rv = mAsyncStream->Available(&avail);
00434         if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
00435             mStatus = rv;
00436     }
00437 
00438     rv = mListener->OnStartRequest(this, mListenerContext);
00439 
00440     // an error returned from OnStartRequest should cause us to abort; however,
00441     // we must not stomp on mStatus if already canceled.
00442     if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
00443         mStatus = rv;
00444 
00445     return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
00446 }
00447 
00448 PRUint32
00449 nsInputStreamPump::OnStateTransfer()
00450 {
00451     LOG(("  OnStateTransfer [this=%x]\n", this));
00452 
00453     // if canceled, go directly to STATE_STOP...
00454     if (NS_FAILED(mStatus))
00455         return STATE_STOP;
00456 
00457     nsresult rv;
00458 
00459     PRUint32 avail;
00460     rv = mAsyncStream->Available(&avail);
00461     LOG(("  Available returned [stream=%x rv=%x avail=%u]\n", mAsyncStream.get(), rv, avail));
00462 
00463     if (rv == NS_BASE_STREAM_CLOSED) {
00464         rv = NS_OK;
00465         avail = 0;
00466     }
00467     else if (NS_SUCCEEDED(rv) && avail) {
00468         // figure out how much data to report (XXX detect overflow??)
00469         if (nsUint64(avail) + mStreamOffset > mStreamLength)
00470             avail = mStreamLength - mStreamOffset;
00471 
00472         if (avail) {
00473             // we used to limit avail to 16K - we were afraid some ODA handlers
00474             // might assume they wouldn't get more than 16K at once
00475             // we're removing that limit since it speeds up local file access.
00476             // Now there's an implicit 64K limit of 4 16K segments
00477             // NOTE: ok, so the story is as follows.  OnDataAvailable impls
00478             //       are by contract supposed to consume exactly |avail| bytes.
00479             //       however, many do not... mailnews... stream converters...
00480             //       cough, cough.  the input stream pump is fairly tolerant
00481             //       in this regard; however, if an ODA does not consume any
00482             //       data from the stream, then we could potentially end up in
00483             //       an infinite loop.  we do our best here to try to catch
00484             //       such an error.  (see bug 189672)
00485 
00486             // in most cases this QI will succeed (mAsyncStream is almost always
00487             // a nsPipeInputStream, which implements nsISeekableStream::Tell).
00488             PRInt64 offsetBefore;
00489             nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
00490             if (seekable)
00491                 seekable->Tell(&offsetBefore);
00492 
00493             LOG(("  calling OnDataAvailable [offset=%lld count=%u]\n", PRUint64(mStreamOffset), avail));
00494             rv = mListener->OnDataAvailable(this, mListenerContext, mAsyncStream, mStreamOffset, avail);
00495 
00496             // don't enter this code if ODA failed or called Cancel
00497             if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
00498                 // test to see if this ODA failed to consume data
00499                 if (seekable) {
00500                     PRInt64 offsetAfter;
00501                     seekable->Tell(&offsetAfter);
00502                     nsUint64 offsetBefore64 = PRUint64(offsetBefore);
00503                     nsUint64 offsetAfter64 = PRUint64(offsetAfter);
00504                     if (offsetAfter64 > offsetBefore64) {
00505                         nsUint64 offsetDelta = offsetAfter64 - offsetBefore64;
00506                         mStreamOffset += offsetDelta;
00507                     }
00508                     else if (mSuspendCount == 0) {
00509                         //
00510                         // possible infinite loop if we continue pumping data!
00511                         //
00512                         // NOTE: although not allowed by nsIStreamListener, we
00513                         // will allow the ODA impl to Suspend the pump.  IMAP
00514                         // does this :-(
00515                         //
00516                         NS_ERROR("OnDataAvailable implementation consumed no data");
00517                         mStatus = NS_ERROR_UNEXPECTED;
00518                     }
00519                 }
00520                 else
00521                     mStreamOffset += avail; // assume ODA behaved well
00522             }
00523         }
00524     }
00525 
00526     // an error returned from Available or OnDataAvailable should cause us to
00527     // abort; however, we must not stomp on mStatus if already canceled.
00528 
00529     if (NS_SUCCEEDED(mStatus)) {
00530         if (NS_FAILED(rv))
00531             mStatus = rv;
00532         else if (avail) {
00533             // if stream is now closed, advance to STATE_STOP right away.
00534             // Available may return 0 bytes available at the moment; that
00535             // would not mean that we are done.
00536             // XXX async streams should have a GetStatus method!
00537             rv = mAsyncStream->Available(&avail);
00538             if (NS_SUCCEEDED(rv))
00539                 return STATE_TRANSFER;
00540         }
00541     }
00542     return STATE_STOP;
00543 }
00544 
00545 PRUint32
00546 nsInputStreamPump::OnStateStop()
00547 {
00548     LOG(("  OnStateStop [this=%x status=%x]\n", this, mStatus));
00549 
00550     // if an error occured, we must be sure to pass the error onto the async
00551     // stream.  in some cases, this is redundant, but since close is idempotent,
00552     // this is OK.  otherwise, be sure to honor the "close-when-done" option.
00553 
00554     if (NS_FAILED(mStatus))
00555         mAsyncStream->CloseWithStatus(mStatus);
00556     else if (mCloseWhenDone)
00557         mAsyncStream->Close();
00558 
00559     mAsyncStream = 0;
00560     mEventQ = 0;
00561     mIsPending = PR_FALSE;
00562 
00563     mListener->OnStopRequest(this, mListenerContext, mStatus);
00564     mListener = 0;
00565     mListenerContext = 0;
00566 
00567     if (mLoadGroup)
00568         mLoadGroup->RemoveRequest(this, nsnull, mStatus);
00569 
00570     return STATE_IDLE;
00571 }