"""Base class for all descriptor calculators."""
from abc import abstractmethod
from functools import cached_property
import os
import tempfile
import ase
from ase.units import m
from ase.neighborlist import NeighborList, NewPrimitiveNeighborList
import numpy as np
from skspatial.objects import Plane
from mala.common.parameters import ParametersDescriptors, Parameters
from mala.common.parallelizer import (
get_comm,
printout,
get_rank,
get_size,
barrier,
parallel_warn,
set_lammps_instance,
)
from mala.common.physical_data import PhysicalData
from mala.descriptors.lammps_utils import set_cmdlinevars
[docs]
class Descriptor(PhysicalData):
"""
Base class for all descriptors available in MALA.
Descriptors encode the atomic fingerprint of a DFT calculation.
Parameters
----------
parameters : mala.common.parameters.Parameters
Parameters object used to create this object.
Attributes
----------
parameters: mala.common.parameters.ParametersDescriptors
MALA descriptor calculation parameters.
"""
##############################
# Constructors
##############################
def __new__(cls, params: Parameters = None):
"""
Create a Descriptor instance.
The correct type of descriptor calculator will automatically be
instantiated by this class if possible. You can also instantiate
the desired descriptor directly by calling upon the subclass.
Parameters
----------
params : mala.common.parametes.Parameters
Parameters used to create this descriptor calculator.
"""
descriptors = None
# Check if we're accessing through base class.
# If not, we need to return the correct object directly.
if cls == Descriptor:
if params.descriptors.descriptor_type == "Bispectrum":
from mala.descriptors.bispectrum import Bispectrum
descriptors = super(Descriptor, Bispectrum).__new__(Bispectrum)
if params.descriptors.descriptor_type == "AtomicDensity":
from mala.descriptors.atomic_density import AtomicDensity
descriptors = super(Descriptor, AtomicDensity).__new__(
AtomicDensity
)
if params.descriptors.descriptor_type == "MinterpyDescriptors":
from mala.descriptors.minterpy_descriptors import (
MinterpyDescriptors,
)
descriptors = super(Descriptor, MinterpyDescriptors).__new__(
MinterpyDescriptors
)
if descriptors is None:
raise Exception("Unsupported descriptor calculator.")
else:
descriptors = super(Descriptor, cls).__new__(cls)
# For pickling
setattr(descriptors, "params_arg", params)
return descriptors
def __getnewargs__(self):
"""
Get the necessary arguments to call __new__.
Used for pickling.
Returns
-------
params : mala.Parameters
The parameters object with which this object was created.
"""
return (self.params_arg,)
def __init__(self, parameters):
super(Descriptor, self).__init__(parameters)
self.parameters: ParametersDescriptors = parameters.descriptors
self.feature_size = 0 # so iterations will fail
self._in_format_ase = ""
self._atoms = None
self._voxel = None
# If we ever have NON LAMMPS descriptors, these parameters have no
# meaning anymore and should probably be moved to an intermediate
# DescriptorsLAMMPS class, from which the LAMMPS descriptors inherit.
self._lammps_temporary_input = None
self._lammps_temporary_log = None
##############################
# Properties
##############################
@property
def si_unit_conversion(self):
"""
Numeric value of the conversion from MALA (ASE) units to SI.
Needed for OpenPMD interface.
"""
return m**3
@property
def si_dimension(self):
"""
Dictionary containing the SI unit dimensions in OpenPMD format.
Needed for OpenPMD interface.
"""
import openpmd_api as io
return {io.Unit_Dimension.L: -3}
@property
def descriptors_contain_xyz(self):
"""Control whether descriptor vectors will contain xyz coordinates."""
return self.parameters.descriptors_contain_xyz
@descriptors_contain_xyz.setter
def descriptors_contain_xyz(self, value):
self.parameters.descriptors_contain_xyz = value
##############################
# Methods
##############################
# File I/O
##########
[docs]
@staticmethod
def convert_units(array, in_units="1/eV"):
"""
Convert descriptors from a specified unit into the ones used in MALA.
Parameters
----------
array : numpy.array
Data for which the units should be converted.
in_units : string
Units of array.
Returns
-------
converted_array : numpy.array
Data in MALA units.
"""
raise Exception(
"No unit conversion method implemented for this"
" descriptor type."
)
@property
def feature_size(self):
"""Get the feature dimension of this data."""
return self._feature_size
@feature_size.setter
def feature_size(self, value):
self._feature_size = value
[docs]
@staticmethod
def backconvert_units(array, out_units):
"""
Convert descriptors from MALA units into a specified unit.
Parameters
----------
array : numpy.array
Data in MALA units.
out_units : string
Desired units of output array.
Returns
-------
converted_array : numpy.array
Data in out_units.
"""
raise Exception(
"No unit back conversion method implemented for "
"this descriptor type."
)
[docs]
def setup_lammps_tmp_files(self, lammps_type, outdir):
"""
Create the temporary lammps input and log files.
Parameters
----------
lammps_type: str
Type of descriptor calculation (e.g. bgrid for bispectrum)
outdir: str
Directory where lammps files are kept
Returns
-------
None
"""
if get_rank() == 0:
prefix_inp_str = "lammps_" + lammps_type + "_input"
prefix_log_str = "lammps_" + lammps_type + "_log"
lammps_tmp_input_file = tempfile.NamedTemporaryFile(
delete=False, prefix=prefix_inp_str, suffix="_.tmp", dir=outdir
)
self._lammps_temporary_input = lammps_tmp_input_file.name
lammps_tmp_input_file.close()
lammps_tmp_log_file = tempfile.NamedTemporaryFile(
delete=False, prefix=prefix_log_str, suffix="_.tmp", dir=outdir
)
self._lammps_temporary_log = lammps_tmp_log_file.name
lammps_tmp_log_file.close()
else:
self._lammps_temporary_input = None
self._lammps_temporary_log = None
if self.parameters._configuration["mpi"]:
self._lammps_temporary_input = get_comm().bcast(
self._lammps_temporary_input, root=0
)
self._lammps_temporary_log = get_comm().bcast(
self._lammps_temporary_log, root=0
)
# Calculations
##############
[docs]
@staticmethod
def enforce_pbc(atoms):
"""
Explictly enforces the PBC on an ASE atoms object.
QE (and potentially other codes?) do that internally. Meaning that the
raw positions of atoms (in Angstrom) can lie outside of the unit cell.
When setting up the DFT calculation, these atoms get shifted into
the unit cell. Since we directly use these raw positions for the
descriptor calculation, we need to enforce that in the ASE atoms
objects, the atoms are explicitly in the unit cell.
Parameters
----------
atoms : ase.atoms
The ASE atoms object for which the PBC need to be enforced.
Returns
-------
new_atoms : ase.Atoms
The ASE atoms object for which the PBC have been enforced.
"""
new_atoms = atoms.copy()
new_atoms.set_scaled_positions(new_atoms.get_scaled_positions())
# This might be unecessary, but I think it is nice to have some sort of
# metric here.
rescaled_atoms = 0
for i in range(0, len(atoms)):
if False in (
np.isclose(
new_atoms[i].position, atoms[i].position, atol=0.001
)
):
rescaled_atoms += 1
printout(
"Descriptor calculation: had to enforce periodic boundary "
"conditions on",
rescaled_atoms,
"atoms before calculation.",
min_verbosity=2,
)
return new_atoms
[docs]
def calculate_from_qe_out(
self, qe_out_file, working_directory=".", **kwargs
):
"""
Calculate the descriptors based on a Quantum Espresso outfile.
Parameters
----------
qe_out_file : string
Name of Quantum Espresso output file for snapshot.
working_directory : string
A directory in which to write the output of the LAMMPS calculation.
Usually the local directory should suffice, given that there
are no multiple instances running in the same directory.
kwargs : dict
A collection of keyword arguments, that are mainly used for
debugging and development. Different types of descriptors
may support different keyword arguments. Commonly supported
are
- "use_fp64": To use enforce floating point 64 precision for
descriptors.
- "keep_logs": To not delete temporary files created during
LAMMPS calculation of descriptors.
Returns
-------
descriptors : numpy.array
Numpy array containing the descriptors with the dimension
(x,y,z,descriptor_dimension)
"""
self._in_format_ase = "espresso-out"
printout("Calculating descriptors from", qe_out_file, min_verbosity=0)
# We get the atomic information by using ASE.
self._atoms = ase.io.read(qe_out_file, format=self._in_format_ase)
# Enforcing / Checking PBC on the read atoms.
self._atoms = self.enforce_pbc(self._atoms)
# Get the grid dimensions.
if "grid_dimensions" in kwargs.keys():
self.grid_dimensions = kwargs["grid_dimensions"]
# Deleting this keyword from the list to avoid conflict with
# dict below.
del kwargs["grid_dimensions"]
else:
qe_outfile = open(qe_out_file, "r")
lines = qe_outfile.readlines()
self.grid_dimensions = [0, 0, 0]
for line in lines:
if "FFT dimensions" in line:
tmp = line.split("(")[1].split(")")[0]
self.grid_dimensions[0] = int(tmp.split(",")[0])
self.grid_dimensions[1] = int(tmp.split(",")[1])
self.grid_dimensions[2] = int(tmp.split(",")[2])
break
self._voxel = self._atoms.cell.copy()
self._voxel[0] = self._voxel[0] / (self.grid_dimensions[0])
self._voxel[1] = self._voxel[1] / (self.grid_dimensions[1])
self._voxel[2] = self._voxel[2] / (self.grid_dimensions[2])
return self._calculate(working_directory, **kwargs)
[docs]
def calculate_from_atoms(
self, atoms, grid_dimensions, working_directory=".", **kwargs
):
"""
Calculate the bispectrum descriptors based on atomic configurations.
Parameters
----------
atoms : ase.Atoms
Atoms object holding the atomic configuration.
grid_dimensions : list
Grid dimensions to be used, in the format [x,y,z].
working_directory : string
A directory in which to write the output of the LAMMPS calculation.
Usually the local directory should suffice, given that there
are no multiple instances running in the same directory.
kwargs : dict
A collection of keyword arguments, that are mainly used for
debugging and development. Different types of descriptors
may support different keyword arguments. Commonly supported
are
- "use_fp64": To use enforce floating point 64 precision for
descriptors.
- "keep_logs": To not delete temporary files created during
LAMMPS calculation of descriptors.
Returns
-------
descriptors : numpy.array
Numpy array containing the descriptors with the dimension
(x,y,z,descriptor_dimension)
"""
# Enforcing / Checking PBC on the input atoms.
self._atoms = self.enforce_pbc(atoms)
self.grid_dimensions = grid_dimensions
self._voxel = self._atoms.cell.copy()
self._voxel[0] = self._voxel[0] / (self.grid_dimensions[0])
self._voxel[1] = self._voxel[1] / (self.grid_dimensions[1])
self._voxel[2] = self._voxel[2] / (self.grid_dimensions[2])
return self._calculate(working_directory, **kwargs)
[docs]
def gather_descriptors(self, descriptors_np, use_pickled_comm=False):
"""
Gathers all descriptors on rank 0 and sorts them.
This is useful for e.g. parallel preprocessing.
This function removes the extra 3 components that come from parallel
processing.
I.e. if we have 91 bispectrum descriptors, LAMMPS directly outputs us
97 (in parallel mode), and this function returns 94, as to retain the
3 x,y,z ones we by default include.
Parameters
----------
descriptors_np : numpy.array
Numpy array with the descriptors of this ranks local grid.
use_pickled_comm : bool
If True, the pickled communication route from mpi4py is used.
If False, a Recv/Sendv combination is used. I am not entirely
sure what is faster. Technically Recv/Sendv should be faster,
but I doubt my implementation is all that optimal. For the pickled
route we can use gather(), which should be fairly quick.
However, for large grids, one CANNOT use the pickled route;
too large python objects will break it. Therefore, I am setting
the Recv/Sendv route as default.
"""
# Barrier to make sure all ranks have descriptors..
comm = get_comm()
barrier()
# Gather the descriptors into a list.
if use_pickled_comm:
all_descriptors_list = comm.gather(descriptors_np, root=0)
else:
sendcounts = np.array(
comm.gather(np.shape(descriptors_np)[0], root=0)
)
raw_feature_length = self.feature_size + 3
if get_rank() == 0:
# print("sendcounts: {}, total: {}".format(sendcounts,
# sum(sendcounts)))
# Preparing the list of buffers.
all_descriptors_list = []
for i in range(0, get_size()):
all_descriptors_list.append(
np.empty(
sendcounts[i] * raw_feature_length,
dtype=descriptors_np.dtype,
)
)
# No MPI necessary for first rank. For all the others,
# collect the buffers.
all_descriptors_list[0] = descriptors_np
for i in range(1, get_size()):
comm.Recv(all_descriptors_list[i], source=i, tag=100 + i)
all_descriptors_list[i] = np.reshape(
all_descriptors_list[i],
(sendcounts[i], raw_feature_length),
)
else:
comm.Send(descriptors_np, dest=0, tag=get_rank() + 100)
barrier()
# if get_rank() == 0:
# printout(np.shape(all_descriptors_list[0]))
# printout(np.shape(all_descriptors_list[1]))
# printout(np.shape(all_descriptors_list[2]))
# printout(np.shape(all_descriptors_list[3]))
# Dummy for the other ranks.
# (For now, might later simply broadcast to other ranks).
descriptors_full = np.zeros([1, 1, 1, 1])
# Reorder the list.
if get_rank() == 0:
# Prepare the descriptor array.
nx = self.grid_dimensions[0]
ny = self.grid_dimensions[1]
nz = self.grid_dimensions[2]
descriptors_full = np.zeros([nx, ny, nz, self.feature_size])
# Fill the full bispectrum descriptors array.
for idx, local_grid in enumerate(all_descriptors_list):
# We glue the individual cells back together, and transpose.
first_x = int(local_grid[0][0])
first_y = int(local_grid[0][1])
first_z = int(local_grid[0][2])
last_x = int(local_grid[-1][0]) + 1
last_y = int(local_grid[-1][1]) + 1
last_z = int(local_grid[-1][2]) + 1
descriptors_full[
first_x:last_x, first_y:last_y, first_z:last_z
] = np.reshape(
local_grid[:, 3:],
[
last_z - first_z,
last_y - first_y,
last_x - first_x,
self.feature_size,
],
).transpose(
[2, 1, 0, 3]
)
# Leaving this in here for debugging purposes.
# This is the slow way to reshape the descriptors.
# for entry in local_grid:
# x = int(entry[0])
# y = int(entry[1])
# z = int(entry[2])
# descriptors_full[x, y, z] = entry[3:]
if self.parameters.descriptors_contain_xyz:
return descriptors_full
else:
return descriptors_full[:, :, :, 3:]
[docs]
def convert_local_to_3d(self, descriptors_np):
"""
Convert the desciptors as done in the gather function, but per rank.
This is useful for e.g. parallel preprocessing.
This function removes the extra 3 components that come from parallel
processing.
I.e. if we have 91 bispectrum descriptors, LAMMPS directly outputs us
97 (in parallel mode), and this function returns 94, as to retain the
3 x,y,z ones we by default include.
Parameters
----------
descriptors_np : numpy.array
Numpy array with the descriptors of this ranks local grid.
"""
local_offset = [None, None, None]
local_reach = [None, None, None]
local_offset[0] = int(descriptors_np[0][0])
local_offset[1] = int(descriptors_np[0][1])
local_offset[2] = int(descriptors_np[0][2])
local_reach[0] = int(descriptors_np[-1][0]) + 1
local_reach[1] = int(descriptors_np[-1][1]) + 1
local_reach[2] = int(descriptors_np[-1][2]) + 1
nx = local_reach[0] - local_offset[0]
ny = local_reach[1] - local_offset[1]
nz = local_reach[2] - local_offset[2]
descriptors_full = np.zeros([nx, ny, nz, self.feature_size])
descriptors_full[0:nx, 0:ny, 0:nz] = np.reshape(
descriptors_np[:, 3:], [nz, ny, nx, self.feature_size]
).transpose([2, 1, 0, 3])
return descriptors_full, local_offset, local_reach
# Private methods
#################
def _process_loaded_array(self, array, units=None):
array *= self.convert_units(1, in_units=units)
def _process_loaded_dimensions(self, array_dimensions):
if self.descriptors_contain_xyz:
return (
array_dimensions[0],
array_dimensions[1],
array_dimensions[2],
array_dimensions[3] - 3,
)
else:
return array_dimensions
def _set_geometry_info(self, mesh):
# Geometry: Save the cell parameters and angles of the grid.
if self._atoms is not None:
import openpmd_api as io
self._voxel = self._atoms.cell.copy()
self._voxel[0] = self._voxel[0] / (self.grid_dimensions[0])
self._voxel[1] = self._voxel[1] / (self.grid_dimensions[1])
self._voxel[2] = self._voxel[2] / (self.grid_dimensions[2])
mesh.geometry = io.Geometry.cartesian
mesh.grid_spacing = self._voxel.cellpar()[0:3]
mesh.set_attribute("angles", self._voxel.cellpar()[3:])
def _get_atoms(self):
return self._atoms
def _feature_mask(self):
if self.descriptors_contain_xyz:
return 3
else:
return 0
def _setup_lammps(self, nx, ny, nz, lammps_dict):
"""
Set up the lammps processor grid.
Takes into account y/z-splitting.
"""
from lammps import lammps
# Build LAMMPS arguments from the data we read.
lmp_cmdargs = [
"-screen",
"none",
"-log",
self._lammps_temporary_log,
]
lammps_dict["atom_config_fname"] = self._lammps_temporary_input
if self.parameters._configuration["mpi"]:
size = get_size()
# for parallel tem need to set lammps commands: processors and
# balance current implementation is to match lammps mpi processor
# grid to QE processor splitting QE distributes grid points in
# parallel as slices along z axis currently grid points fall on z
# axix plane cutoff values in lammps this leads to some ranks
# having 0 grid points and other having 2x gridpoints
# balance command in lammps aleviates this issue
# integers for plane cuts in z axis appear to be most important
#
# determine if nyfft flag is set so that QE also parallelizes
# along y axis if nyfft is true lammps mpi processor grid needs to
# be 1x{ny}x{nz} need to configure separate total_energy_module
# with nyfft enabled
if self.parameters.use_y_splitting > 1:
# TODO automatically pass nyfft into QE from MALA
# if more processors thatn y*z grid dimensions requested
# send error. More processors than y*z grid dimensions reduces
# efficiency and scaling of QE.
nyfft = self.parameters.use_y_splitting
# number of y processors is equal to nyfft
yprocs = nyfft
# number of z processors is equal to total processors/nyfft is
# nyfft is used else zprocs = size
if size % yprocs == 0:
zprocs = int(size / yprocs)
else:
raise ValueError(
"Cannot evenly divide z-planes in y-direction"
)
# check if total number of processors is greater than number of
# grid sections produce error if number of processors is
# greater than grid partions - will cause mismatch later in QE
mpi_grid_sections = yprocs * zprocs
if mpi_grid_sections < size:
raise ValueError(
"More processors than grid sections. "
"This will cause a crash further in the "
"calculation. Choose a total number of "
"processors equal to or less than the "
"total number of grid sections requsted "
"for the calculation (nyfft*nz)."
)
# TODO not sure what happens when size/nyfft is not integer -
# further testing required
# set the mpi processor grid for lammps
lammps_procs = f"1 {yprocs} {zprocs}"
printout(
"mpi grid with nyfft: ", lammps_procs, min_verbosity=2
)
# prepare y plane cuts for balance command in lammps if not
# integer value
if int(ny / yprocs) == (ny / yprocs):
ycut = 1 / yprocs
yint = ""
for i in range(0, yprocs - 1):
yvals = ((i + 1) * ycut) - 0.00000001
yint += format(yvals, ".8f")
yint += " "
else:
# account for remainder with uneven number of
# planes/processors
ycut = 1 / yprocs
yrem = ny - (yprocs * int(ny / yprocs))
yint = ""
for i in range(0, yrem):
yvals = (((i + 1) * 2) * ycut) - 0.00000001
yint += format(yvals, ".8f")
yint += " "
for i in range(yrem, yprocs - 1):
yvals = ((i + 1 + yrem) * ycut) - 0.00000001
yint += format(yvals, ".8f")
yint += " "
# prepare z plane cuts for balance command in lammps
if int(nz / zprocs) == (nz / zprocs):
zcut = 1 / nz
zint = ""
for i in range(0, zprocs - 1):
zvals = ((i + 1) * (nz / zprocs) * zcut) - 0.00000001
zint += format(zvals, ".8f")
zint += " "
else:
# account for remainder with uneven number of
# planes/processors
raise ValueError(
"Cannot divide z-planes on processors"
" without remainder. "
"This is currently unsupported."
)
# zcut = 1/nz
# zrem = nz - (zprocs*int(nz/zprocs))
# zint = ''
# for i in range(0, zrem):
# zvals = (((i+1)*2)*zcut)-0.00000001
# zint += format(zvals, ".8f")
# zint += ' '
# for i in range(zrem, zprocs-1):
# zvals = ((i+1+zrem)*zcut)-0.00000001
# zint += format(zvals, ".8f")
# zint += ' '
lammps_dict["lammps_procs"] = (
f"processors {lammps_procs} " f"map xyz"
)
lammps_dict["zbal"] = f"balance 1.0 y {yint} z {zint}"
lammps_dict["ngridx"] = nx
lammps_dict["ngridy"] = ny
lammps_dict["ngridz"] = nz
lammps_dict["switch"] = self.parameters.bispectrum_switchflag
else:
if self.parameters.use_z_splitting:
# when nyfft is not used only split processors along z axis
size = get_size()
zprocs = size
# check to make sure number of z planes is not less than
# processors. If more processors than planes calculation
# efficiency decreases
if nz < size:
raise ValueError(
"More processors than grid sections. "
"This will cause a crash further in "
"the calculation. Choose a total "
"number of processors equal to or "
"less than the total number of grid "
"sections requsted for the "
"calculation (nz)."
)
# match lammps mpi grid to be 1x1x{zprocs}
lammps_procs = f"1 1 {zprocs}"
# print("mpi grid z only: ", lammps_procs)
# prepare z plane cuts for balance command in lammps
if int(nz / zprocs) == (nz / zprocs):
printout("No remainder in z")
zcut = 1 / nz
zint = ""
for i in range(0, zprocs - 1):
zvals = (
(i + 1) * (nz / zprocs) * zcut
) - 0.00000001
zint += format(zvals, ".8f")
zint += " "
else:
# raise ValueError("Cannot divide z-planes on processors"
# " without remainder. "
# "This is currently unsupported.")
zcut = 1 / nz
zrem = nz - (zprocs * int(nz / zprocs))
zint = ""
for i in range(0, zrem):
zvals = (
((i + 1) * (int(nz / zprocs) + 1)) * zcut
) - 0.00000001
zint += format(zvals, ".8f")
zint += " "
for i in range(zrem, zprocs - 1):
zvals = (
((i + 1) * int(nz / zprocs) + zrem) * zcut
) - 0.00000001
zint += format(zvals, ".8f")
zint += " "
lammps_dict["lammps_procs"] = f"processors {lammps_procs}"
lammps_dict["zbal"] = f"balance 1.0 z {zint}"
lammps_dict["ngridx"] = nx
lammps_dict["ngridy"] = ny
lammps_dict["ngridz"] = nz
lammps_dict["switch"] = (
self.parameters.bispectrum_switchflag
)
else:
lammps_dict["ngridx"] = nx
lammps_dict["ngridy"] = ny
lammps_dict["ngridz"] = nz
lammps_dict["switch"] = (
self.parameters.bispectrum_switchflag
)
else:
size = 1
lammps_dict["ngridx"] = nx
lammps_dict["ngridy"] = ny
lammps_dict["ngridz"] = nz
lammps_dict["switch"] = self.parameters.bispectrum_switchflag
if self.parameters._configuration["gpu"]:
# Tell Kokkos to use one GPU.
lmp_cmdargs.append("-k")
lmp_cmdargs.append("on")
lmp_cmdargs.append("g")
lmp_cmdargs.append(str(size))
# Tell LAMMPS to use Kokkos versions of those commands for
# which a Kokkos version exists.
lmp_cmdargs.append("-sf")
lmp_cmdargs.append("kk")
pass
lmp_cmdargs = set_cmdlinevars(lmp_cmdargs, lammps_dict)
lmp = lammps(cmdargs=lmp_cmdargs)
set_lammps_instance(lmp)
return lmp
def _clean_calculation(self, lmp, keep_logs):
lmp.close()
if not keep_logs:
if get_rank() == 0:
os.remove(self._lammps_temporary_log)
os.remove(self._lammps_temporary_input)
def _setup_atom_list(self):
"""
Set up a list of atoms potentially relevant for descriptor calculation.
If periodic boundary conditions are used, which is usually the case
for MALA simulation, one has to compute descriptors by also
incorporating atoms from neighboring cells.
FURTHER OPTIMIZATION: Probably not that much, this mostly already uses
optimized python functions.
"""
if np.any(self._atoms.pbc):
# To determine the list of relevant atoms we first take the edges
# of the simulation cell and use them to determine all cells
# which hold atoms that _may_ be relevant for the calculation.
edges = list(
np.array(
[
[0, 0, 0],
[1, 0, 0],
[0, 1, 0],
[0, 0, 1],
[1, 1, 1],
[0, 1, 1],
[1, 0, 1],
[1, 1, 0],
]
)
* np.array(self.grid_dimensions)
)
all_cells_list = None
# For each edge point create a neighborhoodlist to all cells
# given by the cutoff radius.
for edge in edges:
edge_point = self._grid_to_coord(edge)
neighborlist = NeighborList(
np.zeros(len(self._atoms) + 1)
+ [self.parameters.atomic_density_cutoff],
bothways=True,
self_interaction=False,
primitive=NewPrimitiveNeighborList,
)
atoms_with_grid_point = self._atoms.copy()
# Construct a ghost atom representing the grid point.
atoms_with_grid_point.append(ase.Atom("H", edge_point))
neighborlist.update(atoms_with_grid_point)
indices, offsets = neighborlist.get_neighbors(len(self._atoms))
# Incrementally fill the list containing all cells to be
# considered.
if all_cells_list is None:
all_cells_list = np.unique(offsets, axis=0)
else:
all_cells_list = np.concatenate(
(all_cells_list, np.unique(offsets, axis=0))
)
# Delete the original cell from the list of all cells.
# This is to avoid double checking of atoms below.
all_cells = np.unique(all_cells_list, axis=0)
idx = 0
for a in range(0, len(all_cells)):
if (all_cells[a, :] == np.array([0, 0, 0])).all():
break
idx += 1
all_cells = np.delete(all_cells, idx, axis=0)
# Create an object to hold all relevant atoms.
# First, instantiate it by filling it will all atoms from all
# potentiall relevant cells, as identified above.
all_atoms = None
for a in range(0, len(self._atoms)):
if all_atoms is None:
all_atoms = (
self._atoms.positions[a]
+ all_cells @ self._atoms.get_cell()
)
else:
all_atoms = np.concatenate(
(
all_atoms,
self._atoms.positions[a]
+ all_cells @ self._atoms.get_cell(),
)
)
# Next, construct the planes forming the unit cell.
# Atoms from neighboring cells are only included in the list of
# all relevant atoms, if they have a distance to any of these
# planes smaller than the cutoff radius. Elsewise, they would
# not be included in the eventual calculation anyhow.
planes = [
[[0, 1, 0], [0, 0, 1], [0, 0, 0]],
[
[self.grid_dimensions[0], 1, 0],
[self.grid_dimensions[0], 0, 1],
self.grid_dimensions,
],
[[1, 0, 0], [0, 0, 1], [0, 0, 0]],
[
[1, self.grid_dimensions[1], 0],
[0, self.grid_dimensions[1], 1],
self.grid_dimensions,
],
[[1, 0, 0], [0, 1, 0], [0, 0, 0]],
[
[1, 0, self.grid_dimensions[2]],
[0, 1, self.grid_dimensions[2]],
self.grid_dimensions,
],
]
all_distances = []
for plane in planes:
curplane = Plane.from_points(
self._grid_to_coord(plane[0]),
self._grid_to_coord(plane[1]),
self._grid_to_coord(plane[2]),
)
distances = []
# TODO: This may be optimized, and formulated in an array
# operation.
for a in range(np.shape(all_atoms)[0]):
distances.append(curplane.distance_point(all_atoms[a]))
all_distances.append(distances)
all_distances = np.array(all_distances)
all_distances = np.min(all_distances, axis=0)
all_atoms = np.squeeze(
all_atoms[
np.argwhere(
all_distances < self.parameters.atomic_density_cutoff
),
:,
]
)
return np.concatenate((all_atoms, self._atoms.positions))
else:
# If no PBC are used, only consider a single cell.
return self._atoms.positions
def _grid_to_coord(self, gridpoint):
# Convert grid indices to real space grid point.
i = gridpoint[0]
j = gridpoint[1]
k = gridpoint[2]
# Orthorhombic cells and triclinic ones have
# to be treated differently, see domain.cpp
if self._atoms.cell.orthorhombic:
return np.diag(self._voxel) * [i, j, k]
else:
ret = [0, 0, 0]
ret[0] = (
i / self.grid_dimensions[0] * self._atoms.cell[0, 0]
+ j / self.grid_dimensions[1] * self._atoms.cell[1, 0]
+ k / self.grid_dimensions[2] * self._atoms.cell[2, 0]
)
ret[1] = (
j / self.grid_dimensions[1] * self._atoms.cell[1, 1]
+ k / self.grid_dimensions[2] * self._atoms.cell[1, 2]
)
ret[2] = k / self.grid_dimensions[2] * self._atoms.cell[2, 2]
return np.array(ret)
@abstractmethod
def _calculate(self, outdir, **kwargs):
pass
def _set_feature_size_from_array(self, array):
self.feature_size = np.shape(array)[-1]