Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ coverage: ## check code coverage quickly with the default Python
docs: clean-docs ## generate Sphinx HTML documentation, including API docs
sphinx-apidoc -o docs/ direct
$(MAKE) -C docs clean
python setup.py build_ext --inplace
$(MAKE) -C docs html

viewdocs:
Expand Down
104 changes: 104 additions & 0 deletions direct/common/_gaussian.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# coding=utf-8
# Copyright (c) DIRECT Contributors

#cython: cdivision=True
#cython: boundscheck=False
#cython: nonecheck=False
#cython: wraparound=False
#cython: overflowcheck=False
#cython: unraisable_tracebacks=False

import numpy as np

cimport numpy as cnp
from libc.math cimport cos, log, pi, sin, sqrt
from libc.stdlib cimport RAND_MAX, rand, srand

cnp.import_array()


cdef double random_uniform() nogil:
"""Produces a random number in (0, 1)."""
cdef double r = rand()
return r / RAND_MAX


cdef cnp.ndarray[cnp.float_t, ndim=1, mode='c'] random_normal_1d(
double mu, double std
):
"""Produces a random vector from the Gaussian distribution based on the Box-Muller algorithm."""
cdef double r, theta, x

r = sqrt(-2 * log(random_uniform()))
theta = 2 * pi * random_uniform()

x = mu + r * cos(theta) * std

return np.array([x], dtype=float)


cdef cnp.ndarray[cnp.float_t, ndim=1, mode='c'] random_normal_2d(
double mu_x, double mu_y, double std_x, double std_y
):
"""Produces a random vector from bivariate Gaussian distribution based on the Box-Muller algorithm."""
cdef double r, theta, x, y

r = sqrt(-2 * log(random_uniform()))
theta = 2 * pi * random_uniform()

x = mu_x + r * cos(theta) * std_x
y = mu_y + r * sin(theta) * std_y

return np.array([x, y], dtype=float)


def gaussian_mask_1d(
int nonzero_count,
int n,
int center,
double std,
cnp.ndarray[cnp.int_t, ndim=1, mode='c'] mask,
int seed,
):
cdef int count, ind
cdef cnp.ndarray[cnp.float_t, ndim=1, mode='c'] rnd_normal

srand(seed)

count = 0

while count <= nonzero_count:
rnd_normal = random_normal_1d(center, std)
ind = int(rnd_normal[0])
if 0 <= ind < n and mask[ind] != 1:
mask[ind] = 1
count = count + 1


def gaussian_mask_2d(
int nonzero_count,
int nrow,
int ncol,
int center_x,
int center_y,
cnp.ndarray[cnp.float_t, ndim=1, mode='c'] std,
cnp.ndarray[cnp.int_t, ndim=2, mode='c'] mask,
int seed,
):
cdef int count, indx, indy
cdef cnp.ndarray[cnp.float_t, ndim=1, mode='c'] rnd_normal

srand(seed)

count = 0

while count <= nonzero_count:

rnd_normal = random_normal_2d(center_x, center_y, std[0], std[1])

indx = int(rnd_normal[0])
indy = int(rnd_normal[1])

if 0 <= indx < nrow and 0 <= indy < ncol and mask[indx, indy] != 1:
mask[indx, indy] = 1
count = count + 1
206 changes: 181 additions & 25 deletions direct/common/subsample.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import torch

import direct.data.transforms as T
from direct.common._gaussian import gaussian_mask_1d, gaussian_mask_2d # pylint: disable=no-name-in-module
from direct.common._poisson import poisson as _poisson # pylint: disable=no-name-in-module
from direct.environment import DIRECT_CACHE_DIR
from direct.types import Number
Expand All @@ -26,10 +27,12 @@
# pylint: disable=arguments-differ

__all__ = (
"CalgaryCampinasMaskFunc",
"FastMRIRandomMaskFunc",
"FastMRIEquispacedMaskFunc",
"FastMRIMagicMaskFunc",
"CalgaryCampinasMaskFunc",
"Gaussian1DMaskFunc",
"Gaussian2DMaskFunc",
"RadialMaskFunc",
"SpiralMaskFunc",
"VariableDensityPoissonMaskFunc",
Expand Down Expand Up @@ -853,20 +856,11 @@ def mask_func(

with temp_seed(self.rng, seed):
center_fraction, acceleration = self.choose_acceleration()
if seed is None:
# cython requires specific seed type so it cannot be None
cython_seed = 0
elif isinstance(seed, (tuple, list)):
# cython `srand` method takes only integers
cython_seed = int(np.mean(seed))
elif isinstance(seed, int):
cython_seed = seed

if return_acs:
return torch.from_numpy(
self.centered_disk_mask((num_rows, num_cols), center_fraction)[np.newaxis, ..., np.newaxis]
).bool()
mask = self.poisson(num_rows, num_cols, center_fraction, acceleration, cython_seed)
if return_acs:
return torch.from_numpy(
centered_disk_mask((num_rows, num_cols), center_fraction)[np.newaxis, ..., np.newaxis]
).bool()
mask = self.poisson(num_rows, num_cols, center_fraction, acceleration, integerize_seed(seed))
return torch.from_numpy(mask[np.newaxis, ..., np.newaxis]).bool()

def poisson(
Expand Down Expand Up @@ -921,7 +915,7 @@ def poisson(

_poisson(num_rows, num_cols, self.max_attempts, mask, radius_x, radius_y, seed)

mask = mask | self.centered_disk_mask((num_rows, num_cols), center_fraction)
mask = mask | centered_disk_mask((num_rows, num_cols), center_fraction)

if self.crop_corner:
mask *= r < 1
Expand All @@ -940,19 +934,181 @@ def poisson(

return mask

@staticmethod
def centered_disk_mask(shape, center_scale):
center_x = shape[0] // 2
center_y = shape[1] // 2

X, Y = np.indices(shape)
class Gaussian1DMaskFunc(FastMRIMaskFunc):
"""Gaussian 1D vertical line mask function."""

def __init__(
self,
accelerations: Union[List[Number], Tuple[Number, ...]],
center_fractions: Optional[Union[List[float], Tuple[float, ...]]] = None,
uniform_range: bool = False,
):
super().__init__(
accelerations=accelerations,
center_fractions=center_fractions,
uniform_range=uniform_range,
)

def mask_func(
self,
shape: Union[List[int], Tuple[int, ...]],
return_acs: bool = False,
seed: Optional[Union[int, Iterable[int]]] = None,
) -> torch.Tensor:
"""Creates a vertical gaussian mask.

Parameters
----------
shape: list or tuple of ints
The shape of the mask to be created. The shape should at least 3 dimensions.
Samples are drawn along the second last dimension.
return_acs: bool
Return the autocalibration signal region as a mask.
seed: int or iterable of ints or None (optional)
Seed for the random number generator. Setting the seed ensures the same mask is generated
each time for the same shape. Default: None.


Returns
-------
mask: torch.Tensor
The sampling mask.
"""
if len(shape) < 3:
raise ValueError("Shape should have 3 or more dimensions")

with temp_seed(self.rng, seed):
num_cols = shape[-2]

center_fraction, acceleration = self.choose_acceleration()
num_low_freqs = int(round(num_cols * center_fraction))

mask = self.center_mask_func(num_cols, num_low_freqs).astype(int)

if return_acs:
return torch.from_numpy(self._reshape_and_broadcast_mask(shape, mask))

# Calls cython function
nonzero_count = int(np.round(num_cols / acceleration - num_low_freqs - 1))
gaussian_mask_1d(
nonzero_count, num_cols, num_cols // 2, 6 * np.sqrt(num_cols // 2), mask, integerize_seed(seed)
)

return torch.from_numpy(self._reshape_and_broadcast_mask(shape, mask).astype(bool))


class Gaussian2DMaskFunc(BaseMaskFunc):
"""Gaussian 2D mask function."""

def __init__(
self,
accelerations: Union[List[Number], Tuple[Number, ...]],
center_fractions: Optional[Union[List[float], Tuple[float, ...]]] = None,
uniform_range: bool = False,
):
super().__init__(
accelerations=accelerations,
center_fractions=center_fractions,
uniform_range=uniform_range,
)

# r = sqrt( center_scale * H * W / pi)
radius = int(np.sqrt(np.prod(shape) * center_scale / np.pi))
def mask_func(
self,
shape: Union[List[int], Tuple[int, ...]],
return_acs: bool = False,
seed: Optional[Union[int, Iterable[int]]] = None,
) -> torch.Tensor:
"""Creates a 2D gaussian mask.

Parameters
----------
shape: list or tuple of ints
The shape of the mask to be created. The shape should at least 3 dimensions.
Samples are drawn along the second last dimension.
return_acs: bool
Return the autocalibration signal region as a mask.
seed: int or iterable of ints or None (optional)
Seed for the random number generator. Setting the seed ensures the same mask is generated
each time for the same shape. Default: None.


Returns
-------
mask: torch.Tensor
The sampling mask.
"""
if len(shape) < 3:
raise ValueError("Shape should have 3 or more dimensions")

num_rows, num_cols = shape[:2]

with temp_seed(self.rng, seed):
center_fraction, acceleration = self.choose_acceleration()

mask = centered_disk_mask((num_rows, num_cols), center_fraction)
if return_acs:
return torch.from_numpy(mask[np.newaxis, ..., np.newaxis]).bool()

# Calls cython function
nonzero_count = int(np.round(num_cols * num_rows / acceleration - mask.sum() - 1))
std = 6 * np.array([np.sqrt(num_rows // 2), np.sqrt(num_cols // 2)], dtype=float)
gaussian_mask_2d(
nonzero_count, num_rows, num_cols, num_rows // 2, num_cols // 2, std, mask, integerize_seed(seed)
)

return torch.from_numpy(mask[np.newaxis, ..., np.newaxis]).bool()


def integerize_seed(seed: Union[None, Tuple[int, ...], List[int]]) -> int:
"""Returns an integer seed.

If input is integer, will return it. If it's None, it will return a random integer in [0, 1e6). If it's a tuple
or list of integers, will return the integer part of the average.

Can be useful for functions that take as input only integer seeds (e.g. cython functions).

Parameters
----------
seed : int, tuple or list of ints, None
Input seed to integerize.

Returns
-------
out_seed: int
Integer seed.
"""
if seed is None:
return np.random.randint(0, 1e6)
if isinstance(seed, int):
return seed
if isinstance(seed, (tuple, list)):
return int(np.mean(seed))
raise ValueError(f"Invalid seed type. Can be None, integer, or tuple or list of integers. Received {seed}.")


def centered_disk_mask(shape: Union[List[int], Tuple[int, ...]], center_scale: float) -> np.ndarray:
r"""Creates a mask with a centered disk of radius :math:`R=\sqrt{c_x \cdot c_y \cdot r / \pi}`.

Parameters
----------
shape : list or tuple of ints
The shape of the (2D) mask to be created.
center_scale : float
Center scale.

Returns
-------
mask : np.ndarray
"""
center_x = shape[0] // 2
center_y = shape[1] // 2

mask = ((X - center_x) ** 2 + (Y - center_y) ** 2) < radius**2
X, Y = np.indices(shape)
radius = int(np.sqrt(np.prod(shape) * center_scale / np.pi))

return mask.astype(int)
mask = ((X - center_x) ** 2 + (Y - center_y) ** 2) < radius**2
return mask.astype(int)


class DictionaryMaskFunc(BaseMaskFunc):
Expand Down
2 changes: 1 addition & 1 deletion direct/data/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def apply_padding(
Parameters
----------
data : torch.Tensor
Batched or not input to be padded of shape (`batch`, *, `height`, `width`, *).
Batched or not input to be padded of shape (`batch`, \*, `height`, `width`, \*).
padding : torch.Tensor or None
Binary tensor of shape (`batch`, 1, `height`, `width`, 1). Entries in `padding` with non-zero value
point to samples in `data` that will be zero-padded. If None, `data` will be returned.
Expand Down
Loading