summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py458
1 files changed, 423 insertions, 35 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index f2d33f94ae..ba8a4cb0f4 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -21,6 +21,7 @@
from collections import OrderedDict
from datetime import datetime
from time import sleep
+from copy import deepcopy
from subprocess import call
from subprocess import STDOUT as SUB_STDOUT
from subprocess import PIPE as SUB_PIPE
@@ -208,6 +209,7 @@ def create_common_configuration(tgen, router, data, config_type=None,
"interface_config": "! Interfaces Config\n",
"static_route": "! Static Route Config\n",
"prefix_list": "! Prefix List Config\n",
+ "bgp_community_list": "! Community List Config\n",
"route_maps": "! Route Maps Config\n",
"bgp": "! BGP Config\n"
})
@@ -547,13 +549,11 @@ def generate_ips(network, no_of_ips):
Returns list of IPs.
based on start_ip and no_of_ips
- * `network` : from here the ip will start generating, start_ip will be
- first ip
+ * `network` : from here the ip will start generating,
+ start_ip will be
* `no_of_ips` : these many IPs will be generated
-
- Limitation: It will generate IPs only for ip_mask 32
-
"""
+
ipaddress_list = []
if type(network) is not list:
network = [network]
@@ -893,6 +893,7 @@ def create_static_routes(tgen, input_dict, build=False):
"""
result = False
logger.debug("Entering lib API: create_static_routes()")
+ input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
@@ -918,16 +919,21 @@ def create_static_routes(tgen, input_dict, build=False):
next_hop = static_route["next_hop"]
network = static_route["network"]
- ip_list = generate_ips([network], no_of_ip)
+ if type(network) is not list:
+ network = [network]
+
+ ip_list = generate_ips(network, no_of_ip)
for ip in ip_list:
addr_type = validate_ip_address(ip)
+
if addr_type == "ipv4":
cmd = "ip route {} {}".format(ip, next_hop)
else:
cmd = "ipv6 route {} {}".format(ip, next_hop)
if tag:
- cmd = "{} {}".format(cmd, str(tag))
+ cmd = "{} tag {}".format(cmd, str(tag))
+
if admin_distance:
cmd = "{} {}".format(cmd, admin_distance)
@@ -1112,11 +1118,11 @@ def create_route_maps(tgen, input_dict, build=False):
"prefix_list": "pf_list_1"
}
- "large-community-list": "{
+ "large-community-list": {
"id": "community_1",
"exact_match": True
}
- "community": {
+ "community_list": {
"id": "community_2",
"exact_match": True
}
@@ -1152,12 +1158,11 @@ def create_route_maps(tgen, input_dict, build=False):
result = False
logger.debug("Entering lib API: create_route_maps()")
-
+ input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
if "route_maps" not in input_dict[router]:
- errormsg = "route_maps not present in input_dict"
- logger.debug(errormsg)
+ logger.debug("route_maps not present in input_dict")
continue
rmap_data = []
for rmap_name, rmap_value in \
@@ -1187,10 +1192,41 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_name, rmap_action, seq_id
))
+ if "continue" in rmap_dict:
+ continue_to = rmap_dict["continue"]
+ if continue_to:
+ rmap_data.append("on-match goto {}".
+ format(continue_to))
+ else:
+ logger.error("In continue, 'route-map entry "
+ "sequence number' is not provided")
+ return False
+
+ if "goto" in rmap_dict:
+ go_to = rmap_dict["goto"]
+ if go_to:
+ rmap_data.append("on-match goto {}".
+ format(go_to))
+ else:
+ logger.error("In goto, 'Goto Clause number' is not"
+ " provided")
+ return False
+
+ if "call" in rmap_dict:
+ call_rmap = rmap_dict["call"]
+ if call_rmap:
+ rmap_data.append("call {}".
+ format(call_rmap))
+ else:
+ logger.error("In call, 'destination Route-Map' is"
+ " not provided")
+ return False
+
# Verifying if SET criteria is defined
if "set" in rmap_dict:
set_data = rmap_dict["set"]
-
+ ipv4_data = set_data.setdefault("ipv4", {})
+ ipv6_data = set_data.setdefault("ipv6", {})
local_preference = set_data.setdefault("localpref",
None)
metric = set_data.setdefault("med", None)
@@ -1199,7 +1235,10 @@ def create_route_maps(tgen, input_dict, build=False):
community = set_data.setdefault("community", {})
large_community = set_data.setdefault(
"large_community", {})
+ large_comm_list = set_data.setdefault(
+ "large_comm_list", {})
set_action = set_data.setdefault("set_action", None)
+ nexthop = set_data.setdefault("nexthop", None)
# Local Preference
if local_preference:
@@ -1243,42 +1282,84 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_data.append(cmd)
else:
- logger.errror("In large_community, AS Num not"
- " provided")
+ logger.error("In large_community, AS Num not"
+ " provided")
+ return False
+ if large_comm_list:
+ id = large_comm_list.setdefault("id", None)
+ del_comm = large_comm_list.setdefault("delete",
+ None)
+ if id:
+ cmd = "set large-comm-list {}".format(id)
+ if del_comm:
+ cmd = "{} delete".format(cmd)
+
+ rmap_data.append(cmd)
+ else:
+ logger.error("In large_comm_list 'id' not"
+ " provided")
return False
# Weight
if weight:
- rmap_data.append("set weight {}".format(
+ rmap_data.append("set weight {} \n".format(
weight))
-
+ if ipv6_data:
+ nexthop = ipv6_data.setdefault("nexthop",None)
+ if nexthop:
+ rmap_data.append("set ipv6 next-hop \
+ {}".format(nexthop))
# Adding MATCH and SET sequence to RMAP if defined
if "match" in rmap_dict:
match_data = rmap_dict["match"]
ipv4_data = match_data.setdefault("ipv4", {})
ipv6_data = match_data.setdefault("ipv6", {})
- community = match_data.setdefault("community-list",
- {})
+ community = match_data.setdefault(
+ "community_list",{})
large_community = match_data.setdefault(
- "large-community-list", {}
+ "large_community", {}
+ )
+ large_community_list = match_data.setdefault(
+ "large_community_list", {}
)
- tag = match_data.setdefault("tag", None)
if ipv4_data:
- prefix_name = ipv4_data.setdefault("prefix_lists",
- None)
+ # fetch prefix list data from rmap
+ prefix_name = \
+ ipv4_data.setdefault("prefix_lists",
+ None)
if prefix_name:
- rmap_data.append("match ip address prefix-list"
- " {}".format(prefix_name))
+ rmap_data.append("match ip address"
+ " prefix-list {}".format(prefix_name))
+
+ # fetch tag data from rmap
+ tag = ipv4_data.setdefault("tag", None)
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ # fetch large community data from rmap
+ large_community_list = ipv4_data.setdefault(
+ "large_community_list",{})
+ large_community = match_data.setdefault(
+ "large_community", {})
+
if ipv6_data:
prefix_name = ipv6_data.setdefault("prefix_lists",
None)
if prefix_name:
- rmap_data.append("match ipv6 address "
- "prefix-list {}".
- format(prefix_name))
- if tag:
- rmap_data.append("match tag {}".format(tag))
+ rmap_data.append("match ipv6 address"
+ " prefix-list {}".format(prefix_name))
+
+ # fetch tag data from rmap
+ tag = ipv6_data.setdefault("tag", None)
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ # fetch large community data from rmap
+ large_community_list = ipv6_data.setdefault(
+ "large_community_list",{})
+ large_community = match_data.setdefault(
+ "large_community", {})
if community:
if "id" not in community:
@@ -1293,10 +1374,9 @@ def create_route_maps(tgen, input_dict, build=False):
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
-
if large_community:
if "id" not in large_community:
- logger.error("'num' is mandatory for "
+ logger.error("'id' is mandatory for "
"large-community-list in match "
"criteria")
return False
@@ -1306,7 +1386,19 @@ def create_route_maps(tgen, input_dict, build=False):
"exact_match", False)
if exact_match:
cmd = "{} exact-match".format(cmd)
-
+ rmap_data.append(cmd)
+ if large_community_list:
+ if "id" not in large_community_list:
+ logger.error("'id' is mandatory for "
+ "large-community-list in match "
+ "criteria")
+ return False
+ cmd = "match large-community {}".format(
+ large_community_list["id"])
+ exact_match = large_community_list.setdefault(
+ "exact_match", False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
result = create_common_configuration(tgen, router,
@@ -1320,10 +1412,178 @@ def create_route_maps(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_prefix_lists()")
+ logger.debug("Exiting lib API: create_route_maps()")
+ return result
+
+
+def delete_route_maps(tgen, input_dict):
+ """
+ Delete ip route maps from device
+
+ * `tgen` : Topogen object
+ * `input_dict` : for which router,
+ route map has to be deleted
+
+ Usage
+ -----
+ # Delete route-map rmap_1 and rmap_2 from router r1
+ input_dict = {
+ "r1": {
+ "route_maps": ["rmap_1", "rmap__2"]
+ }
+ }
+ result = delete_route_maps("ipv4", input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.info("Entering lib API: delete_route_maps()")
+
+ for router in input_dict.keys():
+ route_maps = input_dict[router]["route_maps"][:]
+ rmap_data = input_dict[router]
+ rmap_data["route_maps"] = {}
+ for route_map_name in route_maps:
+ rmap_data["route_maps"].update({
+ route_map_name:
+ [{
+ "delete": True
+ }]
+ })
+
+ return create_route_maps(tgen, input_dict)
+
+
+def create_bgp_community_lists(tgen, input_dict, build=False):
+ """
+ Create bgp community-list or large-community-list on the devices as per
+ the arguments passed. Takes list of communities in input.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ Usage
+ -----
+ input_dict_1 = {
+ "r3": {
+ "bgp_community_lists": [
+ {
+ "community_type": "standard",
+ "action": "permit",
+ "name": "rmap_lcomm_{}".format(addr_type),
+ "value": "1:1:1 1:2:3 2:1:1 2:2:2",
+ "large": True
+ }
+ ]
+ }
+ }
+ }
+ result = create_bgp_community_lists(tgen, input_dict_1)
+ """
+
+ result = False
+ logger.debug("Entering lib API: create_bgp_community_lists()")
+ input_dict = deepcopy(input_dict)
+ try:
+ for router in input_dict.keys():
+ if "bgp_community_lists" not in input_dict[router]:
+ errormsg = "bgp_community_lists not present in input_dict"
+ logger.debug(errormsg)
+ continue
+
+ config_data = []
+
+ community_list = input_dict[router]["bgp_community_lists"]
+ for community_dict in community_list:
+ del_action = community_dict.setdefault("delete", False)
+ community_type = community_dict.setdefault("community_type",
+ None)
+ action = community_dict.setdefault("action", None)
+ value = community_dict.setdefault("value", '')
+ large = community_dict.setdefault("large", None)
+ name = community_dict.setdefault("name", None)
+ if large:
+ cmd = "bgp large-community-list"
+ else:
+ cmd = "bgp community-list"
+
+ if not large and not (community_type and action and value):
+ errormsg = "community_type, action and value are " \
+ "required in bgp_community_list"
+ logger.error(errormsg)
+ return False
+
+ try:
+ community_type = int(community_type)
+ cmd = "{} {} {} {}".format(cmd, community_type, action,
+ value)
+ except ValueError:
+
+ cmd = "{} {} {} {} {}".format(
+ cmd, community_type, name, action, value)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ result = create_common_configuration(tgen, router, config_data,
+ "bgp_community_list",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_bgp_community_lists()")
return result
+def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
+ """
+ Shutdown or bringup router's interface "
+
+ * `tgen` : Topogen object
+ * `dut` : Device under test
+ * `intf_name` : Interface name to be shut/no shut
+ * `ifaceaction` : Action, to shut/no shut interface,
+ by default is False
+
+ Usage
+ -----
+ dut = "r3"
+ intf = "r3-r1-eth0"
+ # Shut down ineterface
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ # Bring up ineterface
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ from topotest import interface_set_status
+ router_list = tgen.routers()
+ if ifaceaction:
+ logger.info("Bringing up interface : {}".format(intf_name))
+ else:
+ logger.info("Shutting down interface : {}".format(intf_name))
+
+ interface_set_status(router_list[dut], intf_name,
+ ifaceaction)
+
+ if ifaceaction:
+ # Disabling v6 link local once interfac comes up back
+ disable_v6_link_local(tgen, dut, intf_name=intf_name)
+
+
#############################################
# Verification APIs
#############################################
@@ -1625,5 +1885,133 @@ def verify_prefix_lists(tgen, input_dict):
logger.info("Prefix list %s is/are not present in the router"
" from router %s", prefix_list, router)
- logger.debug("Exiting lib API: verify_prefix_lissts()")
+ logger.debug("Exiting lib API: verify_prefix_lists()")
+ return True
+
+
+@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
+def verify_route_maps(tgen, input_dict):
+ """
+ Running "show route-map" command and verifying given route-map
+ is present in router.
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `input_dict`: data to verify prefix lists
+ Usage
+ -----
+ # To verify rmap_1 and rmap_2 are present in router r1
+ input_dict = {
+ "r1": {
+ "route_maps": ["rmap_1", "rmap_2"]
+ }
+ }
+ result = verify_route_maps(tgen, input_dict)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_route_maps()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+ # Show ip route-map
+ show_route_maps = rnode.vtysh_cmd("show route-map")
+
+ # Verify route-map is deleted
+ route_maps = input_dict[router]["route_maps"]
+ for route_map in route_maps:
+ if route_map in show_route_maps:
+ errormsg = ("Route map {} is not deleted from router"
+ " {}".format(route_map, router))
+ return errormsg
+
+ logger.info("Route map %s is/are deleted successfully from"
+ " router %s", route_maps, router)
+
+ logger.debug("Exiting lib API: verify_route_maps()")
+ return True
+
+
+@retry(attempts=3, wait=4, return_is_str=True)
+def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
+ """
+ API to veiryf BGP large community is attached in route for any given
+ DUT by running "show bgp ipv4/6 {route address} json" command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `network`: network for which set criteria needs to be verified
+ * `input_dict`: having details like - for which router, community and
+ values needs to be verified
+ Usage
+ -----
+ networks = ["200.50.2.0/32"]
+ input_dict = {
+ "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
+ }
+ result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_community()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ logger.debug("Verifying BGP community attributes on dut %s: for %s "
+ "network %s", router, addr_type, network)
+
+ for net in network:
+ cmd = "show bgp {} {} json".format(addr_type, net)
+ show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
+ logger.info(show_bgp_json)
+ if "paths" not in show_bgp_json:
+ return "Prefix {} not found in BGP table of router: {}". \
+ format(net, router)
+
+ as_paths = show_bgp_json["paths"]
+ found = False
+ for i in range(len(as_paths)):
+ if "largeCommunity" in show_bgp_json["paths"][i] or \
+ "community" in show_bgp_json["paths"][i]:
+ found = True
+ logger.info("Large Community attribute is found for route:"
+ " %s in router: %s", net, router)
+ if input_dict is not None:
+ for criteria, comm_val in input_dict.items():
+ show_val = show_bgp_json["paths"][i][criteria][
+ "string"]
+ if comm_val == show_val:
+ logger.info("Verifying BGP %s for prefix: %s"
+ " in router: %s, found expected"
+ " value: %s", criteria, net, router,
+ comm_val)
+ else:
+ errormsg = "Failed: Verifying BGP attribute" \
+ " {} for route: {} in router: {}" \
+ ", expected value: {} but found" \
+ ": {}".format(
+ criteria, net, router, comm_val,
+ show_val)
+ return errormsg
+
+ if not found:
+ errormsg = (
+ "Large Community attribute is not found for route: "
+ "{} in router: {} ".format(net, router))
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_bgp_community()")
return True