summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py512
1 files changed, 417 insertions, 95 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index f2d33f94ae..c413bf45c7 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -21,6 +21,7 @@
from collections import OrderedDict
from datetime import datetime
from time import sleep
+from copy import deepcopy
from subprocess import call
from subprocess import STDOUT as SUB_STDOUT
from subprocess import PIPE as SUB_PIPE
@@ -34,11 +35,7 @@ import ConfigParser
import traceback
import socket
import ipaddr
-import re
-from lib import topotest
-
-from functools import partial
from lib.topolog import logger, logger_config
from lib.topogen import TopoRouter
from lib.topotest import interface_set_status
@@ -208,6 +205,7 @@ def create_common_configuration(tgen, router, data, config_type=None,
"interface_config": "! Interfaces Config\n",
"static_route": "! Static Route Config\n",
"prefix_list": "! Prefix List Config\n",
+ "bgp_community_list": "! Community List Config\n",
"route_maps": "! Route Maps Config\n",
"bgp": "! BGP Config\n"
})
@@ -547,13 +545,11 @@ def generate_ips(network, no_of_ips):
Returns list of IPs.
based on start_ip and no_of_ips
- * `network` : from here the ip will start generating, start_ip will be
- first ip
+ * `network` : from here the ip will start generating,
+ start_ip will be
* `no_of_ips` : these many IPs will be generated
-
- Limitation: It will generate IPs only for ip_mask 32
-
"""
+
ipaddress_list = []
if type(network) is not list:
network = [network]
@@ -736,62 +732,6 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
return _retry
-def disable_v6_link_local(tgen, router, intf_name=None):
- """
- Disables ipv6 link local addresses for a particular interface or
- all interfaces
-
- * `tgen`: tgen onject
- * `router` : router for which hightest interface should be
- calculated
- * `intf_name` : Interface name for which v6 link local needs to
- be disabled
- """
-
- router_list = tgen.routers()
- for rname, rnode in router_list.iteritems():
- if rname != router:
- continue
-
- linklocal = []
-
- ifaces = router_list[router].run('ip -6 address')
-
- # Fix newlines (make them all the same)
- ifaces = ('\n'.join(ifaces.splitlines()) + '\n').splitlines()
-
- interface = None
- ll_per_if_count = 0
- for line in ifaces:
- # Interface name
- m = re.search('[0-9]+: ([^:]+)[@if0-9:]+ <', line)
- if m:
- interface = m.group(1).split("@")[0]
- ll_per_if_count = 0
-
- # Interface ip
- m = re.search('inet6 (fe80::[0-9a-f]+:[0-9a-f]+:[0-9a-f]+'
- ':[0-9a-f]+[/0-9]*) scope link', line)
- if m:
- local = m.group(1)
- ll_per_if_count += 1
- if ll_per_if_count > 1:
- linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
- else:
- linklocal += [[interface, local]]
-
- if len(linklocal[0]) > 1:
- link_local_dict = {item[0]: item[1] for item in linklocal}
-
- for lname, laddr in link_local_dict.items():
-
- if intf_name is not None and lname != intf_name:
- continue
-
- cmd = "ip addr del {} dev {}".format(laddr, lname)
- router_list[router].run(cmd)
-
-
#############################################
# These APIs, will used by testcase
#############################################
@@ -821,8 +761,6 @@ def create_interfaces_cfg(tgen, topo, build=False):
interface_name = destRouterLink
else:
interface_name = data["interface"]
- if "ipv6" in data:
- disable_v6_link_local(tgen, c_router, interface_name)
interface_data.append("interface {}".format(
str(interface_name)
))
@@ -893,6 +831,7 @@ def create_static_routes(tgen, input_dict, build=False):
"""
result = False
logger.debug("Entering lib API: create_static_routes()")
+ input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
@@ -918,16 +857,21 @@ def create_static_routes(tgen, input_dict, build=False):
next_hop = static_route["next_hop"]
network = static_route["network"]
- ip_list = generate_ips([network], no_of_ip)
+ if type(network) is not list:
+ network = [network]
+
+ ip_list = generate_ips(network, no_of_ip)
for ip in ip_list:
addr_type = validate_ip_address(ip)
+
if addr_type == "ipv4":
cmd = "ip route {} {}".format(ip, next_hop)
else:
cmd = "ipv6 route {} {}".format(ip, next_hop)
if tag:
- cmd = "{} {}".format(cmd, str(tag))
+ cmd = "{} tag {}".format(cmd, str(tag))
+
if admin_distance:
cmd = "{} {}".format(cmd, admin_distance)
@@ -1112,11 +1056,11 @@ def create_route_maps(tgen, input_dict, build=False):
"prefix_list": "pf_list_1"
}
- "large-community-list": "{
+ "large-community-list": {
"id": "community_1",
"exact_match": True
}
- "community": {
+ "community_list": {
"id": "community_2",
"exact_match": True
}
@@ -1152,12 +1096,11 @@ def create_route_maps(tgen, input_dict, build=False):
result = False
logger.debug("Entering lib API: create_route_maps()")
-
+ input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
if "route_maps" not in input_dict[router]:
- errormsg = "route_maps not present in input_dict"
- logger.debug(errormsg)
+ logger.debug("route_maps not present in input_dict")
continue
rmap_data = []
for rmap_name, rmap_value in \
@@ -1187,10 +1130,41 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_name, rmap_action, seq_id
))
+ if "continue" in rmap_dict:
+ continue_to = rmap_dict["continue"]
+ if continue_to:
+ rmap_data.append("on-match goto {}".
+ format(continue_to))
+ else:
+ logger.error("In continue, 'route-map entry "
+ "sequence number' is not provided")
+ return False
+
+ if "goto" in rmap_dict:
+ go_to = rmap_dict["goto"]
+ if go_to:
+ rmap_data.append("on-match goto {}".
+ format(go_to))
+ else:
+ logger.error("In goto, 'Goto Clause number' is not"
+ " provided")
+ return False
+
+ if "call" in rmap_dict:
+ call_rmap = rmap_dict["call"]
+ if call_rmap:
+ rmap_data.append("call {}".
+ format(call_rmap))
+ else:
+ logger.error("In call, 'destination Route-Map' is"
+ " not provided")
+ return False
+
# Verifying if SET criteria is defined
if "set" in rmap_dict:
set_data = rmap_dict["set"]
-
+ ipv4_data = set_data.setdefault("ipv4", {})
+ ipv6_data = set_data.setdefault("ipv6", {})
local_preference = set_data.setdefault("localpref",
None)
metric = set_data.setdefault("med", None)
@@ -1199,7 +1173,10 @@ def create_route_maps(tgen, input_dict, build=False):
community = set_data.setdefault("community", {})
large_community = set_data.setdefault(
"large_community", {})
+ large_comm_list = set_data.setdefault(
+ "large_comm_list", {})
set_action = set_data.setdefault("set_action", None)
+ nexthop = set_data.setdefault("nexthop", None)
# Local Preference
if local_preference:
@@ -1243,42 +1220,86 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_data.append(cmd)
else:
- logger.errror("In large_community, AS Num not"
- " provided")
+ logger.error("In large_community, AS Num not"
+ " provided")
+ return False
+ if large_comm_list:
+ id = large_comm_list.setdefault("id", None)
+ del_comm = large_comm_list.setdefault("delete",
+ None)
+ if id:
+ cmd = "set large-comm-list {}".format(id)
+ if del_comm:
+ cmd = "{} delete".format(cmd)
+
+ rmap_data.append(cmd)
+ else:
+ logger.error("In large_comm_list 'id' not"
+ " provided")
return False
# Weight
if weight:
rmap_data.append("set weight {}".format(
weight))
+ if ipv6_data:
+ nexthop = ipv6_data.setdefault("nexthop", None)
+ if nexthop:
+ rmap_data.append("set ipv6 next-hop {}".format(
+ nexthop
+ ))
# Adding MATCH and SET sequence to RMAP if defined
if "match" in rmap_dict:
match_data = rmap_dict["match"]
ipv4_data = match_data.setdefault("ipv4", {})
ipv6_data = match_data.setdefault("ipv6", {})
- community = match_data.setdefault("community-list",
- {})
+ community = match_data.setdefault(
+ "community_list",{})
large_community = match_data.setdefault(
- "large-community-list", {}
+ "large_community", {}
+ )
+ large_community_list = match_data.setdefault(
+ "large_community_list", {}
)
- tag = match_data.setdefault("tag", None)
if ipv4_data:
- prefix_name = ipv4_data.setdefault("prefix_lists",
- None)
+ # fetch prefix list data from rmap
+ prefix_name = \
+ ipv4_data.setdefault("prefix_lists",
+ None)
if prefix_name:
- rmap_data.append("match ip address prefix-list"
- " {}".format(prefix_name))
+ rmap_data.append("match ip address"
+ " prefix-list {}".format(prefix_name))
+
+ # fetch tag data from rmap
+ tag = ipv4_data.setdefault("tag", None)
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ # fetch large community data from rmap
+ large_community_list = ipv4_data.setdefault(
+ "large_community_list",{})
+ large_community = match_data.setdefault(
+ "large_community", {})
+
if ipv6_data:
prefix_name = ipv6_data.setdefault("prefix_lists",
None)
if prefix_name:
- rmap_data.append("match ipv6 address "
- "prefix-list {}".
- format(prefix_name))
- if tag:
- rmap_data.append("match tag {}".format(tag))
+ rmap_data.append("match ipv6 address"
+ " prefix-list {}".format(prefix_name))
+
+ # fetch tag data from rmap
+ tag = ipv6_data.setdefault("tag", None)
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ # fetch large community data from rmap
+ large_community_list = ipv6_data.setdefault(
+ "large_community_list",{})
+ large_community = match_data.setdefault(
+ "large_community", {})
if community:
if "id" not in community:
@@ -1293,10 +1314,9 @@ def create_route_maps(tgen, input_dict, build=False):
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
-
if large_community:
if "id" not in large_community:
- logger.error("'num' is mandatory for "
+ logger.error("'id' is mandatory for "
"large-community-list in match "
"criteria")
return False
@@ -1306,7 +1326,19 @@ def create_route_maps(tgen, input_dict, build=False):
"exact_match", False)
if exact_match:
cmd = "{} exact-match".format(cmd)
-
+ rmap_data.append(cmd)
+ if large_community_list:
+ if "id" not in large_community_list:
+ logger.error("'id' is mandatory for "
+ "large-community-list in match "
+ "criteria")
+ return False
+ cmd = "match large-community {}".format(
+ large_community_list["id"])
+ exact_match = large_community_list.setdefault(
+ "exact_match", False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
result = create_common_configuration(tgen, router,
@@ -1320,10 +1352,172 @@ def create_route_maps(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_prefix_lists()")
+ logger.debug("Exiting lib API: create_route_maps()")
return result
+def delete_route_maps(tgen, input_dict):
+ """
+ Delete ip route maps from device
+
+ * `tgen` : Topogen object
+ * `input_dict` : for which router,
+ route map has to be deleted
+
+ Usage
+ -----
+ # Delete route-map rmap_1 and rmap_2 from router r1
+ input_dict = {
+ "r1": {
+ "route_maps": ["rmap_1", "rmap__2"]
+ }
+ }
+ result = delete_route_maps("ipv4", input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.info("Entering lib API: delete_route_maps()")
+
+ for router in input_dict.keys():
+ route_maps = input_dict[router]["route_maps"][:]
+ rmap_data = input_dict[router]
+ rmap_data["route_maps"] = {}
+ for route_map_name in route_maps:
+ rmap_data["route_maps"].update({
+ route_map_name:
+ [{
+ "delete": True
+ }]
+ })
+
+ return create_route_maps(tgen, input_dict)
+
+
+def create_bgp_community_lists(tgen, input_dict, build=False):
+ """
+ Create bgp community-list or large-community-list on the devices as per
+ the arguments passed. Takes list of communities in input.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ Usage
+ -----
+ input_dict_1 = {
+ "r3": {
+ "bgp_community_lists": [
+ {
+ "community_type": "standard",
+ "action": "permit",
+ "name": "rmap_lcomm_{}".format(addr_type),
+ "value": "1:1:1 1:2:3 2:1:1 2:2:2",
+ "large": True
+ }
+ ]
+ }
+ }
+ }
+ result = create_bgp_community_lists(tgen, input_dict_1)
+ """
+
+ result = False
+ logger.debug("Entering lib API: create_bgp_community_lists()")
+ input_dict = deepcopy(input_dict)
+ try:
+ for router in input_dict.keys():
+ if "bgp_community_lists" not in input_dict[router]:
+ errormsg = "bgp_community_lists not present in input_dict"
+ logger.debug(errormsg)
+ continue
+
+ config_data = []
+
+ community_list = input_dict[router]["bgp_community_lists"]
+ for community_dict in community_list:
+ del_action = community_dict.setdefault("delete", False)
+ community_type = community_dict.setdefault("community_type",
+ None)
+ action = community_dict.setdefault("action", None)
+ value = community_dict.setdefault("value", '')
+ large = community_dict.setdefault("large", None)
+ name = community_dict.setdefault("name", None)
+ if large:
+ cmd = "bgp large-community-list"
+ else:
+ cmd = "bgp community-list"
+
+ if not large and not (community_type and action and value):
+ errormsg = "community_type, action and value are " \
+ "required in bgp_community_list"
+ logger.error(errormsg)
+ return False
+
+ try:
+ community_type = int(community_type)
+ cmd = "{} {} {} {}".format(cmd, community_type, action,
+ value)
+ except ValueError:
+
+ cmd = "{} {} {} {} {}".format(
+ cmd, community_type, name, action, value)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ result = create_common_configuration(tgen, router, config_data,
+ "bgp_community_list",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_bgp_community_lists()")
+ return result
+
+
+def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
+ """
+ Shutdown or bringup router's interface "
+
+ * `tgen` : Topogen object
+ * `dut` : Device under test
+ * `intf_name` : Interface name to be shut/no shut
+ * `ifaceaction` : Action, to shut/no shut interface,
+ by default is False
+
+ Usage
+ -----
+ dut = "r3"
+ intf = "r3-r1-eth0"
+ # Shut down ineterface
+ shutdown_bringup_interface(tgen, dut, intf, False)
+
+ # Bring up ineterface
+ shutdown_bringup_interface(tgen, dut, intf, True)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ router_list = tgen.routers()
+ if ifaceaction:
+ logger.info("Bringing up interface : {}".format(intf_name))
+ else:
+ logger.info("Shutting down interface : {}".format(intf_name))
+
+ interface_set_status(router_list[dut], intf_name, ifaceaction)
+
+
#############################################
# Verification APIs
#############################################
@@ -1625,5 +1819,133 @@ def verify_prefix_lists(tgen, input_dict):
logger.info("Prefix list %s is/are not present in the router"
" from router %s", prefix_list, router)
- logger.debug("Exiting lib API: verify_prefix_lissts()")
+ logger.debug("Exiting lib API: verify_prefix_lists()")
+ return True
+
+
+@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
+def verify_route_maps(tgen, input_dict):
+ """
+ Running "show route-map" command and verifying given route-map
+ is present in router.
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `input_dict`: data to verify prefix lists
+ Usage
+ -----
+ # To verify rmap_1 and rmap_2 are present in router r1
+ input_dict = {
+ "r1": {
+ "route_maps": ["rmap_1", "rmap_2"]
+ }
+ }
+ result = verify_route_maps(tgen, input_dict)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_route_maps()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+ # Show ip route-map
+ show_route_maps = rnode.vtysh_cmd("show route-map")
+
+ # Verify route-map is deleted
+ route_maps = input_dict[router]["route_maps"]
+ for route_map in route_maps:
+ if route_map in show_route_maps:
+ errormsg = ("Route map {} is not deleted from router"
+ " {}".format(route_map, router))
+ return errormsg
+
+ logger.info("Route map %s is/are deleted successfully from"
+ " router %s", route_maps, router)
+
+ logger.debug("Exiting lib API: verify_route_maps()")
+ return True
+
+
+@retry(attempts=3, wait=4, return_is_str=True)
+def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
+ """
+ API to veiryf BGP large community is attached in route for any given
+ DUT by running "show bgp ipv4/6 {route address} json" command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `network`: network for which set criteria needs to be verified
+ * `input_dict`: having details like - for which router, community and
+ values needs to be verified
+ Usage
+ -----
+ networks = ["200.50.2.0/32"]
+ input_dict = {
+ "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
+ }
+ result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_community()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ logger.debug("Verifying BGP community attributes on dut %s: for %s "
+ "network %s", router, addr_type, network)
+
+ for net in network:
+ cmd = "show bgp {} {} json".format(addr_type, net)
+ show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
+ logger.info(show_bgp_json)
+ if "paths" not in show_bgp_json:
+ return "Prefix {} not found in BGP table of router: {}". \
+ format(net, router)
+
+ as_paths = show_bgp_json["paths"]
+ found = False
+ for i in range(len(as_paths)):
+ if "largeCommunity" in show_bgp_json["paths"][i] or \
+ "community" in show_bgp_json["paths"][i]:
+ found = True
+ logger.info("Large Community attribute is found for route:"
+ " %s in router: %s", net, router)
+ if input_dict is not None:
+ for criteria, comm_val in input_dict.items():
+ show_val = show_bgp_json["paths"][i][criteria][
+ "string"]
+ if comm_val == show_val:
+ logger.info("Verifying BGP %s for prefix: %s"
+ " in router: %s, found expected"
+ " value: %s", criteria, net, router,
+ comm_val)
+ else:
+ errormsg = "Failed: Verifying BGP attribute" \
+ " {} for route: {} in router: {}" \
+ ", expected value: {} but found" \
+ ": {}".format(
+ criteria, net, router, comm_val,
+ show_val)
+ return errormsg
+
+ if not found:
+ errormsg = (
+ "Large Community attribute is not found for route: "
+ "{} in router: {} ".format(net, router))
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_bgp_community()")
return True