summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py527
1 files changed, 525 insertions, 2 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index d183fb9f60..b2cd2d284d 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -33,6 +33,7 @@ from lib.common_config import (
load_config_to_router,
check_address_types,
generate_ips,
+ validate_ip_address,
find_interface_with_greater_ip,
run_frr_cmd,
retry,
@@ -82,6 +83,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
"holddowntimer": 180,
"dest_link": {
"r4": {
+ "allowas-in": {
+ "number_occurences":2
+ },
"prefix_lists": [
{
"name": "pf_list_1",
@@ -469,6 +473,7 @@ def __create_bgp_unicast_address_family(
prefix_lists = peer.setdefault("prefix_lists", {})
route_maps = peer.setdefault("route_maps", {})
no_send_community = peer.setdefault("no_send_community", None)
+ allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
if next_hop_self:
@@ -483,6 +488,13 @@ def __create_bgp_unicast_address_family(
"no {} send-community {}".format(neigh_cxt, no_send_community)
)
+ if "allowas_in" in peer:
+ allow_as_in = peer["allowas_in"]
+ config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
+
+ if "no_allowas_in" in peer:
+ allow_as_in = peer["no_allowas_in"]
+ config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
@@ -517,6 +529,17 @@ def __create_bgp_unicast_address_family(
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if allowas_in:
+ number_occurences = allowas_in.setdefault("number_occurences", {})
+ del_action = allowas_in.setdefault("delete", False)
+
+ cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
return config_data
@@ -616,6 +639,9 @@ def verify_bgp_convergence(tgen, topo):
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
+ if "bgp" not in topo["routers"][router]:
+ continue
+
logger.info("Verifying BGP Convergence on router %s", router)
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
@@ -929,7 +955,6 @@ def clear_bgp_and_verify(tgen, topo, router):
)
return errormsg
- logger.info(peer_uptime_before_clear_bgp)
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
for addr_type in bgp_addr_type.keys():
@@ -1008,7 +1033,7 @@ def clear_bgp_and_verify(tgen, topo, router):
" router {}".format(router)
)
return errormsg
- logger.info(peer_uptime_after_clear_bgp)
+
# Comparing peerUptimeEstablishedEpoch dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
logger.info("BGP neighborship is reset after clear BGP on router %s", router)
@@ -1679,3 +1704,501 @@ def verify_best_path_as_per_admin_distance(
logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()")
return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info("Checking router {} BGP RIB:".format(dut))
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
+ list2 = found_hops
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
+ else:
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict["network"]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: verify_bgp_rib()")
+ return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info("Checking router {} BGP RIB:".format(dut))
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
+ list2 = found_hops
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
+ else:
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict["network"]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: verify_bgp_rib()")
+ return True