diff options
| author | Alex Deymo <deymo@google.com> | 2016-06-27 12:14:03 -0700 |
|---|---|---|
| committer | Alex Deymo <deymo@google.com> | 2016-06-27 12:20:26 -0700 |
| commit | ce47ca46a3dd74985ac96e95da22ef7a3b742d25 (patch) | |
| tree | a8ead3ebdbf56da13feeb984991f51cba1f71577 /scripts/update_payload/payload.py | |
| parent | 773f4323a4f6357b7c79debf0f87ffa658fabec4 (diff) | |
| parent | c2538fab9a7fc01c0216520874d711c8a9fbd9d3 (diff) | |
Merge dev-utils update_payload_library and the paycheck.
The update_payload library is a python library for parsing and
handling update payload. This library is used by the payload checker
python script also included here.
The code is merged from this repo:
https://chromium.googlesource.com/chromiumos/platform/dev-util/
Bug: 28797993
TEST='import update_payload'
Diffstat (limited to 'scripts/update_payload/payload.py')
| -rw-r--r-- | scripts/update_payload/payload.py | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/scripts/update_payload/payload.py b/scripts/update_payload/payload.py new file mode 100644 index 00000000..f76c0de6 --- /dev/null +++ b/scripts/update_payload/payload.py @@ -0,0 +1,341 @@ +# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Tools for reading, verifying and applying Chrome OS update payloads.""" + +from __future__ import print_function + +import hashlib +import struct + +import applier +import block_tracer +import checker +import common +from error import PayloadError +import update_metadata_pb2 + + +# +# Helper functions. +# +def _ReadInt(file_obj, size, is_unsigned, hasher=None): + """Reads a binary-encoded integer from a file. + + It will do the correct conversion based on the reported size and whether or + not a signed number is expected. Assumes a network (big-endian) byte + ordering. + + Args: + file_obj: a file object + size: the integer size in bytes (2, 4 or 8) + is_unsigned: whether it is signed or not + hasher: an optional hasher to pass the value through + + Returns: + An "unpacked" (Python) integer value. + + Raises: + PayloadError if an read error occurred. + """ + return struct.unpack(common.IntPackingFmtStr(size, is_unsigned), + common.Read(file_obj, size, hasher=hasher))[0] + + +# +# Update payload. +# +class Payload(object): + """Chrome OS update payload processor.""" + + class _PayloadHeader(object): + """Update payload header struct.""" + + # Header constants; sizes are in bytes. + _MAGIC = 'CrAU' + _VERSION_SIZE = 8 + _MANIFEST_LEN_SIZE = 8 + _METADATA_SIGNATURE_LEN_SIZE = 4 + + def __init__(self): + self.version = None + self.manifest_len = None + self.metadata_signature_len = None + self.size = None + + def ReadFromPayload(self, payload_file, hasher=None): + """Reads the payload header from a file. + + Reads the payload header from the |payload_file| and updates the |hasher| + if one is passed. The parsed header is stored in the _PayloadHeader + instance attributes. + + Args: + payload_file: a file object + hasher: an optional hasher to pass the value through + + Returns: + None. + + Raises: + PayloadError if a read error occurred or the header is invalid. + """ + # Verify magic + magic = common.Read(payload_file, len(self._MAGIC), hasher=hasher) + if magic != self._MAGIC: + raise PayloadError('invalid payload magic: %s' % magic) + + self.version = _ReadInt(payload_file, self._VERSION_SIZE, True, + hasher=hasher) + self.manifest_len = _ReadInt(payload_file, self._MANIFEST_LEN_SIZE, True, + hasher=hasher) + self.size = (len(self._MAGIC) + self._VERSION_SIZE + + self._MANIFEST_LEN_SIZE) + self.metadata_signature_len = 0 + + if self.version == common.BRILLO_MAJOR_PAYLOAD_VERSION: + self.size += self._METADATA_SIGNATURE_LEN_SIZE + self.metadata_signature_len = _ReadInt( + payload_file, self._METADATA_SIGNATURE_LEN_SIZE, True, + hasher=hasher) + + + def __init__(self, payload_file): + """Initialize the payload object. + + Args: + payload_file: update payload file object open for reading + """ + self.payload_file = payload_file + self.manifest_hasher = None + self.is_init = False + self.header = None + self.manifest = None + self.data_offset = None + self.metadata_signature = None + self.metadata_size = None + + def _ReadHeader(self): + """Reads and returns the payload header. + + Returns: + A payload header object. + + Raises: + PayloadError if a read error occurred. + """ + header = self._PayloadHeader() + header.ReadFromPayload(self.payload_file, self.manifest_hasher) + return header + + def _ReadManifest(self): + """Reads and returns the payload manifest. + + Returns: + A string containing the payload manifest in binary form. + + Raises: + PayloadError if a read error occurred. + """ + if not self.header: + raise PayloadError('payload header not present') + + return common.Read(self.payload_file, self.header.manifest_len, + hasher=self.manifest_hasher) + + def _ReadMetadataSignature(self): + """Reads and returns the metadata signatures. + + Returns: + A string containing the metadata signatures protobuf in binary form or + an empty string if no metadata signature found in the payload. + + Raises: + PayloadError if a read error occurred. + """ + if not self.header: + raise PayloadError('payload header not present') + + return common.Read( + self.payload_file, self.header.metadata_signature_len, + offset=self.header.size + self.header.manifest_len) + + def ReadDataBlob(self, offset, length): + """Reads and returns a single data blob from the update payload. + + Args: + offset: offset to the beginning of the blob from the end of the manifest + length: the blob's length + + Returns: + A string containing the raw blob data. + + Raises: + PayloadError if a read error occurred. + """ + return common.Read(self.payload_file, length, + offset=self.data_offset + offset) + + def Init(self): + """Initializes the payload object. + + This is a prerequisite for any other public API call. + + Raises: + PayloadError if object already initialized or fails to initialize + correctly. + """ + if self.is_init: + raise PayloadError('payload object already initialized') + + # Initialize hash context. + # pylint: disable=E1101 + self.manifest_hasher = hashlib.sha256() + + # Read the file header. + self.header = self._ReadHeader() + + # Read the manifest. + manifest_raw = self._ReadManifest() + self.manifest = update_metadata_pb2.DeltaArchiveManifest() + self.manifest.ParseFromString(manifest_raw) + + # Read the metadata signature (if any). + metadata_signature_raw = self._ReadMetadataSignature() + if metadata_signature_raw: + self.metadata_signature = update_metadata_pb2.Signatures() + self.metadata_signature.ParseFromString(metadata_signature_raw) + + self.metadata_size = self.header.size + self.header.manifest_len + self.data_offset = self.metadata_size + self.header.metadata_signature_len + + self.is_init = True + + def Describe(self): + """Emits the payload embedded description data to standard output.""" + def _DescribeImageInfo(description, image_info): + def _DisplayIndentedValue(name, value): + print(' {:<14} {}'.format(name+':', value)) + + print('%s:' % description) + _DisplayIndentedValue('Channel', image_info.channel) + _DisplayIndentedValue('Board', image_info.board) + _DisplayIndentedValue('Version', image_info.version) + _DisplayIndentedValue('Key', image_info.key) + + if image_info.build_channel != image_info.channel: + _DisplayIndentedValue('Build channel', image_info.build_channel) + + if image_info.build_version != image_info.version: + _DisplayIndentedValue('Build version', image_info.build_version) + + if self.manifest.HasField('old_image_info'): + # pylint: disable=E1101 + _DescribeImageInfo('Old Image', self.manifest.old_image_info) + + if self.manifest.HasField('new_image_info'): + # pylint: disable=E1101 + _DescribeImageInfo('New Image', self.manifest.new_image_info) + + def _AssertInit(self): + """Raises an exception if the object was not initialized.""" + if not self.is_init: + raise PayloadError('payload object not initialized') + + def ResetFile(self): + """Resets the offset of the payload file to right past the manifest.""" + self.payload_file.seek(self.data_offset) + + def IsDelta(self): + """Returns True iff the payload appears to be a delta.""" + self._AssertInit() + return (self.manifest.HasField('old_kernel_info') or + self.manifest.HasField('old_rootfs_info') or + any(partition.HasField('old_partition_info') + for partition in self.manifest.partitions)) + + def IsFull(self): + """Returns True iff the payload appears to be a full.""" + return not self.IsDelta() + + def Check(self, pubkey_file_name=None, metadata_sig_file=None, + report_out_file=None, assert_type=None, block_size=0, + rootfs_part_size=0, kernel_part_size=0, allow_unhashed=False, + disabled_tests=()): + """Checks the payload integrity. + + Args: + pubkey_file_name: public key used for signature verification + metadata_sig_file: metadata signature, if verification is desired + report_out_file: file object to dump the report to + assert_type: assert that payload is either 'full' or 'delta' + block_size: expected filesystem / payload block size + rootfs_part_size: the size of (physical) rootfs partitions in bytes + kernel_part_size: the size of (physical) kernel partitions in bytes + allow_unhashed: allow unhashed operation blobs + disabled_tests: list of tests to disable + + Raises: + PayloadError if payload verification failed. + """ + self._AssertInit() + + # Create a short-lived payload checker object and run it. + helper = checker.PayloadChecker( + self, assert_type=assert_type, block_size=block_size, + allow_unhashed=allow_unhashed, disabled_tests=disabled_tests) + helper.Run(pubkey_file_name=pubkey_file_name, + metadata_sig_file=metadata_sig_file, + rootfs_part_size=rootfs_part_size, + kernel_part_size=kernel_part_size, + report_out_file=report_out_file) + + def Apply(self, new_kernel_part, new_rootfs_part, old_kernel_part=None, + old_rootfs_part=None, bsdiff_in_place=True, bspatch_path=None, + truncate_to_expected_size=True): + """Applies the update payload. + + Args: + new_kernel_part: name of dest kernel partition file + new_rootfs_part: name of dest rootfs partition file + old_kernel_part: name of source kernel partition file (optional) + old_rootfs_part: name of source rootfs partition file (optional) + bsdiff_in_place: whether to perform BSDIFF operations in-place (optional) + bspatch_path: path to the bspatch binary (optional) + truncate_to_expected_size: whether to truncate the resulting partitions + to their expected sizes, as specified in the + payload (optional) + + Raises: + PayloadError if payload application failed. + """ + self._AssertInit() + + # Create a short-lived payload applier object and run it. + helper = applier.PayloadApplier( + self, bsdiff_in_place=bsdiff_in_place, bspatch_path=bspatch_path, + truncate_to_expected_size=truncate_to_expected_size) + helper.Run(new_kernel_part, new_rootfs_part, + old_kernel_part=old_kernel_part, + old_rootfs_part=old_rootfs_part) + + def TraceBlock(self, block, skip, trace_out_file, is_kernel): + """Traces the origin(s) of a given dest partition block. + + The tracing tries to find origins transitively, when possible (it currently + only works for move operations, where the mapping of src/dst is + one-to-one). It will dump a list of operations and source blocks + responsible for the data in the given dest block. + + Args: + block: the block number whose origin to trace + skip: the number of first origin mappings to skip + trace_out_file: file object to dump the trace to + is_kernel: trace through kernel (True) or rootfs (False) operations + """ + self._AssertInit() + + # Create a short-lived payload block tracer object and run it. + helper = block_tracer.PayloadBlockTracer(self) + helper.Run(block, skip, trace_out_file, is_kernel) |
