Source code for planetaryimage.pds3image

# -*- coding: utf-8 -*-
import numpy
import six
import os
import pvl
import collections

from .image import PlanetaryImage
from .decoders import BandSequentialDecoder


class Pointer(collections.namedtuple('Pointer', ['filename', 'bytes'])):
    @staticmethod
    def _parse_bytes(value, record_bytes):
        if isinstance(value, six.integer_types):
            return (value - 1) * record_bytes

        if isinstance(value, pvl.Units) and value.units == 'BYTES':
            return value.value

        raise ValueError('Unsupported pointer type')

    @classmethod
    def parse(cls, value, record_bytes):
        """Parses the pointer label.

        Parameters
        ----------
        pointer_data
            Supported values for `pointer_data` are::

                ^PTR = nnn
                ^PTR = nnn <BYTES>
                ^PTR = "filename"
                ^PTR = ("filename")
                ^PTR = ("filename", nnn)
                ^PTR = ("filename", nnn <BYTES>)

        record_bytes
            Record multiplier value

        Returns
        -------
        Pointer object
        """
        if isinstance(value, six.string_types):
            return cls(value, 0)

        if isinstance(value, list):
            if len(value) == 1:
                return cls(value[0], 0)

            if len(value) == 2:
                return cls(value[0], cls._parse_bytes(value[1], record_bytes))

            raise ValueError('Unsupported pointer type')

        return cls(None, cls._parse_bytes(value, record_bytes))


[docs]class PDS3Image(PlanetaryImage): """A PDS3 image reader. Examples -------- >>> from planetaryimage import PDS3Image >>> testfile = 'tests/mission_data/2p129641989eth0361p2600r8m1.img' >>> image = PDS3Image.open(testfile) >>> # Examples of PDS3Image Attributes >>> image.dtype dtype('>i2') >>> image.record_bytes 128 >>> image.data_filename """ SAMPLE_TYPES = { 'MSB_INTEGER': '>i', 'INTEGER': '>i', 'MAC_INTEGER': '>i', 'SUN_INTEGER': '>i', 'MSB_UNSIGNED_INTEGER': '>u', 'UNSIGNED_INTEGER': '>u', 'MAC_UNSIGNED_INTEGER': '>u', 'SUN_UNSIGNED_INTEGER': '>u', 'LSB_INTEGER': '<i', 'PC_INTEGER': '<i', 'VAX_INTEGER': '<i', 'LSB_UNSIGNED_INTEGER': '<u', 'PC_UNSIGNED_INTEGER': '<u', 'VAX_UNSIGNED_INTEGER': '<u', 'IEEE_REAL': '>f', 'FLOAT': '>f', 'REAL': '>f', 'MAC_REAL': '>f', 'SUN_REAL': '>f', 'IEEE_COMPLEX': '>c', 'COMPLEX': '>c', 'MAC_COMPLEX': '>c', 'SUN_COMPLEX': '>c', 'PC_REAL': '<f', 'PC_COMPLEX': '<c', 'MSB_BIT_STRING': '>S', 'LSB_BIT_STRING': '<S', 'VAX_BIT_STRING': '<S', } DTYPES = { '>i': 'MSB_INTEGER', '>u': 'MSB_UNSIGNED_INTEGER', '<i': 'LSB_INTEGER', '<u': 'LSB_UNSIGNED_INTEGER', '>f': 'IEEE_REAL', '>c': 'IEEE_COMPLEX', '<f': 'PC_REAL', '<c': 'PC_COMPLEX', '>S': 'MSB_BIT_STRING', '<S': 'LSB_BIT_STRING', } def _save(self, file_to_write, overwrite): """Save PDS3Image object as PDS3 file. Parameters ---------- filename: Set filename for the pds image to be saved. Overwrite: Use this keyword to save image with same filename. Usage: image.save('temp.IMG', overwrite=True) """ if overwrite: file_to_write = self.filename elif os.path.isfile(file_to_write): msg = 'File ' + file_to_write + ' already exists !\n' + \ 'Call save() with "overwrite = True" to overwrite the file.' raise IOError(msg) encoder = pvl.encoder.PDSLabelEncoder serial_label = pvl.dumps(self.label, cls=encoder) label_sz = len(serial_label) image_pointer = int(label_sz / self.label['RECORD_BYTES']) + 1 self.label['^IMAGE'] = image_pointer + 1 if self._sample_bytes != self.label['IMAGE']['SAMPLE_BITS'] * 8: self.label['IMAGE']['SAMPLE_BITS'] = self.data.itemsize * 8 sample_type_to_save = self.DTYPES[self._sample_type[0] + self.dtype.kind] self.label['IMAGE']['SAMPLE_TYPE'] = sample_type_to_save if len(self.data.shape) == 3: self.label['IMAGE']['BANDS'] = self.data.shape[0] self.label['IMAGE']['LINES'] = self.data.shape[1] self.label['IMAGE']['LINE_SAMPLES'] = self.data.shape[2] else: self.label['IMAGE']['BANDS'] = 1 self.label['IMAGE']['LINES'] = self.data.shape[0] self.label['IMAGE']['LINE_SAMPLES'] = self.data.shape[1] diff = 0 if len(pvl.dumps(self.label, cls=encoder)) != label_sz: diff = abs(label_sz - len(pvl.dumps(self.label, cls=encoder))) pvl.dump(self.label, file_to_write, cls=encoder) offset = image_pointer * self.label['RECORD_BYTES'] - label_sz stream = open(file_to_write, 'a') for i in range(0, offset+diff): stream.write(" ") if (self._bands > 1 and self._format != 'BAND_SEQUENTIAL'): raise NotImplementedError else: self.data.tofile(stream, format='%' + self.dtype.kind) stream.close() def _create_label(self, array): """Create sample PDS3 label for NumPy Array. It is called by 'image.py' to create PDS3Image object from Numpy Array. Returns ------- PVLModule label for the given NumPy array. Usage: self.label = _create_label(array) """ if len(array.shape) == 3: bands = array.shape[0] lines = array.shape[1] line_samples = array.shape[2] else: bands = 1 lines = array.shape[0] line_samples = array.shape[1] record_bytes = line_samples * array.itemsize label_module = pvl.PVLModule([ ('PDS_VERSION_ID', 'PDS3'), ('RECORD_TYPE', 'FIXED_LENGTH'), ('RECORD_BYTES', record_bytes), ('LABEL_RECORDS', 1), ('^IMAGE', 1), ('IMAGE', {'BANDS': bands, 'LINES': lines, 'LINE_SAMPLES': line_samples, 'MAXIMUM': 0, 'MEAN': 0, 'MEDIAN': 0, 'MINIMUM': 0, 'SAMPLE_BITS': array.itemsize * 8, 'SAMPLE_TYPE': 'MSB_INTEGER', 'STANDARD_DEVIATION': 0}) ]) return self._update_label(label_module, array) def _update_label(self, label, array): """Update PDS3 label for NumPy Array. It is called by '_create_label' to update label values such as, - ^IMAGE, RECORD_BYTES - STANDARD_DEVIATION - MAXIMUM, MINIMUM - MEDIAN, MEAN Returns ------- Update label module for the NumPy array. Usage: self.label = self._update_label(label, array) """ maximum = float(numpy.max(array)) mean = float(numpy.mean(array)) median = float(numpy.median(array)) minimum = float(numpy.min(array)) stdev = float(numpy.std(array, ddof=1)) encoder = pvl.encoder.PDSLabelEncoder serial_label = pvl.dumps(label, cls=encoder) label_sz = len(serial_label) image_pointer = int(label_sz / label['RECORD_BYTES']) + 1 label['^IMAGE'] = image_pointer + 1 label['LABEL_RECORDS'] = image_pointer label['IMAGE']['MEAN'] = mean label['IMAGE']['MAXIMUM'] = maximum label['IMAGE']['MEDIAN'] = median label['IMAGE']['MINIMUM'] = minimum label['IMAGE']['STANDARD_DEVIATION'] = stdev return label @property def _bands(self): try: if len(self.data.shape) == 3: return self.data.shape[0] else: return 1 except AttributeError: return self.label['IMAGE'].get('BANDS', 1) @property def _lines(self): try: if len(self.data.shape) == 3: return self.data.shape[1] else: return self.data.shape[0] except AttributeError: return self.label['IMAGE']['LINES'] @property def _samples(self): try: if len(self.data.shape) == 3: return self.data.shape[2] else: return self.data.shape[1] except AttributeError: return self.label['IMAGE']['LINE_SAMPLES'] @property def _format(self): return self.label['IMAGE'].get('BAND_STORAGE_TYPE', 'BAND_SEQUENTIAL') @property def _start_byte(self): return self._image_pointer.bytes @property def _data_filename(self): return self._image_pointer.filename @property def _dtype(self): return self._pixel_type.newbyteorder(self._byte_order) @property def record_bytes(self): """Number of bytes for fixed length records.""" return self.label.get('RECORD_BYTES', 0) @property def _image_pointer(self): return Pointer.parse(self.label['^IMAGE'], self.record_bytes) @property def _sample_type(self): sample_type = self.label['IMAGE']['SAMPLE_TYPE'] try: return self.SAMPLE_TYPES[sample_type] except KeyError: raise ValueError('Unsupported sample type: %r' % sample_type) @property def _sample_bytes(self): # get bytes to match NumPy dtype expressions try: return self.data.itemsize except AttributeError: return int(self.label['IMAGE']['SAMPLE_BITS'] / 8) # FIXME: This dtype overrides the Image.dtype right? Then whats the point # of _dtype above here ^^, should we just rename this one _dtype and remove # the other one? @property def dtype(self): """Pixel data type.""" try: return self.data.dtype except AttributeError: return numpy.dtype('%s%d' % (self._sample_type, self._sample_bytes)) @property def _decoder(self): if self.format == 'BAND_SEQUENTIAL': return BandSequentialDecoder( self.dtype, self.shape, self.compression ) raise ValueError('Unkown format (%s)' % self.format)