from . import cli
from . import parser
+from .args import add_launch_args
from .base import get_event_loop
from .cleanup import cleanup_previous
from .compat import PytestConfig
cap.add_argument(
"--project-root", help="directory to stop searching for kinds config at"
)
+
rap = ap.add_argument_group(title="Runtime", description="runtime related options")
+ add_launch_args(rap.add_argument)
+
+ # Move to munet.args?
rap.add_argument(
"-C",
"--cleanup",
action="store_true",
help="Remove the entire rundir (not just node subdirs) prior to running.",
)
- rap.add_argument(
- "--gdb", metavar="NODE-LIST", help="comma-sep list of hosts to run gdb on"
- )
- rap.add_argument(
- "--gdb-breakpoints",
- metavar="BREAKPOINT-LIST",
- help="comma-sep list of breakpoints to set",
- )
- rap.add_argument(
- "--host",
- action="store_true",
- help="no isolation for top namespace, bridges exposed to default namespace",
- )
- rap.add_argument(
- "--pcap",
- metavar="TARGET-LIST",
- help="comma-sep list of capture targets (NETWORK or NODE:IFNAME)",
- )
- rap.add_argument(
- "--shell", metavar="NODE-LIST", help="comma-sep list of nodes to open shells on"
- )
- rap.add_argument(
- "--stderr",
- metavar="NODE-LIST",
- help="comma-sep list of nodes to open windows viewing stderr",
- )
- rap.add_argument(
- "--stdout",
- metavar="NODE-LIST",
- help="comma-sep list of nodes to open windows viewing stdout",
- )
+ # Move to munet.args?
rap.add_argument(
"--topology-only",
action="store_true",
help="Do not run any node commands",
)
- rap.add_argument("--unshare-inline", action="store_true", help=argparse.SUPPRESS)
rap.add_argument(
"--validate-only",
action="store_true",
help="Validate the config against the schema definition",
)
+ rap.add_argument("--unshare-inline", action="store_true", help=argparse.SUPPRESS)
+
rap.add_argument("-v", "--verbose", action="store_true", help="be verbose")
rap.add_argument(
"-V", "--version", action="store_true", help="print the verison number and exit"
)
+
eap = ap.add_argument_group(title="Uncommon", description="uncommonly used options")
eap.add_argument("--log-config", help="logging config file (yaml, toml, json, ...)")
eap.add_argument(
rundir = args.rundir if args.rundir else "/tmp/munet"
args.rundir = rundir
-
if args.cleanup:
if os.path.exists(rundir):
if not os.path.exists(f"{rundir}/config.json"):
sys.exit(1)
else:
subprocess.run(["/usr/bin/rm", "-rf", rundir], check=True)
-
subprocess.run(f"mkdir -p {rundir} && chmod 755 {rundir}", check=True, shell=True)
os.environ["MUNET_RUNDIR"] = rundir
--- /dev/null
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+#
+# April 14 2024, Christian Hopps <chopps@labn.net>
+#
+# Copyright (c) 2024, LabN Consulting, L.L.C.
+#
+"""Common CLI execute argument."""
+
+
+def add_launch_args(add_func):
+
+ add_func("--gdb", metavar="NODE-LIST", help="comma-sep list of hosts to run gdb on")
+ add_func(
+ "--gdb-breakpoints",
+ metavar="BREAKPOINT-LIST",
+ help="comma-sep list of breakpoints to set",
+ )
+ add_func(
+ "--gdb-use-emacs",
+ action="store_true",
+ help="Use emacsclient to run gdb instead of a shell",
+ )
+
+ add_func(
+ "--host",
+ action="store_true",
+ help="no isolation for top namespace, bridges exposed to default namespace",
+ )
+ add_func(
+ "--pcap",
+ metavar="TARGET-LIST",
+ help="comma-sep list of capture targets (NETWORK or NODE:IFNAME) or 'all'",
+ )
+ add_func(
+ "--shell", metavar="NODE-LIST", help="comma-sep list of nodes to open shells on"
+ )
+ add_func(
+ "--stderr",
+ metavar="NODE-LIST",
+ help="comma-sep list of nodes to open windows viewing stderr",
+ )
+ add_func(
+ "--stdout",
+ metavar="NODE-LIST",
+ help="comma-sep list of nodes to open windows viewing stdout",
+ )
+
+
+def add_testing_args(add_func):
+ add_func(
+ "--cli-on-error",
+ action="store_true",
+ help="CLI on test failure",
+ )
+
+ add_func(
+ "--coverage",
+ action="store_true",
+ help="Enable coverage gathering if supported",
+ )
+
+ add_func(
+ "--cov-build-dir",
+ help="Specify the build dir for locating coverage data files",
+ )
+
+ add_launch_args(add_func)
+
+ add_func(
+ "--pause",
+ action="store_true",
+ help="Pause after each test",
+ )
+ add_func(
+ "--pause-at-end",
+ action="store_true",
+ help="Pause before taking munet down",
+ )
+ add_func(
+ "--pause-on-error",
+ action="store_true",
+ help="Pause after (disables default when --shell or -vtysh given)",
+ )
+ add_func(
+ "--no-pause-on-error",
+ dest="pause_on_error",
+ action="store_false",
+ help="Do not pause after (disables default when --shell or -vtysh given)",
+ )
import sys
import tempfile
import time as time_mod
+
from collections import defaultdict
from pathlib import Path
from typing import Union
from . import config as munet_config
from . import linux
+
try:
import pexpect
+
from pexpect.fdpexpect import fdspawn
from pexpect.popen_spawn import PopenSpawn
"""
policy = asyncio.get_event_loop_policy()
loop = policy.get_event_loop()
+ if not hasattr(os, "pidfd_open"):
+ return loop
+
owatcher = policy.get_child_watcher()
logging.debug(
"event_loop_fixture: global policy %s, current loop %s, current watcher %s",
import argparse
import json
import os
-import subprocess
import sys
from pathlib import Path
ecmd = "/usr/bin/nsenter"
eargs = [ecmd]
- output = subprocess.check_output(["/usr/bin/nsenter", "--help"], encoding="utf-8")
- if " -a," in output:
- eargs.append("-a")
- else:
- # -U doesn't work
- for flag in ["-u", "-i", "-m", "-n", "-C", "-T"]:
- if f" {flag}," in output:
- eargs.append(flag)
+ #start mucmd same way base process is started
+ eargs.append(f"--mount=/proc/{pid}/ns/mnt")
+ eargs.append(f"--net=/proc/{pid}/ns/net")
eargs.append(f"--pid=/proc/{pid}/ns/pid_for_children")
+ eargs.append(f"--uts=/proc/{pid}/ns/uts")
eargs.append(f"--wd={rundir}")
- eargs.extend(["-t", pid])
eargs += args.shellcmd
- # print("Using ", eargs)
+ #print("Using ", eargs)
return os.execvpe(ecmd, eargs, {**env, **envcfg})
"server-port": {
"type": "number"
},
+ "ssh-identity-file": {
+ "type": "string"
+ },
+ "ssh-user": {
+ "type": "string"
+ },
+ "ssh-password": {
+ "type": "string"
+ },
"qemu": {
"type": "object",
"properties": {
"disk": {
"type": "string"
},
+ "disk-driver": {
+ "type": "string"
+ },
+ "disk-template": {
+ "type": "string"
+ },
+ "initial-cmd": {
+ "type": "string"
+ },
"kerenel": {
"type": "string"
},
"password": {
"type": "string"
},
+ "initial-password": {
+ "type": "string"
+ },
"expects": {
"type": "array",
"items": {
"server-port": {
"type": "number"
},
+ "ssh-identity-file": {
+ "type": "string"
+ },
+ "ssh-user": {
+ "type": "string"
+ },
+ "ssh-password": {
+ "type": "string"
+ },
"qemu": {
"type": "object",
"properties": {
"disk": {
"type": "string"
},
+ "disk-driver": {
+ "type": "string"
+ },
+ "disk-template": {
+ "type": "string"
+ },
+ "initial-cmd": {
+ "type": "string"
+ },
"kerenel": {
"type": "string"
},
"password": {
"type": "string"
},
+ "initial-password": {
+ "type": "string"
+ },
"expects": {
"type": "array",
"items": {
from typing import Union
from munet import parser
+from munet.args import add_testing_args
from munet.base import Bridge
from munet.base import get_event_loop
+from munet.cli import async_cli
+from munet.compat import PytestConfig
from munet.mutest import userapi as uapi
from munet.native import L3NodeMixin
from munet.native import Munet
exec_formatter = logging.Formatter("%(asctime)s %(levelname)5s: %(name)s: %(message)s")
-async def get_unet(config: dict, croot: Path, rundir: Path, unshare: bool = False):
+async def get_unet(
+ config: dict, croot: Path, rundir: Path, args: Namespace, unshare: bool = False
+):
"""Create and run a new Munet topology.
The topology is built from the given ``config`` to run inside the path indicated
value will be modified and stored in the built ``Munet`` object.
croot: common root of all tests, used to search for ``kinds.yaml`` files.
rundir: the path to the run directory for this topology.
+ args: argparse args
unshare: True to unshare the process into it's own private namespace.
Yields:
try:
try:
unet = await async_build_topology(
- config, rundir=str(rundir), unshare_inline=unshare
+ config,
+ rundir=str(rundir),
+ args=args,
+ pytestconfig=PytestConfig(args),
+ unshare_inline=unshare,
)
except Exception as error:
logging.debug("unet build failed: %s", error, exc_info=True)
targets["."] = unet
tc = uapi.TestCase(
- str(test_num), test_name, test, targets, logger, reslog, args.full_summary
+ str(test_num), test_name, test, targets, args, logger, reslog, args.full_summary
)
- passed, failed, e = tc.execute()
+ try:
+ passed, failed, e = tc.execute()
+ except uapi.CLIOnErrorError as error:
+ await async_cli(unet)
+ passed, failed, e = 0, 0, error
run_time = time.time() - tc.info.start_time
start_time = time.time()
try:
for dirpath in tests:
+ if args.validate_only:
+ parser.validate_config(configs[dirpath], reslog, args)
+ continue
+
test_files = tests[dirpath]
for test in test_files:
tnum += 1
root_logger.addHandler(exec_handler)
try:
- async for unet in get_unet(config, common, rundir):
+ async for unet in get_unet(config, common, rundir, args):
+
if not printed_header:
print_header(reslog, unet)
printed_header = True
+
passed, failed, e = await execute_test(
unet, test, args, tnum, exec_handler
)
except KeyboardInterrupt:
pass
+ if args.validate_only:
+ return False
+
run_time = time.time() - start_time
tnum = 0
tpassed = 0
def main():
ap = ArgumentParser()
ap.add_argument(
- "--dist",
- type=int,
- nargs="?",
- const=-1,
- default=0,
- action="store",
- metavar="NUM-THREADS",
- help="Run in parallel, value is num. of threads or no value for auto",
+ "-v", dest="verbose", action="count", default=0, help="More -v's, more verbose"
)
- ap.add_argument("-d", "--rundir", help="runtime directory for tempfiles, logs, etc")
ap.add_argument(
+ "-V", "--version", action="store_true", help="print the verison number and exit"
+ )
+ ap.add_argument("paths", nargs="*", help="Paths to collect tests from")
+
+ rap = ap.add_argument_group(title="Runtime", description="runtime related options")
+ rap.add_argument(
+ "-d", "--rundir", help="runtime directory for tempfiles, logs, etc"
+ )
+ add_testing_args(rap.add_argument)
+
+ eap = ap.add_argument_group(title="Uncommon", description="uncommonly used options")
+ eap.add_argument(
"--file-select", default="mutest_*.py", help="shell glob for finding tests"
)
- ap.add_argument("--log-config", help="logging config file (yaml, toml, json, ...)")
- ap.add_argument(
- "-V",
+ eap.add_argument(
"--full-summary",
action="store_true",
help="print full summary headers from docstrings",
)
- ap.add_argument(
- "-v", dest="verbose", action="count", default=0, help="More -v's, more verbose"
+ eap.add_argument("--log-config", help="logging config file (yaml, toml, json, ...)")
+ eap.add_argument(
+ "--validate-only",
+ action="store_true",
+ help="Validate the munet configs against the schema definition",
)
- ap.add_argument("paths", nargs="*", help="Paths to collect tests from")
+
args = ap.parse_args()
+ if args.version:
+ from importlib import metadata # pylint: disable=C0415
+
+ print(metadata.version("munet"))
+ sys.exit(0)
+
rundir = args.rundir if args.rundir else "/tmp/mutest"
args.rundir = Path(rundir)
os.environ["MUNET_RUNDIR"] = rundir
import logging
import pprint
import re
+import subprocess
+import sys
import time
+from argparse import Namespace
from pathlib import Path
from typing import Any
from typing import Union
from munet.base import Commander
+class ScriptError(Exception):
+ """An unrecoverable script failure."""
+
+
+class CLIOnErrorError(Exception):
+ """Enter CLI after error."""
+
+
+def pause_test(desc=""):
+ isatty = sys.stdout.isatty()
+ if not isatty:
+ desc = f" for {desc}" if desc else ""
+ logging.info("NO PAUSE on non-tty terminal%s", desc)
+ return
+
+ while True:
+ if desc:
+ print(f"\n== PAUSING: {desc} ==")
+ try:
+ user = input('PAUSED, "cli" for CLI, "pdb" to debug, "Enter" to continue: ')
+ except EOFError:
+ print("^D...continuing")
+ break
+ user = user.strip()
+ if user == "cli":
+ raise CLIOnErrorError()
+ if user == "pdb":
+ breakpoint() # pylint: disable=W1515
+ elif user:
+ print(f'Unrecognized input: "{user}"')
+ else:
+ break
+
+
+def act_on_result(success, args, desc=""):
+ if args.pause:
+ pause_test(desc)
+ elif success:
+ return
+ if args.cli_on_error:
+ raise CLIOnErrorError()
+ if args.pause_on_error:
+ pause_test(desc)
+
+
class TestCaseInfo:
"""Object to hold nestable TestCase Results."""
name: str,
path: Path,
targets: dict,
+ args: Namespace,
output_logger: logging.Logger = None,
result_logger: logging.Logger = None,
full_summary: bool = False,
self.__in_section = False
self.targets = targets
+ self.args = args
self.last = ""
self.last_m = None
# Extract any docstring as a title.
if print_header:
- title = locals()[f"_{name}"].__doc__.lstrip()
+ title = locals()[f"_{name}"].__doc__
+ if title is None:
+ title = ""
+ title = title.lstrip()
if self.__short_doc_header and (title := title.lstrip()):
if (idx := title.find("\n")) != -1:
title = title[:idx].strip()
# Here's where we can do async in the future if we want.
# result = await locals()[f"_{name}"](_ok_result)
+ except ScriptError as error:
+ return error
+ except CLIOnErrorError:
+ raise
except Exception as error:
logging.error(
"Unexpected exception executing %s: %s", name, error, exc_info=True
target: the target to execute the command on.
cmd: string to execut on the target.
"""
- out = self.targets[target].cmd_nostatus(cmd, warn=False)
+ out = self.targets[target].cmd_nostatus(
+ cmd, stdin=subprocess.DEVNULL, warn=False
+ )
self.last = out = out.rstrip()
report = out if out else "<no output>"
self.logf("COMMAND OUTPUT:\n%s", report)
target: the target to execute the command on.
cmd: string to execute on the target.
"""
- out = self.targets[target].cmd_nostatus(cmd, warn=False)
+ out = self.targets[target].cmd_nostatus(
+ cmd, stdin=subprocess.DEVNULL, warn=False
+ )
self.last = out = out.rstrip()
try:
js = json.loads(out)
except Exception as error:
- js = {}
+ js = None
self.olog.warning(
"JSON load failed. Check command output is in JSON format: %s",
error,
exact_match: if True then the json must exactly match.
"""
js = self._command_json(target, cmd)
+ if js is None:
+ return expect_fail, {}
+
try:
+ # Convert to string to validate the input is valid JSON
+ if not isinstance(match, str):
+ match = json.dumps(match)
expect = json.loads(match)
except Exception as error:
expect = {}
self.olog.warning(
"JSON load failed. Check match value is in JSON format: %s", error
)
+ return expect_fail, {}
if exact_match:
deep_diff = json_cmp(expect, js)
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
else:
- deep_diff = json_cmp(expect, js, ignore_order=True)
+ deep_diff = json_cmp(
+ expect, js, ignore_order=True, cutoff_intersection_for_pairs=1
+ )
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
# Remove new fields in json object from diff
"""
path = Path(pathname)
path = self.info.path.parent.joinpath(path)
+ do_cli = False
self.oplogf(
"include: new path: %s create section: %s currently __in_section: %s",
self.info.path = path
self.oplogf("include: swapped info path: new %s old %s", path, old_path)
- self.__exec_script(path, print_header=new_section, add_newline=new_section)
+ try:
+ e = self.__exec_script(
+ path, print_header=new_section, add_newline=new_section
+ )
+ except CLIOnErrorError:
+ do_cli = True
if new_section:
# Something within the section creating include has also created a section
self.info.path = old_path
self.oplogf("include: restored info path: %s", old_path)
+ if do_cli:
+ raise CLIOnErrorError()
+ if e:
+ raise ScriptError(e)
+
def __end_section(self):
self.oplogf("__end_section: __in_section: %s", self.__in_section)
info = self.__pop_execinfo()
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def test_step(self, expr_or_value: Any, desc: str, target: str = "") -> bool:
"""
success = bool(expr_or_value)
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success
def match_step_json(
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def wait_step(
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def wait_step_json(
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
# pylint: disable=protected-access
"""A module that defines objects for standalone use."""
import asyncio
+import base64
import errno
import getpass
+import glob
import ipaddress
import logging
import os
async def async_cleanup_cmd(self):
"""Run the configured cleanup commands for this node."""
+ if self.cleanup_called:
+ return
+ self.cleanup_called = True
+
return await self._async_cleanup_cmd()
def has_ready_cmd(self) -> bool:
outopt = outopt if outopt is not None else ""
if outopt == "all" or self.name in outopt.split(","):
outname = stdout.name if hasattr(stdout, "name") else stdout
- self.run_in_window(f"tail -F {outname}", title=f"O:{self.name}")
+ self.run_in_window(f"tail -n+1 -F {outname}", title=f"O:{self.name}")
if stderr:
erropt = self.unet.cfgopt.getoption("--stderr")
erropt = erropt if erropt is not None else ""
if erropt == "all" or self.name in erropt.split(","):
errname = stderr.name if hasattr(stderr, "name") else stderr
- self.run_in_window(f"tail -F {errname}", title=f"E:{self.name}")
+ self.run_in_window(f"tail -n+1 -F {errname}", title=f"E:{self.name}")
def pytest_hook_open_shell(self):
if not self.unet:
self.logger.info("%s: created", self)
- def has_ready_cmd(self) -> bool:
- return bool(self.config.get("ready-cmd", "").strip())
-
def _get_pre_cmd(self, use_str, use_pty, ns_only=False, **kwargs):
pre_cmd = []
if self.unet:
async def async_cleanup_cmd(self):
"""Run the configured cleanup commands for this node."""
+ if self.cleanup_called:
+ return
self.cleanup_called = True
if "cleanup-cmd" not in self.config:
return
+ # The opposite of other types, the container needs cmd_p running
if not self.cmd_p:
self.logger.warning("async_cleanup_cmd: container no longer running")
return
rundir=os.path.join(self.rundir, self.name),
configdir=self.unet.config_dirname,
)
- self.ssh_keyfile = self.qemu_config.get("sshkey")
+ self.ssh_keyfile = self.config.get("ssh-identity-file")
+ if not self.ssh_keyfile:
+ self.ssh_keyfile = self.qemu_config.get("sshkey")
+
+ self.ssh_user = self.config.get("ssh-user")
+ if not self.ssh_user:
+ self.ssh_user = self.qemu_config.get("sshuser", "root")
+
+ self.disk_created = False
@property
def is_vm(self):
self.__base_cmd_pty = list(self.__base_cmd)
self.__base_cmd_pty.append("-t")
- user = self.qemu_config.get("sshuser", "root")
- self.__base_cmd.append(f"{user}@{mgmt_ip}")
+ self.__base_cmd.append(f"{self.ssh_user}@{mgmt_ip}")
self.__base_cmd.append("--")
- self.__base_cmd_pty.append(f"{user}@{mgmt_ip}")
+ self.__base_cmd_pty.append(f"{self.ssh_user}@{mgmt_ip}")
# self.__base_cmd_pty.append("--")
return True
if args:
self.extra_mounts += args
- async def run_cmd(self):
+ async def _run_cmd(self, cmd_node):
"""Run the configured commands for this node inside VM."""
self.logger.debug(
"[rundir %s exists %s]", self.rundir, os.path.exists(self.rundir)
)
- cmd = self.config.get("cmd", "").strip()
+ cmd = self.config.get(cmd_node, "").strip()
if not cmd:
- self.logger.debug("%s: no `cmd` to run", self)
+ self.logger.debug("%s: no `%s` to run", self, cmd_node)
return None
shell_cmd = self.config.get("shell", "/bin/bash")
cmd += "\n"
# Write a copy to the rundir
- cmdpath = os.path.join(self.rundir, "cmd.shebang")
+ cmdpath = os.path.join(self.rundir, f"{cmd_node}.shebang")
with open(cmdpath, mode="w+", encoding="utf-8") as cmdfile:
cmdfile.write(cmd)
commander.cmd_raises(f"chmod 755 {cmdpath}")
# Now write a copy inside the VM
- self.conrepl.cmd_status("cat > /tmp/cmd.shebang << EOF\n" + cmd + "\nEOF")
- self.conrepl.cmd_status("chmod 755 /tmp/cmd.shebang")
- cmds = "/tmp/cmd.shebang"
+ self.conrepl.cmd_status(
+ f"cat > /tmp/{cmd_node}.shebang << EOF\n" + cmd + "\nEOF"
+ )
+ self.conrepl.cmd_status(f"chmod 755 /tmp/{cmd_node}.shebang")
+ cmds = f"/tmp/{cmd_node}.shebang"
else:
cmd = cmd.replace("%CONFIGDIR%", str(self.unet.config_dirname))
cmd = cmd.replace("%RUNDIR%", str(self.rundir))
# When run_command supports async_ arg we can use the above...
self.cmd_p = now_proc(self.cmdrepl.run_command(cmds, timeout=120))
-
- # stdout and err both combined into logfile from the spawned repl
- stdout = os.path.join(self.rundir, "_cmdcon-log.txt")
- self.pytest_hook_run_cmd(stdout, None)
else:
# If we only have a console we can't run in parallel, so run to completion
self.cmd_p = now_proc(self.conrepl.run_command(cmds, timeout=120))
return self.cmd_p
+ async def run_cmd(self):
+ if self.disk_created:
+ await self._run_cmd("initial-cmd")
+ await self._run_cmd("cmd")
+
+ # stdout and err both combined into logfile from the spawned repl
+ if self.cmdrepl:
+ stdout = os.path.join(self.rundir, "_cmdcon-log.txt")
+ self.pytest_hook_run_cmd(stdout, None)
+
# InterfaceMixin override
# We need a name unique in the shared namespace.
def get_ns_ifname(self, ifname):
async def gather_coverage_data(self):
con = self.conrepl
+ gcda_root = "/sys/kernel/debug/gcov"
+ dest = "/tmp/gcov-data.tgz"
+
+ if gcda_root != "/sys/kernel/debug/gcov":
+ con.cmd_raises(
+ rf"cd {gcda_root} && find * -name '*.gc??' "
+ "| tar -cf - -T - | gzip -c > {dest}"
+ )
+ else:
+ # Some tars dont try and read 0 length files so we need to copy them.
+ tmpdir = con.cmd_raises("mktemp -d").strip()
+ con.cmd_raises(
+ rf"cd {gcda_root} && find -type d -exec mkdir -p {tmpdir}/{{}} \;"
+ )
+ con.cmd_raises(
+ rf"cd {gcda_root} && "
+ rf"find -name '*.gcda' -exec sh -c 'cat < $0 > {tmpdir}/$0' {{}} \;"
+ )
+ con.cmd_raises(
+ rf"cd {gcda_root} && "
+ rf"find -name '*.gcno' -exec sh -c 'cp -d $0 {tmpdir}/$0' {{}} \;"
+ )
+ con.cmd_raises(
+ rf"cd {tmpdir} && "
+ rf"find * -name '*.gc??' | tar -cf - -T - | gzip -c > {dest}"
+ )
+ con.cmd_raises(rf"rm -rf {tmpdir}")
- gcda = "/sys/kernel/debug/gcov"
- tmpdir = con.cmd_raises("mktemp -d").strip()
- dest = "/gcov-data.tgz"
- con.cmd_raises(rf"find {gcda} -type d -exec mkdir -p {tmpdir}/{{}} \;")
- con.cmd_raises(
- rf"find {gcda} -name '*.gcda' -exec sh -c 'cat < $0 > {tmpdir}/$0' {{}} \;"
- )
- con.cmd_raises(
- rf"find {gcda} -name '*.gcno' -exec sh -c 'cp -d $0 {tmpdir}/$0' {{}} \;"
- )
- con.cmd_raises(rf"tar cf - -C {tmpdir} sys | gzip -c > {dest}")
- con.cmd_raises(rf"rm -rf {tmpdir}")
self.logger.info("Saved coverage data in VM at %s", dest)
+ ldest = os.path.join(self.rundir, "gcov-data.tgz")
if self.use_ssh:
- ldest = os.path.join(self.rundir, "gcov-data.tgz")
self.cmd_raises(["/bin/cat", dest], stdout=open(ldest, "wb"))
self.logger.info("Saved coverage data on host at %s", ldest)
+ else:
+ output = con.cmd_raises(rf"base64 {dest}")
+ with open(ldest, "wb") as f:
+ f.write(base64.b64decode(output))
+ self.logger.info("Saved coverage data on host at %s", ldest)
async def _opencons(
self,
expects=expects,
sends=sends,
timeout=timeout,
+ init_newline=True,
trace=True,
)
)
if not nnics:
args += ["-nic", "none"]
- dtpl = qc.get("disk-template")
+ dtplpath = dtpl = qc.get("disk-template")
diskpath = disk = qc.get("disk")
- if dtpl and not disk:
- disk = qc["disk"] = f"{self.name}-{os.path.basename(dtpl)}"
- diskpath = os.path.join(self.rundir, disk)
+ if diskpath:
+ if diskpath[0] != "/":
+ diskpath = os.path.join(self.unet.config_dirname, diskpath)
+
+ if dtpl and (not disk or not os.path.exists(diskpath)):
+ if not disk:
+ disk = qc["disk"] = f"{self.name}-{os.path.basename(dtpl)}"
+ diskpath = os.path.join(self.rundir, disk)
if self.path_exists(diskpath):
logging.debug("Disk '%s' file exists, using.", diskpath)
else:
- dtplpath = os.path.abspath(
- os.path.join(
- os.path.dirname(self.unet.config["config_pathname"]), dtpl
- )
- )
+ if dtplpath[0] != "/":
+ dtplpath = os.path.join(self.unet.config_dirname, dtpl)
logging.info("Create disk '%s' from template '%s'", diskpath, dtplpath)
self.cmd_raises(
f"qemu-img create -f qcow2 -F qcow2 -b {dtplpath} {diskpath}"
)
+ self.disk_created = True
+ disk_driver = qc.get("disk-driver", "virtio")
if diskpath:
- args.extend(
- ["-drive", f"file={diskpath},if=none,id=sata-disk0,format=qcow2"]
- )
- args.extend(["-device", "ahci,id=ahci"])
- args.extend(["-device", "ide-hd,bus=ahci.0,drive=sata-disk0"])
+ if disk_driver == "virtio":
+ args.extend(["-drive", f"file={diskpath},if=virtio,format=qcow2"])
+ else:
+ args.extend(
+ ["-drive", f"file={diskpath},if=none,id=sata-disk0,format=qcow2"]
+ )
+ args.extend(["-device", "ahci,id=ahci"])
+ args.extend(["-device", "ide-hd,bus=ahci.0,drive=sata-disk0"])
+
+ cidiskpath = qc.get("cloud-init-disk")
+ if cidiskpath:
+ if cidiskpath[0] != "/":
+ cidiskpath = os.path.join(self.unet.config_dirname, cidiskpath)
+ args.extend(["-drive", f"file={cidiskpath},if=virtio,format=qcow2"])
+
+ # args.extend(["-display", "vnc=0.0.0.0:40"])
use_stdio = cc.get("stdio", True)
has_cmd = self.config.get("cmd")
if use_cmdcon:
confiles.append("_cmdcon")
+ password = cc.get("password", "")
+ if self.disk_created:
+ password = cc.get("initial-password", password)
+
#
# Connect to the console socket, retrying
#
prompt=prompt,
is_bourne=not bool(prompt),
user=cc.get("user", "root"),
- password=cc.get("password", ""),
+ password=password,
expects=cc.get("expects"),
sends=cc.get("sends"),
timeout=int(cc.get("timeout", 60)),
async def async_cleanup_cmd(self):
"""Run the configured cleanup commands for this node."""
+ if self.cleanup_called:
+ return
self.cleanup_called = True
if "cleanup-cmd" not in self.config:
mtu = kwargs.get("mtu", config.get("mtu"))
return super().add_switch(name, cls=cls, config=config, mtu=mtu, **kwargs)
+ def coverage_setup(self):
+ bdir = self.cfgopt.getoption("--cov-build-dir")
+ if not bdir:
+ # Try and find the build dir using common prefix of gcno files
+ common = None
+ cwd = os.getcwd()
+ for f in glob.iglob(rf"{cwd}/**/*.gcno", recursive=True):
+ if not common:
+ common = os.path.dirname(f)
+ else:
+ common = os.path.commonprefix([common, f])
+ if not common:
+ break
+ assert (
+ bdir
+ ), "Can't locate build directory for coverage data, use --cov-build-dir"
+
+ bdir = Path(bdir).resolve()
+ rundir = Path(self.rundir).resolve()
+ gcdadir = rundir / "gcda"
+ os.environ["GCOV_BUILD_DIR"] = str(bdir)
+ os.environ["GCOV_PREFIX_STRIP"] = str(len(bdir.parts) - 1)
+ os.environ["GCOV_PREFIX"] = str(gcdadir)
+
+ # commander.cmd_raises(f"find {bdir} -name '*.gc??' -exec chmod o+rw {{}} +")
+ group_id = bdir.stat().st_gid
+ commander.cmd_raises(f"mkdir -p {gcdadir}")
+ commander.cmd_raises(f"chown -R root:{group_id} {gcdadir}")
+ commander.cmd_raises(f"chmod 2775 {gcdadir}")
+
+ async def coverage_finish(self):
+ rundir = Path(self.rundir).resolve()
+ bdir = Path(os.environ["GCOV_BUILD_DIR"])
+ gcdadir = Path(os.environ["GCOV_PREFIX"])
+
+ # Create GCNO symlinks
+ self.logger.info("Creating .gcno symlinks from '%s' to '%s'", gcdadir, bdir)
+ commander.cmd_raises(
+ f'cd "{gcdadir}"; bdir="{bdir}"'
+ + """
+for f in $(find . -name '*.gcda'); do
+ f=${f#./};
+ f=${f%.gcda}.gcno;
+ ln -fs $bdir/$f $f;
+ touch -h -r $bdir/$f $f;
+ echo $f;
+done"""
+ )
+
+ # Get the results into a summary file
+ data_file = rundir / "coverage.info"
+ self.logger.info("Gathering coverage data into: %s", data_file)
+ commander.cmd_raises(
+ f"lcov --directory {gcdadir} --capture --output-file {data_file}"
+ )
+
+ # Get coverage info filtered to a specific set of files
+ report_file = rundir / "coverage.info"
+ self.logger.debug("Generating coverage summary: %s", report_file)
+ output = commander.cmd_raises(f"lcov --summary {data_file}")
+ self.logger.info("\nCOVERAGE-SUMMARY-START\n%s\nCOVERAGE-SUMMARY-END", output)
+ # terminalreporter.write(
+ # f"\nCOVERAGE-SUMMARY-START\n{output}\nCOVERAGE-SUMMARY-END\n"
+ # )
+
async def run(self):
tasks = []
hosts = self.hosts.values()
launch_nodes = [x for x in hosts if hasattr(x, "launch")]
launch_nodes = [x for x in launch_nodes if x.config.get("qemu")]
- run_nodes = [x for x in hosts if hasattr(x, "has_run_cmd") and x.has_run_cmd()]
- ready_nodes = [
- x for x in hosts if hasattr(x, "has_ready_cmd") and x.has_ready_cmd()
- ]
+ run_nodes = [x for x in hosts if x.has_run_cmd()]
+ ready_nodes = [x for x in hosts if x.has_ready_cmd()]
+
+ if self.cfgopt.getoption("--coverage"):
+ self.coverage_setup()
pcapopt = self.cfgopt.getoption("--pcap")
pcapopt = pcapopt if pcapopt else ""
self.logger.debug("%s: deleting.", self)
- if self.cfgopt.getoption("--coverage"):
- nodes = (
- x for x in self.hosts.values() if hasattr(x, "gather_coverage_data")
- )
- try:
- await asyncio.gather(*(x.gather_coverage_data() for x in nodes))
- except Exception as error:
- logging.warning("Error gathering coverage data: %s", error)
-
pause = bool(self.cfgopt.getoption("--pause-at-end"))
pause = pause or bool(self.cfgopt.getoption("--pause"))
if pause:
except Exception as error:
self.logger.error("\n...continuing after error: %s", error)
+ # Run cleanup-cmd's.
+ nodes = (x for x in self.hosts.values() if x.has_cleanup_cmd())
+ try:
+ await asyncio.gather(*(x.async_cleanup_cmd() for x in nodes))
+ except Exception as error:
+ logging.warning("Error running cleanup cmds: %s", error)
+
+ # Gather any coverage data
+ if self.cfgopt.getoption("--coverage"):
+ nodes = (
+ x for x in self.hosts.values() if hasattr(x, "gather_coverage_data")
+ )
+ try:
+ await asyncio.gather(*(x.gather_coverage_data() for x in nodes))
+ except Exception as error:
+ logging.warning("Error gathering coverage data: %s", error)
+
+ await self.coverage_finish()
+
# XXX should we cancel launch and run tasks?
try:
if args:
os.chdir(args.rundir)
- args_config = args.kinds_config if args else None
+ args_config = args.kinds_config if args and hasattr(args, "kinds_config") else None
try:
if search is None:
search = [cwd]
# create search directories from common root if given
cpath = Path(config["config_pathname"]).absolute()
- project_root = args.project_root if args else None
+ project_root = args.project_root if args and hasattr(args, "project_root") else None
if not search_root:
search_root = find_project_root(cpath, project_root)
if not search_root:
pytestconfig=pytestconfig,
isolated=isolated,
pid=top_level_pidns,
- unshare_inline=args.unshare_inline if args else unshare_inline,
+ unshare_inline=(
+ args.unshare_inline
+ if args and hasattr(args, "unshare_inline")
+ else unshare_inline
+ ),
logger=logger,
)
@pytest.fixture(autouse=True, scope="module")
def module_autouse(request):
- logpath = get_test_logdir(request.node.name, True)
+ logpath = get_test_logdir(request.node.nodeid, True)
logpath = os.path.join("/tmp/unet-test", logpath, "pytest-exec.log")
with log_handler("module", logpath):
sdir = os.path.dirname(os.path.realpath(request.fspath))
raise Exception("Base Munet was not cleaned up/deleted")
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
def event_loop():
"""Create an instance of the default event loop for the session."""
loop = get_event_loop()
param,
exc_info=True,
)
- pytest.skip(
- f"unet fixture: unet build failed: {error}", allow_module_level=True
- )
- raise
+ pytest.fail(f"unet fixture: unet build failed: {error}")
try:
tasks = await _unet.run()
except Exception as error:
logging.debug("unet fixture: unet run failed: %s", error, exc_info=True)
await _unet.async_delete()
- pytest.skip(f"unet fixture: unet run failed: {error}", allow_module_level=True)
- raise
+ pytest.fail(f"unet fixture: unet run failed: {error}")
logging.debug("unet fixture: containers running")
import pytest
+from ..args import add_testing_args
from ..base import BaseMunet # pylint: disable=import-error
from ..cli import cli # pylint: disable=import-error
from .util import pause_test
def pytest_addoption(parser):
- parser.addoption(
- "--cli-on-error",
- action="store_true",
- help="CLI on test failure",
- )
-
- parser.addoption(
- "--coverage",
- action="store_true",
- help="Enable coverage gathering if supported",
- )
-
- parser.addoption(
- "--gdb",
- default="",
- metavar="HOST[,HOST...]",
- help="Comma-separated list of nodes to launch gdb on, or 'all'",
- )
- parser.addoption(
- "--gdb-breakpoints",
- default="",
- metavar="BREAKPOINT[,BREAKPOINT...]",
- help="Comma-separated list of breakpoints",
- )
- parser.addoption(
- "--gdb-use-emacs",
- action="store_true",
- help="Use emacsclient to run gdb instead of a shell",
- )
-
- parser.addoption(
- "--pcap",
- default="",
- metavar="NET[,NET...]",
- help="Comma-separated list of networks to capture packets on, or 'all'",
- )
-
- parser.addoption(
- "--pause",
- action="store_true",
- help="Pause after each test",
- )
- parser.addoption(
- "--pause-at-end",
- action="store_true",
- help="Pause before taking munet down",
- )
- parser.addoption(
- "--pause-on-error",
- action="store_true",
- help="Pause after (disables default when --shell or -vtysh given)",
- )
- parser.addoption(
- "--no-pause-on-error",
- dest="pause_on_error",
- action="store_false",
- help="Do not pause after (disables default when --shell or -vtysh given)",
- )
-
- parser.addoption(
- "--shell",
- default="",
- metavar="NODE[,NODE...]",
- help="Comma-separated list of nodes to spawn shell on, or 'all'",
- )
-
- parser.addoption(
- "--stdout",
- default="",
- metavar="NODE[,NODE...]",
- help="Comma-separated list of nodes to open tail-f stdout window on, or 'all'",
- )
-
- parser.addoption(
- "--stderr",
- default="",
- metavar="NODE[,NODE...]",
- help="Comma-separated list of nodes to open tail-f stderr window on, or 'all'",
- )
+ add_testing_args(parser.addoption)
def pytest_configure(config):
elif b and not is_xdist and not have_windows:
pytest.exit(f"{winopt} use requires byobu/TMUX/SCREEN/XTerm")
+ cli_pause = (
+ config.getoption("--cli-on-error")
+ or config.getoption("--pause")
+ or config.getoption("--pause-at-end")
+ or config.getoption("--pause-on-error")
+ )
+ if config.getoption("--capture") == "fd" and cli_pause:
+ pytest.exit(
+ "CLI is not compatible with `--capture=fd`, "
+ "please run again with `-s` or other `--capture` value"
+ )
+
def pytest_runtest_makereport(item, call):
"""Pause or invoke CLI as directed by config."""