Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 69 additions & 46 deletions scripts/Diffraction/isis_powder/hrpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _mask_prompt_pulses(self, ws, ispecs=None):
**mask_kwargs,
)

def _subtract_prompt_pulses(self, ws, tof_res=0.002):
def _subtract_prompt_pulses(self, ws):
"""
HRPD has a long flight path from the moderator resulting
in sharp peaks from the proton pulse that maintain their
Expand Down Expand Up @@ -235,71 +235,39 @@ def _subtract_prompt_pulses(self, ws, tof_res=0.002):
cen = npulse * PROMPT_PULSE_INTERVAL
xlo = cen - PROMPT_PULSE_LEFT_WIDTH - 10 # add extra to ensure get background on both sides of peak
xhi = cen + PROMPT_PULSE_RIGHT_WIDTH + 10
comp_func = FunctionFactory.createInitialized(
f"name=PearsonIV, Centre={8}, Intensity=1,Sigma=8.5, Exponent=1.5, Skew=-5,"
f"constraints=(0.2<Sigma,1.5<Exponent);name=FlatBackground, A0=0,constraints=(0<A0)"
)
comp_func[0].setAttributeValue("CentreShift", cen)
comp_func[0].setHeight(ws.readY(ispec)[ws.yIndexOfX(cen)])
comp_func[0].addConstraints(f"{-15}<Centre<{15}")
comp_func[0].addConstraints(f"{0}<Intensity")
comp_func[1]["A0"] = min(ws.readY(ispec)[ws.yIndexOfX(xlo)], ws.readY(ispec)[ws.yIndexOfX(xhi)])
comp_func = self._setup_single_prompt_pulse_function(ws, ispec, cen, xlo, xhi)
func.add(comp_func)
func.setDomainIndex(ipulse, ipulse)
key_suffix = f"_{ipulse}" if ipulse > 0 else ""
fit_kwargs["InputWorkspace" + key_suffix] = ws.name()
fit_kwargs["StartX" + key_suffix] = xlo
fit_kwargs["EndX" + key_suffix] = xhi
fit_kwargs["WorkspaceIndex" + key_suffix] = ispec
# work out how to exclude
exclude = []
diff_consts = si.diffractometerConstants(ispec)
for dpk in dpks_bragg:
tof_pk = UnitConversion.run("dSpacing", "TOF", dpk, 0, DeltaEModeType.Elastic, diff_consts)
tof_pk_lo = tof_pk * (1 - tof_res)
tof_pk_hi = tof_pk * (1 + tof_res)
if xlo < tof_pk_lo < xhi or xlo < tof_pk_hi < xhi or (tof_pk_lo < xlo and tof_pk_hi > xhi):
exclude.extend([tof_pk_lo, tof_pk_hi])
# exclude x-ranges where Bragg peaks overlap prompt pulse within some fractional resolution
exclude = self._get_tofs_to_exclude(si, ispec, dpks_bragg, xlo, xhi)
if exclude:
func.fixParameter(f"f{ipulse}.f1.A0") # fix constant background - Bragg peak can mess with bg
fit_kwargs["Exclude" + key_suffix] = exclude
# check that at least one fit region has no overlapping prompt pulse
if len([key for key in fit_kwargs.keys() if "Exclude" in key]) < len(npulses):
# tie peak parameters to be common for all prompt pulses
for idomain in range(len(npulses) - 1):
for param_name in ["Intensity", "Sigma", "Exponent", "Skew", "Centre"]:
func.tie(f"f{idomain}.f0.{param_name}", f"f{len(npulses) - 1}.f0.{param_name}") # tie to first
# fix some peak parameters
for param_name in ["Sigma", "Exponent", "Skew"]:
func.fixParameter(f"f{len(npulses) - 1}.f0.{param_name}")
excluded_keys = [key for key in fit_kwargs.keys() if "Exclude" in key]
if len(excluded_keys) < len(npulses):
self._add_ties_to_multidomain_prompt_pulse_func(func)
fit_output = mantid.Fit(Function=func, **fit_kwargs)
# subtract prompt pulse only from spectrum
success = fit_output.OutputStatus == "success" or "Changes in function value are too small" in fit_output.OutputStatus
if success:
is_success = fit_output.OutputStatus == "success"
is_small_change = "Changes in function value are too small" in fit_output.OutputStatus
if is_success or is_small_change:
func = fit_output.Function.function
for ipulse in range(fit_output.Function.nDomains):
func.removeTie(f"f{ipulse}.f0.Centre")
key_suffix = f"_{ipulse}" if ipulse > 0 else ""
if "Exclude" + key_suffix in fit_kwargs:
func.fixParameter(f"f{ipulse}.f0.Centre") # fix centre as can't fit independently with bragg
else:
fitted_cen = func[ipulse][0]["Centre"]
func[ipulse][0].addConstraints(
f"{fitted_cen - 5}<Centre<{fitted_cen + 5}"
) # free centre but stricter constraint
self._free_ties_of_multidomain_prompt_pulse_func(func, fit_kwargs)
fit_output_final = mantid.Fit(Function=func, **fit_kwargs)
success_final = (
fit_output_final.OutputStatus == "success"
or "Changes in function value are too small" in fit_output_final.OutputStatus
)
if success_final:
fit_output = fit_output_final
# accumulate peak functions
pk_func = FunctionWrapper(fit_output.Function.function[0][0])
for ifunc in range(1, fit_output.Function.nDomains):
pk_func = pk_func + FunctionWrapper(fit_output.Function.function[ifunc][0])
ws_eval = mantid.EvaluateFunction(InputWorkspace=ws, Function=pk_func, EnableLogging=False, StoreInADS=False)
y_nopulse = ws.readY(ispec) - ws_eval.readY(1)
func = fit_output.Function.function
y_pulses = self._eval_fitted_prompt_pulse_peaks_only(ws, func)
y_nopulse = ws.readY(ispec) - y_pulses
y_nopulse[y_nopulse < 0] = 0 # can't have negative counts
ws.setY(ispec, y_nopulse)
else:
Expand All @@ -310,6 +278,61 @@ def _subtract_prompt_pulses(self, ws, tof_res=0.002):
# Mask other banks and spectra for which fit failed
self._mask_prompt_pulses(ws, ispec_failed)

@staticmethod
def _eval_fitted_prompt_pulse_peaks_only(ws, func):
# accumulate peak functions only (first function in individual composite functions)
pk_func = FunctionWrapper(func[0][0])
for ifunc in range(1, func.nDomains()):
pk_func = pk_func + FunctionWrapper(func[ifunc][0])
ws_eval = mantid.EvaluateFunction(InputWorkspace=ws, Function=pk_func, EnableLogging=False, StoreInADS=False)
return ws_eval.readY(1)

@staticmethod
def _free_ties_of_multidomain_prompt_pulse_func(func, fit_kwargs):
for ipulse in range(func.nDomains()):
func.removeTie(f"f{ipulse}.f0.Centre")
key_suffix = f"_{ipulse}" if ipulse > 0 else ""
if "Exclude" + key_suffix in fit_kwargs:
func.fixParameter(f"f{ipulse}.f0.Centre") # fix centre as can't fit independently with bragg
else:
fitted_cen = func[ipulse][0]["Centre"]
func[ipulse][0].addConstraints(f"{fitted_cen - 5}<Centre<{fitted_cen + 5}") # free centre but stricter constraint

@staticmethod
def _add_ties_to_multidomain_prompt_pulse_func(func):
# tie peak parameters to be common for all prompt pulses (tie to last peak func - seems to give better results than first)
for idomain in range(func.nDomains() - 1):
for param_name in ["Intensity", "Sigma", "Exponent", "Skew", "Centre"]:
func.tie(f"f{idomain}.f0.{param_name}", f"f{func.nDomains() - 1}.f0.{param_name}") # tie to first
# fix some peak parameters
for param_name in ["Sigma", "Exponent", "Skew"]:
func.fixParameter(f"f{func.nDomains() - 1}.f0.{param_name}")

@staticmethod
def _get_tofs_to_exclude(specinfo, ispec, dpks_bragg, xlo, xhi, tof_res=0.002):
exclude = []
diff_consts = specinfo.diffractometerConstants(ispec)
for dpk in dpks_bragg:
tof_pk = UnitConversion.run("dSpacing", "TOF", dpk, 0, DeltaEModeType.Elastic, diff_consts)
tof_pk_lo = tof_pk * (1 - tof_res)
tof_pk_hi = tof_pk * (1 + tof_res)
if xlo < tof_pk_lo < xhi or xlo < tof_pk_hi < xhi or (tof_pk_lo < xlo and tof_pk_hi > xhi):
exclude.extend([tof_pk_lo, tof_pk_hi])
return exclude

@staticmethod
def _setup_single_prompt_pulse_function(ws, ispec, cen, xlo, xhi):
comp_func = FunctionFactory.createInitialized(
"name=PearsonIV, Centre=8, Intensity=1,Sigma=8.5, Exponent=1.5, Skew=-5,"
"constraints=(0.2<Sigma,1.5<Exponent);name=FlatBackground, A0=0,constraints=(0<A0)"
)
comp_func[0].setAttributeValue("CentreShift", cen)
comp_func[0].setHeight(ws.readY(ispec)[ws.yIndexOfX(cen)])
comp_func[0].addConstraints(f"{-15}<Centre<{15}")
comp_func[0].addConstraints(f"{0}<Intensity")
comp_func[1]["A0"] = min(ws.readY(ispec)[ws.yIndexOfX(xlo)], ws.readY(ispec)[ws.yIndexOfX(xhi)])
return comp_func

@staticmethod
def _find_bragg_peak_dspacings_and_prompt_pulses(ws, ispec_start=0, ispec_end=60, dspac_res=0.001):
"""
Expand Down
Loading