Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/source/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Changelog
=========

2.5.1 (02-14-2018)
------------------

Quick fix release to repair chunking in the coordinates package.

**Fixes**:

- coordinates: fixed handling of default chunksize. #1247


2.5 (02-09-2018)
----------------

Expand Down
32 changes: 22 additions & 10 deletions pyemma/coordinates/data/_base/iterable.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,24 @@ def ndim(self):
def default_chunksize(self):
    """ How much data will be processed at once, in case no chunksize has been provided.

    Lazily computed on first access and cached in ``self._default_chunksize``.
    If the output dimension cannot be determined yet, a fixed fallback of 1000
    frames is used; otherwise the chunksize is derived from the configured
    memory budget (``config.default_chunksize``, a human-readable size string).
    """
    if self._default_chunksize is None:
        try:
            # probe: some overloads of dimension() can raise, e.g. PCA, TICA
            # before estimation; the value itself is not needed here.
            self.dimension()
        except Exception:
            # cannot size chunks by output dimension -> fixed fallback
            self.logger.info('could not obtain output dimension, defaulting to chunksize=1000')
            self._default_chunksize = 1000
        else:
            # obtain a human readable memory size from the config, convert it to bytes
            # and calculate the maximum chunksize fitting into that budget.
            from pyemma import config
            from pyemma.util.units import string_to_bytes
            max_bytes = string_to_bytes(config.default_chunksize)
            itemsize = np.dtype(self.output_type()).itemsize
            # TODO: consider rounding this to some cache size of CPU? e.g py-cpuinfo can obtain it.
            # if one time step is already bigger than max_bytes, we set the chunksize to 1.
            max_elements = max(1, int(np.floor(max_bytes / (itemsize * self.ndim))))
            assert max_elements * self.ndim * itemsize <= max_bytes or max_elements == 1
            self._default_chunksize = max(1, max_elements // self.ndim)
        assert self._default_chunksize > 0, self._default_chunksize
    return self._default_chunksize

@property
Expand All @@ -61,8 +71,10 @@ def chunksize(self):

@chunksize.setter
def chunksize(self, value):
    """Set how much data is processed at once.

    Parameters
    ----------
    value : int or None
        Non-negative number of frames per chunk, or None to fall back to
        the computed default chunksize.

    Raises
    ------
    ValueError
        If *value* is neither None nor an int, or is a negative int.
    """
    if not isinstance(value, (type(None), int)):
        raise ValueError('chunksize has to be of type: None or int')
    if isinstance(value, int) and value < 0:
        raise ValueError("Chunksize of %s was provided, but has to be >= 0" % value)
    self._default_chunksize = value

def iterator(self, stride=1, lag=0, chunk=None, return_trajindex=True, cols=None, skip=0):
Expand Down
10 changes: 6 additions & 4 deletions pyemma/coordinates/data/_base/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,23 @@ def _clear_in_memory(self):
super(StreamingTransformer, self)._clear_in_memory()
self._set_random_access_strategies()

def _create_iterator(self, skip=0, chunk=None, stride=1, return_trajindex=True, cols=None):
    """Create an iterator over this transformer's output.

    chunk=None defers to the data producer's default chunksize instead of
    forcing a single all-at-once chunk (chunk=0).
    """
    return StreamingTransformerIterator(self, skip=skip, chunk=chunk, stride=stride,
                                        return_trajindex=return_trajindex, cols=cols)

@property
def chunksize(self):
    """chunksize defines how much data is being processed at once."""
    if not self.data_producer:
        # no upstream source yet: fall back to the (possibly lazily computed)
        # default chunksize of this object instead of the raw cached field.
        return self.default_chunksize
    return self.data_producer.chunksize

@chunksize.setter
def chunksize(self, size):
    """Set the chunksize on this transformer and its data producer.

    Raises
    ------
    RuntimeError
        If no data producer is attached yet.
    ValueError
        If *size* is negative.
    """
    if self.data_producer is None:
        raise RuntimeError('cant set chunksize')
    if size < 0:
        raise ValueError('chunksize has to be positive.')
    # keep the local cache and the upstream producer in sync.
    self._default_chunksize = size
    self.data_producer.chunksize = size

def number_of_trajectories(self, stride=1):
Expand Down Expand Up @@ -220,7 +222,7 @@ def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=None):

class StreamingTransformerIterator(DataSourceIterator):

def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=False, cols=None):
def __init__(self, data_source, skip=0, chunk=None, stride=1, return_trajindex=False, cols=None):
super(StreamingTransformerIterator, self).__init__(
data_source, return_trajindex=return_trajindex)
self._it = self._data_source.data_producer.iterator(
Expand Down
9 changes: 9 additions & 0 deletions pyemma/coordinates/tests/test_coordinates_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ def test_chunksize(self):
it.chunksize = cs[i]
assert it.chunksize == cs[i]

def test_chunksize_max_memory(self):
    """Chunks yielded by the iterator must respect the configured byte budget."""
    from pyemma.util.contexts import settings
    # 10000 frames x 10 float64 columns; one frame is 80 bytes.
    data = np.random.random((10000, 10))
    # budget in bytes — presumably settings(default_chunksize=...) parses the
    # string as a byte size (cf. string_to_bytes usage); TODO confirm.
    max_size = 1024
    with settings(default_chunksize=str(max_size)):
        r = DataInMemory(data)
        # every yielded chunk must fit within the configured maximum.
        for itraj, x in r.iterator():
            self.assertLessEqual(x.nbytes, max_size)

def test_last_chunk(self):
r = DataInMemory(self.d)
it = r.iterator(chunk=0)
Expand Down