Coverage for appr/models/package_base.py : 31%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
PackageReleaseNotFound, raise_package_not_found)
if mediatype: match = re.match(r"application/vnd\.appr\.package-manifest\.(.+?)\.(.+).json", mediatype) if match: mediatype = match.group(1) return mediatype
return "application/vnd.appr.package.%s.%s.tar+gzip" % (media_type, SCHEMA_VERSION)
return "application/vnd.appr.package-manifest.%s.%s.json" % (get_media_type(media_type), SCHEMA_VERSION)
return hashlib.sha256(json.dumps(manifest, sort_keys=True)).hexdigest()
self.package = package_name self.media_type = get_media_type(media_type) self.namespace, self.name = package_name.split("/") self.release = release self._data = None self.created_at = None self.packager = None self._blob = None self._blob_size = 0 self._digest = None self._blob = None self.metadata = metadata self.blob = blob
def blob(self):
    """Return the blob object currently attached to this package.

    The value is whatever was last stored in ``_blob``; it is ``None``
    when no blob has been attached.
    """
    attached = self._blob
    return attached
def blob(self, value):
    """Attach ``value`` as this package's blob.

    ``None`` is accepted without any type check; every other value must
    be a ``BlobBase`` instance.

    Raises:
        ValueError: if ``value`` is neither ``None`` nor a ``BlobBase``.
    """
    # Single guard: only non-None values are type-checked.
    if value is not None and not isinstance(value, BlobBase):
        raise ValueError("blob must be a BlobBase instance")
    self._blob = value
""" Returns all available channels for a package """ channels = channel_class.all(self.package) result = [] # yapf:disable for channel in channels: if ((iscurrent and channel.current == self.release) or (not iscurrent and self.release in channel.releases())): result.append(channel.name) # yapf:enable return result
def digest(self):
    """Return the content digest, reading it from the blob when not cached.

    A truthy ``_digest`` is returned as is. Otherwise, if a blob is
    attached, its ``digest`` attribute is copied into ``_digest`` first.
    With no cached value and no blob the (falsy) cached value is returned.
    """
    cached = self._digest
    if cached:
        return cached
    if self.blob:
        self._digest = self.blob.digest
    return self._digest
def blob_size(self):
    """Return the blob size, reading it from the blob when not cached.

    A truthy ``_blob_size`` is returned as is. Otherwise, if a blob is
    attached, its ``size`` attribute is copied into ``_blob_size`` first.
    Note that a cached size of ``0`` counts as "not cached".
    """
    known = self._blob_size
    if known:
        return known
    if self.blob:
        self._blob_size = self.blob.size
    return self._blob_size
def content_media_type(self):
    """Return the content media-type string for this package.

    Delegates to the module-level ``content_media_type`` helper, passing
    this instance's ``media_type``.
    """
    short_type = self.media_type
    return content_media_type(short_type)
def manifest_media_type(self):
    """Return the manifest media-type string for this package.

    Delegates to the module-level ``manifest_media_type`` helper, passing
    this instance's ``media_type``.
    """
    short_type = self.media_type
    return manifest_media_type(short_type)
return { "mediaType": self.content_media_type, "size": self.blob_size, "digest": self.digest, "urls": []}
res = [] for mtype in cls.manifests(package_name, release): if media_type is not None and media_type != mtype: continue package = cls.get(package_name, release, mtype) if manifest_only: res.append(package.manifest()) else: res.append(package.data) return res
manifest = {"mediaType": self.manifest_media_type, "content": self.content_descriptor()} return manifest
return [ item for release in cls.all_releases(package, media_type=media_type) for item in cls.view_manifests(package, release, False, media_type=media_type)]
def data(self):
    """Return the package record as a dict, refreshing it on every access.

    On first access (``_data`` is ``None``) the record is created with a
    ``created_at`` stamp taken from ``datetime.datetime.utcnow()``. The
    package, release, metadata, manifest media type and content
    descriptor are then written into the cached dict, which is returned
    itself (not a copy).
    """
    if self._data is None:
        # First access: start the record with its creation timestamp.
        self._data = {'created_at': datetime.datetime.utcnow().isoformat()}
    self._data.update({
        "package": self.package,
        "release": self.release,
        "metadata": self.metadata,
        "mediaType": self.manifest_media_type,
        "content": self.content_descriptor()})
    return self._data
def data(self, data):
    """Load this package's state from a stored record.

    Args:
        data: mapping with the keys ``created_at``, ``release``,
            ``content`` (itself holding ``digest`` and ``size``) and
            ``mediaType``; ``metadata`` is optional and defaults to
            ``None``.

    Raises:
        KeyError: if a required key is missing. Attributes assigned
            before the missing key is reached keep their new values.
    """
    self._data = data
    self.created_at = data['created_at']
    self.metadata = data.get('metadata')
    self.release = data['release']
    descriptor = data['content']
    self._digest = descriptor['digest']
    self._blob_size = descriptor['size']
    # The stored value is passed through get_media_type before being kept.
    self.media_type = get_media_type(data['mediaType'])
def check_release(cls, release):
    """Validate that ``release`` is accepted by ``semantic_version.Version``.

    Returns:
        None when the release string is accepted.

    Raises:
        InvalidRelease: if ``semantic_version.Version`` raises
            ``ValueError``; the offending value is attached under the
            ``"version"`` key.
    """
    try:
        semantic_version.Version(release)
    except ValueError as err:
        details = {"version": release}
        raise InvalidRelease(str(err), details)
def _find_media_type(cls, package, release):
    """Return the only media type stored for ``package`` at ``release``.

    Raises:
        InvalidUsage: if ``cls.manifests`` reports zero or several media
            types, since the choice would be ambiguous; the candidates
            are listed in the message.
    """
    candidates = cls.manifests(package, release)
    if len(candidates) == 1:
        return candidates[0]
    raise InvalidUsage("media-type non specified: [%s]" % ','.join(candidates))
def get(cls, package, release, media_type):
    """Build a package object and populate it through ``pull``.

    Args:
        package: string following the "namespace/package_name" format.
        release: release query, used both to construct the instance and
            as the first argument to ``pull``.
        media_type: forwarded unchanged to ``pull``.

    Returns:
        The new ``cls`` instance, after ``pull(release, media_type)`` has
        been called on it.
    """
    instance = cls(package, release)
    instance.pull(release, media_type)
    return instance
releases = cls.all_releases(package) if not releases: raise_package_not_found(package, release=release_query) if release_query is None or release_query == 'default': return last_version(releases, stable) else: try: return select_version(releases, str(release_query), stable) except ValueError as e: raise InvalidRelease(e.message, {"release": release_query})
# Find release if release_query is None: release_query = self.release package = self.package release = self.get_release(package, release_query) if release is None: raise PackageReleaseNotFound("No release match '%s' for package '%s'" % (release_query, package), {"package": package, "release_query": release_query}) # Find media_type if media_type == "-": media_type = self._find_media_type(package, str(release)) media_type = get_media_type(media_type) if media_type is None: media_type = self.media_type
self.data = self._fetch(package, str(release), media_type) return self
self.check_release(self.release) if self.isdeleted_release(self.package, self.release) and not force: raise PackageAlreadyExists("Package release %s existed" % self.package, { "package": self.package, "release": self.release}) self.blob.save(self.content_media_type) self._save(force, **kwargs)
return self.all_releases(self.package)
def delete(cls, package, release, media_type):
    """Delete one stored package manifest.

    Forwards all three arguments, positionally, to the ``_delete`` hook
    and returns ``None``.
    """
    remove = cls._delete
    remove(package, release, media_type)
raise NotImplementedError
raise NotImplementedError
def _fetch(cls, package, release, media_type):
    """Hook for loading one stored package record.

    The base version is a placeholder that always raises; an override is
    expected to return the record that gets assigned to ``data``.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
def _delete(cls, package, release, media_type):
    """Hook for removing one stored package record.

    The base version is a placeholder that always raises; ``delete``
    forwards its arguments here.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
raise NotImplementedError
def search(cls, query, **kwargs):
    """Hook for searching packages matching ``query``.

    The base version is a placeholder that always raises; the meaning of
    the extra keyword arguments is left to the override.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
def isdeleted_release(cls, package, release):
    """Hook reporting on the stored state of ``release`` for ``package``.

    The base version is a placeholder that always raises. The result of
    an override is used as a truth value.
    NOTE(review): the exact meaning of a truthy result is not visible
    here -- confirm against the concrete backends.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
def reindex(cls):
    """Hook for rebuilding the backend's index.

    The base version is a placeholder that always raises.
    NOTE(review): what "index" covers is backend-specific -- confirm
    against the concrete implementations.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
def dump_all(cls, blob_cls):
    """Hook meant to produce a dict with all packages.

    The base version is a placeholder that always raises.
    NOTE(review): how ``blob_cls`` is used is up to the override --
    confirm against the concrete backends.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError
def manifests(cls, package, release):
    """Hook meant to return an array of strings for ``package`` at ``release``.

    The base version is a placeholder that always raises. Callers treat
    the entries as media types.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError