summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r--tests/topotests/lib/pim.py29
1 files changed, 7 insertions, 22 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index 925890b324..f7440efd6d 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -1,35 +1,35 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: ISC
# Copyright (c) 2019 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
import datetime
+import functools
import os
import re
import sys
import traceback
-import functools
from copy import deepcopy
from time import sleep
-from lib import topotest
-
# Import common_config to use commomnly used APIs
from lib.common_config import (
- create_common_configurations,
HostApplicationHelper,
InvalidCLIError,
create_common_configuration,
- InvalidCLIError,
+ create_common_configurations,
+ get_frr_ipv6_linklocal,
retry,
run_frr_cmd,
validate_ip_address,
- get_frr_ipv6_linklocal,
)
from lib.micronet import get_exec_path
from lib.topolog import logger
from lib.topotest import frr_unicode
+from lib import topotest
+
####
CWD = os.path.dirname(os.path.realpath(__file__))
@@ -593,7 +593,6 @@ def find_rp_details(tgen, topo):
topo_data = topo["routers"]
for router in router_list.keys():
-
if "pim" not in topo_data[router]:
continue
@@ -1133,7 +1132,7 @@ def verify_upstream_iif(
grp_addr,
in_interface,
group_addr_json[src_address]["inboundInterface"],
- joinState,
+ "Joined",
group_addr_json[src_address]["joinState"],
)
)
@@ -1495,7 +1494,6 @@ def verify_mroutes(
and data["outboundInterface"] in oil
):
if return_uptime:
-
uptime_dict[grp_addr][src_address] = data["upTime"]
logger.info(
@@ -1917,7 +1915,6 @@ def get_pim_interface_traffic(tgen, input_dict):
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
@@ -1990,7 +1987,6 @@ def get_pim6_interface_traffic(tgen, input_dict):
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
@@ -3007,7 +3003,6 @@ def verify_pim_upstream_rpf(
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if "pim" in topo["routers"][dut]:
-
logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut)
rnode = tgen.routers()[dut]
@@ -3245,7 +3240,6 @@ def verify_pim_join(
grp_addr = grp_addr.split("/")[0]
for source, data in interface_json[grp_addr].items():
-
# Verify pim join
if pim_join:
if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
@@ -3338,7 +3332,6 @@ def verify_igmp_config(tgen, input_dict, stats_return=False, expected=True):
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
-
statistics = False
report = False
if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
@@ -3623,7 +3616,6 @@ def verify_pim_config(tgen, input_dict, expected=True):
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim"]["interfaces"].items():
-
logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface)
show_ip_igmp_intf_json = run_frr_cmd(
@@ -3772,7 +3764,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
elif (
interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0
):
-
traffic_dict[traffic_type][interface][
"pktsIn"
] = interface_json["pktsIn"]
@@ -3836,7 +3827,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
interface_json["pktsOut"] != 0
and interface_json["bytesOut"] != 0
):
-
traffic_dict[traffic_type][interface][
"pktsOut"
] = interface_json["pktsOut"]
@@ -4232,7 +4222,6 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
group_addresses = [group_addresses]
if interface not in show_ip_local_igmp_json:
-
errormsg = (
"[DUT %s]: Verifying local IGMP group received"
" from interface %s [FAILED]!! " % (dut, interface)
@@ -4319,7 +4308,6 @@ def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type=
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
@@ -4525,7 +4513,6 @@ def verify_mld_config(tgen, input_dict, stats_return=False, expected=True):
for dut in input_dict.keys():
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["mld"]["interfaces"].items():
-
statistics = False
report = False
if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]:
@@ -5040,7 +5027,6 @@ def verify_pim6_config(tgen, input_dict, expected=True):
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim6"]["interfaces"].items():
-
logger.info(
"[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface
)
@@ -5158,7 +5144,6 @@ def verify_local_mld_groups(tgen, dut, interface, group_addresses):
group_addresses = [group_addresses]
if interface not in show_ipv6_local_mld_json["default"]:
-
errormsg = (
"[DUT %s]: Verifying local MLD group received"
" from interface %s [FAILED]!! " % (dut, interface)