if k not in new:
new[k] = config[k]
return new
+
+
+def cli_opt_list(option_list):
+ if not option_list:
+ return []
+ if isinstance(option_list, str):
+ return [x for x in option_list.split(",") if x]
+ return [x for x in option_list if x]
+
+
+def name_in_cli_opt_str(name, option_list):
+ ol = cli_opt_list(option_list)
+ return name in ol or "all" in ol
+
+
+class ConfigOptionsProxy:
+ """Proxy options object to fill in for any missing pytest config."""
+
+ class DefNoneObject:
+ """An object that returns None for any attribute access."""
+
+ def __getattr__(self, attr):
+ return None
+
+ def __init__(self, pytestconfig=None):
+ if isinstance(pytestconfig, ConfigOptionsProxy):
+ self.config = pytestconfig.config
+ self.option = self.config.option
+ else:
+ self.config = pytestconfig
+ if self.config:
+ self.option = self.config.option
+ else:
+ self.option = ConfigOptionsProxy.DefNoneObject()
+
+ def getoption(self, opt, defval=None):
+ if not self.config:
+ return defval
+
+ try:
+ return self.config.getoption(opt, default=defval)
+ except ValueError:
+ return defval
+
+ def get_option(self, opt, defval=None):
+ return self.getoption(opt, defval)
+
+ def get_option_list(self, opt):
+ value = self.get_option(opt, "")
+ return cli_opt_list(value)
+
+ def name_in_option_list(self, name, opt):
+ optlist = self.get_option_list(opt)
+ return "all" in optlist or name in optlist
stdout: file-like object with a ``name`` attribute, or a path to a file.
stderr: file-like object with a ``name`` attribute, or a path to a file.
"""
- if not self.unet or not self.unet.pytest_config:
+ if not self.unet:
return
- outopt = self.unet.pytest_config.getoption("--stdout")
+ outopt = self.unet.cfgopt.getoption("--stdout")
outopt = outopt if outopt is not None else ""
if outopt == "all" or self.name in outopt.split(","):
outname = stdout.name if hasattr(stdout, "name") else stdout
self.run_in_window(f"tail -F {outname}", title=f"O:{self.name}")
if stderr:
- erropt = self.unet.pytest_config.getoption("--stderr")
+ erropt = self.unet.cfgopt.getoption("--stderr")
erropt = erropt if erropt is not None else ""
if erropt == "all" or self.name in erropt.split(","):
errname = stderr.name if hasattr(stderr, "name") else stderr
self.run_in_window(f"tail -F {errname}", title=f"E:{self.name}")
def pytest_hook_open_shell(self):
- if not self.unet or not self.unet.pytest_config:
+ if not self.unet:
return
gdbcmd = self.config.get("gdb-cmd")
- shellopt = self.unet.pytest_config.getoption("--gdb", "")
+ shellopt = self.unet.cfgopt.getoption("--gdb", "")
should_gdb = gdbcmd and (shellopt == "all" or self.name in shellopt.split(","))
- use_emacs = self.unet.pytest_config.getoption("--gdb-use-emacs", False)
+ use_emacs = self.unet.cfgopt.getoption("--gdb-use-emacs", False)
if should_gdb and not use_emacs:
cmds = self.config.get("gdb-target-cmds", [])
for cmd in cmds:
gdbcmd += f" '-ex={cmd}'"
- bps = self.unet.pytest_config.getoption("--gdb-breakpoints", "").split(",")
+ bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",")
for bp in bps:
gdbcmd += f" '-ex=b {bp}'"
]
)
- bps = self.unet.pytest_config.getoption("--gdb-breakpoints", "").split(",")
+ bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",")
for bp in bps:
cmd = f"br {bp}"
self.cmd_raises(
)
gdbcmd += f" '-ex={cmd}'"
- shellopt = self.unet.pytest_config.getoption("--shell")
- shellopt = shellopt if shellopt is not None else ""
+ shellopt = self.unet.cfgopt.getoption("--shell")
+ shellopt = shellopt if shellopt else ""
if shellopt == "all" or self.name in shellopt.split(","):
self.run_in_window("bash")
con.cmd_raises(f"ip -6 route add default via {switch.ip6_address}")
con.cmd_raises("ip link set lo up")
- if self.unet.pytest_config and self.unet.pytest_config.getoption("--coverage"):
+ if self.unet.cfgopt.getoption("--coverage"):
con.cmd_raises("mount -t debugfs none /sys/kernel/debug")
async def gather_coverage_data(self):
self,
rundir=None,
config=None,
- pytestconfig=None,
pid=True,
logger=None,
**kwargs,
self.config_pathname = ""
self.config_dirname = ""
- self.pytest_config = pytestconfig
-
# Done in BaseMunet now
# # We need some way to actually get back to the root namespace
# if not self.isolated:
# # Let's hide podman details
# self.tmpfs_mount("/var/lib/containers/storage/overlay-containers")
- shellopt = (
- self.pytest_config.getoption("--shell") if self.pytest_config else None
- )
- shellopt = shellopt if shellopt is not None else ""
+ shellopt = self.cfgopt.getoption("--shell")
+ shellopt = shellopt if shellopt else ""
if shellopt == "all" or "." in shellopt.split(","):
self.run_in_window("bash")
x for x in hosts if hasattr(x, "has_ready_cmd") and x.has_ready_cmd()
]
- if not self.pytest_config:
- pcapopt = ""
- else:
- pcapopt = self.pytest_config.getoption("--pcap")
- pcapopt = pcapopt if pcapopt else ""
+ pcapopt = self.cfgopt.getoption("--pcap")
+ pcapopt = pcapopt if pcapopt else ""
if pcapopt == "all":
pcapopt = self.switches.keys()
if pcapopt:
self.logger.debug("%s: deleting.", self)
- if self.pytest_config and self.pytest_config.getoption("--coverage"):
+ if self.cfgopt.getoption("--coverage"):
nodes = (
x for x in self.hosts.values() if hasattr(x, "gather_coverage_data")
)
except Exception as error:
logging.warning("Error gathering coverage data: %s", error)
- if not self.pytest_config:
- pause = False
- else:
- pause = bool(self.pytest_config.getoption("--pause-at-end"))
- pause = pause or bool(self.pytest_config.getoption("--pause"))
+ pause = bool(self.cfgopt.getoption("--pause-at-end"))
+ pause = pause or bool(self.cfgopt.getoption("--pause"))
if pause:
try:
await async_pause_test("Before MUNET delete")