7140167 pkgmerge(1) only merges default publisher's packages s12b07
authorTim Foster <tim.s.foster@oracle.com>
Mon, 15 Oct 2012 17:22:18 +0100
changeset 2805 f2f570fb5d9f
parent 2802 7ef194c53b05
child 2808 05c6015a8c62
7140167 pkgmerge(1) only merges default publisher's packages
src/tests/cli/t_pkgmerge.py
src/util/publish/pkgmerge.py
--- a/src/tests/cli/t_pkgmerge.py	Fri Oct 12 13:47:55 2012 -0700
+++ b/src/tests/cli/t_pkgmerge.py	Mon Oct 15 17:22:18 2012 +0100
@@ -21,7 +21,7 @@
 #
 
 #
-# Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
 #
 
 import testutils
@@ -48,7 +48,7 @@
         persistent_setup = True
 
         scheme10 = """
-            open pkg:/[email protected],5.11-0
+            open [email protected],5.11-0
             add file tmp/sparc-only mode=0444 owner=root group=bin path=/etc/tree
             close
         """
@@ -201,9 +201,7 @@
             "tmp/sparc3", "tmp/sparc4", "tmp/i3861", "tmp/i3862", "tmp/i3863"]
 
         def setUp(self):
-                pkg5unittest.ManyDepotTestCase.setUp(self, ["os.org", "os.org",
-                    "os.org", "os.org", "os.org", "os.org", "os.org", "os.org",
-                    "os.org", "os.org", "os.org", "os.org", "os.org"])
+                pkg5unittest.ManyDepotTestCase.setUp(self, 16 * ["os.org"])
                 self.make_misc_files(self.misc_files)
 
                 self.rurl1 = self.dcs[1].get_repo_url()
@@ -223,6 +221,11 @@
                 self.rurl11 = self.dcs[11].get_repo_url()
                 self.rurl12 = self.dcs[12].get_repo_url()
                 self.rurl13 = self.dcs[13].get_repo_url()
+
+                # repositories which will contain several publishers
+                self.rurl14 = self.dcs[14].get_repo_url()
+                self.rurl15 = self.dcs[15].get_repo_url()
+
                 # Publish a set of packages to one repository.
                 self.published = self.pkgsend_bulk(self.rurl1, (self.amber10,
                     self.amber20, self.bronze10, self.bronze20, self.tree10,
@@ -285,6 +288,44 @@
                 time.sleep(1)
                 self.published_blend += self.pkgsend_bulk(self.rurl13, (self.multiD,))
 
+                # Publish to multiple repositories, maintaining lists of which
+                # FMRIs are published to which repository.
+                self.published_multi_14 = []
+                self.published_multi_15 = []
+
+                for url, record in [
+                    (self.rurl14, self.published_multi_14),
+                    (self.rurl15, self.published_multi_15)]:
+                        time.sleep(1)
+                        record += self.pkgsend_bulk(url, (self.scheme10))
+                        time.sleep(1)
+                        record += self.pkgsend_bulk(url, (self.tree10))
+
+                        time.sleep(1)
+                        record += self.pkgsend_bulk(url,
+                            self.bronze20.replace("open ",
+                            "open pkg://altpub/"))
+                        time.sleep(1)
+                        record += self.pkgsend_bulk(url,
+                            (self.amber10.replace("open ",
+                            "open pkg://altpub/")))
+                        time.sleep(1)
+                        record += self.pkgsend_bulk(url,
+                            (self.multiA.replace("open ",
+                            "open pkg://last/")))
+
+                # add bronze20b to one repository so that we have at least one
+                # package where more complex merging happens.
+                time.sleep(1)
+                self.published_multi_15 += self.pkgsend_bulk(self.rurl15,
+                    self.bronze20b.replace("open ", "open pkg://altpub/"))
+
+                # one of our source repositories also contains a newer
+                # version of pkg:/gold (self.multi*)
+                time.sleep(1)
+                self.published_multi_15 += self.pkgsend_bulk(self.rurl15,
+                    (self.multiB.replace("open ", "open pkg://last/")))
+
         def test_0_options(self):
                 """Verify that pkgmerge gracefully fails when given bad option
                 values."""
@@ -341,6 +382,12 @@
                     "-d %s" % self.test_root,
                 ]), exit=1)
 
+                # Should fail because of no matching -p publishers.
+                self.pkgmerge(" ".join([
+                    "-s arch=i386,%s" % self.rurl2,
+                    "-d %s -p noodles" % self.test_root,
+                ]), exit=1)
+
         def test_1_single_merge(self):
                 """Verify that merge functionality works as expected when
                 specifying a single source."""
@@ -992,13 +1039,215 @@
                 self.assertEqualDiff(expected, actual)
                 shutil.rmtree(repodir)
 
-        def get_manifest(self, repodir):
-                repo = self.get_repo(repodir)
-                cat = repo.get_catalog(pub="os.org")
-                for f in cat.fmris():
-                        with open(repo.manifest(f), "rb") as m:
-                                actual = "".join(sorted(l for l in m)).strip()
-                return actual
+        def test_7_multipub_merge(self):
+                """Tests that we can merge packages from repositories with
+                several publishers."""
+
+                repodir = os.path.join(self.test_root, "7merge_repo")
+                self.create_repo(repodir)
+
+                # test dry run
+                self.pkgmerge(" ".join([
+                    "-n",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]))
+
+                # test dry run with selected publishers
+                self.pkgmerge(" ".join([
+                    "-p os.org",
+                    "-p altpub",
+                    "-n",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]))
+
+                # this should fail, as no -p noodles publisher exists in any of
+                # the source repositories
+                self.pkgmerge(" ".join([
+                    "-p os.org",
+                    "-p altpub",
+                    "-p noodles",
+                    "-n",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]), exit=1)
+
+                # now we want to perform the merge operations and validate the
+                # results. This was the order we published packages to multi_15
+                # 0  = pkg://os.org/[email protected],5.11-0:20120920T085857Z
+                # 1  = pkg://os.org/[email protected],5.11-0:20120920T085859Z
+                # 2  = pkg://altpub/[email protected],5.11-0:20120920T085902Z
+                # 3  = pkg://altpub/[email protected],5.11-0:20120920T085904Z
+                # 4  = pkg://last/[email protected],5.11-0:20120920T085906Z
+                # 5  = pkg://altpub/[email protected],5.11-0:20120920T085920Z
+                # 6  = pkg://last/[email protected],5.11-0:20120920T085923Z
+
+                # build a dictionary of the FMRIs we're interested in
+                repo15_fmris = {
+                    "osorg_scheme": self.published_multi_15[0],
+                    "osorg_tree": self.published_multi_15[1],
+                    "altpub_amber": self.published_multi_15[3],
+                    # we published two versions of bronze and gold, use the
+                    # latest FMRI
+                    "altpub_bronze": self.published_multi_15[5],
+                    "last_gold": self.published_multi_15[6]
+                }
+
+                # some expected manifests we should get after merging.
+                expected_osorg_scheme = """\
+file 3a06aa547ffe0186a2b9db55b8853874a048fb47 chash=ab50364de4ce8f847d765d402d80e37431e1f0aa group=bin mode=0444 owner=root path=etc/tree pkg.csize=40 pkg.size=20
+set name=pkg.fmri value=%(osorg_scheme)s
+set name=variant.arch value=sparc value=i386
+set name=variant.debug value=false\
+""" % repo15_fmris
+                expected_osorg_tree = """\
+file 3a06aa547ffe0186a2b9db55b8853874a048fb47 chash=ab50364de4ce8f847d765d402d80e37431e1f0aa group=bin mode=0444 owner=root path=etc/tree pkg.csize=40 pkg.size=20
+set name=pkg.fmri value=%(osorg_tree)s
+set name=variant.arch value=sparc value=i386
+set name=variant.debug value=false\
+""" % repo15_fmris
+                expected_altpub_amber = """\
+depend fmri=pkg:/[email protected] type=require
+set name=pkg.fmri value=%(altpub_amber)s
+set name=variant.arch value=sparc value=i386
+set name=variant.debug value=false\
+""" % repo15_fmris
+                expected_altpub_bronze = """\
+depend fmri=pkg:/[email protected] type=require
+depend fmri=pkg:/[email protected] type=require variant.arch=i386
+dir group=bin mode=0755 owner=root path=etc
+dir group=bin mode=0755 owner=root path=lib
+file 1abe1a7084720f501912eceb1312ddd799fb2a34 chash=ea7230676e13986491d7405c5a9298e074930575 group=bin mode=0444 owner=root path=etc/bronze1 pkg.csize=37 pkg.size=17
+file 34f88965d55d3a730fa7683bc0f370fc6e42bf95 chash=66eebb69ee0299dcb495162336db81a3188de037 group=bin mode=0555 owner=root path=usr/bin/sh pkg.csize=32 pkg.size=12
+file 6d8f3b9498aa3bbe7db01189b88f1b71f4ce40ad chash=6f3882864ebd7fd1a09e0e7b889fdc524c8c8bb2 group=bin mode=0444 owner=root path=etc/amber2 pkg.csize=37 pkg.size=17 variant.arch=sparc
+file 8535c15c49cbe1e7cb1a0bf8ff87e512abed66f8 chash=6ff2f52d2f894f5c71fb8fdd3b214e22959fccbb group=bin mode=0555 owner=root path=lib/libc.bronze pkg.csize=33 pkg.size=13
+file 91fa26695f9891b2d94fd72c31b640efb5589da5 chash=4eed1e5dc5ab131812da34dc148562e6833fa92b group=bin mode=0444 owner=root path=etc/scheme pkg.csize=36 pkg.size=16 variant.arch=i386
+file cf68b26a90cb9a0d7510f24cfb8cf6d901cec34e chash=0eb6fe69c4492f801c35dcc9175d55f783cc64a2 group=bin mode=0444 owner=root path=A1/B2/C3/D4/E5/F6/bronzeA2 pkg.csize=38 pkg.size=18
+hardlink path=lib/libc.bronze2.0.hardlink target=/lib/libc.so.1
+license 773b94a252723da43e8f969b4384701bcd41ce12 chash=e0715301fc211f6543ce0c444f4c34e38c70f70e license=copyright pkg.csize=40 pkg.size=20
+link path=usr/bin/jsh target=./sh
+set name=pkg.fmri value=%(altpub_bronze)s
+set name=variant.arch value=sparc value=i386
+set name=variant.debug value=false\
+""" % repo15_fmris
+                expected_last_gold = """\
+depend fmri=foo fmri=bar type=require-any
+file 6b7161cb29262ea4924a8874818da189bb70da09 chash=77e271370cec04931346c969a85d6af37c1ea83f group=bin mode=0444 owner=root path=etc/binary pkg.csize=36 pkg.size=16 variant.arch=i386
+file 6b7161cb29262ea4924a8874818da189bb70da09 chash=77e271370cec04931346c969a85d6af37c1ea83f group=bin mode=0444 owner=root path=etc/everywhere-notes pkg.csize=36 pkg.size=16
+file 9e837a70edd530a88c88f8a58b8a5bf2a8f3943c chash=d0323533586e1153bd1701254f45d2eb2c7eb0c4 group=bin mode=0444 owner=root path=etc/debug-notes pkg.csize=36 pkg.size=16
+file a10f11b8559a723bea9ee0cf5980811a9d51afbb chash=9fb8079898da8a2a9faad65c8df4c4a42095f25a group=bin mode=0444 owner=root path=etc/sparc/debug-notes pkg.csize=36 pkg.size=16 variant.arch=sparc
+file aab699c6424ed1fc258b6b39eb113e624a9ee368 chash=43c3b9a83a112727264390002c3db3fcebec2e76 group=bin mode=0444 owner=root path=etc/binary pkg.csize=36 pkg.size=16 variant.arch=sparc
+set name=pkg.fmri value=%(last_gold)s
+set name=variant.arch value=sparc value=i386
+set name=variant.debug value=false\
+""" % repo15_fmris
+
+                # A dictionary of the expected package contents, keyed by FMRI
+                expected = {
+                    repo15_fmris["altpub_bronze"]: expected_altpub_bronze,
+                    repo15_fmris["altpub_amber"]: expected_altpub_amber,
+                    repo15_fmris["osorg_tree"]: expected_osorg_tree,
+                    repo15_fmris["osorg_scheme"]: expected_osorg_scheme,
+                    repo15_fmris["last_gold"]: expected_last_gold
+                }
+
+                def check_repo(repodir, keys, fmri_dic, expected):
+                        """Check that packages corresponding to the list of
+                        keys 'keys' to items in 'fmri_dic' are present in the
+                        repository, and match the contents from the dictionary
+                        'expected'.  We also check that the repository has no
+                        packages other than those specified by 'keys', and no
+                        more publishers than are present in those packages."""
+                        sr = self.get_repo(repodir)
+                        # check that the packages from 'keys' exist,
+                        # and their content matches what we expect.
+                        for key in keys:
+                                f = fmri_dic[key]
+                                with open(sr.manifest(f), "rb") as manf:
+                                        actual = "".join(
+                                            sorted(l for l in manf)).strip()
+                                self.assertEqualDiff(expected[f], actual)
+
+                        # check that we have only the publishers used
+                        # by packages from 'keys' in the repository
+                        fmris = [fmri_dic[key] for key in keys]
+                        pubs = set([fmri.PkgFmri(entry).get_publisher()
+                            for entry in fmris])
+                        known_pubs = set(
+                            [p.prefix for p in sr.get_publishers()])
+                        self.assert_(pubs == known_pubs,
+                            "Repository at %s didn't contain the "
+                            "expected set of publishers" % repodir)
+
+                        # check that we have only the packages defined
+                        # in 'keys' in the repository by walking all
+                        # publishers, and all packages in the repository
+                        for pub in sr.get_publishers():
+                                cat = sr.get_catalog(pub=pub.prefix)
+                                for f in cat.fmris():
+                                        if f.get_fmri() not in fmris:
+                                                self.assert_(False,
+                                                    "%s not in repository" % f)
+
+                # test merging all publishers.
+                self.pkgmerge(" ".join([
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]))
+
+                check_repo(repodir, repo15_fmris.keys(), repo15_fmris, expected)
+
+                # test merging only altpub and os.org.
+                shutil.rmtree(repodir)
+                self.create_repo(repodir)
+                self.pkgmerge(" ".join([
+                    "-p altpub",
+                    "-p os.org",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]))
+
+                check_repo(repodir, ["altpub_bronze", "altpub_amber",
+                    "osorg_tree", "osorg_scheme"], repo15_fmris, expected)
+
+                # test merging only altpub
+                shutil.rmtree(repodir)
+                self.create_repo(repodir)
+                self.pkgmerge(" ".join([
+                    "-p altpub",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]))
+
+                check_repo(repodir, ["altpub_bronze", "altpub_amber"],
+                    repo15_fmris, expected)
+
+                # this should exit with a 1, but we should get the same results
+                # in the repository as last time.
+                shutil.rmtree(repodir)
+                self.create_repo(repodir)
+                self.pkgmerge(" ".join([
+                    "-p altpub",
+                    "-p noodles",
+                    "-s arch=sparc,debug=false,%s" % self.dcs[14].get_repodir(),
+                    "-s arch=i386,debug=false,%s" % self.dcs[15].get_repodir(),
+                    "-d %s" % repodir]), exit=1)
+
+                check_repo(repodir, ["altpub_bronze", "altpub_amber"],
+                    repo15_fmris, expected)
+
+        def get_manifest(self, repodir, pubs=["os.org"]):
+                repository = self.get_repo(repodir)
+                actual = ""
+                for pub in pubs:
+                        cat = repository.get_catalog(pub=pub)
+                        for f in cat.fmris():
+                                with open(repository.manifest(f), "rb") as m:
+                                        actual += "".join(
+                                            sorted(l for l in m)).strip()
+                        actual += "\n"
+                return actual.strip()
 
 if __name__ == "__main__":
         unittest.main()
--- a/src/util/publish/pkgmerge.py	Fri Oct 12 13:47:55 2012 -0700
+++ b/src/util/publish/pkgmerge.py	Mon Oct 15 17:22:18 2012 +0100
@@ -52,6 +52,16 @@
         import sys
         sys.exit(1)
 
+class PkgmergeException(Exception):
+        """An exception raised if something goes wrong during the merging
+        process."""
+
+        def __unicode__(self):
+                # To work around python issues 6108 and 2517, this provides a
+                # standard wrapper for this class' exceptions so that they
+                # have a chance of being stringified correctly.
+                return str(self)
+
 BUILD_RELEASE  = "5.11"  # Should be an option to this program some day?
 
 catalog_dict   = {}    # hash table of catalogs by source uri
@@ -95,7 +105,7 @@
         msg(_("""\
 Usage:
         pkgmerge [-n] -d dest_repo -s variant=value[,...],src_repo ...
-            [pkg_fmri_pattern ...]
+            [-p publisher_prefix ... ] [pkg_fmri_pattern ...]
 
 Options:
         -d dest_repo
@@ -115,6 +125,12 @@
                 be named for all sources.  This option may be specified multiple
                 times.
 
+        -p publisher_prefix
+                The name of the publisher we should merge packages from.  This
+                option may be specified multiple times.  If no -p option is
+                used, the default is to merge packages from all publishers in
+                all source repositories.
+
         --help or -?
                 Displays a usage message.
 
@@ -134,15 +150,12 @@
         if exitcode != None:
                 sys.exit(exitcode)
 
-def get_tracker(quiet=False):
-        if quiet:
-                progresstracker = progress.QuietProgressTracker()
-        else:
-                try:
-                        progresstracker = \
-                            progress.FancyUNIXProgressTracker()
-                except progress.ProgressTrackerException:
-                        progresstracker = progress.CommandLineProgressTracker()
+def get_tracker():
+        try:
+                progresstracker = \
+                    progress.FancyUNIXProgressTracker()
+        except progress.ProgressTrackerException:
+                progresstracker = progress.CommandLineProgressTracker()
         return progresstracker
 
 def load_catalog(repouri, pub):
@@ -187,9 +200,12 @@
         dest_repo     = None
         source_list   = []
         variant_list  = []
+        pub_list      = []
+        use_pub_list  = False
 
         try:
-                opts, pargs = getopt.getopt(sys.argv[1:], "d:ns:?", ["help"])
+                opts, pargs = getopt.getopt(sys.argv[1:], "d:np:s:?",
+                    ["help"])
                 for opt, arg in opts:
                         if opt == "-d":
                                 dest_repo = misc.parse_uri(arg)
@@ -217,6 +233,9 @@
                                 variant_list.append(src_vars)
                                 source_list.append(publisher.RepositoryURI(
                                     misc.parse_uri(s[-1])))
+                        elif opt == "-p":
+                                use_pub_list = True
+                                pub_list.append(arg)
 
                         if opt in ("--help", "-?"):
                                 usage(exitcode=0)
@@ -282,82 +301,187 @@
         xport, xport_cfg = transport.setup_transport()
         xport_cfg.incoming_root = tmpdir
 
-        pub = transport.setup_publisher(source_list,
+        # we don't use the publisher returned by setup_publisher, as that only
+        # returns one of the publishers in source_list.  Instead we use
+        # xport_cfg to access all publishers.
+        transport.setup_publisher(source_list,
             "pkgmerge", xport, xport_cfg, remote_prefix=True)
         cat_dir = tempfile.mkdtemp(dir=tmpdir)
-        pub.meta_root = cat_dir
-        pub.transport = xport
+
+        # we must have at least one matching publisher if -p was used.
+        known_pubs = set([pub.prefix for pub in xport_cfg.gen_publishers()])
+        if pub_list and len(set(pub_list).intersection(known_pubs)) == 0:
+                error(_("no publishers from source repositories match "
+                    "the given -p options."))
+
+        errors = set()
+        tracker = get_tracker()
 
-        # Use separate transport for destination repository in case source
-        # and destination have identical publisher configuration.
-        dest_xport, dest_xport_cfg = transport.setup_transport()
-        dest_xport_cfg.incoming_root = tmpdir
+        # iterate over all publishers in our source repositories.  If errors
+        # are encountered for a given publisher, we accumulate those, and
+        # skip to the next publisher.
+        for pub in xport_cfg.gen_publishers():
 
-        # retrieve catalogs for all specified repositories
-        for s in source_list:
-                load_catalog(s, pub)
+                if use_pub_list:
+                        if pub.prefix not in pub_list:
+                                continue
+                        else:
+                                # remove publishers from pub_list as we go, so
+                                # that when we're finished, any remaining
+                                # publishers in pub_list suggest superfluous
+                                # -p options, which will cause us to exit with
+                                # an error.
+                                pub_list.remove(pub.prefix)
+
+                pub.meta_root = cat_dir
+                pub.transport = xport
 
-        # determine the list of packages we'll be processing
-        if not pargs:
-                # use the latest versions and merge everything
-                fmri_arguments = list(set(
-                    name
+                # Use separate transport for destination repository in case
+                # source and destination have identical publisher configuration.
+                dest_xport, dest_xport_cfg = transport.setup_transport()
+                dest_xport_cfg.incoming_root = tmpdir
+
+                # retrieve catalogs for all specified repositories
+                for s in source_list:
+                        load_catalog(s, pub)
+
+                # determine the list of packages we'll be processing
+                if not pargs:
+                        # use the latest versions and merge everything
+                        fmri_arguments = list(set(
+                            name
+                            for s in source_list
+                            for name in get_all_pkg_names(s)
+                        ))
+                        exclude_args = []
+                else:
+                        fmri_arguments = [
+                            f
+                            for f in pargs
+                            if not f.startswith("!")
+                        ]
+
+                        exclude_args = [
+                            f[1:]
+                            for f in pargs
+                            if f.startswith("!")
+                        ]
+
+                # build fmris to be merged
+                masterlist = [
+                    build_merge_list(fmri_arguments, exclude_args,
+                        catalog_dict[s.uri])
                     for s in source_list
-                    for name in get_all_pkg_names(s)
-                ))
-                exclude_args = []
-        else:
-                fmri_arguments = [
-                    f
-                    for f in pargs
-                    if not f.startswith("!")
-                ]
-
-                exclude_args = [
-                    f[1:]
-                    for f in pargs
-                    if f.startswith("!")
                 ]
 
-        # build fmris to be merged
-        masterlist = [
-            build_merge_list(fmri_arguments, exclude_args, catalog_dict[s.uri])
-            for s in source_list
-        ]
+                # check for unmatched patterns
+                in_none = reduce(lambda x, y: x & y,
+                    (set(u) for d, u in masterlist))
+                if in_none:
+                        errors.add(
+                            _("The following pattern(s) did not match any "
+                            "packages in any of the specified repositories for "
+                            "publisher %(pub_name)s:"
+                            "\n%(patterns)s") % {"patterns": "\n".join(in_none),
+                            "pub_name": pub.prefix})
+                        continue
+
+                # generate set of all package names to be processed, and dict
+                # of lists indexed by order in source_list; if that repo has no
+                # fmri for this pkg then use None.
+                allpkgs = set(name for d, u in masterlist for name in d)
+
+                processdict = {}
+                for p in allpkgs:
+                        for d, u in masterlist:
+                                processdict.setdefault(p, []).append(
+                                    d.setdefault(p, None))
 
-        # check for unmatched patterns
-        in_none = reduce(lambda x, y: x & y, (set(u) for d, u in masterlist))
-        if in_none:
-                error(_("The following pattern(s) did not match any packages "
-                    "in any of the specified repositories:\n%s") % "\n".join(
-                    in_none))
+                # check to make sure all fmris are at same version modulo
+                # timestamp
+                for entry in processdict:
+                        if len(set([
+                                str(a).rsplit(":")[0]
+                                for a in processdict[entry]
+                                if a is not None
+                            ])) > 1:
+                                errors.add(
+                                    _("fmris matching the following patterns do"
+                                    " not have matching versions across all "
+                                    "repositories for publisher %(pubs)s: "
+                                    "%(patterns)s") % {"pub": pub.prefix,
+                                    "patterns": processdict[entry]})
+                                continue
 
-        # generate set of all package names to be processed, and dict of lists
-        # indexed by order in source_list; if that repo has no fmri for this
-        # pkg then use None.
-        allpkgs = set(name for d, u in masterlist for name in d)
+                # we're ready to merge
+                if not dry_run:
+                        target_pub = transport.setup_publisher(dest_repo,
+                            pub.prefix, dest_xport, dest_xport_cfg,
+                            remote_prefix=True)
+                else:
+                        target_pub = None
 
-        processdict = {}
-        for p in allpkgs:
-                for d, u in masterlist:
-                        processdict.setdefault(p, []).append(d.setdefault(p,
-                            None))
+                tracker.republish_set_goal(len(processdict), 0, 0)
+                # republish packages for this publisher. If we encounter any
+                # publication errors, we move on to the next publisher.
+                try:
+                        pkg_tmpdir = tempfile.mkdtemp(dir=tmpdir)
+                        republish_packages(pub, target_pub,
+                            processdict, source_list, variant_list, variants,
+                            tracker, xport, dest_repo, dest_xport, pkg_tmpdir,
+                            dry_run=dry_run)
+                except (trans.TransactionError, PkgmergeException), e:
+                        errors.add(str(e))
+                        tracker.reset()
+                        continue
+                finally:
+                        # if we're handling an exception, this still gets called
+                        # in spite of the 'continue' that the handler ends with.
+                        if os.path.exists(pkg_tmpdir):
+                                shutil.rmtree(pkg_tmpdir)
+
+                tracker.republish_done(dryrun=dry_run)
+                tracker.reset()
+
+        # If -p options were supplied, we should have processed all of them
+        # by now. Remaining entries suggest -p options that were not merged.
+        if use_pub_list and pub_list:
+                errors.add(_("the following publishers were not found in "
+                    "source repositories: %s") % " ".join(pub_list))
 
-        # check to make sure all fmris are at same version modulo timestamp
-        for entry in processdict:
-                if len(set([
-                        str(a).rsplit(":")[0]
-                        for a in processdict[entry]
-                        if a is not None
-                    ])) > 1:
-                        error(_("fmris matching the following patterns do not "
-                            "have matching versions across all repositories: "
-                            "%s") % processdict[entry])
+        # If we have encountered errors for some publishers, print them now
+        # and exit.
+        tracker.flush()
+        for message in errors:
+                error(message, exitcode=None)
+        if errors:
+                sys.exit(1)
+
+        return 0
+
+def republish_packages(pub, target_pub, processdict, source_list, variant_list,
+        variants, tracker, xport, dest_repo, dest_xport, pkg_tmpdir,
+        dry_run=False):
+        """Republish packages for publisher pub to dest_repo.
 
-        # we're ready to merge
-        if not dry_run:
-                target_pub = transport.setup_publisher(dest_repo,
-                    pub.prefix, dest_xport, dest_xport_cfg, remote_prefix=True)
+        If we try to republish a package that we have already published,
+        an exception is raised.
+
+        pub             the publisher from source_list that we are republishing
+        target_pub      the destination publisher
+        processdict     a dict indexed by package name of the pkgs to merge
+        source_list     a list of source repositories
+        variant_list    a list of dicts containing variant names/values
+        variants        the unique set of variants across all sources.
+        tracker         a progress tracker
+        xport           the transport handling our source repositories
+        dest_repo       our destination repository
+        dest_xport      the transport handling our destination repository
+        pkg_tmpdir      a temporary dir used when downloading pkg content
+                        which may be deleted and recreated by this method.
+
+        dry_run         True if we should not actually publish
+        """
 
         def get_basename(pfmri):
                 open_time = pfmri.get_timestamp()
@@ -365,8 +489,6 @@
                     (calendar.timegm(open_time.utctimetuple()),
                     urllib.quote(str(pfmri), ""))
 
-        tracker = get_tracker()
-        tracker.republish_set_goal(len(processdict), 0, 0)
         for entry in processdict:
                 man, retrievals = merge_fmris(source_list,
                     processdict[entry], variant_list, variants)
@@ -392,70 +514,66 @@
 
                 tracker.republish_start_pkg(f, getbytes=getbytes,
                     sendbytes=sendbytes)
+
                 if dry_run:
                         # Dry-run; attempt a merge of everything but don't
                         # write any data or publish packages.
                         continue
 
-                pkgdir = tempfile.mkdtemp(dir=tmpdir)
+                target_pub.prefix = f.publisher
+
                 # Retrieve package data from each package source.
                 for i, uri in enumerate(source_list):
                         pub.repository.origins = [uri]
-                        mfile = xport.multi_file_ni(pub, pkgdir,
+                        mfile = xport.multi_file_ni(pub, pkg_tmpdir,
                             decompress=True, progtrack=tracker)
                         for a in retrievals[i]:
                                 mfile.add_action(a)
                         mfile.wait_files()
 
+                trans_id = get_basename(f)
+                pkg_name = f.get_fmri()
+                pubs.add(target_pub.prefix)
                 # Publish merged package.
-                try:
-                        trans_id = get_basename(f)
-                        pkg_name = f.get_fmri()
-                        target_pub.prefix = f.publisher
-                        pubs.add(f.publisher)
+                t = trans.Transaction(dest_repo,
+                    pkg_name=pkg_name, trans_id=trans_id,
+                    xport=dest_xport, pub=target_pub,
+                    progtrack=tracker)
 
-                        t = trans.Transaction(dest_repo, pkg_name=pkg_name,
-                            trans_id=trans_id, xport=dest_xport, pub=target_pub,
-                            progtrack=tracker)
-
-                        # Remove any previous failed attempt to
-                        # to republish this package.
-                        try:
-                                t.close(abandon=True)
-                        except:
-                                # It might not exist already.
-                                pass
+                # Remove any previous failed attempt to
+                # republish this package.
+                try:
+                        t.close(abandon=True)
+                except:
+                        # It might not exist already.
+                        pass
 
-                        t.open()
-                        for a in man.gen_actions():
-                                if (a.name == "set" and
-                                    a.attrs["name"] == "pkg.fmri"):
-                                        # To be consistent with the
-                                        # server, the fmri can't be
-                                        # added to the manifest.
-                                        continue
+                t.open()
+                for a in man.gen_actions():
+                        if (a.name == "set" and
+                            a.attrs["name"] == "pkg.fmri"):
+                                # To be consistent with the
+                                # server, the fmri can't be
+                                # added to the manifest.
+                                continue
 
-                                if hasattr(a, "hash"):
-                                        fname = os.path.join(pkgdir,
-                                            a.hash)
-                                        a.data = lambda: open(fname, "rb")
-                                t.add(a)
+                        if hasattr(a, "hash"):
+                                fname = os.path.join(pkg_tmpdir,
+                                    a.hash)
+                                a.data = lambda: open(
+                                    fname, "rb")
+                        t.add(a)
 
-                        # Always defer catalog update.
-                        t.close(add_to_catalog=False)
-                except trans.TransactionError, e:
-                        error(str(e))
+                # Always defer catalog update.
+                t.close(add_to_catalog=False)
 
                 # Done with this package.
                 tracker.republish_end_pkg(f)
 
-                # Dump retrieved package data after each republication.
-                shutil.rmtree(pkgdir)
-
-        tracker.republish_done(dryrun=dry_run)
-        tracker.reset()
-
-        return 0
+                # Dump retrieved package data after each republication and
+                # recreate the directory for the next package.
+                shutil.rmtree(pkg_tmpdir)
+                os.mkdir(pkg_tmpdir)
 
 def merge_fmris(source_list, fmri_list, variant_list, variants):
         """Merge a list of manifests representing multiple variants,
@@ -637,7 +755,8 @@
 
                         if a.name == "set" and a.attrs["name"] == variant:
                                 if vval not in a.attrlist("value"):
-                                        error(_("package %(pkg)s is tagged as "
+                                        raise PkgmergeException(
+                                            _("package %(pkg)s is tagged as "
                                             "not supporting %(var_name)s "
                                             "%(var_value)s") % {
                                             "pkg": fmri_list[j],
@@ -664,7 +783,8 @@
         try:
                 action_lists = list(manifest.Manifest.comm(manifest_list))
         except manifest.ManifestError, e:
-                error("Duplicate action(s) in package \"%s\": \n%s" %
+                raise PkgmergeException(
+                    "Duplicate action(s) in package \"%s\": \n%s" %
                     (new_fmri.pkg_name, e))
 
         # Declare new package FMRI.
@@ -790,7 +910,7 @@
                         fmris.append(fmri)
                 except (pkg.fmri.FmriError,
                     pkg.version.VersionError), e:
-                        error(str(e))
+                        raise PkgmergeException(str(e))
 
         # Create a dictionary of patterns, with each value being a
         # dictionary of pkg names & fmris that match that pattern.