# Source code for pbcluster.trajectory

# -*- coding: utf-8 -*-
#
# trajectory.py

"""Trajectory module."""

import networkx as nx
import numpy as np
import pandas as pd

from .cluster import Cluster
from .utils import get_graph_from_particle_positions


class Trajectory:
    """Object to store and compute data about particle clusters in
    sequential timesteps.

    Args:
        trajectory_data (dataframe or ndarray): Dataframe or ndarray
            containing trajectory data. If dataframe, it must contain
            columns `particle_id` and `x0`, `x1`, ... `xN`. If there are
            multiple timesteps, a `timestep` column must be included. If
            there are multiple particle types, a `particle_type` column
            must be included. If ndarray, its dimensions must be
            (n_timesteps, n_particles, n_dimensions), and it will be
            assumed that all particles are of the same type.
        box_lengths (float or ndarray): If float, the length of each side
            of a cubic box. If ndarray, it must contain `n_dimensions`
            values representing the lengths of each dimension of a
            rectangular box.
        cutoff_distance (float): Maximum distance two particles can be
            from each other to be considered part of the same cluster.

    Attributes:
        trajectory_df (dataframe): Dataframe containing trajectory data
            with columns `timestep`, `particle_id`, `particle_type`,
            `x0`, `x1`, ... `xN`.
        n_dimensions (int): Number of dimensions in the system.
        box_lengths (ndarray): Length `n_dimensions` array representing
            the lengths of each dimension of a rectangular box.
        cutoff_distance (float): Maximum distance two particles can be
            from each other to be considered part of the same cluster.
        cluster_dict (dict): `timestep` -> `cluster_list` key-value pairs.

    Raises:
        TypeError: If `trajectory_data` is neither a pandas DataFrame nor
            a numpy ndarray, or `box_lengths` has an unsupported type.
        ValueError: If the trajectory data or box lengths fail validation.
    """

    def __init__(self, trajectory_data, box_lengths, cutoff_distance):
        if isinstance(trajectory_data, pd.DataFrame):
            self.trajectory_df = trajectory_data.copy()
        elif isinstance(trajectory_data, np.ndarray):
            self.trajectory_df = self._convert_ndarray_to_df(trajectory_data)
        else:
            # Fail fast with a clear error; previously an unsupported type
            # fell through and produced a confusing AttributeError below.
            raise TypeError(
                "trajectory_data must be a pandas DataFrame or numpy "
                "ndarray!"
            )
        self.trajectory_df = self._verify_dataframe(self.trajectory_df)
        self.n_dimensions = self._get_n_dimensions(self.trajectory_df)
        self.box_lengths = self._verify_box_lengths(box_lengths)
        self.cutoff_distance = float(cutoff_distance)
        # Set to None first so _get_cluster_dict's memoization guard sees
        # "not yet computed" on the initial call.
        self.cluster_dict = None
        self.cluster_dict = self._get_cluster_dict()

    def _convert_ndarray_to_df(self, trajectory_array):
        """Convert numpy array with trajectory information into a dataframe.

        Args:
            trajectory_array (ndarray): 2D or 3D array of shape
                (n_particles, n_dimensions) if 2D, or
                (n_timesteps, n_particles, n_dimensions) if 3D. If 2D, it
                will be assumed that it represents a single timestep.

        Raises:
            ValueError: If trajectory_array isn't a 2D or 3D ndarray.

        Returns:
            dataframe: Each row represents a single particle at a
                particular timestep. Columns are `timestep`,
                `particle_id`, `x0`, `x1`, ... `xN`.
        """
        if not isinstance(trajectory_array, np.ndarray):
            raise ValueError("trajectory_array must be a numpy ndarray!")
        array_dim = trajectory_array.ndim
        if array_dim < 2 or 3 < array_dim:
            raise ValueError("trajectory_array must have 2 or 3 dimensions!")
        elif array_dim == 2:
            # Reshape array with dimensions (n_particles, n_dimensions) into
            # array with dimensions (n_timesteps, n_particles, n_dimensions),
            # where n_timesteps = 1
            trajectory_array = trajectory_array[None, :, :]
        n_timesteps, n_particles, n_dimensions = trajectory_array.shape
        # Flatten (timestep, particle) into rows; timestep/particle_id are
        # recovered from the flat row index by integer division / modulo.
        trajectory_array = trajectory_array.reshape(
            (n_timesteps * n_particles, n_dimensions)
        )
        x_columns = ["x{}".format(i) for i in range(n_dimensions)]
        trajectory_df = (
            pd.DataFrame(trajectory_array, columns=x_columns)
            .assign(
                timestep=np.arange(n_timesteps * n_particles) // n_particles
            )
            .assign(
                particle_id=np.arange(n_timesteps * n_particles) % n_particles
            )
        )
        # `particle_type` column is not created because all particles are
        # assumed to have the same type if an ndarray is passed, and that
        # column is created in _verify_dataframe anyway
        return trajectory_df

    def _verify_dataframe(self, trajectory_df):
        """Perform quality check on input dataframe and return dataframe
        with desired specifications.

        Args:
            trajectory_df (dataframe): Dataframe containing trajectory
                data. It must contain at least column `particle_id` and
                columns `x0`, `x1`, ... `xN`. If there are multiple
                timesteps, a `timestep` column must be included. If there
                are multiple particle types, a `particle_type` column must
                be included.

        Raises:
            ValueError: trajectory_df does not meet specifications.

        Returns:
            dataframe: Contains columns in order `timestep`,
                `particle_id`, `particle_type`, `x0`, `x1`, ... `xN`.
        """
        x_column_names = self._get_x_column_names(trajectory_df)
        if "particle_id" not in trajectory_df.columns:
            raise ValueError("particle_id column does not exist!")
        if "timestep" not in trajectory_df.columns:
            # Assume we only have 1 timestep, meaning there should be no
            # duplicate `particle_id`s
            pids = trajectory_df["particle_id"]
            if len(pids) != len(pids.unique()):
                raise ValueError(
                    "Duplicate particle_ids exist with no timestep column!"
                )
            trajectory_df["timestep"] = 0
        if "particle_type" not in trajectory_df.columns:
            # Assume all particles have the same type if type is not provided
            trajectory_df["particle_type"] = 0
        for _, group in trajectory_df.groupby("timestep"):
            # Each timestep must contain particle_ids 0, 1, ..., n-1.
            # Raise instead of assert so validation survives `python -O`.
            if not np.all(group["particle_id"] == np.arange(len(group))):
                raise ValueError(
                    "particle_ids must be 0, 1, ..., n_particles - 1 "
                    "within each timestep!"
                )
        # Arrange columns in desired order
        trajectory_df = trajectory_df[
            ["timestep", "particle_id", "particle_type"] + x_column_names
        ]
        return trajectory_df

    def _get_x_column_names(self, trajectory_df):
        """Return a list of column names containing particle position info.

        Args:
            trajectory_df (dataframe): Dataframe of trajectory data.

        Raises:
            ValueError: If columns starting with `x` aren't just `x0`,
                `x1`, ... `xN`, or `x0` doesn't exist.

        Returns:
            list: list of all column names with particle position data.
        """
        x_column_names = [
            c for c in trajectory_df.columns if c.startswith("x")
        ]
        x_column_names = sorted(x_column_names, key=lambda x: int(x[1:]))
        ints_from_column_names = [int(x[1:]) for x in x_column_names]
        expected_ints = list(range(len(x_column_names)))
        if ints_from_column_names != expected_ints:
            raise ValueError(
                "Expected columns x0, x1, ... xN and no other "
                "columns beginning with 'x'"
            )
        return x_column_names

    def _get_n_dimensions(self, trajectory_df):
        """Return the number of dimensions in this trajectory.

        Args:
            trajectory_df (dataframe): Dataframe containing trajectory
                information.

        Raises:
            ValueError: If no `x0` column exists.

        Returns:
            int: Number of dimensions.
        """
        x_columns = self._get_x_column_names(trajectory_df)
        n_dimensions = len(x_columns)
        if n_dimensions == 0:
            raise ValueError(
                "At least one dimension (x0 column) is required!"
            )
        return n_dimensions

    def _verify_box_lengths(self, box_lengths):
        """Verifies a proper box_lengths input.

        Args:
            box_lengths (float or ndarray): If float, the length of each
                side of a cubic box. If ndarray, it must contain
                `n_dimensions` values representing the lengths of each
                dimension of a rectangular box.

        Raises:
            TypeError: If the input is not a float or ndarray.
            ValueError: If the input has the wrong dimensions or
                non-positive numbers.

        Returns:
            ndarray: `n_dimensions` length array of simulation box lengths.
        """
        if not isinstance(box_lengths, np.ndarray):
            try:
                # Assume it's the length of each side of a cube if it isn't
                # a numpy array
                box_lengths = float(box_lengths)
                box_lengths = box_lengths * np.ones(self.n_dimensions)
            except (TypeError, ValueError):
                raise TypeError("box_lengths must be a float or numpy array!")
        if box_lengths.ndim > 1 or len(box_lengths) != self.n_dimensions:
            raise ValueError(
                f"box_lengths must have length {self.n_dimensions}!"
            )
        if np.any(box_lengths <= 0):
            raise ValueError("box_lengths must be greater than 0!")
        return box_lengths

    def _get_one_timestep_cluster_list(self, timestep_df):
        """Returns list of Cluster objects for one timestep.

        Args:
            timestep_df (dataframe): This should be the rows of
                `trajectory_df` corresponding to one timestep.

        Returns:
            list: List of Cluster objects from the current timestep.
        """
        particle_positions_df = timestep_df.set_index(
            "particle_id", drop=True
        ).filter(regex="x.*")
        full_graph = get_graph_from_particle_positions(
            particle_positions_df, self.box_lengths, self.cutoff_distance
        )
        # Each connected component of the particle graph is one cluster
        cluster_graphs = [
            full_graph.subgraph(c).copy()
            for c in nx.connected_components(full_graph)
        ]
        cluster_list = []
        for cluster_graph in cluster_graphs:
            particle_id_list = sorted(list(cluster_graph.nodes))
            cluster_particle_df = particle_positions_df.query(
                "particle_id in @particle_id_list"
            )
            cluster = Cluster(
                cluster_graph,
                cluster_particle_df,
                self.box_lengths,
                self.cutoff_distance,
            )
            cluster_list.append(cluster)
        return cluster_list

    def _get_cluster_dict(self, verbosity=0):
        """Returns dictionary of one cluster list for each timestep.

        Args:
            verbosity (int, optional): A value greater than 0 will print
                some output to show which timesteps have been computed.
                Defaults to 0.

        Returns:
            dict: `timestep` -> `cluster_list` as key-value pairs.
        """
        # Memoize: clusters only need to be computed once per trajectory
        if self.cluster_dict is not None:
            return self.cluster_dict
        cluster_dict = dict()
        for timestep, ts_group in self.trajectory_df.groupby("timestep"):
            if verbosity > 0:
                print("Timestep:", timestep)
            cluster_list = self._get_one_timestep_cluster_list(ts_group)
            cluster_dict[timestep] = cluster_list
        return cluster_dict

    def compute_cluster_properties(self, properties=None, verbosity=0):
        """Returns dataframe of properties for each cluster in each
        timestep.

        Args:
            properties (list, optional): List of properties to compute for
                each cluster. Defaults to ["n_particles"].
            verbosity (int, optional): A value greater than 0 will print
                some output to show which timesteps have been computed.
                Defaults to 0.

        Returns:
            dataframe: columns of `timestep`, `cluster_id`, and columns
                corresponding to properties in `properties` list.
        """
        # None sentinel avoids a shared mutable default argument
        if properties is None:
            properties = ["n_particles"]
        properties_dict_list = list()
        for timestep, cluster_list in self.cluster_dict.items():
            if verbosity > 0:
                print("Timestep:", timestep)
            for i, cluster in enumerate(cluster_list):
                properties_dict = cluster.compute_cluster_properties(
                    properties
                )
                properties_dict["timestep"] = timestep
                properties_dict["cluster_id"] = i
                properties_dict_list.append(properties_dict)
        properties_df = pd.DataFrame(properties_dict_list)
        return properties_df

    def compute_particle_properties(self, properties=None, verbosity=0):
        """Returns dataframe of properties for each particle in each
        timestep.

        Args:
            properties (list, optional): List of properties to compute for
                each particle. Defaults to ["coordination_number"].
            verbosity (int, optional): A value greater than 0 will print
                some output to show which timesteps have been computed.
                Defaults to 0.

        Returns:
            dataframe: columns of `timestep`, `cluster_id`, `particle_id`,
                and columns corresponding to properties in `properties`
                list.
        """
        # None sentinel avoids a shared mutable default argument
        if properties is None:
            properties = ["coordination_number"]
        properties_df_list = list()
        for timestep, cluster_list in self.cluster_dict.items():
            for i, cluster in enumerate(cluster_list):
                properties_df = (
                    cluster.compute_particle_properties(properties)
                    .reset_index()  # Move particle_id column out of index
                    .assign(timestep=timestep)
                    .assign(cluster_id=i)
                )
                properties_df_list.append(properties_df)
        properties_df = pd.concat(properties_df_list, ignore_index=True)
        return properties_df

    def compute_bonds(self):
        """Returns dataframe of bonds for the whole trajectory.

        Returns:
            dataframe: Shape `(n_bonds, 4)`. Column names
                `particle_id_1`, `particle_id_2`, `timestep`, and
                `cluster_id`.
        """
        bonds_df_list = list()
        for timestep, cluster_list in self.cluster_dict.items():
            for i, cluster in enumerate(cluster_list):
                bonds_df = (
                    cluster.compute_bonds()
                    .assign(timestep=timestep)
                    .assign(cluster_id=i)
                )
                bonds_df_list.append(bonds_df)
        bonds_df = pd.concat(bonds_df_list, axis=0, ignore_index=True)
        return bonds_df

    def compute_bond_durations(self):
        """Returns dataframe with duration of bond for every distinct
        bonding event.

        If particles 3 and 5 bond, then unbond, then bond again, that is
        2 distinct bonding events.

        Returns:
            dataframe: Shape `(n_bond_events, 5)`. Columns
                `particle_id_1`, `particle_id_2`, `start`, `duration`,
                `bonded_at_end`.
        """
        bonds_df = self.compute_bonds()

        def get_bond_start_and_duration(df, final_timestep, delta_timestep=1):
            # One group = all timesteps where a particular pair is bonded.
            # A gap larger than delta_timestep marks a new bonding event.
            timesteps = df["timestep"].values
            dt = timesteps[1:] - timesteps[:-1]
            is_start = np.zeros(len(df))
            is_start[0] = 1
            is_start[1:] = dt > delta_timestep
            (start_inds,) = np.where(is_start == 1)
            # Append a sentinel index so the final event's duration can be
            # computed with the same pairwise difference.
            start_inds = np.concatenate((start_inds, [len(is_start)]))
            durations = (start_inds[1:] - start_inds[:-1]) * delta_timestep
            start_and_duration_df = pd.DataFrame(
                dict(
                    start=timesteps[start_inds[:-1]],
                    duration=durations,
                    bonded_at_end=False,
                )
            )
            if timesteps[-1] == final_timestep:
                # Still bonded at the last timestep, so the final event's
                # duration is censored by the end of the trajectory.
                start_and_duration_df.loc[
                    len(start_and_duration_df) - 1, "bonded_at_end"
                ] = True
            return start_and_duration_df

        bond_durations_df = (
            bonds_df.sort_values(["particle_id_1", "particle_id_2"])
            .groupby(["particle_id_1", "particle_id_2"])
            .apply(
                get_bond_start_and_duration,
                final_timestep=bonds_df["timestep"].max(),
            )
            .reset_index()
            .drop("level_2", axis=1)
        )
        return bond_durations_df