POST_POLICY = "post-policy"
LOC_RIB = "loc-rib"
+UPDATE_EXPECTED_JSON = False
+DEBUG_PCAP = False
+
def build_topo(tgen):
tgen.add_router("r1")
"""
)
+ if DEBUG_PCAP:
+ tgen.gears["r1"].run("rm /tmp/bmp_vrf.pcap")
+ tgen.gears["r1"].run(
+ "tcpdump -nni r1-eth0 -s 0 -w /tmp/bmp_vrf.pcap &", stdout=None
+ )
+
for rname, router in tgen.routers().items():
router.load_config(
TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
return messages
-def check_for_prefixes(expected_prefixes, bmp_log_type, policy, labels=None):
+def update_seq():
+ global SEQ
+
+ messages = get_bmp_messages()
+
+ if len(messages):
+ SEQ = messages[-1]["seq"]
+
+
+def update_expected_files(bmp_actual, expected_prefixes, bmp_log_type, policy, step):
+ tgen = get_topogen()
+
+ with open(f"/tmp/bmp-{bmp_log_type}-{policy}-step{step}.json", "w") as json_file:
+ json.dump(bmp_actual, json_file, indent=4)
+
+ out = tgen.gears["r1"].vtysh_cmd("show bgp vrf vrf1 ipv4 json", isjson=True)
+ filtered_out = {
+ "routes": {
+ prefix: route_info
+ for prefix, route_info in out["routes"].items()
+ if prefix in expected_prefixes
+ }
+ }
+ if bmp_log_type == "withdraw":
+ for pfx in expected_prefixes:
+ if "::" in pfx:
+ continue
+ filtered_out["routes"][pfx] = None
+
+ # ls /tmp/show*json | while read file; do egrep -v 'prefix|network|metric|ocPrf|version|weight|peerId|vrf|Version|valid|Reason|fe80' $file >$(basename $file); echo >> $(basename $file); done
+ with open(f"/tmp/show-bgp-ipv4-{bmp_log_type}-step{step}.json", "w") as json_file:
+ json.dump(filtered_out, json_file, indent=4)
+
+ out = tgen.gears["r1"].vtysh_cmd("show bgp vrf vrf1 ipv6 json", isjson=True)
+ filtered_out = {
+ "routes": {
+ prefix: route_info
+ for prefix, route_info in out["routes"].items()
+ if prefix in expected_prefixes
+ }
+ }
+ if bmp_log_type == "withdraw":
+ for pfx in expected_prefixes:
+ if "::" not in pfx:
+ continue
+ filtered_out["routes"][pfx] = None
+
+ with open(f"/tmp/show-bgp-ipv6-{bmp_log_type}-step{step}.json", "w") as json_file:
+ json.dump(filtered_out, json_file, indent=4)
+
+
+def check_for_prefixes(expected_prefixes, bmp_log_type, policy, step):
"""
Check for the presence of the given prefixes in the BMP server logs with
the given message type and the set policy.
+
"""
global SEQ
+
# we care only about the new messages
messages = [
m for m in sorted(get_bmp_messages(), key=lambda d: d["seq"]) if m["seq"] > SEQ
]
- # get the list of pairs (prefix, policy, seq) for the given message type
- prefixes = [
- m["ip_prefix"]
- for m in messages
- if "ip_prefix" in m.keys()
- and "bmp_log_type" in m.keys()
- and m["bmp_log_type"] == bmp_log_type
- and m["policy"] == policy
- and (
- labels is None
- or (
- m["ip_prefix"] in labels.keys() and m["label"] == labels[m["ip_prefix"]]
- )
- )
- ]
-
- # check for prefixes
- for ep in expected_prefixes:
- if ep not in prefixes:
- msg = "The prefix {} is not present in the {} log messages."
- logger.debug(msg.format(ep, bmp_log_type))
- return False
-
- SEQ = messages[-1]["seq"]
- return True
+ # create empty initial files
+ # for step in $(seq 1); do
+ # for i in "update" "withdraw"; do
+ # for j in "pre-policy" "post-policy" "loc-rib"; do
+ # echo '{"null": {}}'> bmp-$i-$j-step$step.json
+ # done
+ # done
+ # done
+
+ ref_file = f"{CWD}/bmp1/bmp-{bmp_log_type}-{policy}-step{step}.json"
+ expected = json.loads(open(ref_file).read())
+
+ # Build actual json from logs
+ actual = {}
+ for m in messages:
+ if (
+ "bmp_log_type" in m.keys()
+ and "ip_prefix" in m.keys()
+ and m["ip_prefix"] in expected_prefixes
+ and m["bmp_log_type"] == bmp_log_type
+ and m["policy"] == policy
+ ):
+ policy_dict = actual.setdefault(m["policy"], {})
+ bmp_log_type_dict = policy_dict.setdefault(m["bmp_log_type"], {})
+
+ # Add or update the ip_prefix dictionary with filtered key-value pairs
+ bmp_log_type_dict[m["ip_prefix"]] = {
+ k: v
+ for k, v in sorted(m.items())
+ # filter out variable keys
+ if k not in ["timestamp", "seq", "nxhp_link-local"]
+ and (
+ # When policy is loc-rib, the peer-distinguisher is 0:0
+ # for the default VRF or the RD if any or the 0:<vrf_id>.
+ # 0:<vrf_id> is used to distinguished. RFC7854 says: "If the
+ # peer is a "Local Instance Peer", it is set to a unique,
+ # locally defined value." The value is not tested because it
+ # is variable.
+ k != "peer_distinguisher"
+ or policy != LOC_RIB
+ or v == "0:0"
+ or not v.startswith("0:")
+ )
+ }
+
+ # build expected JSON files
+ if (
+ UPDATE_EXPECTED_JSON
+ and actual
+ and set(actual.get(policy, {}).get(bmp_log_type, {}).keys())
+ == set(expected_prefixes)
+ ):
+ update_expected_files(actual, expected_prefixes, bmp_log_type, policy, step)
+
+ return topotest.json_cmp(actual, expected, exact=True)
def check_for_peer_message(expected_peers, bmp_log_type):
tgen = get_topogen()
set_bmp_policy(tgen, "r1", 65501, "bmp1", "unicast", policy, vrf="vrf1")
+ update_seq()
+
prefixes = ["172.31.0.15/32", "2111::1111/128"]
# add prefixes
configure_prefixes(tgen, "r2", 65502, "unicast", prefixes)
logger.info("checking for updated prefixes")
+
+ for ipver in [4, 6]:
+ if UPDATE_EXPECTED_JSON:
+ continue
+ ref_file = "{}/r1/show-bgp-ipv{}-update-step1.json".format(CWD, ipver)
+ expected = json.loads(open(ref_file).read())
+
+ test_func = partial(
+ topotest.router_json_cmp,
+ tgen.gears["r1"],
+ f"show bgp vrf vrf1 ipv{ipver} json",
+ expected,
+ )
+ _, res = topotest.run_and_expect(test_func, None, count=30, wait=1)
+ assertmsg = f"r1: BGP IPv{ipver} convergence failed"
+ assert res is None, assertmsg
+
# check
- test_func = partial(check_for_prefixes, prefixes, "update", policy)
- success, _ = topotest.run_and_expect(test_func, True, count=30, wait=1)
- assert success, "Checking the updated prefixes has been failed !."
+ test_func = partial(check_for_prefixes, prefixes, "update", policy, 1)
+ success, res = topotest.run_and_expect(test_func, None, count=30, wait=1)
+ assert success, "Checking the updated prefixes has been failed ! %s" % res
+
+ update_seq()
# withdraw prefixes
configure_prefixes(tgen, "r2", 65502, "unicast", prefixes, update=False)
- logger.info("checking for withdrawed prefxies")
+
+ logger.info("checking for withdrawn prefixes")
+
+ for ipver in [4, 6]:
+ if UPDATE_EXPECTED_JSON:
+ continue
+ ref_file = "{}/r1/show-bgp-ipv{}-withdraw-step1.json".format(CWD, ipver)
+ expected = json.loads(open(ref_file).read())
+
+ test_func = partial(
+ topotest.router_json_cmp,
+ tgen.gears["r1"],
+ f"show bgp vrf vrf1 ipv{ipver} json",
+ expected,
+ )
+ _, res = topotest.run_and_expect(test_func, None, count=30, wait=1)
+ assertmsg = f"r1: BGP IPv{ipver} convergence failed"
+ assert res is None, assertmsg
+
# check
- test_func = partial(check_for_prefixes, prefixes, "withdraw", policy)
- success, _ = topotest.run_and_expect(test_func, True, count=30, wait=1)
- assert success, "Checking the withdrawed prefixes has been failed !."
+ test_func = partial(check_for_prefixes, prefixes, "withdraw", policy, 1)
+ success, res = topotest.run_and_expect(test_func, None, count=30, wait=1)
+ assert success, "Checking the withdrawn prefixes has been failed ! %s" % res
def test_bmp_server_logging():