Back to index

lightning-sunbird  0.9+nobinonly
nsPipe3.cpp
Go to the documentation of this file.
00001 /* ***** BEGIN LICENSE BLOCK *****
00002  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00003  *
00004  * The contents of this file are subject to the Mozilla Public License Version
00005  * 1.1 (the "License"); you may not use this file except in compliance with
00006  * the License. You may obtain a copy of the License at
00007  * http://www.mozilla.org/MPL/
00008  *
00009  * Software distributed under the License is distributed on an "AS IS" basis,
00010  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00011  * for the specific language governing rights and limitations under the
00012  * License.
00013  *
00014  * The Original Code is Mozilla.
00015  *
00016  * The Initial Developer of the Original Code is
00017  * Netscape Communications Corporation.
00018  * Portions created by the Initial Developer are Copyright (C) 2002
00019  * the Initial Developer. All Rights Reserved.
00020  *
00021  * Contributor(s):
00022  *   Darin Fisher <darin@netscape.com>
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 #include "nsIPipe.h"
00039 #include "nsIEventTarget.h"
00040 #include "nsISeekableStream.h"
00041 #include "nsSegmentedBuffer.h"
00042 #include "nsStreamUtils.h"
00043 #include "nsAutoLock.h"
00044 #include "nsCOMPtr.h"
00045 #include "nsCRT.h"
00046 #include "prlog.h"
00047 #include "nsInt64.h"
00048 
00049 #if defined(PR_LOGGING)
00050 //
00051 // set NSPR_LOG_MODULES=nsPipe:5
00052 //
00053 static PRLogModuleInfo *gPipeLog = PR_NewLogModule("nsPipe");
00054 #define LOG(args) PR_LOG(gPipeLog, PR_LOG_DEBUG, args)
00055 #else
00056 #define LOG(args)
00057 #endif
00058 // fallback buffer geometry: nsPipe::Init substitutes these values when the
      // caller passes 0 for segmentSize / segmentCount respectively.
00059 #define DEFAULT_SEGMENT_SIZE  4096
00060 #define DEFAULT_SEGMENT_COUNT 16
00061 
00062 class nsPipe;
00063 class nsPipeEvents;
00064 class nsPipeInputStream;
00065 class nsPipeOutputStream;
00066 
00067 //-----------------------------------------------------------------------------
00068 
00069 // this class is used to delay notifications until the end of a particular
00070 // scope.  it helps avoid the complexity of issuing callbacks while inside
00071 // a critical section.
      //
      // usage pattern in this file: an nsPipeEvents is declared *before* the
      // block that creates the nsAutoMonitor, so it is destroyed after that
      // block (and the nsAutoMonitor in it) has ended.  the destructor is what
      // actually invokes the recorded callbacks.  at most one input and one
      // output notification can be recorded per instance.
00072 class nsPipeEvents
00073 {
00074 public:
00075     nsPipeEvents() { }
00076    ~nsPipeEvents();   // dispatches any notifications recorded below
00077 
          // record that callback->OnInputStreamReady(stream) must be called when
          // this object is destroyed.  asserts that no input notification has
          // been recorded yet; the assignments would overwrite an earlier one.
00078     inline void NotifyInputReady(nsIAsyncInputStream *stream,
00079                                  nsIInputStreamCallback *callback)
00080     {
00081         NS_ASSERTION(!mInputCallback, "already have an input event");
00082         mInputStream = stream;
00083         mInputCallback = callback;
00084     }
00085 
          // output-side counterpart: record a deferred
          // callback->OnOutputStreamReady(stream) call.
00086     inline void NotifyOutputReady(nsIAsyncOutputStream *stream,
00087                                   nsIOutputStreamCallback *callback)
00088     {
00089         NS_ASSERTION(!mOutputCallback, "already have an output event");
00090         mOutputStream = stream;
00091         mOutputCallback = callback;
00092     }
00093 
00094 private:
          // nsCOMPtr members hold references to the stream/callback pairs until
          // the destructor has dispatched (and then cleared) them.
00095     nsCOMPtr<nsIAsyncInputStream>     mInputStream;
00096     nsCOMPtr<nsIInputStreamCallback>  mInputCallback;
00097     nsCOMPtr<nsIAsyncOutputStream>    mOutputStream;
00098     nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
00099 };
00100 
00101 //-----------------------------------------------------------------------------
00102 
00103 // the input end of a pipe (allocated as a member of the pipe).
      //
      // all buffer state lives in the owning nsPipe; this class adds the
      // reader-side bookkeeping: the number of readable bytes, the blocking
      // mode, a pending async callback and the logical read offset.
00104 class nsPipeInputStream : public nsIAsyncInputStream
00105                         , public nsISeekableStream
00106                         , public nsISearchableInputStream
00107 {
00108 public:
00109     // since this class will be allocated as a member of the pipe, we do not
00110     // need our own ref count.  instead, we share the lifetime (the ref count)
00111     // of the entire pipe.  this macro is just convenience since it does not
00112     // declare a mRefCount variable; however, don't let the name fool you...
00113     // we are not inheriting from nsPipe ;-)
00114     NS_DECL_ISUPPORTS_INHERITED
00115 
00116     NS_DECL_NSIINPUTSTREAM
00117     NS_DECL_NSIASYNCINPUTSTREAM
00118     NS_DECL_NSISEEKABLESTREAM
00119     NS_DECL_NSISEARCHABLEINPUTSTREAM
00120 
          // pipe is the owning nsPipe; it is stored as a raw (non-refcounted)
          // back pointer.  the stream starts out blocking, not blocked, with no
          // bytes available and no pending callback.
00121     nsPipeInputStream(nsPipe *pipe)
00122         : mPipe(pipe)
00123         , mReaderRefCnt(0)
00124         , mLogicalOffset(0)
00125         , mBlocking(PR_TRUE)
00126         , mBlocked(PR_FALSE)
00127         , mAvailable(0)
00128         , mCallbackFlags(0)
00129         { }
00130 
          // NOTE(review): no definition of Fill() is visible in this part of the
          // file -- confirm whether it is still used.
00131     nsresult Fill();
00132     void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
00133 
          // raw accessors for the available-byte counter.  unlike the
          // nsIInputStream Available(PRUint32 *) method these neither enter the
          // monitor nor look at the pipe status; nsPipe calls them while it is
          // already inside the monitor.
00134     PRUint32 Available() { return mAvailable; }
00135     void     ReduceAvailable(PRUint32 avail) { mAvailable -= avail; }
00136 
00137     // synchronously wait for the pipe to become readable.
00138     nsresult Wait();
00139 
00140     // these functions return true to indicate that the pipe's monitor should
00141     // be notified, to wake up a blocked reader if any.
00142     PRBool   OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &);
00143     PRBool   OnInputException(nsresult, nsPipeEvents &);
00144 
00145 private:
00146     nsPipe                        *mPipe;
00147 
00148     // separate refcnt so that we know when to close the consumer
00149     nsrefcnt                       mReaderRefCnt;
          // running total of bytes consumed through ReadSegments; reported by Tell
00150     nsInt64                        mLogicalOffset;
          // true when an empty pipe makes ReadSegments wait instead of returning
00151     PRPackedBool                   mBlocking;
00152 
00153     // these variables can only be accessed while inside the pipe's monitor
00154     PRPackedBool                   mBlocked;        // a reader is inside Wait()
00155     PRUint32                       mAvailable;      // bytes written but not yet read
00156     nsCOMPtr<nsIInputStreamCallback> mCallback;     // pending AsyncWait callback
00157     PRUint32                       mCallbackFlags;  // flags passed to that AsyncWait
00158 };
00159 
00160 //-----------------------------------------------------------------------------
00161 
00162 // the output end of a pipe (allocated as a member of the pipe).
      //
      // mirrors nsPipeInputStream: buffer state lives in the owning nsPipe and
      // this class keeps the writer-side flags and pending callback.  the
      // method definitions are not part of the code shown in this section.
00163 class nsPipeOutputStream : public nsIAsyncOutputStream
00164                          , public nsISeekableStream
00165 {
00166 public:
00167     // since this class will be allocated as a member of the pipe, we do not
00168     // need our own ref count.  instead, we share the lifetime (the ref count)
00169     // of the entire pipe.  this macro is just convenience since it does not
00170     // declare a mRefCount variable; however, don't let the name fool you...
00171     // we are not inheriting from nsPipe ;-)
00172     NS_DECL_ISUPPORTS_INHERITED
00173 
00174     NS_DECL_NSIOUTPUTSTREAM
00175     NS_DECL_NSIASYNCOUTPUTSTREAM
00176     NS_DECL_NSISEEKABLESTREAM
00177 
          // pipe is the owning nsPipe, kept as a raw back pointer.  the stream
          // starts out blocking, not blocked, writable, with no pending callback.
00178     nsPipeOutputStream(nsPipe *pipe)
00179         : mPipe(pipe)
00180         , mWriterRefCnt(0)
00181         , mLogicalOffset(0)
00182         , mBlocking(PR_TRUE)
00183         , mBlocked(PR_FALSE)
00184         , mWritable(PR_TRUE)
00185         , mCallbackFlags(0)
00186         { }
00187 
00188     void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
          // nsPipe::AdvanceWriteCursor clears the flag once the current segment is
          // full and the buffer has reached its maximum size.
00189     void SetWritable(PRBool writable) { mWritable = writable; }
00190 
00191     // synchronously wait for the pipe to become writable.
00192     nsresult Wait();
00193 
00194     // these functions return true to indicate that the pipe's monitor should
00195     // be notified, to wake up a blocked writer if any.
00196     PRBool   OnOutputWritable(nsPipeEvents &);
00197     PRBool   OnOutputException(nsresult, nsPipeEvents &);
00198 
00199 private:
00200     nsPipe                         *mPipe;
00201 
00202     // separate refcnt so that we know when to close the producer
00203     nsrefcnt                        mWriterRefCnt;
00204     nsInt64                         mLogicalOffset;
00205     PRPackedBool                    mBlocking;
00206 
00207     // these variables can only be accessed while inside the pipe's monitor
00208     PRPackedBool                    mBlocked;
00209     PRPackedBool                    mWritable;
00210     nsCOMPtr<nsIOutputStreamCallback> mCallback;
00211     PRUint32                        mCallbackFlags;
00212 };
00213 
00214 //-----------------------------------------------------------------------------
00215 // the pipe itself: owns the segmented buffer, the monitor guarding it, the
      // read/write cursors into that buffer, and both stream end points (which
      // are embedded members and share this object's ref count).
00216 class nsPipe : public nsIPipe
00217 {
00218 public:
00219     friend class nsPipeInputStream;
00220     friend class nsPipeOutputStream;
00221 
00222     NS_DECL_ISUPPORTS
00223     NS_DECL_NSIPIPE
00224 
00225     // nsPipe methods:
00226     nsPipe();
00227 
00228 private:
00229     ~nsPipe();
00230 
00231 public:
00232     //
00233     // methods below may only be called while inside the pipe's monitor
00234     //
00235 
          // yields the readable [cursor, limit) range of buffer segment n, or two
          // null pointers when n is past the last segment.
00236     void PeekSegment(PRUint32 n, char *&cursor, char *&limit);
00237 
00238     //
00239     // methods below may be called while outside the pipe's monitor
00240     //
          // (each of them enters the monitor itself.)
00241  
00242     nsresult GetReadSegment(const char *&segment, PRUint32 &segmentLen);
00243     void     AdvanceReadCursor(PRUint32 count);
00244 
00245     nsresult GetWriteSegment(char *&segment, PRUint32 &segmentLen);
00246     void     AdvanceWriteCursor(PRUint32 count);
00247 
          // records reason as the pipe status (first failure wins) and informs
          // the output end and, unless outputOnly applies, the input end.
00248     void     OnPipeException(nsresult reason, PRBool outputOnly = PR_FALSE);
00249 
00250 protected:
00251     // We can't inherit from both nsIInputStream and nsIOutputStream
00252     // because they collide on their Close method. Consequently we nest their
00253     // implementations to avoid the extra object allocation.
00254     nsPipeInputStream   mInput;
00255     nsPipeOutputStream  mOutput;
00256 
00257     PRMonitor*          mMonitor;       // created by Init, destroyed by ~nsPipe
00258     nsSegmentedBuffer   mBuffer;
00259 
          // unread data in the first segment is [mReadCursor, mReadLimit); both
          // are null while the buffer is empty.
00260     char*               mReadCursor;
00261     char*               mReadLimit;
00262 
          // index of the segment currently being written (-1: empty buffer) and
          // the free space remaining in it, [mWriteCursor, mWriteLimit).
00263     PRInt32             mWriteSegment;
00264     char*               mWriteCursor;
00265     char*               mWriteLimit;
00266 
          // NS_OK until OnPipeException stores the first failure reason.
00267     nsresult            mStatus;
00268 };
00269 
00270 //
00271 // NOTES on buffer architecture:
00272 //
00273 //       +-----------------+ - - mBuffer.GetSegment(0)
00274 //       |                 |
00275 //       + - - - - - - - - + - - mReadCursor
00276 //       |/////////////////|
00277 //       |/////////////////|
00278 //       |/////////////////|
00279 //       |/////////////////|
00280 //       +-----------------+ - - mReadLimit
00281 //                |
00282 //       +-----------------+
00283 //       |/////////////////|
00284 //       |/////////////////|
00285 //       |/////////////////|
00286 //       |/////////////////|
00287 //       |/////////////////|
00288 //       |/////////////////|
00289 //       +-----------------+
00290 //                |
00291 //       +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
00292 //       |/////////////////|
00293 //       |/////////////////|
00294 //       |/////////////////|
00295 //       + - - - - - - - - + - - mWriteCursor
00296 //       |                 |
00297 //       |                 |
00298 //       +-----------------+ - - mWriteLimit
00299 //
00300 // (shaded region contains data)
00301 //
00302 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
00303 // small allocations (e.g., 64 byte allocations).  this means that buffers may
00304 // be allocated back-to-back.  in the diagram above, for example, mReadLimit
00305 // would actually be pointing at the beginning of the next segment.  when
00306 // making changes to this file, please keep this fact in mind.
00307 //
00308 
00309 //-----------------------------------------------------------------------------
00310 // nsPipe methods:
00311 //-----------------------------------------------------------------------------
00312 // constructs a pipe that is not usable until Init has been called: both
      // embedded streams get a back pointer to this pipe, there is no monitor
      // yet, all cursors are null, mWriteSegment is -1 (empty buffer) and the
      // status is NS_OK.
00313 nsPipe::nsPipe()
00314     : mInput(this)
00315     , mOutput(this)
00316     , mMonitor(nsnull)
00317     , mReadCursor(nsnull)
00318     , mReadLimit(nsnull)
00319     , mWriteSegment(-1)
00320     , mWriteCursor(nsnull)
00321     , mWriteLimit(nsnull)
00322     , mStatus(NS_OK)
00323 {
00324 }
00325 // destroys the monitor, if Init got as far as creating one.
00326 nsPipe::~nsPipe()
00327 {
00328     if (mMonitor)
00329         PR_DestroyMonitor(mMonitor);
00330 }
00331 
00332 NS_IMPL_THREADSAFE_ISUPPORTS1(nsPipe, nsIPipe)
00333 // nsIPipe::Init -- creates the monitor, sizes the segmented buffer and sets
      // the blocking mode of both ends.
      //
      //   nonBlockingIn / nonBlockingOut  make the input / output stream
      //                                   non-blocking
      //   segmentSize, segmentCount       buffer geometry; 0 selects
      //                                   DEFAULT_SEGMENT_SIZE / _COUNT
      //   segmentAlloc                    passed through to mBuffer.Init
      //
      // returns NS_ERROR_OUT_OF_MEMORY if the monitor cannot be created, the
      // failure code of mBuffer.Init if that fails, NS_OK otherwise.
      //
      // NOTE(review): mMonitor is assigned unconditionally, so a second call
      // would overwrite the previous monitor without destroying it -- confirm
      // that callers only ever call Init once.
00334 NS_IMETHODIMP
00335 nsPipe::Init(PRBool nonBlockingIn,
00336              PRBool nonBlockingOut,
00337              PRUint32 segmentSize,
00338              PRUint32 segmentCount,
00339              nsIMemory *segmentAlloc)
00340 {
00341     mMonitor = PR_NewMonitor();
00342     if (!mMonitor)
00343         return NS_ERROR_OUT_OF_MEMORY;
00344 
00345     if (segmentSize == 0)
00346         segmentSize = DEFAULT_SEGMENT_SIZE;
00347     if (segmentCount == 0)
00348         segmentCount = DEFAULT_SEGMENT_COUNT;
00349 
00350     // protect against overflow
          // (clamp segmentCount so that segmentSize * segmentCount below still
          // fits in a PRUint32)
00351     PRUint32 maxCount = PRUint32(-1) / segmentSize;
00352     if (segmentCount > maxCount)
00353         segmentCount = maxCount;
00354 
00355     nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount, segmentAlloc);
00356     if (NS_FAILED(rv))
00357         return rv;
00358 
00359     mInput.SetNonBlocking(nonBlockingIn);
00360     mOutput.SetNonBlocking(nonBlockingOut);
00361     return NS_OK;
00362 }
00363 // nsIPipe getter: stores the embedded input stream in *aInputStream and
      // takes a reference on it for the caller.  aInputStream is not null-checked.
00364 NS_IMETHODIMP
00365 nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream)
00366 {
00367     NS_ADDREF(*aInputStream = &mInput);
00368     return NS_OK;
00369 }
00370 // nsIPipe getter: stores the embedded output stream in *aOutputStream and
      // takes a reference on it for the caller.  aOutputStream is not null-checked.
00371 NS_IMETHODIMP
00372 nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream)
00373 {
00374     NS_ADDREF(*aOutputStream = &mOutput);
00375     return NS_OK;
00376 }
00377 // reports the range of readable bytes in buffer segment `index` without
      // consuming anything.  per the class declaration this may only be called
      // while inside the pipe's monitor.
      //
      //   index 0            -> [mReadCursor, mReadLimit), i.e. what is still
      //                         unread in the first segment
      //   index >= #segments -> cursor and limit are both set to null
      //   otherwise          -> from the start of the segment up to mWriteCursor
      //                         (if it is the segment being written) or up to
      //                         the full segment size
00378 void
00379 nsPipe::PeekSegment(PRUint32 index, char *&cursor, char *&limit)
00380 {
00381     if (index == 0) {
00382         NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
00383         cursor = mReadCursor;
00384         limit = mReadLimit;
00385     }
00386     else {
00387         PRUint32 numSegments = mBuffer.GetSegmentCount();
00388         if (index >= numSegments)
00389             cursor = limit = nsnull;
00390         else {
00391             cursor = mBuffer.GetSegment(index);
00392             if (mWriteSegment == (PRInt32) index)
00393                 limit = mWriteCursor;
00394             else
00395                 limit = cursor + mBuffer.GetSegmentSize();
00396         }
00397     }
00398 }
00399 // hands out the unread part of the first segment, taking the monitor for
      // the duration of the call.
      //
      // on success returns NS_OK with segment/segmentLen describing
      // [mReadCursor, mReadLimit).  when nothing is readable the outputs are
      // left untouched and the result is the pipe's failure status if it has
      // one, NS_BASE_STREAM_WOULD_BLOCK otherwise.
00400 nsresult
00401 nsPipe::GetReadSegment(const char *&segment, PRUint32 &segmentLen)
00402 {
00403     nsAutoMonitor mon(mMonitor);
00404 
00405     if (mReadCursor == mReadLimit)
00406         return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
00407 
00408     segment    = mReadCursor;
00409     segmentLen = mReadLimit - mReadCursor;
00410     return NS_OK;
00411 }
00412 // consumes bytesRead bytes from the range last returned by GetReadSegment.
      //
      // moves mReadCursor forward and lowers the input stream's available
      // count.  once the first segment has been read up to mReadLimit it is
      // deleted (unless the writer still has room left in that same segment),
      // the cursors are re-pointed at the next segment or nulled out, and the
      // output stream is told that the pipe is writable again.
      //
      // `events` is declared outside the monitor block so that any callback
      // recorded by OnOutputWritable is dispatched by its destructor after the
      // nsAutoMonitor has gone out of scope.
00413 void
00414 nsPipe::AdvanceReadCursor(PRUint32 bytesRead)
00415 {
00416     NS_ASSERTION(bytesRead, "dont call if no bytes read");
00417 
00418     nsPipeEvents events;
00419     {
00420         nsAutoMonitor mon(mMonitor);
00421 
00422         LOG(("III advancing read cursor by %u\n", bytesRead));
00423         NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much");
00424 
00425         mReadCursor += bytesRead;
00426         NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
00427 
00428         mInput.ReduceAvailable(bytesRead);
00429 
00430         if (mReadCursor == mReadLimit) {
00431             // we've reached the limit of how much we can read from this segment.
00432             // if at the end of this segment, then we must discard this segment.
00433 
00434             // if still writing in this segment then bail because we're not done
00435             // with the segment and have to wait for now...
00436             if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
00437                 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
00438                 return;
00439             }
00440 
00441             // shift write segment index (-1 indicates an empty buffer).
00442             --mWriteSegment;
00443 
00444             // done with this segment
00445             mBuffer.DeleteFirstSegment();
00446             LOG(("III deleting first segment\n"));
00447 
00448             if (mWriteSegment == -1) {
00449                 // buffer is completely empty
00450                 mReadCursor = nsnull;
00451                 mReadLimit = nsnull;
00452                 mWriteCursor = nsnull;
00453                 mWriteLimit = nsnull;
00454             }
00455             else {
00456                 // advance read cursor and limit to next buffer segment
00457                 mReadCursor = mBuffer.GetSegment(0);
00458                 if (mWriteSegment == 0)
00459                     mReadLimit = mWriteCursor;
00460                 else
00461                     mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
00462             }
00463 
00464             // we've freed up a segment, so notify output stream that pipe has
00465             // room for a new segment.
00466             if (mOutput.OnOutputWritable(events))
00467                 mon.Notify();
00468         }
00469     }
00470 }
00471 // hands out the free space at the write cursor, appending a new buffer
      // segment first if the current one is full (or the buffer is empty).
      //
      // returns the pipe's failure status if it has one,
      // NS_BASE_STREAM_WOULD_BLOCK if no further segment can be appended, and
      // otherwise NS_OK with segment/segmentLen describing
      // [mWriteCursor, mWriteLimit).  the whole call runs inside the monitor.
00472 nsresult
00473 nsPipe::GetWriteSegment(char *&segment, PRUint32 &segmentLen)
00474 {
00475     nsAutoMonitor mon(mMonitor);
00476 
00477     if (NS_FAILED(mStatus))
00478         return mStatus;
00479 
00480     // write cursor and limit may both be null indicating an empty buffer.
00481     if (mWriteCursor == mWriteLimit) {
00482         char *seg = mBuffer.AppendNewSegment();
00483         // pipe is full
00484         if (seg == nsnull)
00485             return NS_BASE_STREAM_WOULD_BLOCK;
00486         LOG(("OOO appended new segment\n"));
00487         mWriteCursor = seg;
00488         mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
00489         ++mWriteSegment;
00490     }
00491 
00492     // make sure read cursor is initialized
00493     if (mReadCursor == nsnull) {
00494         NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
00495         mReadCursor = mReadLimit = mWriteCursor;
00496     }
00497 
00498     // check to see if we can roll-back our read and write cursors to the 
00499     // beginning of the current/first segment.  this is purely an optimization.
          // (the reader has caught up with the writer inside the first segment, so
          // nothing unread would be lost by restarting at the segment head.)
00500     if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
00501         char *head = mBuffer.GetSegment(0);
00502         LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
00503         mWriteCursor = mReadCursor = mReadLimit = head;
00504     }
00505 
00506     segment    = mWriteCursor;
00507     segmentLen = mWriteLimit - mWriteCursor;
00508     return NS_OK;
00509 }
00510 // commits bytesWritten bytes that the caller has placed into the range last
      // returned by GetWriteSegment.
      //
      // moves mWriteCursor forward, extends mReadLimit when the reader is
      // working in the same segment, marks the output stream as not writable
      // once the segment is full and the buffer has reached its maximum size,
      // and tells the input stream that more data is available.  a callback
      // recorded in `events` is dispatched by its destructor after the monitor
      // block has ended.
00511 void
00512 nsPipe::AdvanceWriteCursor(PRUint32 bytesWritten)
00513 {
00514     NS_ASSERTION(bytesWritten, "dont call if no bytes written");
00515 
00516     nsPipeEvents events;
00517     {
00518         nsAutoMonitor mon(mMonitor);
00519 
00520         LOG(("OOO advancing write cursor by %u\n", bytesWritten));
00521 
00522         char *newWriteCursor = mWriteCursor + bytesWritten;
00523         NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
00524 
00525         // update read limit if reading in the same segment
00526         if (mWriteSegment == 0 && mReadLimit == mWriteCursor)
00527             mReadLimit = newWriteCursor;
00528 
00529         mWriteCursor = newWriteCursor;
00530 
00531         NS_ASSERTION(mReadCursor != mWriteCursor, "read cursor is bad");
00532 
00533         // update the writable flag on the output stream
00534         if (mWriteCursor == mWriteLimit) {
00535             if (mBuffer.GetSize() >= mBuffer.GetMaxSize())
00536                 mOutput.SetWritable(PR_FALSE);
00537         }
00538 
00539         // notify input stream that pipe now contains additional data
00540         if (mInput.OnInputReadable(bytesWritten, events))
00541             mon.Notify();
00542     }
00543 }
00544 // puts the pipe into a failed/closed state with status `reason`.
      //
      // only the first exception takes effect; later calls return without doing
      // anything.  the output end is always informed.  the input end is
      // informed unless outputOnly is true and the pipe still has bytes
      // available to read.  the monitor is notified whenever one of the
      // stream handlers reports a blocked thread, and recorded callbacks are
      // dispatched by the nsPipeEvents destructor after the monitor block ends.
00545 void
00546 nsPipe::OnPipeException(nsresult reason, PRBool outputOnly)
00547 {
00548     LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
00549         reason, outputOnly));
00550 
00551     nsPipeEvents events;
00552     {
00553         nsAutoMonitor mon(mMonitor);
00554 
00555         // if we've already hit an exception, then ignore this one.
00556         if (NS_FAILED(mStatus))
00557             return;
00558 
00559         mStatus = reason;
00560 
00561         // an output-only exception applies to the input end if the pipe has
00562         // zero bytes available.
00563         if (outputOnly && !mInput.Available())
00564             outputOnly = PR_FALSE;
00565 
00566         if (!outputOnly)
00567             if (mInput.OnInputException(reason, events))
00568                 mon.Notify();
00569 
00570         if (mOutput.OnOutputException(reason, events))
00571             mon.Notify();
00572     }
00573 }
00574 
00575 //-----------------------------------------------------------------------------
00576 // nsPipeEvents methods:
00577 //-----------------------------------------------------------------------------
00578 // invokes the recorded input callback first, then the recorded output
      // callback, and drops the references to each callback/stream pair.
00579 nsPipeEvents::~nsPipeEvents()
00580 {
00581     // dispatch any pending events
00582 
00583     if (mInputCallback) {
00584         mInputCallback->OnInputStreamReady(mInputStream);
00585         mInputCallback = 0;
00586         mInputStream = 0;
00587     }
00588     if (mOutputCallback) {
00589         mOutputCallback->OnOutputStreamReady(mOutputStream);
00590         mOutputCallback = 0;
00591         mOutputStream = 0;
00592     }
00593 }
00594 
00595 //-----------------------------------------------------------------------------
00596 // nsPipeInputStream methods:
00597 //-----------------------------------------------------------------------------
00598 // blocks the calling thread on the pipe's monitor until data is available
      // or the pipe status has become a failure code.
      //
      // returns NS_OK when woken up with the pipe still healthy and also when
      // the pipe status is NS_BASE_STREAM_CLOSED; any other status is returned
      // as is.  mBlocked is set around the wait so that the writer side knows
      // it has to notify the monitor.
00599 nsresult
00600 nsPipeInputStream::Wait()
00601 {
00602     NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
00603 
00604     nsAutoMonitor mon(mPipe->mMonitor);
00605 
          // re-check the condition after every wake-up
00606     while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
00607         LOG(("III pipe input: waiting for data\n"));
00608 
00609         mBlocked = PR_TRUE;
00610         mon.Wait();
00611         mBlocked = PR_FALSE;
00612 
00613         LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
00614             mPipe->mStatus, mAvailable));
00615     }
00616 
00617     return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
00618 }
00619 // called by nsPipe::AdvanceWriteCursor (inside the monitor) after
      // bytesWritten bytes have been added to the pipe.
      //
      // increases the available count.  a pending AsyncWait callback is moved
      // into `events` for deferred dispatch unless it was registered with
      // WAIT_CLOSURE_ONLY.  returns PR_TRUE only when no callback was handed
      // over and a reader is blocked in Wait(), i.e. when the caller should
      // notify the monitor.
00620 PRBool
00621 nsPipeInputStream::OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &events)
00622 {
00623     PRBool result = PR_FALSE;
00624 
00625     mAvailable += bytesWritten;
00626 
00627     if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
00628         events.NotifyInputReady(this, mCallback);
00629         mCallback = 0;
00630         mCallbackFlags = 0;
00631     }
00632     else if (mBlocked)
00633         result = PR_TRUE;
00634 
00635     return result;
00636 }
00637 // called by nsPipe::OnPipeException (inside the monitor) when the input end
      // has to learn about a failure/closure.
      //
      // discards the available count and hands any pending AsyncWait callback
      // to `events`, regardless of its flags.  returns PR_TRUE only when no
      // callback was pending and a reader is blocked in Wait(), i.e. when the
      // caller should notify the monitor.
00638 PRBool
00639 nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events)
00640 {
00641     LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
00642         this, reason));
00643 
00644     PRBool result = PR_FALSE;
00645 
00646     NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
00647 
00648     // force count of available bytes to zero.
00649     mAvailable = 0;
00650 
00651     if (mCallback) {
00652         events.NotifyInputReady(this, mCallback);
00653         mCallback = 0;
00654         mCallbackFlags = 0;
00655     }
00656     else if (mBlocked)
00657         result = PR_TRUE;
00658 
00659     return result;
00660 }
00661 // counts the reference in the reader-specific counter and then forwards to
      // the owning pipe, whose AddRef result is returned.
00662 NS_IMETHODIMP_(nsrefcnt)
00663 nsPipeInputStream::AddRef(void)
00664 {
00665     PR_AtomicIncrement((PRInt32*)&mReaderRefCnt);
00666     return mPipe->AddRef();
00667 }
00668 // drops one reader reference.  when the reader-specific count reaches zero
      // the input end is closed before the release is forwarded to the owning
      // pipe, whose Release result is returned.
00669 NS_IMETHODIMP_(nsrefcnt)
00670 nsPipeInputStream::Release(void)
00671 {
00672     nsrefcnt count = PR_AtomicDecrement((PRInt32 *)&mReaderRefCnt);
00673     if (count == 0)
00674         Close();
00675     return mPipe->Release();
00676 }
00677 
00678 NS_IMPL_QUERY_INTERFACE4(nsPipeInputStream,
00679                          nsIInputStream,
00680                          nsIAsyncInputStream,
00681                          nsISeekableStream,
00682                          nsISearchableInputStream)
00683 // closes the pipe from the reader side with the given reason.  a success
      // code is replaced by NS_BASE_STREAM_CLOSED so that the pipe status always
      // becomes a failure code.  the exception is raised on the whole pipe
      // (outputOnly left at its default).  always returns NS_OK.
00684 NS_IMETHODIMP
00685 nsPipeInputStream::CloseWithStatus(nsresult reason)
00686 {
00687     LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason));
00688 
00689     if (NS_SUCCEEDED(reason))
00690         reason = NS_BASE_STREAM_CLOSED;
00691 
00692     mPipe->OnPipeException(reason);
00693     return NS_OK;
00694 }
00695 // plain close: equivalent to CloseWithStatus(NS_BASE_STREAM_CLOSED).
00696 NS_IMETHODIMP
00697 nsPipeInputStream::Close()
00698 {
00699     return CloseWithStatus(NS_BASE_STREAM_CLOSED);
00700 }
00701 // stores the number of readable bytes in *result, reading the counter
      // inside the monitor.  if nothing is available and the pipe status is a
      // failure code, that status is returned instead and *result is not written.
00702 NS_IMETHODIMP
00703 nsPipeInputStream::Available(PRUint32 *result)
00704 {
00705     nsAutoMonitor mon(mPipe->mMonitor);
00706 
00707     // return error if pipe closed
00708     if (!mAvailable && NS_FAILED(mPipe->mStatus))
00709         return mPipe->mStatus;
00710 
00711     *result = mAvailable;
00712     return NS_OK;
00713 }
00714 // feeds up to `count` bytes from the pipe to `writer`, one buffer segment at
      // a time, and reports the number of bytes consumed in *readCount.
      //
      //   writer   called as writer(this, closure, data, offsetSoFar, len,
      //            &written); an error or a zero-byte write stops the read
      //   closure  opaque value passed through to writer
      //
      // result: NS_OK if anything was read, if the pipe is closed, or if the
      // writer stopped the read; NS_BASE_STREAM_WOULD_BLOCK if the pipe is
      // empty and this stream is non-blocking; any other failure is first
      // raised on the pipe via OnPipeException and then returned.
00715 NS_IMETHODIMP
00716 nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer, 
00717                                 void *closure,  
00718                                 PRUint32 count,
00719                                 PRUint32 *readCount)
00720 {
00721     LOG(("III ReadSegments [this=%x count=%u]\n", this, count));
00722 
00723     nsresult rv = NS_OK;
00724 
00725     const char *segment;
00726     PRUint32 segmentLen;
00727 
00728     *readCount = 0;
00729     while (count) {
00730         rv = mPipe->GetReadSegment(segment, segmentLen);
00731         if (NS_FAILED(rv)) {
00732             // ignore this error if we've already read something.
00733             if (*readCount > 0) {
00734                 rv = NS_OK;
00735                 break;
00736             }
00737             if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
00738                 // pipe is empty
00739                 if (!mBlocking)
00740                     break;
00741                 // wait for some data to be written to the pipe
00742                 rv = Wait();
00743                 if (NS_SUCCEEDED(rv))
00744                     continue;
00745             }
00746             // ignore this error, just return.
00747             if (rv == NS_BASE_STREAM_CLOSED) {
00748                 rv = NS_OK;
00749                 break;
00750             }
00751             mPipe->OnPipeException(rv);
00752             break;
00753         }
00754 
00755         // read no more than count
00756         if (segmentLen > count)
00757             segmentLen = count;
00758 
              // the writer may take the segment in several pieces; keep calling it
              // until the (capped) segment is used up or the writer stops.
00759         PRUint32 writeCount, originalLen = segmentLen;
00760         while (segmentLen) {
00761             writeCount = 0;
00762 
00763             rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount);
00764 
00765             if (NS_FAILED(rv) || writeCount == 0) {
                      // zeroing count also terminates the outer loop
00766                 count = 0;
00767                 // any errors returned from the writer end here: do not
00768                 // propagate to the caller of ReadSegments.
00769                 rv = NS_OK;
00770                 break;
00771             }
00772 
00773             NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
00774             segment += writeCount;
00775             segmentLen -= writeCount;
00776             count -= writeCount;
00777             *readCount += writeCount;
00778             mLogicalOffset += writeCount;
00779         }
00780 
              // tell the pipe how much of this segment was actually consumed
00781         if (segmentLen < originalLen)
00782             mPipe->AdvanceReadCursor(originalLen - segmentLen);
00783     }
00784 
00785     return rv;
00786 }
00787 // segment writer used by Read: `closure` is the destination buffer; copies
      // `count` bytes from fromRawSegment to closure[offset] and reports all of
      // them as written.  no bounds checking is done here -- ReadSegments never
      // passes more than the remaining requested count.  inStr is unused.
00788 static NS_METHOD
00789 nsWriteToRawBuffer(nsIInputStream* inStr,
00790                    void *closure,
00791                    const char *fromRawSegment,
00792                    PRUint32 offset,
00793                    PRUint32 count,
00794                    PRUint32 *writeCount)
00795 {
00796     char *toBuf = (char*)closure;
00797     memcpy(&toBuf[offset], fromRawSegment, count);
00798     *writeCount = count;
00799     return NS_OK;
00800 }
00801 // copies up to bufLen bytes from the pipe into toBuf; implemented as
      // ReadSegments with nsWriteToRawBuffer, so results and blocking behaviour
      // are those of ReadSegments.
00802 NS_IMETHODIMP
00803 nsPipeInputStream::Read(char* toBuf, PRUint32 bufLen, PRUint32 *readCount)
00804 {
00805     return ReadSegments(nsWriteToRawBuffer, toBuf, bufLen, readCount);
00806 }
00807 // reports the inverse of mBlocking in *aNonBlocking; always returns NS_OK.
00808 NS_IMETHODIMP
00809 nsPipeInputStream::IsNonBlocking(PRBool *aNonBlocking)
00810 {
00811     *aNonBlocking = !mBlocking;
00812     return NS_OK;
00813 }
00814 // registers `callback` to be told when the stream is readable or closed.
      //
      //   callback        replaces any callback that is still pending
      //   flags           WAIT_CLOSURE_ONLY: data alone does not trigger the
      //                   callback, only a failed pipe status does
      //   requestedCount  not used by this implementation
      //   target          if non-null, the callback is wrapped with
      //                   NS_NewInputStreamReadyEvent(callback, target)
      //                   (presumably so that it runs on that target -- confirm
      //                   against nsStreamUtils)
      //
      // if the pipe already has a failure status, or has data and
      // WAIT_CLOSURE_ONLY is not set, the callback is invoked as soon as the
      // monitor block has ended (via the nsPipeEvents destructor); otherwise
      // it is stored until OnInputReadable/OnInputException picks it up.
      // returns the wrapper's failure code if wrapping fails -- the previously
      // pending callback has already been dropped by then -- NS_OK otherwise.
00815 NS_IMETHODIMP
00816 nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback,
00817                              PRUint32 flags,
00818                              PRUint32 requestedCount,
00819                              nsIEventTarget *target)
00820 {
00821     LOG(("III AsyncWait [this=%x]\n", this));
00822 
00823     nsPipeEvents pipeEvents;
00824     {
00825         nsAutoMonitor mon(mPipe->mMonitor);
00826 
00827         // replace a pending callback
00828         mCallback = 0;
00829         mCallbackFlags = 0;
00830 
00831         nsCOMPtr<nsIInputStreamCallback> proxy;
00832         if (target) {
00833             nsresult rv = NS_NewInputStreamReadyEvent(getter_AddRefs(proxy),
00834                                                       callback, target);
00835             if (NS_FAILED(rv)) return rv;
00836             callback = proxy;
00837         }
00838 
00839         if (NS_FAILED(mPipe->mStatus) ||
00840                 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) {
00841             // stream is already closed or readable; post event.
00842             pipeEvents.NotifyInputReady(this, callback);
00843         }
00844         else {
00845             // queue up callback object to be notified when data becomes available
00846             mCallback = callback;
00847             mCallbackFlags = flags;
00848         }
00849     }
00850     return NS_OK;
00851 }
00852 // nsISeekableStream::Seek is not supported on the input end of a pipe:
      // flags NS_NOTREACHED and returns NS_ERROR_NOT_IMPLEMENTED.
00853 NS_IMETHODIMP
00854 nsPipeInputStream::Seek(PRInt32 whence, PRInt64 offset)
00855 {
00856     NS_NOTREACHED("nsPipeInputStream::Seek");
00857     return NS_ERROR_NOT_IMPLEMENTED;
00858 }
00859 // reports mLogicalOffset, the running total of bytes consumed through
      // ReadSegments, in *offset.  the value is read without entering the monitor.
00860 NS_IMETHODIMP
00861 nsPipeInputStream::Tell(PRInt64 *offset)
00862 {
00863     *offset = mLogicalOffset;
00864     return NS_OK;
00865 }
00866 // nsISeekableStream::SetEOF is not supported on the input end of a pipe:
      // flags NS_NOTREACHED and returns NS_ERROR_NOT_IMPLEMENTED.
00867 NS_IMETHODIMP
00868 nsPipeInputStream::SetEOF()
00869 {
00870     NS_NOTREACHED("nsPipeInputStream::SetEOF");
00871     return NS_ERROR_NOT_IMPLEMENTED;
00872 }
00873 // COMPARE(s1, s2, i): compares the first i characters of s1 and s2 and
      // yields 0 when they match.  relies on a variable named `ignoreCase` being
      // in scope at the point of use: if it is true the comparison is done with
      // nsCRT::strncasecmp, otherwise with nsCRT::strncmp.
00874 #define COMPARE(s1, s2, i)                                                 \
00875     (ignoreCase                                                            \
00876      ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (PRUint32)i) \
00877      : nsCRT::strncmp((const char *)s1, (const char *)s2, (PRUint32)i))
00878 
00879 NS_IMETHODIMP
00880 nsPipeInputStream::Search(const char *forString, 
00881                           PRBool ignoreCase,
00882                           PRBool *found,
00883                           PRUint32 *offsetSearchedTo)
00884 {
00885     LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase));
00886 
00887     nsAutoMonitor mon(mPipe->mMonitor);
00888 
00889     char *cursor1, *limit1;
00890     PRUint32 index = 0, offset = 0;
00891     PRUint32 strLen = strlen(forString);
00892 
00893     mPipe->PeekSegment(0, cursor1, limit1);
00894     if (cursor1 == limit1) {
00895         *found = PR_FALSE;
00896         *offsetSearchedTo = 0;
00897         LOG(("  result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
00898         return NS_OK;
00899     }
00900 
00901     while (PR_TRUE) {
00902         PRUint32 i, len1 = limit1 - cursor1;
00903 
00904         // check if the string is in the buffer segment
00905         for (i = 0; i < len1 - strLen + 1; i++) {
00906             if (COMPARE(&cursor1[i], forString, strLen) == 0) {
00907                 *found = PR_TRUE;
00908                 *offsetSearchedTo = offset + i;
00909                 LOG(("  result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
00910                 return NS_OK;
00911             }
00912         }
00913 
00914         // get the next segment
00915         char *cursor2, *limit2;
00916         PRUint32 len2;
00917 
00918         index++;
00919         offset += len1;
00920 
00921         mPipe->PeekSegment(index, cursor2, limit2);
00922         if (cursor2 == limit2) {
00923             *found = PR_FALSE;
00924             *offsetSearchedTo = offset - strLen + 1;
00925             LOG(("  result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
00926             return NS_OK;
00927         }
00928         len2 = limit2 - cursor2;
00929 
00930         // check if the string is straddling the next buffer segment
00931         PRUint32 lim = PR_MIN(strLen, len2 + 1);
00932         for (i = 0; i < lim; ++i) {
00933             PRUint32 strPart1Len = strLen - i - 1;
00934             PRUint32 strPart2Len = strLen - strPart1Len;
00935             const char* strPart2 = &forString[strLen - strPart2Len];
00936             PRUint32 bufSeg1Offset = len1 - strPart1Len;
00937             if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 &&
00938                 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
00939                 *found = PR_TRUE;
00940                 *offsetSearchedTo = offset - strPart1Len;
00941                 LOG(("  result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
00942                 return NS_OK;
00943             }
00944         }
00945 
00946         // finally continue with the next buffer
00947         cursor1 = cursor2;
00948         limit1 = limit2;
00949     }
00950 
00951     NS_NOTREACHED("can't get here");
00952     return NS_ERROR_UNEXPECTED;    // keep compiler happy
00953 }
00954 
00955 //-----------------------------------------------------------------------------
00956 // nsPipeOutputStream methods:
00957 //-----------------------------------------------------------------------------
00958 
00959 nsresult
00960 nsPipeOutputStream::Wait()
00961 {
00962     NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
00963 
00964     nsAutoMonitor mon(mPipe->mMonitor);
00965 
00966     if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
00967         LOG(("OOO pipe output: waiting for space\n"));
00968         mBlocked = PR_TRUE;
00969         mon.Wait();
00970         mBlocked = PR_FALSE;
00971         LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
00972             mPipe->mStatus, mWritable == PR_TRUE));
00973     }
00974 
00975     return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
00976 }
00977 
00978 PRBool
00979 nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events)
00980 {
00981     PRBool result = PR_FALSE;
00982 
00983     mWritable = PR_TRUE;
00984 
00985     if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
00986         events.NotifyOutputReady(this, mCallback);
00987         mCallback = 0;
00988         mCallbackFlags = 0;
00989     }
00990     else if (mBlocked)
00991         result = PR_TRUE;
00992 
00993     return result;
00994 }
00995 
00996 PRBool
00997 nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events)
00998 {
00999     LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
01000         this, reason));
01001 
01002     nsresult result = PR_FALSE;
01003 
01004     NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
01005     mWritable = PR_FALSE;
01006 
01007     if (mCallback) {
01008         events.NotifyOutputReady(this, mCallback);
01009         mCallback = 0;
01010         mCallbackFlags = 0;
01011     }
01012     else if (mBlocked)
01013         result = PR_TRUE;
01014 
01015     return result;
01016 }
01017 
01018 
01019 NS_IMETHODIMP_(nsrefcnt)
01020 nsPipeOutputStream::AddRef()
01021 {
01022     PR_AtomicIncrement((PRInt32*)&mWriterRefCnt);
01023     return mPipe->AddRef();
01024 }
01025 
01026 NS_IMETHODIMP_(nsrefcnt)
01027 nsPipeOutputStream::Release()
01028 {
01029     nsrefcnt count = PR_AtomicDecrement((PRInt32 *)&mWriterRefCnt);
01030     if (count == 0)
01031         Close();
01032     return mPipe->Release();
01033 }
01034 
01035 NS_IMPL_QUERY_INTERFACE2(nsPipeOutputStream,
01036                          nsIOutputStream,
01037                          nsIAsyncOutputStream)
01038 
01039 NS_IMETHODIMP
01040 nsPipeOutputStream::CloseWithStatus(nsresult reason)
01041 {
01042     LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason));
01043 
01044     if (NS_SUCCEEDED(reason))
01045         reason = NS_BASE_STREAM_CLOSED;
01046 
01047     // input stream may remain open
01048     mPipe->OnPipeException(reason, PR_TRUE);
01049     return NS_OK;
01050 }
01051 
01052 NS_IMETHODIMP
01053 nsPipeOutputStream::Close()
01054 {
01055     return CloseWithStatus(NS_BASE_STREAM_CLOSED);
01056 }
01057 
01058 NS_IMETHODIMP
01059 nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader,
01060                                   void* closure,
01061                                   PRUint32 count,
01062                                   PRUint32 *writeCount)
01063 {
01064     LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count));
01065 
01066     nsresult rv = NS_OK;
01067 
01068     char *segment;
01069     PRUint32 segmentLen;
01070 
01071     *writeCount = 0;
01072     while (count) {
01073         rv = mPipe->GetWriteSegment(segment, segmentLen);
01074         if (NS_FAILED(rv)) {
01075             if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
01076                 // pipe is full
01077                 if (!mBlocking) {
01078                     // ignore this error if we've already written something
01079                     if (*writeCount > 0)
01080                         rv = NS_OK;
01081                     break;
01082                 }
01083                 // wait for the pipe to have an empty segment.
01084                 rv = Wait();
01085                 if (NS_SUCCEEDED(rv))
01086                     continue;
01087             }
01088             mPipe->OnPipeException(rv);
01089             break;
01090         }
01091 
01092         // write no more than count
01093         if (segmentLen > count)
01094             segmentLen = count;
01095 
01096         PRUint32 readCount, originalLen = segmentLen;
01097         while (segmentLen) {
01098             readCount = 0;
01099 
01100             rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount);
01101 
01102             if (NS_FAILED(rv) || readCount == 0) {
01103                 count = 0;
01104                 // any errors returned from the reader end here: do not
01105                 // propogate to the caller of WriteSegments.
01106                 rv = NS_OK;
01107                 break;
01108             }
01109 
01110             NS_ASSERTION(readCount <= segmentLen, "read more than expected");
01111             segment += readCount;
01112             segmentLen -= readCount;
01113             count -= readCount;
01114             *writeCount += readCount;
01115             mLogicalOffset += readCount;
01116         }
01117 
01118         if (segmentLen < originalLen)
01119             mPipe->AdvanceWriteCursor(originalLen - segmentLen);
01120     }
01121 
01122     return rv;
01123 }
01124 
01125 static NS_METHOD
01126 nsReadFromRawBuffer(nsIOutputStream* outStr,
01127                     void* closure,
01128                     char* toRawSegment,
01129                     PRUint32 offset,
01130                     PRUint32 count,
01131                     PRUint32 *readCount)
01132 {
01133     const char* fromBuf = (const char*)closure;
01134     memcpy(toRawSegment, &fromBuf[offset], count);
01135     *readCount = count;
01136     return NS_OK;
01137 }
01138 
01139 NS_IMETHODIMP
01140 nsPipeOutputStream::Write(const char* fromBuf,
01141                           PRUint32 bufLen, 
01142                           PRUint32 *writeCount)
01143 {
01144     return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount);
01145 }
01146 
01147 NS_IMETHODIMP
01148 nsPipeOutputStream::Flush(void)
01149 {
01150     // nothing to do
01151     return NS_OK;
01152 }
01153 
01154 static NS_METHOD
01155 nsReadFromInputStream(nsIOutputStream* outStr,
01156                       void* closure,
01157                       char* toRawSegment, 
01158                       PRUint32 offset,
01159                       PRUint32 count,
01160                       PRUint32 *readCount)
01161 {
01162     nsIInputStream* fromStream = (nsIInputStream*)closure;
01163     return fromStream->Read(toRawSegment, count, readCount);
01164 }
01165 
01166 NS_IMETHODIMP
01167 nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream,
01168                               PRUint32 count,
01169                               PRUint32 *writeCount)
01170 {
01171     return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount);
01172 }
01173 
01174 NS_IMETHODIMP
01175 nsPipeOutputStream::IsNonBlocking(PRBool *aNonBlocking)
01176 {
01177     *aNonBlocking = !mBlocking;
01178     return NS_OK;
01179 }
01180 
01181 NS_IMETHODIMP
01182 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
01183                               PRUint32 flags,
01184                               PRUint32 requestedCount,
01185                               nsIEventTarget *target)
01186 {
01187     LOG(("OOO AsyncWait [this=%x]\n", this));
01188 
01189     nsPipeEvents pipeEvents;
01190     {
01191         nsAutoMonitor mon(mPipe->mMonitor);
01192 
01193         // replace a pending callback
01194         mCallback = 0;
01195         mCallbackFlags = 0;
01196 
01197         nsCOMPtr<nsIOutputStreamCallback> proxy;
01198         if (target) {
01199             nsresult rv = NS_NewOutputStreamReadyEvent(getter_AddRefs(proxy),
01200                                                        callback, target);
01201             if (NS_FAILED(rv)) return rv;
01202             callback = proxy;
01203         }
01204 
01205         if (NS_FAILED(mPipe->mStatus) ||
01206                 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) {
01207             // stream is already closed or writable; post event.
01208             pipeEvents.NotifyOutputReady(this, callback);
01209         }
01210         else {
01211             // queue up callback object to be notified when data becomes available
01212             mCallback = callback;
01213             mCallbackFlags = flags;
01214         }
01215     }
01216     return NS_OK;
01217 }
01218 
01219 NS_IMETHODIMP
01220 nsPipeOutputStream::Seek(PRInt32 whence, PRInt64 offset)
01221 {
01222     NS_NOTREACHED("nsPipeOutputStream::Seek");
01223     return NS_ERROR_NOT_IMPLEMENTED;
01224 }
01225 
01226 NS_IMETHODIMP
01227 nsPipeOutputStream::Tell(PRInt64 *offset)
01228 {
01229     *offset = mLogicalOffset;
01230     return NS_OK;
01231 }
01232 
01233 NS_IMETHODIMP
01234 nsPipeOutputStream::SetEOF()
01235 {
01236     NS_NOTREACHED("nsPipeOutputStream::SetEOF");
01237     return NS_ERROR_NOT_IMPLEMENTED;
01238 }
01239 
01241 
01242 NS_COM nsresult
01243 NS_NewPipe2(nsIAsyncInputStream **pipeIn,
01244             nsIAsyncOutputStream **pipeOut,
01245             PRBool nonBlockingInput,
01246             PRBool nonBlockingOutput,
01247             PRUint32 segmentSize,
01248             PRUint32 segmentCount,
01249             nsIMemory *segmentAlloc)
01250 {
01251     nsresult rv;
01252 
01253     nsPipe *pipe = new nsPipe();
01254     if (!pipe)
01255         return NS_ERROR_OUT_OF_MEMORY;
01256 
01257     rv = pipe->Init(nonBlockingInput,
01258                     nonBlockingOutput,
01259                     segmentSize,
01260                     segmentCount,
01261                     segmentAlloc);
01262     if (NS_FAILED(rv)) {
01263         NS_ADDREF(pipe);
01264         NS_RELEASE(pipe);
01265         return rv;
01266     }
01267 
01268     pipe->GetInputStream(pipeIn);
01269     pipe->GetOutputStream(pipeOut);
01270     return NS_OK;
01271 }
01272 
01273 NS_METHOD
01274 nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result)
01275 {
01276     if (outer)
01277         return NS_ERROR_NO_AGGREGATION;
01278     nsPipe *pipe = new nsPipe();
01279     if (!pipe)
01280         return NS_ERROR_OUT_OF_MEMORY;
01281     NS_ADDREF(pipe);
01282     nsresult rv = pipe->QueryInterface(iid, result);
01283     NS_RELEASE(pipe);
01284     return rv;
01285 }
01286