Source code for symmys.optimization

import collections
import functools

import numpy as np
import sklearn.cluster
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.backend as K

from .layers import QuaternionRotation, QuaternionRotoinversion
from .losses import mean_exp_rsq
from .utils import hash_sample

class PointRotations:
    """Finds rotations that leave a point cloud unchanged up to a permutation.

    This method optimizes a set of unit quaternions to match the
    distribution of transformed points to the set of unrotated points.
    Quaternions are then clustered by their axis of rotation and merged
    into N-fold rotation symmetries.

    :param num_rotations: Number of plain rotations (and rotoinversions, if
        enabled) to consider
    :param quaternion_dim: Optimizer dimension for quaternions (higher may
        make optimization easier at the cost of more expensive optimization
        steps)
    :param include_inversions: If True, include rotoinversions as well as
        rotations
    :param loss: Loss function to use; see :py:mod:`symmys.losses`
    """

    def __init__(self, num_rotations, quaternion_dim=8,
                 include_inversions=True, loss=mean_exp_rsq):
        self.num_rotations = num_rotations
        self.quaternion_dim = quaternion_dim
        self.include_inversions = include_inversions
        self.loss = loss

        self._model_dict = None

    @property
    def model(self):
        """Return the tensorflow model that will perform rotations."""
        if self._model_dict is None:
            self._model_dict = self.build_model()
        return self._model_dict['model']

    @property
    def rotation_layer(self):
        """Return the tensorflow.keras layer for rotations."""
        if self._model_dict is None:
            self._model_dict = self.build_model()
        return self._model_dict['rotation_layer']

    @property
    def rotoinversion_layer(self):
        """Return the tensorflow.keras layer for rotoinversions."""
        if self._model_dict is None:
            self._model_dict = self.build_model()
        return self._model_dict['rotoinversion_layer']

    def build_model(self):
        """Create the tensorflow model.

        This method can be replaced by child classes to experiment with
        different network architectures. The returned result should be a
        dictionary containing at least:

        - `model`: a `tensorflow.keras.models.Model` instance that
          replicates a given set of input points
        - `rotation_layer`: a layer with a `quaternions` attribute to be read
        - `rotoinversion_layer` (if inversions are enabled): a layer with a
          `quaternions` attribute to be read
        """
        result = {}

        inp = last = keras.layers.Input(shape=(3,))
        result['rotation_layer'] = rot_layer = QuaternionRotation(
            self.num_rotations, quaternion_dim=self.quaternion_dim)

        if self.include_inversions:
            result['rotoinversion_layer'] = conj_rot_layer = QuaternionRotoinversion(
                self.num_rotations, quaternion_dim=self.quaternion_dim)
            last = tf.concat([rot_layer(last), conj_rot_layer(last)], 1)
        else:
            last = rot_layer(last)

        result['model'] = keras.models.Model(inp, last)
        return result

    def fit(self, points, epochs=1024, early_stopping_steps=16,
            validation_split=.3, hash_sample_N=128, reference_fraction=.1,
            optimizer='adam', batch_size=256, valid_symmetries=12,
            extra_callbacks=[]):
        """Fit rotation quaternions and analyze the collective symmetries of
        a set of input points.

        This method builds a rotation model, fits it to the given data, and
        groups the found quaternions by their axis and rotation angle. After
        fitting, a map of symmetries will be returned: a dictionary of
        {N-fold: [axes]} containing all the axes about which each observed
        symmetry was found.

        :param points: Input points to analyze, an (N, 3) numpy array-like
            sequence
        :param epochs: Maximum number of epochs to train
        :param early_stopping_steps: Patience (in epochs) for the early
            stopping criterion; training halts when the validation set loss
            does not improve for this many epochs
        :param validation_split: Fraction of training data to use for
            calculating validation loss
        :param hash_sample_N: Minimum number of points to use as reference
            data for the loss function (see :py:func:`hash_sample`)
        :param reference_fraction: Fraction of the given input data to be
            hashed to form the reference data
        :param optimizer: Tensorflow/keras optimizer name or instance
        :param batch_size: Batch size for optimization
        :param valid_symmetries: Maximum degree of symmetry (N) to consider
            when identifying N-fold rotations, or an iterable of specific N
            values to consider
        :param extra_callbacks: Additional tensorflow callbacks to use during
            optimization
        """
        points = np.asarray(points)
        N = len(points)
        reference_N = int(reference_fraction*N)

        # split the input into training data and a reference sample that the
        # loss function compares rotated points against
        reference, train = points[:reference_N], points[reference_N:]
        reference = hash_sample(reference, hash_sample_N)

        reduce_lr_patience = int(early_stopping_steps/3.)
        callbacks = [
            keras.callbacks.EarlyStopping(
                patience=early_stopping_steps, monitor='val_loss'),
            keras.callbacks.ReduceLROnPlateau(
                patience=reduce_lr_patience, monitor='val_loss',
                factor=.5, verbose=False),
        ] + extra_callbacks

        try:
            import tensorflow_addons as tfa
            callbacks.append(tfa.callbacks.TQDMProgressBar(
                show_epoch_progress=False, update_per_second=1))
        except ImportError:
            pass

        model = self.model
        loss = self.loss(model.output, reference)
        model.add_loss(loss)
        model.compile(optimizer, loss=None)
        model.fit(
            train, train, validation_split=validation_split, verbose=False,
            batch_size=batch_size, callbacks=callbacks, epochs=epochs)
        self.history = model.history.history

        if isinstance(valid_symmetries, int):
            valid_symmetries = range(1, valid_symmetries + 1)
        Ns = np.array(list(sorted(valid_symmetries)))

        # group the optimized quaternions into N-fold symmetry axes
        symmetries = collections.defaultdict(list)
        for (symmetry, axis) in zip(*self._cluster_quaternions(
                self.rotation_layer.quaternions.numpy(), Ns)):
            if symmetry == 1:
                continue
            symmetries[symmetry].append(axis)

        if self.include_inversions:
            for (symmetry, axis) in zip(*self._cluster_quaternions(
                    self.rotoinversion_layer.quaternions.numpy(), Ns)):
                symmetries[-symmetry].append(axis)

        self.symmetries = dict(symmetries)
        return self.symmetries

    @staticmethod
    def _cluster_quaternions(quats, Ns, tolerance=.0125):
        # normalize the rotation axes (the vector part of each quaternion)
        axes = quats[:, 1:].copy()
        axes /= np.linalg.norm(axes, axis=-1, keepdims=True)

        filt = np.logical_and(np.isfinite(quats[:, 0]),
                              np.all(np.isfinite(axes), axis=-1))
        quats, axes = quats[filt], axes[filt]

        # cluster quaternions whose axes are parallel or antiparallel
        distances = 1 - np.abs(np.sum(axes[:, np.newaxis]*axes[np.newaxis], axis=-1))
        distances = np.clip(distances, 0, 1)
        dbscan = sklearn.cluster.DBSCAN(
            eps=tolerance, min_samples=1, metric='precomputed')
        cluster_indices = dbscan.fit_predict(distances)

        clusters = collections.defaultdict(list)
        for (i, theta, axis) in zip(cluster_indices, 2*np.arccos(quats[:, 0]), axes):
            clusters[i].append((theta, axis))

        averaged_axes = []
        symmetries = []
        for i, axisangles in clusters.items():
            # orient all axes in the cluster consistently before averaging
            ref_axis = axisangles[0][1]
            ref_axis *= np.sign(ref_axis[2])

            thetas = [theta for (theta, _) in axisangles]
            axis = np.mean([
                axis*np.sign(np.dot(axis, ref_axis))
                for (_, axis) in axisangles], axis=0)
            axis /= np.linalg.norm(axis, keepdims=True)
            if np.any(np.logical_not(np.isfinite(axis))):
                axis[:] = (0, 0, 1)

            # score each candidate N-fold symmetry from the rotation angles
            thetas = np.array(thetas)
            psis = np.exp(thetas*Ns[:, np.newaxis]*1j)
            psis /= Ns[:, np.newaxis]
            psis = 0.5*np.mean(psis + 1, axis=-1)
            psis = np.abs(psis) - .5
            symmetry = np.argmax(psis) + 1
            factors = 2*psis*Ns

            averaged_axes.append(axis)
            symmetries.append(symmetry)

        return symmetries, averaged_axes
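
The N-fold scoring inside _cluster_quaternions can be checked in isolation. The snippet below is a small illustration and not part of the module source: it applies the same formula to an ideal cluster of rotation angles that are multiples of 2*pi/6 and recovers a 6-fold symmetry. The angle set and the range of candidate orders are illustrative choices.

    import numpy as np

    # angles of an ideal 6-fold cluster: multiples of 2*pi/6 (identity excluded)
    thetas = 2*np.pi/6*np.arange(1, 6)
    Ns = np.arange(1, 13)

    # same scoring as in _cluster_quaternions: average exp(1j*N*theta)/N + 1
    # over the cluster; the 1/N factor penalizes multiples of the true order
    # (for example N=12 for a 6-fold axis)
    psis = np.exp(thetas*Ns[:, np.newaxis]*1j)
    psis /= Ns[:, np.newaxis]
    psis = 0.5*np.mean(psis + 1, axis=-1)
    psis = np.abs(psis) - .5

    print(np.argmax(psis) + 1)  # prints 6

For end-to-end use, the following sketch shows how the class above might be applied, assuming the symmys package and TensorFlow are installed. The synthetic point cloud, random seed, num_rotations, and epochs values are illustrative assumptions rather than values recommended by the library, and the detected symmetries depend on how well the optimization converges.

    import numpy as np
    from symmys.optimization import PointRotations

    # build a synthetic point cloud with an exact 6-fold axis along z
    rng = np.random.default_rng(13)
    base = rng.normal(size=(256, 3))
    frames = []
    for theta in 2*np.pi/6*np.arange(6):
        c, s = np.cos(theta), np.sin(theta)
        rotation = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
        frames.append(base @ rotation.T)
    points = np.concatenate(frames, axis=0).astype(np.float32)
    rng.shuffle(points)  # so the reference split samples the whole cloud

    finder = PointRotations(num_rotations=12)
    symmetries = finder.fit(points, epochs=256)

    # symmetries maps N-fold order -> list of axes; negative keys (if any)
    # denote rotoinversions
    for order, axes in sorted(symmetries.items()):
        print(order, np.round(axes, 2))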