Source code for pbcluster.cluster

# -*- coding:utf-8 -*-
#
# cluster.py

"""Cluster module."""

import networkx as nx
import numpy as np
import pandas as pd

from .utils import flatten_dict
from .utils import get_within_cutoff_matrix
from .utils import pairwise_distances


[docs]class Cluster: """Object to store and compute data about an individual particle cluster Args: graph (networkx Graph): Contains nodes and edges corresponding to particles and bonds, respectively, where a bond implies the particles are within a distance of `cutoff_distance` from each other. particle_df (dataframe): Dataframe where index is `particle_id`, and there are `n_dimensions` columns labelled `x0`, x1`, ... `xN` box_lengths (ndarray): Must contain `n_dimensions` values representing the lengths of each dimension of a rectangular box. cutoff_distance (float): Maximum distance two particles can be from each other to be considered part of the same cluster Attributes: graph (networkx Graph): Contains nodes and edges corresponding to particles and bonds, respectively, where a bond implies the particles are within a distance of `cutoff_distance` from each other. particle_df (dataframe): Dataframe where index is `particle_id`, and there are `n_dimensions` columns labelled `x0`, x1`, ... `xN` box_lengths (ndarray): Must contain `n_dimensions` values representing the lengths of each dimension of a rectangular box. cutoff_distance (float): Maximum distance two particles can be from each other to be considered part of the same cluster n_dimensions (int): Number of dimensions in the system n_particles (int): Number of particles in the cluster """ def __init__(self, graph, particle_df, box_lengths, cutoff_distance): self._cluster_property_map = dict( n_particles=self.compute_n_particles, minimum_node_cuts=self.compute_minimum_node_cuts, center_of_mass=self.compute_center_of_mass, unwrapped_center_of_mass=self.compute_unwrapped_center_of_mass, rg=self.compute_rg, asphericity=self.compute_asphericity, ) self._particle_property_map = dict( coordination_number=self.compute_coordination_number, distance_from_com=self.compute_distance_from_com, ) self.graph = graph self.particle_df = particle_df.copy() self.box_lengths = box_lengths self.cutoff_distance = cutoff_distance self.n_dimensions = len(box_lengths) self.n_particles = len(particle_df) self._minimum_node_cuts_dict = None self._unwrapped_x_df = None self._unwrapped_center_of_mass_dict = None self._center_of_mass_dict = None self._gyration_tensor = None self._gyration_eigenvals = None self._rg = None def _split_edges_with_faces_1_dim(self, graph, dim): """Breaks all edges that cross the `dim`-dimension's periodic boundary condition (PBC). Replaces those edges with edges connecting the lower particle to the lower wall and the higher particle to the higher wall Args: graph (networkx Graph): [description] dim (int): Dimension in which to break the PBC. (0, 1, 2, ... etc.) Returns: networkx Graph: Copy of the input graph, modified to replace PBC-crossing edges with conections to "face" nodes """ graph = graph.copy() dim_str = f"x{dim}" low_face_node_position = { f"x{d}": None for d in range(self.n_dimensions) } low_face_node_position[dim_str] = 0 high_face_node_position = { f"x{d}": None for d in range(self.n_dimensions) } high_face_node_position[dim_str] = self.box_lengths[dim] low_face_node_str = f"{dim_str}_low" high_face_node_str = f"{dim_str}_high" graph.add_node(low_face_node_str, **low_face_node_position) graph.add_node(high_face_node_str, **high_face_node_position) edges_to_add = [] edges_to_remove = [] for u, v in graph.edges: if isinstance(u, str) or isinstance(v, str): # Only the face nodes are strings continue u_x = self.particle_df.loc[u, dim_str] v_x = self.particle_df.loc[v, dim_str] if np.abs(u_x - v_x) > self.cutoff_distance: edges_to_remove.append((u, v)) if u_x < v_x: low_node = u high_node = v else: low_node = v high_node = u edges_to_add.append((low_face_node_str, low_node)) edges_to_add.append((high_face_node_str, high_node)) for u, v in edges_to_add: graph.add_edge(u, v) for u, v in edges_to_remove: graph.remove_edge(u, v) return graph
[docs] def compute_cluster_properties(self, properties=["n_particles"]): """Compute cluster properties passed in `properties` variable Args: properties (list or str, optional): List of cluster properties to compute, or "all" to compute all available properties. Defaults to ["n_particles"]. Returns: dict: `property_name → property_value` key-value pairs """ if properties == "all": properties = self._cluster_property_map.keys() cluster_properties_dict = dict() for prop in properties: if prop not in self._cluster_property_map: raise ValueError(f"Property '{prop}' is not valid!") prop_function = self._cluster_property_map[prop] cluster_properties_dict[prop] = prop_function() cluster_properties_dict = flatten_dict(cluster_properties_dict) return cluster_properties_dict
[docs] def compute_particle_properties(self, properties=["coordination_number"]): """Compute particle properties passed in `properties` variable Args: properties (list or str, optional): List of particle properties to compute, or "all" to compute all available properties. Defaults to ["coordination_number"]. Returns: dataframe: Shape (`n_particles`, `n_dimensions` + `n_properties`) `particle_id` as index, `x*` and particle property columns. """ if properties == "all": properties = self._particle_property_map.keys() particle_df = self.particle_df.copy() for prop in properties: if prop not in self._particle_property_map: raise ValueError(f"Property '{prop}' is not valid!") prop_function = self._particle_property_map[prop] property_df = prop_function() assert np.all(property_df.index == self.particle_df.index) particle_df = particle_df.join(property_df, how="left") return particle_df
[docs] def compute_bonds(self): """Returns a dataframe with 2 columns, where each row has a pair of `particle_id`s associated with bonded particles Returns: dataframe: Shape `(n_bonds, 2)`. Column names `particle_id_1` and `particle_id_2`. """ bonds_df = pd.DataFrame( self.graph.edges(), columns=["particle_id_1", "particle_id_2"] ).sort_values(["particle_id_1", "particle_id_2"]) return bonds_df
###################### # Cluster Properties # ######################
[docs] def compute_n_particles(self): """Returns the number of particles in the cluster Returns: int: number of particles in the cluster """ return self.n_particles
[docs] def compute_minimum_node_cuts(self): """Returns dictionary of minimum node cuts required to break the connection between faces normal to a given direction. Returns: dict: `dimension_str → minimum_node_cuts` key-value pairs """ # If this was already computed, return the stored dictionary if self._minimum_node_cuts_dict is not None: return self._minimum_node_cuts_dict minimum_node_cuts_dict = dict() for dim in range(self.n_dimensions): split_graph = self._split_edges_with_faces_1_dim(self.graph, dim) node_cut = nx.minimum_node_cut( split_graph, f"x{dim}_low", f"x{dim}_high" ) minimum_node_cuts_dict[f"x{dim}"] = len(node_cut) # Store this because other computations like center of mass rely on it self._minimum_node_cuts_dict = minimum_node_cuts_dict return minimum_node_cuts_dict
[docs] def compute_center_of_mass(self, wrapped=True): """Returns cluster center of mass dictionary Args: wrapped (boolean, optional): If True, a center of mass that falls outside the box bounds is forced to be in range [0, `box_lengths[d]`) for each dimension `d`. If using this to compare to unwrapped particle coordinates, leave as False. Defaults to False. Returns: dict: `{"x0": x0, "x1": x1, ...}` """ if wrapped is True and self._center_of_mass_dict is not None: return self._center_of_mass_dict if ( wrapped is False and self._unwrapped_center_of_mass_dict is not None ): return self._unwrapped_center_of_mass_dict unwrapped_x_df = self._compute_unwrapped_x() if unwrapped_x_df is None: # _compute_unwrapped_x returns None if the particles bridge the # faces of at least 1 dimension. If that's the case, center of mass # can't necessarily be computed either center_of_mass = { f"x{d}": np.nan for d in range(self.n_dimensions) } self._unwrapped_center_of_mass_dict = center_of_mass self._center_of_mass_dict = center_of_mass return center_of_mass unwrapped_x_df.columns = [f"x{d}" for d in range(self.n_dimensions)] center_of_mass = unwrapped_x_df.mean(axis=0) self._unwrapped_center_of_mass_dict = center_of_mass.to_dict() if wrapped is True: while np.any(center_of_mass < 0) or np.any( center_of_mass > self.box_lengths ): center_of_mass = np.where( center_of_mass < 0, center_of_mass + self.box_lengths, center_of_mass, ) center_of_mass = np.where( center_of_mass >= self.box_lengths, center_of_mass - self.box_lengths, center_of_mass, ) self._center_of_mass_dict = { f"x{i}": v for i, v in enumerate(center_of_mass) } return self._center_of_mass_dict else: return self._unwrapped_center_of_mass_dict
[docs] def compute_unwrapped_center_of_mass(self): """Returns unwrapped center of mass, meaning it's the center of mass of the unwrapped particle coordinates, and isn't necessarily inside the box coordinates. Returns: dict: Unwrapped center of mass coordinates, `"x*" → number` key-value pairs. Technically, no max or min restriction, but probably within 1 period of the box bounds. """ return self.compute_center_of_mass(wrapped=False)
def _compute_gyration_tensor(self): """Returns cluster gyration tensor Returns: ndarray: Shape (`n_dimensions`, `n_dimensions`) gyration tensor """ if self._gyration_tensor is not None: return self._gyration_tensor dx_from_com = self.compute_distance_from_com( include_distance=False ).values if np.isnan(dx_from_com).sum() > 0: # If there are NaN values, that means it's a percolated cluster gyration_tensor = np.nan * np.ones( (self.n_dimensions, self.n_dimensions) ) else: # This implements the first equation in # https://en.wikipedia.org/wiki/Gyration_tensor gyration_tensor = ( np.sum( dx_from_com[:, :, None] * dx_from_com[:, None, :], axis=0 ) / self.n_particles ) # Make sure gyration_tensor is symmetric assert np.allclose(gyration_tensor, gyration_tensor.T) self._gyration_tensor = gyration_tensor return gyration_tensor def _compute_gyration_eigenvals(self): """Returns numpy array of eigenvalues of the gyration tensor. Values are not sorted. Returns: ndarry: Shape (`n_dimensions`,) eigenvalues array """ if self._gyration_eigenvals is not None: return self._gyration_eigenvals gyration_tensor = self._compute_gyration_tensor() if np.isnan(gyration_tensor).sum() > 0: # NaNs exist if the cluster is percolated eigenvals = np.nan * np.ones(self.n_dimensions) else: eigenvals, eigenvecs = np.linalg.eig(gyration_tensor) # Make sure all the numbers are real assert np.isclose(np.sum(np.abs(np.imag(eigenvals))), 0.0) # Drop the 0 imaginary part if it's there eigenvals = eigenvals.real self._gyration_eigenvals = eigenvals return eigenvals
[docs] def compute_rg(self): """Returns cluster radius of gyration. Returns: float: Cluster radius of gyration """ if self._rg is not None: return self._rg eigenvals = self._compute_gyration_eigenvals() if np.any(np.isnan(eigenvals)): rg = np.nan else: rg = np.sqrt(np.sum(eigenvals ** 2)) self._rg = rg return rg
[docs] def compute_asphericity(self): """Returns cluster asphericity (see https://en.wikipedia.org/wiki/Gyration_tensor#Shape_descriptors) Returns: float: Asphericity, normalized by radius of gyration squared """ rg = self.compute_rg() if rg == 0.0: asphericity = 0 elif np.isnan(rg): asphericity = np.nan else: scaled_eigenvals = self._compute_gyration_eigenvals() / rg evals_squared = np.sort(scaled_eigenvals ** 2) asphericity = evals_squared[-1] - np.mean(evals_squared[:-1]) return asphericity
####################### # Particle Properties # #######################
[docs] def compute_coordination_number(self): """Returns a dataframe of coordination numbers corresponding to each particle in the cluster Returns: dataframe: Coordination numbers for particles in the cluster. Index is `particle_id`s and matches `particle_df.index` """ distances = pairwise_distances(self.particle_df, self.box_lengths) within_cutoff_matrix = get_within_cutoff_matrix( distances, self.cutoff_distance ) coordination_numbers = within_cutoff_matrix.sum(axis=1).astype(int) coordination_numbers_df = pd.DataFrame( dict(coordination_number=coordination_numbers), index=self.particle_df.index, ) return coordination_numbers_df
def _compute_unwrapped_x(self): """Returns unwrapped particle coordinates dataframe Returns: dataframe: Index is `particle_id`, matching index of `particle_df`. Columns are `unwrapped_x*` where `*` represents 0, 1, ... `n_particles` """ if self._unwrapped_x_df is not None: return self._unwrapped_x_df minimum_node_cuts_dict = self.compute_minimum_node_cuts() n_node_cuts = sum(value for value in minimum_node_cuts_dict.values()) # If n_node_cuts is greater than 0, that means the cluster spans the # length of at least 1 dimension, and a center of mass can't # necessarily be computed. if n_node_cuts > 0: return None column_names_1 = [f"x{d}" for d in range(self.n_dimensions)] column_names_2 = [f"unwrapped_x{d}" for d in range(self.n_dimensions)] if len(self.graph) == 1: unwrapped_x_df = self.particle_df.filter(column_names_1).copy() unwrapped_x_df.columns = column_names_2 return unwrapped_x_df x_array_dict = dict() first = True x_df = self.particle_df.filter(column_names_1) for node_1, node_2 in nx.dfs_edges(self.graph): if first: x_array_1 = x_df.loc[node_1, :].values x_array_dict[node_1] = x_array_1 first = False else: x_array_1 = x_array_dict[node_1] x_array_2 = x_df.loc[node_2, :].values dx_array = x_array_2 - x_array_1 dx_array = np.where( dx_array < -self.box_lengths / 2, dx_array + self.box_lengths, dx_array, ) dx_array = np.where( dx_array >= self.box_lengths / 2, dx_array - self.box_lengths, dx_array, ) x_array_2 = x_array_1 + dx_array x_array_dict[node_2] = x_array_2 unwrapped_x_df = pd.DataFrame(x_array_dict).transpose().sort_index() assert np.all(unwrapped_x_df.index == self.particle_df.index) assert unwrapped_x_df.shape == (self.n_particles, self.n_dimensions) unwrapped_x_df.columns = column_names_2 self._unwrapped_x_df = unwrapped_x_df return unwrapped_x_df
[docs] def compute_distance_from_com( self, include_dx=True, include_distance=True ): """Returns dataframe of distances from the center of mass for each particle Args: include_dx (bool, optional): If True, includes `dx_from_com_x*` columns. Defaults to True. include_distance (bool, optional): If True, includes `distance_from_com` column. Defaults to True Raises: ValueError: both include_dx and include_distance are False Returns: dataframe: Index is `particle_id` (matching index of `particle_df`), columns are `distance_from_com` (Euclidean distance from center of mass), and `dx_from_com_x*` (Vector difference) where `*` represents 0, 1, ... `n_particles`. """ if include_dx is False and include_distance is False: raise ValueError( "one of include_dx or include_distance must be True" ) x_columns = [f"x{d}" for d in range(self.n_dimensions)] unwrapped_x_df = self._compute_unwrapped_x() if unwrapped_x_df is None: center_of_mass_dict = {xc: None for xc in x_columns} distance_from_com_df = pd.DataFrame(index=self.particle_df.index) if include_dx is True: for d in range(self.n_dimensions): distance_from_com_df[f"dx_from_com_x{d}"] = np.nan if include_distance is True: distance_from_com_df["distance_from_com"] = np.nan else: unwrapped_x = unwrapped_x_df.values center_of_mass_dict = self.compute_center_of_mass(wrapped=False) center_of_mass = ( pd.DataFrame([center_of_mass_dict]).filter(x_columns).values ) dx = unwrapped_x - center_of_mass if include_dx is True: arrays_dict = { f"dx_from_com_x{d}": dx[:, d] for d in range(self.n_dimensions) } else: arrays_dict = {} if include_distance is True: distances = np.linalg.norm(dx, axis=1) arrays_dict["distance_from_com"] = distances distance_from_com_df = pd.DataFrame( dict(arrays_dict), index=self.particle_df.index ) return distance_from_com_df