Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 51 additions & 34 deletions aeon/distances/elastic/_bounding_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,44 +63,61 @@ def create_bounding_matrix(
def _itakura_parallelogram(x_size: int, y_size: int, max_slope_percent: float):
"""Itakura parallelogram bounding matrix.

This code was adapted from tslearn. This link to the original code line 974:
https://github.com/tslearn-team/tslearn/blob/main/tslearn/metrics/dtw_variants.py
This code was adapted from pyts. This link to the original code:
https://pyts.readthedocs.io/en/latest/_modules/pyts/metrics/dtw.html#itakura_parallelogram
"""
if x_size != y_size:
raise ValueError(
"""Itakura parallelogram does not support unequal length time series.
Please consider using a full bounding matrix or a sakoe chiba bounding matrix
instead."""
)
one_percent = min(x_size, y_size) / 100
max_slope = math.floor((max_slope_percent * one_percent) * 100)
min_slope = 1 / float(max_slope)
max_slope *= float(x_size) / float(y_size)
min_slope *= float(x_size) / float(y_size)

lower_bound = np.empty((2, y_size))
lower_bound[0] = min_slope * np.arange(y_size)
lower_bound[1] = (
(x_size - 1) - max_slope * (y_size - 1) + max_slope * np.arange(y_size)
)
lower_bound_ = np.empty(y_size)
for i in range(y_size):
lower_bound_[i] = max(round(lower_bound[0, i], 2), round(lower_bound[1, i], 2))
lower_bound_ = np.ceil(lower_bound_)

upper_bound = np.empty((2, y_size))
upper_bound[0] = max_slope * np.arange(y_size)
upper_bound[1] = (
(x_size - 1) - min_slope * (y_size - 1) + min_slope * np.arange(y_size)
)
upper_bound_ = np.empty(y_size)
for i in range(y_size):
upper_bound_[i] = min(round(upper_bound[0, i], 2), round(upper_bound[1, i], 2))
upper_bound_ = np.floor(upper_bound_ + 1)

bounding_matrix = np.full((x_size, y_size), False)
for i in range(y_size):
bounding_matrix[int(lower_bound_[i]) : int(upper_bound_[i]), i] = True
max_slope *= float(y_size - 1) / float(x_size - 2)
max_slope = max(max_slope, 1.0)

min_slope *= float(y_size - 2) / float(x_size - 1)
min_slope = min(min_slope, 1.0)

centered_scale = np.arange(x_size) - x_size + 1

lower_bound = np.empty(x_size, dtype=np.float64)
upper_bound = np.empty(x_size, dtype=np.float64)

for i in range(x_size):
lb0 = min_slope * i
lb1 = max_slope * centered_scale[i] + y_size - 1
lower_bound[i] = math.ceil(max(round(lb0, 2), round(lb1, 2)))

ub0 = max_slope * i + 1
ub1 = min_slope * centered_scale[i] + y_size
upper_bound[i] = math.floor(min(round(ub0, 2), round(ub1, 2)))

if max_slope == 1.0:
if y_size > x_size:
for i in range(x_size - 1):
upper_bound[i] = lower_bound[i + 1]
else:
for i in range(x_size):
upper_bound[i] = lower_bound[i] + 1

for i in range(x_size):
if lower_bound[i] < 0:
lower_bound[i] = 0
if lower_bound[i] > y_size:
lower_bound[i] = y_size
if upper_bound[i] < 0:
upper_bound[i] = 0
if upper_bound[i] > y_size:
upper_bound[i] = y_size

bounding_matrix = np.empty((x_size, y_size), dtype=np.bool_)
for i in range(x_size):
for j in range(y_size):
bounding_matrix[i, j] = False

for i in range(x_size):
start = int(lower_bound[i])
end = int(upper_bound[i])
for j in range(start, end):
bounding_matrix[i, j] = True

return bounding_matrix


Expand Down
39 changes: 31 additions & 8 deletions aeon/distances/elastic/tests/test_bounding.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Test for bounding matrix."""

import numpy as np
import pytest

from aeon.distances import create_bounding_matrix

Expand Down Expand Up @@ -37,10 +36,34 @@ def test_itakura_parallelogram():
matrix = create_bounding_matrix(10, 10, itakura_max_slope=0.2)
assert isinstance(matrix, np.ndarray)

with pytest.raises(
ValueError,
match="""Itakura parallelogram does not support unequal length time series.
Please consider using a full bounding matrix or a sakoe chiba bounding matrix
instead.""",
):
create_bounding_matrix(5, 10, itakura_max_slope=0.2)
expected_result_5_7 = np.array(
[
[True, False, False, False, False, False, False],
[False, True, True, True, True, False, False],
[False, False, True, True, True, False, False],
[False, False, True, True, True, True, False],
[False, False, False, False, False, False, True],
]
)

expected_result_7_5 = np.array(
[
[True, False, False, False, False],
[False, True, False, False, False],
[False, True, True, True, False],
[False, True, True, True, False],
[False, True, True, True, False],
[False, False, False, True, False],
[False, False, False, False, True],
]
)

matrix = create_bounding_matrix(5, 7, itakura_max_slope=0.5)
assert isinstance(matrix, np.ndarray)
assert matrix.shape == (5, 7)
assert np.array_equal(matrix, expected_result_5_7)

matrix = create_bounding_matrix(7, 5, itakura_max_slope=0.5)
assert isinstance(matrix, np.ndarray)
assert matrix.shape == (7, 5)
assert np.array_equal(matrix, expected_result_7_5)