summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
authorreformat <reformat@nobody.nobody>2020-04-03 14:05:24 +0300
committerDonatas Abraitis <donatas.abraitis@gmail.com>2020-04-03 19:41:28 +0300
commit787e762445d50ca5b52fafcf8dd6de08ab90916f (patch)
treeafaad3d41a83da180d5fc8bbc7b23d02da7c4dbd /tests/topotests/lib/common_config.py
parentcd05906c41aaaf94e06fa9d14a7634bdffd99ed0 (diff)
tests: Run python formatter (black) for topotests
Mostly ' => ", whitespace changes. Using https://github.com/psf/black Signed-off-by: reformat <reformat@nobody.nobody>
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py714
1 files changed, 384 insertions, 330 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 8a9d2d64c9..5ee59070cc 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -79,9 +79,9 @@ if config.has_option("topogen", "frrtest_log_dir"):
frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
print("frrtest_log_file..", frrtest_log_file)
- logger = logger_config.get_logger(name="test_execution_logs",
- log_level=loglevel,
- target=frrtest_log_file)
+ logger = logger_config.get_logger(
+ name="test_execution_logs", log_level=loglevel, target=frrtest_log_file
+ )
print("Logs will be sent to logfile: {}".format(frrtest_log_file))
if config.has_option("topogen", "show_router_config"):
@@ -94,10 +94,7 @@ ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
# Saves sequence id numbers
-SEQ_ID = {
- "prefix_lists": {},
- "route_maps": {}
-}
+SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
def get_seq_id(obj_type, router, obj_name):
@@ -145,6 +142,7 @@ def set_seq_id(obj_type, router, id, obj_name):
class InvalidCLIError(Exception):
"""Raise when the CLI command is wrong"""
+
pass
@@ -169,16 +167,19 @@ def run_frr_cmd(rnode, cmd, isjson=False):
else:
print_data = ret_data
- logger.info('Output for command [ %s] on router %s:\n%s',
- cmd.rstrip("json"), rnode.name, print_data)
+ logger.info(
+ "Output for command [ %s] on router %s:\n%s",
+ cmd.rstrip("json"),
+ rnode.name,
+ print_data,
+ )
return ret_data
else:
- raise InvalidCLIError('No actual cmd passed')
+ raise InvalidCLIError("No actual cmd passed")
-def create_common_configuration(tgen, router, data, config_type=None,
- build=False):
+def create_common_configuration(tgen, router, data, config_type=None, build=False):
"""
API to create object of class FRRConfig and also create frr_json.conf
file. It will create interface and common configurations and save it to
@@ -201,15 +202,17 @@ def create_common_configuration(tgen, router, data, config_type=None,
fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
- config_map = OrderedDict({
- "general_config": "! FRR General Config\n",
- "interface_config": "! Interfaces Config\n",
- "static_route": "! Static Route Config\n",
- "prefix_list": "! Prefix List Config\n",
- "bgp_community_list": "! Community List Config\n",
- "route_maps": "! Route Maps Config\n",
- "bgp": "! BGP Config\n"
- })
+ config_map = OrderedDict(
+ {
+ "general_config": "! FRR General Config\n",
+ "interface_config": "! Interfaces Config\n",
+ "static_route": "! Static Route Config\n",
+ "prefix_list": "! Prefix List Config\n",
+ "bgp_community_list": "! Community List Config\n",
+ "route_maps": "! Route Maps Config\n",
+ "bgp": "! BGP Config\n",
+ }
+ )
if build:
mode = "a"
@@ -225,8 +228,9 @@ def create_common_configuration(tgen, router, data, config_type=None,
frr_cfg_fd.write("\n")
except IOError as err:
- logger.error("Unable to open FRR Config File. error(%s): %s" %
- (err.errno, err.strerror))
+ logger.error(
+ "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror)
+ )
return False
finally:
frr_cfg_fd.close()
@@ -257,8 +261,7 @@ def reset_config_on_routers(tgen, routerName=None):
continue
router = router_list[rname]
- logger.info("Configuring router %s to initial test configuration",
- rname)
+ logger.info("Configuring router %s to initial test configuration", rname)
cfg = router.run("vtysh -c 'show running'")
fname = "{}/{}/frr.sav".format(TMPDIR, rname)
dname = "{}/{}/delta.conf".format(TMPDIR, rname)
@@ -266,9 +269,11 @@ def reset_config_on_routers(tgen, routerName=None):
for line in cfg.split("\n"):
line = line.strip()
- if (line == "Building configuration..." or
- line == "Current configuration:" or
- not line):
+ if (
+ line == "Building configuration..."
+ or line == "Current configuration:"
+ or not line
+ ):
continue
f.write(line)
f.write("\n")
@@ -279,37 +284,39 @@ def reset_config_on_routers(tgen, routerName=None):
init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
tempdir = mkdtemp()
- with open(os.path.join(tempdir, 'vtysh.conf'), 'w') as fd:
+ with open(os.path.join(tempdir, "vtysh.conf"), "w") as fd:
pass
- command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}". \
- format(tempdir, run_cfg_file, init_cfg_file, dname)
- result = call(command, shell=True, stderr=SUB_STDOUT,
- stdout=SUB_PIPE)
+ command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}".format(
+ tempdir, run_cfg_file, init_cfg_file, dname
+ )
+ result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)
- os.unlink(os.path.join(tempdir, 'vtysh.conf'))
+ os.unlink(os.path.join(tempdir, "vtysh.conf"))
os.rmdir(tempdir)
# Assert if command fail
if result > 0:
- logger.error("Delta file creation failed. Command executed %s",
- command)
- with open(run_cfg_file, 'r') as fd:
- logger.info('Running configuration saved in %s is:\n%s',
- run_cfg_file, fd.read())
- with open(init_cfg_file, 'r') as fd:
- logger.info('Test configuration saved in %s is:\n%s',
- init_cfg_file, fd.read())
-
- err_cmd = ['/usr/bin/vtysh', '-m', '-f', run_cfg_file]
+ logger.error("Delta file creation failed. Command executed %s", command)
+ with open(run_cfg_file, "r") as fd:
+ logger.info(
+ "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read()
+ )
+ with open(init_cfg_file, "r") as fd:
+ logger.info(
+ "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read()
+ )
+
+ err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file]
result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
output = result.communicate()
for out_data in output:
- temp_data = out_data.decode('utf-8').lower()
+ temp_data = out_data.decode("utf-8").lower()
for out_err in ERROR_LIST:
if out_err.lower() in temp_data:
- logger.error("Found errors while validating data in"
- " %s", run_cfg_file)
+ logger.error(
+ "Found errors while validating data in" " %s", run_cfg_file
+ )
raise InvalidCLIError(out_data)
raise InvalidCLIError("Unknown error in %s", output)
@@ -319,18 +326,19 @@ def reset_config_on_routers(tgen, routerName=None):
t_delta = f.read()
for line in t_delta.split("\n"):
line = line.strip()
- if (line == "Lines To Delete" or
- line == "===============" or
- line == "Lines To Add" or
- line == "============" or
- not line):
+ if (
+ line == "Lines To Delete"
+ or line == "==============="
+ or line == "Lines To Add"
+ or line == "============"
+ or not line
+ ):
continue
delta.write(line)
delta.write("\n")
delta.write("end\n")
- output = router.vtysh_multicmd(delta.getvalue(),
- pretty_output=False)
+ output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False)
delta.close()
delta = StringIO.StringIO()
@@ -343,8 +351,7 @@ def reset_config_on_routers(tgen, routerName=None):
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
- logger.info("Configuration on router {} after config reset:".
- format(rname))
+ logger.info("Configuration on router {} after config reset:".format(rname))
logger.info(delta.getvalue())
delta.close()
@@ -373,12 +380,13 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
router = router_list[rname]
try:
frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
- frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
- FRRCFG_BKUP_FILE)
+ frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE)
with open(frr_cfg_file, "r+") as cfg:
data = cfg.read()
- logger.info("Applying following configuration on router"
- " {}:\n{}".format(rname, data))
+ logger.info(
+ "Applying following configuration on router"
+ " {}:\n{}".format(rname, data)
+ )
if save_bkup:
with open(frr_cfg_bkup, "w") as bkup:
bkup.write(data)
@@ -390,8 +398,10 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
cfg.truncate(0)
except IOError as err:
- errormsg = ("Unable to open config File. error(%s):"
- " %s", (err.errno, err.strerror))
+ errormsg = (
+ "Unable to open config File. error(%s):" " %s",
+ (err.errno, err.strerror),
+ )
return errormsg
# Router current configuration to log file or console if
@@ -418,8 +428,9 @@ def start_topology(tgen):
# Starting deamons
router_list = tgen.routers()
- ROUTER_LIST = sorted(router_list.keys(),
- key=lambda x: int(re_search('\d+', x).group(0)))
+ ROUTER_LIST = sorted(
+ router_list.keys(), key=lambda x: int(re_search("\d+", x).group(0))
+ )
TMPDIR = os.path.join(LOGDIR, tgen.modname)
router_list = tgen.routers()
@@ -430,31 +441,27 @@ def start_topology(tgen):
# Creating router named dir and empty zebra.conf bgpd.conf files
# inside the current directory
- if os.path.isdir('{}'.format(rname)):
+ if os.path.isdir("{}".format(rname)):
os.system("rm -rf {}".format(rname))
- os.mkdir('{}'.format(rname))
- os.system('chmod -R go+rw {}'.format(rname))
- os.chdir('{}/{}'.format(TMPDIR, rname))
- os.system('touch zebra.conf bgpd.conf')
+ os.mkdir("{}".format(rname))
+ os.system("chmod -R go+rw {}".format(rname))
+ os.chdir("{}/{}".format(TMPDIR, rname))
+ os.system("touch zebra.conf bgpd.conf")
else:
- os.mkdir('{}'.format(rname))
- os.system('chmod -R go+rw {}'.format(rname))
- os.chdir('{}/{}'.format(TMPDIR, rname))
- os.system('touch zebra.conf bgpd.conf')
+ os.mkdir("{}".format(rname))
+ os.system("chmod -R go+rw {}".format(rname))
+ os.chdir("{}/{}".format(TMPDIR, rname))
+ os.system("touch zebra.conf bgpd.conf")
except IOError as (errno, strerror):
logger.error("I/O error({0}): {1}".format(errno, strerror))
# Loading empty zebra.conf file to router, to start the zebra deamon
router.load_config(
- TopoRouter.RD_ZEBRA,
- '{}/{}/zebra.conf'.format(TMPDIR, rname)
+ TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname)
)
# Loading empty bgpd.conf file to router, to start the bgp deamon
- router.load_config(
- TopoRouter.RD_BGP,
- '{}/{}/bgpd.conf'.format(TMPDIR, rname)
- )
+ router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname))
# Starting routers
logger.info("Starting all routers once topology is created")
@@ -483,6 +490,7 @@ def number_to_column(routerName):
# Common APIs, will be used by all protocols
#############################################
+
def validate_ip_address(ip_address):
"""
Validates the type of ip address
@@ -518,8 +526,9 @@ def validate_ip_address(ip_address):
return "ipv6"
if not v4 and not v6:
- raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6"
- " address" % ip_address)
+ raise Exception(
+ "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
+ )
def check_address_types(addr_type=None):
@@ -542,8 +551,11 @@ def check_address_types(addr_type=None):
return addr_types
if addr_type not in addr_types:
- logger.error("{} not in supported/configured address types {}".
- format(addr_type, addr_types))
+ logger.error(
+ "{} not in supported/configured address types {}".format(
+ addr_type, addr_types
+ )
+ )
return False
return True
@@ -589,8 +601,7 @@ def generate_ips(network, no_of_ips):
return ipaddress_list
-def find_interface_with_greater_ip(topo, router, loopback=True,
- interface=True):
+def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
"""
Returns highest interface ip for ipv4/ipv6. If loopback is there then
it will return highest IP from loopback IPs otherwise from physical
@@ -608,12 +619,14 @@ def find_interface_with_greater_ip(topo, router, loopback=True,
if loopback:
if "type" in data and data["type"] == "loopback":
lo_exists = True
- ip_address = topo["routers"][router]["links"][
- destRouterLink]["ipv4"].split("/")[0]
+ ip_address = topo["routers"][router]["links"][destRouterLink][
+ "ipv4"
+ ].split("/")[0]
lo_list.append(ip_address)
if interface:
- ip_address = topo["routers"][router]["links"][
- destRouterLink]["ipv4"].split("/")[0]
+ ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
+ "/"
+ )[0]
interfaces_list.append(ip_address)
if lo_exists:
@@ -625,17 +638,17 @@ def find_interface_with_greater_ip(topo, router, loopback=True,
def write_test_header(tc_name):
""" Display message at beginning of test case"""
count = 20
- logger.info("*"*(len(tc_name)+count))
+ logger.info("*" * (len(tc_name) + count))
step("START -> Testcase : %s" % tc_name, reset=True)
- logger.info("*"*(len(tc_name)+count))
+ logger.info("*" * (len(tc_name) + count))
def write_test_footer(tc_name):
""" Display message at end of test case"""
count = 21
- logger.info("="*(len(tc_name)+count))
+ logger.info("=" * (len(tc_name) + count))
logger.info("Testcase : %s -> PASSED", tc_name)
- logger.info("="*(len(tc_name)+count))
+ logger.info("=" * (len(tc_name) + count))
def interface_status(tgen, topo, input_dict):
@@ -664,8 +677,8 @@ def interface_status(tgen, topo, input_dict):
global frr_cfg
for router in input_dict.keys():
- interface_list = input_dict[router]['interface_list']
- status = input_dict[router].setdefault('status', 'up')
+ interface_list = input_dict[router]["interface_list"]
+ status = input_dict[router].setdefault("status", "up")
for intf in interface_list:
rnode = tgen.routers()[router]
interface_set_status(rnode, intf, status)
@@ -698,11 +711,10 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
"""
def _retry(func):
-
@wraps(func)
def func_retry(*args, **kwargs):
- _wait = kwargs.pop('wait', wait)
- _attempts = kwargs.pop('attempts', attempts)
+ _wait = kwargs.pop("wait", wait)
+ _attempts = kwargs.pop("attempts", attempts)
_attempts = int(_attempts)
if _attempts < 0:
raise ValueError("attempts must be 0 or greater")
@@ -711,11 +723,11 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
logger.info("Waiting for [%s]s as initial delay", initial_wait)
sleep(initial_wait)
- _return_is_str = kwargs.pop('return_is_str', return_is_str)
+ _return_is_str = kwargs.pop("return_is_str", return_is_str)
for i in range(1, _attempts + 1):
try:
- _expected = kwargs.setdefault('expected', True)
- kwargs.pop('expected')
+ _expected = kwargs.setdefault("expected", True)
+ kwargs.pop("expected")
ret = func(*args, **kwargs)
logger.debug("Function returned %s" % ret)
if return_is_str and isinstance(ret, bool) and _expected:
@@ -727,17 +739,17 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
return ret
except Exception as err:
if _attempts == i:
- logger.info("Max number of attempts (%r) reached",
- _attempts)
+ logger.info("Max number of attempts (%r) reached", _attempts)
raise
else:
logger.info("Function returned %s", err)
if i < _attempts:
- logger.info("Retry [#%r] after sleeping for %ss"
- % (i, _wait))
+ logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
sleep(_wait)
+
func_retry._original = func
return func_retry
+
return _retry
@@ -745,6 +757,7 @@ class Stepper:
"""
Prints step number for the test case step being executed
"""
+
count = 1
def __call__(self, msg, reset):
@@ -795,24 +808,17 @@ def create_interfaces_cfg(tgen, topo, build=False):
interface_name = destRouterLink
else:
interface_name = data["interface"]
- interface_data.append("interface {}".format(
- str(interface_name)
- ))
+ interface_data.append("interface {}".format(str(interface_name)))
if "ipv4" in data:
intf_addr = c_data["links"][destRouterLink]["ipv4"]
- interface_data.append("ip address {}".format(
- intf_addr
- ))
+ interface_data.append("ip address {}".format(intf_addr))
if "ipv6" in data:
intf_addr = c_data["links"][destRouterLink]["ipv6"]
- interface_data.append("ipv6 address {}".format(
- intf_addr
- ))
-
- result = create_common_configuration(tgen, c_router,
- interface_data,
- "interface_config",
- build=build)
+ interface_data.append("ipv6 address {}".format(intf_addr))
+
+ result = create_common_configuration(
+ tgen, c_router, interface_data, "interface_config", build=build
+ )
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
@@ -880,13 +886,10 @@ def create_static_routes(tgen, input_dict, build=False):
del_action = static_route.setdefault("delete", False)
# No of IPs
no_of_ip = static_route.setdefault("no_of_ip", 1)
- admin_distance = static_route.setdefault("admin_distance",
- None)
+ admin_distance = static_route.setdefault("admin_distance", None)
tag = static_route.setdefault("tag", None)
- if "next_hop" not in static_route or \
- "network" not in static_route:
- errormsg = "'next_hop' or 'network' missing in" \
- " input_dict"
+ if "next_hop" not in static_route or "network" not in static_route:
+ errormsg = "'next_hop' or 'network' missing in" " input_dict"
return errormsg
next_hop = static_route["next_hop"]
@@ -914,10 +917,9 @@ def create_static_routes(tgen, input_dict, build=False):
static_routes_list.append(cmd)
- result = create_common_configuration(tgen, router,
- static_routes_list,
- "static_route",
- build=build)
+ result = create_common_configuration(
+ tgen, router, static_routes_list, "static_route", build=build
+ )
except InvalidCLIError:
# Traceback
@@ -992,10 +994,8 @@ def create_prefix_lists(tgen, input_dict, build=False):
for prefix_name, prefix_list in prefix_data.iteritems():
for prefix_dict in prefix_list:
- if "action" not in prefix_dict or \
- "network" not in prefix_dict:
- errormsg = "'action' or network' missing in" \
- " input_dict"
+ if "action" not in prefix_dict or "network" not in prefix_dict:
+ errormsg = "'action' or network' missing in" " input_dict"
return errormsg
network_addr = prefix_dict["network"]
@@ -1005,11 +1005,9 @@ def create_prefix_lists(tgen, input_dict, build=False):
seqid = prefix_dict.setdefault("seqid", None)
del_action = prefix_dict.setdefault("delete", False)
if seqid is None:
- seqid = get_seq_id("prefix_lists", router,
- prefix_name)
+ seqid = get_seq_id("prefix_lists", router, prefix_name)
else:
- set_seq_id("prefix_lists", router, seqid,
- prefix_name)
+ set_seq_id("prefix_lists", router, seqid, prefix_name)
if addr_type == "ipv4":
protocol = "ip"
@@ -1028,10 +1026,9 @@ def create_prefix_lists(tgen, input_dict, build=False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(tgen, router,
- config_data,
- "prefix_list",
- build=build)
+ result = create_common_configuration(
+ tgen, router, config_data, "prefix_list", build=build
+ )
except InvalidCLIError:
# Traceback
@@ -1137,8 +1134,7 @@ def create_route_maps(tgen, input_dict, build=False):
logger.debug("route_maps not present in input_dict")
continue
rmap_data = []
- for rmap_name, rmap_value in \
- input_dict[router]["route_maps"].iteritems():
+ for rmap_name, rmap_value in input_dict[router]["route_maps"].iteritems():
for rmap_dict in rmap_value:
del_action = rmap_dict.setdefault("delete", False)
@@ -1160,38 +1156,39 @@ def create_route_maps(tgen, input_dict, build=False):
else:
set_seq_id("route_maps", router, seq_id, rmap_name)
- rmap_data.append("route-map {} {} {}".format(
- rmap_name, rmap_action, seq_id
- ))
+ rmap_data.append(
+ "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
+ )
if "continue" in rmap_dict:
continue_to = rmap_dict["continue"]
if continue_to:
- rmap_data.append("on-match goto {}".
- format(continue_to))
+ rmap_data.append("on-match goto {}".format(continue_to))
else:
- logger.error("In continue, 'route-map entry "
- "sequence number' is not provided")
+ logger.error(
+ "In continue, 'route-map entry "
+ "sequence number' is not provided"
+ )
return False
if "goto" in rmap_dict:
go_to = rmap_dict["goto"]
if go_to:
- rmap_data.append("on-match goto {}".
- format(go_to))
+ rmap_data.append("on-match goto {}".format(go_to))
else:
- logger.error("In goto, 'Goto Clause number' is not"
- " provided")
+ logger.error(
+ "In goto, 'Goto Clause number' is not" " provided"
+ )
return False
if "call" in rmap_dict:
call_rmap = rmap_dict["call"]
if call_rmap:
- rmap_data.append("call {}".
- format(call_rmap))
+ rmap_data.append("call {}".format(call_rmap))
else:
- logger.error("In call, 'destination Route-Map' is"
- " not provided")
+ logger.error(
+ "In call, 'destination Route-Map' is" " not provided"
+ )
return False
# Verifying if SET criteria is defined
@@ -1199,24 +1196,22 @@ def create_route_maps(tgen, input_dict, build=False):
set_data = rmap_dict["set"]
ipv4_data = set_data.setdefault("ipv4", {})
ipv6_data = set_data.setdefault("ipv6", {})
- local_preference = set_data.setdefault("locPrf",
- None)
+ local_preference = set_data.setdefault("locPrf", None)
metric = set_data.setdefault("metric", None)
as_path = set_data.setdefault("path", {})
weight = set_data.setdefault("weight", None)
community = set_data.setdefault("community", {})
- large_community = set_data.setdefault(
- "large_community", {})
- large_comm_list = set_data.setdefault(
- "large_comm_list", {})
+ large_community = set_data.setdefault("large_community", {})
+ large_comm_list = set_data.setdefault("large_comm_list", {})
set_action = set_data.setdefault("set_action", None)
nexthop = set_data.setdefault("nexthop", None)
origin = set_data.setdefault("origin", None)
# Local Preference
if local_preference:
- rmap_data.append("set local-preference {}".
- format(local_preference))
+ rmap_data.append(
+ "set local-preference {}".format(local_preference)
+ )
# Metric
if metric:
@@ -1231,8 +1226,9 @@ def create_route_maps(tgen, input_dict, build=False):
as_num = as_path.setdefault("as_num", None)
as_action = as_path.setdefault("as_action", None)
if as_action and as_num:
- rmap_data.append("set as-path {} {}".
- format(as_action, as_num))
+ rmap_data.append(
+ "set as-path {} {}".format(as_action, as_num)
+ )
# Community
if community:
@@ -1244,14 +1240,12 @@ def create_route_maps(tgen, input_dict, build=False):
cmd = "{} {}".format(cmd, comm_action)
rmap_data.append(cmd)
else:
- logger.error("In community, AS Num not"
- " provided")
+ logger.error("In community, AS Num not" " provided")
return False
if large_community:
num = large_community.setdefault("num", None)
- comm_action = large_community.setdefault("action",
- None)
+ comm_action = large_community.setdefault("action", None)
if num:
cmd = "set large-community {}".format(num)
if comm_action:
@@ -1259,13 +1253,13 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_data.append(cmd)
else:
- logger.error("In large_community, AS Num not"
- " provided")
+ logger.error(
+ "In large_community, AS Num not" " provided"
+ )
return False
if large_comm_list:
id = large_comm_list.setdefault("id", None)
- del_comm = large_comm_list.setdefault("delete",
- None)
+ del_comm = large_comm_list.setdefault("delete", None)
if id:
cmd = "set large-comm-list {}".format(id)
if del_comm:
@@ -1273,43 +1267,36 @@ def create_route_maps(tgen, input_dict, build=False):
rmap_data.append(cmd)
else:
- logger.error("In large_comm_list 'id' not"
- " provided")
+ logger.error("In large_comm_list 'id' not" " provided")
return False
# Weight
if weight:
- rmap_data.append("set weight {}".format(
- weight))
+ rmap_data.append("set weight {}".format(weight))
if ipv6_data:
nexthop = ipv6_data.setdefault("nexthop", None)
if nexthop:
- rmap_data.append("set ipv6 next-hop {}".format(
- nexthop
- ))
+ rmap_data.append("set ipv6 next-hop {}".format(nexthop))
# Adding MATCH and SET sequence to RMAP if defined
if "match" in rmap_dict:
match_data = rmap_dict["match"]
ipv4_data = match_data.setdefault("ipv4", {})
ipv6_data = match_data.setdefault("ipv6", {})
- community = match_data.setdefault(
- "community_list",{})
- large_community = match_data.setdefault(
- "large_community", {}
- )
+ community = match_data.setdefault("community_list", {})
+ large_community = match_data.setdefault("large_community", {})
large_community_list = match_data.setdefault(
"large_community_list", {}
)
if ipv4_data:
# fetch prefix list data from rmap
- prefix_name = \
- ipv4_data.setdefault("prefix_lists",
- None)
+ prefix_name = ipv4_data.setdefault("prefix_lists", None)
if prefix_name:
- rmap_data.append("match ip address"
- " prefix-list {}".format(prefix_name))
+ rmap_data.append(
+ "match ip address"
+ " prefix-list {}".format(prefix_name)
+ )
# fetch tag data from rmap
tag = ipv4_data.setdefault("tag", None)
@@ -1318,16 +1305,19 @@ def create_route_maps(tgen, input_dict, build=False):
# fetch large community data from rmap
large_community_list = ipv4_data.setdefault(
- "large_community_list",{})
+ "large_community_list", {}
+ )
large_community = match_data.setdefault(
- "large_community", {})
+ "large_community", {}
+ )
if ipv6_data:
- prefix_name = ipv6_data.setdefault("prefix_lists",
- None)
+ prefix_name = ipv6_data.setdefault("prefix_lists", None)
if prefix_name:
- rmap_data.append("match ipv6 address"
- " prefix-list {}".format(prefix_name))
+ rmap_data.append(
+ "match ipv6 address"
+ " prefix-list {}".format(prefix_name)
+ )
# fetch tag data from rmap
tag = ipv6_data.setdefault("tag", None)
@@ -1336,54 +1326,64 @@ def create_route_maps(tgen, input_dict, build=False):
# fetch large community data from rmap
large_community_list = ipv6_data.setdefault(
- "large_community_list",{})
+ "large_community_list", {}
+ )
large_community = match_data.setdefault(
- "large_community", {})
+ "large_community", {}
+ )
if community:
if "id" not in community:
- logger.error("'id' is mandatory for "
- "community-list in match"
- " criteria")
+ logger.error(
+ "'id' is mandatory for "
+ "community-list in match"
+ " criteria"
+ )
return False
cmd = "match community {}".format(community["id"])
- exact_match = community.setdefault("exact_match",
- False)
+ exact_match = community.setdefault("exact_match", False)
if exact_match:
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
if large_community:
if "id" not in large_community:
- logger.error("'id' is mandatory for "
- "large-community-list in match "
- "criteria")
+ logger.error(
+ "'id' is mandatory for "
+ "large-community-list in match "
+ "criteria"
+ )
return False
cmd = "match large-community {}".format(
- large_community["id"])
+ large_community["id"]
+ )
exact_match = large_community.setdefault(
- "exact_match", False)
+ "exact_match", False
+ )
if exact_match:
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
if large_community_list:
if "id" not in large_community_list:
- logger.error("'id' is mandatory for "
- "large-community-list in match "
- "criteria")
+ logger.error(
+ "'id' is mandatory for "
+ "large-community-list in match "
+ "criteria"
+ )
return False
cmd = "match large-community {}".format(
- large_community_list["id"])
+ large_community_list["id"]
+ )
exact_match = large_community_list.setdefault(
- "exact_match", False)
+ "exact_match", False
+ )
if exact_match:
cmd = "{} exact-match".format(cmd)
rmap_data.append(cmd)
- result = create_common_configuration(tgen, router,
- rmap_data,
- "route_maps",
- build=build)
+ result = create_common_configuration(
+ tgen, router, rmap_data, "route_maps", build=build
+ )
except InvalidCLIError:
# Traceback
@@ -1424,12 +1424,7 @@ def delete_route_maps(tgen, input_dict):
rmap_data = input_dict[router]
rmap_data["route_maps"] = {}
for route_map_name in route_maps:
- rmap_data["route_maps"].update({
- route_map_name:
- [{
- "delete": True
- }]
- })
+ rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
return create_route_maps(tgen, input_dict)
@@ -1478,10 +1473,9 @@ def create_bgp_community_lists(tgen, input_dict, build=False):
community_list = input_dict[router]["bgp_community_lists"]
for community_dict in community_list:
del_action = community_dict.setdefault("delete", False)
- community_type = community_dict.setdefault("community_type",
- None)
+ community_type = community_dict.setdefault("community_type", None)
action = community_dict.setdefault("action", None)
- value = community_dict.setdefault("value", '')
+ value = community_dict.setdefault("value", "")
large = community_dict.setdefault("large", None)
name = community_dict.setdefault("name", None)
if large:
@@ -1490,28 +1484,30 @@ def create_bgp_community_lists(tgen, input_dict, build=False):
cmd = "bgp community-list"
if not large and not (community_type and action and value):
- errormsg = "community_type, action and value are " \
- "required in bgp_community_list"
+ errormsg = (
+ "community_type, action and value are "
+ "required in bgp_community_list"
+ )
logger.error(errormsg)
return False
try:
community_type = int(community_type)
- cmd = "{} {} {} {}".format(cmd, community_type, action,
- value)
+ cmd = "{} {} {} {}".format(cmd, community_type, action, value)
except ValueError:
cmd = "{} {} {} {} {}".format(
- cmd, community_type, name, action, value)
+ cmd, community_type, name, action, value
+ )
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
- result = create_common_configuration(tgen, router, config_data,
- "bgp_community_list",
- build=build)
+ result = create_common_configuration(
+ tgen, router, config_data, "bgp_community_list", build=build
+ )
except InvalidCLIError:
# Traceback
@@ -1634,8 +1630,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) is False:
- errormsg = "No {} route found in rib of router {}..". \
- format(protocol, router)
+ errormsg = "No {} route found in rib of router {}..".format(
+ protocol, router
+ )
return errormsg
if "static_routes" in input_dict[routerInput]:
@@ -1665,47 +1662,62 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
if type(next_hop) is not list:
next_hop = [next_hop]
- found_hops = [rib_r["ip"] for rib_r in
- rib_routes_json[st_rt][0][
- "nexthops"]]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json[st_rt][0]["nexthops"]
+ ]
for nh in found_hops:
nh_found = False
if nh and nh in next_hop:
nh_found = True
else:
- errormsg = ("Nexthop {} is Missing for {}"
- " route {} in RIB of router"
- " {}\n".format(next_hop,
- protocol,
- st_rt, dut))
+ errormsg = (
+ "Nexthop {} is Missing for {}"
+ " route {} in RIB of router"
+ " {}\n".format(
+ next_hop, protocol, st_rt, dut
+ )
+ )
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
- logger.info("Found next_hop %s for all routes in RIB of"
- " router %s\n", next_hop, dut)
+ logger.info(
+ "Found next_hop %s for all routes in RIB of" " router %s\n",
+ next_hop,
+ dut,
+ )
if not st_found and len(missing_routes) > 0:
- errormsg = "Missing route in RIB of router {}, routes: " \
- "{}\n".format(dut, missing_routes)
+ errormsg = (
+ "Missing route in RIB of router {}, routes: "
+ "{}\n".format(dut, missing_routes)
+ )
return errormsg
- logger.info("Verified routes in router %s RIB, found routes"
- " are: %s\n", dut, found_routes)
+ logger.info(
+ "Verified routes in router %s RIB, found routes" " are: %s\n",
+ dut,
+ found_routes,
+ )
continue
if "bgp" in input_dict[routerInput]:
- if 'advertise_networks' in input_dict[routerInput]["bgp"]\
- ["address_family"][addr_type]["unicast"]:
+ if (
+ "advertise_networks"
+ in input_dict[routerInput]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]
+ ):
found_routes = []
missing_routes = []
- advertise_network = input_dict[routerInput]["bgp"]\
- ["address_family"][addr_type]["unicast"]\
- ["advertise_networks"]
+ advertise_network = input_dict[routerInput]["bgp"][
+ "address_family"
+ ][addr_type]["unicast"]["advertise_networks"]
for advertise_network_dict in advertise_network:
start_ip = advertise_network_dict["network"]
@@ -1730,34 +1742,43 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
next_hop = [next_hop]
for index, nh in enumerate(next_hop):
- if rib_routes_json[st_rt][0]\
- ['nexthops'][index]['ip'] == nh:
+ if (
+ rib_routes_json[st_rt][0]["nexthops"][
+ index
+ ]["ip"]
+ == nh
+ ):
nh_found = True
else:
- errormsg=("Nexthop {} is Missing"
- " for {} route {} in "
- "RIB of router {}\n".\
- format(next_hop,
- protocol,
- st_rt, dut))
+ errormsg = (
+ "Nexthop {} is Missing"
+ " for {} route {} in "
+ "RIB of router {}\n".format(
+ next_hop, protocol, st_rt, dut
+ )
+ )
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
- logger.info("Found next_hop {} for all routes in RIB"
- " of router {}\n".format(next_hop, dut))
+ logger.info(
+ "Found next_hop {} for all routes in RIB"
+ " of router {}\n".format(next_hop, dut)
+ )
if not found and len(missing_routes) > 0:
- errormsg = ("Missing {} route in RIB of router {}, "
- "routes: {} \n".\
- format(addr_type, dut, missing_routes))
+ errormsg = (
+ "Missing {} route in RIB of router {}, "
+ "routes: {} \n".format(addr_type, dut, missing_routes)
+ )
return errormsg
- logger.info("Verified {} routes in router {} RIB, found"
- " routes are: {}\n".\
- format(addr_type, dut, found_routes))
+ logger.info(
+ "Verified {} routes in router {} RIB, found"
+ " routes are: {}\n".format(addr_type, dut, found_routes)
+ )
logger.debug("Exiting lib API: verify_rib()")
return True
@@ -1810,8 +1831,11 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
command = "show ipv6 route json"
show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
- logger.info("Verifying admin distance for static route %s"
- " under dut %s:", static_route, router)
+ logger.info(
+ "Verifying admin distance for static route %s" " under dut %s:",
+ static_route,
+ router,
+ )
network = static_route["network"]
next_hop = static_route["next_hop"]
admin_distance = static_route["admin_distance"]
@@ -1819,23 +1843,32 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
if network in show_ip_route_json:
if route_data["nexthops"][0]["ip"] == next_hop:
if route_data["distance"] != admin_distance:
- errormsg = ("Verification failed: admin distance"
- " for static route {} under dut {},"
- " found:{} but expected:{}".
- format(static_route, router,
- route_data["distance"],
- admin_distance))
+ errormsg = (
+ "Verification failed: admin distance"
+ " for static route {} under dut {},"
+ " found:{} but expected:{}".format(
+ static_route,
+ router,
+ route_data["distance"],
+ admin_distance,
+ )
+ )
return errormsg
else:
- logger.info("Verification successful: admin"
- " distance for static route %s under"
- " dut %s, found:%s", static_route,
- router, route_data["distance"])
+ logger.info(
+ "Verification successful: admin"
+ " distance for static route %s under"
+ " dut %s, found:%s",
+ static_route,
+ router,
+ route_data["distance"],
+ )
else:
- errormsg = ("Static route {} not found in "
- "show_ip_route_json for dut {}".
- format(network, router))
+ errormsg = (
+ "Static route {} not found in "
+ "show_ip_route_json for dut {}".format(network, router)
+ )
return errormsg
logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()")
@@ -1885,12 +1918,17 @@ def verify_prefix_lists(tgen, input_dict):
for prefix_list in prefix_lists_addr[addr_type].keys():
if prefix_list in show_prefix_list:
- errormsg = ("Prefix list {} is/are present in the router"
- " {}".format(prefix_list, router))
+ errormsg = (
+ "Prefix list {} is/are present in the router"
+ " {}".format(prefix_list, router)
+ )
return errormsg
- logger.info("Prefix list %s is/are not present in the router"
- " from router %s", prefix_list, router)
+ logger.info(
+ "Prefix list %s is/are not present in the router" " from router %s",
+ prefix_list,
+ router,
+ )
logger.debug("Exiting lib API: verify_prefix_lists()")
return True
@@ -1933,12 +1971,16 @@ def verify_route_maps(tgen, input_dict):
route_maps = input_dict[router]["route_maps"]
for route_map in route_maps:
if route_map in show_route_maps:
- errormsg = ("Route map {} is not deleted from router"
- " {}".format(route_map, router))
+ errormsg = "Route map {} is not deleted from router" " {}".format(
+ route_map, router
+ )
return errormsg
- logger.info("Route map %s is/are deleted successfully from"
- " router %s", route_maps, router)
+ logger.info(
+ "Route map %s is/are deleted successfully from" " router %s",
+ route_maps,
+ router,
+ )
logger.debug("Exiting lib API: verify_route_maps()")
return True
@@ -1977,47 +2019,60 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
rnode = tgen.routers()[router]
- logger.debug("Verifying BGP community attributes on dut %s: for %s "
- "network %s", router, addr_type, network)
+ logger.debug(
+ "Verifying BGP community attributes on dut %s: for %s " "network %s",
+ router,
+ addr_type,
+ network,
+ )
for net in network:
cmd = "show bgp {} {} json".format(addr_type, net)
show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
logger.info(show_bgp_json)
if "paths" not in show_bgp_json:
- return "Prefix {} not found in BGP table of router: {}". \
- format(net, router)
+ return "Prefix {} not found in BGP table of router: {}".format(net, router)
as_paths = show_bgp_json["paths"]
found = False
for i in range(len(as_paths)):
- if "largeCommunity" in show_bgp_json["paths"][i] or \
- "community" in show_bgp_json["paths"][i]:
+ if (
+ "largeCommunity" in show_bgp_json["paths"][i]
+ or "community" in show_bgp_json["paths"][i]
+ ):
found = True
- logger.info("Large Community attribute is found for route:"
- " %s in router: %s", net, router)
+ logger.info(
+ "Large Community attribute is found for route:" " %s in router: %s",
+ net,
+ router,
+ )
if input_dict is not None:
for criteria, comm_val in input_dict.items():
- show_val = show_bgp_json["paths"][i][criteria][
- "string"]
+ show_val = show_bgp_json["paths"][i][criteria]["string"]
if comm_val == show_val:
- logger.info("Verifying BGP %s for prefix: %s"
- " in router: %s, found expected"
- " value: %s", criteria, net, router,
- comm_val)
+ logger.info(
+ "Verifying BGP %s for prefix: %s"
+ " in router: %s, found expected"
+ " value: %s",
+ criteria,
+ net,
+ router,
+ comm_val,
+ )
else:
- errormsg = "Failed: Verifying BGP attribute" \
- " {} for route: {} in router: {}" \
- ", expected value: {} but found" \
- ": {}".format(
- criteria, net, router, comm_val,
- show_val)
+ errormsg = (
+ "Failed: Verifying BGP attribute"
+ " {} for route: {} in router: {}"
+ ", expected value: {} but found"
+ ": {}".format(criteria, net, router, comm_val, show_val)
+ )
return errormsg
if not found:
errormsg = (
"Large Community attribute is not found for route: "
- "{} in router: {} ".format(net, router))
+ "{} in router: {} ".format(net, router)
+ )
return errormsg
logger.debug("Exiting lib API: verify_bgp_community()")
@@ -2057,25 +2112,24 @@ def verify_create_community_list(tgen, input_dict):
rnode = tgen.routers()[router]
- logger.info("Verifying large-community is created for dut %s:",
- router)
+ logger.info("Verifying large-community is created for dut %s:", router)
for comm_data in input_dict[router]["bgp_community_lists"]:
comm_name = comm_data["name"]
comm_type = comm_data["community_type"]
- show_bgp_community = \
- run_frr_cmd(rnode,
- "show bgp large-community-list {} detail".
- format(comm_name))
+ show_bgp_community = run_frr_cmd(
+ rnode, "show bgp large-community-list {} detail".format(comm_name)
+ )
# Verify community list and type
- if comm_name in show_bgp_community and comm_type in \
- show_bgp_community:
- logger.info("BGP %s large-community-list %s is"
- " created", comm_type, comm_name)
+ if comm_name in show_bgp_community and comm_type in show_bgp_community:
+ logger.info(
+ "BGP %s large-community-list %s is" " created", comm_type, comm_name
+ )
else:
- errormsg = "BGP {} large-community-list {} is not" \
- " created".format(comm_type, comm_name)
+ errormsg = "BGP {} large-community-list {} is not" " created".format(
+ comm_type, comm_name
+ )
return errormsg
logger.debug("Exiting lib API: verify_create_community_list()")