Back to index

moin  1.9.0~rc2
rpc_aggregator.py
Go to the documentation of this file.
00001 # -*- coding: iso-8859-1 -*-
00002 """
00003     MoinMoin - RPC Aggregator
00004 
00005     Aggregates RPC calls into MultiCall batches in order to increase
00006     the speed.
00007 
00008     @copyright: 2006 MoinMoin:AlexanderSchremmer
00009     @license: GNU GPL, see COPYING for details.
00010 """
00011 
00012 import xmlrpclib
INVALID = object() # sentinel: the communication slot holds no result

class RPCYielder(object):
    """ If you want to have a batchable function, you need
    to inherit from this class and define a method "run" that
    takes exactly one argument.
    This method has to be a generator that yields (func, args)
    tuples whereas func is the RPC method name (str) and args is
    the sequence of positional arguments; scheduler() registers
    such a call as getattr(multicall, func)(*args).

    A scheduler fetches the calls by calling fetch_call() and
    returns each result by calling set_result(res); the stored
    result can then be read exactly once with fetch_result().
    """

    def __init__(self, arg, raise_fault=False):
        """
        @param arg: the single argument that is passed on to run()
        @param raise_fault: if true, fetch_result() raises a stored
                            xmlrpclib.Fault instead of returning it
        """
        # one-element list used as a mutable slot for the pending result
        self._comm_slot = [INVALID]
        self.raise_fault = raise_fault
        self._gen = self.run(arg)

    def fetch_call(self):
        """ Advance the generator returned by run() by one step.

        @return: the item it yielded, or None if it is exhausted
        """
        try:
            return self._gen.next()
        except StopIteration:
            return None

    def set_result(self, result):
        """ Store result for the next fetch_result() call. """
        self._comm_slot[0] = result

    def fetch_result(self):
        """ Return the stored result and clear the slot.

        @return: the value passed to the preceding set_result() call
        @raise RuntimeError: if no result was stored since the last fetch
        @raise xmlrpclib.Fault: if raise_fault is set and the stored
                                result is a Fault instance
        """
        result = self._comm_slot[0]
        # clear the slot first, so a result is consumed on every exit path
        self._comm_slot[0] = INVALID
        if result is INVALID:
            # must be raised, not returned - otherwise the caller would
            # mistake the exception object for an RPC result
            raise RuntimeError("Invalid state, there is no result to fetch.")
        if self.raise_fault and isinstance(result, xmlrpclib.Fault):
            raise result
        return result

    def run(self, arg):
        """ Override this with a generator method (see class docstring).

        The base implementation returns the NotImplementedError class
        instead of raising it: __init__ calls run(), and scheduler()
        instantiates plain RPCYielder objects to receive the results
        of its prepare calls, so raising here would break that.
        """
        return NotImplementedError
00054 
00055 
def scheduler(multicall_func, handler, args, max_calls=10, prepare_multicall_func=None):
    """ Run one handler per argument and batch their RPC calls.

    In every round each pending handler is asked for its next call, the
    collected calls are registered on a fresh multicall object, the
    multicall is executed and every handler gets its result via
    set_result(). This is repeated until no handler yields a call anymore.

    @param multicall_func: callable without arguments returning a new
                           multicall object; calls are registered with
                           getattr(obj, func)(*args), calling obj() executes
                           them and returns the results in registration order
    @param handler: callable that takes one item of args and returns an
                    object with fetch_call() and set_result() methods
                    (usually an RPCYielder subclass)
    @param args: iterable, one handler is created per item
    @param max_calls: maximum number of handler calls per multicall (the
                      prepare calls are not counted); values below 1 are
                      treated as 1
    @param prepare_multicall_func: optional callable returning a list of
                                   (func, args) calls that is put in front
                                   of every multicall
    """
    # sys.exc_info() is used to get hold of a caught Fault, it works
    # without version specific "except" syntax
    import sys

    # a limit below one could never schedule a call and would loop forever
    batch_limit = max(1, max_calls)

    # all generator (or better, RPCYielder) instances
    gens = [handler(arg) for arg in args]

    # schedule generators
    while gens:
        # those instances that have to be queried in the next step again
        gens_todo = []
        # pending calls, stored as tuples: (generator, (funcname, (args,*)))
        call_list = []
        for gen in gens:
            if len(call_list) >= batch_limit:
                # batch is full, ask this one again in the next round
                gens_todo.append(gen)
                continue
            call = gen.fetch_call()
            if call is not None:
                call_list.append((gen, call))
                gens_todo.append(gen)
        if call_list:
            if prepare_multicall_func is not None:
                # the results of the prepare calls go to throw-away instances
                pre_calls = [(RPCYielder(0), x) for x in prepare_multicall_func()]
                call_list = pre_calls + call_list

            m = multicall_func()
            for gen, (func, call_args) in call_list:
                getattr(m, func)(*call_args) # register call
            response = m() # execute multicall
            if not hasattr(response, '__getitem__'):
                # one-shot iterable, make it indexable
                response = list(response)
            # Fetch by index: an iterator over a __getitem__-only sequence
            # does not advance when the lookup raises, so after one Fault
            # all following generators would get that same Fault.
            for index, (gen, call) in enumerate(call_list):
                try:
                    item = response[index]
                except xmlrpclib.Fault:
                    # this exception is reraised by the RPCYielder
                    item = sys.exc_info()[1]
                gen.set_result(item)
        gens = gens_todo
00098 
00099 
def _first_result(response):
    """ Return the first item of a multicall response.

    A Fault raised while the item is looked up propagates to the caller.
    """
    for item in response:
        return item
    # empty response: same exception a bare .next() call would raise
    raise StopIteration


def scheduler_simple(multicall_func, handler, args):
    """ Unbatched counterpart of scheduler(): one multicall per RPC call.

    The handlers are processed one after another, and every call a handler
    yields is executed in a multicall of its own before the handler is
    asked for its next call.

    @param multicall_func: callable without arguments returning a new
                           multicall object; a call is registered with
                           getattr(obj, func)(*args), calling obj() executes
                           it and returns an iterable of results
    @param handler: callable that takes one item of args and returns an
                    object with fetch_call() and set_result() methods
                    (usually an RPCYielder subclass)
    @param args: iterable, one handler is created per item
    """
    # sys.exc_info() is used to get hold of a caught Fault, it works
    # without version specific "except" syntax
    import sys

    for arg in args:
        cur_handler = handler(arg)
        while 1:
            call = cur_handler.fetch_call()
            if call is None:
                break
            func, call_args = call
            m = multicall_func()
            # unpack the arguments like scheduler() does, so that a handler
            # issues the same RPC call under both schedulers
            getattr(m, func)(*call_args) # register call
            response = m() # execute multicall
            try:
                item = _first_result(response)
            except xmlrpclib.Fault:
                # this exception is reraised by the RPCYielder
                item = sys.exc_info()[1]
            cur_handler.set_result(item)
00117                 break