diff options
Diffstat (limited to 'tests/topotests/lib/common_config.py')
| -rw-r--r-- | tests/topotests/lib/common_config.py | 458 |
1 files changed, 423 insertions, 35 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index f2d33f94ae..ba8a4cb0f4 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -21,6 +21,7 @@ from collections import OrderedDict from datetime import datetime from time import sleep +from copy import deepcopy from subprocess import call from subprocess import STDOUT as SUB_STDOUT from subprocess import PIPE as SUB_PIPE @@ -208,6 +209,7 @@ def create_common_configuration(tgen, router, data, config_type=None, "interface_config": "! Interfaces Config\n", "static_route": "! Static Route Config\n", "prefix_list": "! Prefix List Config\n", + "bgp_community_list": "! Community List Config\n", "route_maps": "! Route Maps Config\n", "bgp": "! BGP Config\n" }) @@ -547,13 +549,11 @@ def generate_ips(network, no_of_ips): Returns list of IPs. based on start_ip and no_of_ips - * `network` : from here the ip will start generating, start_ip will be - first ip + * `network` : from here the ip will start generating, + start_ip will be * `no_of_ips` : these many IPs will be generated - - Limitation: It will generate IPs only for ip_mask 32 - """ + ipaddress_list = [] if type(network) is not list: network = [network] @@ -893,6 +893,7 @@ def create_static_routes(tgen, input_dict, build=False): """ result = False logger.debug("Entering lib API: create_static_routes()") + input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): if "static_routes" not in input_dict[router]: @@ -918,16 +919,21 @@ def create_static_routes(tgen, input_dict, build=False): next_hop = static_route["next_hop"] network = static_route["network"] - ip_list = generate_ips([network], no_of_ip) + if type(network) is not list: + network = [network] + + ip_list = generate_ips(network, no_of_ip) for ip in ip_list: addr_type = validate_ip_address(ip) + if addr_type == "ipv4": cmd = "ip route {} {}".format(ip, next_hop) else: cmd = "ipv6 route {} {}".format(ip, next_hop) if tag: - cmd = "{} {}".format(cmd, str(tag)) + cmd = "{} tag {}".format(cmd, str(tag)) + if admin_distance: cmd = "{} {}".format(cmd, admin_distance) @@ -1112,11 +1118,11 @@ def create_route_maps(tgen, input_dict, build=False): "prefix_list": "pf_list_1" } - "large-community-list": "{ + "large-community-list": { "id": "community_1", "exact_match": True } - "community": { + "community_list": { "id": "community_2", "exact_match": True } @@ -1152,12 +1158,11 @@ def create_route_maps(tgen, input_dict, build=False): result = False logger.debug("Entering lib API: create_route_maps()") - + input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): if "route_maps" not in input_dict[router]: - errormsg = "route_maps not present in input_dict" - logger.debug(errormsg) + logger.debug("route_maps not present in input_dict") continue rmap_data = [] for rmap_name, rmap_value in \ @@ -1187,10 +1192,41 @@ def create_route_maps(tgen, input_dict, build=False): rmap_name, rmap_action, seq_id )) + if "continue" in rmap_dict: + continue_to = rmap_dict["continue"] + if continue_to: + rmap_data.append("on-match goto {}". + format(continue_to)) + else: + logger.error("In continue, 'route-map entry " + "sequence number' is not provided") + return False + + if "goto" in rmap_dict: + go_to = rmap_dict["goto"] + if go_to: + rmap_data.append("on-match goto {}". + format(go_to)) + else: + logger.error("In goto, 'Goto Clause number' is not" + " provided") + return False + + if "call" in rmap_dict: + call_rmap = rmap_dict["call"] + if call_rmap: + rmap_data.append("call {}". + format(call_rmap)) + else: + logger.error("In call, 'destination Route-Map' is" + " not provided") + return False + # Verifying if SET criteria is defined if "set" in rmap_dict: set_data = rmap_dict["set"] - + ipv4_data = set_data.setdefault("ipv4", {}) + ipv6_data = set_data.setdefault("ipv6", {}) local_preference = set_data.setdefault("localpref", None) metric = set_data.setdefault("med", None) @@ -1199,7 +1235,10 @@ def create_route_maps(tgen, input_dict, build=False): community = set_data.setdefault("community", {}) large_community = set_data.setdefault( "large_community", {}) + large_comm_list = set_data.setdefault( + "large_comm_list", {}) set_action = set_data.setdefault("set_action", None) + nexthop = set_data.setdefault("nexthop", None) # Local Preference if local_preference: @@ -1243,42 +1282,84 @@ def create_route_maps(tgen, input_dict, build=False): rmap_data.append(cmd) else: - logger.errror("In large_community, AS Num not" - " provided") + logger.error("In large_community, AS Num not" + " provided") + return False + if large_comm_list: + id = large_comm_list.setdefault("id", None) + del_comm = large_comm_list.setdefault("delete", + None) + if id: + cmd = "set large-comm-list {}".format(id) + if del_comm: + cmd = "{} delete".format(cmd) + + rmap_data.append(cmd) + else: + logger.error("In large_comm_list 'id' not" + " provided") return False # Weight if weight: - rmap_data.append("set weight {}".format( + rmap_data.append("set weight {} \n".format( weight)) - + if ipv6_data: + nexthop = ipv6_data.setdefault("nexthop",None) + if nexthop: + rmap_data.append("set ipv6 next-hop \ + {}".format(nexthop)) # Adding MATCH and SET sequence to RMAP if defined if "match" in rmap_dict: match_data = rmap_dict["match"] ipv4_data = match_data.setdefault("ipv4", {}) ipv6_data = match_data.setdefault("ipv6", {}) - community = match_data.setdefault("community-list", - {}) + community = match_data.setdefault( + "community_list",{}) large_community = match_data.setdefault( - "large-community-list", {} + "large_community", {} + ) + large_community_list = match_data.setdefault( + "large_community_list", {} ) - tag = match_data.setdefault("tag", None) if ipv4_data: - prefix_name = ipv4_data.setdefault("prefix_lists", - None) + # fetch prefix list data from rmap + prefix_name = \ + ipv4_data.setdefault("prefix_lists", + None) if prefix_name: - rmap_data.append("match ip address prefix-list" - " {}".format(prefix_name)) + rmap_data.append("match ip address" + " prefix-list {}".format(prefix_name)) + + # fetch tag data from rmap + tag = ipv4_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv4_data.setdefault( + "large_community_list",{}) + large_community = match_data.setdefault( + "large_community", {}) + if ipv6_data: prefix_name = ipv6_data.setdefault("prefix_lists", None) if prefix_name: - rmap_data.append("match ipv6 address " - "prefix-list {}". - format(prefix_name)) - if tag: - rmap_data.append("match tag {}".format(tag)) + rmap_data.append("match ipv6 address" + " prefix-list {}".format(prefix_name)) + + # fetch tag data from rmap + tag = ipv6_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv6_data.setdefault( + "large_community_list",{}) + large_community = match_data.setdefault( + "large_community", {}) if community: if "id" not in community: @@ -1293,10 +1374,9 @@ def create_route_maps(tgen, input_dict, build=False): cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) - if large_community: if "id" not in large_community: - logger.error("'num' is mandatory for " + logger.error("'id' is mandatory for " "large-community-list in match " "criteria") return False @@ -1306,7 +1386,19 @@ def create_route_maps(tgen, input_dict, build=False): "exact_match", False) if exact_match: cmd = "{} exact-match".format(cmd) - + rmap_data.append(cmd) + if large_community_list: + if "id" not in large_community_list: + logger.error("'id' is mandatory for " + "large-community-list in match " + "criteria") + return False + cmd = "match large-community {}".format( + large_community_list["id"]) + exact_match = large_community_list.setdefault( + "exact_match", False) + if exact_match: + cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) result = create_common_configuration(tgen, router, @@ -1320,10 +1412,178 @@ def create_route_maps(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_prefix_lists()") + logger.debug("Exiting lib API: create_route_maps()") + return result + + +def delete_route_maps(tgen, input_dict): + """ + Delete ip route maps from device + + * `tgen` : Topogen object + * `input_dict` : for which router, + route map has to be deleted + + Usage + ----- + # Delete route-map rmap_1 and rmap_2 from router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap__2"] + } + } + result = delete_route_maps("ipv4", input_dict) + + Returns + ------- + errormsg(str) or True + """ + logger.info("Entering lib API: delete_route_maps()") + + for router in input_dict.keys(): + route_maps = input_dict[router]["route_maps"][:] + rmap_data = input_dict[router] + rmap_data["route_maps"] = {} + for route_map_name in route_maps: + rmap_data["route_maps"].update({ + route_map_name: + [{ + "delete": True + }] + }) + + return create_route_maps(tgen, input_dict) + + +def create_bgp_community_lists(tgen, input_dict, build=False): + """ + Create bgp community-list or large-community-list on the devices as per + the arguments passed. Takes list of communities in input. + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + Usage + ----- + input_dict_1 = { + "r3": { + "bgp_community_lists": [ + { + "community_type": "standard", + "action": "permit", + "name": "rmap_lcomm_{}".format(addr_type), + "value": "1:1:1 1:2:3 2:1:1 2:2:2", + "large": True + } + ] + } + } + } + result = create_bgp_community_lists(tgen, input_dict_1) + """ + + result = False + logger.debug("Entering lib API: create_bgp_community_lists()") + input_dict = deepcopy(input_dict) + try: + for router in input_dict.keys(): + if "bgp_community_lists" not in input_dict[router]: + errormsg = "bgp_community_lists not present in input_dict" + logger.debug(errormsg) + continue + + config_data = [] + + community_list = input_dict[router]["bgp_community_lists"] + for community_dict in community_list: + del_action = community_dict.setdefault("delete", False) + community_type = community_dict.setdefault("community_type", + None) + action = community_dict.setdefault("action", None) + value = community_dict.setdefault("value", '') + large = community_dict.setdefault("large", None) + name = community_dict.setdefault("name", None) + if large: + cmd = "bgp large-community-list" + else: + cmd = "bgp community-list" + + if not large and not (community_type and action and value): + errormsg = "community_type, action and value are " \ + "required in bgp_community_list" + logger.error(errormsg) + return False + + try: + community_type = int(community_type) + cmd = "{} {} {} {}".format(cmd, community_type, action, + value) + except ValueError: + + cmd = "{} {} {} {} {}".format( + cmd, community_type, name, action, value) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + result = create_common_configuration(tgen, router, config_data, + "bgp_community_list", + build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_bgp_community_lists()") return result +def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False): + """ + Shutdown or bringup router's interface " + + * `tgen` : Topogen object + * `dut` : Device under test + * `intf_name` : Interface name to be shut/no shut + * `ifaceaction` : Action, to shut/no shut interface, + by default is False + + Usage + ----- + dut = "r3" + intf = "r3-r1-eth0" + # Shut down ineterface + shutdown_bringup_interface(tgen, dut, intf, False) + + # Bring up ineterface + shutdown_bringup_interface(tgen, dut, intf, True) + + Returns + ------- + errormsg(str) or True + """ + + from topotest import interface_set_status + router_list = tgen.routers() + if ifaceaction: + logger.info("Bringing up interface : {}".format(intf_name)) + else: + logger.info("Shutting down interface : {}".format(intf_name)) + + interface_set_status(router_list[dut], intf_name, + ifaceaction) + + if ifaceaction: + # Disabling v6 link local once interfac comes up back + disable_v6_link_local(tgen, dut, intf_name=intf_name) + + ############################################# # Verification APIs ############################################# @@ -1625,5 +1885,133 @@ def verify_prefix_lists(tgen, input_dict): logger.info("Prefix list %s is/are not present in the router" " from router %s", prefix_list, router) - logger.debug("Exiting lib API: verify_prefix_lissts()") + logger.debug("Exiting lib API: verify_prefix_lists()") + return True + + +@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2) +def verify_route_maps(tgen, input_dict): + """ + Running "show route-map" command and verifying given route-map + is present in router. + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify prefix lists + Usage + ----- + # To verify rmap_1 and rmap_2 are present in router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap_2"] + } + } + result = verify_route_maps(tgen, input_dict) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_route_maps()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + # Show ip route-map + show_route_maps = rnode.vtysh_cmd("show route-map") + + # Verify route-map is deleted + route_maps = input_dict[router]["route_maps"] + for route_map in route_maps: + if route_map in show_route_maps: + errormsg = ("Route map {} is not deleted from router" + " {}".format(route_map, router)) + return errormsg + + logger.info("Route map %s is/are deleted successfully from" + " router %s", route_maps, router) + + logger.debug("Exiting lib API: verify_route_maps()") + return True + + +@retry(attempts=3, wait=4, return_is_str=True) +def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): + """ + API to veiryf BGP large community is attached in route for any given + DUT by running "show bgp ipv4/6 {route address} json" command. + + Parameters + ---------- + * `tgen`: topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `network`: network for which set criteria needs to be verified + * `input_dict`: having details like - for which router, community and + values needs to be verified + Usage + ----- + networks = ["200.50.2.0/32"] + input_dict = { + "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5" + } + result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_bgp_community()") + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + logger.debug("Verifying BGP community attributes on dut %s: for %s " + "network %s", router, addr_type, network) + + for net in network: + cmd = "show bgp {} {} json".format(addr_type, net) + show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True) + logger.info(show_bgp_json) + if "paths" not in show_bgp_json: + return "Prefix {} not found in BGP table of router: {}". \ + format(net, router) + + as_paths = show_bgp_json["paths"] + found = False + for i in range(len(as_paths)): + if "largeCommunity" in show_bgp_json["paths"][i] or \ + "community" in show_bgp_json["paths"][i]: + found = True + logger.info("Large Community attribute is found for route:" + " %s in router: %s", net, router) + if input_dict is not None: + for criteria, comm_val in input_dict.items(): + show_val = show_bgp_json["paths"][i][criteria][ + "string"] + if comm_val == show_val: + logger.info("Verifying BGP %s for prefix: %s" + " in router: %s, found expected" + " value: %s", criteria, net, router, + comm_val) + else: + errormsg = "Failed: Verifying BGP attribute" \ + " {} for route: {} in router: {}" \ + ", expected value: {} but found" \ + ": {}".format( + criteria, net, router, comm_val, + show_val) + return errormsg + + if not found: + errormsg = ( + "Large Community attribute is not found for route: " + "{} in router: {} ".format(net, router)) + return errormsg + + logger.debug("Exiting lib API: verify_bgp_community()") return True |
