-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfreesurfer_utils.py
More file actions
1816 lines (1549 loc) · 72.5 KB
/
freesurfer_utils.py
File metadata and controls
1816 lines (1549 loc) · 72.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*-coding:utf-8 -*-
'''
Code based on https://github.com/freesurfer/freesurfer
@Author : Freesurfer
'''
import logging
import glob
import os
import time
import numpy as np
import nibabel as nib
import tensorflow as tf
from datetime import timedelta
from itertools import combinations
from scipy.ndimage import label as scipy_label
from scipy.ndimage import binary_dilation, binary_erosion, gaussian_filter
from scipy.ndimage import distance_transform_edt, binary_fill_holes
from scipy.interpolate import RegularGridInterpolator
from tensorflow import keras
from keras import backend as K
# ================================================================================================
# Neurite Utilities - See github.com/adalca/neurite
# ================================================================================================
def unet(nb_features,
         input_shape,
         nb_levels,
         conv_size,
         nb_labels,
         name='unet',
         prefix=None,
         feat_mult=1,
         pool_size=2,
         padding='same',
         dilation_rate_mult=1,
         activation='elu',
         skip_n_concatenations=0,
         use_residuals=False,
         final_pred_activation='softmax',
         nb_conv_per_level=1,
         layer_nb_feats=None,
         conv_dropout=0,
         batch_norm=None,
         input_model=None):
    """
    Unet-style keras model with an overdose of parametrization. Copied with permission
    from github.com/adalca/neurite.

    Builds the encoder with conv_enc, then stacks the decoder built by conv_dec on top
    of it (with skip connections), and returns the resulting keras model.

    :param nb_features: number of features of the first convolution layer.
    :param input_shape: shape of the input, including the channel dimension.
    :param nb_levels: number of resolution levels of the U-net.
    :param conv_size: size of the convolution kernels.
    :param nb_labels: number of output channels (e.g. segmentation labels).
    :param layer_nb_feats: (optional) explicit list of feature counts, one per
        convolution layer; overrides nb_features/feat_mult when given.
    :param input_model: (optional) keras model whose output feeds the encoder.
    :return: the assembled keras model.
    """
    # naming
    model_name = name
    if prefix is None:
        prefix = model_name

    # volume size data: last entry of input_shape is the channel dimension
    ndims = len(input_shape) - 1
    if isinstance(pool_size, int):
        pool_size = (pool_size,) * ndims

    # get encoding model
    enc_model = conv_enc(nb_features,
                         input_shape,
                         nb_levels,
                         conv_size,
                         name=model_name,
                         prefix=prefix,
                         feat_mult=feat_mult,
                         pool_size=pool_size,
                         padding=padding,
                         dilation_rate_mult=dilation_rate_mult,
                         activation=activation,
                         use_residuals=use_residuals,
                         nb_conv_per_level=nb_conv_per_level,
                         layer_nb_feats=layer_nb_feats,
                         conv_dropout=conv_dropout,
                         batch_norm=batch_norm,
                         input_model=input_model)

    # get decoder
    # use_skip_connections=True makes it a u-net
    # the first (nb_levels * nb_conv_per_level) entries of layer_nb_feats were
    # consumed by the encoder; pass the remainder to the decoder
    lnf = layer_nb_feats[(nb_levels * nb_conv_per_level):] if layer_nb_feats is not None else None
    dec_model = conv_dec(nb_features,
                         None,
                         nb_levels,
                         conv_size,
                         nb_labels,
                         name=model_name,
                         prefix=prefix,
                         feat_mult=feat_mult,
                         pool_size=pool_size,
                         use_skip_connections=True,
                         skip_n_concatenations=skip_n_concatenations,
                         padding=padding,
                         dilation_rate_mult=dilation_rate_mult,
                         activation=activation,
                         use_residuals=use_residuals,
                         final_pred_activation=final_pred_activation,
                         nb_conv_per_level=nb_conv_per_level,
                         batch_norm=batch_norm,
                         layer_nb_feats=lnf,
                         conv_dropout=conv_dropout,
                         input_model=enc_model)
    final_model = dec_model
    return final_model
def conv_enc(nb_features,
             input_shape,
             nb_levels,
             conv_size,
             name=None,
             prefix=None,
             feat_mult=1,
             pool_size=2,
             dilation_rate_mult=1,
             padding='same',
             activation='elu',
             layer_nb_feats=None,
             use_residuals=False,
             nb_conv_per_level=2,
             conv_dropout=0,
             batch_norm=None,
             input_model=None):
    """
    Fully Convolutional Encoder. Copied with permission from github.com/adalca/neurite.

    Builds nb_levels of (conv x nb_conv_per_level [+ residual merge] [+ batch norm]),
    with max pooling after every level except the last, and returns a keras model.
    """
    # naming
    model_name = name
    if prefix is None:
        prefix = model_name

    # first layer: input
    name = '%s_input' % prefix
    if input_model is None:
        input_tensor = keras.layers.Input(shape=input_shape, name=name)
        last_tensor = input_tensor
    else:
        # chain onto the provided model; keras may hand back a list of outputs
        input_tensor = input_model.inputs
        last_tensor = input_model.outputs
        if isinstance(last_tensor, list):
            last_tensor = last_tensor[0]

    # volume size data: last entry of input_shape is the channel dimension
    ndims = len(input_shape) - 1
    if isinstance(pool_size, int):
        pool_size = (pool_size,) * ndims

    # prepare layers: pick the Conv/MaxPooling class matching the dimensionality
    convL = getattr(keras.layers, 'Conv%dD' % ndims)
    conv_kwargs = {'padding': padding, 'activation': activation, 'data_format': 'channels_last'}
    maxpool = getattr(keras.layers, 'MaxPooling%dD' % ndims)

    # down arm:
    # add nb_levels of conv + ReLu + conv + ReLu. Pool after each of first nb_levels - 1 layers
    lfidx = 0  # level feature index
    for level in range(nb_levels):
        lvl_first_tensor = last_tensor
        nb_lvl_feats = np.round(nb_features * feat_mult ** level).astype(int)
        conv_kwargs['dilation_rate'] = dilation_rate_mult ** level
        for conv in range(nb_conv_per_level):
            # does several conv per level, max pooling applied at the end
            if layer_nb_feats is not None:  # None or List of all the feature numbers
                nb_lvl_feats = layer_nb_feats[lfidx]
                lfidx += 1
            name = '%s_conv_downarm_%d_%d' % (prefix, level, conv)
            if conv < (nb_conv_per_level - 1) or (not use_residuals):
                last_tensor = \
                    convL(nb_lvl_feats, conv_size, **conv_kwargs, name=name)(last_tensor)
            else:  # no activation on the last conv of a residual level (applied after the add)
                last_tensor = \
                    convL(nb_lvl_feats, conv_size, padding=padding, name=name)(last_tensor)
            if conv_dropout > 0:
                # conv dropout along feature space only
                name = '%s_dropout_downarm_%d_%d' % (prefix, level, conv)
                noise_shape = [None, *[1] * ndims, nb_lvl_feats]
                last_tensor = keras.layers.Dropout(
                    conv_dropout, noise_shape=noise_shape, name=name)(last_tensor)
        if use_residuals:
            convarm_layer = last_tensor
            # the "add" layer is the original input
            # However, it may not have the right number of features to be added
            nb_feats_in = lvl_first_tensor.get_shape()[-1]
            nb_feats_out = convarm_layer.get_shape()[-1]
            add_layer = lvl_first_tensor
            if nb_feats_in > 1 and nb_feats_out > 1 and (nb_feats_in != nb_feats_out):
                # 1-conv projection so the shortcut matches the conv-arm feature count
                name = '%s_expand_down_merge_%d' % (prefix, level)
                last_tensor = convL(
                    nb_lvl_feats, conv_size, **conv_kwargs, name=name)(lvl_first_tensor)
                add_layer = last_tensor
                if conv_dropout > 0:
                    # NOTE(review): name/noise_shape are computed here but no Dropout
                    # layer is applied — looks vestigial; confirm against upstream neurite
                    name = '%s_dropout_down_merge_%d_%d' % (prefix, level, conv)
                    noise_shape = [None, *[1] * ndims, nb_lvl_feats]
            name = '%s_res_down_merge_%d' % (prefix, level)
            last_tensor = keras.layers.add([add_layer, convarm_layer], name=name)
            name = '%s_res_down_merge_act_%d' % (prefix, level)
            last_tensor = keras.layers.Activation(activation, name=name)(last_tensor)
        if batch_norm is not None:
            name = '%s_bn_down_%d' % (prefix, level)
            last_tensor = keras.layers.BatchNormalization(axis=batch_norm, name=name)(last_tensor)
        # max pool if we're not at the last level
        if level < (nb_levels - 1):
            name = '%s_maxpool_%d' % (prefix, level)
            last_tensor = maxpool(pool_size=pool_size, name=name, padding=padding)(last_tensor)

    # create the model and return
    model = keras.Model(inputs=input_tensor, outputs=[last_tensor], name=model_name)
    return model
def conv_dec(nb_features,
             input_shape,
             nb_levels,
             conv_size,
             nb_labels,
             name=None,
             prefix=None,
             feat_mult=1,
             pool_size=2,
             use_skip_connections=False,
             skip_n_concatenations=0,
             padding='same',
             dilation_rate_mult=1,
             activation='elu',
             use_residuals=False,
             final_pred_activation='softmax',
             nb_conv_per_level=2,
             layer_nb_feats=None,
             batch_norm=None,
             conv_dropout=0,
             input_model=None):
    """
    Fully Convolutional Decoder. Copied with permission from github.com/adalca/neurite.

    Mirrors conv_enc: nb_levels - 1 of (upsample [+ skip concat] + conv x
    nb_conv_per_level [+ residual merge] [+ batch norm]), then a 1x1 conv producing
    nb_labels channels and a final softmax (or linear) prediction layer.

    Parameters:
    ...
    use_skip_connections (bool): if true, turns an Enc-Dec to a U-Net.
    If true, input_tensor and tensors are required.
    It assumes a particular naming of layers. conv_enc...
    """
    # naming
    model_name = name
    if prefix is None:
        prefix = model_name

    # if using skip connections, make sure need to use them.
    if use_skip_connections:
        assert input_model is not None, "is using skip connections, tensors dictionary is required"

    # first layer: input
    input_name = '%s_input' % prefix
    if input_model is None:
        input_tensor = keras.layers.Input(shape=input_shape, name=input_name)
        last_tensor = input_tensor
    else:
        # chain onto the encoder; its output shape replaces input_shape
        input_tensor = input_model.input
        last_tensor = input_model.output
        input_shape = last_tensor.shape.as_list()[1:]

    # vol size info: last entry of input_shape is the channel dimension
    ndims = len(input_shape) - 1
    if isinstance(pool_size, int):
        if ndims > 1:
            pool_size = (pool_size,) * ndims

    # prepare layers: pick the Conv/UpSampling class matching the dimensionality
    convL = getattr(keras.layers, 'Conv%dD' % ndims)
    conv_kwargs = {'padding': padding, 'activation': activation}
    upsample = getattr(keras.layers, 'UpSampling%dD' % ndims)

    # up arm:
    # nb_levels - 1 layers of Deconvolution3D
    # (approx via up + conv + ReLu) + merge + conv + ReLu + conv + ReLu
    lfidx = 0
    for level in range(nb_levels - 1):
        nb_lvl_feats = np.round(nb_features * feat_mult ** (nb_levels - 2 - level)).astype(int)
        conv_kwargs['dilation_rate'] = dilation_rate_mult ** (nb_levels - 2 - level)

        # upsample matching the max pooling layers size
        name = '%s_up_%d' % (prefix, nb_levels + level)
        last_tensor = upsample(size=pool_size, name=name)(last_tensor)
        up_tensor = last_tensor

        # merge layers combining previous layer
        # (relies on conv_enc's '%s_conv_downarm_%d_%d' layer naming convention)
        if use_skip_connections & (level < (nb_levels - skip_n_concatenations - 1)):
            conv_name = \
                '%s_conv_downarm_%d_%d' % (prefix, nb_levels - 2 - level, nb_conv_per_level - 1)
            cat_tensor = input_model.get_layer(conv_name).output
            name = '%s_merge_%d' % (prefix, nb_levels + level)
            last_tensor = keras.layers.concatenate(
                [cat_tensor, last_tensor], axis=ndims + 1, name=name)

        # convolution layers
        for conv in range(nb_conv_per_level):
            if layer_nb_feats is not None:
                nb_lvl_feats = layer_nb_feats[lfidx]
                lfidx += 1
            name = '%s_conv_uparm_%d_%d' % (prefix, nb_levels + level, conv)
            if conv < (nb_conv_per_level - 1) or (not use_residuals):
                last_tensor = convL(
                    nb_lvl_feats, conv_size, **conv_kwargs, name=name)(last_tensor)
            else:  # no activation on the last conv of a residual level
                last_tensor = convL(
                    nb_lvl_feats, conv_size, padding=padding, name=name)(last_tensor)
            if conv_dropout > 0:
                # conv dropout along feature space only
                name = '%s_dropout_uparm_%d_%d' % (prefix, level, conv)
                noise_shape = [None, *[1] * ndims, nb_lvl_feats]
                last_tensor = keras.layers.Dropout(
                    conv_dropout, noise_shape=noise_shape, name=name)(last_tensor)

        # residual block
        if use_residuals:
            # the "add" layer is the original input
            # However, it may not have the right number of features to be added
            add_layer = up_tensor
            nb_feats_in = add_layer.get_shape()[-1]
            nb_feats_out = last_tensor.get_shape()[-1]
            if nb_feats_in > 1 and nb_feats_out > 1 and (nb_feats_in != nb_feats_out):
                name = '%s_expand_up_merge_%d' % (prefix, level)
                add_layer = convL(nb_lvl_feats, conv_size, **conv_kwargs, name=name)(add_layer)
                if conv_dropout > 0:
                    name = '%s_dropout_up_merge_%d_%d' % (prefix, level, conv)
                    noise_shape = [None, *[1] * ndims, nb_lvl_feats]
                    last_tensor = keras.layers.Dropout(
                        conv_dropout, noise_shape=noise_shape, name=name)(last_tensor)
            name = '%s_res_up_merge_%d' % (prefix, level)
            last_tensor = keras.layers.add([last_tensor, add_layer], name=name)
            name = '%s_res_up_merge_act_%d' % (prefix, level)
            last_tensor = keras.layers.Activation(activation, name=name)(last_tensor)

        if batch_norm is not None:
            name = '%s_bn_up_%d' % (prefix, level)
            last_tensor = keras.layers.BatchNormalization(axis=batch_norm, name=name)(last_tensor)

    # Compute likelyhood prediction (no activation yet)
    name = '%s_likelihood' % prefix
    last_tensor = convL(nb_labels, 1, activation=None, name=name)(last_tensor)
    like_tensor = last_tensor

    # output prediction layer
    # we use a softmax to compute P(L_x|I) where x is each location
    if final_pred_activation == 'softmax':
        name = '%s_prediction' % prefix
        pred_tensor = keras.layers.Lambda(
            lambda x: keras.activations.softmax(x, axis=ndims + 1),
            name=name
        )(last_tensor)
    # otherwise create a layer that does nothing.
    else:
        name = '%s_prediction' % prefix
        pred_tensor = keras.layers.Activation('linear', name=name)(like_tensor)

    # create the model and retun
    model = keras.Model(inputs=input_tensor, outputs=pred_tensor, name=model_name)
    return model
# ================================================================================================
# Lab2Im Utilities
# ================================================================================================
# ---------------------------------------------- loading/saving functions ------------------------
def load_volume(path_volume, im_only=True, squeeze=True, dtype=None, aff_ref=None):
    """
    Load volume file.
    :param path_volume: path of the volume to load. Can either be a nii, nii.gz, mgz, or npz format.
    If npz format, 1) the variable name is assumed to be 'vol_data',
    2) the volume is associated with a identity affine matrix and blank header.
    :param im_only: (optional) if False, the function also returns the affine matrix and
    header of the volume.
    :param squeeze: (optional) whether to squeeze the volume when loading.
    :param dtype: (optional) if not None, convert the loaded volume to this numpy dtype.
    :param aff_ref: (optional) If not None, the loaded volume is aligned to this affine matrix.
    The returned affine matrix is also given in this new space. Must be a numpy array of
    dimension 4x4.
    :return: the volume, with corresponding affine matrix and header if im_only is False.
    """
    assert path_volume.endswith(
        ('.nii', '.nii.gz', '.mgz', '.npz')), 'Unknown data file: %s' % path_volume
    if path_volume.endswith(('.nii', '.nii.gz', '.mgz')):
        x = nib.load(path_volume)
        if squeeze:
            volume = np.squeeze(x.get_fdata())
        else:
            volume = x.get_fdata()
        aff = x.affine
        header = x.header
    else:  # npz: no spatial metadata, fall back to identity affine and blank header
        volume = np.load(path_volume)['vol_data']
        if squeeze:
            volume = np.squeeze(volume)
        aff = np.eye(4)
        header = nib.Nifti1Header()
    if dtype is not None:
        if 'int' in dtype:
            # round first so the cast to an integer type does not truncate towards zero
            volume = np.round(volume)
        volume = volume.astype(dtype=dtype)

    # align image to reference affine matrix
    if aff_ref is not None:
        n_dims, _ = get_dims(list(volume.shape), max_channels=10)
        volume, aff = align_volume_to_ref(
            volume, aff, aff_ref=aff_ref, return_aff=True, n_dims=n_dims)

    if im_only:
        return volume
    else:
        return volume, aff, header
def save_volume(volume, aff, header, path, res=None, dtype=None, n_dims=3):
    """
    Save a volume.
    :param volume: volume to save
    :param aff: affine matrix of the volume to save. If aff is None, the volume is saved
    with an identity affine matrix. aff can also be set to 'FS', in which case the
    volume is saved with the affine matrix of FreeSurfer outputs.
    :param header: header of the volume to save. If None, the volume is saved with a blank header.
    :param path: path where to save the volume.
    :param res: (optional) update the resolution in the header before saving the volume.
    :param dtype: (optional) numpy dtype for the saved volume.
    :param n_dims: (optional) number of dimensions, to avoid confusion in multi-channel case.
    Default is 3. If None, n_dims is automatically inferred.
    """
    # make sure the output directory exists
    mkdir(os.path.dirname(path))
    if '.npz' in path:
        np.savez_compressed(path, vol_data=volume)
    else:
        if header is None:
            header = nib.Nifti1Header()
        if isinstance(aff, str):
            if aff == 'FS':
                # hard-coded affine used for FreeSurfer-style outputs
                aff = np.array([[-1, 0, 0, 0], [0, 0, 1, 0], [0, -1, 0, 0], [0, 0, 0, 1]])
        elif aff is None:
            aff = np.eye(4)
        nifty = nib.Nifti1Image(volume, aff, header)
        if dtype is not None:
            if 'int' in dtype:
                # round before casting so values are not truncated towards zero
                volume = np.round(volume)
            volume = volume.astype(dtype=dtype)
            # NOTE(review): nifty was built from the original array; the astype above only
            # rebinds the local name. The saved data appears to rely on set_data_dtype for
            # the on-disk cast — confirm this matches the intended output values.
            nifty.set_data_dtype(dtype)
        if res is not None:
            if n_dims is None:
                n_dims, _ = get_dims(volume.shape)
            res = reformat_to_list(res, length=n_dims, dtype=None)
            nifty.header.set_zooms(res)
        nib.save(nifty, path)
def get_volume_info(path_volume, return_volume=False, aff_ref=None, max_channels=10):
    """
    Gather information about a volume: shape, affine matrix, number of dimensions and channels,
    header, and resolution.
    :param path_volume: path of the volume to get information form.
    :param return_volume: (optional) whether to return the volume along with the information.
    :param aff_ref: (optional) If not None, the loaded volume is aligned to this affine matrix.
    All info relative to the volume is then given in this new space. Must be a numpy array
    of dimension 4x4.
    :param max_channels: (optional) maximum possible number of channels for the input volume.
    :return: volume (if return_volume is true), and corresponding info.
    If aff_ref is not None, the returned aff is the original one, i.e. the affine of the
    image before being aligned to aff_ref.
    """
    # read image
    im, aff, header = load_volume(path_volume, im_only=False)

    # understand if image is multichannel
    im_shape = list(im.shape)
    n_dims, n_channels = get_dims(im_shape, max_channels=max_channels)
    im_shape = im_shape[:n_dims]

    # get labels res: voxel size is stored differently in nifti vs mgz headers
    if '.nii' in path_volume:
        data_res = np.array(header['pixdim'][1:n_dims + 1])
    elif '.mgz' in path_volume:
        data_res = np.array(header['delta'])  # mgz image
    else:
        # npz files carry no resolution information; assume isotropic 1mm
        data_res = np.array([1.0] * n_dims)

    # align to given affine matrix: permute shape/res entries to the reference axes
    if aff_ref is not None:
        ras_axes = get_ras_axes(aff, n_dims=n_dims)
        ras_axes_ref = get_ras_axes(aff_ref, n_dims=n_dims)
        im = align_volume_to_ref(im, aff, aff_ref=aff_ref, n_dims=n_dims)
        im_shape = np.array(im_shape)
        data_res = np.array(data_res)
        im_shape[ras_axes_ref] = im_shape[ras_axes]
        data_res[ras_axes_ref] = data_res[ras_axes]
        im_shape = im_shape.tolist()

    # return info
    if return_volume:
        return im, im_shape, aff, n_dims, n_channels, header, data_res
    else:
        return im_shape, aff, n_dims, n_channels, header, data_res
def get_list_labels(label_list=None, labels_dir=None, save_label_list=None, FS_sort=False):
    """This function reads or computes a list of all label values used in a set of label maps.
    It can also sort all labels according to FreeSurfer lut.
    :param label_list: (optional) already computed label_list.
    Can be a sequence, a 1d numpy array, or the path to
    a numpy 1d array.
    :param labels_dir: (optional) if path_label_list is None, the label list is computed by
    reading all the label maps
    in the given folder. Can also be the path to a single label map.
    :param save_label_list: (optional) path where to save the label list.
    :param FS_sort: (optional) whether to sort label values according to
    the FreeSurfer classification.
    If true, the label values will be ordered as follows: neutral labels first
    (i.e. non-sided), left-side labels,
    and right-side labels. If FS_sort is True, this function also returns the number of
    neutral labels in label_list.
    :return: the label list (numpy 1d array), and the number of neutral (i.e. non-sided)
    labels if FS_sort is True.
    If one side of the brain is not represented at all in label_list, all labels are
    considered as neutral, and
    n_neutral_labels = len(label_list).
    """
    # load label list if previously computed
    if label_list is not None:
        label_list = np.array(reformat_to_list(label_list, load_as_numpy=True, dtype='int'))

    # compute label list from all label files
    elif labels_dir is not None:
        # print('Compiling list of unique labels')
        # go through all labels files and compute unique list of labels
        labels_paths = list_images_in_folder(labels_dir)
        label_list = np.empty(0)
        loop_info = LoopInfo(len(labels_paths), 10, 'processing', print_time=True)
        for lab_idx, path in enumerate(labels_paths):
            loop_info.update(lab_idx)
            y = load_volume(path, dtype='int32')
            y_unique = np.unique(y)
            # accumulate the running set of unique labels across all maps
            label_list = np.unique(np.concatenate((label_list, y_unique))).astype('int')

    else:
        raise Exception('either label_list, path_label_list or labels_dir should be provided')

    # sort labels in neutral/left/right according to FS labels
    n_neutral_labels = 0
    if FS_sort:
        # non-sided labels of the FreeSurfer lookup table
        neutral_FS_labels = [
            0, 14, 15, 16, 21, 22, 23, 24, 72, 77, 80, 85, 100, 101, 102, 103, 104, 105, 106, 107,
            108, 109, 165, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 251, 252, 253,
            254, 255, 258, 259, 260, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 502, 506,
            507, 508, 509, 511, 512, 514, 515, 516, 517, 530, 531, 532, 533, 534, 535, 536, 537
        ]
        neutral = list()
        left = list()
        right = list()
        for la in label_list:
            if la in neutral_FS_labels:
                if la not in neutral:
                    neutral.append(la)
            # label ranges used by FreeSurfer for left-hemisphere structures
            elif ((0 < la < 14) | (16 < la < 21) | (24 < la < 40) | (135 < la < 139) |
                  (1000 <= la <= 1035) | (la == 865) | (20100 < la < 20110)):
                if la not in left:
                    left.append(la)
            # label ranges used by FreeSurfer for right-hemisphere structures
            elif ((39 < la < 72) | (162 < la < 165) | (2000 <= la <= 2035) |
                  (20000 < la < 20010) | (la == 139) | (la == 866)):
                if la not in right:
                    right.append(la)
            else:
                raise Exception('label {} not in our current FS classification, '
                                'please update get_list_labels in utils.py'.format(la))
        label_list = np.concatenate([sorted(neutral), sorted(left), sorted(right)])
        # if only one side is represented, treat everything as neutral
        if ((len(left) > 0) & (len(right) > 0)) | ((len(left) == 0) & (len(right) == 0)):
            n_neutral_labels = len(neutral)
        else:
            n_neutral_labels = len(label_list)

    # save labels if specified
    if save_label_list is not None:
        np.save(save_label_list, np.int32(label_list))

    if FS_sort:
        return np.int32(label_list), n_neutral_labels
    else:
        return np.int32(label_list), None
def load_array_if_path(var, load_as_numpy=True):
    """If var is a string and load_as_numpy is True, load and return the numpy array
    stored at the path indicated by var. Otherwise, return var unchanged.
    :param var: either a value to pass through, or the path to a .npy file.
    :param load_as_numpy: (optional) whether to treat a string var as a path to load.
    :return: the loaded array, or var itself.
    """
    if load_as_numpy and isinstance(var, str):
        assert os.path.isfile(var), 'No such path: %s' % var
        return np.load(var)
    return var
# ----------------------------------------------- reformatting functions --------------------------
def reformat_to_list(var, length=None, load_as_numpy=False, dtype=None):
    """This function takes a variable and reformat it into a list of desired
    length and type (int, float, bool, str).
    If variable is a string, and load_as_numpy is True, it will be loaded as a numpy array.
    If variable is None, this function returns None.
    :param var: a str, int, float, list, tuple, or numpy array
    :param length: (optional) if var is a single item, it will be replicated to a
    list of this length
    :param load_as_numpy: (optional) whether var is the path to a numpy array
    :param dtype: (optional) convert all item to this type. Can be 'int', 'float', 'bool', or 'str'
    :return: reformated list
    :raises ValueError: if var has a length other than 1 or `length`, or dtype is unknown.
    :raises TypeError: if var is of an unsupported type.
    """
    # convert to list
    if var is None:
        return None
    var = load_array_if_path(var, load_as_numpy=load_as_numpy)
    # abstract numpy scalar types cover every int/float width; the previous explicit
    # tuple referenced np.float128, which does not exist on all platforms (e.g. Windows)
    # and made every call crash there. Note that bool is a subclass of int, so plain
    # bools are also wrapped here.
    if isinstance(var, (int, float, np.integer, np.floating)):
        var = [var]
    elif isinstance(var, tuple):
        var = list(var)
    elif isinstance(var, np.ndarray):
        if var.shape == (1,):
            var = [var[0]]
        else:
            var = np.squeeze(var).tolist()
    elif isinstance(var, str):
        var = [var]
    if isinstance(var, list):
        if length is not None:
            if len(var) == 1:
                # replicate a single item to the requested length
                var = var * length
            elif len(var) != length:
                raise ValueError(
                    'if var is a list/tuple/numpy array, it should be of length 1 or {0}, '
                    'had {1}'.format(length, var))
    else:
        raise TypeError(
            'var should be an int, float, tuple, list, numpy array, or path to numpy array')
    # convert items type
    if dtype is not None:
        if dtype == 'int':
            var = [int(v) for v in var]
        elif dtype == 'float':
            var = [float(v) for v in var]
        elif dtype == 'bool':
            var = [bool(v) for v in var]
        elif dtype == 'str':
            var = [str(v) for v in var]
        else:
            raise ValueError(
                "dtype should be 'str', 'float', 'int', or 'bool'; had {}".format(dtype))
    return var
# ----------------------------------------------- path-related functions --------------------------
def list_images_in_folder(path_dir, include_single_image=True, check_if_empty=True):
    """List all files with extension nii, nii.gz, mgz, or npz whithin a folder.
    If path_dir itself names a single image file (and include_single_image is True),
    a one-element list containing that path is returned instead.
    :param path_dir: path of a folder, or of a single image file.
    :param include_single_image: (optional) whether a single image path is accepted.
    :param check_if_empty: (optional) whether to raise if no image is found.
    :return: sorted list of image paths.
    """
    basename = os.path.basename(path_dir)
    looks_like_image = any(
        ext in basename for ext in ('.nii.gz', '.nii', '.mgz', '.npz'))
    if include_single_image and looks_like_image:
        assert os.path.isfile(path_dir), 'file %s does not exist' % path_dir
        list_images = [path_dir]
    elif os.path.isdir(path_dir):
        matches = []
        for pattern in ('*nii.gz', '*nii', '*.mgz', '*.npz'):
            matches.extend(glob.glob(os.path.join(path_dir, pattern)))
        list_images = sorted(matches)
    else:
        raise Exception('Folder does not exist: %s' % path_dir)
    if check_if_empty:
        assert len(list_images) > 0, \
            'no .nii, .nii.gz, .mgz or .npz image could be found in %s' % path_dir
    return list_images
def mkdir(path_dir):
    """Recursively creates the current dir as well as its parent folders
    if they do not already exist.
    :param path_dir: path of the directory to create. An empty path (e.g. the dirname
    of a bare filename) is a no-op. A trailing '/' is ignored.
    """
    # guard empty paths: the previous implementation crashed on path_dir[-1] here,
    # e.g. when called as mkdir(os.path.dirname('file.nii'))
    if not path_dir:
        return
    if path_dir[-1] == '/':
        path_dir = path_dir[:-1]
    if path_dir and not os.path.isdir(path_dir):
        # makedirs replaces the hand-rolled parent walk (which could loop forever on
        # degenerate inputs); exist_ok guards against concurrent creation races
        os.makedirs(path_dir, exist_ok=True)
# ---------------------------------------------- shape-related functions --------------------
def get_dims(shape, max_channels=10):
    """Get the number of dimensions and channels from the shape of an array.
    The number of dimensions is assumed to be the length of the shape minus one, as long
    as the last entry of the shape is inferior or equal to max_channels (default 10);
    otherwise the array is considered single-channel.
    :param shape: shape of an array. Can be a sequence or a 1d numpy array.
    :param max_channels: maximum possible number of channels.
    :return: the number of dimensions and channels associated with the provided shape.
    example 1: get_dims([150, 150, 150], max_channels=10) = (3, 1)
    example 2: get_dims([150, 150, 150, 3], max_channels=10) = (3, 3)
    example 3: get_dims([150, 150, 150, 15], max_channels=10) = (4, 1), because 15 > 10"""
    if shape[-1] <= max_channels:
        # last entry is small enough to be a channel dimension
        n_dims = len(shape) - 1
        n_channels = shape[-1]
    else:
        n_dims = len(shape)
        n_channels = 1
    return n_dims, n_channels
def add_axis(x, axis=0):
    """Add axis to a numpy array.
    :param x: input numpy array.
    :param axis: index of the new axis to add. Can also be a list of indices to add
    several axes at the same time.
    :return: the array with the new axis (or axes) inserted."""
    for ax in reformat_to_list(axis):
        x = np.expand_dims(x, axis=ax)
    return x
# --------------------------------------------------- miscellaneous --------------------
def infer(x):
    """Try to parse input to float. If it fails, tries boolean, and otherwise keep it as string."""
    try:
        return float(x)
    except ValueError:
        pass
    if x == 'False':
        return False
    if x == 'True':
        return True
    if not isinstance(x, str):
        raise TypeError('input should be an int/float/boolean/str, had {}'.format(type(x)))
    return x
class LoopInfo:
    """
    Class to print the current iteration in a for loop, and optionally the estimated remaining time.
    Instantiate just before the loop, and call the update method at the start of the loop.
    The printed text has the following format:
    processing i/total    remaining time: hh:mm:ss
    """

    def __init__(self, n_iterations, spacing=10, text='processing', print_time=False):
        """
        :param n_iterations: total number of iterations of the for loop.
        :param spacing: frequency at which the update info will be printed on screen.
        :param text: text to print. Default is processing.
        :param print_time: whether to print the estimated remaining time. Default is False.
        """
        # loop parameters
        self.n_iterations = n_iterations
        self.spacing = spacing
        # text parameters
        self.text = text
        self.print_time = print_time
        self.print_previous_time = False
        # width used to left-align the 'i/total' field when printing remaining times
        self.align = len(str(self.n_iterations)) * 2 + 1 + 3
        # timing parameters
        self.iteration_durations = np.zeros((n_iterations,))
        self.start = time.time()
        self.previous = time.time()

    def update(self, idx):
        """Record the duration of iteration idx and log progress every `spacing` iterations."""
        # time iteration
        now = time.time()
        self.iteration_durations[idx] = now - self.previous
        self.previous = now

        # log at the first iteration, then every self.spacing iterations
        if idx == 0:
            logging.debug(self.text + ' 1/{}'.format(self.n_iterations))
        elif idx % self.spacing == self.spacing - 1:
            iteration = str(idx + 1) + '/' + str(self.n_iterations)
            if self.print_time:
                # estimate remaining time from the average iteration duration, ignoring
                # iterations much faster than the slowest one (e.g. not-yet-run entries)
                max_duration = np.max(self.iteration_durations)
                average_duration = np.mean(
                    self.iteration_durations[self.iteration_durations > .01 * max_duration])
                remaining_time = int(average_duration * (self.n_iterations - idx))
                # print total remaining time only if it is greater than 1s or
                # if it was previously printed
                if (remaining_time > 1) | self.print_previous_time:
                    eta = str(timedelta(seconds=remaining_time))
                    logging.debug(self.text + ' {:<{x}} remaining time: {}'.format(
                        iteration, eta, x=self.align))
                    self.print_previous_time = True
                else:
                    logging.debug(self.text + ' {}'.format(iteration))
            else:
                logging.debug(self.text + ' {}'.format(iteration))
def get_mapping_lut(source, dest=None):
    """This functions returns the look-up table to map a list of N values (source)
    to another list (dest).
    If the second list is not given, we assume it is equal to [0, ..., N-1].
    :param source: list/array of N distinct non-negative integer labels.
    :param dest: (optional) list/array of N values to map the source labels to.
    :return: 1d int32 numpy array lut such that lut[source[i]] = dest[i]
    (and 0 everywhere else).
    """
    # initialise
    source = np.array(reformat_to_list(source), dtype='int32')
    n_labels = source.shape[0]

    # build new label list if neccessary
    if dest is None:
        dest = np.arange(n_labels, dtype='int32')
    else:
        assert len(source) == len(dest), 'label_list and new_label_list should have the same length'
        dest = np.array(reformat_to_list(dest, dtype='int'))

    # build look-up table: vectorised fancy-index assignment replaces the previous
    # python loop (which also shadowed the source/dest variables)
    lut = np.zeros(np.max(source) + 1, dtype='int32')
    lut[source] = dest
    return lut
def find_closest_number_divisible_by_m(n, m, answer_type='lower'):
    """Return the closest integer to n that is divisible by m.

    :param n: integer to approximate.
    :param m: divisor.
    :param answer_type: (optional) 'closer' returns the nearest multiple of m,
    'lower' the greatest multiple <= n, 'higher' the smallest multiple >= n.
    :return: the requested multiple of m (n itself if already divisible).
    :raises ValueError: if answer_type is not 'lower', 'higher', or 'closer'.
    """
    if n % m == 0:
        return n
    else:
        # floor division (not int(n / m), which truncates towards zero) keeps
        # lower <= n and higher >= n for negative n as well
        q = n // m
        lower = q * m
        higher = (q + 1) * m
        if answer_type == 'lower':
            return lower
        elif answer_type == 'higher':
            return higher
        elif answer_type == 'closer':
            # exact ties resolve to the higher multiple
            return lower if (n - lower) < (higher - n) else higher
        else:
            raise ValueError(
                'answer_type should be lower, higher, or closer, had : %s' % answer_type)
def build_binary_structure(connectivity, n_dims, shape=None):
    """Return a binary structuring element (for dilation/erosion) of the given connectivity.

    :param connectivity: maximum Euclidean distance from the centre voxel included
    in the structure.
    :param n_dims: number of dimensions of the structure.
    :param shape: (optional) shape of the returned array; defaults to
    [2 * connectivity + 1] * n_dims.
    :return: numpy int array of the requested shape, 1 inside the structure, 0 outside.
    """
    if shape is None:
        shape = [connectivity * 2 + 1] * n_dims
    else:
        shape = reformat_to_list(shape, length=n_dims)
    # compute the Euclidean distance of every voxel to the centre of the array
    grid = np.ones(shape)
    centre = tuple((dim_size // 2,) for dim_size in shape)
    grid[centre] = 0
    distances = distance_transform_edt(grid)
    # keep the voxels that lie within `connectivity` of the centre
    return (distances <= connectivity) * 1
# ---------------------------------------------------- edit volume ----------------------------
def mask_volume(
    volume, mask=None, threshold=0.1, dilate=0, erode=0, fill_holes=False,
    masking_value=0, return_mask=False, return_copy=True
):
    """Mask a volume, either with a given mask, or by keeping only the values above a threshold.

    :param volume: a numpy array, possibly with several channels.
    :param mask: (optional) a numpy array to mask volume with. All strictly positive
    values of mask are kept; it does not have to be a 0/1 array. Its spatial shape must
    match volume's; for a multi-channel volume it may be uni- or multi-channel (a
    uni-channel mask is applied to every channel).
    :param threshold: (optional) if mask is None, keep the voxels with values >= threshold.
    :param dilate: (optional) number of voxels by which to dilate the provided or computed mask.
    :param erode: (optional) number of voxels by which to erode the provided or computed mask.
    :param fill_holes: (optional) whether to fill the holes in the provided or computed mask.
    :param masking_value: (optional) value assigned to voxels outside the mask.
    :param return_mask: (optional) whether to also return the applied mask.
    :param return_copy: (optional) whether to work on a copy of volume (True) or in place (False).
    :return: the masked volume, and the applied mask if return_mask is True.
    """
    # work on a copy unless in-place masking was requested
    new_volume = volume.copy() if return_copy else volume
    vol_shape = list(new_volume.shape)
    n_dims, n_channels = get_dims(vol_shape)

    # threshold the volume if no mask was given, otherwise binarise the provided one
    if mask is None:
        mask = new_volume >= threshold
    else:
        assert list(mask.shape[:n_dims]) == vol_shape[:n_dims], \
            'mask should have shape {0}, or {1}, had {2}'.format(
                vol_shape[:n_dims], vol_shape[:n_dims] + [n_channels], list(mask.shape)
            )
        mask = mask > 0

    # morphological post-processing: dilation, then erosion, then hole filling
    if dilate > 0:
        mask_to_apply = binary_dilation(mask, build_binary_structure(dilate, n_dims))
    else:
        mask_to_apply = mask
    if erode > 0:
        mask_to_apply = binary_erosion(mask_to_apply, build_binary_structure(erode, n_dims))
    if fill_holes:
        mask_to_apply = binary_fill_holes(mask_to_apply)

    # set every voxel outside the mask to masking_value
    if mask_to_apply.shape == new_volume.shape:
        new_volume[np.logical_not(mask_to_apply)] = masking_value
    else:
        # uni-channel mask on a multi-channel volume: replicate it across channels
        new_volume[np.stack([np.logical_not(mask_to_apply)] * n_channels, axis=-1)] = masking_value

    return (new_volume, mask_to_apply) if return_mask else new_volume
def rescale_volume(
    volume, new_min=0, new_max=255, min_percentile=2., max_percentile=98.,
    use_positive_only=False
):
    """Linearly rescale the intensities of a volume to the range [new_min, new_max].

    :param volume: a numpy array.
    :param new_min: (optional) minimum value for the rescaled image.
    :param new_max: (optional) maximum value for the rescaled image.
    :param min_percentile: (optional) percentile for the robust minimum of volume
    (float in [0, 100]), where 0 uses np.min.
    :param max_percentile: (optional) percentile for the robust maximum of volume
    (float in [0, 100]), where 100 uses np.max.
    :param use_positive_only: (optional) whether to estimate the percentiles from the
    strictly positive values only.
    :return: rescaled volume (all zeros if the robust range is degenerate).
    """
    new_volume = volume.copy()

    # intensities used to estimate the robust extrema
    if use_positive_only:
        intensities = new_volume[new_volume > 0]
    else:
        intensities = new_volume.flatten()

    # robust minimum / maximum (exact extrema at the 0 / 100 percentiles)
    if min_percentile == 0:
        robust_min = np.min(intensities)
    else:
        robust_min = np.percentile(intensities, min_percentile)
    if max_percentile == 100:
        robust_max = np.max(intensities)
    else:
        robust_max = np.percentile(intensities, max_percentile)

    # trim values outside the robust range
    new_volume = np.clip(new_volume, robust_min, robust_max)

    # map [robust_min, robust_max] onto [new_min, new_max]
    if robust_min == robust_max:  # constant volume: avoid dividing by zero
        return np.zeros_like(new_volume)
    return new_min + (new_volume - robust_min) / (robust_max - robust_min) * (new_max - new_min)
def crop_volume(
volume, cropping_margin=None, cropping_shape=None, aff=None,
return_crop_idx=False, mode='center'
):
"""Crop volume by a given margin, or to a given shape.
:param volume: 2d or 3d numpy array (possibly with multiple channels)