diff --git a/doc/source/CHANGELOG.rst b/doc/source/CHANGELOG.rst
index 23c35cf67..60d135f7e 100644
--- a/doc/source/CHANGELOG.rst
+++ b/doc/source/CHANGELOG.rst
@@ -1,6 +1,16 @@
 Changelog
 =========
 
+2.5.1 (02-14-2018)
+------------------
+
+Quick fix release to repair chunking in the coordinates package.
+
+**Fixes**:
+
+- coordinates: fixed handling of default chunksize. #1247
+
+
 2.5 (02-09-2018)
 ----------------
 
diff --git a/pyemma/coordinates/data/_base/iterable.py b/pyemma/coordinates/data/_base/iterable.py
index 40825cfc3..6bab3a6b8 100644
--- a/pyemma/coordinates/data/_base/iterable.py
+++ b/pyemma/coordinates/data/_base/iterable.py
@@ -45,14 +45,24 @@ def ndim(self):
     def default_chunksize(self):
         """ How much data will be processed at once, in case no chunksize has been provided."""
         if self._default_chunksize is None:
-            from pyemma import config
-            from pyemma.util.units import string_to_bytes
-            max_bytes = string_to_bytes(config.default_chunksize)
-            itemsize = np.dtype(self.output_type()).itemsize
-            # TODO: consider rounding this to some cache size of CPU? e.g py-cpuinfo can obtain it.
-            max_elements = max_bytes // itemsize // self.ndim
-            self._default_chunksize = max_elements
-            assert self._default_chunksize > 0
+            try:
+                # some overloads of dimension can raise, eg. PCA, TICA
+                dim = self.dimension()
+            except Exception:
+                self.logger.info('could not obtain output dimension, defaulting to chunksize=1000')
+                self._default_chunksize = 1000
+            else:
+                # obtain a human readable memory size from the config, convert it to bytes and calc maximum chunksize.
+                from pyemma import config
+                from pyemma.util.units import string_to_bytes
+                max_bytes = string_to_bytes(config.default_chunksize)
+                itemsize = np.dtype(self.output_type()).itemsize
+                # TODO: consider rounding this to some cache size of CPU? e.g py-cpuinfo can obtain it.
+                # if one time step is already bigger than max_memory, we set the chunksize to 1.
+                max_elements = max(1, int(np.floor(max_bytes / (itemsize * self.ndim))))
+                assert max_elements * self.ndim * itemsize <= max_bytes or max_elements == 1
+                self._default_chunksize = max_elements
+                assert self._default_chunksize > 0, self._default_chunksize
         return self._default_chunksize
 
     @property
@@ -61,8 +71,10 @@ def chunksize(self):
 
     @chunksize.setter
     def chunksize(self, value):
-        if self.default_chunksize < 0:
-            raise ValueError("Chunksize of %s was provided, but has to be >= 0" % self.default_chunksize)
+        if not isinstance(value, (type(None), int)):
+            raise ValueError('chunksize has to be of type: None or int')
+        if isinstance(value, int) and value < 0:
+            raise ValueError("Chunksize of %s was provided, but has to be >= 0" % value)
         self._default_chunksize = value
 
     def iterator(self, stride=1, lag=0, chunk=None, return_trajindex=True, cols=None, skip=0):
diff --git a/pyemma/coordinates/data/_base/transformer.py b/pyemma/coordinates/data/_base/transformer.py
index 80ddefa81..889377d3e 100644
--- a/pyemma/coordinates/data/_base/transformer.py
+++ b/pyemma/coordinates/data/_base/transformer.py
@@ -167,7 +167,7 @@ def _clear_in_memory(self):
         super(StreamingTransformer, self)._clear_in_memory()
         self._set_random_access_strategies()
 
-    def _create_iterator(self, skip=0, chunk=0, stride=1, return_trajindex=True, cols=None):
+    def _create_iterator(self, skip=0, chunk=None, stride=1, return_trajindex=True, cols=None):
         return StreamingTransformerIterator(self, skip=skip, chunk=chunk, stride=stride,
                                             return_trajindex=return_trajindex, cols=cols)
 
@@ -175,13 +175,16 @@ def _create_iterator(self, skip=0, chunk=None, stride=1, return_trajindex=True, col
     def chunksize(self):
         """chunksize defines how much data is being processed at once."""
         if not self.data_producer:
-            return self._default_chunksize
+            return self.default_chunksize
         return self.data_producer.chunksize
 
     @chunksize.setter
     def chunksize(self, size):
         if self.data_producer is None:
-            raise RuntimeError('cant set chunksize')
+            if size < 0:
+                raise ValueError('chunksize has to be non-negative.')
+            self._default_chunksize = size
+            return
         self.data_producer.chunksize = size
 
     def number_of_trajectories(self, stride=1):
@@ -220,7 +223,7 @@ def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=None):
 
 class StreamingTransformerIterator(DataSourceIterator):
 
-    def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=False, cols=None):
+    def __init__(self, data_source, skip=0, chunk=None, stride=1, return_trajindex=False, cols=None):
         super(StreamingTransformerIterator, self).__init__(
             data_source, return_trajindex=return_trajindex)
         self._it = self._data_source.data_producer.iterator(
diff --git a/pyemma/coordinates/tests/test_coordinates_iterator.py b/pyemma/coordinates/tests/test_coordinates_iterator.py
index 5354dba53..a88f54242 100644
--- a/pyemma/coordinates/tests/test_coordinates_iterator.py
+++ b/pyemma/coordinates/tests/test_coordinates_iterator.py
@@ -103,6 +103,15 @@ def test_chunksize(self):
             it.chunksize = cs[i]
             assert it.chunksize == cs[i]
 
+    def test_chunksize_max_memory(self):
+        from pyemma.util.contexts import settings
+        data = np.random.random((10000, 10))
+        max_size = 1024
+        with settings(default_chunksize=str(max_size)):
+            r = DataInMemory(data)
+            for itraj, x in r.iterator():
+                self.assertLessEqual(x.nbytes, max_size)
+
     def test_last_chunk(self):
         r = DataInMemory(self.d)
         it = r.iterator(chunk=0)