[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH v6.1 05/11] libxl_qmp: Implementation of libxl__ev_qmp_*
Signed-off-by: Anthony PERARD <anthony.perard@xxxxxxxxxx>
---
 tools/libxl/libxl_internal.h |  35 ++
 tools/libxl/libxl_qmp.c      | 699 +++++++++++++++++++++++++++++++++++
 tools/libxl/libxl_types.idl  |   6 +
 3 files changed, 740 insertions(+)

diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 2b7e343775..fd84b60982 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -411,12 +411,47 @@ _hidden int libxl__ev_qmp_send(libxl__gc *gc, libxl__ev_qmp *ev,
                                const char *cmd, libxl__json_object *args);
 _hidden void libxl__ev_qmp_dispose(libxl__gc *gc, libxl__ev_qmp *ev);
 
+typedef enum {
+    /* initial state */
+    qmp_state_disconnected = 1,
+    /* connected to QMP socket, waiting for greeting message */
+    qmp_state_connecting,
+    /* qmp_capabilities command sent, waiting for reply */
+    qmp_state_capability_negotiation,
+    /* ready to send commands */
+    qmp_state_connected,
+    /* cmd sent, waiting for reply */
+    qmp_state_waiting_reply,
+} libxl__qmp_state;
+
 struct libxl__ev_qmp {
     /* caller should include this in their own struct */
     /* caller must fill these in, and they must all remain valid */
     libxl_domid domid;
     libxl__ev_qmp_callback *callback;
     int fd; /* set to send a fd with the command, -1 otherwise */
+
+    /*
+     * remaining fields are private to libxl_ev_qmp_*
+     */
+
+    int id;
+    libxl__carefd *qmp_cfd;
+    libxl__ev_fd qmp_efd;
+    libxl__qmp_state qmp_state;
+    /* receive buffer, with:
+     * rx_buf_size: current allocated size,
+     * rx_buf_used: actual data in the buffer */
+    char *rx_buf;
+    size_t rx_buf_size;
+    size_t rx_buf_used;
+    /* sending buffer */
+    char *tx_buf;
+    size_t tx_buf_len;
+    size_t tx_buf_off;
+    /* The message to send when ready */
+    char *msg;
+    size_t msg_len;
 };
 
 
diff --git a/tools/libxl/libxl_qmp.c b/tools/libxl/libxl_qmp.c
index 218ee3ff09..07aa0dd89a 100644
--- a/tools/libxl/libxl_qmp.c
+++ b/tools/libxl/libxl_qmp.c
@@ -75,11 +75,18 @@
 #  define DEBUG_REPORT_RECEIVED(dom, buf, len) ((void)0)
 #endif
 
+#ifdef DEBUG_QMP_CLIENT
+# define LOG_QMP(f, ...) LOGD(DEBUG, ev->domid, f, ##__VA_ARGS__)
+#else
+# define LOG_QMP(f, ...)
+#endif
+
 /*
  * QMP types & constant
  */
 
 #define QMP_RECEIVE_BUFFER_SIZE 4096
+#define QMP_MAX_SIZE_RX_BUF MB(8)
 #define PCI_PT_QDEV_ID "pci-pt-%02x_%02x.%01x"
 
 /*
@@ -1312,6 +1319,698 @@ int libxl__qmp_initializations(libxl__gc *gc, uint32_t domid,
     return ret;
 }
 
+/* ------------ Implementation of libxl__ev_qmp ---------------- */
+
+/*
+ * Possible internal state compared to qmp_state:
+ *
+ * qmp_state     disconnected  connecting  capability    connected  waiting
+ *                                         _negotiation             _reply
+ * External      Idle          Active      Active        Connected  Active
+ * qmp_cfd       close         open        open          open       open
+ * qmp_efd       Idle          Active      Active        Active     Active
+ * id            reset         set         set           prev[1]    set
+ * rx_buf*       free          used        used          used       used
+ * tx_buf*       free          free        used/free     free       used/free
+ * msg*          free          set         set           free/set   free
+ *
+ * [1] id used on the previously sent command
+ *
+ * Possible buffers states:
+ * - receiving buffer:
+ *                     free         used
+ *     rx_buf          NULL         allocated
+ *     rx_buf_size     0            allocation size of `rx_buf`
+ *     rx_buf_used     0            <= rx_buf_size
+ * - transmitted buffer:
+ *                     free         used
+ *     tx_buf          NULL         contain data
+ *     tx_buf_len      0            size of data
+ *     tx_buf_off      0            <= tx_buf_len
+ * - queued user command:
+ *                     free         set
+ *     msg             NULL         contain data
+ *     msg_len         0            size of data
+ *
+ * - Allowed internal state transition:
+ * disconnected                         -> connecting
+ * connecting                           -> capability_negotiation
+ * capability_negotiation/waiting_reply -> connected
+ * connected                            -> waiting_reply
+ * any                                  -> disconnected
+ */
+
+/* hard coded message ID used for capability negotiation ("xlq\0") */
+#define QMP_CAPABILITY_NEGOTIATION_MSGID 0x786c7100
+
+/* prototypes */
+
+static void qmp_ev_fd_callback(libxl__egc *egc, libxl__ev_fd *ev_fd,
+                               int fd, short events, short revents);
+static int qmp_ev_callback_writable(libxl__gc *gc,
+                                    libxl__ev_qmp *ev, int fd);
+static int qmp_ev_callback_readable(libxl__egc *egc,
+                                    libxl__ev_qmp *ev, int fd);
+static int qmp_ev_get_next_msg(libxl__egc *egc, libxl__ev_qmp *ev,
+                               libxl__json_object **o_r);
+static int qmp_ev_handle_message(libxl__egc *egc,
+                                 libxl__ev_qmp *ev,
+                                 const libxl__json_object *resp);
+
+/* helpers */
+
+static void qmp_ev_ensure_reading_writing(libxl__gc *gc, libxl__ev_qmp *ev)
+    /* Request POLLOUT only while there is something to send: a pending
+     * tx_buf, or a queued msg once the state is `connected'. */
+{
+    bool enable = false;
+    short events;
+
+    if (ev->tx_buf) {
+        enable = true;
+    } else {
+        enable = (ev->qmp_state == qmp_state_connected) && ev->msg;
+    }
+
+    if (enable)
+        events = ev->qmp_efd.events | POLLOUT;
+    else
+        events = ev->qmp_efd.events & ~POLLOUT;
+
+    libxl__ev_fd_modify(gc, &ev->qmp_efd, events);
+}
+
+static void qmp_ev_set_state(libxl__gc *gc, libxl__ev_qmp *ev,
+                             libxl__qmp_state new_state)
+    /* Record the new state, asserting the transition is an allowed one. */
+{
+    libxl__qmp_state cur = ev->qmp_state;
+    switch (new_state) {
+    case qmp_state_disconnected:
+        break;
+    case qmp_state_connecting:
+        assert(cur == qmp_state_disconnected);
+        break;
+    case qmp_state_capability_negotiation:
+        assert(cur == qmp_state_connecting);
+        break;
+    case qmp_state_connected:
+        assert(cur == qmp_state_capability_negotiation ||
+               cur == qmp_state_waiting_reply);
+        break;
+    case qmp_state_waiting_reply:
+        assert(cur == qmp_state_connected);
+        break;
+    }
+
+    ev->qmp_state = new_state;
+}
+
+static int qmp_error_class_to_libxl_error_code(libxl__gc *gc,
+                                               const char *eclass)
+{
+    const libxl_enum_string_table *t = libxl_error_string_table;
+
+    /* compare "QMP_GENERIC_ERROR" from libxl_error to "GenericError"
+     * generated by the QMP server */
+
+    for ( ; t->s; t++) {
+        const char *s = eclass;
+        const char *se = t->s;
+        if (strncasecmp(t->s, "QMP_", 4))
+            continue;
+
+        /* skip "QMP_" */
+        se += 4;
+        while (*s && *se) {
+            /* skip underscores */
+            if (*se == '_') {
+                se++;
+                continue;
+            }
+            if (tolower((unsigned char)*s) != tolower((unsigned char)*se))
+                break;
+            s++, se++;
+        }
+        if (!*s && !*se)
+            return t->v;
+    }
+
+    return ERROR_UNKNOWN_QMP_ERROR;
+}
+
+static int qmp_ev_prepare_cmd(libxl__gc *gc,
+                              libxl__ev_qmp *ev,
+                              const char *cmd,
+                              const libxl__json_object *args)
+    /* Build the user command under a fresh id and queue it in ev->msg.
+     * Nothing may already be queued or in flight (see the asserts). */
+{
+    char *buf = NULL;
+    size_t len;
+
+    assert(!ev->tx_buf && !ev->tx_buf_len);
+    assert(!ev->msg && !ev->msg_len);
+
+    ev->id++;
+    buf = qmp_prepare_cmd(gc, cmd, args, ev->id, &len);
+    if (!buf) {
+        return ERROR_FAIL;
+    }
+
+    ev->msg = buf;
+    ev->msg_len = len;
+
+    return 0;
+}
+
+/* Setup connection */
+
+static int qmp_ev_connect(libxl__gc *gc, libxl__ev_qmp *ev)
+    /* disconnected -> connecting
+     * If the initial state isn't disconnected, then nothing is done */
+{
+    int fd;
+    int rc, r;
+    struct sockaddr_un un;
+    const char *qmp_socket_path;
+
+    if (ev->qmp_state != qmp_state_disconnected)
+        return 0;
+
+    qmp_socket_path = libxl__qemu_qmp_path(gc, ev->domid);
+
+    LOGD(DEBUG, ev->domid, "Connecting to %s", qmp_socket_path);
+
+    libxl__carefd_begin();
+    fd = socket(AF_UNIX, SOCK_STREAM, 0);
+    ev->qmp_cfd = libxl__carefd_opened(CTX, fd);
+    if (!ev->qmp_cfd) {
+        LOGED(ERROR, ev->domid, "socket() failed");
+        rc = ERROR_FAIL;
+        goto out;
+    }
+    rc = libxl_fd_set_nonblock(CTX, libxl__carefd_fd(ev->qmp_cfd), 1);
+    if (rc)
+        goto out;
+
+    rc = libxl__prepare_sockaddr_un(gc, &un, qmp_socket_path,
+                                    "QMP socket");
+    if (rc)
+        goto out;
+
+    r = connect(libxl__carefd_fd(ev->qmp_cfd),
+                (struct sockaddr *) &un, sizeof(un));
+    if (r && errno != EINPROGRESS) {
+        LOGED(ERROR, ev->domid, "Failed to connect to QMP socket %s",
+              qmp_socket_path);
+        rc = ERROR_FAIL;
+        goto out;
+    }
+
+    rc = libxl__ev_fd_register(gc, &ev->qmp_efd, qmp_ev_fd_callback,
+                               libxl__carefd_fd(ev->qmp_cfd), POLLIN);
+    if (rc)
+        goto out;
+
+    qmp_ev_set_state(gc, ev, qmp_state_connecting);
+
+    return 0;
+
+out:
+    libxl__carefd_close(ev->qmp_cfd);
+    ev->qmp_cfd = NULL;
+    return rc;
+}
+
+/* QMP FD callbacks */
+
+static void qmp_ev_fd_callback(libxl__egc *egc, libxl__ev_fd *ev_fd,
+                               int fd, short events, short revents)
+    /* Dispatch poll events on the QMP socket: flush pending output, then
+     * read and handle every complete message.  On error the private
+     * state is disposed and the user callback is given the error code. */
+{
+    EGC_GC;
+    int rc = 0; /* stays 0 if revents carries neither POLLIN nor POLLOUT */
+    libxl__json_object *o = NULL;
+    libxl__ev_qmp *ev = CONTAINER_OF(ev_fd, *ev, qmp_efd);
+
+    if (revents & (POLLHUP)) {
+        LOGD(ERROR, ev->domid, "received POLLHUP from QMP socket");
+        rc = ERROR_PROTOCOL_ERROR_QMP;
+        goto out;
+    }
+    if (revents & ~(POLLIN|POLLOUT)) {
+        LOGD(ERROR, ev->domid,
+             "unexpected poll event 0x%x on QMP socket (expected POLLIN "
+             "and/or POLLOUT)",
+             revents);
+        rc = ERROR_FAIL;
+        goto out;
+    }
+
+    if (revents & POLLOUT) {
+        rc = qmp_ev_callback_writable(gc, ev, fd);
+        if (rc)
+            goto out;
+    }
+
+    if (revents & POLLIN) {
+        rc = qmp_ev_callback_readable(egc, ev, fd);
+        if (rc)
+            goto out;
+
+        /* parse input */
+        while (1) {
+            /* parse rx buffer to find one json object */
+            rc = qmp_ev_get_next_msg(egc, ev, &o);
+            if (rc == ERROR_NOTFOUND) {
+                rc = 0;
+                break;
+            } else if (rc)
+                goto out;
+
+            /* Must be last and return when the user callback is called */
+            rc = qmp_ev_handle_message(egc, ev, o);
+            if (rc < 0)
+                goto out;
+            if (rc == 1) {
+                /* user callback has been called */
+                return;
+            }
+        }
+    }
+
+    qmp_ev_ensure_reading_writing(gc, ev);
+
+out:
+    if (rc) {
+        LOGD(ERROR, ev->domid,
+             "Error happend with the QMP connection to QEMU");
+
+        /* On error, deallocate all private resources */
+        libxl__ev_qmp_dispose(gc, ev);
+
+        /* And tell libxl__ev_qmp user about the error */
+        ev->callback(egc, ev, NULL, rc); /* must be last */
+    }
+}
+
+static int qmp_ev_callback_writable(libxl__gc *gc,
+                                    libxl__ev_qmp *ev, int fd)
+    /* connected -> waiting_reply
+     * the state isn't changed otherwise. */
+{
+    int rc;
+    ssize_t r;
+
+    if (ev->qmp_state == qmp_state_connected) {
+        assert(!ev->tx_buf);
+        if (ev->msg) {
+            ev->tx_buf = ev->msg;
+            ev->tx_buf_len = ev->msg_len;
+            ev->tx_buf_off = 0;
+            ev->msg = NULL;
+            ev->msg_len = 0;
+            qmp_ev_set_state(gc, ev, qmp_state_waiting_reply);
+        }
+    }
+
+    if (!ev->tx_buf)
+        return 0;
+
+    LOG_QMP("sending: '%.*s'", (int)ev->tx_buf_len, ev->tx_buf);
+
+    /*
+     * We will send a file descriptor associated with a command on the
+     * first byte of this command.
+     */
+    if (ev->qmp_state == qmp_state_waiting_reply &&
+        ev->fd >= 0 &&
+        ev->tx_buf_off == 0) {
+
+        rc = libxl__sendmsg_fds(gc, fd, ev->tx_buf, 1,
+                                1, &ev->fd, "QMP socket");
+        /* Check for EWOULDBLOCK, and return to try again later */
+        if (rc == ERROR_NOT_READY)
+            return 0;
+        if (rc)
+            return rc;
+        ev->tx_buf_off++;
+    }
+
+    while (ev->tx_buf_off < ev->tx_buf_len) {
+        r = write(fd, ev->tx_buf + ev->tx_buf_off,
+                  ev->tx_buf_len - ev->tx_buf_off);
+        if (r < 0) {
+            if (errno == EINTR)
+                continue;
+            if (errno == EWOULDBLOCK)
+                break;
+            LOGED(ERROR, ev->domid, "failed to write to QMP socket");
+            return ERROR_FAIL;
+        }
+        ev->tx_buf_off += r;
+    }
+
+    if (ev->tx_buf_off == ev->tx_buf_len) {
+        free(ev->tx_buf);
+        ev->tx_buf = NULL;
+        ev->tx_buf_len = ev->tx_buf_off = 0;
+    }
+
+    return 0;
+}
+
+static int qmp_ev_callback_readable(libxl__egc *egc,
+                                    libxl__ev_qmp *ev, int fd)
+    /* on error: * -> disconnected */
+{
+    EGC_GC;
+
+    while (1) {
+        ssize_t r;
+
+        /* Check if the buffer still have space, or increase size */
+        if (ev->rx_buf_size - ev->rx_buf_used < QMP_RECEIVE_BUFFER_SIZE) {
+            /* Compute the candidate size first, so that rx_buf_size is
+             * only updated once the new size is known to be acceptable. */
+            size_t newsize = max(ev->rx_buf_size * 2,
+                                 (size_t)QMP_RECEIVE_BUFFER_SIZE * 2);
+
+            if (newsize > QMP_MAX_SIZE_RX_BUF) {
+                LOGD(ERROR, ev->domid,
+                     "QMP receive buffer is too big (%zu > %zu)",
+                     newsize, (size_t)QMP_MAX_SIZE_RX_BUF);
+                return ERROR_BUFFERFULL;
+            }
+            ev->rx_buf_size = newsize;
+            ev->rx_buf = libxl__realloc(NOGC, ev->rx_buf, ev->rx_buf_size);
+        }
+
+        r = read(fd, ev->rx_buf + ev->rx_buf_used,
+                 ev->rx_buf_size - ev->rx_buf_used);
+        if (r < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            if (errno == EWOULDBLOCK) {
+                break;
+            }
+            LOGED(ERROR, ev->domid, "error reading QMP socket");
+            return ERROR_FAIL;
+        }
+
+        if (r == 0) {
+            LOGD(ERROR, ev->domid, "Unexpected EOF on QMP socket");
+            return ERROR_PROTOCOL_ERROR_QMP;
+        }
+
+        LOG_QMP("received %zdB: '%.*s'", r,
+                (int)r, ev->rx_buf + ev->rx_buf_used);
+
+        ev->rx_buf_used += r;
+        assert(ev->rx_buf_used <= ev->rx_buf_size);
+    }
+
+    return 0;
+}
+
+/* Handle messages received from QMP server */
+
+static int qmp_ev_get_next_msg(libxl__egc *egc, libxl__ev_qmp *ev,
+                               libxl__json_object **o_r)
+    /* Find a JSON object and store it in o_r.
+     * return ERROR_NOTFOUND if no object is found.
+     * `o_r` is allocated within `egc`.
+     */
+{
+    EGC_GC;
+    size_t len;
+    char *end = NULL;
+    libxl__json_object *o = NULL;
+
+    if (!ev->rx_buf_used)
+        return ERROR_NOTFOUND;
+
+    /* Search for the end of a QMP message: "\r\n" */
+    end = memmem(ev->rx_buf, ev->rx_buf_used, "\r\n", 2);
+    if (!end)
+        return ERROR_NOTFOUND;
+    len = (end - ev->rx_buf) + 2;
+
+    LOG_QMP("parsing %zuB: '%.*s'", len, (int)len, ev->rx_buf);
+
+    /* Replace \r by \0 so that libxl__json_parse can use strlen */
+    ev->rx_buf[len - 2] = '\0';
+    o = libxl__json_parse(gc, ev->rx_buf);
+
+    if (!o) {
+        LOGD(ERROR, ev->domid, "Parse error");
+        return ERROR_PROTOCOL_ERROR_QMP;
+    }
+
+    ev->rx_buf_used -= len;
+    memmove(ev->rx_buf, ev->rx_buf + len, ev->rx_buf_used);
+
+    LOG_QMP("JSON object received: %s",
+            libxl__json_object_to_json(gc, o));
+
+    *o_r = o;
+
+    return 0;
+}
+
+static int qmp_ev_parse_error_messages(libxl__egc *egc,
+                                       libxl__ev_qmp *ev,
+                                       const libxl__json_object *resp)
+    /* Log the error reported by QEMU and map its class to a libxl error. */
+{
+    EGC_GC;
+    int rc;
+    const char *s;
+    const libxl__json_object *o;
+    const libxl__json_object *err;
+
+    /*
+     * { "error": { "class": string, "desc": string } }
+     */
+
+    err = libxl__json_map_get("error", resp, JSON_MAP);
+
+    o = libxl__json_map_get("class", err, JSON_STRING);
+    if (!o) {
+        LOGD(ERROR, ev->domid,
+             "Protocol error: missing \"class\" member in error message");
+        return ERROR_PROTOCOL_ERROR_QMP;
+    }
+    s = libxl__json_object_get_string(o);
+    if (s)
+        rc = qmp_error_class_to_libxl_error_code(gc, s);
+    else
+        rc = ERROR_PROTOCOL_ERROR_QMP;
+
+    o = libxl__json_map_get("desc", err, JSON_STRING);
+    if (!o) {
+        LOGD(ERROR, ev->domid,
+             "Protocol error: missing \"desc\" member in error message");
+        return ERROR_PROTOCOL_ERROR_QMP;
+    }
+    s = libxl__json_object_get_string(o);
+    if (s)
+        LOGD(ERROR, ev->domid, "%s", s);
+    else
+        LOGD(ERROR, ev->domid, "Received unexpected error: %s",
+             libxl__json_object_to_json(gc, resp));
+    return rc;
+}
+
+static int qmp_ev_handle_message(libxl__egc *egc,
+                                 libxl__ev_qmp *ev,
+                                 const libxl__json_object *resp)
+    /*
+     * This function will handle every messages sent by the QMP server.
+     * Return values:
+     *   < 0    libxl error code
+     *   0      success
+     *   1      success, but a user callback has been called,
+     *          `ev` should not be used anymore.
+     *
+     * Possible state changes:
+     * connecting -> capability_negotiation
+     * capability_negotiation -> connected
+     * waiting_reply -> waiting_reply/connected
+     * on error: * -> disconnected
+     */
+{
+    EGC_GC;
+    int id;
+    int rc;
+    const libxl__json_object *o;
+    const libxl__json_object *response;
+    libxl__qmp_message_type type = qmp_response_type(resp);
+
+    switch (type) {
+    case LIBXL__QMP_MESSAGE_TYPE_QMP:
+        /* greeting message */
+
+        if (ev->qmp_state != qmp_state_connecting) {
+            LOGD(ERROR, ev->domid,
+                 "Unexpected greeting message received");
+            return ERROR_PROTOCOL_ERROR_QMP;
+        }
+
+        /* Prepare next message to send */
+        assert(!ev->tx_buf);
+        ev->tx_buf = qmp_prepare_cmd(gc, "qmp_capabilities", NULL,
+                                     QMP_CAPABILITY_NEGOTIATION_MSGID,
+                                     &ev->tx_buf_len);
+        if (!ev->tx_buf) {
+            ev->tx_buf_len = 0;
+            return ERROR_FAIL;
+        }
+        ev->tx_buf_off = 0;
+        qmp_ev_set_state(gc, ev, qmp_state_capability_negotiation);
+
+        return 0;
+
+    case LIBXL__QMP_MESSAGE_TYPE_RETURN:
+    case LIBXL__QMP_MESSAGE_TYPE_ERROR:
+        /*
+         * Reply to a command (success/error) or server error
+         *
+         * In this cases, we are parsing two possibles responses:
+         * - success:
+         * { "return": json-value, "id": int }
+         * - error:
+         * { "error": { "class": string, "desc": string }, "id": int }
+         */
+
+        o = libxl__json_map_get("id", resp, JSON_INTEGER);
+        if (!o) {
+            /*
+             * If "id" isn't present, an error occurred on the server before
+             * it has read the "id" provided by libxl.
+             */
+            qmp_ev_parse_error_messages(egc, ev, resp);
+            return ERROR_PROTOCOL_ERROR_QMP;
+        }
+
+        id = libxl__json_object_get_integer(o);
+
+        if (id == QMP_CAPABILITY_NEGOTIATION_MSGID) {
+            /* We have a response to our qmp_capabilities cmd */
+            if (ev->qmp_state != qmp_state_capability_negotiation ||
+                type != LIBXL__QMP_MESSAGE_TYPE_RETURN)
+                goto out_unknown_id;
+            qmp_ev_set_state(gc, ev, qmp_state_connected);
+            return 0;
+        }
+
+        if (ev->qmp_state == qmp_state_waiting_reply &&
+            id == ev->id) {
+            if (type == LIBXL__QMP_MESSAGE_TYPE_RETURN) {
+                response = libxl__json_map_get("return", resp, JSON_ANY);
+                rc = 0;
+            } else {
+                /* error message */
+                response = NULL;
+                rc = qmp_ev_parse_error_messages(egc, ev, resp);
+            }
+            qmp_ev_set_state(gc, ev, qmp_state_connected);
+            ev->callback(egc, ev, response, rc); /* must be last */
+            return 1;
+        }
+
+out_unknown_id:
+        LOGD(ERROR, ev->domid,
+             "Message from QEMU with unexpected id %d: %s",
+             id, libxl__json_object_to_json(gc, resp));
+        return ERROR_PROTOCOL_ERROR_QMP;
+
+    case LIBXL__QMP_MESSAGE_TYPE_EVENT:
+        /* Events are ignored */
+        return 0;
+
+    case LIBXL__QMP_MESSAGE_TYPE_INVALID:
+        LOGD(ERROR, ev->domid, "Unexpected message received: %s",
+             libxl__json_object_to_json(gc, resp));
+        return ERROR_PROTOCOL_ERROR_QMP;
+
+    default:
+        abort();
+    }
+
+    return 0;
+}
+
+/*
+ * libxl__ev_qmp_*
+ */
+
+void libxl__ev_qmp_init(libxl__ev_qmp *ev)
+    /* disconnected -> disconnected */
+{
+    ev->id = QMP_CAPABILITY_NEGOTIATION_MSGID + 1;
+
+    ev->qmp_cfd = NULL;
+    libxl__ev_fd_init(&ev->qmp_efd);
+    ev->qmp_state = qmp_state_disconnected;
+
+    ev->rx_buf = NULL;
+    ev->rx_buf_size = ev->rx_buf_used = 0;
+    ev->tx_buf = NULL;
+    ev->tx_buf_len = ev->tx_buf_off = 0;
+
+    ev->msg = NULL;
+    ev->msg_len = 0;
+}
+
+int libxl__ev_qmp_send(libxl__gc *gc, libxl__ev_qmp *ev,
+                       const char *cmd, libxl__json_object *args)
+    /* disconnected -> connecting
+     * connected -> waiting_reply */
+{
+    int rc;
+
+    LOGD(DEBUG, ev->domid, " ev %p, cmd '%s'", ev, cmd);
+
+    assert(ev->qmp_state == qmp_state_disconnected ||
+           ev->qmp_state == qmp_state_connected);
+
+    /* Connect to QEMU if not already connected */
+    rc = qmp_ev_connect(gc, ev);
+    if (rc)
+        goto out;
+
+    rc = qmp_ev_prepare_cmd(gc, ev, cmd, args);
+    if (rc)
+        goto out;
+
+    qmp_ev_ensure_reading_writing(gc, ev);
+
+out:
+    if (rc)
+        libxl__ev_qmp_dispose(gc, ev);
+    return rc;
+}
+
+void libxl__ev_qmp_dispose(libxl__gc *gc, libxl__ev_qmp *ev)
+    /* * -> disconnected */
+{
+    LOGD(DEBUG, ev->domid, " ev %p", ev);
+
+    free(ev->rx_buf);
+    free(ev->tx_buf);
+    free(ev->msg);
+
+    libxl__ev_fd_deregister(gc, &ev->qmp_efd);
+    libxl__carefd_close(ev->qmp_cfd);
+
+    libxl__ev_qmp_init(ev);
+}
+
 /*
  * Local variables:
  * mode: C
diff --git a/tools/libxl/libxl_types.idl b/tools/libxl/libxl_types.idl
index 3b8f967651..fec42b260c 100644
--- a/tools/libxl/libxl_types.idl
+++ b/tools/libxl/libxl_types.idl
@@ -69,6 +69,12 @@ libxl_error = Enumeration("error", [
     (-23, "NOTFOUND"),
     (-24, "DOMAIN_DESTROYED"), # Target domain ceased to exist during op
     (-25, "FEATURE_REMOVED"), # For functionality that has been removed
+    (-26, "PROTOCOL_ERROR_QMP"),
+    (-27, "UNKNOWN_QMP_ERROR"),
+    (-28, "QMP_GENERIC_ERROR"), # unspecified qmp error
+    (-29, "QMP_COMMAND_NOT_FOUND"), # the requested command has not been found
+    (-30, "QMP_DEVICE_NOT_ACTIVE"), # a device has failed to be become active
+    (-31, "QMP_DEVICE_NOT_FOUND"), # the requested device has not been found
     ], value_namespace = "")
 
 libxl_domain_type = Enumeration("domain_type", [
-- 
Anthony PERARD

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |