modname = parent.module.__name__
# Treat skips as non errors, don't pause after
- if call.excinfo.typename != "AssertionError":
+ if call.excinfo.typename == "Skipped":
pause = False
error = False
logger.info(
- 'assert skipped at "{}/{}": {}'.format(
+ 'test skipped at "{}/{}": {}'.format(
modname, item.name, call.excinfo.value
)
)
# Handle assert failures
parent._previousfailed = item # pylint: disable=W0212
logger.error(
- 'assert failed at "{}/{}": {}'.format(
+ 'test failed at "{}/{}": {}'.format(
modname, item.name, call.excinfo.value
)
)
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
load_config_to_router,
check_address_types,
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+ config_data_dict = {}
+
for router in input_dict.keys():
if "bgp" not in input_dict[router]:
logger.debug("Router %s: 'bgp' not present in input_dict", router)
if type(bgp_data_list) is not list:
bgp_data_list = [bgp_data_list]
+ config_data = []
+
for bgp_data in bgp_data_list:
data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
if data_all_bgp:
data_all_bgp = __create_l2vpn_evpn_address_family(
tgen, topo, bgp_data, router, config_data=data_all_bgp
)
+ if data_all_bgp:
+ config_data.extend(data_all_bgp)
- try:
- result = create_common_configuration(
- tgen, router, data_all_bgp, "bgp", build, load_config
- )
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "bgp", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_bgp", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_bgp()")
return result
Returns
-------
- True or False
+ list of config commands
"""
result = False
logger.debug(
"Router %s: 'local_as' not present in input_dict" "for BGP", router
)
- return False
+ return config_data
local_as = bgp_data.setdefault("local_as", "")
cmd = "router bgp {}".format(local_as)
create_router_bgp(tgen, topo, router_dict)
logger.info("Applying modified bgp configuration")
- create_router_bgp(tgen, new_topo)
-
+ result = create_router_bgp(tgen, new_topo)
+ if result is not True:
+ result = "Error applying new AS number config"
except Exception as e:
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return True
+ return result
@retry(retry_timeout=8)
from tempfile import mkdtemp
import json
+import logging
import os
import sys
import traceback
True or errormsg
"""
- result = True
+ rlist = []
+
for router_name in input_dict.keys():
config_cmd = input_dict[router_name]["raw_config"]
for cmd in config_cmd:
cfg.write("{}\n".format(cmd))
- result = load_config_to_router(tgen, router_name)
+ rlist.append(router_name)
- return result
+ # Load config on all routers
+ return load_config_to_routers(tgen, rlist)
-def create_common_configuration(
- tgen, router, data, config_type=None, build=False, load_config=True
+def create_common_configurations(
+ tgen, config_dict, config_type=None, build=False, load_config=True
):
"""
API to create object of class FRRConfig and also create frr_json.conf
Parameters
----------
* `tgen`: tgen object
- * `data`: Configuration data saved in a list.
- * `router` : router id to be configured.
+ * `config_dict`: Configuration data saved in a dict of { router: config-list }
+ * `routers` : list of router id to be configured.
* `config_type` : Syntactic information while writing configuration. Should
be one of the value as mentioned in the config_map below.
* `build` : Only for initial setup phase this is set as True
"""
TMPDIR = os.path.join(LOGDIR, tgen.modname)
- fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
-
config_map = OrderedDict(
{
"general_config": "! FRR General Config\n",
else:
mode = "w"
- try:
- frr_cfg_fd = open(fname, mode)
- if config_type:
- frr_cfg_fd.write(config_map[config_type])
- for line in data:
- frr_cfg_fd.write("{} \n".format(str(line)))
- frr_cfg_fd.write("\n")
-
- except IOError as err:
- logger.error(
- "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror)
- )
- return False
- finally:
- frr_cfg_fd.close()
+ routers = config_dict.keys()
+ for router in routers:
+ fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
+ try:
+ frr_cfg_fd = open(fname, mode)
+ if config_type:
+ frr_cfg_fd.write(config_map[config_type])
+ for line in config_dict[router]:
+ frr_cfg_fd.write("{} \n".format(str(line)))
+ frr_cfg_fd.write("\n")
+
+ except IOError as err:
+ logger.error(
+ "Unable to open FRR Config '%s': %s" % (fname, str(err))
+ )
+ return False
+ finally:
+ frr_cfg_fd.close()
# If configuration applied from build, it will done at last
+ result = True
if not build and load_config:
- load_config_to_router(tgen, router)
+ result = load_config_to_routers(tgen, routers)
- return True
+ return result
+
+
+def create_common_configuration(
+ tgen, router, data, config_type=None, build=False, load_config=True
+):
+ """
+ API to create object of class FRRConfig and also create frr_json.conf
+ file. It will create interface and common configurations and save it to
+ frr_json.conf and load to router
+ Parameters
+ ----------
+ * `tgen`: tgen object
+ * `data`: Configuration data saved in a list.
+ * `router` : router id to be configured.
+ * `config_type` : Syntactic information while writing configuration. Should
+ be one of the value as mentioned in the config_map below.
+ * `build` : Only for initial setup phase this is set as True
+ Returns
+ -------
+ True or False
+ """
+ return create_common_configurations(
+ tgen, {router: data}, config_type, build, load_config
+ )
def kill_router_daemons(tgen, router, daemons, save_config=True):
'\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(vtysh_command, output)
)
else:
- router_list[rname].logger.error(
- '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(vtysh_command, output)
+ router_list[rname].logger.warning(
+ '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(vtysh_command, output)
)
logger.error("Delta file apply for %s failed %d: %s", rname, p.returncode, output)
for rname, p in procs.items():
output, _ = p.communicate()
if p.returncode:
- logger.warning(
- "Get running config for %s failed %d: %s", rname, p.returncode, output
- )
+ logger.warning("Get running config for %s failed %d: %s", rname, p.returncode, output)
else:
- logger.info("Configuration on router {} after reset:\n{}".format(rname, output))
+ logger.info("Configuration on router %s after reset:\n%s", rname, output)
logger.debug("Exiting API: reset_config_on_routers")
return True
-def load_config_to_router(tgen, routerName, save_bkup=False):
+def load_config_to_routers(tgen, routers, save_bkup=False):
"""
- Loads configuration on router from the file FRRCFG_FILE.
+ Loads configuration on routers from the file FRRCFG_FILE.
Parameters
----------
* `tgen` : Topogen object
- * `routerName` : router for which configuration to be loaded
+ * `routers` : routers for which configuration is to be loaded
* `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
+ Returns
+ -------
+ True or False
"""
- logger.debug("Entering API: load_config_to_router")
+ logger.debug("Entering API: load_config_to_routers")
- router_list = tgen.routers()
- for rname in ROUTER_LIST:
- if routerName and rname != routerName:
+ base_router_list = tgen.routers()
+ router_list = {}
+ for router in routers:
+ if (router not in ROUTER_LIST) or (router not in base_router_list):
continue
+ router_list[router] = base_router_list[router]
+
+ frr_cfg_file_fmt = TMPDIR + "/{}/" + FRRCFG_FILE
+ frr_cfg_bkup_fmt = TMPDIR + "/{}/" + FRRCFG_BKUP_FILE
+ procs = {}
+ for rname in router_list:
router = router_list[rname]
try:
- frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
- frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE)
+ frr_cfg_file = frr_cfg_file_fmt.format(rname)
+ frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
with open(frr_cfg_file, "r+") as cfg:
data = cfg.read()
logger.info(
if save_bkup:
with open(frr_cfg_bkup, "w") as bkup:
bkup.write(data)
+ procs[rname] = router_list[rname].popen(
+ ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
+ stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ except IOError as err:
+ logging.error(
+ "Unable to open config File. error(%s): %s",
+ err.errno, err.strerror
+ )
+ return False
+ except Exception as error:
+ logging.error("Unable to apply config on %s: %s", rname, str(error))
+ return False
- output = router.vtysh_multicmd(data, pretty_output=False)
- for out_err in ERROR_LIST:
- if out_err.lower() in output.lower():
- raise InvalidCLIError("%s" % output)
+ errors = []
+ for rname, p in procs.items():
+ output, _ = p.communicate()
+ frr_cfg_file = frr_cfg_file_fmt.format(rname)
+ vtysh_command = "vtysh -f " + frr_cfg_file
+ if not p.returncode:
+ router_list[rname].logger.info(
+ '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(vtysh_command, output)
+ )
+ else:
+ router_list[rname].logger.error(
+ '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(vtysh_command, output)
+ )
+ logger.error("Config apply for %s failed %d: %s", rname, p.returncode, output)
+ # We can't thorw an exception here as we won't clear the config file.
+ errors.append(InvalidCLIError("load_config_to_routers error for {}: {}".format(rname, output)))
- cfg.truncate(0)
+ # Empty the config file or we append to it next time through.
+ with open(frr_cfg_file, "r+") as cfg:
+ cfg.truncate(0)
- except IOError as err:
- errormsg = (
- "Unable to open config File. error(%s):" " %s",
- (err.errno, err.strerror),
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ procs = {}
+ for rname in router_list:
+ procs[rname] = router_list[rname].popen(
+ ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
+ stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
)
- return errormsg
+ for rname, p in procs.items():
+ output, _ = p.communicate()
+ if p.returncode:
+ logger.warning("Get running config for %s failed %d: %s", rname, p.returncode, output)
+ else:
+ logger.info("New configuration for router %s:\n%s", rname,output)
- # Router current configuration to log file or console if
- # "show_router_config" is defined in "pytest.ini"
- if show_router_config:
- logger.info("New configuration for router {}:".format(rname))
- new_config = router.run("vtysh -c 'show running'")
- logger.info(new_config)
+ logger.debug("Exiting API: load_config_to_routers")
+ return not errors
- logger.debug("Exiting API: load_config_to_router")
- return True
+def load_config_to_router(tgen, routerName, save_bkup=False):
+ """
+ Loads configuration on router from the file FRRCFG_FILE.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router for which configuration to be loaded
+ * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
+ """
+ return load_config_to_routers(tgen, [routerName], save_bkup)
def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
result = False
try:
+ debug_config_dict = {}
+
for router in input_dict.keys():
debug_config = []
if "debug" in input_dict[router]:
for daemon, debug_logs in disable_logs.items():
for debug_log in debug_logs:
debug_config.append("no {}".format(debug_log))
+ if debug_config:
+ debug_config_dict[router] = debug_config
- result = create_common_configuration(
- tgen, router, debug_config, "debug_log_config", build=build
- )
+ result = create_common_configurations(
+ tgen, debug_config_dict, "debug_log_config", build=build
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
input_dict = deepcopy(input_dict)
try:
+ config_data_dict = {}
+
for c_router, c_data in input_dict.items():
rnode = tgen.routers()[c_router]
+ config_data = []
+
if "vrfs" in c_data:
for vrf in c_data["vrfs"]:
- config_data = []
del_action = vrf.setdefault("delete", False)
name = vrf.setdefault("name", None)
table_id = vrf.setdefault("id", None)
cmd = "no vni {}".format(del_vni)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, c_router, config_data, "vrf", build=build
- )
+ if config_data:
+ config_data_dict[c_router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "vrf", build=build
+ )
except InvalidCLIError:
# Traceback
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
- global frr_cfg
+ rlist = []
+
for router in input_dict.keys():
interface_list = input_dict[router]["interface_list"]
rnode = tgen.routers()[router]
interface_set_status(rnode, intf, status)
- # Load config to router
- load_config_to_router(tgen, router)
+ rlist.append(router)
+
+ # Load config to routers
+ load_config_to_routers(tgen, rlist)
except Exception as e:
errormsg = traceback.format_exc()
topo = deepcopy(topo)
try:
+ interface_data_dict = {}
+
for c_router, c_data in topo.items():
interface_data = []
for destRouterLink, data in sorted(c_data["links"].items()):
interface_data += _create_interfaces_ospf_cfg(
"ospf6", c_data, data, ospf_keywords + ["area"]
)
+ if interface_data:
+ interface_data_dict[c_router] = interface_data
- result = create_common_configuration(
- tgen, c_router, interface_data, "interface_config", build=build
- )
+ result = create_common_configurations(
+ tgen, interface_data_dict, "interface_config", build=build
+ )
except InvalidCLIError:
# Traceback
input_dict = deepcopy(input_dict)
try:
+ static_routes_list_dict = {}
+
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
errormsg = "static_routes not present in input_dict"
static_routes_list.append(cmd)
- result = create_common_configuration(
- tgen, router, static_routes_list, "static_route", build=build
- )
+ if static_routes_list:
+ static_routes_list_dict[router] = static_routes_list
+
+ result = create_common_configurations(
+ tgen, static_routes_list_dict, "static_route", build=build
+ )
except InvalidCLIError:
# Traceback
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
try:
+ config_data_dict = {}
+
for router in input_dict.keys():
if "prefix_lists" not in input_dict[router]:
errormsg = "prefix_lists not present in input_dict"
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, "prefix_list", build=build
- )
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "prefix_list", build=build
+ )
except InvalidCLIError:
# Traceback
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
input_dict = deepcopy(input_dict)
try:
+ rmap_data_dict = {}
+
for router in input_dict.keys():
if "route_maps" not in input_dict[router]:
logger.debug("route_maps not present in input_dict")
cmd = "match metric {}".format(metric)
rmap_data.append(cmd)
- result = create_common_configuration(
- tgen, router, rmap_data, "route_maps", build=build
- )
+ if rmap_data:
+ rmap_data_dict[router] = rmap_data
+
+ result = create_common_configurations(
+ tgen, rmap_data_dict, "route_maps", build=build
+ )
except InvalidCLIError:
# Traceback
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
input_dict = deepcopy(input_dict)
try:
+ config_data_dict = {}
+
for router in input_dict.keys():
if "bgp_community_lists" not in input_dict[router]:
errormsg = "bgp_community_lists not present in input_dict"
config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, "bgp_community_list", build=build
- )
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "bgp_community_list", build=build
+ )
except InvalidCLIError:
# Traceback
# OF THIS SOFTWARE.
#
-import traceback
import ipaddr
import ipaddress
import sys
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
retry,
generate_ips,
topo = topo["routers"]
input_dict = deepcopy(input_dict)
- for router in input_dict.keys():
- if "ospf" not in input_dict[router]:
- logger.debug("Router %s: 'ospf' not present in input_dict", router)
- continue
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
- result = __create_ospf_global(tgen, input_dict, router, build, load_config)
- if result is True:
- ospf_data = input_dict[router]["ospf"]
-
- for router in input_dict.keys():
- if "ospf6" not in input_dict[router]:
- logger.debug("Router %s: 'ospf6' not present in input_dict", router)
- continue
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
- result = __create_ospf_global(
- tgen, input_dict, router, build, load_config, ospf="ospf6"
- )
- if result is True:
- ospf_data = input_dict[router]["ospf6"]
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(
- tgen, input_dict, router, build=False, load_config=True, ospf="ospf"
+ tgen, input_dict, router, build, load_config, ospf
):
"""
Helper API to create ospf global configuration.
"links": {
"r3": {
"ipv6": "2013:13::1/64",
- "ospf6": {
+ "ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
- }
+ }
},
"ospf6": {
"router_id": "1.1.1.1",
Returns
-------
- True or False
+ list of configuration commands
"""
- result = False
- logger.debug("Entering lib API: __create_ospf_global()")
- try:
+ config_data = []
- ospf_data = input_dict[router][ospf]
- del_ospf_action = ospf_data.setdefault("delete", False)
- if del_ospf_action:
- config_data = ["no router {}".format(ospf)]
- result = create_common_configuration(
- tgen, router, config_data, ospf, build, load_config
- )
- return result
+ if ospf not in input_dict[router]:
+ return config_data
- config_data = []
- cmd = "router {}".format(ospf)
+ logger.debug("Entering lib API: __create_ospf_global()")
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(
+ log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(
+ aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
config_data.append(cmd)
- # router id
- router_id = ospf_data.setdefault("router_id", None)
- del_router_id = ospf_data.setdefault("del_router_id", False)
- if del_router_id:
- config_data.append("no {} router-id".format(ospf))
- if router_id:
- config_data.append("{} router-id {}".format(ospf, router_id))
-
- # log-adjacency-changes
- log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
- del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
- if del_log_adj_changes:
- config_data.append("no log-adjacency-changes detail")
- if log_adj_changes:
- config_data.append("log-adjacency-changes {}".format(log_adj_changes))
-
- # aggregation timer
- aggr_timer = ospf_data.setdefault("aggr_timer", None)
- del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
- if del_aggr_timer:
- config_data.append("no aggregation timer")
- if aggr_timer:
- config_data.append("aggregation timer {}".format(aggr_timer))
-
- # maximum path information
- ecmp_data = ospf_data.setdefault("maximum-paths", {})
- if ecmp_data:
- cmd = "maximum-paths {}".format(ecmp_data)
- del_action = ospf_data.setdefault("del_max_path", False)
- if del_action:
- cmd = "no maximum-paths"
- config_data.append(cmd)
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # redistribute command
- redistribute_data = ospf_data.setdefault("redistribute", {})
- if redistribute_data:
- for redistribute in redistribute_data:
- if "redist_type" not in redistribute:
- logger.debug(
- "Router %s: 'redist_type' not present in " "input_dict", router
- )
- else:
- cmd = "redistribute {}".format(redistribute["redist_type"])
- for red_type in redistribute_data:
- if "route_map" in red_type:
- cmd = cmd + " route-map {}".format(red_type["route_map"])
- del_action = redistribute.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
- # area information
- area_data = ospf_data.setdefault("area", {})
- if area_data:
- for area in area_data:
- if "id" not in area:
- logger.debug(
- "Router %s: 'area id' not present in " "input_dict", router
- )
- else:
- cmd = "area {}".format(area["id"])
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
- if "type" in area:
- cmd = cmd + " {}".format(area["type"])
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = area.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ #def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug("Router %s: 'originate key' not present in "
+ "input_dict", router)
+ else:
+ cmd = "default-information originate"
- # def route information
- def_rte_data = ospf_data.setdefault("default-information", {})
- if def_rte_data:
- if "originate" not in def_rte_data:
- logger.debug(
- "Router %s: 'originate key' not present in " "input_dict", router
- )
- else:
- cmd = "default-information originate"
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
- if "always" in def_rte_data:
- cmd = cmd + " always"
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
- if "metric" in def_rte_data:
- cmd = cmd + " metric {}".format(def_rte_data["metric"])
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data[
+ "metric-type"])
- if "metric-type" in def_rte_data:
- cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
- if "route-map" in def_rte_data:
- cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+ del_action = def_rte_data.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- del_action = def_rte_data.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # area interface information for ospf6d only
+ if ospf == "ospf6":
+ area_iface = ospf_data.setdefault("neighbors", {})
+ if area_iface:
+ for neighbor in area_iface:
+ if "area" in area_iface[neighbor]:
+ iface = input_dict[router]["links"][neighbor]["interface"]
+ cmd = "interface {} area {}".format(
+ iface, area_iface[neighbor]["area"]
+ )
+ if area_iface[neighbor].setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
- # area interface information for ospf6d only
- if ospf == "ospf6":
- area_iface = ospf_data.setdefault("neighbors", {})
- if area_iface:
- for neighbor in area_iface:
- if "area" in area_iface[neighbor]:
+ try:
+ if "area" in input_dict[router]['links'][neighbor][
+ 'ospf6']:
iface = input_dict[router]["links"][neighbor]["interface"]
cmd = "interface {} area {}".format(
- iface, area_iface[neighbor]["area"]
- )
- if area_iface[neighbor].setdefault("delete", False):
+ iface, input_dict[router]['links'][neighbor][
+ 'ospf6']['area'])
+ if input_dict[router]['links'][neighbor].setdefault(
+ "delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
-
- try:
- if "area" in input_dict[router]["links"][neighbor]["ospf6"]:
- iface = input_dict[router]["links"][neighbor]["interface"]
- cmd = "interface {} area {}".format(
- iface,
- input_dict[router]["links"][neighbor]["ospf6"]["area"],
- )
- if input_dict[router]["links"][neighbor].setdefault(
- "delete", False
- ):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- except KeyError:
+ except KeyError:
pass
- # summary information
- summary_data = ospf_data.setdefault("summary-address", {})
- if summary_data:
- for summary in summary_data:
- if "prefix" not in summary:
- logger.debug(
- "Router %s: 'summary-address' not present in " "input_dict",
- router,
- )
- else:
- cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
-
- _tag = summary.setdefault("tag", None)
- if _tag:
- cmd = "{} tag {}".format(cmd, _tag)
- _advertise = summary.setdefault("advertise", True)
- if not _advertise:
- cmd = "{} no-advertise".format(cmd)
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
- del_action = summary.setdefault("delete", False)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
- # ospf gr information
- gr_data = ospf_data.setdefault("graceful-restart", {})
- if gr_data:
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
- if "opaque" in gr_data and gr_data["opaque"]:
- cmd = "capability opaque"
- if gr_data.setdefault("delete", False):
+ del_action = summary.setdefault("delete", False)
+ if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- if "helper-only" in gr_data and not gr_data["helper-only"]:
- cmd = "graceful-restart helper-only"
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
+
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper-only" in gr_data and not gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
+ for rtrs in gr_data["helper-only"]:
+ cmd = "graceful-restart helper-only {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
- for rtrs in gr_data["helper-only"]:
- cmd = "graceful-restart helper-only {}".format(rtrs)
- if gr_data.setdefault("delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
-
- if "helper" in gr_data:
- if type(gr_data["helper"]) is not list:
- gr_data["helper"] = list(gr_data["helper"])
- for helper_role in gr_data["helper"]:
- cmd = "graceful-restart helper {}".format(helper_role)
- if gr_data.setdefault("delete", False):
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
- if "supported-grace-time" in gr_data:
- cmd = "graceful-restart helper supported-grace-time {}".format(
- gr_data["supported-grace-time"]
- )
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, router, config_data, "ospf", build, load_config
- )
-
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
- return result
+
+ return config_data
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "ospf6" not in input_dict[router]:
logger.debug("Router %s: 'ospf6' not present in input_dict", router)
continue
- result = __create_ospf_global(
+ config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, "ospf6"
)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "ospf6", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf6", exc_info=True)
+ result = False
logger.debug("Exiting lib API: create_router_ospf6()")
return result
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: config_ospf_interface()")
return result
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]['links'].keys():
if build:
return config_data
- else:
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
# Import common_config to use commomnly used APIs
from lib.common_config import (
create_common_configuration,
+ create_common_configurations,
InvalidCLIError,
retry,
run_frr_cmd,
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
- result = _enable_disable_pim(tgen, topo, input_dict, router, build)
+ config_data = _enable_disable_pim_config(tgen, topo, input_dict, router, build)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ # Now add RP config to all routers
+ for router in input_dict.keys():
if "pim" not in input_dict[router]:
- logger.debug("Router %s: 'pim' is not present in " "input_dict", router)
continue
+ if "rp" not in input_dict[router]["pim"]:
+ continue
+ _add_pim_rp_config(
+ tgen, topo, input_dict, router, build, config_data_dict
+ )
- if result is True:
- if "rp" not in input_dict[router]["pim"]:
- continue
-
- result = _create_pim_rp_config(
- tgen, topo, input_dict, router, build, load_config
- )
- if result is not True:
- return False
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "pim", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_pim_config", exc_info=True)
+ result = False
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
-def _create_pim_rp_config(tgen, topo, input_dict, router, build=False, load_config=False):
+def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
"""
Helper API to create pim RP configurations.
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router id to be configured.
* `build` : Only for initial setup phase this is set as True.
-
+ * `config_data_dict` : OUT: adds `router` config to dictinary
Returns
-------
- True or False
+ None
"""
- result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
pim_data = input_dict[router]["pim"]
# Configure this RP on every router.
for dut in tgen.routers():
-
# At least one interface must be enabled for PIM on the router
pim_if_enabled = False
for destLink, data in topo[dut]["links"].items():
cmd = "no {}".format(cmd)
config_data.append(cmd)
- try:
- result = create_common_configuration(
- tgen, dut, config_data, "pim", build, load_config
- )
- if result is not True:
- logger.error("Error applying PIM config", exc_info=True)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return False
-
- except InvalidCLIError as error:
- logger.error("Error applying PIM config: %s", error, exc_info=error)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return False
-
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return result
+ if config_data:
+ if dut not in config_data_dict:
+ config_data_dict[dut] = config_data
+ else:
+ config_data_dict[dut].extend(config_data)
def create_igmp_config(tgen, topo, input_dict=None, build=False):
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
for router in input_dict.keys():
if "igmp" not in input_dict[router]:
logger.debug("Router %s: 'igmp' is not present in " "input_dict", router)
cmd = "no {}".format(cmd)
config_data.append(cmd)
- try:
+ if config_data:
+ config_data_dict[router] = config_data
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
- except InvalidCLIError:
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+ except InvalidCLIError:
+ logger.error("create_igmp_config", exc_info=True)
+ result = False
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
-def _enable_disable_pim(tgen, topo, input_dict, router, build=False):
+def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
"""
Helper API to enable or disable pim on interfaces
Returns
-------
- True or False
+ list of config
"""
- result = False
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- try:
- config_data = []
-
- # Enable pim on interfaces
- for destRouterLink, data in sorted(topo[router]["links"].items()):
- if "pim" in data and data["pim"] == "enable":
- # Loopback interfaces
- if "type" in data and data["type"] == "loopback":
- interface_name = destRouterLink
- else:
- interface_name = data["interface"]
+ config_data = []
- cmd = "interface {}".format(interface_name)
+ # Enable pim on interfaces
+ for destRouterLink, data in sorted(topo[router]["links"].items()):
+ if "pim" in data and data["pim"] == "enable":
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ cmd = "interface {}".format(interface_name)
+ config_data.append(cmd)
+ config_data.append("ip pim")
+
+ # pim global config
+ if "pim" in input_dict[router]:
+ pim_data = input_dict[router]["pim"]
+ del_action = pim_data.setdefault("delete", False)
+ for t in [
+ "join-prune-interval",
+ "keep-alive-timer",
+ "register-suppress-time",
+ ]:
+ if t in pim_data:
+ cmd = "ip pim {} {}".format(t, pim_data[t])
+ if del_action:
+ cmd = "no {}".format(cmd)
config_data.append(cmd)
- config_data.append("ip pim")
-
- result = create_common_configuration(
- tgen, router, config_data, "interface_config", build=build
- )
- if result is not True:
- return False
-
- config_data = []
- if "pim" in input_dict[router]:
- pim_data = input_dict[router]["pim"]
- for t in [
- "join-prune-interval",
- "keep-alive-timer",
- "register-suppress-time",
- ]:
- if t in pim_data:
- cmd = "ip pim {} {}".format(t, pim_data[t])
- config_data.append(cmd)
-
- if config_data:
- result = create_common_configuration(
- tgen, router, config_data, "pim", build=build
- )
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
- return result
+ return config_data
def find_rp_details(tgen, topo):
result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
try:
+ config_data_dict = {}
for dut in input_dict.keys():
if "pim" not in input_dict[dut]:
pim_data = input_dict[dut]["pim"]
+ config_data = []
if "force_expire" in pim_data:
- config_data = []
force_expire_data = pim_data["force_expire"]
for source, groups in force_expire_data.items():
)
config_data.append(cmd)
- result = create_common_configuration(
- tgen, dut, config_data, "pim", build=build
- )
- if result is not True:
- return False
+ if config_data:
+ config_data_dict[dut] = config_data
+ result = create_common_configurations(
+ tgen, config_data_dict, "pim", build=build
+ )
except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ logger.error("configure_pim_force_expire", exc_info=True)
+ result = False
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
return True
-@retry(retry_timeout=80)
+@retry(retry_timeout=120)
def verify_ip_mroutes(
tgen, dut, src_address, group_addresses, iif, oil, return_uptime=False, mwait=0, expected=True
):
config_data.append("ip address {}".format(_rp))
config_data.append("ip pim")
+ # Why not config just once, why per group?
result = create_common_configuration(
tgen, rp, config_data, "interface_config"
)
from lib.common_config import (
number_to_row,
number_to_column,
- load_config_to_router,
+ load_config_to_routers,
create_interfaces_cfg,
create_static_routes,
create_prefix_lists,
func_dict.get(func_type)(tgen, data, build=True)
- for router in sorted(topo["routers"].keys()):
- logger.debug("Configuring router {}...".format(router))
-
- result = load_config_to_router(tgen, router, save_bkup)
- if not result:
- logger.info("Failed while configuring {}".format(router))
- pytest.exit(1)
+ routers = sorted(topo["routers"].keys())
+ result = load_config_to_routers(tgen, routers, save_bkup)
+ if not result:
+ logger.info("build_config_from_json: failed to configure topology")
+ pytest.exit(1)