[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] [xen-unstable] [XEND] Add Task support in Xen API implementation.
# HG changeset patch # User Alastair Tse <atse@xxxxxxxxxxxxx> # Date 1169645186 0 # Node ID 248a9c36d81670735f5b1d89bbf50ba985903fe5 # Parent 259470f0856b4642dd0a4fab6bfba331f8c5454b [XEND] Add Task support in Xen API implementation. Added progress tracking to some common methods like VM.start so that progress can be reported during async invocation. Signed-off-by: Alastair Tse <atse@xxxxxxxxxxxxx> --- tools/python/xen/xend/XendAPI.py | 297 +++++++++++++++++++++------ tools/python/xen/xend/XendAPIConstants.py | 1 tools/python/xen/xend/XendDomainInfo.py | 14 - tools/python/xen/xend/XendTask.py | 226 ++++++++++++++++++++ tools/python/xen/xend/XendTaskManager.py | 110 ++++++++++ tools/python/xen/xend/server/XMLRPCServer.py | 2 6 files changed, 581 insertions(+), 69 deletions(-) diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPI.py --- a/tools/python/xen/xend/XendAPI.py Wed Jan 24 14:36:03 2007 +0000 +++ b/tools/python/xen/xend/XendAPI.py Wed Jan 24 13:26:26 2007 +0000 @@ -22,12 +22,13 @@ import traceback import traceback from xen.xend import XendDomain, XendDomainInfo, XendNode -from xen.xend import XendLogging +from xen.xend import XendLogging, XendTaskManager from xen.xend.XendAuthSessions import instance as auth_manager from xen.xend.XendError import * from xen.xend.XendClient import ERROR_INVALID_DOMAIN from xen.xend.XendLogging import log +from xen.xend.XendTask import XendTask from xen.xend.XendAPIConstants import * from xen.util.xmlrpclib2 import stringify @@ -237,15 +238,26 @@ def valid_sr(func): 'SR_HANDLE_INVALID', func, *args, **kwargs) def valid_pif(func): - """Decorator to verify if sr_ref is valid before calling + """Decorator to verify if pif_ref is valid before calling method. 
- @param func: function with params: (self, session, sr_ref) + @param func: function with params: (self, session, pif_ref) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: r in XendNode.instance().pifs, 'PIF_HANDLE_INVALID', func, *args, **kwargs) + +def valid_task(func): + """Decorator to verify if task_ref is valid before calling + method. + + @param func: function with params: (self, session, task_ref) + @rtype: callable object + """ + return lambda *args, **kwargs: \ + _check_ref(XendTaskManager.get_task, + 'TASK_HANDLE_INVALID', func, *args, **kwargs) # ----------------------------- # Bridge to Legacy XM API calls @@ -288,18 +300,17 @@ class XendAPI: def __init__(self, auth): self.auth = auth - Base_attr_ro = ['uuid'] Base_attr_rw = [] - Base_methods = ['destroy', 'get_by_uuid', 'get_record'] - Base_funcs = ['create', 'get_all'] + Base_methods = [('destroy', None), ('get_record', 'Struct')] + Base_funcs = [('get_all', 'Set'), ('get_by_uuid', None)] # Xen API: Class Session # ---------------------------------------------------------------- # NOTE: Left unwrapped by __init__ session_attr_ro = ['this_host', 'this_user'] - session_methods = ['logout'] + session_methods = [('logout', None)] # session_funcs = ['login_with_password'] def session_login_with_password(self, *args): @@ -346,7 +357,77 @@ class XendAPI: # Xen API: Class Tasks # ---------------------------------------------------------------- - # TODO: NOT IMPLEMENTED YET + + task_attr_ro = ['status', + 'progress', + 'eta', + 'type', + 'result', + 'error_code', + 'error_info'] + + task_attr_rw = ['name_label', + 'name_description'] + + task_funcs = [('get_by_name_label', 'Set(task)')] + + def task_get_status(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.get_status()) + + def task_get_progress(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.progress) + + def 
task_get_eta(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.eta) + + def task_get_type(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.type) + + def task_get_result(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.result) + + def task_get_error_code(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.error_code) + + def task_get_error_info(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.error_info) + + def task_get_name_label(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.name_label) + + def task_get_name_description(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.name_description) + + def task_set_name_label(self, session, task_ref, label): + task = XendTaskManager.get_task(task_ref) + task.name_label = label + return xen_api_success_void() + + def task_set_name_description(self, session, task_ref, desc): + task = XendTaskManager.get_task(task_ref) + task.name_description = desc + return xen_api_success_void() + + def task_get_all(self, session): + tasks = XendTaskManager.get_all_tasks() + return xen_api_success(tasks) + + def task_destroy(self, session, task_uuid): + XendTaskManager.destroy_task(task_uuid) + return xen_api_success_void() + + def task_get_record(self, session, task_ref): + task = XendTaskManager.get_task(task_ref) + return xen_api_success(task.get_record()) # Xen API: Class Host # ---------------------------------------------------------------- @@ -358,12 +439,12 @@ class XendAPI: host_attr_rw = ['name_label', 'name_description'] - host_methods = ['disable', - 'enable', - 'reboot', - 'shutdown'] - - host_funcs = ['get_by_name_label'] + host_methods = [('disable', None), + ('enable', None), + 
('reboot', None), + ('shutdown', None)] + + host_funcs = [('get_by_name_label', 'Set(host)')] # attributes def host_get_name_label(self, session, host_ref): @@ -456,8 +537,6 @@ class XendAPI: # class methods def host_cpu_get_all(self, session): return xen_api_success(XendNode.instance().get_host_cpu_refs()) - def host_cpu_create(self, session, struct): - return xen_api_error(XEND_ERROR_UNSUPPORTED) # Xen API: Class network @@ -468,7 +547,9 @@ class XendAPI: 'name_description', 'default_gateway', 'default_netmask'] - + + network_funcs = [('create', 'network')] + def network_create(self, _, name_label, name_description, default_gateway, default_netmask): return xen_api_success( @@ -534,7 +615,7 @@ class XendAPI: PIF_attr_inst = PIF_attr_rw - PIF_methods = ['create_VLAN'] + PIF_methods = [('create_VLAN', 'int')] def _get_PIF(self, ref): return XendNode.instance().pifs[ref] @@ -659,18 +740,19 @@ class XendAPI: 'platform_keymap', 'otherConfig'] - VM_methods = ['clone', - 'start', - 'pause', - 'unpause', - 'clean_shutdown', - 'clean_reboot', - 'hard_shutdown', - 'hard_reboot', - 'suspend', - 'resume'] - - VM_funcs = ['get_by_name_label'] + VM_methods = [('clone', 'VM'), + ('start', None), + ('pause', None), + ('unpause', None), + ('clean_shutdown', None), + ('clean_reboot', None), + ('hard_shutdown', None), + ('hard_reboot', None), + ('suspend', None), + ('resume', None)] + + VM_funcs = [('create', 'VM'), + ('get_by_name_label', 'Set(VM)')] # parameters required for _create() VM_attr_inst = [ @@ -991,7 +1073,8 @@ class XendAPI: def VM_create(self, session, vm_struct): xendom = XendDomain.instance() - domuuid = xendom.create_domain(vm_struct) + domuuid = XendTask.log_progress(0, 100, + xendom.create_domain, vm_struct) return xen_api_success(domuuid) # object methods @@ -1052,31 +1135,49 @@ class XendAPI: def VM_clean_reboot(self, session, vm_ref): xendom = XendDomain.instance() xeninfo = xendom.get_vm_by_uuid(vm_ref) - xeninfo.shutdown("reboot") - return 
xen_api_success_void() + XendTask.log_progress(0, 100, xeninfo.shutdown, "reboot") + return xen_api_success_void() + def VM_clean_shutdown(self, session, vm_ref): xendom = XendDomain.instance() xeninfo = xendom.get_vm_by_uuid(vm_ref) - xeninfo.shutdown("poweroff") - return xen_api_success_void() + XendTask.log_progress(0, 100, xeninfo.shutdown, "poweroff") + return xen_api_success_void() + def VM_clone(self, session, vm_ref): return xen_api_error(XEND_ERROR_UNSUPPORTED) + def VM_destroy(self, session, vm_ref): - return do_vm_func("domain_delete", vm_ref) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_delete", vm_ref) + def VM_hard_reboot(self, session, vm_ref): - return xen_api_error(XEND_ERROR_UNSUPPORTED) + return xen_api_error(XEND_ERROR_UNSUPPORTED) + def VM_hard_shutdown(self, session, vm_ref): - return do_vm_func("domain_destroy", vm_ref) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_destroy", vm_ref) def VM_pause(self, session, vm_ref): - return do_vm_func("domain_pause", vm_ref) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_pause", vm_ref) + def VM_resume(self, session, vm_ref, start_paused): - return do_vm_func("domain_resume", vm_ref, start_paused = start_paused) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_resume", vm_ref, + start_paused = start_paused) + def VM_start(self, session, vm_ref, start_paused): - return do_vm_func("domain_start", vm_ref, start_paused = start_paused) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_start", vm_ref, + start_paused = start_paused) + def VM_suspend(self, session, vm_ref): - return do_vm_func("domain_suspend", vm_ref) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_suspend", vm_ref) + def VM_unpause(self, session, vm_ref): - return do_vm_func("domain_unpause", vm_ref) + return XendTask.log_progress(0, 100, do_vm_func, + "domain_unpause", vm_ref) # Xen API: Class VBD # ---------------------------------------------------------------- 
@@ -1095,8 +1196,9 @@ class XendAPI: VBD_attr_inst = VBD_attr_rw + ['image'] - VBD_methods = ['media_change'] - + VBD_methods = [('media_change', None)] + VBD_funcs = [('create', 'VBD')] + # object methods def VBD_get_record(self, session, vbd_ref): xendom = XendDomain.instance() @@ -1191,6 +1293,9 @@ class XendAPI: VIF_attr_inst = VIF_attr_rw + VIF_funcs = [('create', 'VIF')] + + # object methods def VIF_get_record(self, session, vif_ref): xendom = XendDomain.instance() @@ -1242,8 +1347,9 @@ class XendAPI: 'read_only'] VDI_attr_inst = VDI_attr_ro + VDI_attr_rw - VDI_methods = ['snapshot'] - VDI_funcs = ['get_by_name_label'] + VDI_methods = [('snapshot', 'VDI')] + VDI_funcs = [('create', 'VDI'), + ('get_by_name_label', 'Set(VDI)')] def _get_VDI(self, ref): return XendNode.instance().get_sr().xen_api_get_by_uuid(ref) @@ -1369,6 +1475,8 @@ class XendAPI: VTPM_attr_inst = VTPM_attr_rw + VTPM_funcs = [('create', 'VTPM')] + # object methods def VTPM_get_record(self, session, vtpm_ref): xendom = XendDomain.instance() @@ -1467,8 +1575,9 @@ class XendAPI: 'name_label', 'name_description'] - SR_methods = ['clone'] - SR_funcs = ['get_by_name_label'] + SR_methods = [('clone', 'SR')] + SR_funcs = [('get_by_name_label', 'Set(SR)'), + ('get_by_uuid', 'SR')] # Class Functions def SR_get_all(self, session): @@ -1542,6 +1651,67 @@ class XendAPI: XendNode.instance().save() return xen_api_success_void() + +class XendAPIAsyncProxy: + """ A redirector for Async.Class.function calls to XendAPI + but wraps the call for use with the XendTaskManager. + + @ivar xenapi: Xen API instance + @ivar method_map: Mapping from XMLRPC method name to callable objects. + """ + + method_prefix = 'Async.' + + def __init__(self, xenapi): + """Initialises the Async Proxy by making a map of all + implemented Xen API methods for use with XendTaskManager. 
+ + @param xenapi: XendAPI instance + """ + self.xenapi = xenapi + self.method_map = {} + for method_name in dir(self.xenapi): + method = getattr(self.xenapi, method_name) + if method_name[0] != '_' and hasattr(method, 'async') \ + and method.async == True: + self.method_map[method.api] = method + + def _dispatch(self, method, args): + """Overridden method so that SimpleXMLRPCServer will + resolve methods through this method rather than through + inspection. + + @param method: marshalled method name from XMLRPC. + @param args: marshalled arguments from XMLRPC. + """ + + # Only deal with method names that start with "Async." + if not method.startswith(self.method_prefix): + raise Exception('Method %s not supported' % method) + + # Require 'session' argument to be present. + if len(args) < 1: + raise Exception('Not enough arguments') + + # Lookup synchronous version of the method + synchronous_method_name = method[len(self.method_prefix):] + if synchronous_method_name not in self.method_map: + raise Exception('Method %s not supported' % method) + + method = self.method_map[synchronous_method_name] + + # Validate the session before proceeding + session = args[0] + if not auth_manager().is_session_valid(session): + return xen_api_error(['SESSION_INVALID', session]) + + # create and execute the task, and return task_uuid + return_type = getattr(method, 'return_type', None) + task_uuid = XendTaskManager.create_task(method, args, + synchronous_method_name, + return_type, + synchronous_method_name) + return xen_api_success(task_uuid) def _decorate(): """Initialise Xen API wrapper by making sure all functions @@ -1561,7 +1731,8 @@ def _decorate(): 'VDI' : valid_vdi, 'VTPM' : valid_vtpm, 'SR' : valid_sr, - 'PIF' : valid_pif + 'PIF' : valid_pif, + 'task' : valid_task, } # Cheat methods @@ -1582,17 +1753,11 @@ def _decorate(): setattr(XendAPI, get_by_uuid, _get_by_uuid) setattr(XendAPI, get_uuid, _get_uuid) - # 2. 
get_record is just getting all the attributes, so provide - # a fake template implementation. - # - # TODO: ... - - # Wrapping validators around XMLRPC calls # --------------------------------------- for cls, validator in classes.items(): - def doit(n, takes_instance): + def doit(n, takes_instance, async_support = False, return_type = None): n_ = n.replace('.', '_') try: f = getattr(XendAPI, n_) @@ -1604,6 +1769,10 @@ def _decorate(): for v in validators: f = v(f) f.api = n + f.async = async_support + if return_type: + f.return_type = return_type + setattr(XendAPI, n_, f) except AttributeError: log.warn("API call: %s not found" % n) @@ -1616,19 +1785,21 @@ def _decorate(): # wrap validators around readable class attributes for attr_name in ro_attrs + rw_attrs + XendAPI.Base_attr_ro: - doit('%s.get_%s' % (cls, attr_name), True) + doit('%s.get_%s' % (cls, attr_name), True, async_support = False) # wrap validators around writable class attrributes for attr_name in rw_attrs + XendAPI.Base_attr_rw: - doit('%s.set_%s' % (cls, attr_name), True) + doit('%s.set_%s' % (cls, attr_name), True, async_support = False) # wrap validators around methods - for method_name in methods + XendAPI.Base_methods: - doit('%s.%s' % (cls, method_name), True) + for method_name, return_type in methods + XendAPI.Base_methods: + doit('%s.%s' % (cls, method_name), True, async_support = True) # wrap validators around class functions - for func_name in funcs + XendAPI.Base_funcs: - doit('%s.%s' % (cls, func_name), False) + for func_name, return_type in funcs + XendAPI.Base_funcs: + doit('%s.%s' % (cls, func_name), False, async_support = True, + return_type = return_type) + _decorate() diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPIConstants.py --- a/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 14:36:03 2007 +0000 +++ b/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 13:26:26 2007 +0000 @@ -74,3 +74,4 @@ XEN_API_VDI_TYPE = ['system', 'user', 'e XEN_API_VDI_TYPE = 
['system', 'user', 'ephemeral'] XEN_API_DRIVER_TYPE = ['ioemu', 'paravirtualised'] XEN_API_VBD_TYPE = ['CD', 'Disk'] +XEN_API_TASK_STATUS_TYPE = ['pending', 'success', 'failure'] diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendDomainInfo.py --- a/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 14:36:03 2007 +0000 +++ b/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 13:26:26 2007 +0000 @@ -44,6 +44,7 @@ from xen.xend.XendBootloader import boot from xen.xend.XendBootloader import bootloader, bootloader_tidy from xen.xend.XendError import XendError, VmError from xen.xend.XendDevices import XendDevices +from xen.xend.XendTask import XendTask from xen.xend.xenstore.xstransact import xstransact, complete from xen.xend.xenstore.xsutil import GetDomainPath, IntroduceDomain, ResumeDomain from xen.xend.xenstore.xswatch import xswatch @@ -387,12 +388,13 @@ class XendDomainInfo: if self.state == DOM_STATE_HALTED: try: - self._constructDomain() - self._initDomain() - self._storeVmDetails() - self._storeDomDetails() - self._registerWatches() - self.refreshShutdown() + XendTask.log_progress(0, 30, self._constructDomain) + XendTask.log_progress(31, 60, self._initDomain) + + XendTask.log_progress(61, 70, self._storeVmDetails) + XendTask.log_progress(71, 80, self._storeDomDetails) + XendTask.log_progress(81, 90, self._registerWatches) + XendTask.log_progress(91, 100, self.refreshShutdown) # save running configuration if XendDomains believe domain is # persistent diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTask.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/xend/XendTask.py Wed Jan 24 13:26:26 2007 +0000 @@ -0,0 +1,226 @@ +#=========================================================================== +# This library is free software; you can redistribute it and/or +# modify it under the terms of version 2.1 of the GNU Lesser General Public +# License as published by the Free Software Foundation. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +#============================================================================ +# Copyright (C) 2007 XenSource Ltd +#============================================================================ + +from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE +from xen.xend.XendLogging import log +import thread +import threading + +class XendTask(threading.Thread): + """Represents a Asynchronous Task used by Xen API. + + Basically proxies the callable object in a thread and returns the + results via self.{type,result,error_code,error_info}. + + @cvar task_progress: Thread local storage for progress tracking. + It is a dict indexed by thread_id. Note that the + thread_id may be reused when the previous + thread with the thread_id ends. + + @cvar task_progress_lock: lock on thread access to task_progress + + """ + + # progress stack: + # thread_id : [(start_task, end_task), + # (start_sub_task, end_sub_task)..] + # example : (0, 100), (50, 100) (50, 100) ... + # That would mean that the task is 75% complete. + # as it is 50% of the last 50% of the task. + + task_progress = {} + task_progress_lock = threading.Lock() + + def __init__(self, uuid, func, args, func_name, return_type = None, + label = None, desc = None): + """ + @param uuid: UUID of the task + @type uuid: string + @param func: Method to call (from XendAPI) + @type func: callable object + @param args: arguments to pass to function + @type args: list or tuple + @param label: name label of the task. 
+ @type label: string + @param desc: name description of the task. + @type desc: string + @param func_name: function name, eg ('VM.start') + @type desc: string + """ + + threading.Thread.__init__(self) + self.status_lock = threading.Lock() + self.status = XEN_API_TASK_STATUS_TYPE[0] + + self.progress = 0 + self.eta = None # TODO: we have no time estimates + self.type = return_type + self.uuid = uuid + + self.result = None + self.error_code = '' + self.error_info = [] + + self.name_label = label or func.__name__ + self.name_description = desc + self.thread_id = 0 + + self.func_name = func_name + self.func = func + self.args = args + + def set_status(self, new_status): + self.status_lock.acquire() + try: + self.status = new_status + finally: + self.status_lock.release() + + def get_status(self): + self.status_lock.acquire() + try: + return self.status + finally: + self.status_lock.release() + + def run(self): + """Runs the method and stores the result for later access. + + Is invoked by threading.Thread.start(). 
+ """ + + self.thread_id = thread.get_ident() + self.task_progress_lock.acquire() + try: + self.task_progress[self.thread_id] = {} + self.progress = 0 + finally: + self.task_progress_lock.release() + + try: + result = self.func(*self.args) + if result['Status'] == 'Success': + self.result = result['Value'] + self.set_status(XEN_API_TASK_STATUS_TYPE[1]) + else: + self.error_code = result['ErrorDescription'][0] + self.error_info = result['ErrorDescription'][1:] + self.set_status(XEN_API_TASK_STATUS_TYPE[2]) + except Exception, e: + log.exception('Error running Async Task') + self.error_code = 'INTERNAL ERROR' + self.error_info = [str(e)] + self.set_status(XEN_API_TASK_STATUS_TYPE[2]) + + self.task_progress_lock.acquire() + try: + del self.task_progress[self.thread_id] + self.progress = 100 + finally: + self.task_progress_lock.release() + + def get_record(self): + """Returns a Xen API compatible record.""" + return { + 'uuid': self.uuid, + 'name_label': self.name_label, + 'name_description': self.name_description, + 'status': self.status, + 'progress': self.get_progress(), + 'eta': self.eta, + 'type': self.type, + 'result': self.result, + 'error_code': self.error_code, + 'error_info': self.error_info, + } + + def get_progress(self): + """ Checks the thread local progress storage. 
""" + if self.status != XEN_API_TASK_STATUS_TYPE[0]: + return 100 + + self.task_progress_lock.acquire() + try: + # Pop each progress range in the stack and map it on to + # the next progress range until we find out cumulative + # progress based on the (start, end) range of each level + start = 0 + prog_stack = self.task_progress.get(self.thread_id, [])[:] + if len(prog_stack) > 0: + start, stop = prog_stack.pop() + while prog_stack: + new_start, new_stop = prog_stack.pop() + start = new_start + ((new_stop - new_start)/100.0 * start) + + # only update progress if it increases, this will prevent + # progress from going backwards when tasks are popped off + # the stack + if start > self.progress: + self.progress = int(start) + finally: + self.task_progress_lock.release() + + return self.progress + + + def log_progress(cls, progress_min, progress_max, + func, *args, **kwds): + """ Callable function wrapper that logs the progress of the + function to thread local storage for task progress calculation. + + This is a class method so other parts of Xend will update + the task progress by calling: + + XendTask.push_progress(progress_min, progress_max, + func, *args, **kwds) + + The results of the progress is stored in thread local storage + and the result of the func(*args, **kwds) is returned back + to the caller. + + """ + thread_id = thread.get_ident() + retval = None + + # Log the start of the method + cls.task_progress_lock.acquire() + try: + if type(cls.task_progress.get(thread_id)) != list: + cls.task_progress[thread_id] = [] + + cls.task_progress[thread_id].append((progress_min, + progress_max)) + finally: + cls.task_progress_lock.release() + + # Execute the method + retval = func(*args, **kwds) + + # Log the end of the method by popping the progress range + # off the stack. 
+ cls.task_progress_lock.acquire() + try: + cls.task_progress[thread_id].pop() + finally: + cls.task_progress_lock.release() + + return retval + + log_progress = classmethod(log_progress) + + + diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTaskManager.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/xend/XendTaskManager.py Wed Jan 24 13:26:26 2007 +0000 @@ -0,0 +1,110 @@ +#=========================================================================== +# This library is free software; you can redistribute it and/or +# modify it under the terms of version 2.1 of the GNU Lesser General Public +# License as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +#============================================================================ +# Copyright (C) 2007 XenSource Ltd +#============================================================================ + +""" +Task Manager for Xen API asynchronous tasks. + +Stores all tasks in a simple dictionary in module's own local storage to +avoid the 'instance()' methods. + +Tasks are indexed by UUID. + +""" + +from xen.xend.XendTask import XendTask +from xen.xend import uuid +import threading + +tasks = {} +tasks_lock = threading.Lock() + +def create_task(func, args, func_name, return_type = None, label = ''): + """Creates a new Task and registers it with the XendTaskManager. 
+ + @param func: callable object XMLRPC method + @type func: callable object + @param args: tuple or list of arguments + @type args: tuple or list + @param func_name: XMLRPC method name, so we can estimate the progress + @type func_name: string + + @return: Task UUID + @rtype: string. + """ + task_uuid = uuid.createString() + try: + tasks_lock.acquire() + task = XendTask(task_uuid, func, args, func_name, + return_type = return_type, label = label) + tasks[task_uuid] = task + finally: + tasks_lock.release() + + task.start() + + return task_uuid + +def destroy_task(task_uuid): + """Destroys a task. + + @param task_uuid: Task UUID + @type task_uuid: string. + """ + try: + tasks_lock.acquire() + if task_uuid in tasks: + del tasks[task_uuid] + finally: + tasks_lock.release() + +def get_all_tasks(): + """ Returns all the UUID of tracked tasks, completed or pending. + + @returns: list of UUIDs + @rtype: list of strings + """ + try: + tasks_lock.acquire() + return tasks.keys() + finally: + tasks_lock.release() + +def get_task(task_uuid): + """ Retrieves a task by UUID. + + @rtype: XendTask or None + @return: Task denoted by UUID. + """ + try: + tasks_lock.acquire() + return tasks.get(task_uuid) + finally: + tasks_lock.release() + +def get_tasks_by_name(task_name): + """ Retrieves a task by UUID. + + @rtype: XendTask or None + @return: Task denoted by UUID. 
+ """ + try: + tasks_lock.acquire() + return [t.uuid for t in tasks if t.name_label == name] + finally: + tasks_lock.release() + + diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/server/XMLRPCServer.py --- a/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 14:36:03 2007 +0000 +++ b/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 13:26:26 2007 +0000 @@ -139,6 +139,8 @@ class XMLRPCServer: meth = getattr(self.xenapi, meth_name) if callable(meth) and hasattr(meth, 'api'): self.server.register_function(meth, getattr(meth, 'api')) + + self.server.register_instance(XendAPI.XendAPIAsyncProxy(self.xenapi)) # Legacy deprecated xm xmlrpc api # -------------------------------------------------------------------- _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |