diff --git a/src/cditools/motors.py b/src/cditools/motors.py index 9107727..dfee6f0 100644 --- a/src/cditools/motors.py +++ b/src/cditools/motors.py @@ -1,38 +1,51 @@ from __future__ import annotations -from typing import ClassVar +from collections.abc import Generator +from pathlib import Path +from typing import Any, ClassVar, Protocol +import bluesky.plan_stubs as bps import numpy as np +import skbeam.core.constants.xrf as xrfC +from bluesky import Msg +from bluesky.callbacks import LiveTable from ophyd import Component as Cpt # type: ignore[import-not-found] -from ophyd import Device, EpicsMotor, PseudoPositioner, PseudoSingle +from ophyd import ( + Device, + EpicsMotor, + PseudoPositioner, + PseudoSingle, + Signal, +) from ophyd import DynamicDeviceComponent as DDC from ophyd.pseudopos import ( pseudo_position_argument, real_position_argument, ) +from scipy.interpolate import make_interp_spline class EpicsMotorRO(EpicsMotor): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, prefix: str, *, name: str, **kwargs: Any) -> None: + super().__init__(prefix=prefix, name=name, **kwargs) - def move(self, *args, **kwargs): # noqa: ARG002 + def move(self, *args: object, **kwargs: object): # noqa: ARG002 msg = f"{self.name} is read-only and cannot be moved." raise PermissionError(msg) - def stop(self, *args, **kwargs): # noqa: ARG002 + def stop(self, *args: object, **kwargs: object): # noqa: ARG002 msg = f"{self.name} is read-only and cannot be stopped manually." raise PermissionError(msg) - def set(self, *args, **kwargs): # noqa: ARG002 + def set(self, *args: object, **kwargs: object): # noqa: ARG002 msg = f"{self.name} is read-only and cannot be set." raise PermissionError(msg) - def set_position(self, *args, **kwargs): # noqa: ARG002 + def set_position(self, *args: object, **kwargs: object): # noqa: ARG002 msg = f"{self.name} is read-only and its position cannot be set." raise PermissionError(msg) - def _readonly_put(self, *args, **kwargs): # noqa: ARG002 + def _readonly_put(self, *args: object, **kwargs: object): # noqa: ARG002 msg = f"{self.name} is read-only and cannot write PVs." raise PermissionError(msg) @@ -77,7 +90,7 @@ class DMM(Device): class DCMBase(Device): pitch = Cpt(EpicsMotor, "Mono:HDCM-Ax:Pitch}Mtr") - fine: ClassVar[dict] = { + fine: ClassVar[dict[str, Cpt[EpicsMotor]]] = { "fpitch": Cpt(EpicsMotor, "Mono:HDCM-Ax:FP}Mtr"), "roll": Cpt(EpicsMotor, "Mono:HDCM-Ax:Roll}Mtr"), } @@ -85,33 +98,182 @@ class DCMBase(Device): v = Cpt(EpicsMotor, "Mono:HDCM-Ax:TY}Mtr") +class _RealPosWithBragg(Protocol): + bragg: float + + class Energy(PseudoPositioner): bragg = Cpt(EpicsMotor, "Mono:HDCM-Ax:Bragg}Mtr") cgap = Cpt(EpicsMotor, "Mono:HDCM-Ax:HG}Mtr") # Synthetic Axis energy = Cpt(PseudoSingle, egu="KeV") + energy_egu = Cpt(Signal, None, add_prefix=(), value="keV", kind="config") + motor_egu = Cpt(Signal, None, add_prefix=(), value="eV", kind="config") + + _u_gap_offset = 0 + # Energy "limits" - _low = 5.0 # TODO: CHECK THIS VALUE - _high = 15.0 # TODO: CHECK THIS VALUE + _low = 5.0 # TODO: CHECK THIS VALUE, SRX uses 4.4 + _high = 15.0 # TODO: CHECK THIS VALUE, SRX uses 25 # Set up constants Xoffset = 20.0 # mm d_111 = 3.1286911960950756 ANG_OVER_KEV = 12.3984 - def __init__(self, *args, **kwargs): + # Motor enable flags + move_u_gap = Cpt(Signal, None, add_prefix=(), value=True) + move_c2_x = Cpt(Signal, None, add_prefix=(), value=True) + harmonic = Cpt(Signal, None, add_prefix=(), value=0, kind="config") + selected_harmonic = Cpt(Signal, None, add_prefix=(), value=0) + + # Experimental + detune = Cpt(Signal, None, add_prefix=(), value=0) + + ele_list: ClassVar[list[str]] = [ + "Si", + "P", + "S", + "Cl", + "Ar", + "K", + "Ca", + "Sc", + "Ti", + "V", + "Cr", + "Mn", + "Fe", + "Co", + "Ni", + "Cu", + "Zn", + "Ga", + "Ge", + "As", + "Se", + "Br", + "Kr", + "Rb", + "Sr", + "Y", + "Zr", + "Nb", + "Mo", + "Tc", + "Ru", + "Rh", + "Pd", + "Ag", + "Cd", + "In", + "Sn", + "Sb", + "Te", + "I", + "Xe", + "Cs", + "Ba", + "La", + "Ce", + "Pr", + "Nd", + "Pm", + "Sm", + "Eu", + "Gd", + "Tb", + "Dy", + "Ho", + "Er", + "Tm", + "Yb", + "Lu", + "Hf", + "Ta", + "W", + "Re", + "Os", + "Ir", + "Pt", + "Au", + "Hg", + "Tl", + "Pb", + "Bi", + "Po", + "At", + "Rn", + "Fr", + "Ra", + "Ac", + "Th", + "Pa", + "U", + "Np", + "Pu", + "Am", + "Cm", + "Cf", + ] + + elements: ClassVar[dict[str, xrfC.XrfElement]] = {} + for i in ele_list: + elements[i] = xrfC.XrfElement(i) + + def __init__( + self, + *args: object, + delta_bragg: int = 0, + xoffset: int = 0, + C2Xcal: int = 0, + T2cal: int = 0, + d_111: int = 0, + **kwargs: object, + ): super().__init__(*args, **kwargs) + self._delta_bragg = delta_bragg + self._xoffset = xoffset + self._C2Xcal = C2Xcal + self._T2cal = T2cal + self._d_111 = d_111 self.energy.readback.name = "energy" self.energy.setpoint.name = "energy_setpoint" - - def energy_to_positions(self, target_energy: float): + calib_path = Path(__file__).parent + # this is temporary, we need to figure out if there is a calib file for CDI + calib_file = "../data/CDIUgapCalibration.txt" + + with Path.open(calib_path / calib_file) as f: + next(f) + uposlistIn = [] + elistIn = [] + for line in f: + num = [float(x) for x in line.split()] + # Check in case there is an extra line at the end of the calibration file + if len(num) == 2: + uposlistIn.append(num[0]) + elistIn.append(num[1]) + + self.etoulookup = make_interp_spline(elistIn, uposlistIn) + self.utoelookup = make_interp_spline(uposlistIn, elistIn) + + def energy_to_positions( + self, + target_energy: float, + undulator_harmonic: int = 0, + u_detune: float = 0.0, + ): """Compute undulator and mono positions given a target energy Parameters ---------- target_energy : float Target energy in keV + undulator_harmonic : int + The harmonic in the undulator to use + u_detune : float + Amount to 'mistune' the undulator in keV Returns ------- @@ -119,28 +281,371 @@ def energy_to_positions(self, target_energy: float): The angle to set the monocromotor in radians gap : float The gap position in millimeters + C2X : float + The C2X position in millimeters + ugap : float + The undulator gap position in microns """ # Calculate Bragg RBV - bragg = np.arcsin((self.ANG_OVER_KEV / target_energy) / (2 * self.d_111)) + bragg_RBV = ( + np.arcsin((self.ANG_OVER_KEV / target_energy) / (2 * self.d_111)) + - self._delta_bragg + ) + bragg = bragg_RBV + self._delta_bragg + T2 = self._xoffset + np.sin(bragg * np.pi / 180) / np.sin( + 2 * bragg * np.pi / 180 + ) + dT2 = T2 - self._T2cal + C2X = self._C2Xcal - dT2 # Calculate C2X - gap = self.Xoffset / 2 / np.cos(bragg) + gap = self._xoffset / 2 / np.cos(bragg) + ugap = float(self.etoulookup((target_energy + u_detune) / undulator_harmonic)) + ugap *= 1000 + ugap = ugap + self._u_gap_offset - return bragg, gap + return bragg, gap, C2X, ugap @pseudo_position_argument - def forward(self, p_pos): - energy = p_pos.energy # energy assumed in keV - bragg, gap = self.energy_to_positions(energy) - return self.RealPosition(bragg=np.rad2deg(bragg), cgap=gap) + def forward(self, pseudo_pos: object): + energy = pseudo_pos.energy # energy assumed in keV + bragg, _, _, _ = self.energy_to_positions(energy) + harmonic_raw = self.harmonic.get() + if not isinstance(harmonic_raw, (int, float)): + msg = "Harmonic value is not set" + raise RuntimeError(msg) + harmonic = int(harmonic_raw) + if harmonic < 0 or ((harmonic % 2) == 0 and harmonic != 0): + msg = f"The harmonic must be 0 or odd and positive, you set {harmonic}. Set `energy.harmonic` to a positive odd integer or 0." + raise RuntimeError(msg) + detune_raw = self.detune.get() + if not isinstance(detune_raw, (int, float)): + msg = "Detune value is not set" + raise RuntimeError(msg) + detune = float(detune_raw) + if energy <= self._low: + msg = f"The energy you entered is too low ({energy} keV). " + msg += f"Minimum energy = {self._low:.1f} keV" + raise ValueError(msg) + if energy > self._high: + if (energy < self._low * 1000) or (energy > self._high * 1000): + # Energy is invalid + msg = f"The requested photon energy is invalid ({energy} keV). " + msg += f"Values must be in the range of {self._low:.1f} - {self._high:.1f} keV" + raise ValueError(msg) + # Energy is in eV + energy = energy / 1000.0 + + if harmonic < 3: + harmonic = 3 + # Choose the right harmonic + _, _, _, ugapcal = self.energy_to_positions(energy, harmonic, detune) + # Try higher harmonics until the required gap is too small + while True: + _, _, _, ugapcal = self.energy_to_positions( + energy, harmonic + 2, detune + ) + if ugapcal < self.u_gap.low_limit: + break + harmonic += 2 + + self.selected_harmonic.put(harmonic) + + # Compute where we would move everything to in a perfect world + bragg, _, c2_x, u_gap = self.energy_to_positions(energy, harmonic, detune) + + # Sometimes move the crystal gap + if not self.move_c2_x.get(): + c2_x = self.c2_x.position + + # Sometimes move the undulator + if not self.move_u_gap.get(): + u_gap = self.u_gap.position + + return self.RealPosition(bragg=np.rad2deg(bragg), c2_x=c2_x, cgap=u_gap) @real_position_argument - def inverse(self, r_pos): - bragg = np.deg2rad(r_pos.bragg) + def inverse(self, real_pos: _RealPosWithBragg): + bragg = np.deg2rad(real_pos.bragg) e = self.ANG_OVER_KEV / (2 * self.d_111 * np.sin(bragg)) return self.PseudoPosition(energy=float(e)) + @pseudo_position_argument + def set(self, position: list[int | float]): + return super().set([float(_) for _ in position]) + + def sync_with_epics(self): + self.epics_d_spacing.put(self._d_111) + self.epics_bragg_offset.put(self._delta_bragg) + + def retune_undulator(self): + self.detune.put(0.0) + self.move(self.engergy.get()[0]) + + def banner(self, str_list: list[str] | str, border: str = "-"): + if not isinstance(str_list, list): + str_list = [str_list] + + num = 2 + for str_val in str_list: + num = max(len(str_val), num) + + print(border * (num + 2)) # noqa: T201 + for str_val in str_list: + print(f" {str_val}") # noqa: T201 + print(border * (num + 2), end="\n\n") # noqa: T201 + + def peakup( + self, + detectors: list[Any] | Any, + start: float | None = None, + min_step: float = 0.005, + max_step: float = 0.5, + *, + motor: Any = None, + target_fields: list[str] = ["bpm_current", "bpm_sum"], # noqa: B006 + MAX_ITER: int = 100, + verbose: bool = False, + ) -> Generator[Msg, Any, None]: + if motor is None: + msg = "peakup requires a motor to move. Please provide a motor to optimize." + raise ValueError(msg) + + detector_list = ( + list(detectors) if isinstance(detectors, list | tuple) else [detectors] + ) + if not detector_list: + msg = "peakup requires at least one detector. Please provide a detector or list of detectors to optimize on." + raise ValueError(msg) + + if verbose: + print("Additional debugging is enabled.") # noqa: T201 + + if not 0 < min_step < max_step: + msg = f"Invalid step sizes: min_step={min_step}, max_step={max_step}. " + raise ValueError(msg) + + # Grab starting point + if start is None: + start = motor.readback.get() + if verbose: + print(f"Starting position: {start:.4}") # noqa: T201 + + # Check foils + if "bpm_current" in target_fields: + E = self.energy.energy.readback.get() # keV + bpm = detector_list[ + 0 + ] # assume bpm is the first detector, need to figure out how to identify which detector is the bpm if there are multiple detectors + y = bpm.y.user_readback.get() # Cu: y=0, Ti: y=25 + if np.abs(y - 25) < 5: + foil = "Ti" + elif np.abs(y) < 5: + foil = "Cu" + else: + foil = "" + self.banner("Unknown foil! Continuing without check!") + + if verbose: + print(f"Energy: {E:.4}") # noqa: T201 + print(f"Foil:\n {y=:.4}\n {foil=}") # noqa: T201 + + threshold = 8.979 # DEFAULT? PASS AS ARG? + if (threshold < E and foil == "Ti") or (threshold > E and foil == "Cu"): + self.banner( + "WARNING! BPM4 foil is not optimized for the incident energy." + ) + + # Visualization + livecb = [] + if verbose is False: + livecb.append(LiveTable([motor.readback.name, *target_fields])) + + # Optimize on a given detector + def optimize_on_det(target_field: str, x0: float) -> Generator[Msg, Any, float]: + past_pos = x0 + next_pos = x0 + step = max_step + past_I = None + cur_I = 0 + cur_det = {} + + for _ in range(MAX_ITER): + yield Msg("checkpoint") + if verbose: + print(f"Moving {motor.name} to {next_pos:.4f}") # noqa: T201 + yield from bps.mv(motor, next_pos) + yield from bps.sleep(0.500) + yield Msg("create", None, name="primary") + for det in detector_list: + yield Msg("trigger", det, group="B") + yield Msg("trigger", motor, group="B") + yield Msg("wait", None, "B") + for det in [*detector_list, motor]: + cur_det = yield Msg("read", det) + if target_field in cur_det: + cur_I = cur_det[target_field]["value"] + if verbose: + print(f"New measurement on {target_field}: {cur_I:.4}") # noqa: T201 + yield Msg("save") + # special case first first loop + if past_I is None: + past_I = cur_I + next_pos += step + if verbose: + print("past_I is None. Continuing...") # noqa: T201 + + dI = cur_I - past_I + if verbose: + print(f"{dI=:.4f}") # noqa: T201 + if dI < 0: + step = -0.6 * step + else: + past_pos = next_pos + past_I = cur_I + next_pos = past_pos + step + if verbose: + print(f"{next_pos=:.4f}") # noqa: T201 + + # Maximum found + if np.abs(step) < min_step: + if verbose: + print( # noqa: T201 + f"Maximum found for {target_field} at {x0:.4f}!\n {step=:.4f}" + ) + return next_pos + else: + msg = "Optimization did not converge!" + raise Exception(msg) + + # Start optimizing based on each detector field + x0: float = start if start is not None else 0.0 + for target_field in target_fields: + if verbose: + print(f"Optimizing on detector {target_field}") # noqa: T201 + x0 = yield from optimize_on_det(target_field, x0) + return x0 + + def set_roi( + self, roinum: int, element: Any, line: str | None = None, det: Any | None = None + ) -> None: + cur_element = xrfC.XrfElement(element) + e = "" + if line is None: + for e in ["ka1", "la1", "ma1"]: + if cur_element.emission_line[e] < self.energy.energy.readback.get(): + break + elif line.lower() not in [ + "ka1", + "ka2", + "kb1", + "la1", + "la2", + "lb1", + "lb2", + "lg1", + "ma1", + ]: + print(f"WARNING: line {line} not in allowed lines.") # noqa: T201 + print("Finding most appropriate line for the current energy.") # noqa: T201 + line = None + else: + e = line.lower() + + # this works for the xspress3, need to figure out how to generalize this for other detectors + channels = [] + e_ch = int(cur_element.emission_line[e] * 1000) + if det is not None: + # need to figure this out + # SRX profile 48 line 76 + channels = det.channels.get() + + for channel in channels: + mcaroi = channel.get_mcaroi(mcaroi_number=roinum) + # TODO: add eV-to-bin conversion to xspress3 class + mcaroi.configure_mcaroi( + min_x=(e_ch - 100) / 10, size_x=200 / 10, roi_name=f"{element}_{e}" + ) + mcaroi.kind = "hinted" + + def getemissionE(self, element: str, edge: str = "") -> float | None: + cur_element = xrfC.XrfElement(element) + if edge == "": + print("Edge\tEnergy [keV]") # noqa: T201 + for e in ["ka1", "ka2", "kb1", "la1", "la2", "lb1", "lb2", "lg1", "ma1"]: + if ( + cur_element.emission_line[e] < 25.0 + and cur_element.emission_line[e] > 1.0 + ): + # print("{0:s}\t{1:8.2f}".format(e, cur_element.emission_line[e])) + print(f"{e}\t{cur_element.emission_line[e]:8.2f}") # noqa: T201 + return 0.0 + return float(np.round(cur_element.emission_line[edge], 3)) + + def getbindingE(self, element: str, edge: str | None = None) -> float | None: + """ + Return edge energy in eV if edge is specified, + otherwise return K and L edge energies and yields + element element symbol for target + edge ['k','l1','l2','l3'] return binding energy of this edge + """ + if edge is None: + y = [0.0, "k"] + print("Edge\tEnergy [eV]\tYield") # noqa: T201 + for i in ["k", "l1", "l2", "l3"]: + print( # noqa: T201 + f"{i}\t" + f"{xrfC.XrayLibWrap(self.elements[element].Z, 'binding_e')[i] * 1000.0:8.2f}\t" + f"{xrfC.XrayLibWrap(self.elements[element].Z, 'yield')[i]:5.3f}" + ) + if ( + y[0] < xrfC.XrayLibWrap(self.elements[element].Z, "yield")[i] + and xrfC.XrayLibWrap(self.elements[element].Z, "binding_e")[i] + < 25.0 + ): + y[0] = xrfC.XrayLibWrap(self.elements[element].Z, "yield")[i] + y[1] = i + return np.round( + xrfC.XrayLibWrap(self.elements[element].Z, "binding_e")[y[1]] * 1000.0, + 3, + ) + return np.round( + xrfC.XrayLibWrap(self.elements[element].Z, "binding_e")[edge] * 1000.0, 3 + ) + + def mono_peakup( + self, + element: str, + peakup: bool = True, + detectors: list[Device] | None = None, + motors: list[Device] | None = None, + start: None = None, + targets: list[str] | None = None, + ) -> Generator[Msg, None, None]: + """ + First draft of the mono peakup scan + Need more info about the axis to be scanned, the move ID, and which detector will be used for feedback. + Args: + element (string): element name + acquisition_time (float, optional): _description_. Defaults to 1.0. + peakup (bool, optional): _description_. Defaults to True. + """ + self.getemissionE(element) + energy_x = self.getbindingE(element) + + yield from bps.mov(self.energy, energy_x) + self.setroi(1, element) + if peakup: + yield from bps.sleep(5) + yield from self.peakup( + detectors=detectors, + motor=motors, + start=start, + target_fields=targets, + verbose=True, + ) + class VPM(Device): fs = DDC(