"""Block-like object of SLHA data."""
import logging
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from typing import (
Any,
ClassVar,
Generic,
Iterator,
List,
MutableMapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
cast,
overload,
)
from yaslha._collections import OrderedTupleOrderInsensitiveDict
from yaslha.comment import CommentInterface
from yaslha.line import (
BlockHeadLine,
DecayHeadLine,
DecayKeyType,
DecayLine,
DecayValueType,
InfoKeyType,
InfoLine,
InfoValueType,
KeyType,
ValueLine,
ValueType,
)
T = TypeVar("T")
logger = logging.getLogger(__name__)
KT = TypeVar("KT", KeyType, InfoKeyType)
VT = TypeVar("VT", ValueType, InfoValueType)
LT = TypeVar("LT", ValueLine, InfoLine)
CT = TypeVar("CT", str, List[str])
KTG = TypeVar("KTG", KeyType, InfoKeyType, DecayKeyType)
[docs]class GenericBlock(Generic[KTG, CT], metaclass=ABCMeta):
"""Block-like object containing comments."""
@abstractmethod
def __init__(self) -> None:
self.head = NotImplemented # type: Union[BlockHeadLine, DecayHeadLine]
self._comment = CommentInterface(self) # type: CommentInterface[KTG, CT]
@property
def comment(self) -> "CommentInterface[KTG, CT]":
"""Give the interface to comments."""
return self._comment
@abstractmethod
def _get_comment(self, key: KTG) -> CT:
pass
@abstractmethod
def _get_pre_comment(self, key: KTG) -> List[str]:
pass
@abstractmethod
def _set_comment(self, key: KTG, value: CT) -> None:
pass
@abstractmethod
def _set_pre_comment(self, key: KTG, value: Sequence[str]) -> None:
pass
@classmethod
@overload
def new(cls, obj: Union[int, DecayHeadLine]) -> "Decay":
"""..."""
pass
@classmethod # noqa: F811
@overload
def new(cls, obj: Union[str, BlockHeadLine]) -> Union["Block", "InfoBlock"]:
"""..."""
pass
[docs] @classmethod # noqa: F811
def new(
cls, obj: Union[int, str, DecayHeadLine, BlockHeadLine]
) -> "Union[Block, InfoBlock, Decay]":
"""Create a GenericBlock object according to the argument."""
if isinstance(obj, int) or isinstance(obj, DecayHeadLine):
return Decay(obj)
name = obj.name if isinstance(obj, BlockHeadLine) else obj
if name.endswith("INFO"):
return InfoBlock(obj)
else:
return Block(obj)
[docs]class AbsBlock(GenericBlock[KT, CT], Generic[KT, VT, LT, CT], metaclass=ABCMeta):
"""Abstract class for SLHA blocks."""
@abstractmethod
def __init__(self, obj: Union[BlockHeadLine, str]) -> None:
super().__init__()
if isinstance(obj, BlockHeadLine):
self.head = obj # type: BlockHeadLine
elif isinstance(obj, str):
self.head = BlockHeadLine(name=obj)
else:
raise TypeError(obj)
self._comment = CommentInterface(self)
self._data = NotImplemented # must be initialized in subclasses
@property
def name(self) -> str:
"""Return the name of block (always in upper case)."""
return self.head.name
@property
def q(self) -> Optional[float]:
"""Return the Q value."""
return self.head.q
@q.setter
def q(self, value: Optional[float]) -> None:
self.head.q = value
[docs] @abstractmethod
def __getitem__(self, key: KT) -> Union[VT, Sequence[VT]]:
"""Return the value corresponding to the key."""
[docs] @abstractmethod
def __setitem__(self, key: KT, value: VT) -> None:
"""Set the value for the key."""
[docs] @abstractmethod
def __delitem__(self, key: KT) -> None:
"""Delete the value for the key."""
[docs] @abstractmethod
def update_line(self, line: LT) -> None:
"""Add the line to the block, overriding if exists."""
[docs] @abstractmethod
def keys(self, sort: bool = False) -> Iterator[KT]:
"""Return the keys."""
__iter__ = keys
[docs] @abstractmethod
def items(self, sort: bool = False) -> Iterator[Tuple[KT, VT]]:
"""Return (key, value) tuples."""
@abstractmethod
def _lines(self, sort: bool = False) -> Iterator[Tuple[KT, LT]]:
pass
[docs]class Block(AbsBlock[KeyType, ValueType, ValueLine, str]):
"""SLHA block that has one value for one key."""
def __init__(self, obj: Union[BlockHeadLine, str]) -> None:
super().__init__(obj)
self._data = OrderedDict() # type: OrderedDict[KeyType, ValueLine]
[docs] def __getitem__(self, key: KeyType) -> ValueType:
"""Return the value corresponding to the key."""
return self._data[key].value
[docs] def __setitem__(self, key: KeyType, value: ValueType) -> None:
"""Set the value for the key."""
if key in self._data:
self._data[key].value = value
else:
self._data[key] = ValueLine.new(key, value)
[docs] def __delitem__(self, key: KeyType) -> None:
"""Delete the value for the key."""
del self._data[key]
[docs] def update_line(self, line: ValueLine) -> None:
"""Add the line to the block, overriding if exists."""
self._data[line.key] = line
[docs] def merge(self, another: "Union[Block, InfoBlock]") -> None:
"""Merge another block."""
if isinstance(another, Block):
for _, line in another._lines():
self.update_line(line)
else:
raise ValueError(another)
[docs] def get(self, *key: Any, default: T) -> Union[ValueType, T]:
"""Return the value for the key if exists, or default value."""
# Here, `key` is always a tuple.
# Meanwhile, the key for `__getitem__` is a tuple only if length > 1.
if isinstance(key, tuple) and len(key) == 1:
key = key[0]
if key in self._data:
return self.__getitem__(key)
else:
return default
[docs] def keys(self, sort: bool = False) -> Iterator[KeyType]:
"""Return the keys."""
for k, _ in self._lines(sort=sort):
yield k
__iter__ = keys
[docs] def items(self, sort: bool = False) -> Iterator[Tuple[KeyType, ValueType]]:
"""Return (key, value) tuples."""
for k, line in self._lines(sort=sort):
yield k, line.value
def _lines(self, sort: bool = False) -> Iterator[Tuple[KeyType, ValueLine]]:
if sort:
key_line_tuples = list(self._data.items())
key_line_tuples.sort(key=lambda k: k[0])
for i in key_line_tuples:
yield i
else:
for i in self._data.items():
yield i
def _get_comment(self, key: KeyType) -> str:
return self._data[key].comment
def _get_pre_comment(self, key: KeyType) -> List[str]:
return self._data[key].pre_comment
def _set_comment(self, key: KeyType, value: Optional[str]) -> None:
self._data[key].comment = value or ""
def _set_pre_comment(self, key: KeyType, value: Optional[Sequence[str]]) -> None:
self._data[key].pre_comment = [v for v in value] if value else []
[docs]class InfoBlock(AbsBlock[InfoKeyType, InfoValueType, InfoLine, List[str]]):
"""SLHA block that may have multiple values for one key."""
def __init__(self, obj: Union[BlockHeadLine, str]) -> None:
super().__init__(obj)
self._data = [] # type: List[InfoLine]
[docs] def __getitem__(self, key: InfoKeyType) -> Sequence[InfoValueType]:
"""Return the value corresponding to the key."""
return tuple(line.value for line in self._data if line.key == key)
[docs] def __setitem__(self, key: InfoKeyType, value: Sequence[InfoValueType]) -> None:
"""Set the value for the key."""
if isinstance(value, str):
raise TypeError(value) # Fail-safe; only List[str] is allowed!
self.__delitem__(key)
self._data.extend([InfoLine(key, v) for v in value])
[docs] def __delitem__(self, key: InfoKeyType) -> None:
"""Delete the value for the key."""
self._data = [line for line in self._data if line.key != key]
[docs] def update_line(self, line: InfoLine) -> None:
"""Add the line to the block, overriding if exists."""
self.__delitem__(line.key)
self._data.append(line)
[docs] def append_line(self, line: InfoLine) -> None:
"""Add the line, appending to the existing one if exists."""
self._data.append(line)
[docs] def append(self, key: InfoKeyType, value: InfoValueType) -> None:
"""Append the value for the key."""
self.append_line(InfoLine(key, value))
[docs] def merge(self, another: "Union[Block, InfoBlock]") -> None:
"""Merge another block."""
if isinstance(another, InfoBlock):
updated = {} # type: MutableMapping[InfoKeyType, bool]
for key, line in another._lines():
if updated.get(key):
self.append_line(line)
else:
updated[key] = True
self.update_line(line)
else:
raise ValueError(another)
[docs] def keys(self, sort: bool = False) -> Iterator[InfoKeyType]:
"""Return the keys."""
keys_dict = {} # type: MutableMapping[InfoKeyType, bool]
for line in self._data:
keys_dict[line.key] = True
keys = list(keys_dict.keys())
if sort:
keys.sort()
for k in keys:
yield k
__iter__ = keys
[docs] def items(self, sort: bool = False) -> Iterator[Tuple[InfoKeyType, InfoValueType]]:
"""Return (key, value) tuples."""
for key, line in self._lines(sort):
yield key, line.value
def _lines(self, sort: bool = False) -> Iterator[Tuple[InfoKeyType, InfoLine]]:
for key in self.keys(sort):
for line in self._data:
if line.key == key:
yield key, line
def _get_comment(self, key: InfoKeyType) -> List[str]:
return [line.comment for line in self._data if line.key == key]
def _get_pre_comment(self, key: InfoKeyType) -> List[str]:
return self._data[key].pre_comment
def _set_comment(self, key: InfoKeyType, value: Optional[Sequence[str]]) -> None:
lines = [line for line in self._data if line.key == key]
if value is None:
value = []
if len(lines) < len(value):
raise ValueError(value) # too many values
for i, line in enumerate(lines):
if i < len(value):
line.comment = value[i]
def _set_pre_comment(
self, key: InfoKeyType, value: Optional[Sequence[str]]
) -> None:
for line in self._data:
if line.key == key:
line.pre_comment = [v for v in value] if value else []
value = [] # to remove all the remaining pre_comment
[docs]class Decay(GenericBlock[DecayKeyType, str]):
"""Decay block."""
br_normalize_threshold = 1.0e-6 # type: ClassVar[float]
def __init__(self, obj: Union[DecayHeadLine, int]) -> None:
super().__init__()
if isinstance(obj, DecayHeadLine):
self.head = obj # type: DecayHeadLine
elif isinstance(obj, int):
self.head = DecayHeadLine(pid=obj, width=0)
else:
raise TypeError(obj)
self._data = (
OrderedTupleOrderInsensitiveDict()
) # type: OrderedTupleOrderInsensitiveDict[DecayKeyType, DecayLine]
@property
def pid(self) -> int:
"""Return the pid of mother particle."""
return self.head.pid
@property
def width(self) -> float:
"""Return the total width."""
return self.head.width
[docs] def update_line(self, line: DecayLine) -> None:
"""Add the line to the block, overriding if exists."""
self._data[line.key] = line
[docs] def br(self, *key: int) -> DecayValueType:
"""Return the BR of given channel."""
if key in self._data:
return self._data[key].br
else:
return 0
[docs] def partial_width(self, *key: int) -> float:
"""Return the width of given channel."""
return self.width * self.br(*key)
[docs] def keys(self, sort: bool = False) -> Iterator[DecayKeyType]:
"""Return the keys."""
for k, _ in self._lines(sort):
yield k
__iter__ = keys
[docs] def items_br(self, sort=False):
# type: (bool)->Iterator[Tuple[DecayKeyType, DecayValueType]]
"""Return (key, BR) tuples, sorted by the BR."""
for k, line in self._lines(sort):
yield k, line.br
[docs] def items_partial_width(self, sort=False):
# type: (bool)->Iterator[Tuple[DecayKeyType, float]]
"""Return (key, width) tuples, sorted by the BR."""
for k, line in self._lines(sort):
yield k, self.width * line.br
def _lines(self, sort: bool = False) -> Iterator[Tuple[DecayKeyType, DecayLine]]:
if sort:
key_line_tuples = list(self._data.items())
key_line_tuples.sort(key=lambda k: -k[1].br)
for i in key_line_tuples:
yield i
else:
for i in self._data.items():
yield i
def _get_comment(self, key: DecayKeyType) -> str:
return self._data[key].comment
def _get_pre_comment(self, key: DecayKeyType) -> List[str]:
return self._data[key].pre_comment
def _set_comment(self, key: DecayKeyType, value: Optional[str]) -> None:
self._data[key].comment = value or ""
def _set_pre_comment(
self, key: DecayKeyType, value: Optional[Sequence[str]]
) -> None:
self._data[key].pre_comment = [v for v in value] if value else []
[docs] def normalize(self, force: bool = False) -> None:
"""Normalize the branching ratios.
This method normalize all the branching ratios so that the sum becomes
unity or less. In particular, if `force` is set True, they are
normalized so that the sum becomes unity, regardless of the current
value.
If `force` is False and the sum is less than one, the branching ratio
is not normalized, assuming that some decay channels are not listed. If
`force` is False and the sum slightly exceeds the unity, the branching
ratios are normalized, while if the excess is larger than
`br_normalize_threshold`, `ValueError` is raised.
"""
br_list = [br for _, br in self.items_br(sort=True)]
total = sum(reversed(br_list)) # sum taken from smaller to reduce error
if not total > 0:
return # stable particle
elif force:
logger.debug("BR for %d force normalized: %g", self.pid, total)
pass # to normalize
elif total > 1 + self.br_normalize_threshold:
logger.critical("BR for %d exceeds unity: %g", self.pid, total)
raise ValueError(total)
elif total >= 1:
pass # to normalize
else: # if force==False and total < 1
rest = 1 - total
assert rest > 0
if rest > self.br_normalize_threshold:
if hasattr(self, "_br_warned"):
pass
else:
# warn only once
self._br_warned = True
logger.warning("BR for %d is less than unity by %g", self.pid, rest)
return # not normalize
for v in self._data.values():
v.br /= total
[docs] def set_partial_width(self, *args: Union[int, float]) -> None:
"""Update the partial width and recalculate BRs of all channels."""
if len(args) < 3 or not all(isinstance(i, int) for i in args[:-1]):
raise KeyError(*args)
key = cast(List[int], args[:-1])
new_partial_width = float(args[-1])
self.normalize()
if key in self._data:
old_partial_width = self.partial_width(*key)
else:
old_partial_width = 0
self.update_line(DecayLine(br=0, channel=key))
# update total width
old_width = self.width
new_width = new_partial_width - old_partial_width + old_width
self.head.width = new_width
# update the modified channel
self._data[key].br = new_partial_width / new_width
target = self._data[key]
for line in self._data.values():
if line != target:
line.br *= old_width / new_width
[docs] def remove(self, *key: int) -> None:
"""Remove the channel and recalculate BRs of all the other channels."""
self.set_partial_width(*key, 0)
del self._data[key]