From a5124c49d37c7474ad720f387307ff62f680a297 Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Fri, 30 Jul 2021 14:45:56 +0000 Subject: [PATCH] tests: add helper object for mcast-tester and iperf tool. Signed-off-by: Christian Hopps --- tests/topotests/lib/common_config.py | 359 +++++++++++++++------------ tests/topotests/lib/mcast-tester.py | 51 ++-- tests/topotests/lib/pim.py | 116 +++++++++ 3 files changed, 341 insertions(+), 185 deletions(-) diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index de3aa534a4..fc51b11141 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -44,6 +44,7 @@ except ImportError: import configparser from io import StringIO +from lib.micronet import comm_error from lib.topogen import TopoRouter, get_topogen from lib.topolog import get_logger, logger from lib.topotest import frr_unicode, interface_set_status, version_cmp @@ -4439,202 +4440,232 @@ def required_linux_kernel_version(required_version): return True -def iperfSendIGMPJoin(tgen, server, bindToAddress, l4Type="UDP", join_interval=1): - """ - Run iperf to send IGMP join and traffic +class HostApplicationHelper (object): + """Helper to track and cleanup per-host based test processes.""" - Parameters: - ----------- - * `tgen` : Topogen object - * `l4Type`: string, one of [ TCP, UDP ] - * `server`: iperf server, from where IGMP join would be sent - * `bindToAddress`: bind to , an interface or multicast - address - * `join_interval`: seconds between periodic bandwidth reports + def __init__(self, tgen=None, base_cmd=None): + self.base_cmd_str = "" + self.host_procs = {} + self.tgen = None + self.set_base_cmd(base_cmd if base_cmd else []) + if tgen is not None: + self.init(tgen) - returns: - -------- - errormsg or True - """ + def __enter__ (self): + self.init() + return self - logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + def __exit__ (self ,type, value, traceback): + self.cleanup() - rnode = tgen.gears[server] + def __str__ (self): + return "HostApplicationHelper({})".format(self.base_cmd_str) - iperf_path = tgen.net.get_exec_path("iperf") + def set_base_cmd(self, base_cmd): + assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple) + self.base_cmd = base_cmd + if base_cmd: + self.base_cmd_str = " ".join(base_cmd) + else: + self.base_cmd_str = "" - if bindToAddress and not isinstance(bindToAddress, list): - bindToAddress = [ipaddress.IPv4Address(frr_unicode(bindToAddress))] + def init(self, tgen=None): + """Initialize the helper with tgen if needed. - for bindTo in bindToAddress: - iperf_args = [iperf_path, "-s"] + If overridden, need to handle multiple entries but one init. Will be called on + object creation if tgen is supplied. Will be called again on __enter__ so should + not re-init if already inited. + """ + if self.tgen: + assert tgen is None or self.tgen == tgen + else: + self.tgen = tgen - # UDP/TCP - if l4Type == "UDP": - iperf_args.append("-u") + def started_proc(self, host, p): + """Called after process started on host. - iperf_args.append("-B") - iperf_args.append(str(bindTo)) + Return value is passed to `stopping_proc` method.""" + logger.debug("%s: Doing nothing after starting process", self) + return False - # Join interval - if join_interval: - iperf_args.append("-i") - iperf_args.append(str(join_interval)) + def stopping_proc(self, host, p, info): + """Called after process started on host.""" + logger.debug("%s: Doing nothing before stopping process", self) + + def _add_host_proc(self, host, p): + v = self.started_proc(host, p) + + if host not in self.host_procs: + self.host_procs[host] = [] + logger.debug("%s: %s: tracking process %s", self, host, p) + self.host_procs[host].append((p, v)) + + def stop_host(self, host): + """Stop the process on the host. + + Override to do additional cleanup.""" + if host in self.host_procs: + hlogger = self.tgen.net[host].logger + for p, v in self.host_procs[host]: + self.stopping_proc(host, p, v) + logger.debug("%s: %s: terminating process %s", self, host, p.pid) + hlogger.debug("%s: %s: terminating process %s", self, host, p.pid) + rc = p.poll() + if rc is not None: + logger.error("%s: %s: process early exit %s: %s", self, host, p.pid, comm_error(p)) + hlogger.error("%s: %s: process early exit %s: %s", self, host, p.pid, comm_error(p)) + else: + p.terminate() + p.wait() + logger.debug("%s: %s: terminated process %s: %s", self, host, p.pid, comm_error(p)) + hlogger.debug("%s: %s: terminated process %s: %s", self, host, p.pid, comm_error(p)) + + del self.host_procs[host] + + def stop_all_hosts(self): + hosts = set(self.host_procs) + for host in hosts: + self.stop_host(host) + + def cleanup(self): + self.stop_all_hosts() + + def run(self, host, cmd_args, **kwargs): + cmd = list(self.base_cmd) + cmd.extend(cmd_args) + p = self.tgen.gears[host].popen(cmd, **kwargs) + assert p.poll() is None + self._add_host_proc(host, p) + return p + + def check_procs(self): + """Check that all current processes are running, log errors if not. + + Returns: List of stopped processes.""" + procs = [] + + logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys()) + + for host in self.host_procs: + hlogger = self.tgen.net[host].logger + for p, _ in self.host_procs[host]: + logger.debug("%s: checking %s proc %s", self, host, p) + rc = p.poll() + if rc is None: + continue + logger.error("%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True) + hlogger.error("%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True) + procs.append(p) + return procs - # Run iperf command to send IGMP join - logger.debug("[DUT: %s]: Running command: %s",server, iperf_args) - p = rnode.popen(iperf_args, stderr=subprocess.STDOUT) - rc = p.poll() - if rc is not None: - output, _ = p.communicate() - if rc: - errormsg = "IGMP join is not sent for {}. Error: {}".format(bindTo, output) - logger.error("%s", output) - return errormsg +class IPerfHelper(HostApplicationHelper): - if server not in g_iperf_server_procs: - g_iperf_server_procs[server] = [] - g_iperf_server_procs[server].append(p) + def __str__ (self): + return "IPerfHelper()" - logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) - return True + def run_join(self, host, join_addr, l4Type="UDP", join_interval=1, join_intf=None, join_towards=None): + """ + Use iperf to send IGMP join and listen to traffic + Parameters: + ----------- + * `host`: iperf host from where IGMP join would be sent + * `l4Type`: string, one of [ TCP, UDP ] + * `join_addr`: multicast address (or addresses) to join to + * `join_interval`: seconds between periodic bandwidth reports + * `join_intf`: the interface to bind the join to + * `join_towards`: router whos interface to bind the join to -def iperfSendTraffic( - tgen, - client, - sentToAddress, - ttl, - time=0, - l4Type="UDP", - bindToIntf=None, -): - """ - Run iperf to send IGMP join and traffic + returns: Success (bool) + """ - Parameters: - ----------- - * `tgen` : Topogen object - * `l4Type`: string, one of [ TCP, UDP ] - * `client`: iperf client, from where iperf traffic would be sent - * `sentToAddress`: bind to , an interface or multicast - address - * `ttl`: time to live - * `time`: time in seconds to transmit for - * `bindToIntf`: Source interface ip address + iperf_path = self.tgen.net.get_exec_path("iperf") - returns: - -------- - errormsg or True - """ + assert join_addr + if not isinstance(join_addr, list) and not isinstance(join_addr, tuple): + join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))] - logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for bindTo in join_addr: + iperf_args = [iperf_path, "-s"] - rnode = tgen.gears[client] + if l4Type == "UDP": + iperf_args.append("-u") - iperf_path = tgen.net.get_exec_path("iperf") + iperf_args.append("-B") + if join_towards: + to_intf = frr_unicode(self.tgen.json_topo["routers"][host]["links"][join_towards]["interface"]) + iperf_args.append("{}%{}".format(str(bindTo), to_intf)) + elif join_intf: + iperf_args.append("{}%{}".format(str(bindTo), join_intf)) + else: + iperf_args.append(str(bindTo)) - if sentToAddress and not isinstance(sentToAddress, list): - sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))] - for sendTo in sentToAddress: - iperf_args = [iperf_path, "-c", sendTo] + if join_interval: + iperf_args.append("-i") + iperf_args.append(str(join_interval)) - # Bind to Interface IP - if bindToIntf: - ifaddr = frr_unicode(tgen.json_topo["routers"][client]["links"][bindToIntf]["ipv4"]) - ipaddr = ipaddress.IPv4Interface(ifaddr).ip - iperf_args.append("-B") - iperf_args.append(str(ipaddr)) - - # UDP/TCP - if l4Type == "UDP": - iperf_args.append("-u") - iperf_args.append("-b") - iperf_args.append("0.012m") - - # TTL - if ttl: - iperf_args.append("-T") - iperf_args.append(str(ttl)) - - # Time - if time: - iperf_args.append("-t") - iperf_args.append(str(time)) - - # Run iperf command to send multicast traffic - logger.debug("[DUT: {}]: Running command: {}".format(client, iperf_args)) - - p = rnode.popen(iperf_args, stderr=subprocess.STDOUT) - rc = p.poll() - if rc is not None: - output, _ = p.communicate() - if rc: - errormsg = "Multicast traffic is not sent for {}. Error {}".format( - sendTo, output - ) - logger.error(output) - return errormsg + p = self.run(host, iperf_args) + if p.poll() is not None: + logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p)) + return False + return True - if client not in g_iperf_client_procs: - g_iperf_client_procs[client] = [] - g_iperf_client_procs[client].append(p) - logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) - return True + def run_traffic(self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None): + """ + Run iperf to send IGMP join and traffic + Parameters: + ----------- + * `host`: iperf host to send traffic from + * `l4Type`: string, one of [ TCP, UDP ] + * `sentToAddress`: multicast address to send traffic to + * `ttl`: time to live + * `time`: time in seconds to transmit for + * `bind_towards`: Router who's interface the source ip address is got from -def kill_iperf(tgen, dut=None, action=None): - """ - Killing iperf process if running for any router in topology - Parameters: - ----------- - * `tgen` : Topogen object - * `dut` : Any iperf hostname to send igmp prune - * `action`: to kill igmp join iperf action is remove_join - to kill traffic iperf action is remove_traffic + returns: Success (bool) + """ - Usage - ---- - kill_iperf(tgen, dut ="i6", action="remove_join") + iperf_path = self.tgen.net.get_exec_path("iperf") - """ + if sentToAddress and not isinstance(sentToAddress, list): + sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))] - logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) - logger.debug("Running iperfs: clients: %s servers: %s", g_iperf_client_procs, g_iperf_server_procs) + for sendTo in sentToAddress: + iperf_args = [iperf_path, "-c", sendTo] - if dut is not None: - nodes = [dut] - else: - nodes = sorted(tgen.gears.keys()) - - for name in nodes: - logger.debug("Checking for iperfs on %s", name) - if action == "remove_join": - procs = g_iperf_server_procs[name] if name in g_iperf_server_procs else [] - g_iperf_server_procs[name] = [] - elif action == "remove_traffic": - procs = g_iperf_client_procs[name] if name in g_iperf_client_procs else [] - g_iperf_client_procs[name] = [] - else: - procs = [] - if name in g_iperf_server_procs: - procs.extend(g_iperf_server_procs[name]) - g_iperf_server_procs[name] = [] - if name in g_iperf_client_procs: - procs.extend(g_iperf_client_procs[name]) - g_iperf_client_procs[name] = [] - for p in procs: - logger.info("[DUT: {}]: Terminating iperf: [{}]".format(name, p.pid)) - # p.send_signal(signal.SIGHUP) - p.terminate() - for p in procs: - logger.info("[DUT: {}]: Waiting for iperf to terminate: [{}]".format(name, p.pid)) - p.wait() + # Bind to Interface IP + if bind_towards: + ifaddr = frr_unicode(self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]) + ipaddr = ipaddress.IPv4Interface(ifaddr).ip + iperf_args.append("-B") + iperf_args.append(str(ipaddr)) - logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + # UDP/TCP + if l4Type == "UDP": + iperf_args.append("-u") + iperf_args.append("-b") + iperf_args.append("0.012m") + + # TTL + if ttl: + iperf_args.append("-T") + iperf_args.append(str(ttl)) + + # Time + if time: + iperf_args.append("-t") + iperf_args.append(str(time)) + + p = self.run(host, iperf_args) + if p.poll() is not None: + logger.error("mcast traffic send failed for %s: %s", sendTo, comm_error(p)) + return False + + return True def verify_ip_nht(tgen, input_dict): @@ -4675,7 +4706,7 @@ def verify_ip_nht(tgen, input_dict): rnode = router_list[router] nh_list = input_dict[router] - if validate_ip_address(next(iter(nh_list))) is "ipv6": + if validate_ip_address(next(iter(nh_list))) == "ipv6": show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht") else: show_ip_nht = run_frr_cmd(rnode, "show ip nht") diff --git a/tests/topotests/lib/mcast-tester.py b/tests/topotests/lib/mcast-tester.py index 117bf79eb9..a594c4c88d 100755 --- a/tests/topotests/lib/mcast-tester.py +++ b/tests/topotests/lib/mcast-tester.py @@ -60,9 +60,9 @@ def multicast_join(sock, ifindex, group, port): # Main code. # parser = argparse.ArgumentParser(description="Multicast RX utility") -parser.add_argument('socket', help='Point to topotest UNIX socket') parser.add_argument('group', help='Multicast IP') parser.add_argument('interface', help='Interface name') +parser.add_argument('--socket', help='Point to topotest UNIX socket') parser.add_argument( '--send', help='Transmit instead of join with interval', @@ -84,14 +84,19 @@ if os.geteuid() != 0: sys.exit(1) # Wait for topotest to synchronize with us. -toposock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) -while True: - try: - toposock.connect(args.socket) - break - except ConnectionRefusedError: - time.sleep(1) - continue +if not args.socket: + toposock = None +else: + toposock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + while True: + try: + toposock.connect(args.socket) + break + except ConnectionRefusedError: + time.sleep(1) + continue + # Set topotest socket non blocking so we can multiplex the main loop. + toposock.setblocking(False) msock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) if args.send > 0: @@ -105,26 +110,30 @@ if args.send > 0: socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack("b", ttl)) # Block to ensure packet send. msock.setblocking(True) - # Set topotest socket non blocking so we can multiplex the main loop. - toposock.setblocking(False) else: multicast_join(msock, ifindex, args.group, port) +def should_exit(): + if not toposock: + # If we are sending then we have slept + if not args.send: + time.sleep(100) + return False + else: + try: + data = toposock.recv(1) + if data == b'': + print(' -> Connection closed') + return True + except BlockingIOError: + return False + counter = 0 -while True: +while not should_exit(): if args.send > 0: msock.sendto(b"test %d" % counter, (args.group, port)) counter += 1 time.sleep(args.send) - try: - data = toposock.recv(1) - if data == b'': - print(' -> Connection closed') - break - except BlockingIOError: - continue - msock.close() - sys.exit(0) diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py index bb964df7dc..2cb4777a94 100644 --- a/tests/topotests/lib/pim.py +++ b/tests/topotests/lib/pim.py @@ -19,7 +19,9 @@ import datetime import os import re +import socket import sys +import tempfile import traceback from copy import deepcopy from time import sleep @@ -29,12 +31,16 @@ import pytest # Import common_config to use commomnly used APIs from lib.common_config import ( create_common_configurations, + HostApplicationHelper, + InvalidCLIError, create_common_configuration, InvalidCLIError, retry, run_frr_cmd, ) +from lib.micronet import comm_error, get_exec_path from lib.topolog import logger +from lib.topotest import frr_unicode #### CWD = os.path.dirname(os.path.realpath(__file__)) @@ -3412,3 +3418,113 @@ def verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip, expected=Tr logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + + +class McastTesterHelper (HostApplicationHelper): + + def __init__(self, tgen=None): + self.script_path = os.path.join(CWD, "mcast-tester.py") + self.host_conn = {} + self.listen_sock = None + + # # Get a temporary file for socket path + # (fd, sock_path) = tempfile.mkstemp("-mct.sock", "tmp" + str(os.getpid())) + # os.close(fd) + # os.remove(sock_path) + # self.app_sock_path = sock_path + + # # Listen on unix socket + # logger.debug("%s: listening on socket %s", self, self.app_sock_path) + # self.listen_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + # self.listen_sock.settimeout(10) + # self.listen_sock.bind(self.app_sock_path) + # self.listen_sock.listen(10) + + python3_path = get_exec_path(["python3", "python"]) + super(McastTesterHelper, self).__init__( + tgen, + # [python3_path, self.script_path, self.app_sock_path] + [python3_path, self.script_path] + ) + + def __str__ (self): + return "McastTesterHelper({})".format(self.script_path) + + def run_join(self, host, join_addrs, join_towards=None, join_intf=None): + """ + Join a UDP multicast group. + + One of join_towards or join_intf MUST be set. + + Parameters: + ----------- + * `host`: host from where IGMP join would be sent + * `join_addrs`: multicast address (or addresses) to join to + * `join_intf`: the interface to bind the join[s] to + * `join_towards`: router whos interface to bind the join[s] to + """ + if not isinstance(join_addrs, list) and not isinstance(join_addrs, tuple): + join_addrs = [join_addrs] + + if join_towards: + join_intf = frr_unicode(self.tgen.json_topo["routers"][host]["links"][join_towards]["interface"]) + else: + assert join_intf + + for join in join_addrs: + self.run(host, [join, join_intf]) + + return True + + def run_traffic(self, host, send_to_addrs, bind_towards=None, bind_intf=None): + """ + Send UDP multicast traffic. + + One of bind_towards or bind_intf MUST be set. + + Parameters: + ----------- + * `host`: host to send traffic from + * `send_to_addrs`: multicast address (or addresses) to send traffic to + * `bind_towards`: Router who's interface the source ip address is got from + """ + if bind_towards: + bind_intf = frr_unicode(self.tgen.json_topo["routers"][host]["links"][bind_towards]["interface"]) + else: + assert bind_intf + + if not isinstance(send_to_addrs, list) and not isinstance(send_to_addrs, tuple): + send_to_addrs = [send_to_addrs] + + for send_to in send_to_addrs: + self.run(host, ["--send=0.7", send_to, bind_intf]) + + return True + + # def cleanup(self): + # super(McastTesterHelper, self).cleanup() + + # if not self.listen_sock: + # return + + # logger.debug("%s: closing listen socket %s", self, self.app_sock_path) + # self.listen_sock.close() + # self.listen_sock = None + + # if os.path.exists(self.app_sock_path): + # os.remove(self.app_sock_path) + + # def started_proc(self, host, p): + # logger.debug("%s: %s: accepting on socket %s", self, host, self.app_sock_path) + # try: + # conn = self.listen_sock.accept() + # return conn + # except Exception as error: + # logger.error("%s: %s: accept on socket failed: %s", self, host, error) + # if p.poll() is not None: + # logger.error("%s: %s: helper app quit: %s", self, host, comm_error(p)) + # raise + + # def stopping_proc(self, host, p, conn): + # logger.debug("%s: %s: closing socket %s", self, host, conn) + # conn[0].close() -- 2.39.5