Back to index

lightning-sunbird  0.9+nobinonly
tmTransactionService.cpp
Go to the documentation of this file.
00001 /* ***** BEGIN LICENSE BLOCK *****
00002  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00003  *
00004  * The contents of this file are subject to the Mozilla Public License Version
00005  * 1.1 (the "License"); you may not use this file except in compliance with
00006  * the License. You may obtain a copy of the License at
00007  * http://www.mozilla.org/MPL/
00008  *
00009  * Software distributed under the License is distributed on an "AS IS" basis,
00010  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00011  * for the specific language governing rights and limitations under the
00012  * License.
00013  *
00014  * The Original Code is Mozilla Transaction Manager.
00015  *
00016  * The Initial Developer of the Original Code is
00017  * Netscape Communications Corp.
00018  * Portions created by the Initial Developer are Copyright (C) 2003
00019  * the Initial Developer. All Rights Reserved.
00020  *
00021  * Contributor(s):
00022  *   John Gaunt <jgaunt@netscape.com>
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 #include "nsCOMPtr.h"
00039 #include "nsIServiceManager.h"
00040 #include "nsReadableUtils.h"
00041 #include "plstr.h"
00042 #include "ipcITransactionObserver.h"
00043 #include "tmTransaction.h"
00044 #include "tmTransactionService.h"
00045 #include "tmUtils.h"
00046 
00047 static const nsID kTransModuleID = TRANSACTION_MODULE_ID;
00048 
00049 struct tm_waiting_msg {
00050   tmTransaction trans;      // a transaction waiting to be sent to a queue
00051   char*         domainName; // the short queue name
00052 
00053   ~tm_waiting_msg();
00054 };
00055 
00056 tm_waiting_msg::~tm_waiting_msg() {
00057   if (domainName)
00058     PL_strfree(domainName);
00059 }
00060 
00061 struct tm_queue_mapping {
00062   PRInt32 queueID;          // the ID in the TM
00063   char*   domainName;       // used by the consumers of this service
00064   char*   joinedQueueName;  // used by the service -- namespace + domain name
00065 
00066   ~tm_queue_mapping();
00067 };
00068 
00069 tm_queue_mapping::~tm_queue_mapping() {
00070   if (domainName)
00071     PL_strfree(domainName);
00072   if (joinedQueueName)
00073     PL_strfree(joinedQueueName);
00074 }
00075 
00077 // Constructor and Destructor
00078 
00079 tmTransactionService::~tmTransactionService() {
00080 
00081   // just destroy this, it contains 2 pointers it doesn't own.
00082   if (mObservers)
00083     PL_HashTableDestroy(mObservers);
00084 
00085   PRUint32 index = 0;
00086   PRUint32 size = mWaitingMessages.Size();
00087   tm_waiting_msg *msg = nsnull;
00088   for ( ; index < size; index ++) {
00089     msg = (tm_waiting_msg*) mWaitingMessages[index];
00090     delete msg;
00091   }
00092 
00093   size = mQueueMaps.Size();
00094   tm_queue_mapping *qmap = nsnull;
00095   for (index = 0; index < size; index++) {
00096     qmap = (tm_queue_mapping*) mQueueMaps[index];
00097     if (qmap)
00098       delete qmap;
00099   }
00100 }
00101 
00103 // ISupports
00104 
00105 NS_IMPL_ISUPPORTS2(tmTransactionService,
00106                    ipcITransactionService,
00107                    ipcIMessageObserver)
00108 
00109 
00110 // ipcITransactionService
00111 
00112 NS_IMETHODIMP
00113 tmTransactionService::Init(const nsACString & aNamespace) {
00114 
00115   nsresult rv;
00116   
00117   rv = IPC_DefineTarget(kTransModuleID, this, PR_TRUE);
00118   if (NS_FAILED(rv))
00119     return rv;
00120 
00121   // get the lock service
00122   lockService = do_GetService("@mozilla.org/ipc/lock-service;1", &rv);
00123   if (NS_FAILED(rv))
00124     return rv;
00125 
00126   // create some internal storage
00127   mObservers = PL_NewHashTable(20, 
00128                                PL_HashString, 
00129                                PL_CompareStrings, 
00130                                PL_CompareValues, 0, 0);
00131   if (!mObservers)
00132     return NS_ERROR_OUT_OF_MEMORY;
00133 
00134   // init some internal storage
00135   mQueueMaps.Init();
00136   mWaitingMessages.Init();
00137 
00138   // store the namespace
00139   mNamespace.Assign(aNamespace);
00140   return NS_OK;
00141 }
00142 
00143 NS_IMETHODIMP
00144 tmTransactionService::Attach(const nsACString & aDomainName, 
00145                              ipcITransactionObserver *aObserver,
00146                              PRBool aLockingCall) {
00147 
00148   // if the queue already exists, then someone else is attached to it. must
00149   //   return an error here. Only one module attached to a queue per app.
00150   if (GetQueueID(aDomainName) != TM_NO_ID)
00151     return TM_ERROR_QUEUE_EXISTS;
00152   if (!mObservers)
00153     return NS_ERROR_NOT_INITIALIZED;
00154 
00155   // create the full queue name: namespace + queue
00156   nsCString jQName;
00157   jQName.Assign(mNamespace);
00158   jQName.Append(aDomainName);
00159 
00160   // this char* has two homes, make sure it gets PL_free()ed properly
00161   char* joinedQueueName = ToNewCString(jQName);
00162   if (!joinedQueueName)
00163     return NS_ERROR_OUT_OF_MEMORY;
00164 
00165   // link the observer to the joinedqueuename.  home #1 for joinedQueueName
00166   // these currently don't get removed until the destructor on this is called.
00167   PL_HashTableAdd(mObservers, joinedQueueName, aObserver);
00168 
00169   // store the domainName and JoinedQueueName, create a place to store the ID
00170   tm_queue_mapping *qm = new tm_queue_mapping();
00171   if (!qm)
00172     return NS_ERROR_OUT_OF_MEMORY;
00173   qm->queueID = TM_NO_ID;                   // initially no ID for the queue
00174   qm->joinedQueueName = joinedQueueName;    // home #2, owner of joinedQueueName
00175   qm->domainName = ToNewCString(aDomainName);
00176   if (!qm->domainName) {
00177     PL_HashTableRemove(mObservers, joinedQueueName);
00178     delete qm;
00179     return NS_ERROR_OUT_OF_MEMORY;
00180   }
00181   mQueueMaps.Append(qm);
00182 
00183   nsresult rv = NS_ERROR_FAILURE;
00184   tmTransaction trans;
00185 
00186   // acquire a lock if necessary
00187   if (aLockingCall)
00188     lockService->AcquireLock(joinedQueueName, PR_TRUE);
00189   // XXX need to handle lock failures
00190 
00191   if (NS_SUCCEEDED(trans.Init(0,                             // no IPC client
00192                               TM_NO_ID,                      // qID gets returned to us
00193                               TM_ATTACH,                     // action
00194                               NS_OK,                         // default status
00195                               (PRUint8 *)joinedQueueName,    // qName gets copied
00196                               PL_strlen(joinedQueueName)+1))) { // message length
00197     // send the attach msg
00198     SendMessage(&trans, PR_TRUE);  // synchronous
00199     rv = NS_OK;
00200   }
00201 
00202   // drop the lock if necessary
00203   if (aLockingCall)
00204     lockService->ReleaseLock(joinedQueueName);
00205 
00206   return rv;
00207 }
00208 
00209 // actual removal of the observer takes place when we get the detach reply
00210 NS_IMETHODIMP
00211 tmTransactionService::Detach(const nsACString & aDomainName) {
00212 
00213   // asynchronous detach
00214   return SendDetachOrFlush(GetQueueID(aDomainName), TM_DETACH, PR_FALSE);
00215 
00216 }
00217 
00218 NS_IMETHODIMP
00219 tmTransactionService::Flush(const nsACString & aDomainName,
00220                             PRBool aLockingCall) {
00221   // acquire a lock if necessary
00222   if (aLockingCall)
00223     lockService->AcquireLock(GetJoinedQueueName(aDomainName), PR_TRUE);
00224 
00225   // synchronous flush
00226   nsresult rv = SendDetachOrFlush(GetQueueID(aDomainName), TM_FLUSH, PR_TRUE);
00227 
00228   // drop the lock if necessary
00229   if (aLockingCall)
00230     lockService->ReleaseLock(GetJoinedQueueName(aDomainName));
00231 
00232   return rv;
00233 
00234 }
00235 
00236 NS_IMETHODIMP
00237 tmTransactionService::PostTransaction(const nsACString & aDomainName, 
00238                                       const PRUint8 *aData, 
00239                                       PRUint32 aDataLen) {
00240 
00241   tmTransaction trans;
00242   if (NS_SUCCEEDED(trans.Init(0,                       // no IPC client
00243                               GetQueueID(aDomainName), // qID returned to us
00244                               TM_POST,                 // action
00245                               NS_OK,                   // default status
00246                               aData,                   // message data
00247                               aDataLen))) {             // message length
00248     if (trans.GetQueueID() == TM_NO_ID) {
00249       // stack it and pack it
00250       tm_waiting_msg *msg = new tm_waiting_msg(); 
00251       if (!msg)
00252         return NS_ERROR_OUT_OF_MEMORY;
00253       msg->trans = trans;
00254       msg->domainName = ToNewCString(aDomainName);
00255       if (!msg->domainName) {
00256         delete msg;
00257         return NS_ERROR_OUT_OF_MEMORY;
00258       }
00259       mWaitingMessages.Append(msg);
00260     }
00261     else {
00262       // send it
00263       SendMessage(&trans, PR_FALSE);
00264     }
00265     return NS_OK;
00266   }
00267   return NS_ERROR_FAILURE;
00268 }
00269 
00271 // ipcIMessageObserver
00272 
00273 NS_IMETHODIMP
00274 tmTransactionService::OnMessageAvailable(const PRUint32 aSenderID,
00275                                          const nsID & aTarget, 
00276                                          const PRUint8 *aData, 
00277                                          PRUint32 aDataLength) {
00278 
00279   nsresult rv = NS_ERROR_OUT_OF_MEMORY; // prime the return value
00280 
00281   tmTransaction *trans = new tmTransaction();
00282   if (trans) {
00283     rv = trans->Init(0,                      // no IPC client ID
00284                      TM_INVALID_ID,          // in aData
00285                      TM_INVALID_ID,          // in aData
00286                      TM_INVALID_ID,          // in aData
00287                      aData,                  // message data
00288                      aDataLength);           // message length
00289 
00290     if (NS_SUCCEEDED(rv)) {
00291       switch(trans->GetAction()) {
00292       case TM_ATTACH_REPLY:
00293         OnAttachReply(trans);
00294         break;
00295       case TM_POST_REPLY:
00296         // OnPostReply() would be called here
00297         //   isn't necessary at the current time 2/19/03
00298         break;
00299       case TM_DETACH_REPLY:
00300         OnDetachReply(trans);
00301         break;
00302       case TM_FLUSH_REPLY:
00303         OnFlushReply(trans);
00304         break;
00305       case TM_POST:
00306         OnPost(trans);
00307         break;
00308       default: // error, should not happen
00309         NS_NOTREACHED("Recieved a TM reply outside of mapped messages");
00310         break;
00311       }
00312     }
00313     delete trans;
00314   }
00315   return rv;
00316 }
00317 
00319 // Protected Member Functions
00320 
00321 void
00322 tmTransactionService::SendMessage(tmTransaction *aTrans, PRBool aSync) {
00323 
00324   NS_ASSERTION(aTrans, "tmTransactionService::SendMessage called with null transaction");
00325 
00326   IPC_SendMessage(0, kTransModuleID, 
00327                   aTrans->GetRawMessage(), 
00328                   aTrans->GetRawMessageLength());
00329   if (aSync)
00330     IPC_WaitMessage(0, kTransModuleID, nsnull, PR_INTERVAL_NO_TIMEOUT);
00331 }
00332 
00333 void
00334 tmTransactionService::OnAttachReply(tmTransaction *aTrans) {
00335 
00336   // if we attached, store the queue's ID
00337   if (aTrans->GetStatus() >= 0) {
00338 
00339     PRUint32 size = mQueueMaps.Size();
00340     tm_queue_mapping *qmap = nsnull;
00341     for (PRUint32 index = 0; index < size; index++) {
00342       qmap = (tm_queue_mapping*) mQueueMaps[index];
00343       if (qmap && 
00344           PL_strcmp(qmap->joinedQueueName, (char*) aTrans->GetMessage()) == 0) {
00345 
00346         // set the ID in the mapping
00347         qmap->queueID = aTrans->GetQueueID();
00348         // send any stored messges to the queue
00349         DispatchStoredMessages(qmap);
00350       }
00351     }
00352   }
00353 
00354   // notify the observer we have attached (or didn't)
00355   ipcITransactionObserver *observer = 
00356     (ipcITransactionObserver *)PL_HashTableLookup(mObservers, 
00357                                                  (char*)aTrans->GetMessage());
00358   if (observer)
00359     observer->OnAttachReply(aTrans->GetQueueID(), aTrans->GetStatus());
00360 }
00361 
00362 void
00363 tmTransactionService::OnDetachReply(tmTransaction *aTrans) {
00364 
00365   tm_queue_mapping *qmap = GetQueueMap(aTrans->GetQueueID());
00366 
00367   // get the observer before we release the hashtable entry
00368   ipcITransactionObserver *observer = 
00369     (ipcITransactionObserver *)PL_HashTableLookup(mObservers, 
00370                                                  qmap->joinedQueueName);
00371 
00372   // if it was removed, clean up
00373   if (aTrans->GetStatus() >= 0) {
00374 
00375     // remove the link between observer and queue
00376     PL_HashTableRemove(mObservers, qmap->joinedQueueName);
00377 
00378     // remove the mapping of queue names and id
00379     mQueueMaps.Remove(qmap);
00380     delete qmap;
00381   }
00382 
00383 
00384   // notify the observer -- could be didn't detach
00385   if (observer)
00386     observer->OnDetachReply(aTrans->GetQueueID(), aTrans->GetStatus());
00387 }
00388 
00389 void
00390 tmTransactionService::OnFlushReply(tmTransaction *aTrans) {
00391 
00392   ipcITransactionObserver *observer = 
00393     (ipcITransactionObserver *)PL_HashTableLookup(mObservers, 
00394                               GetJoinedQueueName(aTrans->GetQueueID()));
00395   if (observer)
00396     observer->OnFlushReply(aTrans->GetQueueID(), aTrans->GetStatus());
00397 }
00398 
00399 void
00400 tmTransactionService::OnPost(tmTransaction *aTrans) {
00401 
00402   ipcITransactionObserver *observer = 
00403     (ipcITransactionObserver*) PL_HashTableLookup(mObservers, 
00404                               GetJoinedQueueName(aTrans->GetQueueID()));
00405   if (observer)
00406     observer->OnTransactionAvailable(aTrans->GetQueueID(), 
00407                                      aTrans->GetMessage(), 
00408                                      aTrans->GetMessageLength());
00409 }
00410 
00411 void
00412 tmTransactionService::DispatchStoredMessages(tm_queue_mapping *aQMapping) {
00413 
00414   PRUint32 size = mWaitingMessages.Size();
00415   tm_waiting_msg *msg = nsnull;
00416   for (PRUint32 index = 0; index < size; index ++) {
00417     msg = (tm_waiting_msg*) mWaitingMessages[index];
00418     // if the message is waiting on the queue passed in
00419     if (msg && strcmp(aQMapping->domainName, msg->domainName) == 0) {
00420 
00421       // found a match, send it and remove
00422       msg->trans.SetQueueID(aQMapping->queueID);
00423       SendMessage(&(msg->trans), PR_FALSE);
00424 
00425       // clean up
00426       mWaitingMessages.Remove(msg);
00427       delete msg;
00428     }
00429   }
00430 }
00431 
00432 // searches against the short queue name
00433 PRInt32
00434 tmTransactionService::GetQueueID(const nsACString & aDomainName) {
00435 
00436   PRUint32 size = mQueueMaps.Size();
00437   tm_queue_mapping *qmap = nsnull;
00438   for (PRUint32 index = 0; index < size; index++) {
00439     qmap = (tm_queue_mapping*) mQueueMaps[index];
00440     if (qmap && aDomainName.Equals(qmap->domainName))
00441       return qmap->queueID;
00442   }
00443   return TM_NO_ID;
00444 }
00445 
00446 char*
00447 tmTransactionService::GetJoinedQueueName(PRUint32 aQueueID) {
00448 
00449   PRUint32 size = mQueueMaps.Size();
00450   tm_queue_mapping *qmap = nsnull;
00451   for (PRUint32 index = 0; index < size; index++) {
00452     qmap = (tm_queue_mapping*) mQueueMaps[index];
00453     if (qmap && qmap->queueID == aQueueID)
00454       return qmap->joinedQueueName;
00455   }
00456   return nsnull;
00457 }
00458 
00459 char*
00460 tmTransactionService::GetJoinedQueueName(const nsACString & aDomainName) {
00461 
00462   PRUint32 size = mQueueMaps.Size();
00463   tm_queue_mapping *qmap = nsnull;
00464   for (PRUint32 index = 0; index < size; index++) {
00465     qmap = (tm_queue_mapping*) mQueueMaps[index];
00466     if (qmap && aDomainName.Equals(qmap->domainName))
00467       return qmap->joinedQueueName;
00468   }
00469   return nsnull;
00470 }
00471 
00472 tm_queue_mapping*
00473 tmTransactionService::GetQueueMap(PRUint32 aQueueID) {
00474 
00475   PRUint32 size = mQueueMaps.Size();
00476   tm_queue_mapping *qmap = nsnull;
00477   for (PRUint32 index = 0; index < size; index++) {
00478     qmap = (tm_queue_mapping*) mQueueMaps[index];
00479     if (qmap && qmap->queueID == aQueueID)
00480       return qmap;
00481   }
00482   return nsnull;
00483 }
00484 
00485 nsresult
00486 tmTransactionService::SendDetachOrFlush(PRUint32 aQueueID,
00487                                         PRUint32 aAction, 
00488                                         PRBool aSync) {
00489 
00490   // if the queue isn't attached to, just return
00491   if (aQueueID == TM_NO_ID)
00492     return NS_ERROR_UNEXPECTED;
00493 
00494   tmTransaction trans;
00495   if (NS_SUCCEEDED(trans.Init(0,         // no IPC client
00496                               aQueueID,  // qID to detach from
00497                               aAction,   // action
00498                               NS_OK,     // default status
00499                               nsnull,    // no message
00500                               0))) {      // no message
00501     // send it
00502     SendMessage(&trans, aSync);
00503     return NS_OK;
00504   }
00505   return NS_ERROR_FAILURE;
00506 }