"address_family": {
"ipv4": {
"unicast": {
- "redistribute": [
- {"redist_type": "static"},
+ "redistribute": [{
+ "redist_type": "static",
+ "attribute": {
+ "metric" : 123
+ }
+ },
{"redist_type": "connected"}
],
"advertise_networks": [
logger.debug("Router %s: 'bgp' not present in input_dict", router)
continue
- data_all_bgp = __create_bgp_global(tgen, input_dict, router, build)
- if data_all_bgp:
- bgp_data = input_dict[router]["bgp"]
+ bgp_data_list = input_dict[router]["bgp"]
- bgp_addr_data = bgp_data.setdefault("address_family", {})
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
- if not bgp_addr_data:
- logger.debug(
- "Router %s: 'address_family' not present in " "input_dict for BGP",
- router,
- )
- else:
+ for bgp_data in bgp_data_list:
+ data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
+ if data_all_bgp:
+ bgp_addr_data = bgp_data.setdefault("address_family", {})
- ipv4_data = bgp_addr_data.setdefault("ipv4", {})
- ipv6_data = bgp_addr_data.setdefault("ipv6", {})
+ if not bgp_addr_data:
+ logger.debug(
+ "Router %s: 'address_family' not present in "
+ "input_dict for BGP",
+ router,
+ )
+ else:
- neigh_unicast = (
- True
- if ipv4_data.setdefault("unicast", {})
- or ipv6_data.setdefault("unicast", {})
- else False
- )
+ ipv4_data = bgp_addr_data.setdefault("ipv4", {})
+ ipv6_data = bgp_addr_data.setdefault("ipv6", {})
- if neigh_unicast:
- data_all_bgp = __create_bgp_unicast_neighbor(
- tgen,
- topo,
- input_dict,
- router,
- afi_test,
- config_data=data_all_bgp,
+ neigh_unicast = (
+ True
+ if ipv4_data.setdefault("unicast", {})
+ or ipv6_data.setdefault("unicast", {})
+ else False
)
- try:
- result = create_common_configuration(
- tgen, router, data_all_bgp, "bgp", build, load_config
- )
- except InvalidCLIError:
- # Traceback
- errormsg = traceback.format_exc()
- logger.error(errormsg)
- return errormsg
+ if neigh_unicast:
+ data_all_bgp = __create_bgp_unicast_neighbor(
+ tgen,
+ topo,
+ bgp_data,
+ router,
+ afi_test,
+ config_data=data_all_bgp,
+ )
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ try:
+ result = create_common_configuration(
+ tgen, router, data_all_bgp, "bgp", build, load_config
+ )
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_router_bgp()")
return result
True or False
"""
+ result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]
+ bgp_data = input_dict
del_bgp_action = bgp_data.setdefault("delete", False)
- if del_bgp_action:
- config_data = ["no router bgp"]
-
- return config_data
config_data = []
if "local_as" not in bgp_data and build:
- logger.error(
+ logger.debug(
"Router %s: 'local_as' not present in input_dict" "for BGP", router
)
return False
if vrf_id:
cmd = "{} vrf {}".format(cmd, vrf_id)
+ if del_bgp_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ return config_data
+
config_data.append(cmd)
config_data.append("no bgp ebgp-requires-policy")
* `build` : Only for initial setup phase this is set as True.
"""
+ result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
add_neigh = True
+ bgp_data = input_dict
if "router bgp" in config_data:
add_neigh = False
- bgp_data = input_dict[router]["bgp"]["address_family"]
+
+ bgp_data = input_dict["address_family"]
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict:
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
- logger.error(
+ logger.debug(
"Router %s: 'redist_type' not present in " "input_dict", router
)
else:
cmd = "redistribute {}".format(redistribute["redist_type"])
redist_attr = redistribute.setdefault("attribute", None)
if redist_attr:
- cmd = "{} {}".format(cmd, redist_attr)
+ if isinstance(redist_attr, dict):
+ for key, value in redist_attr.items():
+ cmd = "{} {} {}".format(cmd, key, value)
+ else:
+ cmd = "{} {}".format(cmd, redist_attr)
+
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]["address_family"]
+ bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for name, peer_dict in neigh_data.iteritems():
for dest_link, peer in peer_dict["dest_link"].iteritems():
nh_details = topo[name]
- remote_as = nh_details["bgp"]["local_as"]
+
+ if "vrfs" in topo[router]:
+ remote_as = nh_details["bgp"][0]["local_as"]
+ else:
+ remote_as = nh_details["bgp"]["local_as"]
+
update_source = None
if dest_link in nh_details["links"].keys():
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- bgp_data = input_dict[router]["bgp"]["address_family"]
+ bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for peer_name, peer_dict in deepcopy(neigh_data).iteritems():
allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
- if next_hop_self:
- config_data.append("{} next-hop-self".format(neigh_cxt))
+ if next_hop_self is not None:
+ if next_hop_self is True:
+ config_data.append("{} next-hop-self".format(neigh_cxt))
+ else:
+ config_data.append("no {} next-hop-self".format(neigh_cxt))
+
# send_community
if send_community:
config_data.append("{} send-community".format(neigh_cxt))
@retry(attempts=20, wait=2, return_is_str=True)
-def verify_bgp_convergence(tgen, topo):
+def verify_bgp_convergence(tgen, topo, dut=None):
"""
API will verify if BGP is converged with in the given time frame.
Running "show bgp summary json" command and verify bgp neighbor
state is established,
+
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
- * `addr_type`: ip_type, ipv4/ipv6
+ * `dut`: device under test
+
Usage
-----
# To veriry is BGP is converged for all the routers used in
topology
- results = verify_bgp_convergence(tgen, topo, "ipv4")
+ results = verify_bgp_convergence(tgen, topo, dut="r1")
+
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
if "bgp" not in topo["routers"][router]:
continue
- logger.info("Verifying BGP Convergence on router %s", router)
- show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying BGP Convergence on router %s:", router)
+ show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
- bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
- for addr_type in bgp_addr_type.keys():
- if not check_address_types(addr_type):
- continue
- total_peer = 0
+ bgp_data_list = topo["routers"][router]["bgp"]
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
- for bgp_neighbor in bgp_neighbors:
- total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+ for bgp_data in bgp_data_list:
+ if "vrf" in bgp_data:
+ vrf = bgp_data["vrf"]
+ if vrf is None:
+ vrf = "default"
+ else:
+ vrf = "default"
- for addr_type in bgp_addr_type.keys():
- if not check_address_types(addr_type):
- continue
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ # To find neighbor ip type
+ bgp_addr_type = bgp_data["address_family"]
+ if "l2vpn" in bgp_addr_type:
+ total_evpn_peer = 0
- no_of_peer = 0
- for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
- for dest_link in peer_data["dest_link"].keys():
- data = topo["routers"][bgp_neighbor]["links"]
- if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].split("/")[0]
- if addr_type == "ipv4":
- ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
- nh_state = ipv4_data[neighbor_ip]["state"]
- else:
- ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
- nh_state = ipv6_data[neighbor_ip]["state"]
+ if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
+ continue
- if nh_state == "Established":
- no_of_peer += 1
- if no_of_peer == total_peer:
- logger.info("BGP is Converged for router %s", router)
+ bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
+ total_evpn_peer += len(bgp_neighbors)
+
+ no_of_evpn_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ for _addr_type, dest_link_dict in peer_data.items():
+ data = topo["routers"][bgp_neighbor]["links"]
+ for dest_link in dest_link_dict.keys():
+ if dest_link in data:
+ peer_details = peer_data[_addr_type][dest_link]
+
+ neighbor_ip = data[dest_link][_addr_type].split("/")[0]
+ nh_state = None
+
+ if (
+ "ipv4Unicast" in show_bgp_json[vrf]
+ or "ipv6Unicast" in show_bgp_json[vrf]
+ ):
+ errormsg = (
+ "[DUT: %s] VRF: %s, "
+ "ipv4Unicast/ipv6Unicast"
+ " address-family present"
+ " under l2vpn" % (router, vrf)
+ )
+ return errormsg
+
+ l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
+ "peers"
+ ]
+ nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_evpn_peer += 1
+
+ if no_of_evpn_peer == total_evpn_peer:
+ logger.info(
+ "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
+ router,
+ vrf,
+ )
+ else:
+ errormsg = (
+ "[DUT: %s] VRF: %s, BGP is not converged "
+ "for evpn peers" % (router, vrf)
+ )
+ return errormsg
+ else:
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+ total_peer = 0
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ no_of_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.items():
+ for dest_link in peer_data["dest_link"].keys():
+ data = topo["routers"][bgp_neighbor]["links"]
+ if dest_link in data:
+ peer_details = peer_data["dest_link"][dest_link]
+ # for link local neighbors
+ if (
+ "neighbor_type" in peer_details
+ and peer_details["neighbor_type"] == "link-local"
+ ):
+ neighbor_ip = get_ipv6_linklocal_address(
+ topo["routers"], bgp_neighbor, dest_link
+ )
+ elif "source_link" in peer_details:
+ neighbor_ip = topo["routers"][bgp_neighbor][
+ "links"
+ ][peer_details["source_link"]][addr_type].split(
+ "/"
+ )[
+ 0
+ ]
+ elif (
+ "neighbor_type" in peer_details
+ and peer_details["neighbor_type"] == "unnumbered"
+ ):
+ neighbor_ip = data[dest_link]["peer-interface"]
+ else:
+ neighbor_ip = data[dest_link][addr_type].split("/")[
+ 0
+ ]
+ nh_state = None
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][
+ "peers"
+ ]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][
+ "peers"
+ ]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
+ else:
+ errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
+ return errormsg
+
+ logger.debug("Exiting API: verify_bgp_convergence()")
+ return True
+
+
+@retry(attempts=3, wait=4, return_is_str=True)
+def verify_bgp_community(
+ tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False
+):
+ """
+ API to veiryf BGP large community is attached in route for any given
+ DUT by running "show bgp ipv4/6 {route address} json" command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `network`: network for which set criteria needs to be verified
+ * `input_dict`: having details like - for which router, community and
+ values needs to be verified
+ * `vrf`: VRF name
+ * `bestpath`: To check best path cli
+
+ Usage
+ -----
+ networks = ["200.50.2.0/32"]
+ input_dict = {
+ "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
+ }
+ result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_community()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ logger.info(
+ "Verifying BGP community attributes on dut %s: for %s " "network %s",
+ router,
+ addr_type,
+ network,
+ )
+
+ command = "show bgp"
+
+ sleep(5)
+ for net in network:
+ if vrf:
+ cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
+ elif bestpath:
+ cmd = "{} {} {} bestpath json".format(command, addr_type, net)
else:
- errormsg = "BGP is not converged for router {}".format(router)
+ cmd = "{} {} {} json".format(command, addr_type, net)
+
+ show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
+ if "paths" not in show_bgp_json:
+ return "Prefix {} not found in BGP table of router: {}".format(net, router)
+
+ as_paths = show_bgp_json["paths"]
+ found = False
+ for i in range(len(as_paths)):
+ if (
+ "largeCommunity" in show_bgp_json["paths"][i]
+ or "community" in show_bgp_json["paths"][i]
+ ):
+ found = True
+ logger.info(
+ "Large Community attribute is found for route:" " %s in router: %s",
+ net,
+ router,
+ )
+ if input_dict is not None:
+ for criteria, comm_val in input_dict.items():
+ show_val = show_bgp_json["paths"][i][criteria]["string"]
+ if comm_val == show_val:
+ logger.info(
+ "Verifying BGP %s for prefix: %s"
+ " in router: %s, found expected"
+ " value: %s",
+ criteria,
+ net,
+ router,
+ comm_val,
+ )
+ else:
+ errormsg = (
+ "Failed: Verifying BGP attribute"
+ " {} for route: {} in router: {}"
+ ", expected value: {} but found"
+ ": {}".format(criteria, net, router, comm_val, show_val)
+ )
+ return errormsg
+
+ if not found:
+ errormsg = (
+ "Large Community attribute is not found for route: "
+ "{} in router: {} ".format(net, router)
+ )
return errormsg
- logger.debug("Exiting API: verify_bgp_convergence()")
+ logger.debug("Exiting lib API: verify_bgp_community()")
return True
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
if router not in tgen.routers():
return False
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
if addr_type == "ipv4":
- run_frr_cmd(rnode, "clear ip bgp *")
+ if vrf:
+ for _vrf in vrf:
+ run_frr_cmd(rnode, "clear ip bgp vrf {} *".format(_vrf))
+ else:
+ run_frr_cmd(rnode, "clear ip bgp *")
elif addr_type == "ipv6":
- run_frr_cmd(rnode, "clear bgp ipv6 *")
+ if vrf:
+ for _vrf in vrf:
+ run_frr_cmd(rnode, "clear bgp vrf {} ipv6 *".format(_vrf))
+ else:
+ run_frr_cmd(rnode, "clear bgp ipv6 *")
+ else:
+ run_frr_cmd(rnode, "clear bgp *")
+
+ sleep(5)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
"show bgp ipv4/6 json" command will be run and verify best path according
to shortest as-path, highest local-preference and med, lowest weight and
route origin IGP>EGP>INCOMPLETE.
-
Parameters
----------
* `tgen` : topogen object
* `attribute` : calculate best path using this attribute
* `input_dict`: defines different routes to calculate for which route
best path is selected
-
Usage
-----
# To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
- command = "show bgp {} json".format(addr_type)
+ # Verifying show bgp json
+ command = "show bgp"
- sleep(5)
+ sleep(2)
logger.info("Verifying router %s RIB for best path:", router)
- sh_ip_bgp_json = run_frr_cmd(rnode, command, isjson=True)
+ static_route = False
+ advertise_network = False
for route_val in input_dict.values():
- net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
- networks = net_data["advertise_networks"]
- for network in networks:
- route = network["network"]
+ if "static_routes" in route_val:
+ static_route = True
+ networks = route_val["static_routes"]
+ else:
+ advertise_network = True
+ net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
+ networks = net_data["advertise_networks"]
- route_attributes = sh_ip_bgp_json["routes"][route]
- _next_hop = None
- compare = None
- attribute_dict = {}
- for route_attribute in route_attributes:
- next_hops = route_attribute["nexthops"]
- for next_hop in next_hops:
- next_hop_ip = next_hop["ip"]
- attribute_dict[next_hop_ip] = route_attribute[attribute]
+ for network in networks:
+ _network = network["network"]
+ no_of_ip = network.setdefault("no_of_ip", 1)
+ vrf = network.setdefault("vrf", None)
- # AS_PATH attribute
- if attribute == "path":
- # Find next_hop for the route have minimum as_path
- _next_hop = min(
- attribute_dict, key=lambda x: len(set(attribute_dict[x]))
- )
- compare = "SHORTEST"
-
- # LOCAL_PREF attribute
- elif attribute == "locPrf":
- # Find next_hop for the route have highest local preference
- _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "HIGHEST"
-
- # WEIGHT attribute
- elif attribute == "weight":
- # Find next_hop for the route have highest weight
- _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "HIGHEST"
-
- # ORIGIN attribute
- elif attribute == "origin":
- # Find next_hop for the route have IGP as origin, -
- # - rule is IGP>EGP>INCOMPLETE
- _next_hop = [
- key for (key, value) in attribute_dict.iteritems() if value == "IGP"
- ][0]
- compare = ""
-
- # MED attribute
- elif attribute == "metric":
- # Find next_hop for the route have LOWEST MED
- _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
- compare = "LOWEST"
-
- # Show ip route
- if addr_type == "ipv4":
- command = "show ip route json"
+ if vrf:
+ cmd = "{} vrf {}".format(command, vrf)
else:
- command = "show ipv6 route json"
+ cmd = command
+
+ cmd = "{} {}".format(cmd, addr_type)
+ cmd = "{} json".format(cmd)
+ sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ routes = generate_ips(_network, no_of_ip)
+ for route in routes:
+ route = str(ipaddr.IPNetwork(unicode(route)))
+
+ if route in sh_ip_bgp_json["routes"]:
+ route_attributes = sh_ip_bgp_json["routes"][route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute[attribute]
+
+ # AS_PATH attribute
+ if attribute == "path":
+ # Find next_hop for the route have minimum as_path
+ _next_hop = min(
+ attribute_dict, key=lambda x: len(set(attribute_dict[x]))
+ )
+ compare = "SHORTEST"
- rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
+ # LOCAL_PREF attribute
+ elif attribute == "locPrf":
+ # Find next_hop for the route have highest local preference
+ _next_hop = max(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "HIGHEST"
- # Verifying output dictionary rib_routes_json is not empty
- if not bool(rib_routes_json):
- errormsg = "No route found in RIB of router {}..".format(router)
- return errormsg
+ # WEIGHT attribute
+ elif attribute == "weight":
+ # Find next_hop for the route have highest weight
+ _next_hop = max(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "HIGHEST"
+
+ # ORIGIN attribute
+ elif attribute == "origin":
+ # Find next_hop for the route have IGP as origin, -
+ # - rule is IGP>EGP>INCOMPLETE
+ _next_hop = [
+ key
+ for (key, value) in attribute_dict.iteritems()
+ if value == "IGP"
+ ][0]
+ compare = ""
+
+ # MED attribute
+ elif attribute == "metric":
+ # Find next_hop for the route have LOWEST MED
+ _next_hop = min(
+ attribute_dict, key=(lambda k: attribute_dict[k])
+ )
+ compare = "LOWEST"
- st_found = False
- nh_found = False
- # Find best is installed in RIB
- if route in rib_routes_json:
- st_found = True
- # Verify next_hop in rib_routes_json
- if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict:
- nh_found = True
- else:
- errormsg = (
- "Incorrect Nexthop for BGP route {} in "
- "RIB of router {}, Expected: {}, Found:"
- " {}\n".format(
+ # Show ip route
+ if addr_type == "ipv4":
+ command_1 = "show ip route"
+ else:
+ command_1 = "show ipv6 route"
+
+ if vrf:
+ cmd = "{} vrf {} json".format(command_1, vrf)
+ else:
+ cmd = "{} json".format(command_1)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..".format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if (
+ rib_routes_json[route][0]["nexthops"][0]["ip"]
+ in attribute_dict
+ ):
+ nh_found = True
+ else:
+ errormsg = (
+ "Incorrect Nexthop for BGP route {} in "
+ "RIB of router {}, Expected: {}, Found:"
+ " {}\n".format(
+ route,
+ router,
+ rib_routes_json[route][0]["nexthops"][0]["ip"],
+ _next_hop,
+ )
+ )
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info(
+ "Best path for prefix: %s with next_hop: %s is "
+ "installed according to %s %s: (%s) in RIB of "
+ "router %s",
route,
- router,
- rib_routes_json[route][0]["nexthops"][0]["ip"],
_next_hop,
+ compare,
+ attribute,
+ attribute_dict[_next_hop],
+ router,
)
- )
- return errormsg
-
- if st_found and nh_found:
- logger.info(
- "Best path for prefix: %s with next_hop: %s is "
- "installed according to %s %s: (%s) in RIB of "
- "router %s",
- route,
- _next_hop,
- compare,
- attribute,
- attribute_dict[_next_hop],
- router,
- )
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
return True
-@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2)
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
"""
This API is to verify whether bgp rib has any
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: verify_bgp_rib()")
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
"routes are: {}\n".format(dut, found_routes)
)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Exiting lib API: verify_bgp_rib()")
return True
raise InvalidCLIError("No actual cmd passed")
+def apply_raw_config(tgen, input_dict):
+
+ """
+ API to configure raw configuration on device. This can be used for any cli
+ which does has not been implemented in JSON.
+
+ Parameters
+ ----------
+ * `tgen`: tgen onject
+ * `input_dict`: configuration that needs to be applied
+
+ Usage
+ -----
+ input_dict = {
+ "r2": {
+ "raw_config": [
+ "router bgp",
+ "no bgp update-group-split-horizon"
+ ]
+ }
+ }
+ Returns
+ -------
+ True or errormsg
+ """
+
+ result = True
+ for router_name in input_dict.keys():
+ config_cmd = input_dict[router_name]["raw_config"]
+
+ if not isinstance(config_cmd, list):
+ config_cmd = [config_cmd]
+
+ frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE)
+ with open(frr_cfg_file, "w") as cfg:
+ for cmd in config_cmd:
+ cfg.write("{}\n".format(cmd))
+
+ result = load_config_to_router(tgen, router_name)
+
+ return result
+
+
def create_common_configuration(
tgen, router, data, config_type=None, build=False, load_config=True
):
"bgp_community_list": "! Community List Config\n",
"route_maps": "! Route Maps Config\n",
"bgp": "! BGP Config\n",
+ "vrf": "! VRF Config\n",
}
)
"""
Resets configuration on routers to the snapshot created using input JSON
file. It replaces existing router configuration with FRRCFG_BKUP_FILE
+
Parameters
----------
* `tgen` : Topogen object
router = router_list[rname]
logger.info("Configuring router %s to initial test configuration", rname)
+
cfg = router.run("vtysh -c 'show running'")
fname = "{}/{}/frr.sav".format(TMPDIR, rname)
dname = "{}/{}/delta.conf".format(TMPDIR, rname)
f.write("\n")
f.close()
-
run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
-
- tempdir = mkdtemp()
- with open(os.path.join(tempdir, "vtysh.conf"), "w") as fd:
- pass
-
- command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}".format(
- tempdir, run_cfg_file, init_cfg_file, dname
+ command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format(
+ run_cfg_file, init_cfg_file, dname
)
result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)
- os.unlink(os.path.join(tempdir, "vtysh.conf"))
- os.rmdir(tempdir)
-
# Assert if command fail
if result > 0:
logger.error("Delta file creation failed. Command executed %s", command)
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
- logger.info("Configuration on router {} after config reset:".format(rname))
+ logger.info("Configuration on router {} after reset:".format(rname))
logger.info(delta.getvalue())
delta.close()
- logger.debug("Exting API: reset_config_on_routers")
+ logger.debug("Exiting API: reset_config_on_routers")
return True
def load_config_to_router(tgen, routerName, save_bkup=False):
"""
Loads configuration on router from the file FRRCFG_FILE.
+
Parameters
----------
* `tgen` : Topogen object
router_list = tgen.routers()
for rname in ROUTER_LIST:
- if routerName and routerName != rname:
+ if routerName and rname != routerName:
continue
router = router_list[rname]
raise InvalidCLIError("%s" % output)
cfg.truncate(0)
+
except IOError as err:
errormsg = (
"Unable to open config File. error(%s):" " %s",
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
+ logger.info("New configuration for router {}:".format(rname))
new_config = router.run("vtysh -c 'show running'")
logger.info(new_config)
- logger.debug("Exting API: load_config_to_router")
+ logger.debug("Exiting API: load_config_to_router")
return True
-def get_frr_ipv6_linklocal(tgen, router, intf=None):
+def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
"""
API to get the link local ipv6 address of a perticular interface using
FRR command 'show interface'
+
* `tgen`: tgen onject
* `router` : router for which hightest interface should be
calculated
* `intf` : interface for which linklocal address needs to be taken
+ * `vrf` : VRF name
+
Usage
-----
linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
+
Returns
-------
1) array of interface names to link local ips.
linklocal = []
- cmd = "show interface"
+ if vrf:
+ cmd = "show interface vrf {}".format(vrf)
+ else:
+ cmd = "show interface"
ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
tgen.start_router()
+def stop_router(tgen, router):
+ """
+ Router"s current config would be saved to /etc/frr/ for each deamon
+ and router and its deamons would be stopped.
+
+ * `tgen` : topogen object
+ * `router`: Device under test
+ """
+
+ router_list = tgen.routers()
+
+ # Saving router config to /etc/frr, which will be loaded to router
+ # when it starts
+ router_list[router].vtysh_cmd("write memory")
+
+ # Stop router
+ router_list[router].stop()
+
+
+def start_router(tgen, router):
+ """
+ Router will started and config would be loaded from /etc/frr/ for each
+ deamon
+
+ * `tgen` : topogen object
+ * `router`: Device under test
+ """
+
+ logger.debug("Entering lib API: start_router")
+
+ try:
+ router_list = tgen.routers()
+
+ # Router and its deamons would be started and config would
+ # be loaded to router for each deamon from /etc/frr
+ router_list[router].start()
+
+ # Waiting for router to come up
+ sleep(5)
+
+ except Exception as e:
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: start_router()")
+ return True
+
+
def number_to_row(routerName):
"""
Returns the number for the router.
#############################################
+def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
+ """
+ Create vrf configuration for created topology. VRF
+ configuration is provided in input json file.
+
+ VRF config is done in Linux Kernel:
+ * Create VRF
+ * Attach interface to VRF
+ * Bring up VRF
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring
+ from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict={
+ "r3": {
+ "links": {
+ "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
+ "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
+ "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
+ "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
+ },
+ "vrfs":[
+ {
+ "name": "RED_A",
+ "id": "1"
+ },
+ {
+ "name": "RED_B",
+ "id": "2"
+ },
+ {
+ "name": "BLUE_A",
+ "id": "3",
+ "delete": True
+ },
+ {
+ "name": "BLUE_B",
+ "id": "4"
+ }
+ ]
+ }
+ }
+ result = create_vrf_cfg(tgen, topo, input_dict)
+
+ Returns
+ -------
+ True or False
+ """
+ result = True
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ input_dict = deepcopy(input_dict)
+
+ try:
+ for c_router, c_data in input_dict.iteritems():
+ rnode = tgen.routers()[c_router]
+ if "vrfs" in c_data:
+ for vrf in c_data["vrfs"]:
+ config_data = []
+ del_action = vrf.setdefault("delete", False)
+ name = vrf.setdefault("name", None)
+ table_id = vrf.setdefault("id", None)
+ vni = vrf.setdefault("vni", None)
+ del_vni = vrf.setdefault("no_vni", None)
+
+ if del_action:
+ # Kernel cmd- Add VRF and table
+ cmd = "ip link del {} type vrf table {}".format(
+ vrf["name"], vrf["id"]
+ )
+
+ logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ rnode.run(cmd)
+
+ # Kernel cmd - Bring down VRF
+ cmd = "ip link set dev {} down".format(name)
+ logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ rnode.run(cmd)
+
+ else:
+ if name and table_id:
+ # Kernel cmd- Add VRF and table
+ cmd = "ip link add {} type vrf table {}".format(
+ name, table_id
+ )
+ logger.info(
+ "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
+ )
+ rnode.run(cmd)
+
+ # Kernel cmd - Bring up VRF
+ cmd = "ip link set dev {} up".format(name)
+ logger.info(
+ "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
+ )
+ rnode.run(cmd)
+
+ if "links" in c_data:
+ for destRouterLink, data in sorted(
+ c_data["links"].iteritems()
+ ):
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ if "vrf" in data:
+ vrf_list = data["vrf"]
+
+ if type(vrf_list) is not list:
+ vrf_list = [vrf_list]
+
+ for _vrf in vrf_list:
+ cmd = "ip link set {} master {}".format(
+ interface_name, _vrf
+ )
+
+ logger.info(
+ "[DUT: %s]: Running" " kernel cmd [%s]",
+ c_router,
+ cmd,
+ )
+ rnode.run(cmd)
+
+ result = create_common_configuration(
+ tgen, c_router, config_data, "vrf", build=build
+ )
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ return result
+
+
+def create_interface_in_kernel(
+ tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
+):
+ """
+ Cretae interfaces in kernel for ipv4/ipv6
+ Config is done in Linux Kernel:
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut` : Device for which interfaces to be added
+ * `name` : interface name
+ * `ip_addr` : ip address for interface
+ * `vrf` : VRF name, to which interface will be associated
+ * `netmask` : netmask value, default is None
+ * `create`: Create interface in kernel, if created then no need
+ to create
+ """
+
+ rnode = tgen.routers()[dut]
+
+ if create:
+ cmd = "sudo ip link add name {} type dummy".format(name)
+ rnode.run(cmd)
+
+ addr_type = validate_ip_address(ip_addr)
+ if addr_type == "ipv4":
+ cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask)
+ else:
+ cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask)
+
+ rnode.run(cmd)
+
+ if vrf:
+ cmd = "ip link set {} master {}".format(name, vrf)
+ rnode.run(cmd)
+
+
def validate_ip_address(ip_address):
"""
Validates the type of ip address
return True
-def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
+def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
"""
Retries function execution, if return is an errormsg or exception
+
* `attempts`: Number of attempts to make
* `wait`: Number of seconds to wait between each attempt
* `return_is_str`: Return val is an errormsg in case of failure
* `initial_wait`: Sleeps for this much seconds before executing function
+
"""
def _retry(func):
sleep(initial_wait)
_return_is_str = kwargs.pop("return_is_str", return_is_str)
+ _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
for i in range(1, _attempts + 1):
try:
_expected = kwargs.setdefault("expected", True)
kwargs.pop("expected")
ret = func(*args, **kwargs)
logger.debug("Function returned %s" % ret)
- if return_is_str and isinstance(ret, bool) and _expected:
+ if _return_is_str and isinstance(ret, bool) and _expected:
return ret
- if isinstance(ret, str) and _expected is False:
+ if (
+ isinstance(ret, str) or isinstance(ret, unicode)
+ ) and _expected is False:
+ return ret
+ if _return_is_dict and isinstance(ret, dict):
return ret
if _attempts == i:
"""
Create interface configuration for created topology. Basic Interface
configuration is provided in input json file.
+
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `build` : Only for initial setup phase this is set as True.
+
Returns
-------
True or False
"""
result = False
+ topo = deepcopy(topo)
try:
for c_router, c_data in topo.iteritems():
interface_name = destRouterLink
else:
interface_name = data["interface"]
+
interface_data.append("interface {}".format(str(interface_name)))
if "ipv4" in data:
intf_addr = c_data["links"][destRouterLink]["ipv4"]
- interface_data.append("ip address {}".format(intf_addr))
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ip address {}".format(intf_addr))
+ else:
+ interface_data.append("ip address {}".format(intf_addr))
if "ipv6" in data:
intf_addr = c_data["links"][destRouterLink]["ipv6"]
- interface_data.append("ipv6 address {}".format(intf_addr))
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ipv6 address {}".format(intf_addr))
+ else:
+ interface_data.append("ipv6 address {}".format(intf_addr))
+
+ if "ipv6-link-local" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
+
+ if "delete" in data and data["delete"]:
+ interface_data.append("no ipv6 address {}".format(intf_addr))
+ else:
+ interface_data.append("ipv6 address {}\n".format(intf_addr))
result = create_common_configuration(
tgen, c_router, interface_data, "interface_config", build=build
def create_static_routes(tgen, input_dict, build=False):
"""
Create static routes for given router as defined in input_dict
+
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
+
Usage
-----
input_dict should be in the format below:
# admin_distance: admin distance for route/routes.
# next_hop: starting next-hop address
# tag: tag id for static routes
+ # vrf: VRF name in which static routes needs to be created
# delete: True if config to be removed. Default False.
+
Example:
"routers": {
"r1": {
"no_of_ip": 9,
"admin_distance": 100,
"next_hop": "10.0.0.1",
- "tag": 4001
+ "tag": 4001,
+ "vrf": "RED_A"
"delete": true
}
]
}
}
+
Returns
-------
errormsg(str) or True
"""
result = False
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Entering lib API: create_static_routes()")
input_dict = deepcopy(input_dict)
+
try:
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
errormsg = "static_routes not present in input_dict"
- logger.debug(errormsg)
+ logger.info(errormsg)
continue
static_routes_list = []
static_routes = input_dict[router]["static_routes"]
for static_route in static_routes:
del_action = static_route.setdefault("delete", False)
- # No of IPs
no_of_ip = static_route.setdefault("no_of_ip", 1)
- admin_distance = static_route.setdefault("admin_distance", None)
- tag = static_route.setdefault("tag", None)
- if "next_hop" not in static_route or "network" not in static_route:
- errormsg = "'next_hop' or 'network' missing in" " input_dict"
- return errormsg
-
- next_hop = static_route["next_hop"]
- network = static_route["network"]
+ network = static_route.setdefault("network", [])
if type(network) is not list:
network = [network]
+ admin_distance = static_route.setdefault("admin_distance", None)
+ tag = static_route.setdefault("tag", None)
+ vrf = static_route.setdefault("vrf", None)
+ interface = static_route.setdefault("interface", None)
+ next_hop = static_route.setdefault("next_hop", None)
+ nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
+
ip_list = generate_ips(network, no_of_ip)
for ip in ip_list:
addr_type = validate_ip_address(ip)
if addr_type == "ipv4":
- cmd = "ip route {} {}".format(ip, next_hop)
+ cmd = "ip route {}".format(ip)
else:
- cmd = "ipv6 route {} {}".format(ip, next_hop)
+ cmd = "ipv6 route {}".format(ip)
+
+ if interface:
+ cmd = "{} {}".format(cmd, interface)
+
+ if next_hop:
+ cmd = "{} {}".format(cmd, next_hop)
+
+ if nexthop_vrf:
+ cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
+
+ if vrf:
+ cmd = "{} vrf {}".format(cmd, vrf)
if tag:
cmd = "{} tag {}".format(cmd, str(tag))
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.debug("Exiting lib API: create_static_routes()")
return result
"large_community_list", {}
)
+ metric = match_data.setdefault("metric", None)
+ source_vrf = match_data.setdefault("source-vrf", None)
+
if ipv4_data:
# fetch prefix list data from rmap
prefix_name = ipv4_data.setdefault("prefix_lists", None)
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
+ if source_vrf:
+ cmd = "match source-vrf {}".format(source_vrf)
+ rmap_data.append(cmd)
+
+ if metric:
+ cmd = "match metric {}".format(metric)
+ rmap_data.append(cmd)
+
result = create_common_configuration(
tgen, router, rmap_data, "route_maps", build=build
)
interface_set_status(router_list[dut], intf_name, ifaceaction)
+def addKernelRoute(
+ tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
+):
+ """
+ Add route to kernel
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `router`: router for which kernal routes needs to be added
+ * `intf`: interface name, for which kernal routes needs to be added
+ * `bindToAddress`: bind to <host>, an interface or multicast
+ address
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: addKernelRoute()")
+
+ rnode = tgen.routers()[router]
+
+ if type(group_addr_range) is not list:
+ group_addr_range = [group_addr_range]
+
+ for grp_addr in group_addr_range:
+
+ addr_type = validate_ip_address(grp_addr)
+ if addr_type == "ipv4":
+ if next_hop is not None:
+ cmd = "ip route add {} via {}".format(grp_addr, next_hop)
+ else:
+ cmd = "ip route add {} dev {}".format(grp_addr, intf)
+ if del_action:
+ cmd = "ip route del {}".format(grp_addr)
+ verify_cmd = "ip route"
+ elif addr_type == "ipv6":
+ if intf and src:
+ cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
+ else:
+ cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
+ verify_cmd = "ip -6 route"
+ if del_action:
+ cmd = "ip -6 route del {}".format(grp_addr)
+
+ logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
+ output = rnode.run(cmd)
+
+ # Verifying if ip route added to kernal
+ result = rnode.run(verify_cmd)
+ logger.debug("{}\n{}".format(verify_cmd, result))
+ if "/" in grp_addr:
+ ip, mask = grp_addr.split("/")
+ if mask == "32" or mask == "128":
+ grp_addr = ip
+
+ if not re_search(r"{}".format(grp_addr), result) and mask is not "0":
+ errormsg = (
+ "[DUT: {}]: Kernal route is not added for group"
+ " address {} Config output: {}".format(router, grp_addr, output)
+ )
+
+ return errormsg
+
+ logger.debug("Exiting lib API: addKernelRoute()")
+ return True
+
+
#############################################
# Verification APIs
#############################################
-@retry(attempts=10, return_is_str=True, initial_wait=2)
-def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_rib(
+ tgen,
+ addr_type,
+ dut,
+ input_dict,
+ next_hop=None,
+ protocol=None,
+ tag=None,
+ metric=None,
+ fib=None,
+):
"""
Data will be read from input_dict or input JSON file, API will generate
same prefixes, which were redistributed by either create_static_routes() or
advertise_networks_using_network_command() and do will verify next_hop and
each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
command o/p.
+
Parameters
----------
* `tgen` : topogen object
* `next_hop`[optional]: next_hop which needs to be verified,
default: static
* `protocol`[optional]: protocol, default = None
+
Usage
-----
# RIB can be verified for static routes OR network advertised using
network command. Following are input_dicts to create static routes
and advertise networks using network command. Any one of the input_dict
can be passed to verify_rib() to verify routes in DUT"s RIB.
+
# Creating static routes for r1
input_dict = {
"r1": {
dut = "r2"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.info("Entering lib API: verify_rib()")
router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.iteritems():
if router != dut:
continue
+ logger.info("Checking router %s RIB:", router)
+
# Verifying RIB routes
if addr_type == "ipv4":
- if protocol:
- command = "show ip route {} json".format(protocol)
- else:
- command = "show ip route json"
+ command = "show ip route"
else:
- if protocol:
- command = "show ipv6 route {} json".format(protocol)
- else:
- command = "show ipv6 route json"
-
- logger.info("Checking router %s RIB:", router)
- rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
+ command = "show ipv6 route"
- # Verifying output dictionary rib_routes_json is not empty
- if bool(rib_routes_json) is False:
- errormsg = "No {} route found in rib of router {}..".format(
- protocol, router
- )
- return errormsg
+ found_routes = []
+ missing_routes = []
if "static_routes" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
- st_found = False
- nh_found = False
- found_routes = []
- missing_routes = []
+
for static_route in static_routes:
+ if "vrf" in static_route and static_route["vrf"] is not None:
+
+ logger.info(
+ "[DUT: {}]: Verifying routes for VRF:"
+ " {}".format(router, static_route["vrf"])
+ )
+
+ cmd = "{} vrf {}".format(command, static_route["vrf"])
+
+ else:
+ cmd = "{}".format(command)
+
+ if protocol:
+ cmd = "{} {}".format(cmd, protocol)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
else:
no_of_ip = 1
+ if "tag" in static_route:
+ _tag = static_route["tag"]
+ else:
+ _tag = None
+
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
if st_rt in rib_routes_json:
st_found = True
found_routes.append(st_rt)
- if next_hop:
+ if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
+ for mnh in range(0, len(rib_routes_json[st_rt])):
+ if (
+ "fib"
+ in rib_routes_json[st_rt][mnh]["nexthops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in rib_routes_json[st_rt][
+ mnh
+ ]["nexthops"]
+ ]
+ )
+
+ if found_hops[0]:
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Nexthop "
+ "%s is not active for route %s in "
+ "RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is not active"
+ " for route {} in RIB of router"
+ " {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+
+ elif next_hop and fib is None:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
found_hops = [
rib_r["ip"]
for rib_r in rib_routes_json[st_rt][0]["nexthops"]
]
- for nh in found_hops:
- nh_found = False
- if nh and nh in next_hop:
- nh_found = True
- else:
+
+ if found_hops:
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
errormsg = (
- "Nexthop {} is Missing for {}"
- " route {} in RIB of router"
- " {}\n".format(
- next_hop, protocol, st_rt, dut
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
)
)
-
return errormsg
+ else:
+ nh_found = True
+
+ if tag:
+ if "tag" not in rib_routes_json[st_rt][0]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if _tag != rib_routes_json[st_rt][0]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(dut, _tag, st_rt,)
+ )
+ return errormsg
+
+ if metric is not None:
+ if "metric" not in rib_routes_json[st_rt][0]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if metric != rib_routes_json[st_rt][0]["metric"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(dut, metric, st_rt,)
+ )
+ return errormsg
+
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
- "Found next_hop %s for all routes in RIB of" " router %s\n",
- next_hop,
- dut,
+ "[DUT: {}]: Found next_hop {} for all bgp"
+ " routes in RIB".format(router, next_hop)
)
- if not st_found and len(missing_routes) > 0:
- errormsg = (
- "Missing route in RIB of router {}, routes: "
- "{}\n".format(dut, missing_routes)
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
)
return errormsg
- logger.info(
- "Verified routes in router %s RIB, found routes" " are: %s\n",
- dut,
- found_routes,
- )
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
continue
if "bgp" in input_dict[routerInput]:
if (
"advertise_networks"
- in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
"unicast"
]
):
+ continue
- found_routes = []
- missing_routes = []
- advertise_network = input_dict[routerInput]["bgp"][
- "address_family"
- ][addr_type]["unicast"]["advertise_networks"]
+ found_routes = []
+ missing_routes = []
+ advertise_network = input_dict[routerInput]["bgp"]["address_family"][
+ addr_type
+ ]["unicast"]["advertise_networks"]
- for advertise_network_dict in advertise_network:
- start_ip = advertise_network_dict["network"]
- if "no_of_network" in advertise_network_dict:
- no_of_network = advertise_network_dict["no_of_network"]
- else:
- no_of_network = 1
-
- # Generating IPs for verification
- ip_list = generate_ips(start_ip, no_of_network)
- for st_rt in ip_list:
- st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
-
- found = False
- nh_found = False
- if st_rt in rib_routes_json:
- found = True
- found_routes.append(st_rt)
-
- if next_hop:
- if type(next_hop) is not list:
- next_hop = [next_hop]
-
- for nh in next_hop:
- for nh_json in rib_routes_json[st_rt][0][
- "nexthops"
- ]:
- if nh != nh_json["ip"]:
- continue
- nh_found = True
-
- if not nh_found:
- errormsg = (
- "Nexthop {} is Missing"
- " for {} route {} in "
- "RIB of router {}\n".format(
- next_hop, protocol, st_rt, dut
- )
- )
- return errormsg
+ # Continue if there are no network advertise
+ if len(advertise_network) == 0:
+ continue
+
+ for advertise_network_dict in advertise_network:
+ if "vrf" in advertise_network_dict:
+ cmd = "{} vrf {} json".format(command, static_route["vrf"])
+ else:
+ cmd = "{} json".format(command)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ count = 0
+ for nh in next_hop:
+ for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
+ if nh_dict["ip"] != nh:
+ continue
+ else:
+ count += 1
+ if count == len(next_hop):
+ nh_found = True
else:
- missing_routes.append(st_rt)
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for route {} in "
+ "RIB of router {}\n".format(next_hop, st_rt, dut)
+ )
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
- if nh_found:
- logger.info(
- "Found next_hop {} for all routes in RIB"
- " of router {}\n".format(next_hop, dut)
- )
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
- if not found and len(missing_routes) > 0:
- errormsg = (
- "Missing {} route in RIB of router {}, "
- "routes: {} \n".format(addr_type, dut, missing_routes)
- )
- return errormsg
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing {} route in RIB of router {}, "
+ "routes: {} \n".format(addr_type, dut, missing_routes)
+ )
+ return errormsg
+ if found_routes:
logger.info(
"Verified {} routes in router {} RIB, found"
" routes are: {}\n".format(addr_type, dut, found_routes)
)
- logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ logger.info("Exiting lib API: verify_rib()")
return True