Source code for afmformats.afm_data

import abc
import io
import json
import pathlib
import warnings

import h5py
import numpy as np

from ._version import version
from .meta import MetaData


__all__ = ["AFMData", "column_dtypes", "column_units", "known_columns"]


class AFMData(abc.ABC):
    """General base class for AFM data"""

    def __init__(self, data, metadata, diskcache=False):
        """Initialization

        Parameters
        ----------
        data: dict-like
            Experimental data
        metadata: dict
            Metadata
        diskcache: bool
            TODO
        """
        # convert meta data
        metadata_i = MetaData(metadata)
        # check data keys
        for cc in data:
            if cc not in known_columns:
                raise ValueError("Unknown column name '{}'!".format(cc))
        self._path = pathlib.Path(metadata_i["path"])
        self._metadata = metadata_i
        self._enum = metadata_i["enum"]
        # raw data will not be touched
        self._raw_data = data
        self._data = {}

    def __contains__(self, key):
        return (self._data.__contains__(key)
                or self._raw_data.__contains__(key))

    def __getitem__(self, key):
        if key in self._data:
            data = self._data[key]
        elif key in self._raw_data:
            data = self._raw_data[key].copy()
        elif key == "index":
            return np.arange(len(self))
        else:
            raise KeyError("Column '{}' not defined!".format(key))
        return data

    def __len__(self):
        # If you are here, you might have asked yourself why
        # loading your data takes so long. You have tried
        # lazy-loading and there shouldn't be a reason why
        # it is sooo slow! Except there is, because we need
        # the size of the dataset - and if that is not in the
        # metadata, then we just take the length of the first
        # data column as a workaround (which takes time).
        if "point count" not in self._metadata:
            k0 = list(self._raw_data.keys())[0]
            length = len(self[k0])
        else:
            length = self._metadata["point count"]
        return length

    def __setitem__(self, key, values):
        """Set column data"""
        if len(values) != len(self):
            raise ValueError(
                f"Cannot set data '{key}' of length '{len(values)}' "
                + f"for {self.__class__.__name__} of length '{len(self)}'!")
        # do not touch raw data
        self._data[key] = values

    def __str__(self):
        strre = "{} '{}'[{}]".format(
            self.__class__.__name__, self.path, self.enum)
        return strre

    def __repr__(self):
        repre = "<{} '{}'[{}] at {}>".format(
            self.__class__.__name__, self.path, self.enum, hex(id(self)))
        return repre

    @property
    def columns(self):
        """Available data columns"""
        raw = self._raw_data.keys()
        new = self._data.keys()
        return sorted(set(raw) | set(new))

    @property
    def columns_innate(self):
        """Data columns available only in the original data file"""
        return sorted(self._raw_data.keys())

    @property
    def enum(self):
        """Unique index of `self` in `self.path`

        Indexing starts at "0"
        """
        return self._enum

    @property
    def metadata(self):
        """Copy of the metadata of this dataset"""
        return MetaData(self._metadata.copy())

    @property
    @abc.abstractmethod
    def modality(self):
        """Imaging modality (e.g.
        force-distance)"""

    @property
    def path(self):
        """Path to the measurement file"""
        return self._path

    def _export_hdf5(self, h5group, metadata_dict=None):
        """Export data to the HDF5 file format

        Parameters
        ----------
        h5group: h5py.Group or h5py.File
            Destination group
        metadata_dict: dict
            Key-value pairs for the metadata that should be
            exported (will be stored in the group attributes)
        """
        # set the software and its version
        if metadata_dict is None:
            metadata_dict = {}
        h5group.attrs["software"] = "afmformats"
        h5group.attrs["software version"] = version
        enum_key = str(self.enum)
        if enum_key in h5group:
            # random fill-mode (get the next free enum key)
            ii = 0
            while True:
                enum_key = str(ii)
                if enum_key not in h5group:
                    break
                ii += 1
            metadata_dict["enum"] = int(enum_key)
        subgroup = h5group.create_group(enum_key)
        for col in self.columns:
            if col == "segment":
                ds = subgroup.create_dataset(
                    name=col,
                    data=np.asarray(self[col], dtype=np.uint8),
                    compression="gzip",
                    fletcher32=True)
            elif col == "index":
                # do not store index column
                continue
            else:
                ds = subgroup.create_dataset(
                    name=col,
                    data=self[col],
                    compression="gzip",
                    fletcher32=True)
            ds.attrs["unit"] = column_units[col]
        for kk in metadata_dict:
            if kk == "path":
                subgroup.attrs["path"] = str(metadata_dict["path"])
            else:
                subgroup.attrs[kk] = metadata_dict[kk]

    def _export_tab(self, fd, metadata_dict=None):
        """Export data to a tab separated values file

        Parameters
        ----------
        fd: io.IOBase
            File opened in "w" mode
        metadata_dict: dict
            Key-value pairs for the metadata that should be
            exported (will be written as commented JSON in the
            file header)
        """
        if metadata_dict is None:
            metadata_dict = {}
        fd.write("# afmformats {}\r\n".format(version))
        fd.write("#\r\n")
        if metadata_dict:
            # write metadata
            dump = json.dumps(metadata_dict, sort_keys=True, indent=2,
                              default=json_path_serializer)
            fd.write("# BEGIN METADATA\r\n")
            for dl in dump.split("\n"):
                fd.write("# " + dl + "\r\n")
            fd.write("# END METADATA\r\n")
            fd.write("#\r\n")
        # get all data (faster than doing it every time for every row)
        data = {}
        for cc in self.columns:
            data[cc] = self[cc]
        # header
        fd.write("# " + "\t".join(self.columns) + "\r\n")
        # rows
        for ii in range(len(self)):
            items = []
            for cc in self.columns:
                items.append("{:.8g}".format(data[cc][ii]))
            fd.write("\t".join(items) + "\r\n")

    def export(self, *args, **kwargs):
        warnings.warn("Please use `export_data` for data export!",
                      DeprecationWarning)
        return self.export_data(*args, **kwargs)
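
    # Illustrative sketch (not part of the original module): the file
    # layout that `_export_tab` above produces, assuming two columns and
    # a metadata dict with the keys "enum" and "path". All values are
    # made up; <TAB> stands for a literal tab character and <version>
    # for the afmformats version string:
    #
    #     # afmformats <version>
    #     #
    #     # BEGIN METADATA
    #     # {
    #     #   "enum": 0,
    #     #   "path": "example.dat"
    #     # }
    #     # END METADATA
    #     #
    #     # force<TAB>height (measured)
    #     1.5e-09<TAB>4.2e-06
    #     1.4e-09<TAB>4.1e-06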

    def export_data(self, out, metadata=True, fmt="tab"):
        """Export all data columns to a file

        Parameters
        ----------
        out: str, pathlib.Path, writable io.IOBase, or h5py.Group
            Output path, open file, or h5py object
        metadata: bool or list
            If True, all available metadata are stored. If False,
            no metadata are stored. If a list, only the given
            metadata keys are stored.
        fmt: str
            "tab" for the tab separated values format and
            "hdf5" / "h5" for the HDF5 file format

        Notes
        -----
        - If you wish to append HDF5 data to an existing file, please
          open the file first and call this function with the
          h5py.File object, i.e.

          .. code:: python

              with h5py.File(path, "a") as h5:
                  fdist.export_data(out=h5, fmt="hdf5")

          Otherwise the file will be overwritten.
        - The column "index" is not exported in the HDF5 file format
        """
        if isinstance(metadata, (list, tuple)):
            # list of keys
            metadata_dict = {}
            for key in metadata:
                metadata_dict[key] = self.metadata[key]
        elif isinstance(metadata, bool):
            # all metadata (True) or none (False)
            metadata_dict = self.metadata if metadata else {}
        else:
            raise ValueError("Metadata must be list, tuple, or bool, got "
                             f"'{metadata}' of type '{type(metadata)}'!")
        if fmt == "tab":
            if isinstance(out, (pathlib.Path, str)):
                fd = pathlib.Path(out).open("w")
                close = True
            elif isinstance(out, io.IOBase):
                fd = out
                close = False
            else:
                raise ValueError("Unexpected object class for 'out': "
                                 + "'{}' for format 'tab'!".format(
                                     out.__class__))
            self._export_tab(fd, metadata_dict=metadata_dict)
            if close:
                fd.close()
        elif fmt in ["hdf5", "h5"]:
            if isinstance(out, (pathlib.Path, str)):
                # always overwrites
                h5 = h5py.File(out, "w")
                close = True
            elif isinstance(out, h5py.Group):
                h5 = out
                close = False
            else:
                raise ValueError("Unexpected object class for 'out': "
                                 + "'{}' for format 'hdf5'!".format(
                                     out.__class__))
            self._export_hdf5(h5group=h5, metadata_dict=metadata_dict)
            if close:
                h5.close()
        else:
            raise ValueError("Unexpected string for 'fmt': {}".format(fmt))

    def reset_data(self):
        """Reset all data to the state they were in after loading

        Internally, only `self._data` is `clear`ed, which means that
        all calls to `__getitem__` fall back to `self._raw_data`.
        """
        self._data.clear()
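

# Illustrative sketch (hypothetical, not part of afmformats): how setting
# a column via `__setitem__` and calling `reset_data` interact. The
# subclass name `MinimalData`, the function name, and all values are made
# up; this assumes that `MetaData` accepts the minimal key set "path",
# "enum", and "point count".
def _example_reset_data():
    class MinimalData(AFMData):
        @property
        def modality(self):
            return "force-distance"

    data = {"force": np.linspace(0, 1e-9, 100),
            "height (measured)": np.linspace(0, 5e-6, 100)}
    metadata = {"path": "example.dat", "enum": 0, "point count": 100}
    afd = MinimalData(data, metadata)
    # derived column; must have the same length as the dataset
    afd["tip position"] = afd["height (measured)"] - 1e-6
    assert "tip position" in afd.columns
    assert "tip position" not in afd.columns_innate
    # drop everything that is not raw data again
    afd.reset_data()
    assert "tip position" not in afd.columns
    return afd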


def json_path_serializer(obj):
    """Used to convert pathlib.Path to str in metadata"""
    if isinstance(obj, pathlib.Path):
        return str(obj)
    else:
        raise TypeError(f"Object of type {obj.__class__} "
                        + "is not JSON serializable")


#: Data types of all known columns (all other columns are assumed
#: to be float)
column_dtypes = {
    "force": float,
    "height (measured)": float,
    "height (piezo)": float,
    "index": int,
    "segment": np.uint8,
    "time": float,
    "tip position": float,
}

#: Units of all known columns
column_units = {
    "force": "N",
    "height (measured)": "m",
    "height (piezo)": "m",
    "index": "",
    "segment": "",
    "time": "s",
    "tip position": "m",
}

#: Known data columns
known_columns = sorted(column_dtypes.keys())
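

# Illustrative sketch (hypothetical, not part of afmformats): exporting an
# `AFMData` instance to both supported formats. `afd` is any AFMData
# instance, e.g. the return value of `_example_reset_data` above; the
# output paths are made up.
def _example_export(afd):
    # tab-separated values with all available metadata
    afd.export_data("out.tsv", fmt="tab")
    # HDF5; passing a path always creates/overwrites the file
    afd.export_data("out.h5", fmt="h5")
    # to append to an existing file instead, open it first and pass the
    # h5py.File object (see the `export_data` docstring); the next free
    # enum key is chosen automatically on collisions
    with h5py.File("out.h5", "a") as h5:
        afd.export_data(out=h5, fmt="hdf5")
    # store only a subset of the metadata
    afd.export_data("out2.h5", metadata=["enum", "path"], fmt="h5")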