Source code for vibe.skim_production_modes.slme.fei_comparison

import basf2
import modularAnalysis as ma

from typing import List
from variables import variables as vm
import pandas as pd

from vibe.core.utils.misc import fancy_validation_mode_header
from vibe.core.validation_mode import ValidationModeBaseClass
from vibe.core.helper.skims.skim_attribute_tools import SkimAttributes, AnalysisParameters
from vibe.core.helper.histogram_tools import HistVariable, Histogram, HistComponent
from vibe.core.helper.root_helper import makeROOTCompatible


__all__ = [
    "fei_compare",
]

[docs] @fancy_validation_mode_header class fei_compare(ValidationModeBaseClass): name = "fei_compare"
[docs] def get_skim_attributes(self): this_skim = SkimAttributes( # [REQUIRED] List of reconstruction types to use on a given lpn skim_name = ['feiSL', 'feiHadronic'], # [OPTIONAL] Globaltag globaltag = 'analysis_tools_light-2305-korat', # [OPTIONAL] When True set the number of events to be reconstructed to 100 (Decrease runtime for testing) grid_test_mode = False, # [OPTIONAL] If not None, full pipeline will be engaged and the AnalysisParameters is the space for the user to configure the analysis. What this allows the users # to do is add globaltags and kwargs to the dataset_dict inside of skim_prodcution_mode.json that is automatically produced and updated during runtime skim_to_analysis_pipeline = AnalysisParameters( kwargs= { "feiSL" : {"treename" : "B0:feiSL"}, "feiHadronic" : {"treename" : "B0:feiHadronic"}} ) ) return this_skim
# Returning an empty path as b2luigi will be submitting a custom steering script
[docs] def create_basf2_path(self, treename : str): main_path = basf2.Path() ma.matchMCTruth( treename, path=main_path, ) # select signal + build ROE ma.buildRestOfEvent(treename, path=main_path) # define ROE mask cuts tracksCut = "abs(d0) < 10.0 and abs(z0) < 20.0" clustersCut1 = "[clusterE1E9 > 0.4 or E > 0.075] and [[E > 0.062 and abs(clusterTiming)<18 and clusterReg==1] or " clustersCut2 = ( "[E > 0.060 and abs(clusterTiming)<20 and clusterReg==2]" " or [E > 0.056 and abs(clusterTiming)<44 and clusterReg==3]]" ) clustersCut = clustersCut1 + clustersCut2 # define and append ROE masks m1 = ("m1", tracksCut, clustersCut) ma.appendROEMasks(treename, [m1], path=main_path) ma.rankByHighest( treename, "extraInfo(sigProb)", #allowMultiRank=True, outputVariable="sigProbRank", path=main_path, ) ma.buildEventShape(inputListNames=[treename], path=main_path) ma.buildEventKinematics(inputListNames=[treename], path=main_path) vm.addAlias('sigProbRank', 'extraInfo(sigProbRank)') vm.addAlias('cosThetaBY', 'cosThetaBetweenParticleAndNominalB') vm.addAlias(f"dmID", "extraInfo(decayModeID)") self.variables_to_validation_ntuple( decay_str=treename, variables=list( { "cosThetaBY", "isSignalAcceptMissingNeutrino", "sigProbRank", f"dmID", "Mbc", "foxWolframR2" } ), path=main_path, ) return main_path
@property def analysis_validation_histograms(self) -> List[Histogram]: return [ Histogram( name='cosThetaBY_allVSrank1', title=r"cos$\theta_{BY}$: rank 1 vs rest", hist_variable=HistVariable( df_label=makeROOTCompatible(variable="cosThetaBY"), label=r"cos$\theta_{BY}$", unit=r"", bins=50, scope=(-5, 5), ), hist_components = [ HistComponent( label='all ranks' ), HistComponent( label="rank1 B's", additional_cut_str= "sigProbRank==1", ), ] ), Histogram( name='Mbc', title=r"$B^0$ Mbc", hist_variable=HistVariable( df_label=makeROOTCompatible(variable="Mbc"), label=r"Mbc", unit=r"GeV/c$^2$", bins=50, scope=(5, 5.3), ), hist_components= [HistComponent( label="Mbc" ) ] ), Histogram( name='dmID', title=r"$B^0$ dmID", hist_variable=HistVariable( df_label=makeROOTCompatible(variable="dmID"), label=r"dmID", unit=r"", bins=30, scope=(0,30), ), hist_components= [HistComponent( label="dmID" ) ] ), Histogram( name='foxWolframR2', title=r"foxWolframR2", hist_variable=HistVariable( df_label=makeROOTCompatible(variable="foxWolframR2"), label=r"foxWolframR2", unit=r"", bins=10, scope=(0,1), ), hist_components=[HistComponent( label="foxWolframR2" ) ] ) ]
[docs] def get_number_of_signal_for_efficiency(self, df: pd.DataFrame) -> float: return df["isSignalAcceptMissingNeutrino"].sum()