diff options
Diffstat (limited to 'tests/topotests/lib/common_config.py')
| -rw-r--r-- | tests/topotests/lib/common_config.py | 724 |
1 files changed, 389 insertions, 335 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index fc7581b1f2..5ee59070cc 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -79,9 +79,9 @@ if config.has_option("topogen", "frrtest_log_dir"): frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp) print("frrtest_log_file..", frrtest_log_file) - logger = logger_config.get_logger(name="test_execution_logs", - log_level=loglevel, - target=frrtest_log_file) + logger = logger_config.get_logger( + name="test_execution_logs", log_level=loglevel, target=frrtest_log_file + ) print("Logs will be sent to logfile: {}".format(frrtest_log_file)) if config.has_option("topogen", "show_router_config"): @@ -94,10 +94,7 @@ ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES") # Saves sequence id numbers -SEQ_ID = { - "prefix_lists": {}, - "route_maps": {} -} +SEQ_ID = {"prefix_lists": {}, "route_maps": {}} def get_seq_id(obj_type, router, obj_name): @@ -145,6 +142,7 @@ def set_seq_id(obj_type, router, id, obj_name): class InvalidCLIError(Exception): """Raise when the CLI command is wrong""" + pass @@ -169,16 +167,19 @@ def run_frr_cmd(rnode, cmd, isjson=False): else: print_data = ret_data - logger.info('Output for command [ %s] on router %s:\n%s', - cmd.rstrip("json"), rnode.name, print_data) + logger.info( + "Output for command [ %s] on router %s:\n%s", + cmd.rstrip("json"), + rnode.name, + print_data, + ) return ret_data else: - raise InvalidCLIError('No actual cmd passed') + raise InvalidCLIError("No actual cmd passed") -def create_common_configuration(tgen, router, data, config_type=None, - build=False): +def create_common_configuration(tgen, router, data, config_type=None, build=False): """ API to create object of class FRRConfig and also create frr_json.conf file. It will create interface and common configurations and save it to @@ -201,15 +202,17 @@ def create_common_configuration(tgen, router, data, config_type=None, fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE) - config_map = OrderedDict({ - "general_config": "! FRR General Config\n", - "interface_config": "! Interfaces Config\n", - "static_route": "! Static Route Config\n", - "prefix_list": "! Prefix List Config\n", - "bgp_community_list": "! Community List Config\n", - "route_maps": "! Route Maps Config\n", - "bgp": "! BGP Config\n" - }) + config_map = OrderedDict( + { + "general_config": "! FRR General Config\n", + "interface_config": "! Interfaces Config\n", + "static_route": "! Static Route Config\n", + "prefix_list": "! Prefix List Config\n", + "bgp_community_list": "! Community List Config\n", + "route_maps": "! Route Maps Config\n", + "bgp": "! BGP Config\n", + } + ) if build: mode = "a" @@ -225,8 +228,9 @@ def create_common_configuration(tgen, router, data, config_type=None, frr_cfg_fd.write("\n") except IOError as err: - logger.error("Unable to open FRR Config File. error(%s): %s" % - (err.errno, err.strerror)) + logger.error( + "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror) + ) return False finally: frr_cfg_fd.close() @@ -257,8 +261,7 @@ def reset_config_on_routers(tgen, routerName=None): continue router = router_list[rname] - logger.info("Configuring router %s to initial test configuration", - rname) + logger.info("Configuring router %s to initial test configuration", rname) cfg = router.run("vtysh -c 'show running'") fname = "{}/{}/frr.sav".format(TMPDIR, rname) dname = "{}/{}/delta.conf".format(TMPDIR, rname) @@ -266,9 +269,11 @@ def reset_config_on_routers(tgen, routerName=None): for line in cfg.split("\n"): line = line.strip() - if (line == "Building configuration..." or - line == "Current configuration:" or - not line): + if ( + line == "Building configuration..." + or line == "Current configuration:" + or not line + ): continue f.write(line) f.write("\n") @@ -279,37 +284,39 @@ def reset_config_on_routers(tgen, routerName=None): init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname) tempdir = mkdtemp() - with open(os.path.join(tempdir, 'vtysh.conf'), 'w') as fd: + with open(os.path.join(tempdir, "vtysh.conf"), "w") as fd: pass - command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}". \ - format(tempdir, run_cfg_file, init_cfg_file, dname) - result = call(command, shell=True, stderr=SUB_STDOUT, - stdout=SUB_PIPE) + command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}".format( + tempdir, run_cfg_file, init_cfg_file, dname + ) + result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE) - os.unlink(os.path.join(tempdir, 'vtysh.conf')) + os.unlink(os.path.join(tempdir, "vtysh.conf")) os.rmdir(tempdir) # Assert if command fail if result > 0: - logger.error("Delta file creation failed. Command executed %s", - command) - with open(run_cfg_file, 'r') as fd: - logger.info('Running configuration saved in %s is:\n%s', - run_cfg_file, fd.read()) - with open(init_cfg_file, 'r') as fd: - logger.info('Test configuration saved in %s is:\n%s', - init_cfg_file, fd.read()) - - err_cmd = ['/usr/bin/vtysh', '-m', '-f', run_cfg_file] + logger.error("Delta file creation failed. Command executed %s", command) + with open(run_cfg_file, "r") as fd: + logger.info( + "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read() + ) + with open(init_cfg_file, "r") as fd: + logger.info( + "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read() + ) + + err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file] result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE) output = result.communicate() for out_data in output: - temp_data = out_data.decode('utf-8').lower() + temp_data = out_data.decode("utf-8").lower() for out_err in ERROR_LIST: if out_err.lower() in temp_data: - logger.error("Found errors while validating data in" - " %s", run_cfg_file) + logger.error( + "Found errors while validating data in" " %s", run_cfg_file + ) raise InvalidCLIError(out_data) raise InvalidCLIError("Unknown error in %s", output) @@ -319,18 +326,19 @@ def reset_config_on_routers(tgen, routerName=None): t_delta = f.read() for line in t_delta.split("\n"): line = line.strip() - if (line == "Lines To Delete" or - line == "===============" or - line == "Lines To Add" or - line == "============" or - not line): + if ( + line == "Lines To Delete" + or line == "===============" + or line == "Lines To Add" + or line == "============" + or not line + ): continue delta.write(line) delta.write("\n") delta.write("end\n") - output = router.vtysh_multicmd(delta.getvalue(), - pretty_output=False) + output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False) delta.close() delta = StringIO.StringIO() @@ -343,8 +351,7 @@ def reset_config_on_routers(tgen, routerName=None): # Router current configuration to log file or console if # "show_router_config" is defined in "pytest.ini" if show_router_config: - logger.info("Configuration on router {} after config reset:". - format(rname)) + logger.info("Configuration on router {} after config reset:".format(rname)) logger.info(delta.getvalue()) delta.close() @@ -373,12 +380,13 @@ def load_config_to_router(tgen, routerName, save_bkup=False): router = router_list[rname] try: frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE) - frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, - FRRCFG_BKUP_FILE) + frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE) with open(frr_cfg_file, "r+") as cfg: data = cfg.read() - logger.info("Applying following configuration on router" - " {}:\n{}".format(rname, data)) + logger.info( + "Applying following configuration on router" + " {}:\n{}".format(rname, data) + ) if save_bkup: with open(frr_cfg_bkup, "w") as bkup: bkup.write(data) @@ -390,8 +398,10 @@ def load_config_to_router(tgen, routerName, save_bkup=False): cfg.truncate(0) except IOError as err: - errormsg = ("Unable to open config File. error(%s):" - " %s", (err.errno, err.strerror)) + errormsg = ( + "Unable to open config File. error(%s):" " %s", + (err.errno, err.strerror), + ) return errormsg # Router current configuration to log file or console if @@ -418,8 +428,9 @@ def start_topology(tgen): # Starting deamons router_list = tgen.routers() - ROUTER_LIST = sorted(router_list.keys(), - key=lambda x: int(re_search('\d+', x).group(0))) + ROUTER_LIST = sorted( + router_list.keys(), key=lambda x: int(re_search("\d+", x).group(0)) + ) TMPDIR = os.path.join(LOGDIR, tgen.modname) router_list = tgen.routers() @@ -430,31 +441,27 @@ def start_topology(tgen): # Creating router named dir and empty zebra.conf bgpd.conf files # inside the current directory - if os.path.isdir('{}'.format(rname)): + if os.path.isdir("{}".format(rname)): os.system("rm -rf {}".format(rname)) - os.mkdir('{}'.format(rname)) - os.system('chmod -R go+rw {}'.format(rname)) - os.chdir('{}/{}'.format(TMPDIR, rname)) - os.system('touch zebra.conf bgpd.conf') + os.mkdir("{}".format(rname)) + os.system("chmod -R go+rw {}".format(rname)) + os.chdir("{}/{}".format(TMPDIR, rname)) + os.system("touch zebra.conf bgpd.conf") else: - os.mkdir('{}'.format(rname)) - os.system('chmod -R go+rw {}'.format(rname)) - os.chdir('{}/{}'.format(TMPDIR, rname)) - os.system('touch zebra.conf bgpd.conf') + os.mkdir("{}".format(rname)) + os.system("chmod -R go+rw {}".format(rname)) + os.chdir("{}/{}".format(TMPDIR, rname)) + os.system("touch zebra.conf bgpd.conf") except IOError as (errno, strerror): logger.error("I/O error({0}): {1}".format(errno, strerror)) # Loading empty zebra.conf file to router, to start the zebra deamon router.load_config( - TopoRouter.RD_ZEBRA, - '{}/{}/zebra.conf'.format(TMPDIR, rname) + TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname) ) # Loading empty bgpd.conf file to router, to start the bgp deamon - router.load_config( - TopoRouter.RD_BGP, - '{}/{}/bgpd.conf'.format(TMPDIR, rname) - ) + router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname)) # Starting routers logger.info("Starting all routers once topology is created") @@ -483,6 +490,7 @@ def number_to_column(routerName): # Common APIs, will be used by all protocols ############################################# + def validate_ip_address(ip_address): """ Validates the type of ip address @@ -518,8 +526,9 @@ def validate_ip_address(ip_address): return "ipv6" if not v4 and not v6: - raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6" - " address" % ip_address) + raise Exception( + "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address + ) def check_address_types(addr_type=None): @@ -542,8 +551,11 @@ def check_address_types(addr_type=None): return addr_types if addr_type not in addr_types: - logger.error("{} not in supported/configured address types {}". - format(addr_type, addr_types)) + logger.error( + "{} not in supported/configured address types {}".format( + addr_type, addr_types + ) + ) return False return True @@ -589,8 +601,7 @@ def generate_ips(network, no_of_ips): return ipaddress_list -def find_interface_with_greater_ip(topo, router, loopback=True, - interface=True): +def find_interface_with_greater_ip(topo, router, loopback=True, interface=True): """ Returns highest interface ip for ipv4/ipv6. If loopback is there then it will return highest IP from loopback IPs otherwise from physical @@ -608,12 +619,14 @@ def find_interface_with_greater_ip(topo, router, loopback=True, if loopback: if "type" in data and data["type"] == "loopback": lo_exists = True - ip_address = topo["routers"][router]["links"][ - destRouterLink]["ipv4"].split("/")[0] + ip_address = topo["routers"][router]["links"][destRouterLink][ + "ipv4" + ].split("/")[0] lo_list.append(ip_address) if interface: - ip_address = topo["routers"][router]["links"][ - destRouterLink]["ipv4"].split("/")[0] + ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split( + "/" + )[0] interfaces_list.append(ip_address) if lo_exists: @@ -625,17 +638,17 @@ def find_interface_with_greater_ip(topo, router, loopback=True, def write_test_header(tc_name): """ Display message at beginning of test case""" count = 20 - logger.info("*"*(len(tc_name)+count)) + logger.info("*" * (len(tc_name) + count)) step("START -> Testcase : %s" % tc_name, reset=True) - logger.info("*"*(len(tc_name)+count)) + logger.info("*" * (len(tc_name) + count)) def write_test_footer(tc_name): """ Display message at end of test case""" count = 21 - logger.info("="*(len(tc_name)+count)) + logger.info("=" * (len(tc_name) + count)) logger.info("Testcase : %s -> PASSED", tc_name) - logger.info("="*(len(tc_name)+count)) + logger.info("=" * (len(tc_name) + count)) def interface_status(tgen, topo, input_dict): @@ -664,8 +677,8 @@ def interface_status(tgen, topo, input_dict): global frr_cfg for router in input_dict.keys(): - interface_list = input_dict[router]['interface_list'] - status = input_dict[router].setdefault('status', 'up') + interface_list = input_dict[router]["interface_list"] + status = input_dict[router].setdefault("status", "up") for intf in interface_list: rnode = tgen.routers()[router] interface_set_status(rnode, intf, status) @@ -698,11 +711,10 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): """ def _retry(func): - @wraps(func) def func_retry(*args, **kwargs): - _wait = kwargs.pop('wait', wait) - _attempts = kwargs.pop('attempts', attempts) + _wait = kwargs.pop("wait", wait) + _attempts = kwargs.pop("attempts", attempts) _attempts = int(_attempts) if _attempts < 0: raise ValueError("attempts must be 0 or greater") @@ -711,11 +723,11 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): logger.info("Waiting for [%s]s as initial delay", initial_wait) sleep(initial_wait) - _return_is_str = kwargs.pop('return_is_str', return_is_str) + _return_is_str = kwargs.pop("return_is_str", return_is_str) for i in range(1, _attempts + 1): try: - _expected = kwargs.setdefault('expected', True) - kwargs.pop('expected') + _expected = kwargs.setdefault("expected", True) + kwargs.pop("expected") ret = func(*args, **kwargs) logger.debug("Function returned %s" % ret) if return_is_str and isinstance(ret, bool) and _expected: @@ -727,17 +739,17 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): return ret except Exception as err: if _attempts == i: - logger.info("Max number of attempts (%r) reached", - _attempts) + logger.info("Max number of attempts (%r) reached", _attempts) raise else: logger.info("Function returned %s", err) if i < _attempts: - logger.info("Retry [#%r] after sleeping for %ss" - % (i, _wait)) + logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait)) sleep(_wait) + func_retry._original = func return func_retry + return _retry @@ -745,6 +757,7 @@ class Stepper: """ Prints step number for the test case step being executed """ + count = 1 def __call__(self, msg, reset): @@ -795,24 +808,17 @@ def create_interfaces_cfg(tgen, topo, build=False): interface_name = destRouterLink else: interface_name = data["interface"] - interface_data.append("interface {}".format( - str(interface_name) - )) + interface_data.append("interface {}".format(str(interface_name))) if "ipv4" in data: intf_addr = c_data["links"][destRouterLink]["ipv4"] - interface_data.append("ip address {}".format( - intf_addr - )) + interface_data.append("ip address {}".format(intf_addr)) if "ipv6" in data: intf_addr = c_data["links"][destRouterLink]["ipv6"] - interface_data.append("ipv6 address {}".format( - intf_addr - )) - - result = create_common_configuration(tgen, c_router, - interface_data, - "interface_config", - build=build) + interface_data.append("ipv6 address {}".format(intf_addr)) + + result = create_common_configuration( + tgen, c_router, interface_data, "interface_config", build=build + ) except InvalidCLIError: # Traceback errormsg = traceback.format_exc() @@ -880,13 +886,10 @@ def create_static_routes(tgen, input_dict, build=False): del_action = static_route.setdefault("delete", False) # No of IPs no_of_ip = static_route.setdefault("no_of_ip", 1) - admin_distance = static_route.setdefault("admin_distance", - None) + admin_distance = static_route.setdefault("admin_distance", None) tag = static_route.setdefault("tag", None) - if "next_hop" not in static_route or \ - "network" not in static_route: - errormsg = "'next_hop' or 'network' missing in" \ - " input_dict" + if "next_hop" not in static_route or "network" not in static_route: + errormsg = "'next_hop' or 'network' missing in" " input_dict" return errormsg next_hop = static_route["next_hop"] @@ -914,10 +917,9 @@ def create_static_routes(tgen, input_dict, build=False): static_routes_list.append(cmd) - result = create_common_configuration(tgen, router, - static_routes_list, - "static_route", - build=build) + result = create_common_configuration( + tgen, router, static_routes_list, "static_route", build=build + ) except InvalidCLIError: # Traceback @@ -992,10 +994,8 @@ def create_prefix_lists(tgen, input_dict, build=False): for prefix_name, prefix_list in prefix_data.iteritems(): for prefix_dict in prefix_list: - if "action" not in prefix_dict or \ - "network" not in prefix_dict: - errormsg = "'action' or network' missing in" \ - " input_dict" + if "action" not in prefix_dict or "network" not in prefix_dict: + errormsg = "'action' or network' missing in" " input_dict" return errormsg network_addr = prefix_dict["network"] @@ -1005,11 +1005,9 @@ def create_prefix_lists(tgen, input_dict, build=False): seqid = prefix_dict.setdefault("seqid", None) del_action = prefix_dict.setdefault("delete", False) if seqid is None: - seqid = get_seq_id("prefix_lists", router, - prefix_name) + seqid = get_seq_id("prefix_lists", router, prefix_name) else: - set_seq_id("prefix_lists", router, seqid, - prefix_name) + set_seq_id("prefix_lists", router, seqid, prefix_name) if addr_type == "ipv4": protocol = "ip" @@ -1028,10 +1026,9 @@ def create_prefix_lists(tgen, input_dict, build=False): cmd = "no {}".format(cmd) config_data.append(cmd) - result = create_common_configuration(tgen, router, - config_data, - "prefix_list", - build=build) + result = create_common_configuration( + tgen, router, config_data, "prefix_list", build=build + ) except InvalidCLIError: # Traceback @@ -1101,9 +1098,9 @@ def create_route_maps(tgen, input_dict, build=False): "tag": "tag_id" }, "set": { - "localpref": 150, - "med": 30, - "aspath": { + "locPrf": 150, + "metric": 30, + "path": { "num": 20000, "action": "prepend", }, @@ -1137,8 +1134,7 @@ def create_route_maps(tgen, input_dict, build=False): logger.debug("route_maps not present in input_dict") continue rmap_data = [] - for rmap_name, rmap_value in \ - input_dict[router]["route_maps"].iteritems(): + for rmap_name, rmap_value in input_dict[router]["route_maps"].iteritems(): for rmap_dict in rmap_value: del_action = rmap_dict.setdefault("delete", False) @@ -1160,38 +1156,39 @@ def create_route_maps(tgen, input_dict, build=False): else: set_seq_id("route_maps", router, seq_id, rmap_name) - rmap_data.append("route-map {} {} {}".format( - rmap_name, rmap_action, seq_id - )) + rmap_data.append( + "route-map {} {} {}".format(rmap_name, rmap_action, seq_id) + ) if "continue" in rmap_dict: continue_to = rmap_dict["continue"] if continue_to: - rmap_data.append("on-match goto {}". - format(continue_to)) + rmap_data.append("on-match goto {}".format(continue_to)) else: - logger.error("In continue, 'route-map entry " - "sequence number' is not provided") + logger.error( + "In continue, 'route-map entry " + "sequence number' is not provided" + ) return False if "goto" in rmap_dict: go_to = rmap_dict["goto"] if go_to: - rmap_data.append("on-match goto {}". - format(go_to)) + rmap_data.append("on-match goto {}".format(go_to)) else: - logger.error("In goto, 'Goto Clause number' is not" - " provided") + logger.error( + "In goto, 'Goto Clause number' is not" " provided" + ) return False if "call" in rmap_dict: call_rmap = rmap_dict["call"] if call_rmap: - rmap_data.append("call {}". - format(call_rmap)) + rmap_data.append("call {}".format(call_rmap)) else: - logger.error("In call, 'destination Route-Map' is" - " not provided") + logger.error( + "In call, 'destination Route-Map' is" " not provided" + ) return False # Verifying if SET criteria is defined @@ -1199,24 +1196,22 @@ def create_route_maps(tgen, input_dict, build=False): set_data = rmap_dict["set"] ipv4_data = set_data.setdefault("ipv4", {}) ipv6_data = set_data.setdefault("ipv6", {}) - local_preference = set_data.setdefault("localpref", - None) - metric = set_data.setdefault("med", None) - as_path = set_data.setdefault("aspath", {}) + local_preference = set_data.setdefault("locPrf", None) + metric = set_data.setdefault("metric", None) + as_path = set_data.setdefault("path", {}) weight = set_data.setdefault("weight", None) community = set_data.setdefault("community", {}) - large_community = set_data.setdefault( - "large_community", {}) - large_comm_list = set_data.setdefault( - "large_comm_list", {}) + large_community = set_data.setdefault("large_community", {}) + large_comm_list = set_data.setdefault("large_comm_list", {}) set_action = set_data.setdefault("set_action", None) nexthop = set_data.setdefault("nexthop", None) origin = set_data.setdefault("origin", None) # Local Preference if local_preference: - rmap_data.append("set local-preference {}". - format(local_preference)) + rmap_data.append( + "set local-preference {}".format(local_preference) + ) # Metric if metric: @@ -1231,8 +1226,9 @@ def create_route_maps(tgen, input_dict, build=False): as_num = as_path.setdefault("as_num", None) as_action = as_path.setdefault("as_action", None) if as_action and as_num: - rmap_data.append("set as-path {} {}". - format(as_action, as_num)) + rmap_data.append( + "set as-path {} {}".format(as_action, as_num) + ) # Community if community: @@ -1244,14 +1240,12 @@ def create_route_maps(tgen, input_dict, build=False): cmd = "{} {}".format(cmd, comm_action) rmap_data.append(cmd) else: - logger.error("In community, AS Num not" - " provided") + logger.error("In community, AS Num not" " provided") return False if large_community: num = large_community.setdefault("num", None) - comm_action = large_community.setdefault("action", - None) + comm_action = large_community.setdefault("action", None) if num: cmd = "set large-community {}".format(num) if comm_action: @@ -1259,13 +1253,13 @@ def create_route_maps(tgen, input_dict, build=False): rmap_data.append(cmd) else: - logger.error("In large_community, AS Num not" - " provided") + logger.error( + "In large_community, AS Num not" " provided" + ) return False if large_comm_list: id = large_comm_list.setdefault("id", None) - del_comm = large_comm_list.setdefault("delete", - None) + del_comm = large_comm_list.setdefault("delete", None) if id: cmd = "set large-comm-list {}".format(id) if del_comm: @@ -1273,43 +1267,36 @@ def create_route_maps(tgen, input_dict, build=False): rmap_data.append(cmd) else: - logger.error("In large_comm_list 'id' not" - " provided") + logger.error("In large_comm_list 'id' not" " provided") return False # Weight if weight: - rmap_data.append("set weight {}".format( - weight)) + rmap_data.append("set weight {}".format(weight)) if ipv6_data: nexthop = ipv6_data.setdefault("nexthop", None) if nexthop: - rmap_data.append("set ipv6 next-hop {}".format( - nexthop - )) + rmap_data.append("set ipv6 next-hop {}".format(nexthop)) # Adding MATCH and SET sequence to RMAP if defined if "match" in rmap_dict: match_data = rmap_dict["match"] ipv4_data = match_data.setdefault("ipv4", {}) ipv6_data = match_data.setdefault("ipv6", {}) - community = match_data.setdefault( - "community_list",{}) - large_community = match_data.setdefault( - "large_community", {} - ) + community = match_data.setdefault("community_list", {}) + large_community = match_data.setdefault("large_community", {}) large_community_list = match_data.setdefault( "large_community_list", {} ) if ipv4_data: # fetch prefix list data from rmap - prefix_name = \ - ipv4_data.setdefault("prefix_lists", - None) + prefix_name = ipv4_data.setdefault("prefix_lists", None) if prefix_name: - rmap_data.append("match ip address" - " prefix-list {}".format(prefix_name)) + rmap_data.append( + "match ip address" + " prefix-list {}".format(prefix_name) + ) # fetch tag data from rmap tag = ipv4_data.setdefault("tag", None) @@ -1318,16 +1305,19 @@ def create_route_maps(tgen, input_dict, build=False): # fetch large community data from rmap large_community_list = ipv4_data.setdefault( - "large_community_list",{}) + "large_community_list", {} + ) large_community = match_data.setdefault( - "large_community", {}) + "large_community", {} + ) if ipv6_data: - prefix_name = ipv6_data.setdefault("prefix_lists", - None) + prefix_name = ipv6_data.setdefault("prefix_lists", None) if prefix_name: - rmap_data.append("match ipv6 address" - " prefix-list {}".format(prefix_name)) + rmap_data.append( + "match ipv6 address" + " prefix-list {}".format(prefix_name) + ) # fetch tag data from rmap tag = ipv6_data.setdefault("tag", None) @@ -1336,54 +1326,64 @@ def create_route_maps(tgen, input_dict, build=False): # fetch large community data from rmap large_community_list = ipv6_data.setdefault( - "large_community_list",{}) + "large_community_list", {} + ) large_community = match_data.setdefault( - "large_community", {}) + "large_community", {} + ) if community: if "id" not in community: - logger.error("'id' is mandatory for " - "community-list in match" - " criteria") + logger.error( + "'id' is mandatory for " + "community-list in match" + " criteria" + ) return False cmd = "match community {}".format(community["id"]) - exact_match = community.setdefault("exact_match", - False) + exact_match = community.setdefault("exact_match", False) if exact_match: cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) if large_community: if "id" not in large_community: - logger.error("'id' is mandatory for " - "large-community-list in match " - "criteria") + logger.error( + "'id' is mandatory for " + "large-community-list in match " + "criteria" + ) return False cmd = "match large-community {}".format( - large_community["id"]) + large_community["id"] + ) exact_match = large_community.setdefault( - "exact_match", False) + "exact_match", False + ) if exact_match: cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) if large_community_list: if "id" not in large_community_list: - logger.error("'id' is mandatory for " - "large-community-list in match " - "criteria") + logger.error( + "'id' is mandatory for " + "large-community-list in match " + "criteria" + ) return False cmd = "match large-community {}".format( - large_community_list["id"]) + large_community_list["id"] + ) exact_match = large_community_list.setdefault( - "exact_match", False) + "exact_match", False + ) if exact_match: cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) - result = create_common_configuration(tgen, router, - rmap_data, - "route_maps", - build=build) + result = create_common_configuration( + tgen, router, rmap_data, "route_maps", build=build + ) except InvalidCLIError: # Traceback @@ -1424,12 +1424,7 @@ def delete_route_maps(tgen, input_dict): rmap_data = input_dict[router] rmap_data["route_maps"] = {} for route_map_name in route_maps: - rmap_data["route_maps"].update({ - route_map_name: - [{ - "delete": True - }] - }) + rmap_data["route_maps"].update({route_map_name: [{"delete": True}]}) return create_route_maps(tgen, input_dict) @@ -1478,10 +1473,9 @@ def create_bgp_community_lists(tgen, input_dict, build=False): community_list = input_dict[router]["bgp_community_lists"] for community_dict in community_list: del_action = community_dict.setdefault("delete", False) - community_type = community_dict.setdefault("community_type", - None) + community_type = community_dict.setdefault("community_type", None) action = community_dict.setdefault("action", None) - value = community_dict.setdefault("value", '') + value = community_dict.setdefault("value", "") large = community_dict.setdefault("large", None) name = community_dict.setdefault("name", None) if large: @@ -1490,28 +1484,30 @@ def create_bgp_community_lists(tgen, input_dict, build=False): cmd = "bgp community-list" if not large and not (community_type and action and value): - errormsg = "community_type, action and value are " \ - "required in bgp_community_list" + errormsg = ( + "community_type, action and value are " + "required in bgp_community_list" + ) logger.error(errormsg) return False try: community_type = int(community_type) - cmd = "{} {} {} {}".format(cmd, community_type, action, - value) + cmd = "{} {} {} {}".format(cmd, community_type, action, value) except ValueError: cmd = "{} {} {} {} {}".format( - cmd, community_type, name, action, value) + cmd, community_type, name, action, value + ) if del_action: cmd = "no {}".format(cmd) config_data.append(cmd) - result = create_common_configuration(tgen, router, config_data, - "bgp_community_list", - build=build) + result = create_common_configuration( + tgen, router, config_data, "bgp_community_list", build=build + ) except InvalidCLIError: # Traceback @@ -1634,8 +1630,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): # Verifying output dictionary rib_routes_json is not empty if bool(rib_routes_json) is False: - errormsg = "No {} route found in rib of router {}..". \ - format(protocol, router) + errormsg = "No {} route found in rib of router {}..".format( + protocol, router + ) return errormsg if "static_routes" in input_dict[routerInput]: @@ -1665,47 +1662,62 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): if type(next_hop) is not list: next_hop = [next_hop] - found_hops = [rib_r["ip"] for rib_r in - rib_routes_json[st_rt][0][ - "nexthops"]] + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json[st_rt][0]["nexthops"] + ] for nh in found_hops: nh_found = False if nh and nh in next_hop: nh_found = True else: - errormsg = ("Nexthop {} is Missing for {}" - " route {} in RIB of router" - " {}\n".format(next_hop, - protocol, - st_rt, dut)) + errormsg = ( + "Nexthop {} is Missing for {}" + " route {} in RIB of router" + " {}\n".format( + next_hop, protocol, st_rt, dut + ) + ) return errormsg else: missing_routes.append(st_rt) if nh_found: - logger.info("Found next_hop %s for all routes in RIB of" - " router %s\n", next_hop, dut) + logger.info( + "Found next_hop %s for all routes in RIB of" " router %s\n", + next_hop, + dut, + ) if not st_found and len(missing_routes) > 0: - errormsg = "Missing route in RIB of router {}, routes: " \ - "{}\n".format(dut, missing_routes) + errormsg = ( + "Missing route in RIB of router {}, routes: " + "{}\n".format(dut, missing_routes) + ) return errormsg - logger.info("Verified routes in router %s RIB, found routes" - " are: %s\n", dut, found_routes) + logger.info( + "Verified routes in router %s RIB, found routes" " are: %s\n", + dut, + found_routes, + ) continue if "bgp" in input_dict[routerInput]: - if 'advertise_networks' in input_dict[routerInput]["bgp"]\ - ["address_family"][addr_type]["unicast"]: + if ( + "advertise_networks" + in input_dict[routerInput]["bgp"]["address_family"][addr_type][ + "unicast" + ] + ): found_routes = [] missing_routes = [] - advertise_network = input_dict[routerInput]["bgp"]\ - ["address_family"][addr_type]["unicast"]\ - ["advertise_networks"] + advertise_network = input_dict[routerInput]["bgp"][ + "address_family" + ][addr_type]["unicast"]["advertise_networks"] for advertise_network_dict in advertise_network: start_ip = advertise_network_dict["network"] @@ -1730,34 +1742,43 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): next_hop = [next_hop] for index, nh in enumerate(next_hop): - if rib_routes_json[st_rt][0]\ - ['nexthops'][index]['ip'] == nh: + if ( + rib_routes_json[st_rt][0]["nexthops"][ + index + ]["ip"] + == nh + ): nh_found = True else: - errormsg=("Nexthop {} is Missing" - " for {} route {} in " - "RIB of router {}\n".\ - format(next_hop, - protocol, - st_rt, dut)) + errormsg = ( + "Nexthop {} is Missing" + " for {} route {} in " + "RIB of router {}\n".format( + next_hop, protocol, st_rt, dut + ) + ) return errormsg else: missing_routes.append(st_rt) if nh_found: - logger.info("Found next_hop {} for all routes in RIB" - " of router {}\n".format(next_hop, dut)) + logger.info( + "Found next_hop {} for all routes in RIB" + " of router {}\n".format(next_hop, dut) + ) if not found and len(missing_routes) > 0: - errormsg = ("Missing {} route in RIB of router {}, " - "routes: {} \n".\ - format(addr_type, dut, missing_routes)) + errormsg = ( + "Missing {} route in RIB of router {}, " + "routes: {} \n".format(addr_type, dut, missing_routes) + ) return errormsg - logger.info("Verified {} routes in router {} RIB, found" - " routes are: {}\n".\ - format(addr_type, dut, found_routes)) + logger.info( + "Verified {} routes in router {} RIB, found" + " routes are: {}\n".format(addr_type, dut, found_routes) + ) logger.debug("Exiting lib API: verify_rib()") return True @@ -1810,8 +1831,11 @@ def verify_admin_distance_for_static_routes(tgen, input_dict): command = "show ipv6 route json" show_ip_route_json = run_frr_cmd(rnode, command, isjson=True) - logger.info("Verifying admin distance for static route %s" - " under dut %s:", static_route, router) + logger.info( + "Verifying admin distance for static route %s" " under dut %s:", + static_route, + router, + ) network = static_route["network"] next_hop = static_route["next_hop"] admin_distance = static_route["admin_distance"] @@ -1819,23 +1843,32 @@ def verify_admin_distance_for_static_routes(tgen, input_dict): if network in show_ip_route_json: if route_data["nexthops"][0]["ip"] == next_hop: if route_data["distance"] != admin_distance: - errormsg = ("Verification failed: admin distance" - " for static route {} under dut {}," - " found:{} but expected:{}". - format(static_route, router, - route_data["distance"], - admin_distance)) + errormsg = ( + "Verification failed: admin distance" + " for static route {} under dut {}," + " found:{} but expected:{}".format( + static_route, + router, + route_data["distance"], + admin_distance, + ) + ) return errormsg else: - logger.info("Verification successful: admin" - " distance for static route %s under" - " dut %s, found:%s", static_route, - router, route_data["distance"]) + logger.info( + "Verification successful: admin" + " distance for static route %s under" + " dut %s, found:%s", + static_route, + router, + route_data["distance"], + ) else: - errormsg = ("Static route {} not found in " - "show_ip_route_json for dut {}". - format(network, router)) + errormsg = ( + "Static route {} not found in " + "show_ip_route_json for dut {}".format(network, router) + ) return errormsg logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()") @@ -1885,12 +1918,17 @@ def verify_prefix_lists(tgen, input_dict): for prefix_list in prefix_lists_addr[addr_type].keys(): if prefix_list in show_prefix_list: - errormsg = ("Prefix list {} is/are present in the router" - " {}".format(prefix_list, router)) + errormsg = ( + "Prefix list {} is/are present in the router" + " {}".format(prefix_list, router) + ) return errormsg - logger.info("Prefix list %s is/are not present in the router" - " from router %s", prefix_list, router) + logger.info( + "Prefix list %s is/are not present in the router" " from router %s", + prefix_list, + router, + ) logger.debug("Exiting lib API: verify_prefix_lists()") return True @@ -1933,12 +1971,16 @@ def verify_route_maps(tgen, input_dict): route_maps = input_dict[router]["route_maps"] for route_map in route_maps: if route_map in show_route_maps: - errormsg = ("Route map {} is not deleted from router" - " {}".format(route_map, router)) + errormsg = "Route map {} is not deleted from router" " {}".format( + route_map, router + ) return errormsg - logger.info("Route map %s is/are deleted successfully from" - " router %s", route_maps, router) + logger.info( + "Route map %s is/are deleted successfully from" " router %s", + route_maps, + router, + ) logger.debug("Exiting lib API: verify_route_maps()") return True @@ -1977,47 +2019,60 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): rnode = tgen.routers()[router] - logger.debug("Verifying BGP community attributes on dut %s: for %s " - "network %s", router, addr_type, network) + logger.debug( + "Verifying BGP community attributes on dut %s: for %s " "network %s", + router, + addr_type, + network, + ) for net in network: cmd = "show bgp {} {} json".format(addr_type, net) show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True) logger.info(show_bgp_json) if "paths" not in show_bgp_json: - return "Prefix {} not found in BGP table of router: {}". \ - format(net, router) + return "Prefix {} not found in BGP table of router: {}".format(net, router) as_paths = show_bgp_json["paths"] found = False for i in range(len(as_paths)): - if "largeCommunity" in show_bgp_json["paths"][i] or \ - "community" in show_bgp_json["paths"][i]: + if ( + "largeCommunity" in show_bgp_json["paths"][i] + or "community" in show_bgp_json["paths"][i] + ): found = True - logger.info("Large Community attribute is found for route:" - " %s in router: %s", net, router) + logger.info( + "Large Community attribute is found for route:" " %s in router: %s", + net, + router, + ) if input_dict is not None: for criteria, comm_val in input_dict.items(): - show_val = show_bgp_json["paths"][i][criteria][ - "string"] + show_val = show_bgp_json["paths"][i][criteria]["string"] if comm_val == show_val: - logger.info("Verifying BGP %s for prefix: %s" - " in router: %s, found expected" - " value: %s", criteria, net, router, - comm_val) + logger.info( + "Verifying BGP %s for prefix: %s" + " in router: %s, found expected" + " value: %s", + criteria, + net, + router, + comm_val, + ) else: - errormsg = "Failed: Verifying BGP attribute" \ - " {} for route: {} in router: {}" \ - ", expected value: {} but found" \ - ": {}".format( - criteria, net, router, comm_val, - show_val) + errormsg = ( + "Failed: Verifying BGP attribute" + " {} for route: {} in router: {}" + ", expected value: {} but found" + ": {}".format(criteria, net, router, comm_val, show_val) + ) return errormsg if not found: errormsg = ( "Large Community attribute is not found for route: " - "{} in router: {} ".format(net, router)) + "{} in router: {} ".format(net, router) + ) return errormsg logger.debug("Exiting lib API: verify_bgp_community()") @@ -2057,25 +2112,24 @@ def verify_create_community_list(tgen, input_dict): rnode = tgen.routers()[router] - logger.info("Verifying large-community is created for dut %s:", - router) + logger.info("Verifying large-community is created for dut %s:", router) for comm_data in input_dict[router]["bgp_community_lists"]: comm_name = comm_data["name"] comm_type = comm_data["community_type"] - show_bgp_community = \ - run_frr_cmd(rnode, - "show bgp large-community-list {} detail". - format(comm_name)) + show_bgp_community = run_frr_cmd( + rnode, "show bgp large-community-list {} detail".format(comm_name) + ) # Verify community list and type - if comm_name in show_bgp_community and comm_type in \ - show_bgp_community: - logger.info("BGP %s large-community-list %s is" - " created", comm_type, comm_name) + if comm_name in show_bgp_community and comm_type in show_bgp_community: + logger.info( + "BGP %s large-community-list %s is" " created", comm_type, comm_name + ) else: - errormsg = "BGP {} large-community-list {} is not" \ - " created".format(comm_type, comm_name) + errormsg = "BGP {} large-community-list {} is not" " created".format( + comm_type, comm_name + ) return errormsg logger.debug("Exiting lib API: verify_create_community_list()") |
