Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--  tests/topotests/lib/common_config.py  1391
1 file changed, 1391 insertions, 0 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
new file mode 100644
index 0000000000..d2c1d82430
--- /dev/null
+++ b/tests/topotests/lib/common_config.py
@@ -0,0 +1,1391 @@
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+from collections import OrderedDict
+from datetime import datetime
+from time import sleep
+from subprocess import call
+from subprocess import STDOUT as SUB_STDOUT
+import StringIO
+import os
+import ConfigParser
+import traceback
+import socket
+import ipaddr
+
+from lib.topolog import logger, logger_config
+from lib.topogen import TopoRouter
+
+
+FRRCFG_FILE = "frr_json.conf"
+FRRCFG_BKUP_FILE = "frr_json_initial.conf"
+
+ERROR_LIST = ["Malformed", "Failure", "Unknown"]
+
+####
+CD = os.path.dirname(os.path.realpath(__file__))
+PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
+
+# Create a tmp dir named after the testsuite to avoid conflicts when
+# multiple testsuites run together. All temporary files are created in
+# this dir, and the dir is removed once the testsuite run is complete.
+LOGDIR = "/tmp/topotests/"
+TMPDIR = None
+
+# NOTE: to save execution logs to log file frrtest_log_dir must be configured
+# in `pytest.ini`.
+config = ConfigParser.ConfigParser()
+config.read(PYTESTINI_PATH)
+
+config_section = "topogen"
+
+if config.has_option("topogen", "verbosity"):
+ loglevel = config.get("topogen", "verbosity")
+ loglevel = loglevel.upper()
+else:
+ loglevel = "INFO"
+
+if config.has_option("topogen", "frrtest_log_dir"):
+ frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
+ time_stamp = datetime.time(datetime.now())
+ logfile_name = "frr_test_bgp_"
+ frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
+ print("frrtest_log_file..", frrtest_log_file)
+
+ logger = logger_config.get_logger(name="test_execution_logs",
+ log_level=loglevel,
+ target=frrtest_log_file)
+ print("Logs will be sent to logfile: {}".format(frrtest_log_file))
+
+if config.has_option("topogen", "show_router_config"):
+ show_router_config = config.get("topogen", "show_router_config")
+else:
+ show_router_config = False
+
+# env variable for setting what address type to test
+ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
+
+
+# Saves sequence id numbers
+SEQ_ID = {
+ "prefix_lists": {},
+ "route_maps": {}
+}
+
+
+def get_seq_id(obj_type, router, obj_name):
+ """
+    Generates and saves sequence numbers in increments of 10
+
+ Parameters
+ ----------
+ * `obj_type`: prefix_lists or route_maps
+ * `router`: router name
+    * `obj_name`: name of the prefix-list or route-map
+
+ Returns
+ --------
+ Sequence number generated
+ """
+
+ router_data = SEQ_ID[obj_type].setdefault(router, {})
+ obj_data = router_data.setdefault(obj_name, {})
+ seq_id = obj_data.setdefault("seq_id", 0)
+
+ seq_id = int(seq_id) + 10
+ obj_data["seq_id"] = seq_id
+
+ return seq_id
+
+
+def set_seq_id(obj_type, router, id, obj_name):
+ """
+    Saves the sequence number when it is supplied by the user rather than auto-generated
+
+ Parameters
+ ----------
+ * `obj_type`: prefix_lists or route_maps
+ * `router`: router name
+    * `obj_name`: name of the prefix-list or route-map
+ """
+ router_data = SEQ_ID[obj_type].setdefault(router, {})
+ obj_data = router_data.setdefault(obj_name, {})
+ seq_id = obj_data.setdefault("seq_id", 0)
+
+ seq_id = int(seq_id) + int(id)
+ obj_data["seq_id"] = seq_id
+
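+# Illustrative sketch (not executed): get_seq_id() hands out sequence numbers
+# in steps of 10 per (router, object) pair, while set_seq_id() records a
+# user-supplied seqid so later auto-generated values continue above it:
+#
+#   get_seq_id("prefix_lists", "r1", "pf_list_1")    # -> 10
+#   get_seq_id("prefix_lists", "r1", "pf_list_1")    # -> 20
+#   set_seq_id("prefix_lists", "r1", 100, "pf_list_1")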
+
+class InvalidCLIError(Exception):
+ """Raise when the CLI command is wrong"""
+ pass
+
+
+def create_common_configuration(tgen, router, data, config_type=None,
+ build=False):
+ """
+    API to create the common configuration file, frr_json.conf. It writes
+    interface and other common configurations to frr_json.conf and loads the
+    file to the router.
+
+ Parameters
+ ----------
+    * `tgen`: tgen object
+    * `data`: Configuration data saved in a list.
+ * `router` : router id to be configured.
+    * `config_type` : Syntactic information while writing configuration. Should
+        be one of the keys defined in the config_map below.
+    * `build` : Set to True only during the initial setup phase
+
+ Returns
+ -------
+ True or False
+ """
+ TMPDIR = os.path.join(LOGDIR, tgen.modname)
+
+ fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
+
+ config_map = OrderedDict({
+ "general_config": "! FRR General Config\n",
+ "interface_config": "! Interfaces Config\n",
+ "static_route": "! Static Route Config\n",
+ "prefix_list": "! Prefix List Config\n",
+ "route_maps": "! Route Maps Config\n",
+ "bgp": "! BGP Config\n"
+ })
+
+ if build:
+ mode = "a"
+ else:
+ mode = "w"
+
+    try:
+        with open(fname, mode) as frr_cfg_fd:
+            if config_type:
+                frr_cfg_fd.write(config_map[config_type])
+            for line in data:
+                frr_cfg_fd.write("{} \n".format(str(line)))
+
+    except IOError as err:
+        logger.error("Unable to open FRR Config File. error(%s): %s" %
+                     (err.errno, err.strerror))
+        return False
+
+    # If the configuration is applied from the build phase, it is loaded
+    # at the end of the build
+ if not build:
+ load_config_to_router(tgen, router)
+
+ return True
+
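+# Illustrative sketch (not executed): the create_*() helpers below normally
+# build the config lines, but raw lines can also be written and loaded
+# directly, e.g. for a hypothetical router "r1":
+#
+#   create_common_configuration(tgen, "r1", ["hostname r1"], "general_config")
+#
+# With build=True the lines are only appended to frr_json.conf; loading to the
+# router is deferred until the initial build finishes.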
+
+def reset_config_on_routers(tgen, routerName=None):
+ """
+ Resets configuration on routers to the snapshot created using input JSON
+ file. It replaces existing router configuration with FRRCFG_BKUP_FILE
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router config is to be reset
+ """
+
+ logger.debug("Entering API: reset_config_on_routers")
+
+ router_list = tgen.routers()
+ for rname, router in router_list.iteritems():
+ if routerName and routerName != rname:
+ continue
+
+ cfg = router.run("vtysh -c 'show running'")
+ fname = "{}/{}/frr.sav".format(TMPDIR, rname)
+ dname = "{}/{}/delta.conf".format(TMPDIR, rname)
+ f = open(fname, "w")
+ for line in cfg.split("\n"):
+ line = line.strip()
+
+ if (line == "Building configuration..." or
+ line == "Current configuration:" or
+ not line):
+ continue
+ f.write(line)
+ f.write("\n")
+
+ f.close()
+
+ command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \
+ " --test {}/{}/frr_json_initial.conf > {}". \
+ format(TMPDIR, rname, TMPDIR, rname, dname)
+ result = call(command, shell=True, stderr=SUB_STDOUT)
+
+        # Return an error if the command fails
+        if result > 0:
+            errormsg = ("Command: {} failed with a non-zero exit"
+                        " code".format(command))
+ return errormsg
+
+ f = open(dname, "r")
+ delta = StringIO.StringIO()
+ delta.write("configure terminal\n")
+ t_delta = f.read()
+ for line in t_delta.split("\n"):
+ line = line.strip()
+ if (line == "Lines To Delete" or
+ line == "===============" or
+ line == "Lines To Add" or
+ line == "============" or
+ not line):
+ continue
+ delta.write(line)
+ delta.write("\n")
+
+ delta.write("end\n")
+ output = router.vtysh_multicmd(delta.getvalue(),
+ pretty_output=False)
+ logger.info("New configuration for router {}:".format(rname))
+ delta.close()
+ delta = StringIO.StringIO()
+ cfg = router.run("vtysh -c 'show running'")
+ for line in cfg.split("\n"):
+ line = line.strip()
+ delta.write(line)
+ delta.write("\n")
+
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ logger.info(delta.getvalue())
+ delta.close()
+
+ logger.debug("Exting API: reset_config_on_routers")
+ return True
+
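+# Illustrative sketch (not executed): a test case can roll back to the initial
+# snapshot after modifying configuration; the API returns True on success or
+# an error string on failure:
+#
+#   result = reset_config_on_routers(tgen)
+#   assert result is True, "Config reset failed: {}".format(result)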
+
+def load_config_to_router(tgen, routerName, save_bkup=False):
+ """
+ Loads configuration on router from the file FRRCFG_FILE.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router for which configuration to be loaded
+ * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
+ """
+
+ logger.debug("Entering API: load_config_to_router")
+
+ router_list = tgen.routers()
+ for rname, router in router_list.iteritems():
+ if rname == routerName:
+ try:
+ frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
+ frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
+ FRRCFG_BKUP_FILE)
+ with open(frr_cfg_file, "r") as cfg:
+ data = cfg.read()
+ if save_bkup:
+ with open(frr_cfg_bkup, "w") as bkup:
+ bkup.write(data)
+
+ output = router.vtysh_multicmd(data, pretty_output=False)
+ for out_err in ERROR_LIST:
+ if out_err.lower() in output.lower():
+ raise InvalidCLIError("%s" % output)
+ except IOError as err:
+ errormsg = ("Unable to open config File. error(%s):"
+ " %s", (err.errno, err.strerror))
+ return errormsg
+
+ logger.info("New configuration for router {}:".format(rname))
+ new_config = router.run("vtysh -c 'show running'")
+
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ logger.info(new_config)
+
+ logger.debug("Exting API: load_config_to_router")
+ return True
+
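+# Illustrative sketch (not executed): after the initial configuration has been
+# written to frr_json.conf, loading it with save_bkup=True also writes the
+# frr_json_initial.conf snapshot that reset_config_on_routers() relies on:
+#
+#   load_config_to_router(tgen, "r1", save_bkup=True)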
+
+def start_topology(tgen):
+ """
+    Start the topology, create tmp files which are loaded to routers
+    to start daemons, and then start the routers
+ * `tgen` : topogen object
+ """
+
+ global TMPDIR
+ # Starting topology
+ tgen.start_topology()
+
+    # Starting daemons
+ router_list = tgen.routers()
+ TMPDIR = os.path.join(LOGDIR, tgen.modname)
+
+ for rname, router in router_list.iteritems():
+ try:
+ os.chdir(TMPDIR)
+
+            # Create a directory named after the router and empty
+            # zebra.conf and bgpd.conf files inside it
+
+            if os.path.isdir('{}'.format(rname)):
+                os.system("rm -rf {}".format(rname))
+
+            os.mkdir('{}'.format(rname))
+            os.system('chmod -R go+rw {}'.format(rname))
+            os.chdir('{}/{}'.format(TMPDIR, rname))
+            os.system('touch zebra.conf bgpd.conf')
+
+ except IOError as (errno, strerror):
+ logger.error("I/O error({0}): {1}".format(errno, strerror))
+
+        # Loading empty zebra.conf file to router, to start the zebra daemon
+ router.load_config(
+ TopoRouter.RD_ZEBRA,
+ '{}/{}/zebra.conf'.format(TMPDIR, rname)
+ # os.path.join(tmpdir, '{}/zebra.conf'.format(rname))
+ )
+        # Loading empty bgpd.conf file to router, to start the bgp daemon
+ router.load_config(
+ TopoRouter.RD_BGP,
+ '{}/{}/bgpd.conf'.format(TMPDIR, rname)
+ # os.path.join(tmpdir, '{}/bgpd.conf'.format(rname))
+ )
+
+ # Starting routers
+ logger.info("Starting all routers once topology is created")
+ tgen.start_router()
+
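+# Illustrative sketch (not executed): a test module would typically call
+# start_topology() from setup_module() so the per-router tmp dirs and empty
+# zebra.conf/bgpd.conf files exist before the routers are started; the
+# topology class name below is hypothetical:
+#
+#   def setup_module(mod):
+#       tgen = Topogen(SomeTopoClass, mod.__name__)
+#       start_topology(tgen)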
+
+def number_to_row(routerName):
+ """
+    Returns the row number for the router.
+    Calculation is based on the name: a0 = row 0, a1 = row 1, b2 = row 2,
+    z23 = row 23, etc.
+ """
+ return int(routerName[1:])
+
+
+def number_to_column(routerName):
+ """
+    Returns the column number for the router.
+    Calculation is based on the name: a0 = column 0, a1 = column 0,
+    b2 = column 1, z23 = column 25, etc.
+ """
+ return ord(routerName[0]) - 97
+
+
+#############################################
+# Common APIs to be used by all protocols
+#############################################
+
+def validate_ip_address(ip_address):
+ """
+ Validates the type of ip address
+
+ Parameters
+ ----------
+ * `ip_address`: IPv4/IPv6 address
+
+ Returns
+ -------
+ Type of address as string
+ """
+
+ if "/" in ip_address:
+ ip_address = ip_address.split("/")[0]
+
+ v4 = True
+ v6 = True
+ try:
+ socket.inet_aton(ip_address)
+ except socket.error as error:
+ logger.debug("Not a valid IPv4 address")
+ v4 = False
+ else:
+ return "ipv4"
+
+ try:
+ socket.inet_pton(socket.AF_INET6, ip_address)
+ except socket.error as error:
+ logger.debug("Not a valid IPv6 address")
+ v6 = False
+ else:
+ return "ipv6"
+
+ if not v4 and not v6:
+ raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6"
+ " address" % ip_address)
+
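+# Illustrative sketch (not executed): the returned string decides whether the
+# "ip" or "ipv6" command family is used by callers; any prefix mask is ignored:
+#
+#   validate_ip_address("10.0.20.1/32")   # -> "ipv4"
+#   validate_ip_address("fd00::1")        # -> "ipv6"
+#   validate_ip_address("not-an-ip")      # raises Exception("InvalidIpAddr", ...)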
+
+def check_address_types(addr_type):
+ """
+    Checks the ADDRESS_TYPES environment variable and compares it with the
+    given address type
+ """
+ global ADDRESS_TYPES
+ if ADDRESS_TYPES is None:
+ ADDRESS_TYPES = "dual"
+
+ if ADDRESS_TYPES == "dual":
+ ADDRESS_TYPES = ["ipv4", "ipv6"]
+ elif ADDRESS_TYPES == "ipv4":
+ ADDRESS_TYPES = ["ipv4"]
+ elif ADDRESS_TYPES == "ipv6":
+ ADDRESS_TYPES = ["ipv6"]
+
+ if addr_type not in ADDRESS_TYPES:
+ logger.error("{} not in supported/configured address types {}".
+ format(addr_type, ADDRESS_TYPES))
+ return False
+
+ return ADDRESS_TYPES
+
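+# Illustrative sketch (not executed): with the ADDRESS_TYPES environment
+# variable unset (treated as "dual"), both address families are enabled:
+#
+#   check_address_types("ipv4")   # -> ["ipv4", "ipv6"]
+#
+# With ADDRESS_TYPES=ipv4 exported, check_address_types("ipv6") logs an error
+# and returns False, so callers can skip ipv6-only steps.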
+
+def generate_ips(network, no_of_ips):
+ """
+    Returns a list of IPs, based on start_ip and no_of_ips.
+
+    * `network` : IPs are generated starting from this network; start_ip will
+      be the first IP
+    * `no_of_ips` : number of IPs to be generated
+
+ Limitation: It will generate IPs only for ip_mask 32
+
+ """
+ ipaddress_list = []
+ if type(network) is not list:
+ network = [network]
+
+ for start_ipaddr in network:
+ if "/" in start_ipaddr:
+ start_ip = start_ipaddr.split("/")[0]
+ mask = int(start_ipaddr.split("/")[1])
+
+ addr_type = validate_ip_address(start_ip)
+ if addr_type == "ipv4":
+ start_ip = ipaddr.IPv4Address(unicode(start_ip))
+ step = 2 ** (32 - mask)
+ if addr_type == "ipv6":
+ start_ip = ipaddr.IPv6Address(unicode(start_ip))
+ step = 2 ** (128 - mask)
+
+ next_ip = start_ip
+ count = 0
+ while count < no_of_ips:
+ ipaddress_list.append("{}/{}".format(next_ip, mask))
+ if addr_type == "ipv6":
+ next_ip = ipaddr.IPv6Address(int(next_ip) + step)
+ else:
+ next_ip += step
+ count += 1
+
+ return ipaddress_list
+
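+# Illustrative sketch (not executed): prefixes are generated by stepping one
+# block of the given mask length at a time, e.g.:
+#
+#   generate_ips("100.0.20.1/32", 3)
+#   # -> ["100.0.20.1/32", "100.0.20.2/32", "100.0.20.3/32"]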
+
+def find_interface_with_greater_ip(topo, router, loopback=True,
+ interface=True):
+ """
+    Returns the highest interface IP for ipv4/ipv6. If a loopback exists,
+    the highest IP among the loopback IPs is returned; otherwise the highest
+    IP among the physical interface IPs is returned.
+
+    * `topo` : json file data
+    * `router` : router for which the highest interface IP should be found
+ """
+
+ link_data = topo["routers"][router]["links"]
+ lo_list = []
+ interfaces_list = []
+ lo_exists = False
+ for destRouterLink, data in sorted(link_data.iteritems()):
+ if loopback:
+ if "type" in data and data["type"] == "loopback":
+ lo_exists = True
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ lo_list.append(ip_address)
+ if interface:
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ interfaces_list.append(ip_address)
+
+ if lo_exists:
+ return sorted(lo_list)[-1]
+
+ return sorted(interfaces_list)[-1]
+
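+# Illustrative sketch (not executed): for a topology where r1 has a loopback
+# link carrying 1.0.1.17/32 and a physical link carrying 10.0.0.1/24, the
+# loopback address is preferred because a loopback entry exists:
+#
+#   find_interface_with_greater_ip(topo, "r1")   # -> "1.0.1.17"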
+
+def write_test_header(tc_name):
+ """ Display message at beginning of test case"""
+ count = 20
+ logger.info("*"*(len(tc_name)+count))
+ logger.info("START -> Testcase : %s", tc_name)
+ logger.info("*"*(len(tc_name)+count))
+
+
+def write_test_footer(tc_name):
+ """ Display message at end of test case"""
+ count = 21
+ logger.info("="*(len(tc_name)+count))
+ logger.info("PASSED -> Testcase : %s", tc_name)
+ logger.info("="*(len(tc_name)+count))
+
+
+#############################################
+# These APIs will be used by test cases
+#############################################
+def create_interfaces_cfg(tgen, topo, build=False):
+ """
+ Create interface configuration for created topology. Basic Interface
+ configuration is provided in input json file.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+    * `build` : Set to True only during the initial setup phase.
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+
+ try:
+ for c_router, c_data in topo.iteritems():
+ interface_data = []
+ for destRouterLink, data in sorted(c_data["links"].iteritems()):
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+ interface_data.append("interface {}\n".format(
+ str(interface_name)
+ ))
+ if "ipv4" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv4"]
+ interface_data.append("ip address {}\n".format(
+ intf_addr
+ ))
+ if "ipv6" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv6"]
+ interface_data.append("ipv6 address {}\n".format(
+ intf_addr
+ ))
+ result = create_common_configuration(tgen, c_router,
+ interface_data,
+ "interface_config",
+ build=build)
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ return result
+
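+# Illustrative sketch (not executed): judging by the loop over topo.iteritems()
+# above, the dict passed in is the per-router portion of the loaded JSON (each
+# value carrying a "links" section), e.g. during the initial build phase:
+#
+#   create_interfaces_cfg(tgen, topo["routers"], build=True)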
+
+def create_static_routes(tgen, input_dict, build=False):
+ """
+ Create static routes for given router as defined in input_dict
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+    * `build` : Set to True only during the initial setup phase.
+
+ Usage
+ -----
+ input_dict should be in the format below:
+ # static_routes: list of all routes
+ # network: network address
+ # no_of_ip: number of next-hop address that will be configured
+ # admin_distance: admin distance for route/routes.
+ # next_hop: starting next-hop address
+ # tag: tag id for static routes
+ # delete: True if config to be removed. Default False.
+
+ Example:
+ "routers": {
+ "r1": {
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1",
+ "tag": 4001
+ "delete": true
+ }
+ ]
+ }
+ }
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ result = False
+ logger.debug("Entering lib API: create_static_routes()")
+ try:
+ for router in input_dict.keys():
+ if "static_routes" not in input_dict[router]:
+ errormsg = "static_routes not present in input_dict"
+ logger.info(errormsg)
+ continue
+
+ static_routes_list = []
+
+ static_routes = input_dict[router]["static_routes"]
+ for static_route in static_routes:
+ del_action = static_route.setdefault("delete", False)
+ # No of IPs
+ no_of_ip = static_route.setdefault("no_of_ip", 1)
+ admin_distance = static_route.setdefault("admin_distance",
+ None)
+ tag = static_route.setdefault("tag", None)
+ if "next_hop" not in static_route or \
+ "network" not in static_route:
+ errormsg = "'next_hop' or 'network' missing in" \
+ " input_dict"
+ return errormsg
+
+ next_hop = static_route["next_hop"]
+ network = static_route["network"]
+ ip_list = generate_ips([network], no_of_ip)
+ for ip in ip_list:
+ addr_type = validate_ip_address(ip)
+ if addr_type == "ipv4":
+ cmd = "ip route {} {}".format(ip, next_hop)
+ else:
+ cmd = "ipv6 route {} {}".format(ip, next_hop)
+
+ if tag:
+ cmd = "{} {}".format(cmd, str(tag))
+ if admin_distance:
+ cmd = "{} {}".format(cmd, admin_distance)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ static_routes_list.append(cmd)
+
+ result = create_common_configuration(tgen, router,
+ static_routes_list,
+ "static_route",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_static_routes()")
+ return result
+
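+# Illustrative sketch (not executed): the create_*() APIs share the same call
+# pattern; they return True on success or an error string on failure:
+#
+#   input_dict = {
+#       "r1": {"static_routes": [{"network": "10.0.20.1/32",
+#                                 "no_of_ip": 9,
+#                                 "next_hop": "10.0.0.2"}]}
+#   }
+#   result = create_static_routes(tgen, input_dict)
+#   assert result is True, "Testcase failed: {}".format(result)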
+
+def create_prefix_lists(tgen, input_dict, build=False):
+ """
+ Create ip prefix lists as per the config provided in input
+ JSON or input_dict
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+    * `build` : Set to True only during the initial setup phase.
+
+ Usage
+ -----
+ # pf_lists_1: name of prefix-list, user defined
+ # seqid: prefix-list seqid, auto-generated if not given by user
+ # network: criteria for applying prefix-list
+ # action: permit/deny
+ # le: less than or equal number of bits
+ # ge: greater than or equal number of bits
+
+ Example
+ -------
+ input_dict = {
+ "r1": {
+ "prefix_lists":{
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ "le": "32",
+ "ge": "30",
+ "delete": True
+ }
+ ]
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: create_prefix_lists()")
+ result = False
+ try:
+ for router in input_dict.keys():
+ if "prefix_lists" not in input_dict[router]:
+ errormsg = "prefix_lists not present in input_dict"
+ logger.info(errormsg)
+ continue
+
+ config_data = []
+ prefix_lists = input_dict[router]["prefix_lists"]
+ for addr_type, prefix_data in prefix_lists.iteritems():
+ if not check_address_types(addr_type):
+ continue
+
+ for prefix_name, prefix_list in prefix_data.iteritems():
+ for prefix_dict in prefix_list:
+ if "action" not in prefix_dict or \
+ "network" not in prefix_dict:
+ errormsg = "'action' or network' missing in" \
+ " input_dict"
+ return errormsg
+
+ network_addr = prefix_dict["network"]
+ action = prefix_dict["action"]
+ le = prefix_dict.setdefault("le", None)
+ ge = prefix_dict.setdefault("ge", None)
+ seqid = prefix_dict.setdefault("seqid", None)
+ del_action = prefix_dict.setdefault("delete", False)
+ if seqid is None:
+ seqid = get_seq_id("prefix_lists", router,
+ prefix_name)
+ else:
+ set_seq_id("prefix_lists", router, seqid,
+ prefix_name)
+
+ if addr_type == "ipv4":
+ protocol = "ip"
+ else:
+ protocol = "ipv6"
+
+ cmd = "{} prefix-list {} seq {} {} {}".format(
+ protocol, prefix_name, seqid, action, network_addr
+ )
+ if le:
+ cmd = "{} le {}".format(cmd, le)
+ if ge:
+ cmd = "{} ge {}".format(cmd, ge)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+ result = create_common_configuration(tgen, router,
+ config_data,
+ "prefix_list",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_prefix_lists()")
+ return result
+
+
+def create_route_maps(tgen, input_dict, build=False):
+ """
+ Create route-map on the devices as per the arguments passed
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+    * `build` : Set to True only during the initial setup phase.
+
+ Usage
+ -----
+ # route_maps: key, value pair for route-map name and its attribute
+ # rmap_match_prefix_list_1: user given name for route-map
+ # action: PERMIT/DENY
+ # match: key,value pair for match criteria. prefix_list, community-list,
+ large-community-list or tag. Only one option at a time.
+ # prefix_list: name of prefix list
+ # large-community-list: name of large community list
+    # community-list: name of community list
+ # tag: tag id for static routes
+ # set: key, value pair for modifying route attributes
+ # localpref: preference value for the network
+ # med: metric value advertised for AS
+ # aspath: set AS path value
+ # weight: weight for the route
+ # community: standard community value to be attached
+ # large_community: large community value to be attached
+ # community_additive: if set to "additive", adds community/large-community
+ value to the existing values of the network prefix
+
+ Example:
+ --------
+ input_dict = {
+ "r1": {
+ "route_maps": {
+ "rmap_match_prefix_list_1": [
+ {
+ "action": "PERMIT",
+ "match": {
+ "ipv4": {
+ "prefix_list": "pf_list_1"
+ }
+ "ipv6": {
+ "prefix_list": "pf_list_1"
+ }
+
+ "large-community-list": "{
+ "id": "community_1",
+ "exact_match": True
+ }
+ "community": {
+ "id": "community_2",
+ "exact_match": True
+ }
+ "tag": "tag_id"
+ },
+ "set": {
+ "localpref": 150,
+ "med": 30,
+ "aspath": {
+ "num": 20000,
+ "action": "prepend",
+ },
+ "weight": 500,
+ "community": {
+ "num": "1:2 2:3",
+ "action": additive
+ }
+ "large_community": {
+ "num": "1:2:3 4:5;6",
+ "action": additive
+ },
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ result = False
+ logger.debug("Entering lib API: create_route_maps()")
+
+ try:
+ for router in input_dict.keys():
+ if "route_maps" not in input_dict[router]:
+ errormsg = "route_maps not present in input_dict"
+ logger.info(errormsg)
+ continue
+ rmap_data = []
+ for rmap_name, rmap_value in \
+ input_dict[router]["route_maps"].iteritems():
+
+ for rmap_dict in rmap_value:
+ del_action = rmap_dict.setdefault("delete", False)
+
+ if del_action:
+ rmap_data.append("no route-map {}".format(rmap_name))
+ continue
+
+ if "action" not in rmap_dict:
+ errormsg = "action not present in input_dict"
+ logger.error(errormsg)
+ return False
+
+ rmap_action = rmap_dict.setdefault("action", "deny")
+
+ seq_id = rmap_dict.setdefault("seq_id", None)
+ if seq_id is None:
+ seq_id = get_seq_id("route_maps", router, rmap_name)
+ else:
+ set_seq_id("route_maps", router, seq_id, rmap_name)
+
+ rmap_data.append("route-map {} {} {}".format(
+ rmap_name, rmap_action, seq_id
+ ))
+
+ # Verifying if SET criteria is defined
+ if "set" in rmap_dict:
+ set_data = rmap_dict["set"]
+
+ local_preference = set_data.setdefault("localpref",
+ None)
+ metric = set_data.setdefault("med", None)
+ as_path = set_data.setdefault("aspath", {})
+ weight = set_data.setdefault("weight", None)
+ community = set_data.setdefault("community", {})
+ large_community = set_data.setdefault(
+ "large_community", {})
+ set_action = set_data.setdefault("set_action", None)
+
+ # Local Preference
+ if local_preference:
+ rmap_data.append("set local-preference {}".
+ format(local_preference))
+
+ # Metric
+ if metric:
+ rmap_data.append("set metric {} \n".format(metric))
+
+ # AS Path Prepend
+ if as_path:
+ as_num = as_path.setdefault("as_num", None)
+ as_action = as_path.setdefault("as_action", None)
+ if as_action and as_num:
+ rmap_data.append("set as-path {} {}".
+ format(as_action, as_num))
+
+ # Community
+ if community:
+ num = community.setdefault("num", None)
+ comm_action = community.setdefault("action", None)
+ if num:
+ cmd = "set community {}".format(num)
+ if comm_action:
+ cmd = "{} {}".format(cmd, comm_action)
+ rmap_data.append(cmd)
+ else:
+ logger.error("In community, AS Num not"
+ " provided")
+ return False
+
+ if large_community:
+ num = large_community.setdefault("num", None)
+ comm_action = large_community.setdefault("action",
+ None)
+ if num:
+ cmd = "set large-community {}".format(num)
+ if comm_action:
+ cmd = "{} {}".format(cmd, comm_action)
+
+ rmap_data.append(cmd)
+ else:
+                                logger.error("In large_community, AS Num not"
+                                             " provided")
+ return False
+
+ # Weight
+ if weight:
+ rmap_data.append("set weight {} \n".format(
+ weight))
+
+ # Adding MATCH and SET sequence to RMAP if defined
+ if "match" in rmap_dict:
+ match_data = rmap_dict["match"]
+ ipv4_data = match_data.setdefault("ipv4", {})
+ ipv6_data = match_data.setdefault("ipv6", {})
+ community = match_data.setdefault("community-list",
+ {})
+ large_community = match_data.setdefault(
+ "large-community-list", {}
+ )
+ tag = match_data.setdefault("tag", None)
+
+ if ipv4_data:
+ prefix_name = ipv4_data.setdefault("prefix_lists",
+ None)
+ if prefix_name:
+ rmap_data.append("match ip address prefix-list"
+ " {}".format(prefix_name))
+ if ipv6_data:
+ prefix_name = ipv6_data.setdefault("prefix_lists",
+ None)
+ if prefix_name:
+ rmap_data.append("match ipv6 address "
+ "prefix-list {}".
+ format(prefix_name))
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ if community:
+ if "id" not in community:
+ logger.error("'id' is mandatory for "
+ "community-list in match"
+ " criteria")
+ return False
+ cmd = "match community {}".format(community["id"])
+ exact_match = community.setdefault("exact_match",
+ False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
+
+ rmap_data.append(cmd)
+
+ if large_community:
+ if "id" not in large_community:
+ logger.error("'num' is mandatory for "
+ "large-community-list in match "
+ "criteria")
+ return False
+ cmd = "match large-community {}".format(
+ large_community["id"])
+ exact_match = large_community.setdefault(
+ "exact_match", False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
+
+ rmap_data.append(cmd)
+
+ result = create_common_configuration(tgen, router,
+ rmap_data,
+ "route_maps",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_prefix_lists()")
+ return result
+
+
+#############################################
+# Verification APIs
+#############################################
+def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+ """
+    Data will be read from input_dict or the input JSON file. The API generates
+    the same prefixes that were redistributed by either create_static_routes()
+    or advertise_networks_using_network_command() and verifies that next_hop
+    and each prefix/route are present in the "show ip/ipv6 route {bgp/static}
+    json" command output.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+    * `next_hop`[optional]: next_hop which needs to be verified,
+                           default: None
+ * `protocol`[optional]: protocol, default = None
+
+ Usage
+ -----
+ # RIB can be verified for static routes OR network advertised using
+ network command. Following are input_dicts to create static routes
+ and advertise networks using network command. Any one of the input_dict
+    can be passed to verify_rib() to verify routes in DUT's RIB.
+
+ # Creating static routes for r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
+ "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
+ }}
+ # Advertising networks using network command in router r1
+ input_dict = {
+ "r1": {
+ "advertise_networks": [{"start_ip": "20.0.0.0/32",
+ "no_of_network": 10},
+ {"start_ip": "30.0.0.0/32"}]
+ }}
+ # Verifying ipv4 routes in router r1 learned via BGP
+ dut = "r2"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_rib()")
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ if protocol:
+ command = "show ip route {} json".format(protocol)
+ else:
+ command = "show ip route json"
+ else:
+ if protocol:
+ command = "show ipv6 route {} json".format(protocol)
+ else:
+ command = "show ipv6 route json"
+
+ sleep(2)
+ logger.info("Checking router %s RIB:", router)
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+            if not rib_routes_json:
+ errormsg = "No {} route found in rib of router {}..". \
+ format(protocol, router)
+ return errormsg
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+ st_found = False
+ nh_found = False
+ found_routes = []
+ missing_routes = []
+ for static_route in static_routes:
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ found_hops = [rib_r["ip"] for rib_r in
+ rib_routes_json[st_rt][0][
+ "nexthops"]]
+ for nh in next_hop:
+ nh_found = False
+ if nh and nh in found_hops:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for {}"
+ " route {} in RIB of router"
+ " {}\n".format(next_hop,
+ protocol,
+ st_rt, dut))
+
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+ if nh_found:
+ logger.info("Found next_hop %s for all routes in RIB of"
+ " router %s\n", next_hop, dut)
+
+ if not st_found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, routes: " \
+ "{}\n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s\n", dut, found_routes)
+
+ advertise_network = input_dict[routerInput].setdefault(
+ "advertise_networks", {})
+ if advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+ for advertise_network_dict in advertise_network:
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ missing_routes.append(st_rt)
+
+ if not found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, are: {}" \
+ " \n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s", dut, found_routes)
+
+ logger.info("Exiting lib API: verify_rib()")
+ return True
+
+
+def verify_admin_distance_for_static_routes(tgen, input_dict):
+ """
+ API to verify admin distance for static routes as defined in input_dict/
+ input JSON by running show ip/ipv6 route json command.
+
+ Parameter
+ ---------
+ * `tgen` : topogen object
+    * `input_dict`: having details like - for which router and static routes
+                    admin distance needs to be verified
+ Usage
+ -----
+ # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
+ 10.0.0.2 in router r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ for static_route in input_dict[router]["static_routes"]:
+ addr_type = validate_ip_address(static_route["network"])
+ # Command to execute
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+ show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
+
+ logger.info("Verifying admin distance for static route %s"
+ " under dut %s:", static_route, router)
+ network = static_route["network"]
+ next_hop = static_route["next_hop"]
+ admin_distance = static_route["admin_distance"]
+            if network in show_ip_route_json:
+                route_data = show_ip_route_json[network][0]
+ if route_data["nexthops"][0]["ip"] == next_hop:
+ if route_data["distance"] != admin_distance:
+ errormsg = ("Verification failed: admin distance"
+ " for static route {} under dut {},"
+ " found:{} but expected:{}".
+ format(static_route, router,
+ route_data["distance"],
+ admin_distance))
+ return errormsg
+ else:
+ logger.info("Verification successful: admin"
+ " distance for static route %s under"
+ " dut %s, found:%s", static_route,
+ router, route_data["distance"])
+
+ else:
+ errormsg = ("Static route {} not found in "
+ "show_ip_route_json for dut {}".
+ format(network, router))
+ return errormsg
+
+ logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
+ return True
+
+
+def verify_prefix_lists(tgen, input_dict):
+ """
+ Running "show ip prefix-list" command and verifying given prefix-list
+ is present in router.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `input_dict`: data to verify prefix lists
+
+ Usage
+ -----
+    # To verify pf_list_1 is deleted from router r1
+ input_dict = {
+ "r1": {
+ "prefix_lists": ["pf_list_1"]
+ }}
+ result = verify_prefix_lists("ipv4", input_dict, tgen)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_prefix_lists()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ # Show ip prefix list
+ show_prefix_list = rnode.vtysh_cmd("show ip prefix-list")
+
+ # Verify Prefix list is deleted
+ prefix_lists_addr = input_dict[router]["prefix_lists"]
+ for addr_type in prefix_lists_addr:
+ if not check_address_types(addr_type):
+ continue
+
+ for prefix_list in prefix_lists_addr[addr_type].keys():
+ if prefix_list in show_prefix_list:
+ errormsg = ("Prefix list {} is not deleted from router"
+ " {}".format(prefix_list, router))
+ return errormsg
+
+ logger.info("Prefix list %s is/are deleted successfully"
+ " from router %s", prefix_list, router)
+
+ logger.info("Exiting lib API: verify_prefix_lissts()")
+ return True