--- a/src/modules/client/transport/transport.py Thu Mar 01 18:15:20 2012 +0000
+++ b/src/modules/client/transport/transport.py Fri Mar 09 12:23:53 2012 +1300
@@ -587,9 +587,9 @@
# of origins for a publisher without incurring the significant
# overhead of performing file-based search unless the network-
# based resource is unavailable.
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- prefer_remote=True, alt_repo=alt_repo, operation="search",
- versions=[0, 1]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, prefer_remote=True, alt_repo=alt_repo,
+ operation="search", versions=[0, 1]):
try:
fobj = d.do_search(data, header,
@@ -686,8 +686,8 @@
# prior to this operation.
self._captive_portal_test(ccancel=ccancel, alt_repo=alt_repo)
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- alt_repo=alt_repo):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, alt_repo=alt_repo):
repostats = self.stats[d.get_url()]
@@ -844,13 +844,15 @@
# os.statvfs is not available on Windows
pass
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- operation="catalog", versions=[1], ccancel=ccancel,
- alt_repo=alt_repo):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, operation="catalog", versions=[1],
+ ccancel=ccancel, alt_repo=alt_repo):
failedreqs = []
repostats = self.stats[d.get_url()]
gave_up = False
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
# This returns a list of transient errors
# that occurred during the transport operation.
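
The new guard above is the behavioral core of this patch: once a repository's stats have recorded content (i.e. corruption) errors and we are on the second or later pass through the origins, the request headers are rebuilt before refetching. build_refetch_header itself is not shown in this diff; a minimal sketch of what such a helper could look like for an HTTP repo, on the assumption that cache-busting headers are the mechanism:

# Sketch only; assumes the refetch mechanism is HTTP cache
# avoidance.  The real method lives on the transport Repo objects.
def build_refetch_header(header):
        """Return a copy of 'header' that instructs intermediate
        HTTP caches not to satisfy the request from a cached,
        possibly corrupt, copy."""
        hdr = dict(header or {})
        hdr["Cache-Control"] = "no-cache"
        hdr["Pragma"] = "no-cache"
        return hdr

For file:// repositories, a helper like this would presumably return the header unchanged, since no cache sits between the client and the content.
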
@@ -912,7 +914,7 @@
try:
self._verify_catalog(s, download_dir)
except tx.InvalidContentException, e:
- repostats.record_error(content=True)
+ repostats.record_error(content=True)
failedreqs.append(e.request)
failures.append(e)
if not flist:
@@ -955,8 +957,9 @@
if isinstance(pub, publisher.Publisher):
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- operation="publisher", versions=[0], ccancel=ccancel):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, operation="publisher", versions=[0],
+ ccancel=ccancel):
try:
resp = d.get_publisherinfo(header,
ccancel=ccancel)
@@ -1003,7 +1006,7 @@
assert isinstance(self.cfg, ImageTransportCfg)
assert isinstance(repo_uri, publisher.RepositoryURI)
- for d, v in self.__gen_repo(repo_uri, retry_count,
+ for d, retries, v in self.__gen_repo(repo_uri, retry_count,
origin_only=True, operation="syspub", versions=[0],
ccancel=ccancel):
try:
@@ -1061,11 +1064,13 @@
elif fmri:
alt_repo = self.cfg.get_pkg_alt_repo(fmri)
- for d, v in self.__gen_repo(pub, retry_count, operation="file",
- versions=[0, 1], alt_repo=alt_repo):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ operation="file", versions=[0, 1], alt_repo=alt_repo):
url = d.get_url()
-
+ repostats = self.stats[url]
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
try:
resp = d.get_datastream(fhash, v, header,
ccancel=ccancel, pub=pub)
@@ -1077,7 +1082,6 @@
reason="hash failure: expected: %s"
"computed: %s" % (fhash, hash_val),
url=url)
- repostats = self.stats[url]
repostats.record_error(content=True)
raise exc
@@ -1096,7 +1100,6 @@
exc = tx.TransferContentException(url,
"zlib.error:%s" %
(" ".join([str(a) for a in e.args])))
- repostats = self.stats[url]
repostats.record_error(content=True)
if exc.retryable:
failures.append(exc)
@@ -1122,9 +1125,14 @@
if isinstance(pub, publisher.Publisher):
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- operation="status", versions=[0], ccancel=ccancel):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, operation="status", versions=[0],
+ ccancel=ccancel):
try:
+ url = d.get_url()
+ repostats = self.stats[url]
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
resp = d.get_status(header, ccancel=ccancel)
infostr = resp.read()
@@ -1137,10 +1145,9 @@
failures.extend(e.failures)
except (TypeError, ValueError), e:
- url = d.get_url()
+
exc = tx.TransferContentException(url,
"Invalid stats response: %s" % e)
- repostats = self.stats[url]
repostats.record_error(content=True)
if exc.retryable:
failures.append(exc)
@@ -1174,8 +1181,8 @@
if not alt_repo:
alt_repo = self.cfg.get_pkg_alt_repo(fmri)
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- alt_repo=alt_repo):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, alt_repo=alt_repo):
# If a transport exception occurs,
# save it if it's retryable, otherwise
@@ -1234,11 +1241,13 @@
if not alt_repo:
alt_repo = self.cfg.get_pkg_alt_repo(fmri)
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- alt_repo=alt_repo):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, alt_repo=alt_repo):
repostats = self.stats[d.get_url()]
verified = False
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
try:
resp = d.get_manifest(fmri, header,
ccancel=ccancel, pub=pub)
@@ -1264,6 +1273,14 @@
failures.extend(ex.failures)
mcontent = None
+ except tx.InvalidContentException, e:
+ # We might be able to retrieve uncorrupted
+ # content on a retry. If this was the last
+ # retry, then we're out of luck.
+ failures.append(e)
+ mcontent = None
+ repostats.record_error(content=True)
+
except tx.TransportException, e:
if e.retryable:
failures.append(e)
@@ -1397,13 +1414,20 @@
# download_dir is temporary download path.
download_dir = self.cfg.incoming_root
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- alt_repo=mxfr.get_alt_repo()):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, alt_repo=mxfr.get_alt_repo()):
failedreqs = []
repostats = self.stats[d.get_url()]
gave_up = False
+ # Possibly overkill: if any content errors were
+ # seen, we modify the headers of all requests,
+ # not just the ones that failed before.
+ if repostats.content_errors and retries > 1:
+ mfstlist = [(fmri, d.build_refetch_header(h))
+ for fmri, h in mfstlist]
+
# This returns a list of transient errors
# that occurred during the transport operation.
# An exception handler here isn't necessary
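
Unlike the single-request paths above, each manifest request in mfstlist carries its own header, so the refetch rebuild must map over the whole list rather than patch a single header variable. A stand-alone illustration of that transformation, reusing the assumed cache-busting helper sketched earlier (the FMRIs and header contents here are invented):

mfstlist = [
        ("pkg://example/foo@1.0", {"X-Example-UUID": "abc"}),
        ("pkg://example/bar@1.0", {"X-Example-UUID": "abc"}),
]
# Rebuild every request's header, exactly as the hunk above does.
mfstlist = [(fmri, build_refetch_header(h)) for fmri, h in mfstlist]
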
@@ -1641,11 +1665,15 @@
else:
cache = None
- for d, v in self.__gen_repo(pub, retry_count, operation="file",
- versions=[0, 1], alt_repo=mfile.get_alt_repo()):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ operation="file", versions=[0, 1],
+ alt_repo=mfile.get_alt_repo()):
failedreqs = []
repostats = self.stats[d.get_url()]
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
+
gave_up = False
# This returns a list of transient errors
@@ -1705,7 +1733,7 @@
dl_path)
except tx.InvalidContentException, e:
mfile.subtract_progress(e.size)
- e.request = s
+ e.request = s
repostats.record_error(content=True)
failedreqs.append(s)
failures.append(e)
@@ -1813,8 +1841,13 @@
# prior to this operation.
self._captive_portal_test(ccancel=ccancel, alt_repo=alt_repo)
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- alt_repo=alt_repo):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, alt_repo=alt_repo):
+
+ repostats = self.stats[d.get_url()]
+ if repostats.content_errors and retries > 1:
+ header = d.build_refetch_header(header)
+
# If a transport exception occurs,
# save it if it's retryable, otherwise
# raise the error to a higher-level handler.
@@ -1831,16 +1864,20 @@
for f in ex.failures:
f.url = d.get_url()
failures.append(f)
+
+ except tx.InvalidContentException, e:
+ repostats.record_error(content=True)
+ failures.append(
+ apx.InvalidDepotResponseException(
+ d.get_url(), "Unable to parse "
+ "repository's versions/0 response"))
+
except tx.TransportException, e:
e.url = d.get_url()
if e.retryable:
failures.append(e)
else:
raise
- except ValueError:
- raise apx.InvalidDepotResponseException(
- d.get_url(), "Unable to parse repository "
- "response")
raise failures
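
Note the control-flow change in this hunk: a malformed versions/0 body previously raised InvalidDepotResponseException on the spot, aborting the operation even when another origin might have answered correctly. It is now surfaced as a content error, recorded against the repo's stats, and appended to failures, so the remaining origins and retry passes still run; the depot error only propagates if everything fails. A minimal, self-contained sketch of that accumulate-then-raise shape (all names here are illustrative, not pkg APIs):

class AllOriginsFailed(Exception):
        pass

def first_good_response(origins, probe):
        """Try each origin in turn; collect failures instead of
        raising on the first one, and raise only once every
        origin has been exhausted."""
        failures = []
        for origin in origins:
                try:
                        return probe(origin)
                except ValueError as e:
                        failures.append((origin, e))
        raise AllOriginsFailed(failures)
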
@staticmethod
@@ -1851,10 +1888,13 @@
resp = repo.get_versions(header, ccancel=ccancel)
verlines = resp.readlines()
- return dict(
- s.split(None, 1)
- for s in (l.strip() for l in verlines)
- )
+ try:
+ return dict(
+ s.split(None, 1)
+ for s in (l.strip() for l in verlines)
+ )
+ except ValueError, e:
+ raise tx.InvalidContentException(e)
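
For context, a versions/0 response body is plain text: one operation name per line, followed by the version numbers the server supports for it. The dict(s.split(None, 1) ...) parse therefore fails with ValueError whenever a line contains no whitespace, which is exactly what a truncated or corrupted body tends to look like; wrapping the parse lets the transport treat that condition as retryable corrupt content instead of crashing. Illustration:

# A healthy body parses cleanly ...
good = ["catalog 1", "manifest 0", "versions 0"]
vers = dict(s.split(None, 1) for s in (l.strip() for l in good))
# vers == {"catalog": "1", "manifest": "0", "versions": "0"}

# ... but a line with no whitespace yields a one-element sequence,
# and dict() raises:
# ValueError: dictionary update sequence element #0 has length 1
bad = ["corrupted-line-with-no-space"]
dict(s.split(None, 1) for s in (l.strip() for l in bad))
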
def __fill_repo_vers(self, repo, vers=None, ccancel=None):
"""Download versions information for the transport
@@ -1867,10 +1907,11 @@
if not vers:
try:
vers = self.__get_version(repo, ccancel=ccancel)
- except ValueError:
+ except tx.InvalidContentException:
raise tx.PkgProtoError(repo.get_url(),
"versions", 0,
- "VaueError while parsing response")
+ "InvalidContentException while parsing "
+ "response")
for key, val in vers.items():
# Don't turn this line into a list of versions.
@@ -1905,7 +1946,9 @@
object. This is used to lookup a transport.Repo object.
The 'count' argument determines how many times the routine
- will iterate through a list of endpoints.
+ will iterate through a list of endpoints. The current
+ iteration count is included in each tuple that the
+ generator yields.
'prefer_remote' is an optional boolean value indicating whether
network-based sources are preferred over local sources. If
@@ -1938,9 +1981,11 @@
different Repository object, it should pass one in
'alt_repo.'
- This function returns a Repo object by default. If
- versions and operation are specified, it returns a tuple
- of (Repo, highest supported version)."""
+ By default, this function yields a tuple containing a
+ Repo object and the current iteration count. If versions
+ and operation are specified, it yields a tuple of
+ (Repo, iteration, highest supported version).
+ """
if not self.__engine:
self.__setup()
@@ -1985,7 +2030,9 @@
versions = sorted(versions, reverse=True)
fail = None
+ iteration = 0
for i in xrange(count):
+ iteration += 1
rslist = self.stats.get_repostats(repolist, origins)
if prefer_remote:
rslist.sort(cmp=remote_first)
@@ -2023,11 +2070,11 @@
versions)
if verid >= 0:
repo_found = True
- yield repo, verid
+ yield repo, iteration, verid
else:
repo_found = True
yield self.__repo_cache.new_repo(rs,
- ruri)
+ ruri), iteration
if not repo_found and fail:
raise fail
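
With these changes __gen_repo has two yield shapes, and every caller was updated accordingly: (repo, iteration) in the plain case, and (repo, iteration, version) when an operation and version list are supplied. A toy stand-in showing how the two shapes are consumed and how the iteration count drives the retries > 1 guard:

def gen_repo(endpoints, count, versions=None):
        """Toy stand-in for __gen_repo: yields (repo, iteration),
        or (repo, iteration, version) when 'versions' is given."""
        for iteration in range(1, count + 1):
                for repo in endpoints:
                        if versions is not None:
                                yield repo, iteration, max(versions)
                        else:
                                yield repo, iteration

for d, retries in gen_repo(["origin-a", "origin-b"], 2):
        # retries is 1 on the first pass over the origins and 2 on
        # the second; the refetch-header guard fires only when
        # retries > 1 and content errors were recorded.
        pass

for d, retries, v in gen_repo(["origin-a"], 2, versions=[0, 1]):
        # v plays the role of the highest mutually supported
        # operation version.
        pass
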
@@ -2349,8 +2396,9 @@
if progtrack and ccancel:
progtrack.check_cancelation = ccancel
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="add", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="add",
+ versions=[0]):
try:
d.publish_add(action, header=header,
progtrack=progtrack, trans_id=trans_id)
@@ -2382,8 +2430,9 @@
if not self.__engine:
self.__setup()
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="file", versions=[1]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="file",
+ versions=[1]):
try:
d.publish_add_file(pth, header=header,
trans_id=trans_id)
@@ -2411,8 +2460,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="abandon", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True,
+ operation="abandon", versions=[0]):
try:
state, fmri = d.publish_abandon(header=header,
trans_id=trans_id)
@@ -2444,8 +2494,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="close", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="close",
+ versions=[0]):
try:
state, fmri = d.publish_close(header=header,
trans_id=trans_id,
@@ -2475,8 +2526,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="open", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="open",
+ versions=[0]):
try:
trans_id = d.publish_open(header=header,
client_release=client_release,
@@ -2503,8 +2555,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="admin", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="admin",
+ versions=[0]):
try:
d.publish_rebuild(header=header, pub=pub)
return
@@ -2536,8 +2589,9 @@
if not self.__engine:
self.__setup()
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="append", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True,
+ operation="append", versions=[0]):
try:
trans_id = d.publish_append(header=header,
client_release=client_release,
@@ -2565,8 +2619,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="admin", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="admin",
+ versions=[0]):
try:
d.publish_rebuild_indexes(header=header,
pub=pub)
@@ -2593,8 +2648,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="admin", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="admin",
+ versions=[0]):
try:
d.publish_rebuild_packages(header=header,
pub=pub)
@@ -2621,8 +2677,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="admin", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="admin",
+ versions=[0]):
try:
d.publish_refresh(header=header, pub=pub)
return
@@ -2652,8 +2709,8 @@
# purposefully avoided as the underlying repo function
# will automatically determine what operation to use
# for the single origin returned by __gen_repo.
- for d in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True):
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True):
try:
d.publish_refresh_indexes(header=header,
pub=pub)
@@ -2680,8 +2737,9 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
- for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
- single_repository=True, operation="admin", versions=[0]):
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True, operation="admin",
+ versions=[0]):
try:
d.publish_refresh_packages(header=header,
pub=pub)