InvalidCLIError,
retry,
run_frr_cmd,
+ validate_ip_address,
)
from lib.micronet import get_exec_path
from lib.topolog import logger
def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True):
"""
- API to configure pim on router
+ API to configure pim/pimv6 on router
Parameters
----------
"prefix-list": "pf_list_1"
"delete": True
}]
+ },
+ "pim6": {
+ "disable" : ["l1-i1-eth1"],
+ "rp": [{
+ "rp_addr" : "2001:db8:f::5:17".
+ "keep-alive-timer": "100"
+ "group_addr_range": ["FF00::/8"]
+ "prefix-list": "pf_list_1"
+ "delete": True
+ }]
}
}
}
# Now add RP config to all routers
for router in input_dict.keys():
- if "pim" not in input_dict[router]:
- continue
- if "rp" not in input_dict[router]["pim"]:
- continue
- _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict)
-
+ if "pim" in input_dict[router] or "pim6" in input_dict[router]:
+ _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict)
try:
result = create_common_configurations(
tgen, config_data_dict, "pim", build, load_config
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ rp_data = []
+
+ # PIMv4
+ pim_data = None
+ if "pim" in input_dict[router]:
+ pim_data = input_dict[router]["pim"]
+ if "rp" in input_dict[router]["pim"]:
+ rp_data += pim_data["rp"]
- pim_data = input_dict[router]["pim"]
- rp_data = pim_data["rp"]
+ # PIMv6
+ pim6_data = None
+ if "pim6" in input_dict[router]:
+ pim6_data = input_dict[router]["pim6"]
+ if "rp" in input_dict[router]["pim6"]:
+ rp_data += pim6_data["rp"]
# Configure this RP on every router.
for dut in tgen.routers():
# At least one interface must be enabled for PIM on the router
pim_if_enabled = False
+ pim6_if_enabled = False
for destLink, data in topo[dut]["links"].items():
if "pim" in data:
pim_if_enabled = True
- if not pim_if_enabled:
+ if "pim6" in data:
+ pim6_if_enabled = True
+ if not pim_if_enabled and pim_data:
+ continue
+ if not pim6_if_enabled and pim6_data:
continue
config_data = []
- for rp_dict in deepcopy(rp_data):
- # ip address of RP
- if "rp_addr" not in rp_dict and build:
- logger.error(
- "Router %s: 'ip address of RP' not " "present in input_dict/JSON",
- router,
- )
-
- return False
- rp_addr = rp_dict.setdefault("rp_addr", None)
-
- # Keep alive Timer
- keep_alive_timer = rp_dict.setdefault("keep_alive_timer", None)
-
- # Group Address range to cover
- if "group_addr_range" not in rp_dict and build:
- logger.error(
- "Router %s:'Group Address range to cover'"
- " not present in input_dict/JSON",
- router,
- )
-
- return False
- group_addr_range = rp_dict.setdefault("group_addr_range", None)
+ if rp_data:
+ for rp_dict in deepcopy(rp_data):
+ # ip address of RP
+ if "rp_addr" not in rp_dict and build:
+ logger.error(
+ "Router %s: 'ip address of RP' not "
+ "present in input_dict/JSON",
+ router,
+ )
- # Group prefix-list filter
- prefix_list = rp_dict.setdefault("prefix_list", None)
+ return False
+ rp_addr = rp_dict.setdefault("rp_addr", None)
+ if rp_addr:
+ addr_type = validate_ip_address(rp_addr)
+ # Keep alive Timer
+ keep_alive_timer = rp_dict.setdefault("keep_alive_timer", None)
+
+ # Group Address range to cover
+ if "group_addr_range" not in rp_dict and build:
+ logger.error(
+ "Router %s:'Group Address range to cover'"
+ " not present in input_dict/JSON",
+ router,
+ )
- # Delete rp config
- del_action = rp_dict.setdefault("delete", False)
+ return False
+ group_addr_range = rp_dict.setdefault("group_addr_range", None)
- if keep_alive_timer:
- cmd = "ip pim rp keep-alive-timer {}".format(keep_alive_timer)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ # Group prefix-list filter
+ prefix_list = rp_dict.setdefault("prefix_list", None)
- if rp_addr:
- if group_addr_range:
- if type(group_addr_range) is not list:
- group_addr_range = [group_addr_range]
+ # Delete rp config
+ del_action = rp_dict.setdefault("delete", False)
- for grp_addr in group_addr_range:
- cmd = "ip pim rp {} {}".format(rp_addr, grp_addr)
+ if keep_alive_timer:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp keep-alive-timer {}".format(keep_alive_timer)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp keep-alive-timer {}".format(keep_alive_timer)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- if prefix_list:
- cmd = "ip pim rp {} prefix-list {}".format(rp_addr, prefix_list)
- if del_action:
- cmd = "no {}".format(cmd)
- config_data.append(cmd)
+ if rp_addr:
+ if group_addr_range:
+ if type(group_addr_range) is not list:
+ group_addr_range = [group_addr_range]
- if config_data:
- if dut not in config_data_dict:
- config_data_dict[dut] = config_data
- else:
- config_data_dict[dut].extend(config_data)
+ for grp_addr in group_addr_range:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp {} {}".format(rp_addr, grp_addr)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp {} {}".format(rp_addr, grp_addr)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if prefix_list:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp {} prefix-list {}".format(
+ rp_addr, prefix_list
+ )
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp {} prefix-list {}".format(
+ rp_addr, prefix_list
+ )
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if config_data:
+ if dut not in config_data_dict:
+ config_data_dict[dut] = config_data
+ else:
+ config_data_dict[dut].extend(config_data)
def create_igmp_config(tgen, topo, input_dict=None, build=False):
return result
+def create_mld_config(tgen, topo, input_dict=None, build=False):
+ """
+ API to configure mld for PIMv6 on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from
+ testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "mld": {
+ "interfaces": {
+ "r1-r0-eth0" :{
+ "mld":{
+ "version": "2",
+ "delete": True
+ "query": {
+ "query-interval" : 100,
+ "query-max-response-time": 200
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+ for router in input_dict.keys():
+ if "mld" not in input_dict[router]:
+ logger.debug("Router %s: 'mld' is not present in " "input_dict", router)
+ continue
+
+ mld_data = input_dict[router]["mld"]
+
+ if "interfaces" in mld_data:
+ config_data = []
+ intf_data = mld_data["interfaces"]
+
+ for intf_name in intf_data.keys():
+ cmd = "interface {}".format(intf_name)
+ config_data.append(cmd)
+ protocol = "mld"
+ del_action = intf_data[intf_name]["mld"].setdefault("delete", False)
+ cmd = "ipv6 mld"
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ del_attr = intf_data[intf_name]["mld"].setdefault("delete_attr", False)
+ join = intf_data[intf_name]["mld"].setdefault("join", None)
+ source = intf_data[intf_name]["mld"].setdefault("source", None)
+ version = intf_data[intf_name]["mld"].setdefault("version", False)
+ query = intf_data[intf_name]["mld"].setdefault("query", {})
+
+ if version:
+ cmd = "ipv6 {} version {}".format(protocol, version)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if source and join:
+ for group in join:
+ cmd = "ipv6 {} join {} {}".format(protocol, group, source)
+
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ elif join:
+ for group in join:
+ cmd = "ipv6 {} join {}".format(protocol, group)
+
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if query:
+ for _query, value in query.items():
+ if _query != "delete":
+ cmd = "ipv6 {} {} {}".format(protocol, _query, value)
+
+ if "delete" in intf_data[intf_name][protocol]["query"]:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+ try:
+ result = create_common_configuration(
+ tgen, router, config_data, "interface_config", build=build
+ )
+ except InvalidCLIError:
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
"""
Helper API to enable or disable pim on interfaces
config_data = []
- # Enable pim on interfaces
+ # Enable pim/pim6 on interfaces
for destRouterLink, data in sorted(topo[router]["links"].items()):
if "pim" in data and data["pim"] == "enable":
# Loopback interfaces
config_data.append(cmd)
config_data.append("ip pim")
+ if "pim6" in data and data["pim6"] == "enable":
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ cmd = "interface {}".format(interface_name)
+ config_data.append(cmd)
+ config_data.append("ipv6 pim")
+
# pim global config
if "pim" in input_dict[router]:
pim_data = input_dict[router]["pim"]
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ # pim6 global config
+ if "pim6" in input_dict[router]:
+ pim6_data = input_dict[router]["pim6"]
+ del_action = pim6_data.setdefault("delete", False)
+ for t in [
+ "join-prune-interval",
+ "keep-alive-timer",
+ "register-suppress-time",
+ ]:
+ if t in pim6_data:
+ cmd = "ipv6 pim {} {}".format(t, pim6_data[t])
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
return config_data
"[DUT: %s]: Verifying upstream Inbound Interface" " for IGMP groups received:",
dut,
)
- show_ip_pim_upstream_json = run_frr_cmd(
- rnode, "show ip pim upstream json", isjson=True
- )
if type(group_addresses) is not list:
group_addresses = [group_addresses]
if type(iif) is not list:
iif = [iif]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} pim upstream json".format(ip_cmd)
+ show_ip_pim_upstream_json = run_frr_cmd(rnode, cmd, isjson=True)
+
for grp_addr in group_addresses:
# Verify group address
if grp_addr not in show_ip_pim_upstream_json:
"[DUT: %s]: Verifying Join state and Join Timer" " for IGMP groups received:",
dut,
)
- show_ip_pim_upstream_json = run_frr_cmd(
- rnode, "show ip pim upstream json", isjson=True
- )
if type(group_addresses) is not list:
group_addresses = [group_addresses]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ cmd = "show ip pim upstream json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 pim upstream json"
+ show_ip_pim_upstream_json = run_frr_cmd(rnode, cmd, isjson=True)
+
for grp_addr in group_addresses:
# Verify group address
if grp_addr not in show_ip_pim_upstream_json:
rnode = tgen.routers()[dut]
+ if not isinstance(group_addresses, list):
+ group_addresses = [group_addresses]
+
+ if not isinstance(iif, list) and iif != "none":
+ iif = [iif]
+
+ if not isinstance(oil, list) and oil != "none":
+ oil = [oil]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
if return_uptime:
logger.info("Sleeping for %s sec..", mwait)
sleep(mwait)
logger.info("[DUT: %s]: Verifying ip mroutes", dut)
- show_ip_mroute_json = run_frr_cmd(rnode, "show ip mroute json", isjson=True)
+ show_ip_mroute_json = run_frr_cmd(
+ rnode, "show {} mroute json".format(ip_cmd), isjson=True
+ )
if return_uptime:
uptime_dict = {}
error_msg = "[DUT %s]: mroutes are not present or flushed out !!" % (dut)
return error_msg
- if not isinstance(group_addresses, list):
- group_addresses = [group_addresses]
-
- if not isinstance(iif, list) and iif != "none":
- iif = [iif]
-
- if not isinstance(oil, list) and oil != "none":
- oil = [oil]
-
for grp_addr in group_addresses:
if grp_addr not in show_ip_mroute_json:
errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
rnode = tgen.routers()[dut]
- logger.info("[DUT: %s]: Verifying ip rp info", dut)
- show_ip_rp_info_json = run_frr_cmd(rnode, "show ip pim rp-info json", isjson=True)
-
if type(group_addresses) is not list:
group_addresses = [group_addresses]
if type(oif) is not list:
oif = [oif]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
for grp_addr in group_addresses:
if rp is None:
rp_details = find_rp_details(tgen, topo)
else:
iamRP = False
else:
- show_ip_route_json = run_frr_cmd(
- rnode, "show ip route connected json", isjson=True
- )
+ if addr_type == "ipv4":
+ show_ip_route_json = run_frr_cmd(
+ rnode, "show ip route connected json", isjson=True
+ )
+ elif addr_type == "ipv6":
+ show_ip_route_json = run_frr_cmd(
+ rnode, "show ipv6 route connected json", isjson=True
+ )
for _rp in show_ip_route_json.keys():
if rp == _rp.split("/")[0]:
iamRP = True
else:
iamRP = False
+ logger.info("[DUT: %s]: Verifying ip rp info", dut)
+ cmd = "show {} pim rp-info json".format(ip_cmd)
+ show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True)
+
if rp not in show_ip_rp_info_json:
- errormsg = "[DUT %s]: Verifying rp-info" "for rp_address %s [FAILED]!! " % (
- dut,
- rp,
+ errormsg = (
+ "[DUT %s]: Verifying rp-info "
+ "for rp_address %s [FAILED]!! " % (dut, rp)
)
return errormsg
else:
group_addr_json = show_ip_rp_info_json[rp]
for rp_json in group_addr_json:
+ if "rpAddress" not in rp_json:
+ errormsg = "[DUT %s]: %s key not " "present in rp-info " % (
+ dut,
+ "rpAddress",
+ )
+ return errormsg
+
if oif is not None:
found = False
if rp_json["outboundInterface"] not in oif:
rnode = tgen.routers()[dut]
logger.info("[DUT: %s]: Verifying pim state", dut)
- show_pim_state_json = run_frr_cmd(rnode, "show ip pim state json", isjson=True)
-
- if installed_fl is None:
- installed_fl = 1
if type(group_addresses) is not list:
group_addresses = [group_addresses]
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ logger.info("[DUT: %s]: Verifying pim state", dut)
+ show_pim_state_json = run_frr_cmd(
+ rnode, "show {} pim state json".format(ip_cmd), isjson=True
+ )
+
+ if installed_fl is None:
+ installed_fl = 1
+
for grp_addr in group_addresses:
if src_address is None:
src_address = "*"
return True
-def verify_pim_interface_traffic(tgen, input_dict, return_stats=True):
+def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
Verify ip pim interface traffice by running
"show ip pim interface traffic" cli
* `tgen`: topogen object
* `input_dict(dict)`: defines DUT, what and from which interfaces
traffic needs to be verified
+ * [optional]`addr_type`: specify address-family, default is ipv4
+
Usage
-----
input_dict = {
rnode = tgen.routers()[dut]
logger.info("[DUT: %s]: Verifying pim interface traffic", dut)
- show_pim_intf_traffic_json = run_frr_cmd(
- rnode, "show ip pim interface traffic json", isjson=True
- )
+
+ if addr_type == "ipv4":
+ cmd = "show ip pim interface traffic json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 pim interface traffic json"
+
+ show_pim_intf_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
output_dict[dut] = {}
for intf, data in input_dict[dut].items():