--- a/src/client.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/client.py Mon Dec 09 11:11:03 2013 +0530
@@ -2145,6 +2145,7 @@
# update global concurrency setting
global_settings.client_concurrency = opts_new["concurrency"]
+ global_settings.client_concurrency_set = True
# remove concurrency from parameters dict
del opts_new["concurrency"]
@@ -2485,6 +2486,19 @@
def remote(op, api_inst, pargs, ctlfd):
"""Execute commands from a remote pipe"""
+ #
+ # this is kind of a gross hack. SocketServer.py uses select.select()
+ # which doesn't support file descriptors larger than FD_SETSIZE.
+ # Since ctlfd may have been allocated in a parent process with many
+ # file descriptors, it may be larger than FD_SETSIZE. Here in the
+ # child, though, the majority of those have been closed, so os.dup()
+ # should return a lower-numbered descriptor which will work with
+ # select.select().
+ #
+ ctlfd_new = os.dup(ctlfd)
+ os.close(ctlfd)
+ ctlfd = ctlfd_new
+
rpc_server = pipeutils.PipedRPCServer(ctlfd)
rpc_server.register_introspection_functions()
rpc_server.register_instance(RemoteDispatch())
--- a/src/modules/client/__init__.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/modules/client/__init__.py Mon Dec 09 11:11:03 2013 +0530
@@ -21,7 +21,7 @@
#
#
-# Copyright (c) 2007, 2012, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
#
# Missing docstring; pylint: disable-msg=C0111
@@ -80,12 +80,15 @@
self.client_output_progfd = None
# concurrency value used for linked image recursion
+ self.client_concurrency_set = False
self.client_concurrency_default = 1
self.client_concurrency = self.client_concurrency_default
try:
self.client_concurrency = int(os.environ.get(
"PKG_CONCURRENCY",
self.client_concurrency_default))
+ if "PKG_CONCURRENCY" in os.environ:
+ self.client_concurrency_set = True
# remove PKG_CONCURRENCY from the environment so child
# processes don't inherit it.
os.environ.pop("PKG_CONCURRENCY", None)
--- a/src/modules/client/linkedimage/common.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/modules/client/linkedimage/common.py Mon Dec 09 11:11:03 2013 +0530
@@ -21,7 +21,7 @@
#
#
-# Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.
#
"""
@@ -1896,8 +1896,14 @@
if _pkg_op in [ pkgdefs.PKG_OP_AUDIT_LINKED,
pkgdefs.PKG_OP_PUBCHECK ]:
- # these operations are cheap, use full parallelism
- concurrency = -1
+ # these operations are cheap so ideally we'd like to
+ # use full parallelism. but if the user specified a
+ # concurrency limit we should respect that.
+ if global_settings.client_concurrency_set:
+ concurrency = global_settings.client_concurrency
+ else:
+ # no limit was specified, use full concurrency
+ concurrency = -1
else:
concurrency = global_settings.client_concurrency
@@ -2010,8 +2016,19 @@
_progtrack.li_recurse_status(lin_running,
done)
- rlistrv = select.select(lic_running, [], [])[0]
- for lic in rlistrv:
+ # poll on all the linked image children and see which
+ # ones have pending output.
+ fd_hash = dict([
+ (lic.fileno(), lic)
+ for lic in lic_running
+ ])
+ p = select.poll()
+ for fd in fd_hash.keys():
+ p.register(fd, select.POLLIN)
+ events = p.poll()
+ lic_list = [ fd_hash[event[0]] for event in events ]
+
+ for lic in lic_list:
_progtrack.li_recurse_progress(lic.child_name)
if not lic.child_op_is_done():
continue
--- a/src/modules/client/pkgremote.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/modules/client/pkgremote.py Mon Dec 09 11:11:03 2013 +0530
@@ -21,7 +21,7 @@
#
#
-# Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
#
"""
@@ -209,7 +209,9 @@
"""Drain the client progress pipe."""
progfd = self.__rpc_client_prog_pipe_fobj.fileno()
- while select.select([progfd], [], [], 0)[0]:
+ p = select.poll()
+ p.register(progfd, select.POLLIN)
+ while p.poll(0):
os.read(progfd, 10240)
def __state_verify(self, state=None):
--- a/src/tests/cli/t_pkg_linked.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/tests/cli/t_pkg_linked.py Mon Dec 09 11:11:03 2013 +0530
@@ -21,7 +21,7 @@
#
#
-# Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.
#
import testutils
@@ -1548,5 +1548,124 @@
self._pkg([0], "update --stage=prepare")
self._pkg([0], "update --stage=execute")
+
+class TestPkgLinkedScale(pkg5unittest.ManyDepotTestCase):
+ """Test the scalability of the linked image subsystem."""
+
+ max_image_count = 256
+
+ p_sync1 = []
+ p_vers = [
+ "@1.2,5.11-145:19700101T000001Z",
+ "@1.2,5.11-145:19700101T000000Z", # old time
+ "@1.1,5.11-145:19700101T000000Z", # old ver
+ "@1.1,5.11-144:19700101T000000Z", # old build
+ "@1.0,5.11-144:19700101T000000Z", # oldest
+ ]
+ p_files = [
+ "tmp/bar",
+ "tmp/baz",
+ ]
+
+ # generate packages that do need to be synced
+ p_sync1_name_gen = "sync1"
+ pkgs = [p_sync1_name_gen + ver for ver in p_vers]
+ p_sync1_name = dict(zip(range(len(pkgs)), pkgs))
+ for i in p_sync1_name:
+ p_data = "open %s\n" % p_sync1_name[i]
+ p_data += "add depend type=parent fmri=%s" % \
+ pkg.actions.depend.DEPEND_SELF
+ p_data += """
+ close\n"""
+ p_sync1.append(p_data)
+
+ def setUp(self):
+ pkg5unittest.ManyDepotTestCase.setUp(self, ["test"],
+ image_count=self.max_image_count)
+
+ # create files that go in packages
+ self.make_misc_files(self.p_files)
+
+ # get repo url
+ self.rurl1 = self.dcs[1].get_repo_url()
+
+ # populate repository
+ self.pkgsend_bulk(self.rurl1, self.p_sync1)
+
+
+ def __req_phys_mem(self, phys_mem_req):
+ """Verify that the current machine has a minimal amount of
+ physical memory (in GB). If it doesn't raise
+ TestSkippedException."""
+
+ psize = os.sysconf(os.sysconf_names["SC_PAGESIZE"])
+ ppages = os.sysconf(os.sysconf_names["SC_PHYS_PAGES"])
+ phys_mem = psize * ppages / 1024.0 / 1024.0 / 1024.0
+
+ if phys_mem < phys_mem_req:
+ raise pkg5unittest.TestSkippedException(
+ "Not enough memory, "\
+ "%d GB required, %d GB detected.\n" %
+ (phys_mem_req, phys_mem))
+
+ def pkg(self, *args, **kwargs):
+ """This is a wrapper function to disable coverage for all
+ tests in this class since these are essentially stress tests.
+ we don't need the coverage data (since other functional tests
+ should have already covered these code paths) and we don't
+ want the added overhead of gathering coverage data (since we
+ want to use all available resources for actually running the
+ tests)."""
+
+ kwargs["coverage"] = False
+ return pkg5unittest.ManyDepotTestCase.pkg(self, *args,
+ **kwargs)
+
+ def test_li_scale(self):
+ """Verify that we can operate on a large number of linked
+ images in parallel.
+
+ For parallel linked image operations, 256 images is high
+ enough to cause file descriptor allocation to exceed
+ FD_SETSIZE, which in turn can cause select.select() to fail if
+ it's invoked. In practice that's the only failure mode we've
+ ever seen when people have tried to update a large number of
+ zones in parallel.
+
+ The maximum value successfully tested here has been 512. I
+ tried 1024 but it resulted in death by swapping on a u27 with
+ 12 GB of memory."""
+
+ # we will require at least 11 GB of memory to run this test.
+ # This is a rough estimate of required memory based on
+ # observing this test running on s12_20 on an x86 machine. on
+ # that machine i observed the peak RSS for pkg child process
+ # was about 24 MB. with 256 child processes this comes out to
+ # about 6 GB of memory. we require 11 GB so that the machine
+ # doesn't get bogged down and other things can continue to
+ # run.
+ self.__req_phys_mem(11)
+
+ limit = self.max_image_count
+
+ # create an image with a synced package
+ self.set_image(0)
+ self.image_create(repourl=self.rurl1)
+ self.pkg("install -v %s" % self.p_sync1_name[1])
+
+ # create copies of the image.
+ for i in range(1, self.max_image_count):
+ self.image_clone(i)
+
+ # attach the copies as children of the original image
+ for i in range(1, self.max_image_count):
+ name = "system:img%d" % i
+ cmd = "attach-linked --linked-md-only -c %s %s" % (
+ name, self.img_path(i))
+ self.pkg(cmd)
+
+ # update the parent image and all child images in parallel
+ self.pkg("update -C0 -q")
+
if __name__ == "__main__":
unittest.main()
--- a/src/tests/pkg5unittest.py Thu Dec 26 13:21:35 2013 -0800
+++ b/src/tests/pkg5unittest.py Mon Dec 09 11:11:03 2013 +0530
@@ -2345,6 +2345,25 @@
self.__setup_signing_files()
return retcode
+ def image_clone(self, dst):
+
+ # the currently selected image is the source
+ src = self.img_index()
+ src_path = self.img_path()
+
+ # create an empty destination image
+ self.set_image(dst)
+ self.image_destroy()
+ os.mkdir(self.img_path())
+ dst_path = self.img_path()
+
+ # reactivate the source image
+ self.set_image(src)
+
+ # populate the destination image
+ cmdline = "cd %s; find . | cpio -pdm %s" % (src_path, dst_path)
+ retcode = self.cmdline_run(cmdline, coverage=False)
+
def image_destroy(self):
if os.path.exists(self.img_path()):
self.debug("image_destroy %s" % self.img_path())
@@ -2358,7 +2377,7 @@
def pkg(self, command, exit=0, comment="", prefix="", su_wrap=None,
out=False, stderr=False, cmd_path=None, use_img_root=True,
- debug_smf=True, env_arg=None):
+ debug_smf=True, env_arg=None, coverage=True):
if debug_smf and "smf_cmds_dir" not in command:
command = "--debug smf_cmds_dir=%s %s" % \
(DebugValues["smf_cmds_dir"], command)
@@ -2371,7 +2390,7 @@
cmdline = "%s %s" % (cmd_path, command)
return self.cmdline_run(cmdline, exit=exit, comment=comment,
prefix=prefix, su_wrap=su_wrap, out=out, stderr=stderr,
- env_arg=env_arg)
+ env_arg=env_arg, coverage=coverage)
def pkgdepend_resolve(self, args, exit=0, comment="", su_wrap=False):
ops = ""