7112857 transport could try harder when being fed corrupt data s11u1b12
author Tim Foster <tim.s.foster@oracle.com>
Fri, 09 Mar 2012 12:23:53 +1300
changeset 2642 5b92d3ae1b76
parent 2641 f7bb620e1a56
child 2644 434fe01f111b
7112857 transport could try harder when being fed corrupt data
src/modules/client/transport/repo.py
src/modules/client/transport/stats.py
src/modules/client/transport/transport.py
src/tests/cli/t_pkg_install.py
src/tests/cli/t_pkg_refresh.py
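
The core of the change is a retry pattern in the client transport: once a repository has served corrupt content (tracked per-repository via a new content_errors counter), later attempts rebuild their request headers through a new build_refetch_header() method so that intermediate web caches are bypassed. The following is a minimal sketch of that pattern, not the actual pkg(5) code: the names build_refetch_header, content_errors, record_error and the (repo, iteration) pairs correspond to what the diffs below introduce, while fetch_with_refetch, gen_repo and do_fetch are purely illustrative stand-ins.

class InvalidContentException(Exception):
        """Stand-in for pkg.client.transport.exception.InvalidContentException."""

def fetch_with_refetch(gen_repo, stats, header, do_fetch):
        """Illustrative only: try an operation against each endpoint yielded
        by gen_repo, switching to refetch (no-cache) headers once corrupt
        content has been seen from a repository."""

        failures = []
        for repo, retries in gen_repo():
                repostats = stats[repo.get_url()]
                # On a later pass, if this repository has already served
                # corrupt content, rebuild the headers so that any
                # intermediate web cache is bypassed (Cache-Control and
                # Pragma: no-cache for HTTP repos, a no-op otherwise).
                if repostats.content_errors and retries > 1:
                        header = repo.build_refetch_header(header)
                try:
                        return do_fetch(repo, header)
                except InvalidContentException, e:
                        # Record the corruption so subsequent iterations use
                        # the refetch headers, then try the next endpoint.
                        repostats.record_error(content=True)
                        failures.append(e)
        raise failures[-1]
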
--- a/src/modules/client/transport/repo.py	Thu Mar 01 18:15:20 2012 +0000
+++ b/src/modules/client/transport/repo.py	Fri Mar 09 12:23:53 2012 +1300
@@ -204,6 +204,13 @@
 
                 raise NotImplementedError
 
+        def build_refetch_header(self, header):
+                """Based on existing header contents, build a header that
+                should be used for a subsequent retry when fetching content
+                from the repository."""
+
+                raise NotImplementedError
+
         @staticmethod
         def _annotate_exceptions(errors, mapping=None):
                 """Walk a list of transport errors, examine the
@@ -504,6 +511,7 @@
                         headers["Cache-Control"] = "max-age=0"
                 if redownload:
                         headers["Cache-Control"] = "no-cache"
+                        headers["Pragma"] = "no-cache"
                 if header:
                         headers.update(header)
                 if progtrack:
@@ -1011,6 +1019,24 @@
 
                 return True
 
+        def build_refetch_header(self, header):
+                """For HTTP requests that have failed due to corrupt content,
+                if that request didn't specify 'Cache-Control: no-cache' in
+                its headers, we can retry the request with that additional
+                header, which can help where a web cache is serving corrupt
+                content.
+
+                If the headers passed don't already have "Cache-Control:
+                no-cache" set, this method adds that header and a matching
+                "Pragma: no-cache" header, then returns the headers.
+                """
+
+                if header.get("Cache-Control", "") != "no-cache":
+                        header["Cache-Control"] = "no-cache"
+                        header["Pragma"] = "no-cache"
+                        return header
+                return header
+
 
 class HTTPSRepo(HTTPRepo):
 
@@ -1723,6 +1749,12 @@
 
                 return True
 
+        def build_refetch_header(self, header):
+                """Pointless to attempt refetch of corrupt content for
+                this protocol."""
+
+                return header
+
 class _ArchiveRepo(TransportRepo):
         """Private implementation of transport repository logic for repositories
         contained within an archive.
@@ -1988,6 +2020,11 @@
                 """No-op."""
                 return True
 
+        def build_refetch_header(self, header):
+                """Pointless to attempt refetch of corrupt content for
+                this protocol."""
+                return header
+
 
 class FileRepo(object):
         """Factory class for creating transport repository objects for
--- a/src/modules/client/transport/stats.py	Thu Mar 01 18:15:20 2012 +0000
+++ b/src/modules/client/transport/stats.py	Fri Mar 09 12:23:53 2012 +1300
@@ -315,6 +315,14 @@
                     self.__decayable_err
 
         @property
+        def content_errors(self):
+                """Return the number of content errors that the client has
+                encountered while trying to perform operations on this
+                repository."""
+
+                return self.__content_err
+
+        @property
         def num_connect(self):
                 """Return the number of times that the host has had a
                 connection established.  This is less than or equal to the
--- a/src/modules/client/transport/transport.py	Thu Mar 01 18:15:20 2012 +0000
+++ b/src/modules/client/transport/transport.py	Fri Mar 09 12:23:53 2012 +1300
@@ -587,9 +587,9 @@
                 # of origins for a publisher without incurring the significant
                 # overhead of performing file-based search unless the network-
                 # based resource is unavailable.
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    prefer_remote=True, alt_repo=alt_repo, operation="search",
-                    versions=[0, 1]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, prefer_remote=True, alt_repo=alt_repo,
+                    operation="search", versions=[0, 1]):
 
                         try:
                                 fobj = d.do_search(data, header,
@@ -686,8 +686,8 @@
                 # prior to this operation.
                 self._captive_portal_test(ccancel=ccancel, alt_repo=alt_repo)
 
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    alt_repo=alt_repo):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, alt_repo=alt_repo):
 
                         repostats = self.stats[d.get_url()]
 
@@ -844,13 +844,15 @@
                         # os.statvfs is not available on Windows
                         pass
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    operation="catalog", versions=[1], ccancel=ccancel,
-                    alt_repo=alt_repo):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, operation="catalog", versions=[1],
+                    ccancel=ccancel, alt_repo=alt_repo):
 
                         failedreqs = []
                         repostats = self.stats[d.get_url()]
                         gave_up = False
+                        if repostats.content_errors and retries > 1:
+                                header = d.build_refetch_header(header)
 
                         # This returns a list of transient errors
                         # that occurred during the transport operation.
@@ -912,7 +914,7 @@
                                 try:
                                         self._verify_catalog(s, download_dir)
                                 except tx.InvalidContentException, e:
                                         repostats.record_error(content=True)
                                         failedreqs.append(e.request)
                                         failures.append(e)
                                         if not flist:
@@ -955,8 +957,9 @@
                 if isinstance(pub, publisher.Publisher):
                         header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    operation="publisher", versions=[0], ccancel=ccancel):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, operation="publisher", versions=[0],
+                    ccancel=ccancel):
                         try:
                                 resp = d.get_publisherinfo(header,
                                     ccancel=ccancel)
@@ -1003,7 +1006,7 @@
                 assert isinstance(self.cfg, ImageTransportCfg)
                 assert isinstance(repo_uri, publisher.RepositoryURI)
 
-                for d, v in self.__gen_repo(repo_uri, retry_count,
+                for d, retries, v in self.__gen_repo(repo_uri, retry_count,
                     origin_only=True, operation="syspub", versions=[0],
                     ccancel=ccancel):
                         try:
@@ -1061,11 +1064,13 @@
                 elif fmri:
                         alt_repo = self.cfg.get_pkg_alt_repo(fmri)
 
-                for d, v in self.__gen_repo(pub, retry_count, operation="file",
-                    versions=[0, 1], alt_repo=alt_repo):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    operation="file", versions=[0, 1], alt_repo=alt_repo):
 
                         url = d.get_url()
-
+                        repostats = self.stats[url]
+                        if repostats.content_errors and retries > 1:
+                                header = d.build_refetch_header(header)
                         try:
                                 resp = d.get_datastream(fhash, v, header,
                                     ccancel=ccancel, pub=pub)
@@ -1077,7 +1082,6 @@
                                             reason="hash failure:  expected: %s"
                                             "computed: %s" % (fhash, hash_val),
                                             url=url)
-                                        repostats = self.stats[url]
                                         repostats.record_error(content=True)
                                         raise exc
 
@@ -1096,7 +1100,6 @@
                                 exc = tx.TransferContentException(url,
                                     "zlib.error:%s" %
                                     (" ".join([str(a) for a in e.args])))
-                                repostats = self.stats[url]
                                 repostats.record_error(content=True)
                                 if exc.retryable:
                                         failures.append(exc)
@@ -1122,9 +1125,14 @@
                 if isinstance(pub, publisher.Publisher):
                         header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    operation="status", versions=[0], ccancel=ccancel):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, operation="status", versions=[0],
+                    ccancel=ccancel):
                         try:
+                                url = d.get_url()
+                                repostats = self.stats[url]
+                                if repostats.content_errors and retries > 1:
+                                        header = d.build_refetch_header(header)
                                 resp = d.get_status(header, ccancel=ccancel)
                                 infostr = resp.read()
 
@@ -1137,10 +1145,9 @@
                                 failures.extend(e.failures)
 
                         except (TypeError, ValueError), e:
-                                url = d.get_url()
+
                                 exc = tx.TransferContentException(url,
                                     "Invalid stats response: %s" % e)
-                                repostats = self.stats[url]
                                 repostats.record_error(content=True)
                                 if exc.retryable:
                                         failures.append(exc)
@@ -1174,8 +1181,8 @@
                 if not alt_repo:
                         alt_repo = self.cfg.get_pkg_alt_repo(fmri)
 
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    alt_repo=alt_repo):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, alt_repo=alt_repo):
 
                         # If a transport exception occurs,
                         # save it if it's retryable, otherwise
@@ -1234,11 +1241,13 @@
                 if not alt_repo:
                         alt_repo = self.cfg.get_pkg_alt_repo(fmri)
 
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    alt_repo=alt_repo):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, alt_repo=alt_repo):
 
                         repostats = self.stats[d.get_url()]
                         verified = False
+                        if repostats.content_errors and retries > 1:
+                                header = d.build_refetch_header(header)
                         try:
                                 resp = d.get_manifest(fmri, header,
                                     ccancel=ccancel, pub=pub)
@@ -1264,6 +1273,14 @@
                                 failures.extend(ex.failures)
                                 mcontent = None
 
+                        except tx.InvalidContentException, e:
+                                # We might be able to retrieve uncorrupted
+                                # content. If this was the last retry, then
+                                # we're out of luck.
+                                failures.append(e)
+                                mcontent = None
+                                repostats.record_error(content=True)
+
                         except tx.TransportException, e:
                                 if e.retryable:
                                         failures.append(e)
@@ -1397,13 +1414,20 @@
                 # download_dir is temporary download path.
                 download_dir = self.cfg.incoming_root
 
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    alt_repo=mxfr.get_alt_repo()):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, alt_repo=mxfr.get_alt_repo()):
 
                         failedreqs = []
                         repostats = self.stats[d.get_url()]
                         gave_up = False
 
+                        # Possibly overkill: if any content errors were seen,
+                        # we modify the headers of all requests, not just the
+                        # ones that failed before.
+                        if repostats.content_errors and retries > 1:
+                                mfstlist = [(fmri, d.build_refetch_header(h))
+                                    for fmri, h in mfstlist]
+
                         # This returns a list of transient errors
                         # that occurred during the transport operation.
                         # An exception handler here isn't necessary
@@ -1641,11 +1665,15 @@
                 else:
                         cache = None
 
-                for d, v in self.__gen_repo(pub, retry_count, operation="file",
-                    versions=[0, 1], alt_repo=mfile.get_alt_repo()):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    operation="file", versions=[0, 1],
+                    alt_repo=mfile.get_alt_repo()):
 
                         failedreqs = []
                         repostats = self.stats[d.get_url()]
+                        if repostats.content_errors and retries > 1:
+                                header = d.build_refetch_header(header)
+
                         gave_up = False
 
                         # This returns a list of transient errors
@@ -1705,7 +1733,7 @@
                                             dl_path)
                                 except tx.InvalidContentException, e:
                                         mfile.subtract_progress(e.size)
                                         e.request = s
                                         repostats.record_error(content=True)
                                         failedreqs.append(s)
                                         failures.append(e)
@@ -1813,8 +1841,13 @@
                 # prior to this operation.
                 self._captive_portal_test(ccancel=ccancel, alt_repo=alt_repo)
 
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    alt_repo=alt_repo):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, alt_repo=alt_repo):
+
+                        repostats = self.stats[d.get_url()]
+                        if repostats.content_errors and retries > 1:
+                                header = d.build_refetch_header(header)
+
                         # If a transport exception occurs,
                         # save it if it's retryable, otherwise
                         # raise the error to a higher-level handler.
@@ -1831,16 +1864,20 @@
                                 for f in ex.failures:
                                         f.url = d.get_url()
                                         failures.append(f)
+
+                        except tx.InvalidContentException, e:
+                                repostats.record_error(content=True)
+                                failures.append(
+                                    apx.InvalidDepotResponseException(
+                                    d.get_url(), "Unable to parse "
+                                    "repository's versions/0 response"))
+
                         except tx.TransportException, e:
                                 e.url = d.get_url()
                                 if e.retryable:
                                         failures.append(e)
                                 else:
                                         raise
-                        except ValueError:
-                                raise apx.InvalidDepotResponseException(
-                                    d.get_url(), "Unable to parse repository "
-                                    "response")
                 raise failures
 
         @staticmethod
@@ -1851,10 +1888,13 @@
                 resp = repo.get_versions(header, ccancel=ccancel)
                 verlines = resp.readlines()
 
-                return dict(
-                    s.split(None, 1)
-                    for s in (l.strip() for l in verlines)
-                )
+                try:
+                        return dict(
+                            s.split(None, 1)
+                            for s in (l.strip() for l in verlines)
+                        )
+                except ValueError, e:
+                        raise tx.InvalidContentException(e)
 
         def __fill_repo_vers(self, repo, vers=None, ccancel=None):
                 """Download versions information for the transport
@@ -1867,10 +1907,11 @@
                 if not vers:
                         try:
                                 vers = self.__get_version(repo, ccancel=ccancel)
-                        except ValueError:
+                        except tx.InvalidContentException:
                                 raise tx.PkgProtoError(repo.get_url(),
                                     "versions", 0,
-                                    "VaueError while parsing response")
+                                    "InvalidContentException while parsing "
+                                    "response")
 
                 for key, val in vers.items():
                         # Don't turn this line into a list of versions.
@@ -1905,7 +1946,9 @@
                 object.  This is used to lookup a transport.Repo object.
 
                 The 'count' argument determines how many times the routine
-                will iterate through a list of endpoints.
+                will iterate through a list of endpoints.  The number of times
+                we've iterated through the endpoints so far is included in
+                each tuple that is yielded.
 
                 'prefer_remote' is an optional boolean value indicating whether
                 network-based sources are preferred over local sources.  If
@@ -1938,9 +1981,11 @@
                 different Repository object, it should pass one in
                 'alt_repo.'
 
-                This function returns a Repo object by default.  If
-                versions and operation are specified, it returns a tuple
-                of (Repo, highest supported version)."""
+                This generator yields a tuple containing a Repo object and
+                the number of times we've iterated through the endpoints.  If
+                versions and operation are specified, it yields a tuple of
+                (Repo, iteration, highest supported version).
+                """
 
                 if not self.__engine:
                         self.__setup()
@@ -1985,7 +2030,9 @@
                         versions = sorted(versions, reverse=True)
 
                 fail = None
+                iteration = 0
                 for i in xrange(count):
+                        iteration += 1
                         rslist = self.stats.get_repostats(repolist, origins)
                         if prefer_remote:
                                 rslist.sort(cmp=remote_first)
@@ -2023,11 +2070,11 @@
                                             versions)
                                         if verid >= 0:
                                                 repo_found = True
-                                                yield repo, verid
+                                                yield repo, iteration, verid
                                 else:
                                         repo_found = True
                                         yield self.__repo_cache.new_repo(rs,
-                                            ruri)
+                                            ruri), iteration
 
                         if not repo_found and fail:
                                 raise fail
@@ -2349,8 +2396,9 @@
                 if progtrack and ccancel:
                         progtrack.check_cancelation = ccancel
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="add", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="add",
+                    versions=[0]):
                         try:
                                 d.publish_add(action, header=header,
                                     progtrack=progtrack, trans_id=trans_id)
@@ -2382,8 +2430,9 @@
                 if not self.__engine:
                         self.__setup()
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="file", versions=[1]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="file",
+                    versions=[1]):
                         try:
                                 d.publish_add_file(pth, header=header,
                                     trans_id=trans_id)
@@ -2411,8 +2460,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="abandon", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True,
+                    operation="abandon", versions=[0]):
                         try:
                                 state, fmri = d.publish_abandon(header=header,
                                     trans_id=trans_id)
@@ -2444,8 +2494,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="close", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="close",
+                    versions=[0]):
                         try:
                                 state, fmri = d.publish_close(header=header,
                                     trans_id=trans_id,
@@ -2475,8 +2526,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="open", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="open",
+                    versions=[0]):
                         try:
                                 trans_id = d.publish_open(header=header,
                                     client_release=client_release,
@@ -2503,8 +2555,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="admin", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="admin",
+                    versions=[0]):
                         try:
                                 d.publish_rebuild(header=header, pub=pub)
                                 return
@@ -2536,8 +2589,9 @@
                 if not self.__engine:
                         self.__setup()
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="append", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True,
+                    operation="append", versions=[0]):
                         try:
                                 trans_id = d.publish_append(header=header,
                                     client_release=client_release,
@@ -2565,8 +2619,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="admin", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="admin",
+                    versions=[0]):
                         try:
                                 d.publish_rebuild_indexes(header=header,
                                     pub=pub)
@@ -2593,8 +2648,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="admin", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="admin",
+                    versions=[0]):
                         try:
                                 d.publish_rebuild_packages(header=header,
                                     pub=pub)
@@ -2621,8 +2677,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="admin", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="admin",
+                    versions=[0]):
                         try:
                                 d.publish_refresh(header=header, pub=pub)
                                 return
@@ -2652,8 +2709,8 @@
                 # purposefully avoided as the underlying repo function
                 # will automatically determine what operation to use
                 # for the single origin returned by __gen_repo.
-                for d in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True):
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True):
                         try:
                                 d.publish_refresh_indexes(header=header,
                                     pub=pub)
@@ -2680,8 +2737,9 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
-                for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
-                    single_repository=True, operation="admin", versions=[0]):
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True, operation="admin",
+                    versions=[0]):
                         try:
                                 d.publish_refresh_packages(header=header,
                                     pub=pub)
--- a/src/tests/cli/t_pkg_install.py	Thu Mar 01 18:15:20 2012 +0000
+++ b/src/tests/cli/t_pkg_install.py	Fri Mar 09 12:23:53 2012 +1300
@@ -38,6 +38,7 @@
 import stat
 import time
 import unittest
+import urllib2
 
 import pkg.actions
 import pkg.fmri as fmri
@@ -743,6 +744,203 @@
                 afobj.close()
                 self.pkg("install a16189", exit=1)
 
+        def test_corrupt_web_cache(self):
+                """Make sure the client can detect corrupt content being served
+                to it from a corrupt web cache, modifying its requests to
+                retrieve correct content."""
+
+                # Depot required for this test since we want to corrupt
+                # a downstream cache serving this content
+                self.dc.start()
+                fmris = self.pkgsend_bulk(self.durl, self.foo11)
+                # we need to record just the version string of foo in order
+                # to properly quote it later.
+                foo_version = fmris[0].split("@")[1]
+                self.image_create(self.durl)
+
+                # we use the system repository as a convenient way to set up
+                # a caching proxy
+                self.sysrepo("")
+                sc_runtime_dir = os.path.join(self.test_root, "sysrepo_runtime")
+                sc_conf = os.path.join(sc_runtime_dir, "sysrepo_httpd.conf")
+                sc_cache = os.path.join(self.test_root, "sysrepo_cache")
+
+                # ensure pkg5srv can write cache content
+                os.chmod(sc_cache, 0777)
+
+                sysrepo_port = self.next_free_port
+                self.next_free_port += 1
+                sc = pkg5unittest.SysrepoController(sc_conf,
+                    sysrepo_port, sc_runtime_dir, testcase=self)
+                sc.start()
+
+                sysrepo_url = "http://localhost:%s" % sysrepo_port
+
+                saved_pkg_sysrepo_env = os.environ.get("PKG_SYSREPO_URL")
+                os.environ["PKG_SYSREPO_URL"] = sysrepo_url
+
+                # create an image, installing a package, to warm up the webcache
+                self.image_create(props={"use-system-repo": True})
+                self.pkg("install foo@1.1")
+                self.pkg("uninstall foo")
+
+                # now recreate the image.  image_create calls image_destroy,
+                # thereby cleaning any cached content in the image.
+                self.image_create(props={"use-system-repo": True})
+
+                def corrupt_path(path, value="noodles\n", rename=False):
+                        """Given a path, corrupt its contents."""
+                        self.assert_(os.path.exists(path))
+                        if rename:
+                                os.rename(path, path + ".not-corrupt")
+                                open(path, "wb").write(value)
+                        else:
+                                df = open(path, "wb")
+                                df.write(value)
+                                df.close()
+
+                def corrupt_cache(cache_dir):
+                        """Given an Apache cache, corrupt its contents."""
+
+                        for dirpath, dirname, filenames in os.walk(cache_dir):
+                                for name in filenames:
+                                        if name.endswith(".header"):
+                                                data = name.replace(".header",
+                                                    ".data")
+                                                corrupt_path(os.path.join(
+                                                    dirpath, data))
+                # corrupt our web cache
+                corrupt_cache(sc_cache)
+
+                urls = [
+                    # we need to quote the version carefully to use exactly the
+                    # format pkg(1) uses - two logically identical urls that
+                    # differ only by the way they're quoted are treated by
+                    # Apache as separate cacheable resources.
+                    "%s/test/manifest/0/foo@%s" % (self.durl, urllib2.quote(
+                    foo_version)),
+                    "%s/test/file/1/8535c15c49cbe1e7cb1a0bf8ff87e512abed66f8" %
+                    self.durl,
+                    "%s/test/catalog/1/catalog.attrs" % self.durl,
+                    "%s/test/catalog/1/catalog.base.C" % self.durl
+                ]
+
+                proxy_handler = urllib2.ProxyHandler({"http": sysrepo_url})
+                proxy_opener = urllib2.build_opener(proxy_handler)
+
+                # validate that our cache is returning corrupt content.
+                for url in urls:
+                        # we should get clean content when we don't use the
+                        # cache
+                        u = urllib2.urlopen(url)
+                        content = u.readlines()
+                        self.assert_(content != ["noodles\n"],
+                            "Unexpected content from depot")
+
+                        # get the corrupted version, and verify it is broken
+                        req = urllib2.Request(url)
+                        u = proxy_opener.open(req)
+                        content = u.readlines()
+
+                        self.assert_(content == ["noodles\n"],
+                            "Expected noodles, got %s for %s" % (content, url))
+
+                # the following should work, as pkg should retry requests
+                # with a "Cache-Control: no-cache" header where it has
+                # detected corrupt content.
+                self.pkg("refresh --full")
+                self.pkg("contents -rm foo@1.1")
+                self.pkg("install foo@1.1")
+
+                # since the cache has been refreshed, we should see valid
+                # contents when going through the proxy now.
+                for url in urls:
+                        req = urllib2.Request(url)
+                        u = proxy_opener.open(req)
+                        content = u.readlines()
+                        self.assert_(content != ["noodles\n"],
+                            "Unexpected content from depot")
+
+                # ensure that when we actually corrupt the repository
+                # as well as the cache, we do detect the errors properly.
+                corrupt_cache(sc_cache)
+                repodir = self.dc.get_repodir()
+
+                prefix = "publisher/test"
+                self.image_create(props={"use-system-repo": True})
+
+                # When we corrupt the files in the repository, we intentionally
+                # corrupt them with different contents than those in the cache,
+                # allowing us to check the error messages being printed by the
+                # transport subsystem.
+
+                filepath = os.path.join(repodir,
+                    "%s/file/85/8535c15c49cbe1e7cb1a0bf8ff87e512abed66f8" %
+                    prefix)
+                mfpath = os.path.join(repodir, "%s/pkg/foo/%s" % (prefix,
+                    urllib2.quote(foo_version)))
+                catpath = os.path.join(repodir, "%s/catalog/catalog.base.C" %
+                    prefix)
+
+                try:
+                        # first corrupt the file
+                        corrupt_path(filepath, value="spaghetti\n", rename=True)
+                        self.pkg("install foo@1.1", stderr=True, exit=1)
+                        os.rename(filepath + ".not-corrupt", filepath)
+
+                        # we should be getting two hash errors, one from the
+                        # cache, one from the repo. The one from the repo should
+                        # repeat
+                        self.assert_(
+                            "1: Invalid contentpath lib/libc.so.1: chash" in
+                            self.errout)
+                        self.assert_(
+                            "2: Invalid contentpath lib/libc.so.1: chash" in
+                            self.errout)
+                        self.assert_("(happened 3 times)" in self.errout)
+
+                        # now corrupt the manifest (we have to re-corrupt the
+                        # cache, since attempting to install foo above would
+                        # have caused the cache to refetch the valid manifest
+                        # from the repo) and remove the version of the manifest
+                        # cached in the image.
+                        corrupt_cache(sc_cache)
+                        corrupt_path(mfpath, value="spaghetti\n", rename=True)
+                        shutil.rmtree(os.path.join(self.img_path(),
+                            "var/pkg/publisher/test/pkg"))
+                        self.pkg("contents -rm foo@1.1", stderr=True, exit=1)
+                        os.rename(mfpath + ".not-corrupt", mfpath)
+
+                        # we should get two hash errors, one from the cache, one
+                        # from the repo - the one from the repo should repeat.
+                        self.assert_(
+                            "1: Invalid content: manifest hash failure" in
+                            self.errout)
+                        self.assert_("2: Invalid content: manifest hash failure"
+                            in self.errout)
+                        self.assert_("(happened 3 times)" in self.errout)
+
+                        # finally, corrupt the catalog. Given we've asked for a
+                        # full refresh, we retrieve the upstream version only.
+                        corrupt_path(catpath, value="spaghetti\n", rename=True)
+                        self.pkg("refresh --full", stderr=True, exit=1)
+                        self.assert_("catalog.base.C' is invalid." in
+                            self.errout)
+                        os.rename(catpath + ".not-corrupt", catpath)
+
+                finally:
+                        # make sure we clean up any corrupt repo contents.
+                        for path in [filepath, mfpath, catpath]:
+                                not_corrupt = path + ".not-corrupt"
+                                if os.path.exists(not_corrupt):
+                                        os.rename(not_corrupt, path)
+
+                        sc.stop()
+                        if saved_pkg_sysrepo_env:
+                                os.environ["PKG_SYSREPO_URL"] = \
+                                    saved_pkg_sysrepo_env
+
+
 class TestPkgInstallUpdateReject(pkg5unittest.SingleDepotTestCase):
         """Test --reject option to pkg update/install"""
         persistent_setup = True
--- a/src/tests/cli/t_pkg_refresh.py	Thu Mar 01 18:15:20 2012 +0000
+++ b/src/tests/cli/t_pkg_refresh.py	Fri Mar 09 12:23:53 2012 +1300
@@ -596,6 +596,8 @@
                 expected = [
                     { "CACHE-CONTROL": "no-cache" },
                     { "CACHE-CONTROL": "no-cache" },
+                    { "PRAGMA": "no-cache" },
+                    { "PRAGMA": "no-cache" }
                 ]
                 self.assertEqualDiff(entries, expected)
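
For reference, the effect of the new HTTPRepo.build_refetch_header() on a request header dictionary is simply to add the two cache-bypass directives the refresh test above now expects ("Pragma: no-cache" being the HTTP/1.0 counterpart honoured by older caches). A rough illustration, assuming repo is an HTTPRepo instance and the starting headers are hypothetical:

header = {"User-Agent": "pkg-client"}        # hypothetical starting headers
header = repo.build_refetch_header(header)
# header is now:
# {"User-Agent": "pkg-client",
#  "Cache-Control": "no-cache", "Pragma": "no-cache"}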