}
static struct mgmt_be_txn_ctx *
-mgmt_be_find_txn_by_id(struct mgmt_be_client *client_ctx, uint64_t txn_id)
+mgmt_be_find_txn_by_id(struct mgmt_be_client *client_ctx, uint64_t txn_id,
+ bool warn)
{
struct mgmt_be_txn_ctx *txn = NULL;
- FOREACH_BE_TXN_IN_LIST (client_ctx, txn) {
+ FOREACH_BE_TXN_IN_LIST (client_ctx, txn)
if (txn->txn_id == txn_id)
return txn;
- }
+ if (warn)
+ MGMTD_BE_CLIENT_ERR("Unknown txn-id: %" PRIu64, txn_id);
return NULL;
}
{
struct mgmt_be_txn_ctx *txn = NULL;
- txn = mgmt_be_find_txn_by_id(client_ctx, txn_id);
- if (!txn) {
- txn = XCALLOC(MTYPE_MGMTD_BE_TXN,
- sizeof(struct mgmt_be_txn_ctx));
- assert(txn);
+ txn = mgmt_be_find_txn_by_id(client_ctx, txn_id, false);
+ if (txn) {
+ MGMTD_BE_CLIENT_ERR("Can't create existing txn-id: %" PRIu64,
+ txn_id);
+ return NULL;
+ }
- txn->txn_id = txn_id;
- txn->client = client_ctx;
- mgmt_be_batches_init(&txn->cfg_batches);
- mgmt_be_batches_init(&txn->apply_cfgs);
- mgmt_be_txns_add_tail(&client_ctx->txn_head, txn);
+ txn = XCALLOC(MTYPE_MGMTD_BE_TXN, sizeof(struct mgmt_be_txn_ctx));
+ txn->txn_id = txn_id;
+ txn->client = client_ctx;
+ mgmt_be_batches_init(&txn->cfg_batches);
+ mgmt_be_batches_init(&txn->apply_cfgs);
+ mgmt_be_txns_add_tail(&client_ctx->txn_head, txn);
- MGMTD_BE_CLIENT_DBG("Added new txn-id: %" PRIu64, txn_id);
- }
+ MGMTD_BE_CLIENT_DBG("Created new txn-id: %" PRIu64, txn_id);
return txn;
}
}
static int mgmt_be_send_txn_reply(struct mgmt_be_client *client_ctx,
- uint64_t txn_id, bool create, bool success)
+ uint64_t txn_id, bool create)
{
Mgmtd__BeMessage be_msg;
Mgmtd__BeTxnReply txn_reply;
mgmtd__be_txn_reply__init(&txn_reply);
txn_reply.create = create;
txn_reply.txn_id = txn_id;
- txn_reply.success = success;
+ txn_reply.success = true;
mgmtd__be_message__init(&be_msg);
be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY;
{
struct mgmt_be_txn_ctx *txn;
- txn = mgmt_be_find_txn_by_id(client_ctx, txn_id);
if (create) {
- if (txn) {
- /*
- * Transaction with same txn-id already exists.
- * Should not happen under any circumstances.
- */
- MGMTD_BE_CLIENT_ERR(
- "txn-id: %" PRIu64 " already exists", txn_id);
- mgmt_be_send_txn_reply(client_ctx, txn_id, create,
- false);
- }
+ MGMTD_BE_CLIENT_DBG("Creating new txn-id %" PRIu64, txn_id);
- MGMTD_BE_CLIENT_DBG("Created new txn-id %" PRIu64, txn_id);
txn = mgmt_be_txn_create(client_ctx, txn_id);
+ if (!txn)
+ goto failed;
if (client_ctx->cbs.txn_notify)
- (void)(*client_ctx->cbs.txn_notify)(
- client_ctx, client_ctx->user_data,
- &txn->client_data, false);
+ (*client_ctx->cbs.txn_notify)(client_ctx,
+ client_ctx->user_data,
+ &txn->client_data, false);
} else {
- if (!txn) {
- /*
- * Transaction with same txn-id does not exists.
- * Return sucess anyways.
- */
- MGMTD_BE_CLIENT_DBG("txn-id: %" PRIu64
- " for delete does NOT exists",
- txn_id);
- } else {
- MGMTD_BE_CLIENT_DBG("Delete txn-id: %" PRIu64, txn_id);
+ MGMTD_BE_CLIENT_DBG("Deleting txn-id: %" PRIu64, txn_id);
+ txn = mgmt_be_find_txn_by_id(client_ctx, txn_id, false);
+ if (txn)
mgmt_be_txn_delete(client_ctx, &txn);
- }
}
- mgmt_be_send_txn_reply(client_ctx, txn_id, create, true);
+ return mgmt_be_send_txn_reply(client_ctx, txn_id, create);
- return 0;
+failed:
+ msg_conn_disconnect(&client_ctx->client.conn, true);
+ return -1;
}
static int mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client *client_ctx,
{
struct mgmt_be_txn_ctx *txn;
- txn = mgmt_be_find_txn_by_id(client_ctx, txn_id);
- if (!txn) {
- MGMTD_BE_CLIENT_ERR("Invalid txn-id: %" PRIu64
- " from MGMTD server",
- txn_id);
- mgmt_be_send_cfgdata_create_reply(
- client_ctx, txn_id, batch_id, false,
- "Transaction context not created yet");
- } else {
- mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id,
- cfg_req, num_req);
- }
+ txn = mgmt_be_find_txn_by_id(client_ctx, txn_id, true);
+ if (!txn)
+ goto failed;
+
+ mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id, cfg_req,
+ num_req);
if (txn && end_of_data) {
- MGMTD_BE_CLIENT_DBG("Triggering CFG_PREPARE_REQ processing");
- mgmt_be_txn_cfg_prepare(txn);
+ MGMTD_BE_CLIENT_DBG("End of data; CFG_PREPARE_REQ processing");
+ if (mgmt_be_txn_cfg_prepare(txn))
+ goto failed;
}
return 0;
+failed:
+ msg_conn_disconnect(&client_ctx->client.conn, true);
+ return -1;
}
static int mgmt_be_send_apply_reply(struct mgmt_be_client *client_ctx,
{
struct mgmt_be_txn_ctx *txn;
- txn = mgmt_be_find_txn_by_id(client_ctx, txn_id);
- if (!txn) {
- mgmt_be_send_apply_reply(client_ctx, txn_id, NULL, 0, false,
- "Transaction not created yet!");
- return -1;
- }
+ txn = mgmt_be_find_txn_by_id(client_ctx, txn_id, true);
+ if (!txn)
+ goto failed;
MGMTD_BE_CLIENT_DBG("Trigger CFG_APPLY_REQ processing");
- mgmt_be_txn_proc_cfgapply(txn);
+ if (mgmt_be_txn_proc_cfgapply(txn))
+ goto failed;
return 0;
+failed:
+ msg_conn_disconnect(&client_ctx->client.conn, true);
+ return -1;
}
+
static int mgmt_be_client_handle_msg(struct mgmt_be_client *client_ctx,
Mgmtd__BeMessage *be_msg)
{
/*
+ * On error we may have closed the connection so don't do anything with
+ * the client_ctx on return.
+ *
* protobuf-c adds a max size enum with an internal, and changing by
* version, name; cast to an int to avoid unhandled enum warnings
*/
if (client->cbs.client_connect_notify)
(void)(*client->cbs.client_connect_notify)(
client, client->user_data, connected);
+
+ /* Cleanup any in-progress TXN on disconnect */
+ if (!connected)
+ mgmt_be_cleanup_all_txns(client);
+
return 0;
}