[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] [xen-unstable] [XEND] Add Task support in Xen API implementation.



# HG changeset patch
# User Alastair Tse <atse@xxxxxxxxxxxxx>
# Date 1169645186 0
# Node ID 248a9c36d81670735f5b1d89bbf50ba985903fe5
# Parent  259470f0856b4642dd0a4fab6bfba331f8c5454b
[XEND] Add Task support in Xen API implementation.

Added progress tracking to some common methods like VM.start so that
progress can be reported during async invocation.

Signed-off-by: Alastair Tse <atse@xxxxxxxxxxxxx>
---
 tools/python/xen/xend/XendAPI.py             |  297 +++++++++++++++++++++------
 tools/python/xen/xend/XendAPIConstants.py    |    1 
 tools/python/xen/xend/XendDomainInfo.py      |   14 -
 tools/python/xen/xend/XendTask.py            |  226 ++++++++++++++++++++
 tools/python/xen/xend/XendTaskManager.py     |  110 ++++++++++
 tools/python/xen/xend/server/XMLRPCServer.py |    2 
 6 files changed, 581 insertions(+), 69 deletions(-)

diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPI.py
--- a/tools/python/xen/xend/XendAPI.py  Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPI.py  Wed Jan 24 13:26:26 2007 +0000
@@ -22,12 +22,13 @@ import traceback
 import traceback
 
 from xen.xend import XendDomain, XendDomainInfo, XendNode
-from xen.xend import XendLogging
+from xen.xend import XendLogging, XendTaskManager
 
 from xen.xend.XendAuthSessions import instance as auth_manager
 from xen.xend.XendError import *
 from xen.xend.XendClient import ERROR_INVALID_DOMAIN
 from xen.xend.XendLogging import log
+from xen.xend.XendTask import XendTask
 
 from xen.xend.XendAPIConstants import *
 from xen.util.xmlrpclib2 import stringify
@@ -237,15 +238,26 @@ def valid_sr(func):
                       'SR_HANDLE_INVALID', func, *args, **kwargs)
 
 def valid_pif(func):
-    """Decorator to verify if sr_ref is valid before calling
+    """Decorator to verify if pif_ref is valid before calling
     method.
 
-    @param func: function with params: (self, session, sr_ref)
+    @param func: function with params: (self, session, pif_ref)
     @rtype: callable object
     """
     return lambda *args, **kwargs: \
            _check_ref(lambda r: r in XendNode.instance().pifs,
                       'PIF_HANDLE_INVALID', func, *args, **kwargs)
+
+def valid_task(func):
+    """Decorator to verify if task_ref is valid before calling
+    method.
+
+    @param func: function with params: (self, session, task_ref)
+    @rtype: callable object
+    """
+    return lambda *args, **kwargs: \
+           _check_ref(XendTaskManager.get_task,
+                      'TASK_HANDLE_INVALID', func, *args, **kwargs)
 
 # -----------------------------
 # Bridge to Legacy XM API calls
@@ -288,18 +300,17 @@ class XendAPI:
     def __init__(self, auth):
         self.auth = auth
 
-
     Base_attr_ro = ['uuid']
     Base_attr_rw = []
-    Base_methods = ['destroy', 'get_by_uuid', 'get_record']
-    Base_funcs   = ['create', 'get_all']
+    Base_methods = [('destroy', None), ('get_record', 'Struct')]
+    Base_funcs   = [('get_all', 'Set'), ('get_by_uuid', None)]
 
     # Xen API: Class Session
     # ----------------------------------------------------------------
     # NOTE: Left unwrapped by __init__
 
     session_attr_ro = ['this_host', 'this_user']
-    session_methods = ['logout']
+    session_methods = [('logout', None)]
     # session_funcs = ['login_with_password']    
 
     def session_login_with_password(self, *args):
@@ -346,7 +357,77 @@ class XendAPI:
 
     # Xen API: Class Tasks
     # ----------------------------------------------------------------
-    # TODO: NOT IMPLEMENTED YET    
+
+    task_attr_ro = ['status',
+                    'progress',
+                    'eta',                    
+                    'type',
+                    'result',
+                    'error_code',
+                    'error_info']
+
+    task_attr_rw = ['name_label',
+                    'name_description']
+
+    task_funcs = [('get_by_name_label', 'Set(task)')]
+
+    def task_get_status(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.get_status())
+
+    def task_get_progress(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.progress)
+
+    def task_get_eta(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.eta)
+
+    def task_get_type(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.type)
+
+    def task_get_result(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.result)
+
+    def task_get_error_code(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.error_code)
+
+    def task_get_error_info(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.error_info)
+
+    def task_get_name_label(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.name_label)
+
+    def task_get_name_description(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.name_description)
+
+    def task_set_name_label(self, session, task_ref, label):
+        task = XendTaskManager.get_task(task_ref)
+        task.name_label = label
+        return xen_api_success_void()
+
+    def task_set_name_description(self, session, task_ref, desc):
+        task = XendTaskManager.get_task(task_ref)
+        task.name_description = desc
+        return xen_api_success_void()    
+
+    def task_get_all(self, session):
+        tasks = XendTaskManager.get_all_tasks()
+        return xen_api_success(tasks)
+
+    def task_destroy(self, session, task_uuid):
+        XendTaskManager.destroy_task(task_uuid)
+        return xen_api_success_void()
+
+    def task_get_record(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.get_record())
 
     # Xen API: Class Host
     # ----------------------------------------------------------------    
@@ -358,12 +439,12 @@ class XendAPI:
     host_attr_rw = ['name_label',
                     'name_description']
 
-    host_methods = ['disable',
-                    'enable',
-                    'reboot',
-                    'shutdown']
-    
-    host_funcs = ['get_by_name_label']
+    host_methods = [('disable', None),
+                    ('enable', None),
+                    ('reboot', None),
+                    ('shutdown', None)]
+    
+    host_funcs = [('get_by_name_label', 'Set(host)')]
 
     # attributes
     def host_get_name_label(self, session, host_ref):
@@ -456,8 +537,6 @@ class XendAPI:
     # class methods
     def host_cpu_get_all(self, session):
         return xen_api_success(XendNode.instance().get_host_cpu_refs())
-    def host_cpu_create(self, session, struct):
-        return xen_api_error(XEND_ERROR_UNSUPPORTED)
 
 
     # Xen API: Class network
@@ -468,7 +547,9 @@ class XendAPI:
                        'name_description',
                        'default_gateway',
                        'default_netmask']
-
+    
+    network_funcs = [('create', 'network')]
+    
     def network_create(self, _, name_label, name_description,
                        default_gateway, default_netmask):
         return xen_api_success(
@@ -534,7 +615,7 @@ class XendAPI:
 
     PIF_attr_inst = PIF_attr_rw
 
-    PIF_methods = ['create_VLAN']
+    PIF_methods = [('create_VLAN', 'int')]
 
     def _get_PIF(self, ref):
         return XendNode.instance().pifs[ref]
@@ -659,18 +740,19 @@ class XendAPI:
                   'platform_keymap',
                   'otherConfig']
 
-    VM_methods = ['clone',
-                  'start',
-                  'pause',
-                  'unpause',
-                  'clean_shutdown',
-                  'clean_reboot',
-                  'hard_shutdown',
-                  'hard_reboot',
-                  'suspend',
-                  'resume']
-    
-    VM_funcs  = ['get_by_name_label']
+    VM_methods = [('clone', 'VM'),
+                  ('start', None),
+                  ('pause', None),
+                  ('unpause', None),
+                  ('clean_shutdown', None),
+                  ('clean_reboot', None),
+                  ('hard_shutdown', None),
+                  ('hard_reboot', None),
+                  ('suspend', None),
+                  ('resume', None)]
+    
+    VM_funcs  = [('create', 'VM'),
+                 ('get_by_name_label', 'Set(VM)')]
 
     # parameters required for _create()
     VM_attr_inst = [
@@ -991,7 +1073,8 @@ class XendAPI:
     
     def VM_create(self, session, vm_struct):
         xendom = XendDomain.instance()
-        domuuid = xendom.create_domain(vm_struct)
+        domuuid = XendTask.log_progress(0, 100,
+                                        xendom.create_domain, vm_struct)
         return xen_api_success(domuuid)
     
     # object methods
@@ -1052,31 +1135,49 @@ class XendAPI:
     def VM_clean_reboot(self, session, vm_ref):
         xendom = XendDomain.instance()
         xeninfo = xendom.get_vm_by_uuid(vm_ref)
-        xeninfo.shutdown("reboot")
-        return xen_api_success_void()
+        XendTask.log_progress(0, 100, xeninfo.shutdown, "reboot")
+        return xen_api_success_void()
+    
     def VM_clean_shutdown(self, session, vm_ref):
         xendom = XendDomain.instance()
         xeninfo = xendom.get_vm_by_uuid(vm_ref)
-        xeninfo.shutdown("poweroff")
-        return xen_api_success_void()
+        XendTask.log_progress(0, 100, xeninfo.shutdown, "poweroff")        
+        return xen_api_success_void()
+    
     def VM_clone(self, session, vm_ref):
         return xen_api_error(XEND_ERROR_UNSUPPORTED)
+    
     def VM_destroy(self, session, vm_ref):
-        return do_vm_func("domain_delete", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_delete", vm_ref)
+    
     def VM_hard_reboot(self, session, vm_ref):
-        return xen_api_error(XEND_ERROR_UNSUPPORTED)    
+        return xen_api_error(XEND_ERROR_UNSUPPORTED)
+    
     def VM_hard_shutdown(self, session, vm_ref):
-        return do_vm_func("domain_destroy", vm_ref)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_destroy", vm_ref)    
     def VM_pause(self, session, vm_ref):
-        return do_vm_func("domain_pause", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_pause", vm_ref)
+    
     def VM_resume(self, session, vm_ref, start_paused):
-        return do_vm_func("domain_resume", vm_ref, start_paused = start_paused)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_resume", vm_ref,
+                                     start_paused = start_paused)
+    
     def VM_start(self, session, vm_ref, start_paused):
-        return do_vm_func("domain_start", vm_ref, start_paused = start_paused)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_start", vm_ref,
+                                     start_paused = start_paused)
+    
     def VM_suspend(self, session, vm_ref):
-        return do_vm_func("domain_suspend", vm_ref)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_suspend", vm_ref)
+    
     def VM_unpause(self, session, vm_ref):
-        return do_vm_func("domain_unpause", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_unpause", vm_ref)
 
     # Xen API: Class VBD
     # ----------------------------------------------------------------
@@ -1095,8 +1196,9 @@ class XendAPI:
 
     VBD_attr_inst = VBD_attr_rw + ['image']
 
-    VBD_methods = ['media_change']
-
+    VBD_methods = [('media_change', None)]
+    VBD_funcs = [('create', 'VBD')]
+    
     # object methods
     def VBD_get_record(self, session, vbd_ref):
         xendom = XendDomain.instance()
@@ -1191,6 +1293,9 @@ class XendAPI:
 
     VIF_attr_inst = VIF_attr_rw
 
+    VIF_funcs = [('create', 'VIF')]
+
+                 
     # object methods
     def VIF_get_record(self, session, vif_ref):
         xendom = XendDomain.instance()
@@ -1242,8 +1347,9 @@ class XendAPI:
                    'read_only']
     VDI_attr_inst = VDI_attr_ro + VDI_attr_rw
 
-    VDI_methods = ['snapshot']
-    VDI_funcs = ['get_by_name_label']
+    VDI_methods = [('snapshot', 'VDI')]
+    VDI_funcs = [('create', 'VDI'),
+                  ('get_by_name_label', 'Set(VDI)')]
 
     def _get_VDI(self, ref):
         return XendNode.instance().get_sr().xen_api_get_by_uuid(ref)
@@ -1369,6 +1475,8 @@ class XendAPI:
 
     VTPM_attr_inst = VTPM_attr_rw
 
+    VTPM_funcs = [('create', 'VTPM')]
+    
     # object methods
     def VTPM_get_record(self, session, vtpm_ref):
         xendom = XendDomain.instance()
@@ -1467,8 +1575,9 @@ class XendAPI:
                     'name_label',
                     'name_description']
     
-    SR_methods = ['clone']
-    SR_funcs = ['get_by_name_label']
+    SR_methods = [('clone', 'SR')]
+    SR_funcs = [('get_by_name_label', 'Set(SR)'),
+                ('get_by_uuid', 'SR')]
 
     # Class Functions
     def SR_get_all(self, session):
@@ -1542,6 +1651,67 @@ class XendAPI:
         XendNode.instance().save()        
         return xen_api_success_void()
 
+
+class XendAPIAsyncProxy:
+    """ A redirector for Async.Class.function calls to XendAPI
+    but wraps the call for use with the XendTaskManager.
+
+    @ivar xenapi: Xen API instance
+    @ivar method_map: Mapping from XMLRPC method name to callable objects.
+    """
+
+    method_prefix = 'Async.'
+
+    def __init__(self, xenapi):
+        """Initialises the Async Proxy by making a map of all
+        implemented Xen API methods for use with XendTaskManager.
+
+        @param xenapi: XendAPI instance
+        """
+        self.xenapi = xenapi
+        self.method_map = {}
+        for method_name in dir(self.xenapi):
+            method = getattr(self.xenapi, method_name)            
+            if method_name[0] != '_' and hasattr(method, 'async') \
+                   and method.async == True:
+                self.method_map[method.api] = method
+
+    def _dispatch(self, method, args):
+        """Overridden method so that SimpleXMLRPCServer will
+        resolve methods through this method rather than through
+        inspection.
+
+        @param method: marshalled method name from XMLRPC.
+        @param args: marshalled arguments from XMLRPC.
+        """
+
+        # Only deal with method names that start with "Async."
+        if not method.startswith(self.method_prefix):
+            raise Exception('Method %s not supported' % method)
+
+        # Require 'session' argument to be present.
+        if len(args) < 1:
+            raise Exception('Not enough arguments')
+
+        # Lookup synchronous version of the method
+        synchronous_method_name = method[len(self.method_prefix):]
+        if synchronous_method_name not in self.method_map:
+            raise Exception('Method %s not supported' % method)
+        
+        method = self.method_map[synchronous_method_name]
+
+        # Validate the session before proceeding
+        session = args[0]
+        if not auth_manager().is_session_valid(session):
+            return xen_api_error(['SESSION_INVALID', session])
+
+        # create and execute the task, and return task_uuid
+        return_type = getattr(method, 'return_type', None)
+        task_uuid = XendTaskManager.create_task(method, args,
+                                                synchronous_method_name,
+                                                return_type,
+                                                synchronous_method_name)
+        return xen_api_success(task_uuid)
 
 def _decorate():
     """Initialise Xen API wrapper by making sure all functions
@@ -1561,7 +1731,8 @@ def _decorate():
         'VDI'     : valid_vdi,
         'VTPM'    : valid_vtpm,
         'SR'      : valid_sr,
-        'PIF'     : valid_pif
+        'PIF'     : valid_pif,
+        'task'    : valid_task,
         }
 
     # Cheat methods
@@ -1582,17 +1753,11 @@ def _decorate():
         setattr(XendAPI, get_by_uuid, _get_by_uuid)
         setattr(XendAPI, get_uuid,    _get_uuid)
 
-    # 2. get_record is just getting all the attributes, so provide
-    #    a fake template implementation.
-    # 
-    # TODO: ...
-
-
     # Wrapping validators around XMLRPC calls
     # ---------------------------------------
 
     for cls, validator in classes.items():
-        def doit(n, takes_instance):
+        def doit(n, takes_instance, async_support = False, return_type = None):
             n_ = n.replace('.', '_')
             try:
                 f = getattr(XendAPI, n_)
@@ -1604,6 +1769,10 @@ def _decorate():
                 for v in validators:
                     f = v(f)
                     f.api = n
+                    f.async = async_support
+                    if return_type:
+                        f.return_type = return_type
+                    
                 setattr(XendAPI, n_, f)
             except AttributeError:
                 log.warn("API call: %s not found" % n)
@@ -1616,19 +1785,21 @@ def _decorate():
 
         # wrap validators around readable class attributes
         for attr_name in ro_attrs + rw_attrs + XendAPI.Base_attr_ro:
-            doit('%s.get_%s' % (cls, attr_name), True)
+            doit('%s.get_%s' % (cls, attr_name), True, async_support = False)
 
         # wrap validators around writable class attrributes
         for attr_name in rw_attrs + XendAPI.Base_attr_rw:
-            doit('%s.set_%s' % (cls, attr_name), True)
+            doit('%s.set_%s' % (cls, attr_name), True, async_support = False)
 
         # wrap validators around methods
-        for method_name in methods + XendAPI.Base_methods:
-            doit('%s.%s' % (cls, method_name), True)
+        for method_name, return_type in methods + XendAPI.Base_methods:
+            doit('%s.%s' % (cls, method_name), True, async_support = True)
 
         # wrap validators around class functions
-        for func_name in funcs + XendAPI.Base_funcs:
-            doit('%s.%s' % (cls, func_name), False)
+        for func_name, return_type in funcs + XendAPI.Base_funcs:
+            doit('%s.%s' % (cls, func_name), False, async_support = True,
+                 return_type = return_type)
+
 
 _decorate()
 
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPIConstants.py
--- a/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 13:26:26 2007 +0000
@@ -74,3 +74,4 @@ XEN_API_VDI_TYPE = ['system', 'user', 'e
 XEN_API_VDI_TYPE = ['system', 'user', 'ephemeral']
 XEN_API_DRIVER_TYPE = ['ioemu', 'paravirtualised']
 XEN_API_VBD_TYPE = ['CD', 'Disk']
+XEN_API_TASK_STATUS_TYPE = ['pending', 'success', 'failure']
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendDomainInfo.py
--- a/tools/python/xen/xend/XendDomainInfo.py   Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendDomainInfo.py   Wed Jan 24 13:26:26 2007 +0000
@@ -44,6 +44,7 @@ from xen.xend.XendBootloader import boot
 from xen.xend.XendBootloader import bootloader, bootloader_tidy
 from xen.xend.XendError import XendError, VmError
 from xen.xend.XendDevices import XendDevices
+from xen.xend.XendTask import XendTask
 from xen.xend.xenstore.xstransact import xstransact, complete
 from xen.xend.xenstore.xsutil import GetDomainPath, IntroduceDomain, ResumeDomain
 from xen.xend.xenstore.xswatch import xswatch
@@ -387,12 +388,13 @@ class XendDomainInfo:
 
         if self.state == DOM_STATE_HALTED:
             try:
-                self._constructDomain()
-                self._initDomain()
-                self._storeVmDetails()
-                self._storeDomDetails()
-                self._registerWatches()
-                self.refreshShutdown()
+                XendTask.log_progress(0, 30, self._constructDomain)
+                XendTask.log_progress(31, 60, self._initDomain)
+                
+                XendTask.log_progress(61, 70, self._storeVmDetails)
+                XendTask.log_progress(71, 80, self._storeDomDetails)
+                XendTask.log_progress(81, 90, self._registerWatches)
+                XendTask.log_progress(91, 100, self.refreshShutdown)
 
                 # save running configuration if XendDomains believe domain is
                 # persistent
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTask.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTask.py Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,226 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE
+from xen.xend.XendLogging import log
+import thread
+import threading
+
+class XendTask(threading.Thread):
+    """Represents a Asynchronous Task used by Xen API.
+
+    Basically proxies the callable object in a thread and returns the
+    results via self.{type,result,error_code,error_info}.
+
+    @cvar task_progress: Thread local storage for progress tracking.
+                         It is a dict indexed by thread_id. Note that the
+                         thread_id may be reused when the previous
+                         thread with the thread_id ends.
+                         
+    @cvar task_progress_lock: lock on thread access to task_progress
+    
+    """
+
+    # progress stack:
+    # thread_id : [(start_task, end_task),
+    #              (start_sub_task, end_sub_task)..]
+    # example : (0, 100), (50, 100) (50, 100) ...
+    #           That would mean that the task is 75% complete.
+    #           as it is 50% of the last 50% of the task.
+    
+    task_progress = {}
+    task_progress_lock = threading.Lock()
+
+    def __init__(self, uuid, func, args, func_name, return_type = None,
+                 label = None, desc = None):
+        """
+        @param uuid: UUID of the task
+        @type uuid: string
+        @param func: Method to call (from XendAPI)
+        @type func: callable object
+        @param args: arguments to pass to function
+        @type args: list or tuple
+        @param label: name label of the task.
+        @type label: string
+        @param desc: name description of the task.
+        @type desc: string
+        @param func_name: function name, eg ('VM.start')
+        @type desc: string
+        """
+        
+        threading.Thread.__init__(self)
+        self.status_lock = threading.Lock()
+        self.status = XEN_API_TASK_STATUS_TYPE[0]
+
+        self.progress = 0
+        self.eta = None  # TODO: we have no time estimates
+        self.type = return_type
+        self.uuid = uuid
+        
+        self.result = None
+        self.error_code = ''
+        self.error_info = []
+        
+        self.name_label = label or func.__name__
+        self.name_description = desc
+        self.thread_id = 0
+
+        self.func_name = func_name 
+        self.func = func
+        self.args = args
+
+    def set_status(self, new_status):
+        self.status_lock.acquire()
+        try:
+            self.status = new_status
+        finally:
+            self.status_lock.release()
+
+    def get_status(self):
+        self.status_lock.acquire()
+        try:
+            return self.status
+        finally:
+            self.status_lock.release()        
+
+    def run(self):
+        """Runs the method and records its outcome on this task.
+
+        Is invoked by threading.Thread.start().  Exceptions raised by the
+        method are logged and stored as a failure."""
+        self.thread_id = thread.get_ident()
+        self.task_progress_lock.acquire()
+        try:
+            # Must be a list: get_progress() slices this progress stack.
+            self.task_progress[self.thread_id] = []
+            self.progress = 0
+        finally:
+            self.task_progress_lock.release()
+
+        try:
+            result = self.func(*self.args)
+            if result['Status'] == 'Success':
+                self.result = result['Value']
+                self.set_status(XEN_API_TASK_STATUS_TYPE[1])
+            else:
+                self.error_code = result['ErrorDescription'][0]
+                self.error_info = result['ErrorDescription'][1:]
+                self.set_status(XEN_API_TASK_STATUS_TYPE[2])
+        except Exception, e:
+            log.exception('Error running Async Task')
+            self.error_code = 'INTERNAL ERROR'
+            self.error_info = [str(e)]
+            self.set_status(XEN_API_TASK_STATUS_TYPE[2])
+
+        self.task_progress_lock.acquire()
+        try:
+            del self.task_progress[self.thread_id]
+            self.progress = 100
+        finally:
+            self.task_progress_lock.release()
+    
+    def get_record(self):
+        """Returns a Xen API compatible record."""
+        return {
+            'uuid': self.uuid,            
+            'name_label': self.name_label,
+            'name_description': self.name_description,
+            'status': self.status,
+            'progress': self.get_progress(),
+            'eta': self.eta,
+            'type': self.type,
+            'result': self.result,
+            'error_code': self.error_code,
+            'error_info': self.error_info,
+        }
+
+    def get_progress(self):
+        """ Checks the thread local progress storage. """
+        if self.status != XEN_API_TASK_STATUS_TYPE[0]:
+            return 100
+        
+        self.task_progress_lock.acquire()
+        try:
+            # Pop each progress range in the stack and map it on to
+            # the next progress range until we find out cumulative
+            # progress based on the (start, end) range of each level
+            start = 0
+            prog_stack = self.task_progress.get(self.thread_id, [])[:]
+            if len(prog_stack) > 0:
+                start, stop = prog_stack.pop()
+                while prog_stack:
+                    new_start, new_stop = prog_stack.pop()
+                    start = new_start + ((new_stop - new_start)/100.0 * start)
+
+            # only update progress if it increases, this will prevent
+            # progress from going backwards when tasks are popped off
+            # the stack
+            if start > self.progress:
+                self.progress = int(start)
+        finally:
+            self.task_progress_lock.release()
+
+        return self.progress
+
+
+    def log_progress(cls, progress_min, progress_max,
+                     func, *args, **kwds):
+        """ Callable function wrapper that logs the progress of the
+        function to thread local storage for task progress calculation.
+
+        This is a class method so other parts of Xend will update
+        the task progress by calling:
+
+        XendTask.log_progress(progress_min, progress_max,
+                              func, *args, **kwds)
+
+        The progress range is pushed on the calling thread's stack
+        while func(*args, **kwds) runs and popped afterwards, even if
+        func raises; the result of func is returned to the caller.
+
+        """
+        thread_id = thread.get_ident()
+
+        # Log the start of the method by pushing its progress range
+        # on to this thread's stack (created on first use).
+        cls.task_progress_lock.acquire()
+        try:
+            if type(cls.task_progress.get(thread_id)) != list:
+                cls.task_progress[thread_id] = []
+
+            cls.task_progress[thread_id].append((progress_min,
+                                                 progress_max))
+        finally:
+            cls.task_progress_lock.release()
+
+        # Execute the method; always pop the progress range off the
+        # stack so a failing call cannot leave a stale entry behind.
+        try:
+            return func(*args, **kwds)
+        finally:
+            cls.task_progress_lock.acquire()
+            try:
+                stack = cls.task_progress.get(thread_id)
+                if stack:
+                    stack.pop()
+            finally:
+                cls.task_progress_lock.release()
+
+    log_progress = classmethod(log_progress)
+
+    
+
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTaskManager.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTaskManager.py  Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,110 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+"""
+Task Manager for Xen API asynchronous tasks.
+
+Stores all tasks in a simple dictionary in module's own local storage to
+avoid the 'instance()' methods.
+
+Tasks are indexed by UUID.
+
+"""
+
+from xen.xend.XendTask import XendTask
+from xen.xend import uuid
+import threading
+
+tasks = {}
+tasks_lock = threading.Lock()
+
+def create_task(func, args, func_name, return_type = None, label = ''):
+    """Creates a new Task and registers it with the XendTaskManager.
+
+    @param func: callable object XMLRPC method
+    @type func: callable object
+    @param args: tuple or list of arguments
+    @type args: tuple or list
+    @param func_name: XMLRPC method name, so we can estimate the progress
+    @type func_name: string
+    
+    @return: Task UUID
+    @rtype: string.
+    """
+    task_uuid = uuid.createString()
+    try:
+        tasks_lock.acquire()
+        task = XendTask(task_uuid, func, args, func_name,
+                        return_type = return_type, label = label)
+        tasks[task_uuid] = task
+    finally:
+        tasks_lock.release()
+
+    task.start()
+
+    return task_uuid
+
+def destroy_task(task_uuid):
+    """Destroys a task.
+
+    @param task_uuid: Task UUID
+    @type task_uuid: string.
+    """
+    try:
+        tasks_lock.acquire()
+        if task_uuid in tasks:
+            del tasks[task_uuid]
+    finally:
+        tasks_lock.release()
+
+def get_all_tasks():
+    """ Returns all the UUID of tracked tasks, completed or pending.
+
+    @returns: list of UUIDs
+    @rtype: list of strings
+    """
+    try:
+        tasks_lock.acquire()
+        return tasks.keys()
+    finally:
+        tasks_lock.release()
+
+def get_task(task_uuid):
+    """ Retrieves a task by UUID.
+
+    @rtype: XendTask or None
+    @return: Task denoted by UUID.
+    """
+    try:
+        tasks_lock.acquire()
+        return tasks.get(task_uuid)
+    finally:
+        tasks_lock.release()
+
+def get_tasks_by_name(task_name):
+    """ Retrieves the UUIDs of all tasks with the given name label.
+
+    @rtype: list of strings
+    @return: UUIDs of the tasks whose name_label equals task_name.
+    """
+    tasks_lock.acquire()
+    try:
+        return [t.uuid for t in tasks.values() if t.name_label == task_name]
+    finally:
+        tasks_lock.release()
+
+
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/server/XMLRPCServer.py
--- a/tools/python/xen/xend/server/XMLRPCServer.py      Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/server/XMLRPCServer.py      Wed Jan 24 13:26:26 2007 +0000
@@ -139,6 +139,8 @@ class XMLRPCServer:
                 meth = getattr(self.xenapi, meth_name)
                 if callable(meth) and hasattr(meth, 'api'):
                     self.server.register_function(meth, getattr(meth, 'api'))
+
+        self.server.register_instance(XendAPI.XendAPIAsyncProxy(self.xenapi))
                 
         # Legacy deprecated xm xmlrpc api
         # --------------------------------------------------------------------

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.