Source code for laue_dials.command_line.sequence_to_stills

"""
Split a sequence into a set of stills.

Example:

  laue.sequence_to_stills monochromatic.expt monochromatic.refl
"""

import logging
import sys
import time

import numpy as np
from dials.array_family import flex
from dials.array_family.flex import reflection_table
from dials.util import show_mail_handle_errors
from dials.util.options import (ArgumentParser,
                                reflections_and_experiments_from_files)
from dxtbx.model import MosaicCrystalSauter2014
from dxtbx.model.experiment_list import Experiment, ExperimentList
from libtbx.phil import parse
from scitbx import matrix
from tqdm import trange

from laue_dials.utils.version import laue_version

# Print laue-dials + DIALS versions
laue_version()

logger = logging.getLogger("laue.command_line.sequence_to_stills")

# The phil scope
phil_scope = parse(
    """
output {
  experiments = stills.expt
    .type = str
    .help = "Filename for the experimental models that have been converted to stills"
  reflections = stills.refl
    .type = str
    .help = "Filename for the reflection tables with split shoeboxes (3D to 2D)"
  log = laue.sequence_to_stills.log
    .type = str
    .help = "Filename for the reflection tables with split shoeboxes (3D to 2D)"
  domain_size_ang = None
    .type = float
    .help = "Override for domain size. If None, use the crystal's domain size, if"
            "available"
  half_mosaicity_deg = None
    .type = float
    .help = "Override for mosaic angle. If None, use the crystal's mosaic angle, if"
            "available"
}
max_scan_points = None
  .type = int
  .expert_level = 2
  .help = Limit number of scan points
"""
)


[docs] def sequence_to_stills(experiments, reflections, params): """ Split a sequence into a set of stills. This function takes a sequence of experiments and corresponding reflections and splits it into a set of stills. Each still is associated with a specific scan point within the original sequence. The resulting experiments and reflection tables for the stills are returned. Args: experiments (List[dxtbx.model.experiment_list]): The list of experiments representing the sequence. reflections (List[dials.array_family.flex.reflection_table]): The list of reflection tables corresponding to the experiments. params (libtbx.phil.scope_extract): Program parameters. Returns: Tuple[List[dxtbx.model.experiment_list.Experiment], List[dials.array_family.flex.reflection_table]]: A tuple containing the following: - A list of Experiment objects representing the split stills. - A list of reflection_table objects containing the split reflections for each still. """ assert len(reflections) == 1 reflections = reflections[0] new_experiments = [] new_reflections = [] id_counter = 0 print("Building experiment lists.") for expt_id, experiment in enumerate(experiments): # Generate crystals and imagesets for each scan point on first loop for i_scan_point in range(*experiment.scan.get_array_range()): # Get the goniometer setting matrix goniometer_setting_matrix = matrix.sqr( experiment.goniometer.get_setting_rotation() ) goniometer_axis = matrix.col(experiment.goniometer.get_rotation_axis()) step = experiment.scan.get_oscillation()[1] # The A matrix is the goniometer setting matrix for this scan point # times the scan varying A matrix at this scan point. Note, the # goniometer setting matrix for scan point zero will be the identity # matrix and represents the beginning of the oscillation. # For stills, the A matrix needs to be positioned in the midpoint of an # oscillation step. Hence, here the goniometer setting matrixis rotated # by a further half oscillation step. A = ( goniometer_axis.axis_and_angle_as_r3_rotation_matrix( angle=experiment.scan.get_angle_from_array_index(i_scan_point) + (step / 2), deg=True, ) * goniometer_setting_matrix * matrix.sqr(experiment.crystal.get_A()) ) crystal = MosaicCrystalSauter2014(experiment.crystal) crystal.set_A(A) # Copy in mosaic parameters if available if params.output.domain_size_ang is None and hasattr( experiment.crystal, "get_domain_size_ang" ): crystal.set_domain_size_ang(experiment.crystal.get_domain_size_ang()) elif params.output.domain_size_ang is not None: crystal.set_domain_size_ang(params.output.domain_size_ang) if params.output.half_mosaicity_deg is None and hasattr( experiment.crystal, "get_half_mosaicity_deg" ): crystal.set_half_mosaicity_deg( experiment.crystal.get_half_mosaicity_deg() ) elif params.output.half_mosaicity_deg is not None: crystal.set_half_mosaicity_deg(params.output.half_mosaicity_deg) # Append experiment new_experiment = Experiment( identifier=str(id_counter), detector=experiment.detector, beam=experiment.beam, crystal=crystal, imageset=experiment.imageset.as_imageset()[ i_scan_point : i_scan_point + 1 ], ) new_experiments.append(new_experiment) # Increment ID for experiment id_counter = id_counter + 1 # ----------------EXPERIMENTS CREATED--------------------------------- print("Building reflection table.") id_counter = 0 for expt_id, experiment in enumerate(experiments): # Get subset of reflections for this pass pass_refls = reflections.select(reflections["imageset_id"] == expt_id) for i_scan_point in range(*experiment.scan.get_array_range()): # Get subset of reflections on this image _, _, _, _, z1, z2 = pass_refls["bbox"].parts() subrefls = pass_refls.select((i_scan_point >= z1) & (i_scan_point < z2)) new_refls = subrefls.copy() centroids = np.asarray(subrefls["xyzobs.px.value"] - [0.0, 0.0, 0.5]) centroids[:, 2] = i_scan_point new_refls["xyzobs.px.value"] = flex.vec3_double(centroids) new_refls["imageset_id"] = flex.int(len(new_refls), i_scan_point) x, y, _ = subrefls["xyzobs.mm.value"].parts() new_refls["xyzobs.mm.value"] = flex.vec3_double( x, y, flex.double(len(new_refls), 0) ) new_refls["id"] = flex.int(len(new_refls), id_counter) id_counter = id_counter + 1 new_reflections.append(new_refls) return (new_experiments, new_reflections)
[docs] @show_mail_handle_errors() def run(args=None, phil=phil_scope): """ Validate the arguments and load experiments/reflections for sequence_to_stills. Args: args (list): Command-line arguments. phil: Working phil scope. Returns: None """ # The script usage usage = "usage: laue.sequence_to_stills [options] [param.phil] models.expt reflections.refl" # Create the parser parser = ArgumentParser( usage=usage, phil=phil, read_experiments=True, read_reflections=True, check_format=False, epilog=__doc__, ) params, options = parser.parse_args(args=args, show_diff_phil=True) # Configure logging console = logging.StreamHandler(sys.stdout) fh = logging.FileHandler(params.output.log, mode="w", encoding="utf-8") loglevel = logging.INFO logger.addHandler(fh) logger.addHandler(console) logging.captureWarnings(True) warning_logger = logging.getLogger("py.warnings") warning_logger.addHandler(fh) warning_logger.addHandler(console) dials_logger = logging.getLogger("dials") dials_logger.addHandler(fh) dials_logger.addHandler(console) dxtbx_logger = logging.getLogger("dxtbx") dxtbx_logger.addHandler(fh) dxtbx_logger.addHandler(console) xfel_logger = logging.getLogger("xfel") xfel_logger.addHandler(fh) xfel_logger.addHandler(console) logger.setLevel(loglevel) dials_logger.setLevel(loglevel) dxtbx_logger.setLevel(loglevel) xfel_logger.setLevel(loglevel) fh.setLevel(loglevel) # Try to load the models and data if not params.input.experiments or not params.input.reflections: parser.print_help() return reflections, experiments = reflections_and_experiments_from_files( params.input.reflections, params.input.experiments ) # Get initial time for process start_time = time.time() (new_experiments, new_reflections) = sequence_to_stills( experiments, reflections, params ) # Write out the output experiments, reflections logger.info("Writing output data.") total_reflections = reflection_table() for i in trange(len(new_experiments)): total_reflections.extend(new_reflections[i]) # Final logs logger.info("") logger.info( "Time Taken to Split into Stills = %f seconds", time.time() - start_time ) ExperimentList(new_experiments).as_file(params.output.experiments) total_reflections.as_file(params.output.reflections)
if __name__ == "__main__": run()