diff --git a/deepmd/dpmodel/descriptor/__init__.py b/deepmd/dpmodel/descriptor/__init__.py index 5c3987e1c5..de22757647 100644 --- a/deepmd/dpmodel/descriptor/__init__.py +++ b/deepmd/dpmodel/descriptor/__init__.py @@ -23,11 +23,15 @@ from .se_t import ( DescrptSeT, ) +from .se_t_tebd import ( + DescrptSeTTebd, +) __all__ = [ "DescrptSeA", "DescrptSeR", "DescrptSeT", + "DescrptSeTTebd", "DescrptDPA1", "DescrptSeAttenV2", "DescrptDPA2", diff --git a/deepmd/dpmodel/descriptor/se_t_tebd.py b/deepmd/dpmodel/descriptor/se_t_tebd.py new file mode 100644 index 0000000000..b6e362d2d7 --- /dev/null +++ b/deepmd/dpmodel/descriptor/se_t_tebd.py @@ -0,0 +1,747 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Callable, + List, + Optional, + Tuple, + Union, +) + +import numpy as np + +from deepmd.dpmodel import ( + PRECISION_DICT, + NativeOP, +) +from deepmd.dpmodel.utils import ( + EmbeddingNet, + EnvMat, + NetworkCollection, + PairExcludeMask, +) +from deepmd.dpmodel.utils.seed import ( + child_seed, +) +from deepmd.dpmodel.utils.type_embed import ( + TypeEmbedNet, +) +from deepmd.dpmodel.utils.update_sel import ( + UpdateSel, +) +from deepmd.env import ( + GLOBAL_NP_FLOAT_PRECISION, +) +from deepmd.utils.data_system import ( + DeepmdDataSystem, +) +from deepmd.utils.finetune import ( + get_index_between_two_maps, + map_pair_exclude_types, +) +from deepmd.utils.path import ( + DPPath, +) +from deepmd.utils.version import ( + check_version_compatibility, +) + +from .base_descriptor import ( + BaseDescriptor, +) +from .descriptor import ( + DescriptorBlock, + extend_descrpt_stat, +) + + +@BaseDescriptor.register("se_e3_tebd") +class DescrptSeTTebd(NativeOP, BaseDescriptor): + r"""Construct an embedding net that takes angles between two neighboring atoms and type embeddings as input. + + Parameters + ---------- + rcut + The cut-off radius + rcut_smth + From where the environment matrix should be smoothed + sel : Union[List[int], int] + list[int]: sel[i] specifies the maximum number of type i atoms in the cut-off radius + int: the total maximum number of atoms in the cut-off radius + ntypes : int + Number of element types + neuron : list[int] + Number of neurons in each hidden layer of the embedding net + tebd_dim : int + Dimension of the type embedding + tebd_input_mode : str + The input mode of the type embedding. Supported modes are ["concat", "strip"]. + - "concat": Concatenate the type embedding with the smoothed angular information as the union input for the embedding network. + - "strip": Use a separate embedding network for the type embedding and combine the output with the angular embedding network output. + resnet_dt + Time-step `dt` in the resnet construction: + y = x + dt * \phi (Wx + b) + set_davg_zero + Set the shift of embedding net input to zero. + activation_function + The activation function in the embedding net. Supported options are |ACTIVATION_FN| + env_protection: float + Protection parameter to prevent division by zero errors during environment matrix calculations. + exclude_types : List[Tuple[int, int]] + The excluded pairs of types which have no interaction with each other. + For example, `[[0, 1]]` means no interaction between type 0 and type 1. + precision + The precision of the embedding net parameters. Supported options are |PRECISION| + trainable + If the weights of the embedding net are trainable. + seed + Random seed for initializing the network parameters. + type_map: List[str], Optional + A list of strings. Give the name to each type of atoms.
+ concat_output_tebd: bool + Whether to concat type embedding at the output of the descriptor. + use_econf_tebd: bool, Optional + Whether to use electronic configuration type embedding. + use_tebd_bias : bool, Optional + Whether to use bias in the type embedding layer. + smooth: bool + Whether to use a smooth process in the calculation. + + """ + + def __init__( + self, + rcut: float, + rcut_smth: float, + sel: Union[List[int], int], + ntypes: int, + neuron: list = [2, 4, 8], + tebd_dim: int = 8, + tebd_input_mode: str = "concat", + resnet_dt: bool = False, + set_davg_zero: bool = True, + activation_function: str = "tanh", + env_protection: float = 0.0, + exclude_types: List[Tuple[int, int]] = [], + precision: str = "float64", + trainable: bool = True, + seed: Optional[Union[int, List[int]]] = None, + type_map: Optional[List[str]] = None, + concat_output_tebd: bool = True, + use_econf_tebd: bool = False, + use_tebd_bias=False, + smooth: bool = True, + ) -> None: + self.se_ttebd = DescrptBlockSeTTebd( + rcut, + rcut_smth, + sel, + ntypes, + neuron=neuron, + tebd_dim=tebd_dim, + tebd_input_mode=tebd_input_mode, + set_davg_zero=set_davg_zero, + activation_function=activation_function, + precision=precision, + resnet_dt=resnet_dt, + exclude_types=exclude_types, + env_protection=env_protection, + smooth=smooth, + seed=child_seed(seed, 0), + ) + self.use_econf_tebd = use_econf_tebd + self.type_map = type_map + self.smooth = smooth + self.type_embedding = TypeEmbedNet( + ntypes=ntypes, + neuron=[tebd_dim], + padding=True, + activation_function="Linear", + precision=precision, + use_econf_tebd=use_econf_tebd, + use_tebd_bias=use_tebd_bias, + type_map=type_map, + seed=child_seed(seed, 1), + ) + self.tebd_dim = tebd_dim + self.concat_output_tebd = concat_output_tebd + self.trainable = trainable + + def get_rcut(self) -> float: + """Returns the cut-off radius.""" + return self.se_ttebd.get_rcut() + + def get_rcut_smth(self) -> float: + """Returns the radius where the neighbor information starts to smoothly decay to 0.""" + return self.se_ttebd.get_rcut_smth() + + def get_nsel(self) -> int: + """Returns the number of selected atoms in the cut-off radius.""" + return self.se_ttebd.get_nsel() + + def get_sel(self) -> List[int]: + """Returns the number of selected atoms for each type.""" + return self.se_ttebd.get_sel() + + def get_ntypes(self) -> int: + """Returns the number of element types.""" + return self.se_ttebd.get_ntypes() + + def get_type_map(self) -> List[str]: + """Get the name to each type of atoms.""" + return self.type_map + + def get_dim_out(self) -> int: + """Returns the output dimension.""" + ret = self.se_ttebd.get_dim_out() + if self.concat_output_tebd: + ret += self.tebd_dim + return ret + + def get_dim_emb(self) -> int: + return self.se_ttebd.dim_emb + + def mixed_types(self) -> bool: + """If true, the descriptor + 1. assumes total number of atoms aligned across frames; + 2. requires a neighbor list that does not distinguish different atomic types. + + If false, the descriptor + 1. assumes total number of atoms of each atom type aligned across frames; + 2. requires a neighbor list that distinguishes different atomic types.
+ + """ + return self.se_ttebd.mixed_types() + + def has_message_passing(self) -> bool: + """Returns whether the descriptor has message passing.""" + return self.se_ttebd.has_message_passing() + + def need_sorted_nlist_for_lower(self) -> bool: + """Returns whether the descriptor needs sorted nlist when using `forward_lower`.""" + return self.se_ttebd.need_sorted_nlist_for_lower() + + def get_env_protection(self) -> float: + """Returns the protection of building environment matrix.""" + return self.se_ttebd.get_env_protection() + + def share_params(self, base_class, shared_level, resume=False): + """ + Share the parameters of self to the base_class with shared_level during multitask training. + If not starting from a checkpoint (resume is False), + some separate parameters (e.g. mean and stddev) will be re-calculated across different classes. + """ + raise NotImplementedError + + @property + def dim_out(self): + return self.get_dim_out() + + @property + def dim_emb(self): + return self.get_dim_emb() + + def compute_input_stats(self, merged: List[dict], path: Optional[DPPath] = None): + """Update mean and stddev for descriptor elements.""" + raise NotImplementedError + + def set_stat_mean_and_stddev( + self, + mean: np.ndarray, + stddev: np.ndarray, + ) -> None: + """Update mean and stddev for descriptor.""" + self.se_ttebd.mean = mean + self.se_ttebd.stddev = stddev + + def get_stat_mean_and_stddev(self) -> Tuple[np.ndarray, np.ndarray]: + """Get mean and stddev for descriptor.""" + return self.se_ttebd.mean, self.se_ttebd.stddev + + def change_type_map( + self, type_map: List[str], model_with_new_type_stat=None + ) -> None: + """Change the type related params to new ones, according to `type_map` and the original one in the model. + If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. + """ + assert ( + self.type_map is not None + ), "'type_map' must be defined when performing type changing!" + remap_index, has_new_type = get_index_between_two_maps(self.type_map, type_map) + obj = self.se_ttebd + obj.ntypes = len(type_map) + self.type_map = type_map + self.type_embedding.change_type_map(type_map=type_map) + obj.reinit_exclude(map_pair_exclude_types(obj.exclude_types, remap_index)) + if has_new_type: + # the avg and std of new types need to be updated + extend_descrpt_stat( + obj, + type_map, + des_with_stat=model_with_new_type_stat.se_ttebd + if model_with_new_type_stat is not None + else None, + ) + obj["davg"] = obj["davg"][remap_index] + obj["dstd"] = obj["dstd"][remap_index] + + def call( + self, + coord_ext, + atype_ext, + nlist, + mapping: Optional[np.ndarray] = None, + ): + """Compute the descriptor. + + Parameters + ---------- + coord_ext + The extended coordinates of atoms. shape: nf x (nallx3) + atype_ext + The extended atom types. shape: nf x nall + nlist + The neighbor list. shape: nf x nloc x nnei + mapping + The index mapping from extended to local region. Not used by this descriptor. + + Returns + ------- + descriptor + The descriptor. shape: nf x nloc x ng (plus tebd_dim if concat_output_tebd) + gr + The rotationally equivariant and permutationally invariant single particle + representation. shape: nf x nloc x ng x 3 + g2 + The rotationally invariant pair-particle representation. + This descriptor returns None. + h2 + The rotationally equivariant pair-particle representation. + This descriptor returns None. + sw + The smooth switch function.
+ """ + del mapping + nf, nloc, nnei = nlist.shape + nall = coord_ext.reshape(nf, -1).shape[1] // 3 + # nf x nall x tebd_dim + atype_embd_ext = self.type_embedding.call()[atype_ext] + # nfnl x tebd_dim + atype_embd = atype_embd_ext[:, :nloc, :] + grrg, g2, h2, rot_mat, sw = self.se_ttebd( + nlist, + coord_ext, + atype_ext, + atype_embd_ext, + mapping=None, + ) + # nf x nloc x (ng + tebd_dim) + if self.concat_output_tebd: + grrg = np.concatenate( + [grrg, atype_embd.reshape(nf, nloc, self.tebd_dim)], axis=-1 + ) + return grrg, rot_mat, None, None, sw + + def serialize(self) -> dict: + """Serialize the descriptor to dict.""" + obj = self.se_ttebd + data = { + "@class": "Descriptor", + "type": "se_e3_tebd", + "@version": 1, + "rcut": obj.rcut, + "rcut_smth": obj.rcut_smth, + "sel": obj.sel, + "ntypes": obj.ntypes, + "neuron": obj.neuron, + "tebd_dim": obj.tebd_dim, + "tebd_input_mode": obj.tebd_input_mode, + "set_davg_zero": obj.set_davg_zero, + "activation_function": obj.activation_function, + "resnet_dt": obj.resnet_dt, + "concat_output_tebd": self.concat_output_tebd, + "use_econf_tebd": self.use_econf_tebd, + "type_map": self.type_map, + # make deterministic + "precision": np.dtype(PRECISION_DICT[obj.precision]).name, + "embeddings": obj.embeddings.serialize(), + "env_mat": obj.env_mat.serialize(), + "type_embedding": self.type_embedding.serialize(), + "exclude_types": obj.exclude_types, + "env_protection": obj.env_protection, + "smooth": self.smooth, + "@variables": { + "davg": obj["davg"], + "dstd": obj["dstd"], + }, + "trainable": self.trainable, + } + if obj.tebd_input_mode in ["strip"]: + data.update({"embeddings_strip": obj.embeddings_strip.serialize()}) + return data + + @classmethod + def deserialize(cls, data: dict) -> "DescrptSeTTebd": + """Deserialize from dict.""" + data = data.copy() + check_version_compatibility(data.pop("@version"), 1, 1) + data.pop("@class") + data.pop("type") + variables = data.pop("@variables") + embeddings = data.pop("embeddings") + type_embedding = data.pop("type_embedding") + env_mat = data.pop("env_mat") + tebd_input_mode = data["tebd_input_mode"] + if tebd_input_mode in ["strip"]: + embeddings_strip = data.pop("embeddings_strip") + else: + embeddings_strip = None + obj = cls(**data) + + obj.type_embedding = TypeEmbedNet.deserialize(type_embedding) + obj.se_ttebd["davg"] = variables["davg"] + obj.se_ttebd["dstd"] = variables["dstd"] + obj.se_ttebd.embeddings = NetworkCollection.deserialize(embeddings) + if tebd_input_mode in ["strip"]: + obj.se_ttebd.embeddings_strip = NetworkCollection.deserialize( + embeddings_strip + ) + + return obj + + @classmethod + def update_sel( + cls, + train_data: DeepmdDataSystem, + type_map: Optional[List[str]], + local_jdata: dict, + ) -> Tuple[dict, Optional[float]]: + """Update the selection and perform neighbor statistics. 
+ + Parameters + ---------- + train_data : DeepmdDataSystem + data used to do neighbor statistics + type_map : list[str], optional + The name of each type of atoms + local_jdata : dict + The local data refers to the current class + + Returns + ------- + dict + The updated local data + float + The minimum distance between two atoms + """ + local_jdata_cpy = local_jdata.copy() + min_nbor_dist, sel = UpdateSel().update_one_sel( + train_data, type_map, local_jdata_cpy["rcut"], local_jdata_cpy["sel"], True + ) + local_jdata_cpy["sel"] = sel[0] + return local_jdata_cpy, min_nbor_dist + + +@DescriptorBlock.register("se_ttebd") +class DescrptBlockSeTTebd(NativeOP, DescriptorBlock): + def __init__( + self, + rcut: float, + rcut_smth: float, + sel: Union[List[int], int], + ntypes: int, + neuron: list = [25, 50, 100], + tebd_dim: int = 8, + tebd_input_mode: str = "concat", + set_davg_zero: bool = True, + activation_function="tanh", + precision: str = "float64", + resnet_dt: bool = False, + exclude_types: List[Tuple[int, int]] = [], + env_protection: float = 0.0, + smooth: bool = True, + seed: Optional[Union[int, List[int]]] = None, + ) -> None: + self.rcut = rcut + self.rcut_smth = rcut_smth + self.neuron = neuron + self.filter_neuron = self.neuron + self.tebd_dim = tebd_dim + self.tebd_input_mode = tebd_input_mode + self.set_davg_zero = set_davg_zero + self.activation_function = activation_function + self.precision = precision + self.resnet_dt = resnet_dt + self.env_protection = env_protection + self.seed = seed + self.smooth = smooth + + if isinstance(sel, int): + sel = [sel] + + self.ntypes = ntypes + self.sel = sel + self.sec = self.sel + self.split_sel = self.sel + self.nnei = sum(sel) + self.ndescrpt = self.nnei * 4 + # order matters, placed after the assignment of self.ntypes + self.reinit_exclude(exclude_types) + + self.tebd_dim_input = self.tebd_dim * 2 + if self.tebd_input_mode in ["concat"]: + self.embd_input_dim = 1 + self.tebd_dim_input + else: + self.embd_input_dim = 1 + + self.embeddings = NetworkCollection( + ndim=0, + ntypes=self.ntypes, + network_type="embedding_network", + ) + self.embeddings[0] = EmbeddingNet( + self.embd_input_dim, + self.neuron, + self.activation_function, + self.resnet_dt, + self.precision, + seed=child_seed(seed, 0), + ) + if self.tebd_input_mode in ["strip"]: + self.embeddings_strip = NetworkCollection( + ndim=0, + ntypes=self.ntypes, + network_type="embedding_network", + ) + self.embeddings_strip[0] = EmbeddingNet( + self.tebd_dim_input, + self.neuron, + self.activation_function, + self.resnet_dt, + self.precision, + seed=child_seed(seed, 1), + ) + else: + self.embeddings_strip = None + + wanted_shape = (self.ntypes, self.nnei, 4) + self.env_mat = EnvMat(self.rcut, self.rcut_smth, protection=self.env_protection) + self.mean = np.zeros(wanted_shape, dtype=PRECISION_DICT[self.precision]) + self.stddev = np.ones(wanted_shape, dtype=PRECISION_DICT[self.precision]) + self.orig_sel = self.sel + + def get_rcut(self) -> float: + """Returns the cut-off radius.""" + return self.rcut + + def get_rcut_smth(self) -> float: + """Returns the radius where the neighbor information starts to smoothly decay to 0.""" + return self.rcut_smth + + def get_nsel(self) -> int: + """Returns the number of selected atoms in the cut-off radius.""" + return sum(self.sel) + + def get_sel(self) -> List[int]: + """Returns the number of selected atoms for each type.""" + return self.sel + + def get_ntypes(self) -> int: + """Returns the number of element types.""" + return self.ntypes + + def
get_dim_in(self) -> int: + """Returns the input dimension.""" + return self.dim_in + + def get_dim_out(self) -> int: + """Returns the output dimension.""" + return self.dim_out + + def get_dim_emb(self) -> int: + """Returns the output dimension of embedding.""" + return self.filter_neuron[-1] + + def __setitem__(self, key, value): + if key in ("avg", "data_avg", "davg"): + self.mean = value + elif key in ("std", "data_std", "dstd"): + self.stddev = value + else: + raise KeyError(key) + + def __getitem__(self, key): + if key in ("avg", "data_avg", "davg"): + return self.mean + elif key in ("std", "data_std", "dstd"): + return self.stddev + else: + raise KeyError(key) + + def mixed_types(self) -> bool: + """If true, the descriptor + 1. assumes total number of atoms aligned across frames; + 2. requires a neighbor list that does not distinguish different atomic types. + + If false, the descriptor + 1. assumes total number of atoms of each atom type aligned across frames; + 2. requires a neighbor list that distinguishes different atomic types. + + """ + return True + + def get_env_protection(self) -> float: + """Returns the protection of building environment matrix.""" + return self.env_protection + + @property + def dim_out(self): + """Returns the output dimension of this descriptor.""" + return self.filter_neuron[-1] + + @property + def dim_in(self): + """Returns the atomic input dimension of this descriptor.""" + return self.tebd_dim + + @property + def dim_emb(self): + """Returns the output dimension of embedding.""" + return self.get_dim_emb() + + def compute_input_stats( + self, + merged: Union[Callable[[], List[dict]], List[dict]], + path: Optional[DPPath] = None, + ): + """Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data.""" + raise NotImplementedError + + def get_stats(self): + """Get the statistics of the descriptor.""" + raise NotImplementedError + + def reinit_exclude( + self, + exclude_types: List[Tuple[int, int]] = [], + ): + self.exclude_types = exclude_types + self.emask = PairExcludeMask(self.ntypes, exclude_types=exclude_types) + + def cal_g( + self, + ss, + embedding_idx, + ): + # nfnl x nt_i x nt_j x ng + gg = self.embeddings[embedding_idx].call(ss) + return gg + + def cal_g_strip( + self, + ss, + embedding_idx, + ): + assert self.embeddings_strip is not None + # nfnl x nt_i x nt_j x ng + gg = self.embeddings_strip[embedding_idx].call(ss) + return gg + + def call( + self, + nlist: np.ndarray, + coord_ext: np.ndarray, + atype_ext: np.ndarray, + atype_embd_ext: Optional[np.ndarray] = None, + mapping: Optional[np.ndarray] = None, + ): + # nf x nloc x nnei x 4 + dmatrix, diff, sw = self.env_mat.call( + coord_ext, atype_ext, nlist, self.mean, self.stddev + ) + nf, nloc, nnei, _ = dmatrix.shape + exclude_mask = self.emask.build_type_exclude_mask(nlist, atype_ext) + # nfnl x nnei + exclude_mask = exclude_mask.reshape(nf * nloc, nnei) + # nfnl x nnei + nlist = nlist.reshape(nf * nloc, nnei) + nlist = np.where(exclude_mask, nlist, -1) + # nfnl x nnei + nlist_mask = nlist != -1 + # nfnl x nnei x 1 + sw = np.where(nlist_mask[:, :, None], sw.reshape(nf * nloc, nnei, 1), 0.0) + + # nfnl x nnei x 4 + dmatrix = dmatrix.reshape(nf * nloc, nnei, 4) + # nfnl x nnei x 4 + rr = dmatrix + rr = rr * exclude_mask[:, :, None] + # nfnl x nt_i x 3 + rr_i = rr[:, :, 1:] + # nfnl x nt_j x 3 + rr_j = rr[:, :, 1:] + # nfnl x nt_i x nt_j + env_ij = np.einsum("ijm,ikm->ijk", rr_i, rr_j) + # nfnl x nt_i x nt_j x 1 + ss = np.expand_dims(env_ij, axis=-1) + +
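# The gather below needs valid indices, so padded (-1) neighbors are clamped to index 0; + # they cannot leak into the output, since EnvMat zeroes the environment-matrix rows of + # padded neighbors, making env_ij vanish for any pair that involves padding. +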
nlist_masked = np.where(nlist_mask, nlist, 0) + index = np.tile(nlist_masked.reshape(nf, -1, 1), (1, 1, self.tebd_dim)) + # nfnl x nnei x tebd_dim + atype_embd_nlist = np.take_along_axis(atype_embd_ext, index, axis=1).reshape( + nf * nloc, nnei, self.tebd_dim + ) + # nfnl x nt_i x nt_j x tebd_dim + nlist_tebd_i = np.tile( + np.expand_dims(atype_embd_nlist, axis=2), [1, 1, self.nnei, 1] + ) + nlist_tebd_j = np.tile( + np.expand_dims(atype_embd_nlist, axis=1), [1, self.nnei, 1, 1] + ) + ng = self.neuron[-1] + + if self.tebd_input_mode in ["concat"]: + # nfnl x nt_i x nt_j x (1 + tebd_dim * 2) + ss = np.concatenate([ss, nlist_tebd_i, nlist_tebd_j], axis=-1) + # nfnl x nt_i x nt_j x ng + gg = self.cal_g(ss, 0) + elif self.tebd_input_mode in ["strip"]: + # nfnl x nt_i x nt_j x ng + gg_s = self.cal_g(ss, 0) + assert self.embeddings_strip is not None + # nfnl x nt_i x nt_j x (tebd_dim * 2) + tt = np.concatenate([nlist_tebd_i, nlist_tebd_j], axis=-1) + # nfnl x nt_i x nt_j x ng + gg_t = self.cal_g_strip(tt, 0) + if self.smooth: + gg_t = ( + gg_t + * sw.reshape(nf * nloc, self.nnei, 1, 1) + * sw.reshape(nf * nloc, 1, self.nnei, 1) + ) + # nfnl x nt_i x nt_j x ng + gg = gg_s * gg_t + gg_s + else: + raise NotImplementedError + + # nfnl x ng + res_ij = np.einsum("ijk,ijkm->im", env_ij, gg) + res_ij = res_ij * (1.0 / float(self.nnei) / float(self.nnei)) + # nf x nl x ng + result = res_ij.reshape(nf, nloc, self.filter_neuron[-1]).astype( + GLOBAL_NP_FLOAT_PRECISION + ) + return ( + result, + None, + None, + None, + sw, + ) + + def has_message_passing(self) -> bool: + """Returns whether the descriptor block has message passing.""" + return False + + def need_sorted_nlist_for_lower(self) -> bool: + """Returns whether the descriptor block needs sorted nlist when using `forward_lower`.""" + return False diff --git a/deepmd/pt/model/descriptor/__init__.py b/deepmd/pt/model/descriptor/__init__.py index b42aa98380..779e7a562c 100644 --- a/deepmd/pt/model/descriptor/__init__.py +++ b/deepmd/pt/model/descriptor/__init__.py @@ -38,6 +38,10 @@ from .se_t import ( DescrptSeT, ) +from .se_t_tebd import ( + DescrptBlockSeTTebd, + DescrptSeTTebd, +) __all__ = [ "BaseDescriptor", @@ -46,6 +50,8 @@ "DescrptBlockSeA", "DescrptBlockSeAtten", "DescrptSeAttenV2", + "DescrptSeTTebd", + "DescrptBlockSeTTebd", "DescrptSeA", "DescrptSeR", "DescrptSeT", diff --git a/deepmd/pt/model/descriptor/se_t_tebd.py b/deepmd/pt/model/descriptor/se_t_tebd.py new file mode 100644 index 0000000000..18569d2f18 --- /dev/null +++ b/deepmd/pt/model/descriptor/se_t_tebd.py @@ -0,0 +1,865 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Callable, + Dict, + List, + Optional, + Tuple, + Union, +) + +import torch + +from deepmd.dpmodel.utils import EnvMat as DPEnvMat +from deepmd.dpmodel.utils.seed import ( + child_seed, +) +from deepmd.pt.model.descriptor import ( + DescriptorBlock, +) +from deepmd.pt.model.descriptor.env_mat import ( + prod_env_mat, +) +from deepmd.pt.model.network.mlp import ( + EmbeddingNet, + NetworkCollection, +) +from deepmd.pt.model.network.network import ( + TypeEmbedNet, + TypeEmbedNetConsistent, +) +from deepmd.pt.utils import ( + env, +) +from deepmd.pt.utils.env import ( + PRECISION_DICT, + RESERVED_PRECISON_DICT, +) +from deepmd.pt.utils.env_mat_stat import ( + EnvMatStatSe, +) +from deepmd.pt.utils.exclude_mask import ( + PairExcludeMask, +) +from deepmd.pt.utils.update_sel import ( + UpdateSel, +) +from deepmd.utils.data_system import ( + DeepmdDataSystem, +) +from deepmd.utils.env_mat_stat 
import ( + StatItem, +) +from deepmd.utils.finetune import ( + get_index_between_two_maps, + map_pair_exclude_types, +) +from deepmd.utils.path import ( + DPPath, +) +from deepmd.utils.version import ( + check_version_compatibility, +) + +from .base_descriptor import ( + BaseDescriptor, +) +from .descriptor import ( + extend_descrpt_stat, +) + + +@BaseDescriptor.register("se_e3_tebd") +class DescrptSeTTebd(BaseDescriptor, torch.nn.Module): + r"""Construct an embedding net that takes angles between two neighboring atoms and type embeddings as input. + + Parameters + ---------- + rcut + The cut-off radius + rcut_smth + From where the environment matrix should be smoothed + sel : Union[List[int], int] + list[int]: sel[i] specifies the maximum number of type i atoms in the cut-off radius + int: the total maximum number of atoms in the cut-off radius + ntypes : int + Number of element types + neuron : list[int] + Number of neurons in each hidden layer of the embedding net + tebd_dim : int + Dimension of the type embedding + tebd_input_mode : str + The input mode of the type embedding. Supported modes are ["concat", "strip"]. + - "concat": Concatenate the type embedding with the smoothed angular information as the union input for the embedding network. + - "strip": Use a separate embedding network for the type embedding and combine the output with the angular embedding network output. + resnet_dt + Time-step `dt` in the resnet construction: + y = x + dt * \phi (Wx + b) + set_davg_zero + Set the shift of embedding net input to zero. + activation_function + The activation function in the embedding net. Supported options are |ACTIVATION_FN| + env_protection: float + Protection parameter to prevent division by zero errors during environment matrix calculations. + exclude_types : List[Tuple[int, int]] + The excluded pairs of types which have no interaction with each other. + For example, `[[0, 1]]` means no interaction between type 0 and type 1. + precision + The precision of the embedding net parameters. Supported options are |PRECISION| + trainable + If the weights of the embedding net are trainable. + seed + Random seed for initializing the network parameters. + type_map: List[str], Optional + A list of strings. Give the name to each type of atoms. + concat_output_tebd: bool + Whether to concat type embedding at the output of the descriptor. + use_econf_tebd: bool, Optional + Whether to use electronic configuration type embedding. + use_tebd_bias : bool, Optional + Whether to use bias in the type embedding layer. + smooth: bool + Whether to use a smooth process in the calculation.
+ + """ + + def __init__( + self, + rcut: float, + rcut_smth: float, + sel: Union[List[int], int], + ntypes: int, + neuron: list = [2, 4, 8], + tebd_dim: int = 8, + tebd_input_mode: str = "concat", + resnet_dt: bool = False, + set_davg_zero: bool = True, + activation_function: str = "tanh", + env_protection: float = 0.0, + exclude_types: List[Tuple[int, int]] = [], + precision: str = "float64", + trainable: bool = True, + seed: Optional[Union[int, List[int]]] = None, + type_map: Optional[List[str]] = None, + concat_output_tebd: bool = True, + use_econf_tebd: bool = False, + use_tebd_bias=False, + smooth: bool = True, + ): + super().__init__() + self.se_ttebd = DescrptBlockSeTTebd( + rcut, + rcut_smth, + sel, + ntypes, + neuron=neuron, + tebd_dim=tebd_dim, + tebd_input_mode=tebd_input_mode, + set_davg_zero=set_davg_zero, + activation_function=activation_function, + precision=precision, + resnet_dt=resnet_dt, + exclude_types=exclude_types, + env_protection=env_protection, + smooth=smooth, + seed=child_seed(seed, 1), + ) + self.use_econf_tebd = use_econf_tebd + self.type_map = type_map + self.smooth = smooth + self.type_embedding = TypeEmbedNet( + ntypes, + tebd_dim, + precision=precision, + seed=child_seed(seed, 2), + use_econf_tebd=use_econf_tebd, + type_map=type_map, + use_tebd_bias=use_tebd_bias, + ) + self.tebd_dim = tebd_dim + self.concat_output_tebd = concat_output_tebd + self.trainable = trainable + # set trainable + for param in self.parameters(): + param.requires_grad = trainable + + def get_rcut(self) -> float: + """Returns the cut-off radius.""" + return self.se_ttebd.get_rcut() + + def get_rcut_smth(self) -> float: + """Returns the radius where the neighbor information starts to smoothly decay to 0.""" + return self.se_ttebd.get_rcut_smth() + + def get_nsel(self) -> int: + """Returns the number of selected atoms in the cut-off radius.""" + return self.se_ttebd.get_nsel() + + def get_sel(self) -> List[int]: + """Returns the number of selected atoms for each type.""" + return self.se_ttebd.get_sel() + + def get_ntypes(self) -> int: + """Returns the number of element types.""" + return self.se_ttebd.get_ntypes() + + def get_type_map(self) -> List[str]: + """Get the name to each type of atoms.""" + return self.type_map + + def get_dim_out(self) -> int: + """Returns the output dimension.""" + ret = self.se_ttebd.get_dim_out() + if self.concat_output_tebd: + ret += self.tebd_dim + return ret + + def get_dim_emb(self) -> int: + return self.se_ttebd.dim_emb + + def mixed_types(self) -> bool: + """If true, the descriptor + 1. assumes total number of atoms aligned across frames; + 2. requires a neighbor list that does not distinguish different atomic types. + + If false, the descriptor + 1. assumes total number of atoms of each atom type aligned across frames; + 2. requires a neighbor list that distinguishes different atomic types.
+ + """ + return self.se_ttebd.mixed_types() + + def has_message_passing(self) -> bool: + """Returns whether the descriptor has message passing.""" + return self.se_ttebd.has_message_passing() + + def need_sorted_nlist_for_lower(self) -> bool: + """Returns whether the descriptor needs sorted nlist when using `forward_lower`.""" + return self.se_ttebd.need_sorted_nlist_for_lower() + + def get_env_protection(self) -> float: + """Returns the protection of building environment matrix.""" + return self.se_ttebd.get_env_protection() + + def share_params(self, base_class, shared_level, resume=False): + """ + Share the parameters of self to the base_class with shared_level during multitask training. + If not starting from a checkpoint (resume is False), + some separate parameters (e.g. mean and stddev) will be re-calculated across different classes. + """ + assert ( + self.__class__ == base_class.__class__ + ), "Only descriptors of the same type can share params!" + # For se_e3_tebd descriptors, the user-defined share-level + # shared_level: 0 + # share all parameters in both type_embedding and se_ttebd + if shared_level == 0: + self._modules["type_embedding"] = base_class._modules["type_embedding"] + self.se_ttebd.share_params(base_class.se_ttebd, 0, resume=resume) + # shared_level: 1 + # share all parameters in type_embedding + elif shared_level == 1: + self._modules["type_embedding"] = base_class._modules["type_embedding"] + # Other shared levels + else: + raise NotImplementedError + + @property + def dim_out(self): + return self.get_dim_out() + + @property + def dim_emb(self): + return self.get_dim_emb() + + def compute_input_stats( + self, + merged: Union[Callable[[], List[dict]], List[dict]], + path: Optional[DPPath] = None, + ): + """ + Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data. + + Parameters + ---------- + merged : Union[Callable[[], List[dict]], List[dict]] + - List[dict]: A list of data samples from various data systems. + Each element, `merged[i]`, is a data dictionary containing `keys`: `torch.Tensor` + originating from the `i`-th data system. + - Callable[[], List[dict]]: A lazy function that returns data samples in the above format + only when needed. Since the sampling process can be slow and memory-intensive, + the lazy function helps by only sampling once. + path : Optional[DPPath] + The path to the stat file. + + """ + return self.se_ttebd.compute_input_stats(merged, path) + + def set_stat_mean_and_stddev( + self, + mean: torch.Tensor, + stddev: torch.Tensor, + ) -> None: + """Update mean and stddev for descriptor.""" + self.se_ttebd.mean = mean + self.se_ttebd.stddev = stddev + + def get_stat_mean_and_stddev(self) -> Tuple[torch.Tensor, torch.Tensor]: + """Get mean and stddev for descriptor.""" + return self.se_ttebd.mean, self.se_ttebd.stddev + + def change_type_map( + self, type_map: List[str], model_with_new_type_stat=None + ) -> None: + """Change the type related params to new ones, according to `type_map` and the original one in the model. + If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. + """ + assert ( + self.type_map is not None + ), "'type_map' must be defined when performing type changing!"
+ remap_index, has_new_type = get_index_between_two_maps(self.type_map, type_map) + obj = self.se_ttebd + obj.ntypes = len(type_map) + self.type_map = type_map + self.type_embedding.change_type_map(type_map=type_map) + obj.reinit_exclude(map_pair_exclude_types(obj.exclude_types, remap_index)) + if has_new_type: + # the avg and std of new types need to be updated + extend_descrpt_stat( + obj, + type_map, + des_with_stat=model_with_new_type_stat.se_ttebd + if model_with_new_type_stat is not None + else None, + ) + obj["davg"] = obj["davg"][remap_index] + obj["dstd"] = obj["dstd"][remap_index] + + def serialize(self) -> dict: + obj = self.se_ttebd + data = { + "@class": "Descriptor", + "type": "se_e3_tebd", + "@version": 1, + "rcut": obj.rcut, + "rcut_smth": obj.rcut_smth, + "sel": obj.sel, + "ntypes": obj.ntypes, + "neuron": obj.neuron, + "tebd_dim": obj.tebd_dim, + "tebd_input_mode": obj.tebd_input_mode, + "set_davg_zero": obj.set_davg_zero, + "activation_function": obj.activation_function, + "resnet_dt": obj.resnet_dt, + "concat_output_tebd": self.concat_output_tebd, + "use_econf_tebd": self.use_econf_tebd, + "type_map": self.type_map, + # make deterministic + "precision": RESERVED_PRECISON_DICT[obj.prec], + "embeddings": obj.filter_layers.serialize(), + "env_mat": DPEnvMat(obj.rcut, obj.rcut_smth).serialize(), + "type_embedding": self.type_embedding.embedding.serialize(), + "exclude_types": obj.exclude_types, + "env_protection": obj.env_protection, + "smooth": self.smooth, + "@variables": { + "davg": obj["davg"].detach().cpu().numpy(), + "dstd": obj["dstd"].detach().cpu().numpy(), + }, + "trainable": self.trainable, + } + if obj.tebd_input_mode in ["strip"]: + data.update({"embeddings_strip": obj.filter_layers_strip.serialize()}) + return data + + @classmethod + def deserialize(cls, data: dict) -> "DescrptSeTTebd": + data = data.copy() + check_version_compatibility(data.pop("@version"), 1, 1) + data.pop("@class") + data.pop("type") + variables = data.pop("@variables") + embeddings = data.pop("embeddings") + type_embedding = data.pop("type_embedding") + env_mat = data.pop("env_mat") + tebd_input_mode = data["tebd_input_mode"] + if tebd_input_mode in ["strip"]: + embeddings_strip = data.pop("embeddings_strip") + else: + embeddings_strip = None + obj = cls(**data) + + def t_cvt(xx): + return torch.tensor(xx, dtype=obj.se_ttebd.prec, device=env.DEVICE) + + obj.type_embedding.embedding = TypeEmbedNetConsistent.deserialize( + type_embedding + ) + obj.se_ttebd["davg"] = t_cvt(variables["davg"]) + obj.se_ttebd["dstd"] = t_cvt(variables["dstd"]) + obj.se_ttebd.filter_layers = NetworkCollection.deserialize(embeddings) + if tebd_input_mode in ["strip"]: + obj.se_ttebd.filter_layers_strip = NetworkCollection.deserialize( + embeddings_strip + ) + return obj + + def forward( + self, + extended_coord: torch.Tensor, + extended_atype: torch.Tensor, + nlist: torch.Tensor, + mapping: Optional[torch.Tensor] = None, + comm_dict: Optional[Dict[str, torch.Tensor]] = None, + ): + """Compute the descriptor. + + Parameters + ---------- + extended_coord + The extended coordinates of atoms. shape: nf x (nallx3) + extended_atype + The extended atom types. shape: nf x nall + nlist + The neighbor list. shape: nf x nloc x nnei + mapping + The index mapping, not required by this descriptor. + comm_dict + The data needed for communication for parallel inference. + + Returns + ------- + descriptor + The descriptor.
shape: nf x nloc x ng (plus tebd_dim if concat_output_tebd) + gr + The rotationally equivariant and permutationally invariant single particle + representation. shape: nf x nloc x ng x 3 + g2 + The rotationally invariant pair-particle representation. + shape: nf x nloc x nnei x ng + h2 + The rotationally equivariant pair-particle representation. + shape: nf x nloc x nnei x 3 + sw + The smooth switch function. shape: nf x nloc x nnei + + """ + del mapping + nframes, nloc, nnei = nlist.shape + nall = extended_coord.view(nframes, -1).shape[1] // 3 + g1_ext = self.type_embedding(extended_atype) + g1_inp = g1_ext[:, :nloc, :] + g1, g2, h2, rot_mat, sw = self.se_ttebd( + nlist, + extended_coord, + extended_atype, + g1_ext, + mapping=None, + ) + if self.concat_output_tebd: + g1 = torch.cat([g1, g1_inp], dim=-1) + + return g1, rot_mat, g2, h2, sw + + @classmethod + def update_sel( + cls, + train_data: DeepmdDataSystem, + type_map: Optional[List[str]], + local_jdata: dict, + ) -> Tuple[dict, Optional[float]]: + """Update the selection and perform neighbor statistics. + + Parameters + ---------- + train_data : DeepmdDataSystem + data used to do neighbor statistics + type_map : list[str], optional + The name of each type of atoms + local_jdata : dict + The local data refers to the current class + + Returns + ------- + dict + The updated local data + float + The minimum distance between two atoms + """ + local_jdata_cpy = local_jdata.copy() + min_nbor_dist, sel = UpdateSel().update_one_sel( + train_data, type_map, local_jdata_cpy["rcut"], local_jdata_cpy["sel"], True + ) + local_jdata_cpy["sel"] = sel[0] + return local_jdata_cpy, min_nbor_dist + + +@DescriptorBlock.register("se_ttebd") +class DescrptBlockSeTTebd(DescriptorBlock): + def __init__( + self, + rcut: float, + rcut_smth: float, + sel: Union[List[int], int], + ntypes: int, + neuron: list = [25, 50, 100], + tebd_dim: int = 8, + tebd_input_mode: str = "concat", + set_davg_zero: bool = True, + activation_function="tanh", + precision: str = "float64", + resnet_dt: bool = False, + exclude_types: List[Tuple[int, int]] = [], + env_protection: float = 0.0, + smooth: bool = True, + seed: Optional[Union[int, List[int]]] = None, + ): + super().__init__() + self.rcut = rcut + self.rcut_smth = rcut_smth + self.neuron = neuron + self.filter_neuron = self.neuron + self.tebd_dim = tebd_dim + self.tebd_input_mode = tebd_input_mode + self.set_davg_zero = set_davg_zero + self.activation_function = activation_function + self.precision = precision + self.prec = PRECISION_DICT[self.precision] + self.resnet_dt = resnet_dt + self.env_protection = env_protection + self.seed = seed + self.smooth = smooth + + if isinstance(sel, int): + sel = [sel] + + self.ntypes = ntypes + self.sel = sel + self.sec = self.sel + self.split_sel = self.sel + self.nnei = sum(sel) + self.ndescrpt = self.nnei * 4 + # order matters, placed after the assignment of self.ntypes + self.reinit_exclude(exclude_types) + + wanted_shape = (self.ntypes, self.nnei, 4) + mean = torch.zeros( + wanted_shape, dtype=env.GLOBAL_PT_FLOAT_PRECISION, device=env.DEVICE + ) + stddev = torch.ones( + wanted_shape, dtype=env.GLOBAL_PT_FLOAT_PRECISION, device=env.DEVICE + ) + self.register_buffer("mean", mean) + self.register_buffer("stddev", stddev) + self.tebd_dim_input = self.tebd_dim * 2 + if self.tebd_input_mode in ["concat"]: + self.embd_input_dim = 1 + self.tebd_dim_input + else: + self.embd_input_dim = 1 + + self.filter_layers_old = None + self.filter_layers = None + self.filter_layers_strip = None + filter_layers =
NetworkCollection( + ndim=0, ntypes=self.ntypes, network_type="embedding_network" + ) + filter_layers[0] = EmbeddingNet( + self.embd_input_dim, + self.filter_neuron, + activation_function=self.activation_function, + precision=self.precision, + resnet_dt=self.resnet_dt, + seed=child_seed(self.seed, 1), + ) + self.filter_layers = filter_layers + if self.tebd_input_mode in ["strip"]: + filter_layers_strip = NetworkCollection( + ndim=0, ntypes=self.ntypes, network_type="embedding_network" + ) + filter_layers_strip[0] = EmbeddingNet( + self.tebd_dim_input, + self.filter_neuron, + activation_function=self.activation_function, + precision=self.precision, + resnet_dt=self.resnet_dt, + seed=child_seed(self.seed, 2), + ) + self.filter_layers_strip = filter_layers_strip + self.stats = None + + def get_rcut(self) -> float: + """Returns the cut-off radius.""" + return self.rcut + + def get_rcut_smth(self) -> float: + """Returns the radius where the neighbor information starts to smoothly decay to 0.""" + return self.rcut_smth + + def get_nsel(self) -> int: + """Returns the number of selected atoms in the cut-off radius.""" + return sum(self.sel) + + def get_sel(self) -> List[int]: + """Returns the number of selected atoms for each type.""" + return self.sel + + def get_ntypes(self) -> int: + """Returns the number of element types.""" + return self.ntypes + + def get_dim_in(self) -> int: + """Returns the input dimension.""" + return self.dim_in + + def get_dim_out(self) -> int: + """Returns the output dimension.""" + return self.dim_out + + def get_dim_emb(self) -> int: + """Returns the output dimension of embedding.""" + return self.filter_neuron[-1] + + def __setitem__(self, key, value): + if key in ("avg", "data_avg", "davg"): + self.mean = value + elif key in ("std", "data_std", "dstd"): + self.stddev = value + else: + raise KeyError(key) + + def __getitem__(self, key): + if key in ("avg", "data_avg", "davg"): + return self.mean + elif key in ("std", "data_std", "dstd"): + return self.stddev + else: + raise KeyError(key) + + def mixed_types(self) -> bool: + """If true, the descriptor + 1. assumes total number of atoms aligned across frames; + 2. requires a neighbor list that does not distinguish different atomic types. + + If false, the descriptor + 1. assumes total number of atoms of each atom type aligned across frames; + 2. requires a neighbor list that distinguishes different atomic types. + + """ + return True + + def get_env_protection(self) -> float: + """Returns the protection of building environment matrix.""" + return self.env_protection + + @property + def dim_out(self): + """Returns the output dimension of this descriptor.""" + return self.filter_neuron[-1] + + @property + def dim_in(self): + """Returns the atomic input dimension of this descriptor.""" + return self.tebd_dim + + @property + def dim_emb(self): + """Returns the output dimension of embedding.""" + return self.get_dim_emb() + + def compute_input_stats( + self, + merged: Union[Callable[[], List[dict]], List[dict]], + path: Optional[DPPath] = None, + ): + """ + Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data. + + Parameters + ---------- + merged : Union[Callable[[], List[dict]], List[dict]] + - List[dict]: A list of data samples from various data systems. + Each element, `merged[i]`, is a data dictionary containing `keys`: `torch.Tensor` + originating from the `i`-th data system.
+ - Callable[[], List[dict]]: A lazy function that returns data samples in the above format + only when needed. Since the sampling process can be slow and memory-intensive, + the lazy function helps by only sampling once. + path : Optional[DPPath] + The path to the stat file. + + """ + env_mat_stat = EnvMatStatSe(self) + if path is not None: + path = path / env_mat_stat.get_hash() + if path is None or not path.is_dir(): + if callable(merged): + # only get data for once + sampled = merged() + else: + sampled = merged + else: + sampled = [] + env_mat_stat.load_or_compute_stats(sampled, path) + self.stats = env_mat_stat.stats + mean, stddev = env_mat_stat() + if not self.set_davg_zero: + self.mean.copy_(torch.tensor(mean, device=env.DEVICE)) # pylint: disable=no-explicit-dtype + self.stddev.copy_(torch.tensor(stddev, device=env.DEVICE)) # pylint: disable=no-explicit-dtype + + def get_stats(self) -> Dict[str, StatItem]: + """Get the statistics of the descriptor.""" + if self.stats is None: + raise RuntimeError( + "The statistics of the descriptor have not been computed." + ) + return self.stats + + def reinit_exclude( + self, + exclude_types: List[Tuple[int, int]] = [], + ): + self.exclude_types = exclude_types + self.emask = PairExcludeMask(self.ntypes, exclude_types=exclude_types) + + def forward( + self, + nlist: torch.Tensor, + extended_coord: torch.Tensor, + extended_atype: torch.Tensor, + extended_atype_embd: Optional[torch.Tensor] = None, + mapping: Optional[torch.Tensor] = None, + ): + """Compute the descriptor. + + Parameters + ---------- + nlist + The neighbor list. shape: nf x nloc x nnei + extended_coord + The extended coordinates of atoms. shape: nf x (nallx3) + extended_atype + The extended atom types. shape: nf x nall + extended_atype_embd + The extended type embedding of atoms. shape: nf x nall x nt + mapping + The index mapping, not required by this descriptor. + + Returns + ------- + result + The descriptor. shape: nf x nloc x ng + g2 + The rotationally invariant pair-particle representation. + shape: nf x nloc x nnei x ng + h2 + The rotationally equivariant pair-particle representation. + shape: nf x nloc x nnei x 3 + gr + The rotationally equivariant and permutationally invariant single particle + representation. shape: nf x nloc x ng x 3 + sw + The smooth switch function.
shape: nf x nloc x nnei + + """ + del mapping + assert extended_atype_embd is not None + nframes, nloc, nnei = nlist.shape + atype = extended_atype[:, :nloc] + nb = nframes + nall = extended_coord.view(nb, -1, 3).shape[1] + dmatrix, diff, sw = prod_env_mat( + extended_coord, + nlist, + atype, + self.mean, + self.stddev, + self.rcut, + self.rcut_smth, + protection=self.env_protection, + ) + # nb x nloc x nnei + exclude_mask = self.emask(nlist, extended_atype) + nlist = torch.where(exclude_mask != 0, nlist, -1) + nlist_mask = nlist != -1 + nlist = torch.where(nlist == -1, 0, nlist) + sw = torch.squeeze(sw, -1) + # nf x nall x nt + nt = extended_atype_embd.shape[-1] + atype_tebd_ext = extended_atype_embd + # nb x (nloc x nnei) x nt + index = nlist.reshape(nb, nloc * nnei).unsqueeze(-1).expand(-1, -1, nt) + # nb x (nloc x nnei) x nt + atype_tebd_nlist = torch.gather(atype_tebd_ext, dim=1, index=index) + # nb x nloc x nnei x nt + atype_tebd_nlist = atype_tebd_nlist.view(nb, nloc, nnei, nt) + # beyond the cutoff sw should be 0.0 + sw = sw.masked_fill(~nlist_mask, 0.0) + # (nb x nloc) x nnei + exclude_mask = exclude_mask.view(nb * nloc, nnei) + assert self.filter_layers is not None + # nfnl x nnei x 4 + dmatrix = dmatrix.view(-1, self.nnei, 4) + nfnl = dmatrix.shape[0] + # nfnl x nnei x 4 + rr = dmatrix + rr = rr * exclude_mask[:, :, None] + + # nfnl x nt_i x 3 + rr_i = rr[:, :, 1:] + # nfnl x nt_j x 3 + rr_j = rr[:, :, 1:] + # nfnl x nt_i x nt_j + env_ij = torch.einsum("ijm,ikm->ijk", rr_i, rr_j) + # nfnl x nt_i x nt_j x 1 + ss = env_ij.unsqueeze(-1) + + # nfnl x nnei x tebd_dim + nlist_tebd = atype_tebd_nlist.reshape(nfnl, nnei, self.tebd_dim) + + # nfnl x nt_i x nt_j x tebd_dim + nlist_tebd_i = nlist_tebd.unsqueeze(2).expand([-1, -1, self.nnei, -1]) + nlist_tebd_j = nlist_tebd.unsqueeze(1).expand([-1, self.nnei, -1, -1]) + + if self.tebd_input_mode in ["concat"]: + # nfnl x nt_i x nt_j x (1 + tebd_dim * 2) + ss = torch.concat([ss, nlist_tebd_i, nlist_tebd_j], dim=-1) + # nfnl x nt_i x nt_j x ng + gg = self.filter_layers.networks[0](ss) + elif self.tebd_input_mode in ["strip"]: + # nfnl x nt_i x nt_j x ng + gg_s = self.filter_layers.networks[0](ss) + assert self.filter_layers_strip is not None + # nfnl x nt_i x nt_j x (tebd_dim * 2) + tt = torch.concat([nlist_tebd_i, nlist_tebd_j], dim=-1) + # nfnl x nt_i x nt_j x ng + gg_t = self.filter_layers_strip.networks[0](tt) + if self.smooth: + gg_t = ( + gg_t + * sw.reshape(nfnl, self.nnei, 1, 1) + * sw.reshape(nfnl, 1, self.nnei, 1) + ) + # nfnl x nt_i x nt_j x ng + gg = gg_s * gg_t + gg_s + else: + raise NotImplementedError + + # nfnl x ng + res_ij = torch.einsum("ijk,ijkm->im", env_ij, gg) + res_ij = res_ij * (1.0 / float(self.nnei) / float(self.nnei)) + # nf x nl x ng + result = res_ij.view(nframes, nloc, self.filter_neuron[-1]) + return ( + result.to(dtype=env.GLOBAL_PT_FLOAT_PRECISION), + None, + None, + None, + sw, + ) + + def has_message_passing(self) -> bool: + """Returns whether the descriptor block has message passing.""" + return False + + def need_sorted_nlist_for_lower(self) -> bool: + """Returns whether the descriptor block needs sorted nlist when using `forward_lower`.""" + return False diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 8ee0a480a7..88ea439acc 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -661,6 +661,120 @@ def descrpt_se_atten_args(): ] +@descrpt_args_plugin.register("se_e3_tebd", doc=doc_only_pt_supported) +def descrpt_se_e3_tebd_args(): + doc_sel = 'This parameter set the 
number of selected neighbors. Note that this parameter is a little different from that in other descriptors. Instead of separating each type of atoms, only the summation matters. And this number is highly related to the efficiency, thus one should not make it too large. Usually 200 or less is enough, far away from the GPU limitation 4096. It can be:\n\n\ + - `int`. The maximum number of neighbor atoms to be considered. We recommend it to be less than 200. \n\n\ + - `List[int]`. The length of the list should be the same as the number of atom types in the system. `sel[i]` gives the selected number of type-i neighbors. Only the summation of `sel[i]` matters, and it is recommended to be less than 200. \n\n\ + - `str`. Can be "auto:factor" or "auto". "factor" is a float number larger than 1. This option will automatically determine the `sel`. In detail it counts the maximal number of neighbors within the cutoff radius for each type of neighbor, then multiplies the maximum by the "factor". Finally the number is rounded up to be divisible by 4. The option "auto" is equivalent to "auto:1.1".' + doc_rcut = "The cut-off radius." + doc_rcut_smth = "Where to start smoothing. For example the 1/r term is smoothed from `rcut` to `rcut_smth`" + doc_neuron = "Number of neurons in each hidden layer of the embedding net. When two layers are of the same size or one layer is twice as large as the previous layer, a skip connection is built." + doc_activation_function = f'The activation function in the embedding net. Supported activation functions are {list_to_doc(ACTIVATION_FN_DICT.keys())} Note that "gelu" denotes the custom operator version, and "gelu_tf" denotes the TF standard version. If you set "None" or "none" here, no activation function will be used.' + doc_resnet_dt = 'Whether to use a "Timestep" in the skip connection' + doc_precision = f"The precision of the embedding net parameters, supported options are {list_to_doc(PRECISION_DICT.keys())} Default follows the interface precision." + doc_trainable = "If the parameters in the embedding net are trainable" + doc_seed = "Random seed for parameter initialization" + doc_exclude_types = "The excluded pairs of types which have no interaction with each other. For example, `[[0, 1]]` means no interaction between type 0 and type 1." + doc_env_protection = "Protection parameter to prevent division by zero errors during environment matrix calculations. For example, when using paddings, there may be zero distances of neighbors, which may cause a division-by-zero error during environment matrix calculations without protection." + doc_smooth = "Whether to use a smooth process in the calculation when using the stripped type embedding, i.e. whether to multiply the smooth factors of both neighbors j and k onto the type embedding network output (out_jk) to keep the network smooth, instead of setting `set_davg_zero` to True." + doc_set_davg_zero = "Set the normalization average to zero. This option should be set when `atom_ener` in the energy fitting is used" + doc_tebd_dim = "The dimension of atom type embedding." + doc_use_econf_tebd = r"Whether to use electronic configuration type embedding." + doc_use_tebd_bias = "Whether to use bias in the type embedding layer." + doc_concat_output_tebd = ( + "Whether to concat type embedding at the output of the descriptor." + ) + doc_tebd_input_mode = ( + "The input mode of the type embedding. Supported modes are ['concat', 'strip']. " + "- 'concat': Concatenate the type embedding with the smoothed angular information as the union input for the embedding network. " + "The input is `input_jk = concat([angle_jk, tebd_j, tebd_k])`.
" + "The output is `out_jk = embedding(input_jk)` for the three-body representation of atom i with neighbors j and k. " + "- 'strip': Use a separate embedding network for the type embedding and combine the output with the angular embedding network output. " + "The input is `input_t = concat([tebd_j, tebd_k])`. " + "The output is `out_jk = embedding_t(input_t) * embedding_s(angle_jk) + embedding_s(angle_jk)` for the three-body representation of atom i with neighbors j and k." + ) + + return [ + Argument( + "sel", [int, List[int], str], optional=True, default="auto", doc=doc_sel + ), + Argument("rcut", float, optional=True, default=6.0, doc=doc_rcut), + Argument("rcut_smth", float, optional=True, default=0.5, doc=doc_rcut_smth), + Argument( + "neuron", List[int], optional=True, default=[10, 20, 40], doc=doc_neuron + ), + Argument( + "tebd_dim", + int, + optional=True, + default=8, + doc=doc_only_pt_supported + doc_tebd_dim, + ), + Argument( + "tebd_input_mode", + str, + optional=True, + default="concat", + doc=doc_tebd_input_mode, + ), + Argument("resnet_dt", bool, optional=True, default=False, doc=doc_resnet_dt), + Argument( + "set_davg_zero", bool, optional=True, default=True, doc=doc_set_davg_zero + ), + Argument( + "activation_function", + str, + optional=True, + default="tanh", + doc=doc_activation_function, + ), + Argument( + "env_protection", + float, + optional=True, + default=0.0, + doc=doc_only_pt_supported + doc_env_protection, + ), + Argument( + "smooth", + bool, + optional=True, + default=True, + doc=doc_smooth, + ), + Argument( + "exclude_types", + List[List[int]], + optional=True, + default=[], + doc=doc_exclude_types, + ), + Argument("precision", str, optional=True, default="default", doc=doc_precision), + Argument("trainable", bool, optional=True, default=True, doc=doc_trainable), + Argument("seed", [int, None], optional=True, doc=doc_seed), + Argument( + "concat_output_tebd", + bool, + optional=True, + default=True, + doc=doc_only_pt_supported + doc_concat_output_tebd, + ), + Argument( + "use_econf_tebd", + bool, + optional=True, + default=False, + doc=doc_only_pt_supported + doc_use_econf_tebd, + ), + Argument( + "use_tebd_bias", + bool, + optional=True, + default=False, + doc=doc_only_pt_supported + doc_use_tebd_bias, + ), + ] + + +@descrpt_args_plugin.register("se_atten_v2") def descrpt_se_atten_v2_args(): doc_set_davg_zero = "Set the normalization average to zero. This option should be set when `se_atten` descriptor or `atom_ener` in the energy fitting is used" diff --git a/doc/model/index.rst b/doc/model/index.rst index ff4e986178..8409d4ce97 100644 --- a/doc/model/index.rst +++ b/doc/model/index.rst @@ -18,6 +18,7 @@ Model train-fitting-dos train-se-e2-a-tebd train-se-a-mask + train-se-e3-tebd dplr dprc linear diff --git a/doc/model/train-se-e3-tebd.md b/doc/model/train-se-e3-tebd.md new file mode 100644 index 0000000000..8b49b0c220 --- /dev/null +++ b/doc/model/train-se-e3-tebd.md @@ -0,0 +1,78 @@ +# Descriptor `"se_e3_tebd"` {{ pytorch_icon }} {{ dpmodel_icon }} + +:::{note} +**Supported backends**: PyTorch {{ pytorch_icon }}, DP {{ dpmodel_icon }} +::: + +The notation of `se_e3_tebd` is short for the three-body embedding descriptor with type embeddings, where the notation `se` denotes the Deep Potential Smooth Edition (DeepPot-SE). +The embedding takes bond angles between a central atom and its two neighboring atoms (denoted by `e3`) and their type embeddings (denoted by `tebd`) as input.
+ +## Theory + +The three-body embedding DeepPot-SE descriptor with type embeddings incorporates bond-angle and type information, making the model more accurate. The descriptor $\mathcal{D}^i$ can be represented as + +```math + \mathcal{D}^i = \frac{1}{N_c^2}(\mathcal{R}^i(\mathcal{R}^i)^T):\mathcal{G}^i, +``` + +where +$N_c$ is the expected maximum number of neighboring atoms, which is the same constant for all atoms over all frames. +$\mathcal{R}^i$ is constructed as + +```math + (\mathcal{R}^i)_j = + \{ + \begin{array}{cccc} + s(r_{ij}) & \frac{s(r_{ij})x_{ij}}{r_{ij}} & \frac{s(r_{ij})y_{ij}}{r_{ij}} & \frac{s(r_{ij})z_{ij}}{r_{ij}} + \end{array} + \}, +``` + +where $s(r_{ij})$ is the switch function between central atom $i$ and neighbor atom $j$, which is the same as that in `se_e2_a`. + +Currently, only the full information case of $\mathcal{R}^i$ is supported by the three-body embedding. +Each element of $\mathcal{G}^i \in \mathbb{R}^{N_c \times N_c \times M}$ comes from $M$ nodes from the output layer of an NN function $\mathcal{N}_{e,3}$. +If `tebd_input_mode` is set to `concat`, the formulation is: + +```math + (\mathcal{G}^i)_{jk}=\mathcal{N}_{e,3}((\theta_i)_{jk}, \mathcal{A}^j, \mathcal{A}^k) +``` + +Otherwise, if `tebd_input_mode` is set to `strip`, the angular and type information will be taken into two separate NNs $\mathcal{N}_{e,3}^{s}$ and $\mathcal{N}_{e,3}^{t}$. The formulation is: + +```math + (\mathcal{G}^i)_{jk}=\mathcal{N}_{e,3}^{s}((\theta_i)_{jk}) + \mathcal{N}_{e,3}^{s}((\theta_i)_{jk}) \odot ( \mathcal{N}_{e,3}^{t}(\mathcal{A}^j, \mathcal{A}^k) \odot s(r_{ij}) \odot s(r_{ik})) +``` + +where $(\theta_i)_ {jk} = (\mathcal{R}^i)_ {j,\\{2,3,4\\}}\cdot (\mathcal{R}^i)_ {k,\\{2,3,4\\}}$ considers the angle form of two neighbours ($j$ and $k$). +The type embeddings of neighboring atoms $\mathcal{A}^j$ and $\mathcal{A}^k$ are added as an extra input of the embedding network. +The notation $:$ in the equation indicates the contraction between matrix $\mathcal{R}^i(\mathcal{R}^i)^T$ and the first two dimensions of tensor $\mathcal{G}^i$. + +## Instructions + +A complete training input script of this example can be found in the directory + +```bash +$deepmd_source_dir/examples/water/se_e3_tebd/input.json +``` + +The training input script is very similar to that of [`se_e2_a`](train-se-e2-a.md). The only difference lies in the {ref}`descriptor ` section + +```json + "descriptor": { + "type": "se_e3_tebd", + "sel": 40, + "rcut_smth": 0.5, + "rcut": 4.0, + "neuron": [ + 2, + 4, + 8 + ], + "tebd_dim": 8, + "tebd_input_mode": "concat", + "activation_function": "tanh" + }, +``` + +The type of the descriptor is set by the key {ref}`type `. 
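+ +A minimal sketch of calling the native (NumPy) reference implementation directly is shown below. It assumes a build that contains this descriptor; the water-like geometry and the hand-built, `-1`-padded neighbor list are illustrative assumptions only (in a real run the model builds the neighbor list for you). + +```py +import numpy as np + +from deepmd.dpmodel.descriptor import DescrptSeTTebd + +# hyper-parameters mirroring the JSON snippet above +descrpt = DescrptSeTTebd( + rcut=4.0, + rcut_smth=0.5, + sel=[10], + ntypes=2, + neuron=[2, 4, 8], + tebd_dim=8, + tebd_input_mode="concat", +) + +# one frame with a single water-like molecule: nloc == nall == 3 +coord_ext = np.array([[0.0, 0.0, 0.0, 0.0, 0.0, 0.96, 0.0, 0.93, -0.24]]) # nf x (nall x 3) +atype_ext = np.array([[0, 1, 1]]) # nf x nall, here O, H, H +# the neighbor list is padded with -1 up to sum(sel) == 10 +pad = [-1] * 8 +nlist = np.array([[[1, 2, *pad], [0, 2, *pad], [0, 1, *pad]]]) # nf x nloc x nnei + +desc, gr, g2, h2, sw = descrpt.call(coord_ext, atype_ext, nlist) +print(desc.shape) # (1, 3, 16): neuron[-1] + tebd_dim, since concat_output_tebd=True +```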
diff --git a/examples/water/se_e3_tebd/input_torch.json b/examples/water/se_e3_tebd/input_torch.json new file mode 100644 index 0000000000..05aee70a73 --- /dev/null +++ b/examples/water/se_e3_tebd/input_torch.json @@ -0,0 +1,77 @@ +{ + "_comment": "that's all", + "model": { + "type_map": [ + "O", + "H" + ], + "descriptor": { + "type": "se_e3_tebd", + "sel": 40, + "rcut_smth": 0.5, + "rcut": 4.0, + "neuron": [ + 2, + 4, + 8 + ], + "tebd_dim": 8, + "tebd_input_mode": "concat", + "activation_function": "tanh" + }, + "fitting_net": { + "neuron": [ + 240, + 240, + 240 + ], + "resnet_dt": true, + "seed": 1, + "_comment": " that's all" + }, + "_comment": " that's all" + }, + "learning_rate": { + "type": "exp", + "decay_steps": 5000, + "start_lr": 0.001, + "stop_lr": 3.51e-08, + "_comment": "that's all" + }, + "loss": { + "type": "ener", + "start_pref_e": 0.02, + "limit_pref_e": 1, + "start_pref_f": 1000, + "limit_pref_f": 1, + "start_pref_v": 0, + "limit_pref_v": 0, + "_comment": " that's all" + }, + "training": { + "stat_file": "./se_e3_tebd.hdf5", + "training_data": { + "systems": [ + "../data/data_0", + "../data/data_1", + "../data/data_2" + ], + "batch_size": 1, + "_comment": "that's all" + }, + "validation_data": { + "systems": [ + "../data/data_3" + ], + "batch_size": 1, + "numb_btch": 3, + "_comment": "that's all" + }, + "numb_steps": 1000000, + "seed": 10, + "disp_file": "lcurve.out", + "disp_freq": 100, + "save_freq": 1000, + "_comment": "that's all" + } +} diff --git a/source/tests/common/test_examples.py b/source/tests/common/test_examples.py index 6498d7beb2..24735fa59f 100644 --- a/source/tests/common/test_examples.py +++ b/source/tests/common/test_examples.py @@ -53,6 +53,7 @@ p_examples / "water" / "se_e2_a" / "input_torch.json", p_examples / "water" / "se_atten" / "input_torch.json", p_examples / "water" / "dpa2" / "input_torch.json", + p_examples / "water" / "se_e3_tebd" / "input_torch.json", ) input_files_multi = ( diff --git a/source/tests/consistent/descriptor/test_se_t_tebd.py b/source/tests/consistent/descriptor/test_se_t_tebd.py new file mode 100644 index 0000000000..2ddbe70a4f --- /dev/null +++ b/source/tests/consistent/descriptor/test_se_t_tebd.py @@ -0,0 +1,267 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import unittest +from typing import ( + Any, + Tuple, +) + +import numpy as np +from dargs import ( + Argument, +) + +from deepmd.dpmodel.descriptor.se_t_tebd import DescrptSeTTebd as DescrptSeTTebdDP +from deepmd.env import ( + GLOBAL_NP_FLOAT_PRECISION, +) + +from ..common import ( + INSTALLED_PT, + CommonTest, + parameterized, +) +from .common import ( + DescriptorTest, +) + +if INSTALLED_PT: + from deepmd.pt.model.descriptor.se_t_tebd import DescrptSeTTebd as DescrptSeTTebdPT +else: + DescrptSeTTebdPT = None +DescrptSeTTebdTF = None +from deepmd.utils.argcheck import ( + descrpt_se_e3_tebd_args, +) + + +@parameterized( + (4,), # tebd_dim + ("strip",), # tebd_input_mode + (True,), # resnet_dt + ([], [[0, 1]]), # excluded_types + (0.0,), # env_protection + (True, False), # set_davg_zero + (True, False), # smooth + (True,), # concat_output_tebd + ("float64",), # precision + (True, False), # use_econf_tebd + (False, True), # use_tebd_bias +) +class TestSeTTebd(CommonTest, DescriptorTest, unittest.TestCase): + @property + def data(self) -> dict: + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + return { + "sel": [10], 
+ "rcut_smth": 3.50, + "rcut": 4.00, + "neuron": [2, 4, 8], + "ntypes": self.ntypes, + "tebd_dim": tebd_dim, + "tebd_input_mode": tebd_input_mode, + "concat_output_tebd": concat_output_tebd, + "resnet_dt": resnet_dt, + "exclude_types": excluded_types, + "env_protection": env_protection, + "precision": precision, + "set_davg_zero": set_davg_zero, + "smooth": smooth, + "use_econf_tebd": use_econf_tebd, + "use_tebd_bias": use_tebd_bias, + "type_map": ["O", "H"] if use_econf_tebd else None, + "seed": 1145141919810, + } + + @property + def skip_pt(self) -> bool: + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + return CommonTest.skip_pt + + @property + def skip_dp(self) -> bool: + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + return CommonTest.skip_pt + + @property + def skip_tf(self) -> bool: + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + return True + + tf_class = DescrptSeTTebdTF + dp_class = DescrptSeTTebdDP + pt_class = DescrptSeTTebdPT + args = descrpt_se_e3_tebd_args().append(Argument("ntypes", int, optional=False)) + + def setUp(self): + CommonTest.setUp(self) + + self.ntypes = 2 + self.coords = np.array( + [ + 12.83, + 2.56, + 2.18, + 12.09, + 2.87, + 2.74, + 00.25, + 3.32, + 1.68, + 3.36, + 3.00, + 1.81, + 3.51, + 2.51, + 2.60, + 4.27, + 3.22, + 1.56, + ], + dtype=GLOBAL_NP_FLOAT_PRECISION, + ) + self.atype = np.array([0, 1, 1, 0, 1, 1], dtype=np.int32) + self.box = np.array( + [13.0, 0.0, 0.0, 0.0, 13.0, 0.0, 0.0, 0.0, 13.0], + dtype=GLOBAL_NP_FLOAT_PRECISION, + ) + self.natoms = np.array([6, 6, 2, 4], dtype=np.int32) + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + + def build_tf(self, obj: Any, suffix: str) -> Tuple[list, dict]: + return self.build_tf_descriptor( + obj, + self.natoms, + self.coords, + self.atype, + self.box, + suffix, + ) + + def eval_dp(self, dp_obj: Any) -> Any: + return self.eval_dp_descriptor( + dp_obj, + self.natoms, + self.coords, + self.atype, + self.box, + mixed_types=True, + ) + + def eval_pt(self, pt_obj: Any) -> Any: + return self.eval_pt_descriptor( + pt_obj, + self.natoms, + self.coords, + self.atype, + self.box, + mixed_types=True, + ) + + def extract_ret(self, ret: Any, backend) -> Tuple[np.ndarray, ...]: + return (ret[0],) + + @property + def rtol(self) -> float: + """Relative tolerance for comparing the return value.""" + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param + if precision == "float64": + return 1e-10 + elif precision == "float32": + return 1e-4 + else: + raise ValueError(f"Unknown precision: {precision}") + + @property + def atol(self) -> float: + """Absolute tolerance for comparing the return value.""" + ( + tebd_dim, + tebd_input_mode, + resnet_dt, + excluded_types, + env_protection, + set_davg_zero, + smooth, + concat_output_tebd, + precision, + use_econf_tebd, + use_tebd_bias, + ) = self.param 
+ if precision == "float64": + return 1e-10 + elif precision == "float32": + return 1e-4 + else: + raise ValueError(f"Unknown precision: {precision}") diff --git a/source/tests/universal/dpmodel/atomc_model/test_atomic_model.py b/source/tests/universal/dpmodel/atomc_model/test_atomic_model.py index 10ff269ef3..e83705159a 100644 --- a/source/tests/universal/dpmodel/atomc_model/test_atomic_model.py +++ b/source/tests/universal/dpmodel/atomc_model/test_atomic_model.py @@ -41,6 +41,7 @@ DescriptorParamDPA2List, DescriptorParamHybrid, DescriptorParamHybridMixed, + DescriptorParamHybridMixedTTebd, DescriptorParamSeA, DescriptorParamSeAList, DescriptorParamSeR, @@ -76,6 +77,7 @@ *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -142,6 +144,7 @@ def setUpClass(cls): *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamDos, DOSFittingNet),), # fitting_class_param & class ), @@ -330,6 +333,7 @@ def setUpClass(cls): *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), diff --git a/source/tests/universal/dpmodel/descriptor/test_descriptor.py b/source/tests/universal/dpmodel/descriptor/test_descriptor.py index 41d915847e..424dd2ea39 100644 --- a/source/tests/universal/dpmodel/descriptor/test_descriptor.py +++ b/source/tests/universal/dpmodel/descriptor/test_descriptor.py @@ -11,6 +11,7 @@ DescrptSeA, DescrptSeR, DescrptSeT, + DescrptSeTTebd, ) from deepmd.dpmodel.descriptor.dpa2 import ( RepformerArgs, @@ -164,6 +165,68 @@ def DescriptorParamSeT( DescriptorParamSeT = DescriptorParamSeTList[0] +def DescriptorParamSeTTebd( + ntypes, + rcut, + rcut_smth, + sel, + type_map, + env_protection=0.0, + exclude_types=[], + tebd_dim=4, + tebd_input_mode="concat", + concat_output_tebd=True, + resnet_dt=True, + set_davg_zero=True, + smooth=True, + use_econf_tebd=False, + use_tebd_bias=False, + precision="float64", +): + input_dict = { + "ntypes": ntypes, + "rcut": rcut, + "rcut_smth": rcut_smth, + "sel": sel, # use a small sel for efficiency + "type_map": type_map, + "seed": GLOBAL_SEED, + "tebd_dim": tebd_dim, + "tebd_input_mode": tebd_input_mode, + "concat_output_tebd": concat_output_tebd, + "resnet_dt": resnet_dt, + "exclude_types": exclude_types, + "env_protection": env_protection, + "set_davg_zero": set_davg_zero, + "smooth": smooth, + "use_econf_tebd": use_econf_tebd, + "use_tebd_bias": use_tebd_bias, + "precision": precision, + } + return input_dict + + +DescriptorParamSeTTebdList = parameterize_func( + DescriptorParamSeTTebd, + OrderedDict( + { + "tebd_dim": (4,), + "tebd_input_mode": ("concat", "strip"), + "resnet_dt": (True,), + "exclude_types": ([], [[0, 1]]), + "env_protection": (0.0,), + "set_davg_zero": (False,), + "smooth": (True, False), + "concat_output_tebd": (True,), + "use_econf_tebd": (False, True), + "use_tebd_bias": (False,), + 
"precision": ("float64",), + } + ), +) +# to get name for the default function +DescriptorParamSeTTebd = DescriptorParamSeTTebdList[0] + + def DescriptorParamDPA1( ntypes, rcut, @@ -411,15 +474,34 @@ def DescriptorParamHybridMixed(ntypes, rcut, rcut_smth, sel, type_map, **kwargs) return input_dict +def DescriptorParamHybridMixedTTebd(ntypes, rcut, rcut_smth, sel, type_map, **kwargs): + ddsub0 = { + "type": "dpa1", + **DescriptorParamDPA1(ntypes, rcut, rcut_smth, sum(sel), type_map, **kwargs), + } + ddsub1 = { + "type": "se_e3_tebd", + **DescriptorParamSeTTebd( + ntypes, rcut / 2, rcut_smth / 2, min(sum(sel) // 2, 10), type_map, **kwargs + ), + } # use a small sel for efficiency + input_dict = { + "list": [ddsub0, ddsub1], + } + return input_dict + + @parameterized( ( (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ) # class_param & class ) @unittest.skipIf(TEST_DEVICE != "cpu", "Only test on CPU.") diff --git a/source/tests/universal/dpmodel/model/test_model.py b/source/tests/universal/dpmodel/model/test_model.py index aa735977d5..66edc2d50e 100644 --- a/source/tests/universal/dpmodel/model/test_model.py +++ b/source/tests/universal/dpmodel/model/test_model.py @@ -8,6 +8,7 @@ DescrptSeA, DescrptSeR, DescrptSeT, + DescrptSeTTebd, ) from deepmd.dpmodel.fitting import ( EnergyFittingNet, @@ -40,12 +41,15 @@ DescriptorParamDPA2List, DescriptorParamHybrid, DescriptorParamHybridMixed, + DescriptorParamHybridMixedTTebd, DescriptorParamSeA, DescriptorParamSeAList, DescriptorParamSeR, DescriptorParamSeRList, DescriptorParamSeT, DescriptorParamSeTList, + DescriptorParamSeTTebd, + DescriptorParamSeTTebdList, ) from ..fitting.test_fitting import ( FittingParamEnergy, @@ -82,10 +86,15 @@ def skip_model_tests(test_obj): *[(param_func, DescrptSeA) for param_func in DescriptorParamSeAList], *[(param_func, DescrptSeR) for param_func in DescriptorParamSeRList], *[(param_func, DescrptSeT) for param_func in DescriptorParamSeTList], + *[ + (param_func, DescrptSeTTebd) + for param_func in DescriptorParamSeTTebdList + ], *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -94,6 +103,7 @@ def skip_model_tests(test_obj): (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), ), # descrpt_class_param & class @@ -116,7 +126,7 @@ def setUpClass(cls): cls.aprec_dict["test_smooth"] = 5e-5 if Descrpt in [DescrptDPA1]: cls.epsilon_dict["test_smooth"] = 1e-6 - if Descrpt in [DescrptSeT]: + if Descrpt in [DescrptSeT, DescrptSeTTebd]: # computational expensive cls.expected_sel = [i // 4 for i in cls.expected_sel] cls.expected_rcut = cls.expected_rcut / 2 @@ -163,11 +173,16 @@ def setUpClass(cls): *[(param_func, DescrptSeA) for param_func in DescriptorParamSeAList], *[(param_func, DescrptSeR) for param_func in 
DescriptorParamSeRList], *[(param_func, DescrptSeT) for param_func in DescriptorParamSeTList], + *[ + (param_func, DescrptSeTTebd) + for param_func in DescriptorParamSeTTebdList + ], *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], # (DescriptorParamHybrid, DescrptHybrid), # unsupported for SpinModel to hybrid both mixed_types and no-mixed_types descriptor (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -176,6 +191,7 @@ def setUpClass(cls): (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), ), # descrpt_class_param & class @@ -197,7 +213,7 @@ def setUpClass(cls): # set special precision if Descrpt in [DescrptDPA2, DescrptHybrid]: cls.epsilon_dict["test_smooth"] = 1e-8 - if Descrpt in [DescrptSeT]: + if Descrpt in [DescrptSeT, DescrptSeTTebd]: # computational expensive cls.expected_sel = [i // 4 for i in cls.expected_sel] cls.expected_rcut = cls.expected_rcut / 2 diff --git a/source/tests/universal/pt/atomc_model/test_atomic_model.py b/source/tests/universal/pt/atomc_model/test_atomic_model.py index f0eff421b7..2853deb0db 100644 --- a/source/tests/universal/pt/atomc_model/test_atomic_model.py +++ b/source/tests/universal/pt/atomc_model/test_atomic_model.py @@ -38,6 +38,7 @@ DescriptorParamDPA2List, DescriptorParamHybrid, DescriptorParamHybridMixed, + DescriptorParamHybridMixedTTebd, DescriptorParamSeA, DescriptorParamSeAList, DescriptorParamSeR, @@ -73,6 +74,7 @@ *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -138,6 +140,7 @@ def setUpClass(cls): *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamDos, DOSFittingNet),), # fitting_class_param & class ), @@ -323,6 +326,7 @@ def setUpClass(cls): *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), diff --git a/source/tests/universal/pt/descriptor/test_descriptor.py b/source/tests/universal/pt/descriptor/test_descriptor.py index 7e2226feb6..4a5d511deb 100644 --- a/source/tests/universal/pt/descriptor/test_descriptor.py +++ b/source/tests/universal/pt/descriptor/test_descriptor.py @@ -8,6 +8,7 @@ DescrptSeA, DescrptSeR, DescrptSeT, + DescrptSeTTebd, ) from ....consistent.common import ( @@ -24,6 +25,7 @@ DescriptorParamSeA, DescriptorParamSeR, DescriptorParamSeT, + DescriptorParamSeTTebd, ) from ..backend import ( PTTestCase, @@ -35,6 +37,7 @@ (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, 
DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), (DescriptorParamHybrid, DescrptHybrid), diff --git a/source/tests/universal/pt/model/test_model.py b/source/tests/universal/pt/model/test_model.py index 7e45b85248..235d501c65 100644 --- a/source/tests/universal/pt/model/test_model.py +++ b/source/tests/universal/pt/model/test_model.py @@ -14,6 +14,7 @@ DescrptSeA, DescrptSeR, DescrptSeT, + DescrptSeTTebd, ) from deepmd.pt.model.model import ( DipoleModel, @@ -51,12 +52,15 @@ DescriptorParamDPA2List, DescriptorParamHybrid, DescriptorParamHybridMixed, + DescriptorParamHybridMixedTTebd, DescriptorParamSeA, DescriptorParamSeAList, DescriptorParamSeR, DescriptorParamSeRList, DescriptorParamSeT, DescriptorParamSeTList, + DescriptorParamSeTTebd, + DescriptorParamSeTTebdList, ) from ...dpmodel.fitting.test_fitting import ( FittingParamDipole, @@ -79,6 +83,7 @@ DescriptorParamSeA, DescriptorParamSeR, DescriptorParamSeT, + DescriptorParamSeTTebd, DescriptorParamDPA1, DescriptorParamDPA2, DescriptorParamHybrid, @@ -98,10 +103,15 @@ *[(param_func, DescrptSeA) for param_func in DescriptorParamSeAList], *[(param_func, DescrptSeR) for param_func in DescriptorParamSeRList], *[(param_func, DescrptSeT) for param_func in DescriptorParamSeTList], + *[ + (param_func, DescrptSeTTebd) + for param_func in DescriptorParamSeTTebdList + ], *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -110,6 +120,7 @@ (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), ), # descrpt_class_param & class @@ -140,7 +151,7 @@ def setUpClass(cls): # set special precision if Descrpt in [DescrptDPA2]: cls.epsilon_dict["test_smooth"] = 1e-8 - if Descrpt in [DescrptSeT]: + if Descrpt in [DescrptSeT, DescrptSeTTebd]: # computational expensive cls.expected_sel = [i // 4 for i in cls.expected_sel] cls.expected_rcut = cls.expected_rcut / 2 @@ -194,10 +205,15 @@ def setUpClass(cls): *[(param_func, DescrptSeA) for param_func in DescriptorParamSeAList], *[(param_func, DescrptSeR) for param_func in DescriptorParamSeRList], *[(param_func, DescrptSeT) for param_func in DescriptorParamSeTList], + *[ + (param_func, DescrptSeTTebd) + for param_func in DescriptorParamSeTTebdList + ], *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybrid, DescrptHybrid), (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamDos, DOSFittingNet),), # fitting_class_param & class ), @@ -206,6 +222,7 @@ def setUpClass(cls): (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), ), # descrpt_class_param & class @@ -237,7 +254,7 @@ def setUpClass(cls): cls.aprec_dict["test_smooth"] = 1e-4 if Descrpt in [DescrptDPA2]: cls.epsilon_dict["test_smooth"] = 1e-8 - if Descrpt in 
[DescrptSeT]: + if Descrpt in [DescrptSeT, DescrptSeTTebd]: # computational expensive cls.expected_sel = [i // 4 for i in cls.expected_sel] cls.expected_rcut = cls.expected_rcut / 2 @@ -473,6 +490,7 @@ def setUpClass(cls): *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -571,11 +589,16 @@ def setUpClass(cls): *[(param_func, DescrptSeA) for param_func in DescriptorParamSeAList], *[(param_func, DescrptSeR) for param_func in DescriptorParamSeRList], *[(param_func, DescrptSeT) for param_func in DescriptorParamSeTList], + *[ + (param_func, DescrptSeTTebd) + for param_func in DescriptorParamSeTTebdList + ], *[(param_func, DescrptDPA1) for param_func in DescriptorParamDPA1List], *[(param_func, DescrptDPA2) for param_func in DescriptorParamDPA2List], # (DescriptorParamHybrid, DescrptHybrid), # unsupported for SpinModel to hybrid both mixed_types and no-mixed_types descriptor (DescriptorParamHybridMixed, DescrptHybrid), + (DescriptorParamHybridMixedTTebd, DescrptHybrid), ), # descrpt_class_param & class ((FittingParamEnergy, EnergyFittingNet),), # fitting_class_param & class ), @@ -584,6 +607,7 @@ def setUpClass(cls): (DescriptorParamSeA, DescrptSeA), (DescriptorParamSeR, DescrptSeR), (DescriptorParamSeT, DescrptSeT), + (DescriptorParamSeTTebd, DescrptSeTTebd), (DescriptorParamDPA1, DescrptDPA1), (DescriptorParamDPA2, DescrptDPA2), ), # descrpt_class_param & class @@ -616,7 +640,7 @@ def setUpClass(cls): # set special precision if Descrpt in [DescrptDPA2, DescrptHybrid]: cls.epsilon_dict["test_smooth"] = 1e-8 - if Descrpt in [DescrptSeT]: + if Descrpt in [DescrptSeT, DescrptSeTTebd]: # computational expensive cls.expected_sel = [i // 4 for i in cls.expected_sel] cls.expected_rcut = cls.expected_rcut / 2
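As a quick way to exercise the new native (NumPy) implementation end to end, here is a minimal usage sketch under the hyperparameters of the example input. It is a sketch rather than part of this change; it only constructs the descriptor and queries the accessors introduced above, and the expected values in the comments are assumptions based on those accessors.

```python
# Minimal sketch: build the DP-backend descriptor with the example's settings.
from deepmd.dpmodel.descriptor import DescrptSeTTebd

descrpt = DescrptSeTTebd(
    rcut=4.0,
    rcut_smth=0.5,
    sel=[40],
    ntypes=2,  # O and H, as in examples/water
    neuron=[2, 4, 8],
    tebd_dim=8,
    tebd_input_mode="concat",
)
print(descrpt.get_rcut())     # 4.0
print(descrpt.get_nsel())     # 40
print(descrpt.get_dim_out())  # network output width, plus tebd_dim when
                              # concat_output_tebd is enabled (the default)
```

The PyTorch counterpart can then be trained from `examples/water/se_e3_tebd/` with `dp --pt train input_torch.json`.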