[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-API] [PATCH 2 of 3] CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler to copy between hosts



# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1282565148 -3600
# Node ID f9725bc799887fb205bf1da8cf3474288529dc90
# Parent  23fa063db91d4eae0179fb29179002a85a75cf31
CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler 
to copy between hosts.

Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>

diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:05:48 2010 +0100
@@ -23,14 +23,16 @@
 open Sparse_encoding
 open Unixext
 open Pervasiveext
+open Client
 
 let receive_chunks (s: Unix.file_descr) (fd: Unix.file_descr) = 
        Chunk.fold (fun () -> Chunk.write fd) () s
 
 let vdi_of_req ~__context (req: request) = 
+       let all = req.Http.query @ req.Http.cookie in
        let vdi = 
-               if List.mem_assoc "vdi" req.Http.query
-               then List.assoc "vdi" req.Http.query
+               if List.mem_assoc "vdi" all
+               then List.assoc "vdi" all
                else raise (Failure "Missing vdi query parameter") in
        if Db_cache.DBCache.is_valid_ref vdi 
        then Ref.of_string vdi 
@@ -40,21 +42,25 @@
   req.close <- true;
   Xapi_http.with_context "Importing raw VDI" req s
     (fun __context ->
+       let all = req.Http.query @ req.Http.cookie in
       let vdi = vdi_of_req ~__context req in
-      let chunked = List.mem_assoc "chunked" req.Http.query in
-      try
-       match req.transfer_encoding, req.content_length with
-       | Some "chunked", _ ->
+      let chunked = List.mem_assoc "chunked" all in
+      let task_id = Context.get_task_id __context in
+        debug "import_raw_vdi task_id = %s vdi = %s; chunked = %b" 
(Ref.string_of task_id) (Ref.string_of vdi) chunked;
+        try
+       match req.transfer_encoding with
+       | Some "chunked" ->
            error "Chunked encoding not yet implemented in the import code";
            Http_svr.headers s http_403_forbidden;
            raise (Failure "import code cannot handle chunked encoding")
-       | None, Some len ->
+       | None ->
            let headers = Http.http_200_ok ~keep_alive:false () @
-             [ Http.task_id_hdr ^ ":" ^ (Ref.string_of (Context.get_task_id 
__context));
+             [ Http.task_id_hdr ^ ":" ^ (Ref.string_of task_id);
                content_type ] in
             Http_svr.headers s headers;
            
-
+               Server_helpers.exec_with_new_task "VDI.import" 
+               (fun __context -> 
                 Sm_fs_ops.with_block_attached_device __context rpc session_id 
vdi `RW
                   (fun device ->
                      let fd = Unix.openfile device  [ Unix.O_WRONLY ] 0 in
@@ -63,16 +69,18 @@
                           try
                             if chunked
                             then receive_chunks s fd
-                            else ignore(Unixext.copy_file ~limit:len s fd);
-                            Unixext.fsync fd
+                            else ignore(Unixext.copy_file 
?limit:req.content_length s fd);
+                            Unixext.fsync fd;
                           with Unix.Unix_error(Unix.EIO, _, _) ->
                             raise (Api_errors.Server_error 
(Api_errors.vdi_io_error, ["Device I/O errors"]))
                        )
                        (fun () -> Unix.close fd)
-                  );
-
-           TaskHelper.complete ~__context []
+                  )
+           );
+           TaskHelper.complete ~__context [];
       with e ->
+       error "Caught exception: %s" (ExnHelper.string_of_exn e);
+       log_backtrace ();
        TaskHelper.failed ~__context (Api_errors.internal_error, ["Caught 
exception: " ^ (ExnHelper.string_of_exn e)]);
        raise e)
 
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/message_forwarding.ml
--- a/ocaml/xapi/message_forwarding.ml  Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/message_forwarding.ml  Mon Aug 23 13:05:48 2010 +0100
@@ -2862,9 +2862,9 @@
     let copy ~__context ~vdi ~sr =
       info "VDI.copy: VDI = '%s'; SR = '%s'" (vdi_uuid ~__context vdi) 
(sr_uuid ~__context sr);
       let local_fn = Local.VDI.copy ~vdi ~sr in
-      let src_sr = Db.VDI.get_SR ~__context ~self:vdi in
       (* No need to lock the VDI because the VBD.plug will do that for us *)
-      SR.forward_sr_multiple_op ~local_fn ~__context ~srs:[src_sr;sr] 
+      (* Forward the request to a host which can read the source VDI *)
+      forward_vdi_op ~local_fn ~__context ~self:vdi
        (fun session_id rpc -> Client.VDI.copy rpc session_id vdi sr)
 
     let resize ~__context ~vdi ~size =
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:05:48 2010 +0100
@@ -41,6 +41,35 @@
     | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi 
mode (fun path -> loop (path :: acc) vdis) in
   loop [] vdis
 
+(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
+let with_import_vdi __context rpc session_id vdi f = 
+       let subtask_of = Context.get_task_id __context in
+       Server_helpers.exec_with_new_task "VDI.import" 
+       (fun __context -> 
+               (* Find a suitable host for the SR containing the VDI *)
+               let sr = Db.VDI.get_SR ~__context ~self:vdi in
+               let host = Importexport.find_host_for_sr ~__context sr in
+               let address = Db.Host.get_address ~__context ~self:host in
+               let importtask = Client.Task.create rpc session_id "VDI.import" 
"" in
+
+                       let headers = Xapi_http.http_request 
+                               ~cookie:(["session_id", Ref.string_of 
session_id;
+                                         "task_id", Ref.string_of importtask;
+                                         "vdi", Ref.string_of vdi;
+                                         "chunked", "true"])
+                               Http.Put address Constants.import_raw_vdi_uri in
+                       let writer _ _ sock = f sock; true in
+                       if not (Xmlrpcclient.do_secure_http_rpc 
~use_stunnel_cache:false 
+                               ~task_id:(Ref.string_of (Context.get_task_id 
__context))
+                               ~host:address ~port:Xapi_globs.default_ssl_port 
~headers ~body:"" writer)
+                       then failwith "with_import_vdi";
+                       debug "Waiting for import task (%s) to complete" 
(Ref.string_of importtask);
+                       (* wait for the task to complete before cleaning 
anything up *)
+                       while Client.Task.get_status rpc session_id importtask 
= `pending do
+                               Thread.delay 1.;
+                       done;
+                       Client.Task.destroy rpc session_id importtask;
+       )
 
 (** Catch those smint exceptions and convert into meaningful internal errors *)
 let with_api_errors f x = 
@@ -141,8 +170,37 @@
 exception Cancelled
 exception NonZero
 
+(** The copying routine can operate on anything which looks like a 
file-descriptor/Stream *)
+module type Stream = sig
+       type stream
+       val write: stream -> int64 -> string -> int -> int -> unit
+end
+
+(** Writes directly to a file *)
+module FileStream = struct
+       type stream = Unix.file_descr
+       let write stream stream_offset buf off len =
+               let newoff = Unix.LargeFile.lseek stream stream_offset 
Unix.SEEK_SET in
+               (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" 
(String.length buf) offset len; *)
+               let n = Unix.write stream buf off len in
+               if n < len then failwith "Short write"
+end
+
+(** Marshals data across the network in chunks *)
+module NetworkStream = struct
+       open Sparse_encoding
+       type stream = Unix.file_descr
+       let write stream stream_offset buf off len =
+               let copy = String.create len in
+               String.blit buf off copy 0 len;
+               let x = { Chunk.start = stream_offset; data = copy } in
+               Chunk.marshal stream x
+end
+
+module DD(Output: Stream) = struct
+
 (* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd ofd size bs =
+let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
   let round v = int_of_float (v *. 50.0) in
   let update = 
     let oldvalue = ref (-1.0) in
@@ -178,20 +236,22 @@
       begin
        let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
        Unixext.really_read ifd buf 0 this_chunk;
-       begin
-         if sparse && (allzero buf this_chunk)
-          then
-           ignore(Unix.LargeFile.lseek ofd (Int64.of_int this_chunk) 
Unix.SEEK_CUR)
-         else
-           let n = Unix.write ofd buf 0 this_chunk in
-           (if n<this_chunk then failwith "Error!")
-       end;
+
+       if not sparse || (not (allzero buf this_chunk))
+       then Output.write stream offset buf 0 this_chunk;
+
        do_block (Int64.add offset (Int64.of_int this_chunk))
       end
   in
   do_block 0L;
+  Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
   update 1.0
 
+end
+
+module LocalDD = DD(FileStream)
+module RemoteDD = DD(NetworkStream)
+
 (* SCTX-286: thin provisioning is thrown away over VDI.copy, 
VM.import(VM.export).
    Return true if the newly created vdi must have zeroes written into it; 
default to false
    under the assumption that "proper" storage devices (ie not our legacy LVM 
stuff) always 
@@ -232,33 +292,47 @@
   (* Use the sparse copy unless we must write zeroes into the new VDI *)
   let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
 
+  (* Copy locally unless this host can't see the destination SR *)
+  let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
+  let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+
   let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
   let blocksize = 1024*1024 in
 
-  debug "Sm_fs_ops.copy_vdi: copying %Ld in blocks of %d%s preserving 
sparseness" size blocksize (if sparse then "" else " NOT");
+  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving 
sparseness" (if local_copy then "locally" else "remotely") size blocksize (if 
sparse then "" else " NOT");
 
-  let dd = sparse_dd refresh_session ~__context sparse in
+  let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
+  let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
 
-  with_block_attached_device __context rpc session_id vdi_src `RO
-    (fun device_src ->
-       with_block_attached_device __context rpc session_id vdi_dst `RW
-        (fun device_dst ->
-           let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 
-           and ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 
in
-           finally
-             (fun () ->
-                try
-                  dd ifd ofd size blocksize;
-                with
-                  | Unix.Unix_error(Unix.EIO, _, _) ->
-                      raise (Api_errors.Server_error (Api_errors.vdi_io_error, 
["Device I/O error"]))
-                  | e ->
-                      debug "Caught exception %s" (ExnHelper.string_of_exn e);
-                      log_backtrace ())
-             (fun () ->
-                Unix.close ifd;
-                Unix.close ofd)
-        )
-    )
-  )
+try
+       with_block_attached_device __context rpc session_id vdi_src `RO
+       (fun device_src ->
+               let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
+               finally
+               (fun () ->
+                       if local_copy 
+                       then with_block_attached_device __context rpc 
session_id vdi_dst `RW
+                               (fun device_dst ->
+                                       let ofd=Unix.openfile device_dst 
[Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
+                                       finally
+                                       (fun () ->
+                                               local_dd ifd ofd size blocksize
+                                       )
+                                       (fun () -> Unix.close ofd)
 
+                               )
+                       else with_import_vdi __context rpc session_id vdi_dst
+                               (fun ofd ->
+                                       remote_dd ifd ofd size blocksize
+                               )
+               )
+               (fun () -> Unix.close ifd)
+       )
+with
+| Unix.Unix_error(Unix.EIO, _, _) ->
+       raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O 
error"]))
+| e ->
+       debug "Caught exception %s" (ExnHelper.string_of_exn e);
+       log_backtrace ();
+       raise e
+)
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sparse_encoding.ml
--- a/ocaml/xapi/sparse_encoding.ml     Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sparse_encoding.ml     Mon Aug 23 13:05:48 2010 +0100
@@ -85,13 +85,14 @@
        }
 
        let really_write fd offset buf off len = 
-               ignore(Unix.LargeFile.lseek fd offset Unix.SEEK_SET);
                let n = Unix.write fd buf off len in
                if n < len 
                then failwith "Short write: attempted to write %d bytes at %Ld, 
only wrote %d" len offset n
 
        (** Writes a single block of data to the output device *)
-       let write fd x = really_write fd x.start x.data 0 (String.length x.data)
+       let write fd x = 
+               ignore(Unix.LargeFile.lseek fd x.start Unix.SEEK_SET);
+               really_write fd x.start x.data 0 (String.length x.data)
 
        (** Reads a type t from a file descriptor *)
        let unmarshal fd = 
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/xapi_vdi.ml
--- a/ocaml/xapi/xapi_vdi.ml    Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/xapi_vdi.ml    Mon Aug 23 13:05:48 2010 +0100
@@ -460,7 +460,6 @@
        raise e)
 
 let copy ~__context ~vdi ~sr =
-  Sm.assert_pbd_is_plugged ~__context ~sr;
   Xapi_vdi_helpers.assert_managed ~__context ~vdi;
   let task_id = Ref.string_of (Context.get_task_id __context) in
 
@@ -489,8 +488,10 @@
     dst
   with 
       e -> 
-       destroy ~__context ~self:dst;
-       raise e
+      Helpers.call_api_functions ~__context
+      (fun rpc session_id -> Client.VDI.destroy rpc session_id dst);
+      raise e
+
 
 
 let force_unlock ~__context ~vdi = 
 ocaml/xapi/import_raw_vdi.ml     |   36 ++++++---
 ocaml/xapi/message_forwarding.ml |    4 +-
 ocaml/xapi/sm_fs_ops.ml          |  140 +++++++++++++++++++++++++++++---------
 ocaml/xapi/sparse_encoding.ml    |    5 +-
 ocaml/xapi/xapi_vdi.ml           |    7 +-
 5 files changed, 138 insertions(+), 54 deletions(-)


Attachment: xen-api.hg-2.patch
Description: Text Data

_______________________________________________
xen-api mailing list
xen-api@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/mailman/listinfo/xen-api

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.