Back to index

lightning-sunbird  0.9+nobinonly
nsStreamListenerProxy.cpp
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is mozilla.org code.
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 2001
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *   Darin Fisher <darin@netscape.com> (original author)
00024  *
00025  * Alternatively, the contents of this file may be used under the terms of
00026  * either the GNU General Public License Version 2 or later (the "GPL"), or
00027  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00028  * in which case the provisions of the GPL or the LGPL are applicable instead
00029  * of those above. If you wish to allow use of your version of this file only
00030  * under the terms of either the GPL or the LGPL, and not to allow others to
00031  * use your version of this file under the terms of the MPL, indicate your
00032  * decision by deleting the provisions above and replace them with the notice
00033  * and other provisions required by the GPL or the LGPL. If you do not delete
00034  * the provisions above, a recipient may use your version of this file under
00035  * the terms of any one of the MPL, the GPL or the LGPL.
00036  *
00037  * ***** END LICENSE BLOCK ***** */
00038 
00039 #include "nsStreamListenerProxy.h"
00040 #include "netCore.h"
00041 #include "nsIGenericFactory.h"
00042 #include "nsIPipe.h"
00043 #include "nsAutoLock.h"
00044 #include "prlog.h"
00045 #include "nsIOService.h"
00046 
00047 #if defined(PR_LOGGING)
00048 static PRLogModuleInfo *gStreamListenerProxyLog;
00049 #endif
00050 
00051 #define LOG(args) PR_LOG(gStreamListenerProxyLog, PR_LOG_DEBUG, args)
00052 
00053 #define DEFAULT_BUFFER_SEGMENT_SIZE 4096
00054 #define DEFAULT_BUFFER_MAX_SIZE  (4*4096)
00055 
00056 //----------------------------------------------------------------------------
00057 // Design Overview
00058 //
00059 // A stream listener proxy maintains a pipe.  When the request makes data
00060 // available, the proxy copies as much of that data as possible into the pipe.
00061 // If data was written to the pipe, then the proxy posts an asynchronous event
00062 // corresponding to the amount of data written.  If no data could be written,
00063 // because the pipe was full, then WOULD_BLOCK is returned to the request,
00064 // indicating that the request should suspend itself.
00065 //
00066 // Once suspended in this manner, the request is only resumed when the pipe is
00067 // emptied.
00068 //
00069 // XXX The current design does NOT allow the request to be "externally"
00070 // suspended!!  For the moment this is not a problem, but it should be fixed.
00071 //----------------------------------------------------------------------------
00072 
00073 //----------------------------------------------------------------------------
00074 // nsStreamListenerProxy implementation...
00075 //----------------------------------------------------------------------------
00076 
00077 nsStreamListenerProxy::nsStreamListenerProxy()
00078     : mObserverProxy(nsnull)
00079     , mLock(nsnull)
00080     , mPendingCount(0)
00081     , mPipeEmptied(PR_FALSE)
00082     , mListenerStatus(NS_OK)
00083 {
00084 }
00085 
00086 nsStreamListenerProxy::~nsStreamListenerProxy()
00087 {
00088     if (mLock) {
00089         PR_DestroyLock(mLock);
00090         mLock = nsnull;
00091     }
00092     NS_IF_RELEASE(mObserverProxy);
00093 }
00094 
00095 nsresult
00096 nsStreamListenerProxy::GetListener(nsIStreamListener **listener)
00097 {
00098     NS_ENSURE_TRUE(mObserverProxy, NS_ERROR_NOT_INITIALIZED);
00099     nsIRequestObserver* obs = mObserverProxy->Observer();
00100     if (!obs)
00101         return NS_ERROR_NULL_POINTER;
00102     return CallQueryInterface(obs, listener);
00103 }
00104 
00105 PRUint32
00106 nsStreamListenerProxy::GetPendingCount()
00107 {
00108     return PR_AtomicSet((PRInt32 *) &mPendingCount, 0);
00109 }
00110 
00111 //----------------------------------------------------------------------------
00112 // nsOnDataAvailableEvent internal class...
00113 //----------------------------------------------------------------------------
00114 
00115 class nsOnDataAvailableEvent : public nsARequestObserverEvent
00116 {
00117 public:
00118     nsOnDataAvailableEvent(nsStreamListenerProxy *proxy,
00119                            nsIRequest *request,
00120                            nsISupports *context,
00121                            nsIInputStream *source,
00122                            PRUint32 offset)
00123         : nsARequestObserverEvent(request, context)
00124         , mProxy(proxy)
00125         , mSource(source)
00126         , mOffset(offset)
00127     {
00128         MOZ_COUNT_CTOR(nsOnDataAvailableEvent);
00129         NS_PRECONDITION(mProxy, "null pointer");
00130         NS_ADDREF(mProxy);
00131     }
00132 
00133    ~nsOnDataAvailableEvent()
00134     {
00135         MOZ_COUNT_DTOR(nsOnDataAvailableEvent);
00136         NS_RELEASE(mProxy);
00137     }
00138 
00139     void HandleEvent();
00140 
00141 protected:
00142     nsStreamListenerProxy    *mProxy;
00143     nsCOMPtr<nsIInputStream>  mSource;
00144     PRUint32                  mOffset;
00145 };
00146 
00147 void
00148 nsOnDataAvailableEvent::HandleEvent()
00149 {
00150     LOG(("nsOnDataAvailableEvent: HandleEvent [req=%x]", mRequest.get()));
00151 
00152     if (NS_FAILED(mProxy->GetListenerStatus())) {
00153         LOG(("nsOnDataAvailableEvent: Discarding event [listener_status=%x, req=%x]\n",
00154             mProxy->GetListenerStatus(), mRequest.get()));
00155         return;
00156     }
00157 
00158     nsresult status = NS_OK;
00159     nsresult rv = mRequest->GetStatus(&status);
00160     NS_ASSERTION(NS_SUCCEEDED(rv), "GetStatus failed");
00161 
00162     // We should only forward this event to the listener if the request is
00163     // still in a "good" state.  Because these events are being processed
00164     // asynchronously, there is a very real chance that the listener might
00165     // have cancelled the request after _this_ event was triggered.
00166 
00167     if (NS_FAILED(status)) {
00168         LOG(("nsOnDataAvailableEvent: Not calling OnDataAvailable [req=%x]",
00169             mRequest.get()));
00170         return;
00171     }
00172 
00173     // Find out from the listener proxy how many bytes to report.
00174     PRUint32 count = mProxy->GetPendingCount();
00175 
00176 #if defined(PR_LOGGING)
00177     {
00178         PRUint32 avail;
00179         mSource->Available(&avail);
00180         LOG(("nsOnDataAvailableEvent: Calling the consumer's OnDataAvailable "
00181             "[offset=%u count=%u avail=%u req=%x]\n",
00182             mOffset, count, avail, mRequest.get()));
00183     }
00184 #endif
00185 
00186     nsCOMPtr<nsIStreamListener> listener;
00187     rv = mProxy->GetListener(getter_AddRefs(listener));
00188 
00189     LOG(("handle dataevent=%8lX\n",(long)this));
00190 
00191     // Forward call to listener
00192     if (listener)
00193         rv = listener->OnDataAvailable(mRequest, mContext,
00194                                        mSource, mOffset, count);
00195 
00196     LOG(("nsOnDataAvailableEvent: Done with the consumer's OnDataAvailable "
00197          "[rv=%x, req=%x]\n", rv, mRequest.get()));
00198 
00199     // XXX Need to suspend the underlying request... must consider
00200     //     other pending events (such as OnStopRequest). These
00201     //     should not be forwarded to the listener if the request
00202     //     is suspended. Also, handling the Resume could be tricky.
00203 
00204     if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
00205         NS_NOTREACHED("listener returned NS_BASE_STREAM_WOULD_BLOCK"
00206                       " -- support for this is not implemented");
00207         rv = NS_BINDING_FAILED;
00208     }
00209 
00210     // Cancel the request on unexpected errors
00211     if (NS_FAILED(rv) && (rv != NS_BASE_STREAM_CLOSED)) {
00212         LOG(("OnDataAvailable failed [rv=%x] canceling request!\n"));
00213         mRequest->Cancel(rv);
00214     }
00215 
00216     mProxy->SetListenerStatus(rv);
00217 }
00218 
00219 //----------------------------------------------------------------------------
00220 // nsStreamListenerProxy::nsISupports implementation...
00221 //----------------------------------------------------------------------------
00222 
00223 NS_IMPL_THREADSAFE_ISUPPORTS4(nsStreamListenerProxy,
00224                               nsIStreamListener,
00225                               nsIRequestObserver,
00226                               nsIStreamListenerProxy,
00227                               nsIInputStreamObserver)
00228 
00229 //----------------------------------------------------------------------------
00230 // nsStreamListenerProxy::nsIRequestObserver implementation...
00231 //----------------------------------------------------------------------------
00232 
00233 NS_IMETHODIMP
00234 nsStreamListenerProxy::OnStartRequest(nsIRequest *request,
00235                                       nsISupports *context)
00236 {
00237     NS_ENSURE_TRUE(mObserverProxy, NS_ERROR_NOT_INITIALIZED);
00238 
00239     nsresult rv;
00240     nsCOMPtr<nsIObservableInputStream> obs(do_QueryInterface(mPipeIn, &rv));
00241     if (NS_FAILED(rv)) return rv;
00242 
00243     // This will create a cyclic reference between the pipe and |this|, which
00244     // will be broken when onStopRequest is called.
00245     rv = obs->SetObserver(this);
00246     if (NS_FAILED(rv)) return rv;
00247 
00248     return mObserverProxy->OnStartRequest(request, context);
00249 }
00250 
00251 NS_IMETHODIMP
00252 nsStreamListenerProxy::OnStopRequest(nsIRequest *request,
00253                                      nsISupports *context,
00254                                      nsresult status)
00255 {
00256     NS_ENSURE_TRUE(mObserverProxy, NS_ERROR_NOT_INITIALIZED);
00257 
00258     mPipeIn = 0;
00259     mPipeOut = 0;
00260 
00261     return mObserverProxy->OnStopRequest(request, context, status);
00262 }
00263 
00264 //----------------------------------------------------------------------------
00265 // nsIStreamListener implementation...
00266 //----------------------------------------------------------------------------
00267 
00268 NS_IMETHODIMP
00269 nsStreamListenerProxy::OnDataAvailable(nsIRequest *request,
00270                                        nsISupports *context,
00271                                        nsIInputStream *source,
00272                                        PRUint32 offset,
00273                                        PRUint32 count)
00274 {
00275     nsresult rv;
00276     PRUint32 bytesWritten=0;
00277 
00278     LOG(("nsStreamListenerProxy: OnDataAvailable [offset=%u count=%u req=%x]\n",
00279          offset, count, request));
00280 
00281     NS_ENSURE_TRUE(mObserverProxy, NS_ERROR_NOT_INITIALIZED);
00282     NS_PRECONDITION(mRequestToResume == 0, "Unexpected call to OnDataAvailable");
00283     NS_PRECONDITION(mPipeIn, "Pipe not initialized");
00284     NS_PRECONDITION(mPipeOut, "Pipe not initialized");
00285 
00286     //
00287     // Any non-successful listener status gets passed back to the caller
00288     //
00289     {
00290         nsresult status = mListenerStatus;
00291         if (NS_FAILED(status)) {
00292             LOG(("nsStreamListenerProxy: Listener failed [status=%x req=%x]\n",
00293                 status, request));
00294             return status;
00295         }
00296     }
00297     
00298     while (1) {
00299         mPipeEmptied = PR_FALSE;
00300         // 
00301         // Try to copy data into the pipe.
00302         //
00303         // If the pipe is full, then we return NS_BASE_STREAM_WOULD_BLOCK
00304         // so the calling request can suspend itself.  If, however, we detect
00305         // that the pipe was emptied during this time, we retry copying data
00306         // into the pipe.
00307         //
00308         rv = mPipeOut->WriteFrom(source, count, &bytesWritten);
00309 
00310         LOG(("nsStreamListenerProxy: Wrote data to pipe [rv=%x count=%u bytesWritten=%u req=%x]\n",
00311             rv, count, bytesWritten, request));
00312 
00313         if (NS_FAILED(rv)) {
00314             if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
00315                 nsAutoLock lock(mLock);
00316                 if (mPipeEmptied) {
00317                     LOG(("nsStreamListenerProxy: Pipe emptied; looping back [req=%x]\n", request));
00318                     continue;
00319                 }
00320                 LOG(("nsStreamListenerProxy: Pipe full; setting request to resume [req=%x]\n", request));
00321                 mRequestToResume = request;
00322             }
00323             return rv;
00324         }
00325         if (bytesWritten == 0) {
00326             LOG(("nsStreamListenerProxy: Copied zero bytes; not posting an event [req=%x]\n", request));
00327             return NS_OK;
00328         }
00329 
00330         // Copied something into the pipe...
00331         break;
00332     }
00333 
00334     //
00335     // Update the pending count; return if able to piggy-back on a pending event.
00336     //
00337     PRUint32 total = PR_AtomicAdd((PRInt32 *) &mPendingCount, bytesWritten);
00338     if (total > bytesWritten) {
00339         LOG(("nsStreamListenerProxy: Piggy-backing pending event [total=%u, req=%x]\n",
00340             total, request));
00341         return NS_OK;
00342     }
00343  
00344     //
00345     // Post an event for the number of bytes actually written.
00346     //
00347     nsOnDataAvailableEvent *ev =
00348         new nsOnDataAvailableEvent(this, request, context, mPipeIn, offset);
00349     if (!ev) return NS_ERROR_OUT_OF_MEMORY;
00350 
00351     // Reuse the event queue of the observer proxy
00352     LOG(("post dataevent=%8lX queue=%8lX\n",(long)ev,(long)mObserverProxy->EventQueue()));
00353     rv = mObserverProxy->FireEvent(ev);
00354     if (NS_FAILED(rv))
00355         delete ev;
00356     return rv;
00357 }
00358 
00359 //----------------------------------------------------------------------------
00360 // nsStreamListenerProxy::nsIStreamListenerProxy implementation...
00361 //----------------------------------------------------------------------------
00362 
00363 NS_IMETHODIMP
00364 nsStreamListenerProxy::Init(nsIStreamListener *listener,
00365                             nsIEventQueue *eventQ,
00366                             PRUint32 bufferSegmentSize,
00367                             PRUint32 bufferMaxSize)
00368 {
00369     NS_ENSURE_ARG_POINTER(listener);
00370 
00371 #if defined(PR_LOGGING)
00372     if (!gStreamListenerProxyLog)
00373         gStreamListenerProxyLog = PR_NewLogModule("nsStreamListenerProxy");
00374 #endif
00375 
00376     //
00377     // Create the RequestToResume lock
00378     //
00379     mLock = PR_NewLock();
00380     if (!mLock) return NS_ERROR_OUT_OF_MEMORY;
00381 
00382     //
00383     // Create the request observer proxy
00384     //
00385     mObserverProxy = new nsRequestObserverProxy();
00386     if (!mObserverProxy) return NS_ERROR_OUT_OF_MEMORY;
00387     NS_ADDREF(mObserverProxy);
00388 
00389     //
00390     // Create the pipe
00391     //
00392     if (bufferSegmentSize == 0)
00393         bufferSegmentSize = DEFAULT_BUFFER_SEGMENT_SIZE;
00394     if (bufferMaxSize == 0)
00395         bufferMaxSize = DEFAULT_BUFFER_MAX_SIZE;
00396     // The segment size must not exceed the maximum
00397     bufferSegmentSize = PR_MIN(bufferMaxSize, bufferSegmentSize);
00398 
00399     // Use the necko buffer cache for the default buffers
00400     nsIMemory *allocator = nsnull;
00401     if (bufferSegmentSize == DEFAULT_BUFFER_SEGMENT_SIZE)
00402         allocator = nsIOService::gBufferCache;
00403     nsresult rv = NS_NewPipe(getter_AddRefs(mPipeIn),
00404                              getter_AddRefs(mPipeOut),
00405                              bufferSegmentSize,
00406                              bufferMaxSize,
00407                              PR_TRUE, PR_TRUE, allocator);
00408     if (NS_FAILED(rv)) return rv;
00409 
00410     nsCOMPtr<nsIRequestObserver> observer = do_QueryInterface(listener);
00411     return mObserverProxy->Init(observer, eventQ);
00412 }
00413 
00414 //----------------------------------------------------------------------------
00415 // nsStreamListenerProxy::nsIInputStreamObserver implementation...
00416 //----------------------------------------------------------------------------
00417 
00418 NS_IMETHODIMP
00419 nsStreamListenerProxy::OnEmpty(nsIInputStream *inputStream)
00420 {
00421     LOG(("nsStreamListenerProxy: OnEmpty\n"));
00422     //
00423     // The pipe has been emptied by the listener.  If the request
00424     // has been suspended (waiting for the pipe to be emptied), then
00425     // go ahead and resume it.  But take care not to resume while 
00426     // holding the "ChannelToResume" lock.
00427     //
00428     nsCOMPtr<nsIRequest> req;
00429     {
00430         nsAutoLock lock(mLock);
00431         if (mRequestToResume) {
00432             req = mRequestToResume;
00433             mRequestToResume = 0;
00434         }
00435         mPipeEmptied = PR_TRUE; // Flag this call
00436     }
00437     if (req) {
00438         LOG(("nsStreamListenerProxy: Resuming request\n"));
00439         req->Resume();
00440     }
00441     return NS_OK;
00442 }
00443 
00444 NS_IMETHODIMP
00445 nsStreamListenerProxy::OnClose(nsIInputStream *aInputStream)
00446 {
00447     LOG(("nsStreamListenerProxy: OnClose\n"));
00448     return NS_OK;
00449 }