from lib.lutil import luCommand, luResult, LUtil
import json
import re
+import time
# gpz: get rib in json form and compare against desired routes
class BgpRib:
return 1
return 0
- def RequireVpnRoutes(self, target, title, wantroutes, debug=0):
+ def RequireVpnRoutes(self, target, title, wantroutes, retry=0, wait=1, debug=0):
import json
logstr = "RequireVpnRoutes " + str(wantroutes)
- # non json form for humans
- luCommand(
- target,
- 'vtysh -c "show bgp ipv4 vpn"',
- ".",
- "None",
- "Get VPN RIB (non-json)",
- )
- ret = luCommand(
- target,
- 'vtysh -c "show bgp ipv4 vpn json"',
- ".*",
- "None",
- "Get VPN RIB (json)",
- )
- if re.search(r"^\s*$", ret):
- # degenerate case: empty json means no routes
- if len(wantroutes) > 0:
- luResult(target, False, title, logstr)
- return
- luResult(target, True, title, logstr)
- rib = json.loads(ret)
- rds = rib["routes"]["routeDistinguishers"]
- for want in wantroutes:
- found = 0
- if debug:
- self.log("want rd %s" % want["rd"])
- for rd in rds.keys():
- if rd != want["rd"]:
- continue
+ retry += 1
+ while retry:
+ retry -= 1
+ # non json form for humans
+ luCommand(
+ target,
+ 'vtysh -c "show bgp ipv4 vpn"',
+ ".",
+ "None",
+ "Get VPN RIB (non-json)",
+ )
+ ret = luCommand(
+ target,
+ 'vtysh -c "show bgp ipv4 vpn json"',
+ ".*",
+ "None",
+ "Get VPN RIB (json)",
+ )
+ if re.search(r"^\s*$", ret):
+ # degenerate case: empty json means no routes
+ if len(wantroutes) > 0:
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+ rib = json.loads(ret)
+ rds = rib["routes"]["routeDistinguishers"]
+ for want in wantroutes:
+ found = 0
if debug:
- self.log("found rd %s" % rd)
- table = rds[rd]
- if self.routes_include_wanted(table, want, debug):
- found = 1
- break
- if not found:
- luResult(target, False, title, logstr)
- return
- luResult(target, True, title, logstr)
+ self.log("want rd %s" % want["rd"])
+ for rd in rds.keys():
+ if rd != want["rd"]:
+ continue
+ if debug:
+ self.log("found rd %s" % rd)
+ table = rds[rd]
+ if self.routes_include_wanted(table, want, debug):
+ found = 1
+ break
+ if not found:
+ if retry:
+ break
+ luResult(target, False, title, logstr)
+ return
+ if not found and retry:
+ time.sleep(wait)
+ continue
+ luResult(target, True, title, logstr)
+ break
- def RequireUnicastRoutes(self, target, afi, vrf, title, wantroutes, debug=0):
+ def RequireUnicastRoutes(
+ self, target, afi, vrf, title, wantroutes, retry=0, wait=1, debug=0
+ ):
logstr = "RequireUnicastRoutes %s" % str(wantroutes)
vrfstr = ""
if vrf != "":
if (afi != "ipv4") and (afi != "ipv6"):
self.log("ERROR invalid afi")
- cmdstr = "show bgp %s %s unicast" % (vrfstr, afi)
- # non json form for humans
- cmd = 'vtysh -c "%s"' % cmdstr
- luCommand(target, cmd, ".", "None", "Get %s %s RIB (non-json)" % (vrfstr, afi))
- cmd = 'vtysh -c "%s json"' % cmdstr
- ret = luCommand(
- target, cmd, ".*", "None", "Get %s %s RIB (json)" % (vrfstr, afi)
- )
- if re.search(r"^\s*$", ret):
- # degenerate case: empty json means no routes
- if len(wantroutes) > 0:
- luResult(target, False, title, logstr)
+ retry += 1
+ while retry:
+ retry -= 1
+ cmdstr = "show bgp %s %s unicast" % (vrfstr, afi)
+ # non json form for humans
+ cmd = 'vtysh -c "%s"' % cmdstr
+ luCommand(
+ target, cmd, ".", "None", "Get %s %s RIB (non-json)" % (vrfstr, afi)
+ )
+ cmd = 'vtysh -c "%s json"' % cmdstr
+ ret = luCommand(
+ target, cmd, ".*", "None", "Get %s %s RIB (json)" % (vrfstr, afi)
+ )
+ if re.search(r"^\s*$", ret):
+ # degenerate case: empty json means no routes
+ if len(wantroutes) > 0:
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+ rib = json.loads(ret)
+ try:
+ table = rib["routes"]
+ # KeyError: 'routes' probably means missing/bad VRF
+ except KeyError as err:
+ if vrf != "":
+ errstr = "-script ERROR: check if wrong vrf (%s)" % (vrf)
+ else:
+ errstr = "-script ERROR: check if vrf missing"
+ if retry:
+ time.sleep(wait)
+ continue
+ luResult(target, False, title + errstr, logstr)
return
+ # if debug:
+ # self.log("table=%s" % table)
+ for want in wantroutes:
+ if debug:
+ self.log("want=%s" % want)
+ if not self.routes_include_wanted(table, want, debug):
+ if retry:
+ time.sleep(wait)
+ continue
+ luResult(target, False, title, logstr)
+ return
luResult(target, True, title, logstr)
- rib = json.loads(ret)
- try:
- table = rib["routes"]
- # KeyError: 'routes' probably means missing/bad VRF
- except KeyError as err:
- if vrf != "":
- errstr = "-script ERROR: check if wrong vrf (%s)" % (vrf)
- else:
- errstr = "-script ERROR: check if vrf missing"
- luResult(target, False, title + errstr, logstr)
- return
- # if debug:
- # self.log("table=%s" % table)
- for want in wantroutes:
- if debug:
- self.log("want=%s" % want)
- if not self.routes_include_wanted(table, want, debug):
- luResult(target, False, title, logstr)
- return
- luResult(target, True, title, logstr)
+ break
BgpRib = BgpRib()
-def bgpribRequireVpnRoutes(target, title, wantroutes, debug=0):
- BgpRib.RequireVpnRoutes(target, title, wantroutes, debug)
+def bgpribRequireVpnRoutes(target, title, wantroutes, retry=0, wait=1, debug=0):
+ BgpRib.RequireVpnRoutes(target, title, wantroutes, retry, wait, debug)
-def bgpribRequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=0):
- BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug)
+def bgpribRequireUnicastRoutes(
+ target, afi, vrf, title, wantroutes, retry=0, wait=1, debug=0
+):
+ BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, retry, wait, debug)