import logging
import unicodedata
from abc import ABCMeta
from abc import abstractmethod
from collections import deque
from enum import Enum
from typing import Any
from typing import Deque
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar
import antlr4
import antlr4.error.ErrorListener
from antlr4 import CommonTokenStream
from antlr4 import InputStream
from antlr4 import ParserRuleContext
from antlr4 import ParseTreeWalker
from fcsql.FCSLexer import FCSLexer
from fcsql.FCSParser import FCSParser
from fcsql.FCSParserListener import FCSParserListener
# ---------------------------------------------------------------------------
LOGGER = logging.getLogger(__name__)
_T = TypeVar("_T", bound="QueryNode")
OCCURS_UNBOUNDED = -1
"""Atom occurrence if not bound."""
# ---------------------------------------------------------------------------
[docs]class QueryNodeType(str, Enum):
"""Node types of FCS-QL expression tree nodes."""
def __str__(self) -> str:
return self.value
QUERY_SEGMENT = "QuerySegment"
"""Segment query."""
QUERY_GROUP = "QueryGroup"
"""Group query."""
QUERY_SEQUENCE = "QuerySequence"
"""Sequence query."""
QUERY_DISJUNCTION = "QueryDisjunction"
"""Or query."""
QUERY_WITH_WITHIN = "QueryWithWithin"
"""Query with within part."""
EXPRESSION = "Expression"
"""Simple expression."""
EXPRESSION_WILDCARD = "Wildcard"
"""Wildcard expression."""
EXPRESSION_GROUP = "Group"
"""Group expression."""
EXPRESSION_OR = "Or"
"""Or expression."""
EXPRESSION_AND = "And"
"""And expression."""
EXPRESSION_NOT = "Not"
"""Not expression."""
SIMPLE_WITHIN = "SimpleWithin"
"""Simple within part."""
[docs]class Operator(str, Enum):
"""FCS-QL operators."""
def __str__(self) -> str:
return self.value
EQUALS = "Eq"
"""EQUALS operator."""
NOT_EQUALS = "Ne"
"""NOT-EQUALS operator."""
[docs]class RegexFlag(str, Enum):
"""FCS-QL expression tree regex flags."""
def __new__(cls, name: str, char: str):
obj = str.__new__(cls, name)
obj._value_ = name
obj.char = char
return obj
char: str
def __str__(self) -> str:
return self.value
CASE_INSENSITIVE = ("case-insensitive", "i")
"""Case insensitive."""
CASE_SENSITIVE = ("case-sensitive", "I")
"""Case sensitive."""
LITERAL_MATCHING = ("literal-matching", "l")
"""match exactly (= literally)"""
IGNORE_DIACRITICS = ("ignore-diacritics", "d")
"""Ignore all diacritics."""
[docs]class SimpleWithinScope(str, Enum):
"""The within scope."""
def __str__(self) -> str:
return self.value
SENTENCE = "Sentence"
"""sentence scope (small)"""
UTTERANCE = "Utterance"
"""utterance scope (small)"""
PARAGRAPH = "Paragraph"
"""paragraph scope (medium)"""
TURN = "Turn"
"""turn scope (medium)"""
TEXT = "Text"
"""text scope (large)"""
SESSION = "Session"
"""session scope (large)"""
# ---------------------------------------------------------------------------
[docs]class QueryVisitor(metaclass=ABCMeta):
"""Interface implementing a Visitor pattern for FCS-QL expression trees.
Default method implementations do nothing.
"""
[docs] def visit(self, node: "QueryNode") -> None:
"""Visit a query node. Generic handler, dispatches to visit methods
based on `QueryNodeType` if exists else do nothing::
method = "visit_" + node.node_type.value
Args:
node: the node to visit
Returns:
``None``
"""
if not node:
return None
def noop(node):
pass
# search for specific visit function based on node_type
method = getattr(self, f"visit_{node.node_type}", noop)
method(node)
# ---------------------------------------------------------------------------
[docs]class QueryNode(metaclass=ABCMeta):
"""Base class for FCS-QL expression tree nodes."""
def __init__(
self,
node_type: QueryNodeType,
children: Optional[List["QueryNode"]] = None,
child: Optional["QueryNode"] = None,
):
"""[Constructor]
Args:
node_type: the type of the node
children: the children of this node or ``None``. Defaults to None.
child: the child of this node or ``None``. Defaults to None.
"""
self.node_type = node_type
"""The node type of this node."""
self.parent: Optional[QueryNode] = None
"""The parent node of this node.
``None`` if this is the root node.
"""
if not children:
children = list()
self.children = list(children)
"""The children of this node."""
if child:
self.children.append(child)
[docs] def has_node_type(self, node_type: QueryNodeType) -> bool:
"""Check, if node if of given type.
Args:
node_type: type to check against
Returns:
bool: ``True`` if node is of given type, ``False`` otherwise
Raises:
TypeError: if node_type is ``None``
"""
if node_type is None:
raise TypeError("node_type is None")
return self.node_type == node_type
@property
def child_count(self) -> int:
"""Get the number of children of this node.
Returns:
int: the number of children of this node
"""
return len(self.children) if self.children else 0
[docs] def get_child(
self, idx: int, clazz: Optional[Type[_T]] = None
) -> Optional["QueryNode"]:
"""Get a child node of specified type by index.
When supplied with ``clazz`` parameter, only child nodes of
the requested type are counted.
Args:
idx: the index of the child node (if `clazz` provided, only consideres child nodes of requested type)
clazz: the type to nodes to be considered, optional
Returns:
QueryNode: the child node of this node or ``None`` if not child was found (e.g. type mismatch or index out of bounds)
"""
if not self.children or idx < 0 or idx > self.child_count:
return None
if not clazz:
return self.children[idx]
pos = 0
for child in self.children:
if isinstance(child, clazz):
if pos == idx:
return child
pos += 1
return None
[docs] def get_first_child(
self, clazz: Optional[Type[_T]] = None
) -> Optional["QueryNode"]:
"""Get this first child node.
Args:
clazz: the type to nodes to be considered
Returns:
QueryNode: the first child node of this node or ``None``
"""
return self.get_child(0, clazz=clazz)
[docs] def get_last_child(self, clazz: Optional[Type[_T]] = None) -> Optional["QueryNode"]:
"""Get this last child node.
Args:
clazz: the type to nodes to be considered
Returns:
QueryNode: the last child node of this node or ``None``
"""
return self.get_child(self.child_count - 1, clazz=clazz)
def __str__(self) -> str:
chs = " ".join(map(str, self.children))
return f"({self.node_type!s}{' ' + chs if chs else ''})"
[docs] @abstractmethod
def accept(self, visitor: QueryVisitor) -> None:
pass
# ---------------------------------------------------------------------------
[docs]class Expression(QueryNode):
"""A FCS-QL expression tree SIMPLE expression node."""
def __init__(
self,
qualifier: Optional[str],
identifier: str,
operator: Operator,
regex: str,
regex_flags: Optional[Set[RegexFlag]],
):
"""[Constructor]
Args:
qualifier: the layer identifier qualifier or ``None``
identifier: the layer identifier
operator: the operator
regex: the regular expression
regex_flags: the regular expression flags or ``None``
"""
super().__init__(QueryNodeType.EXPRESSION)
if not qualifier or qualifier.isspace():
qualifier = None
if not regex_flags:
regex_flags = None
else:
regex_flags = set(regex_flags)
self.qualifier = qualifier
"""The Layer Type Identifier qualifier.
``None`` if not used in this expression.
"""
self.identifier = identifier
"""The layer identifier."""
self.operator = operator
"""The operator."""
self.regex = regex
"""The regex value."""
self.regex_flags = regex_flags
"""The regex flags set.
``None`` if no flags were used in this expression.
"""
[docs] def has_layer_identifier(self, identifier: str) -> bool:
"""Check if the expression used a given **Layer Type Identifier**.
Args:
identifier: the Layer Type Identifier to check against
Returns:
bool: ``True`` if this identifier was used, ``False`` otherwise
Raises:
TypeError: if identifier is ``None``
"""
if identifier is None:
raise TypeError("identifier is None")
return self.identifier == identifier
[docs] def is_layer_qualifier_empty(self) -> bool:
"""Check if the Layer Type Identifier qualifier is empty.
Returns:
bool: ``True`` if no Layer Type Identifier qualifier was set, ``False`` otherwise
"""
# NOTE: check only `self.qualifier is None` ?
return bool(self.qualifier)
[docs] def has_layer_qualifier(self, qualifier: str) -> bool:
"""Check if the expression used a given qualifier for the Layer Type
Identifier.
Args:
qualifier: the qualifier to check against
Returns:
bool: ``True`` if this identifier was used, ``False`` otherwise
Raises:
TypeError: if qualifier is ``None``
"""
if qualifier is None:
raise TypeError("qualifier is None")
if self.is_layer_qualifier_empty():
return False
return self.qualifier == qualifier
[docs] def has_operator(self, operator: Operator) -> bool:
"""Check if expression used a given operator.
Args:
operator: the operator to check
Returns:
bool: ``True`` if the given operator was used, ``False`` otherwise
Raises:
TypeError: if operator is ``None``
"""
if operator is None:
raise TypeError("operator is None")
return self.operator == operator
[docs] def is_regex_flags_empty(self) -> bool:
"""Check if a regex flag set is empty.
Returns:
bool: ``True`` if no regex flags where set, ``False`` otherwise
"""
return bool(self.regex_flags)
[docs] def has_regex_flag(self, flag: RegexFlag) -> bool:
"""Check if a regex flag is set.
Args:
flag: the flag to be checked
Returns:
bool: ``True`` if the flag is set, ``False`` otherwise
Raises:
TypeError: if flag is ``None``
"""
if flag is None:
raise TypeError("flag is None")
if not self.regex_flags:
return False
return flag in self.regex_flags
def __str__(self) -> str:
parts = list()
parts.append(f"({self.node_type!s} ")
parts.append(f"{self.qualifier}:" if self.qualifier else "")
parts.append(f'{self.identifier} {self.operator!s} "')
parts.append(
self.regex.translate(str.maketrans({"\n": "\\n", "\r": "\\r", "\t": "\\t"})) # type: ignore
)
parts.append('"')
if self.regex_flags:
parts.append("/")
# TODO: use chars from RegexFlag enum. How to guarantee same order?
parts.append("i" if RegexFlag.CASE_INSENSITIVE in self.regex_flags else "")
parts.append("I" if RegexFlag.CASE_SENSITIVE in self.regex_flags else "")
parts.append("l" if RegexFlag.LITERAL_MATCHING in self.regex_flags else "")
parts.append("d" if RegexFlag.IGNORE_DIACRITICS in self.regex_flags else "")
return "".join(parts)
[docs] def accept(self, visitor: QueryVisitor) -> None:
visitor.visit(self)
# ---------------------------------------------------------------------------
[docs]class ExpressionWildcard(QueryNode):
"""A FCS-QL expression tree WILDCARD expression node."""
def __init__(
self,
children: Optional[List["QueryNode"]] = None,
child: Optional["QueryNode"] = None,
):
super().__init__(
QueryNodeType.EXPRESSION_WILDCARD, children=children, child=child
)
[docs] def accept(self, visitor: QueryVisitor) -> None:
visitor.visit(self)
[docs]class ExpressionGroup(QueryNode):
"""A FCS-QL expression tree GROUP expression node."""
def __init__(self, child: QueryNode):
"""[Constructor]
Args:
child: the group content
"""
super().__init__(QueryNodeType.EXPRESSION_GROUP, child=child)
def __str__(self) -> str:
return f"({self.node_type!s} {self.get_first_child()!s})"
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
# for child in self.children:
# child.accept(visitor)
self.children[0].accept(visitor)
visitor.visit(self)
[docs]class ExpressionNot(QueryNode):
"""A FCS-QL expression tree NOT expression node."""
def __init__(self, child: QueryNode):
"""[Constructor]
Args:
child: the child expression
"""
super().__init__(QueryNodeType.EXPRESSION_NOT, child=child)
def __str__(self) -> str:
return f"({self.node_type!s} {self.get_first_child()!s})"
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
# for child in self.children:
# child.accept(visitor)
self.children[0].accept(visitor)
visitor.visit(self)
[docs]class ExpressionAnd(QueryNode):
"""A FCS-QL expression tree AND expression node."""
def __init__(self, children: List[QueryNode]):
"""[Constructor]
Args:
children: child elements covered by AND expression.
"""
super().__init__(QueryNodeType.EXPRESSION_AND, children=children)
@property
def operands(self) -> List[QueryNode]:
"""Get the AND expression operands.
Returns:
List[QueryNode]: a list of expressions
"""
return self.children
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
for child in self.children:
child.accept(visitor)
visitor.visit(self)
[docs]class ExpressionOr(QueryNode):
"""A FCS-QL expression tree OR expression node."""
def __init__(self, children: List[QueryNode]):
"""[Constructor]
Args:
children: child elements covered by OR expression.
"""
super().__init__(QueryNodeType.EXPRESSION_OR, children=children)
@property
def operands(self) -> List[QueryNode]:
"""Get the OR expression operands.
Returns:
List[QueryNode]: a list of expressions
"""
return self.children
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
for child in self.children:
child.accept(visitor)
visitor.visit(self)
# ---------------------------------------------------------------------------
[docs]class QueryDisjunction(QueryNode):
"""A FCS-QL expression tree QR query."""
def __init__(self, children: List[QueryNode]):
"""[Constructor]
Args:
children: the children
"""
super().__init__(QueryNodeType.QUERY_DISJUNCTION, children=children)
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
for child in self.children:
child.accept(visitor)
visitor.visit(self)
[docs]class QuerySequence(QueryNode):
"""A FCS-QL expression tree query sequence node."""
def __init__(self, children: List[QueryNode]):
"""[Constructor]
Args:
children: the children for this node
"""
super().__init__(QueryNodeType.QUERY_SEQUENCE, children=children)
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
for child in self.children:
child.accept(visitor)
visitor.visit(self)
[docs]class QueryWithWithin(QueryNode):
"""FCS-QL expression tree QUERY-WITH-WITHIN node."""
def __init__(self, query: QueryNode, within: Optional[QueryNode]):
"""[Constructor]
Args:
query: the query node
within: the within node
"""
children = [query, within] if within else [query]
super().__init__(QueryNodeType.QUERY_WITH_WITHIN, children=children)
[docs] def get_query(self) -> QueryNode:
"""Get the query clause.
Returns:
QueryNode: the query clause
"""
return self.children[0]
[docs] def get_within(self) -> Optional[QueryNode]:
"""Get the within clause (= search context)
Returns:
QueryNode: the witin clause
"""
return self.get_child(1)
[docs] def accept(self, visitor: QueryVisitor) -> None:
self.children[0].accept(visitor)
within = self.get_child(1)
if within:
within.accept(visitor)
visitor.visit(self)
[docs]class QuerySegment(QueryNode):
"""A FCS-QL expression tree query segment node."""
def __init__(self, expression: QueryNode, min_occurs: int, max_occurs: int):
"""[Constructor]
Args:
expression: the expression
min_occurs: the minimum occurrence
max_occurs: the maximum occurrence
"""
super().__init__(QueryNodeType.QUERY_SEGMENT, child=expression)
self.min_occurs = min_occurs
"""The minimum occurrence of this segment."""
self.max_occurs = max_occurs
"""The maximum occurrence of this segment."""
[docs] def get_expression(self) -> QueryNode:
"""Get the expression for this segment.
Returns:
QueryNode: the expression
"""
return self.children[0]
def __str__(self) -> str:
ret = f"({self.node_type!s} "
if self.min_occurs != 1:
ret += f"@min={'*' if self.min_occurs == OCCURS_UNBOUNDED else self.min_occurs} "
if self.max_occurs != 1:
ret += f"@max={'*' if self.max_occurs == OCCURS_UNBOUNDED else self.max_occurs} "
ret += f"{self.children[0]!s})"
return ret
[docs] def accept(self, visitor: QueryVisitor) -> None:
self.children[0].accept(visitor)
visitor.visit(self)
[docs]class QueryGroup(QueryNode):
"""A FCS-QL expression tree GROUP query node."""
def __init__(self, child: QueryNode, min_occurs: int, max_occurs: int):
"""[Constructor]
Args:
child: the child
min_occurs: the minimum occurrence
max_occurs: the maximum occurrence
"""
super().__init__(QueryNodeType.QUERY_SEGMENT, child=child)
self.min_occurs = min_occurs
"""The minimum occurrence of group content."""
self.max_occurs = max_occurs
"""The maximum occurrence of group content."""
[docs] def get_content(self) -> QueryNode:
"""Get the group content.
Returns:
QueryNode: the content of the GROUP query
"""
return self.children[0]
def __str__(self) -> str:
ret = f"({self.node_type!s} "
if self.min_occurs != 1:
ret += f"@min={'*' if self.min_occurs == OCCURS_UNBOUNDED else self.min_occurs} "
if self.max_occurs != 1:
ret += f"@max={'*' if self.max_occurs == OCCURS_UNBOUNDED else self.max_occurs} "
ret += f"{self.children[0]!s})"
return ret
[docs] def accept(self, visitor: QueryVisitor) -> None:
if self.children:
for child in self.children:
child.accept(visitor)
visitor.visit(self)
# ---------------------------------------------------------------------------
[docs]class SimpleWithin(QueryNode):
"""A FCS-QL expression tree SIMPLE WITHIN query node."""
def __init__(self, scope: SimpleWithinScope):
super().__init__(QueryNodeType.SIMPLE_WITHIN)
self.scope = scope
"""The simple within scope."""
def __str__(self) -> str:
return f"({self.node_type!s} {self.scope!s})"
[docs] def accept(self, visitor: QueryVisitor) -> None:
visitor.visit(self)
# ---------------------------------------------------------------------------
REP_ZERO_OR_MORE = (0, OCCURS_UNBOUNDED)
REP_ONE_OR_MORE = (1, OCCURS_UNBOUNDED)
REP_ZERO_OR_ONE = (0, 1)
EMPTY_STRING = ""
DEFAULT_IDENTIFIER = "text"
DEFAULT_OPERATOR = Operator.EQUALS
DEFAULT_UNICODE_NORMALIZATION_FORM = "NFC"
"""Default unicode normalization form.
See also: `unicodedata.normalize
<https://docs.python.org/3/library/unicodedata.html#unicodedata.normalize>`_
"""
# ---------------------------------------------------------------------------
[docs]class ErrorListener(antlr4.error.ErrorListener.ErrorListener):
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
self.errors: List[str] = list()
[docs] def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
# FIXME: additional information of error should not be logged but added
# to the list of errors; that list probably needs to be enhanced to
# store supplementary information Furthermore, a sophisticated
# errorlist implementation could also be used by the QueryVistor to add
# addition query error information
if LOGGER.isEnabledFor(logging.DEBUG):
if isinstance(offendingSymbol, antlr4.Token):
pos = offendingSymbol.start
if pos != -1:
LOGGER.debug("query: %s", self.query)
LOGGER.debug(" %s^- %s", " " * pos, msg)
self.errors.append(msg)
[docs] def has_errors(self) -> bool:
return bool(self.errors)
[docs]class QueryParserException(Exception):
"""Query parser exception."""
[docs]class ExpressionTreeBuilderException(Exception):
"""Error building expression tree."""
[docs]class ExpressionTreeBuilder(FCSParserListener):
def __init__(self, parser: "QueryParser") -> None:
super().__init__()
self.parser = parser
self.stack: Deque[Any] = deque()
self.stack_Query_disjunction: Deque[int] = deque()
"""for `enterQuery_disjunction`/`exitQuery_disjunction`"""
self.stack_Query_sequence: Deque[int] = deque()
"""for `enterQuery_sequence`/`exitQuery_sequence`"""
self.stack_Expression_or: Deque[int] = deque()
"""for `enterExpression_or`/`exitExpression_or`"""
self.stack_Expression_and: Deque[int] = deque()
"""for `enterExpression_and`/`exitExpression_and`"""
# ----------------------------------------------------
[docs] def enterQuery(self, ctx: FCSParser.QueryContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterQuery(ctx)
[docs] def exitQuery(self, ctx: FCSParser.QueryContext):
w_ctx = ctx.getChild(0, FCSParser.Within_partContext)
if w_ctx is not None:
within = self.stack.pop()
query = self.stack.pop()
self.stack.append(QueryWithWithin(query, within))
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery: stack=%s", self.stack)
return super().exitQuery(ctx)
[docs] def enterMain_query(self, ctx: FCSParser.Main_queryContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterMain_query: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterMain_query(ctx)
[docs] def exitMain_query(self, ctx: FCSParser.Main_queryContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitMain_query: stack=%s", self.stack)
return super().exitMain_query(ctx)
[docs] def enterQuery_disjunction(self, ctx: FCSParser.Query_disjunctionContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_disjunction: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
self.stack_Query_disjunction.append(len(self.stack))
return super().enterQuery_disjunction(ctx)
[docs] def exitQuery_disjunction(self, ctx: FCSParser.Query_disjunctionContext):
pos = self.stack_Query_disjunction.pop()
if len(self.stack) > pos:
items: List[QueryNode] = list()
while len(self.stack) > pos:
items.insert(0, self.stack.pop())
self.stack.append(QueryDisjunction(items))
else:
raise ExpressionTreeBuilderException("exitQuery_disjunction is empty")
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_disjunction: stack=%s", self.stack)
return super().exitQuery_disjunction(ctx)
[docs] def enterQuery_sequence(self, ctx: FCSParser.Query_sequenceContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_sequence: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
self.stack_Query_sequence.append(len(self.stack))
return super().enterQuery_sequence(ctx)
[docs] def exitQuery_sequence(self, ctx: FCSParser.Query_sequenceContext):
pos = self.stack_Query_sequence.pop()
if len(self.stack) > pos:
items: List[QueryNode] = list()
while len(self.stack) > pos:
items.insert(0, self.stack.pop())
self.stack.append(QuerySequence(items))
else:
raise ExpressionTreeBuilderException("exitQuery_sequence is empty")
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_sequence: stack=%s", self.stack)
return super().exitQuery_sequence(ctx)
[docs] def enterQuery_group(self, ctx: FCSParser.Query_groupContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_group: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterQuery_group(ctx)
[docs] def exitQuery_group(self, ctx: FCSParser.Query_groupContext):
# handle repetition (if any)
min = max = 1
# fetch *first* child of type QuantifierContext, therefore idx=0
q_ctx = ctx.getChild(0, FCSParser.QualifierContext)
if q_ctx is not None:
min, max = ExpressionTreeBuilder.processRepetition(ctx)
content: QueryNode = self.stack.pop()
self.stack.append(QueryGroup(content, min, max))
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_group: stack=%s", self.stack)
return super().exitQuery_group(ctx)
[docs] def enterQuery_simple(self, ctx: FCSParser.Query_simpleContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_simple: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterQuery_simple(ctx)
[docs] def exitQuery_simple(self, ctx: FCSParser.Query_simpleContext):
# handle repetition (if any)
min = max = 1
# fetch *first* child of type QuantifierContext, therefore idx=0
q_ctx = ctx.getChild(0, FCSParser.QualifierContext)
if q_ctx is not None:
min, max = ExpressionTreeBuilder.processRepetition(ctx)
expression: QueryNode = self.stack.pop()
self.stack.append(QuerySegment(expression, min, max))
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_simple: stack=%s", self.stack)
return super().exitQuery_simple(ctx)
[docs] def enterQuery_implicit(self, ctx: FCSParser.Query_implicitContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_implicit: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
self.stack.append(self.parser.default_operator)
self.stack.append(self.parser.default_identifier)
self.stack.append(EMPTY_STRING)
return super().enterQuery_implicit(ctx)
[docs] def exitQuery_implicit(self, ctx: FCSParser.Query_implicitContext):
regex_flags: Set[RegexFlag] = self.stack.pop()
regex_value: str = self.stack.pop()
qualifier: str = self.stack.pop()
identifier: str = self.stack.pop()
operator: Operator = self.stack.pop()
self.stack.append(
Expression(
qualifier=qualifier,
identifier=identifier,
operator=operator,
regex=regex_value,
regex_flags=regex_flags,
)
)
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_implicit: stack=%s", self.stack)
return super().exitQuery_implicit(ctx)
# TODO: check, abortable, if also exit?
[docs] def enterQuery_segment(self, ctx: FCSParser.Query_segmentContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterQuery_segment: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
# if the context contains only two children, they must be
# '[' and ']' thus we are dealing with a wildcard segment
if ctx.getChildCount() == 2:
self.stack.append(ExpressionWildcard())
# TODO: not exactly matching the java implementation
# do we need to block 'visitQuery_segment' call?
return super().enterQuery_segment(ctx)
[docs] def exitQuery_segment(self, ctx: FCSParser.Query_segmentContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitQuery_segment: stack=%s", self.stack)
return super().exitQuery_segment(ctx)
[docs] def enterExpression_basic(self, ctx: FCSParser.Expression_basicContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterExpression_basic: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
tok_op = ctx.getChild(1).symbol
if tok_op.type == FCSLexer.OPERATOR_EQ:
self.stack.append(Operator.EQUALS)
elif tok_op.type == FCSLexer.OPERATOR_NE:
self.stack.append(Operator.NOT_EQUALS)
else:
raise ExpressionTreeBuilderException(
f"invalid operator type: {tok_op.text}"
)
return super().enterExpression_basic(ctx)
[docs] def exitExpression_basic(self, ctx: FCSParser.Expression_basicContext):
regex_flags: Set[RegexFlag] = self.stack.pop()
regex_value: str = self.stack.pop()
qualifier: str = self.stack.pop()
identifier: str = self.stack.pop()
operator: Operator = self.stack.pop()
self.stack.append(
Expression(
qualifier=qualifier,
identifier=identifier,
operator=operator,
regex=regex_value,
regex_flags=regex_flags,
)
)
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitExpression_basic: stack=%s", self.stack)
return super().exitExpression_basic(ctx)
[docs] def enterExpression_not(self, ctx: FCSParser.Expression_notContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterExpression_not: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterExpression_not(ctx)
[docs] def exitExpression_not(self, ctx: FCSParser.Expression_notContext):
expression: QueryNode = self.stack.pop()
self.stack.append(ExpressionNot(expression))
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitExpression_not: stack=%s", self.stack)
return super().exitExpression_not(ctx)
[docs] def enterExpression_group(self, ctx: FCSParser.Expression_groupContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterExpression_group: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
return super().enterExpression_group(ctx)
[docs] def exitExpression_group(self, ctx: FCSParser.Expression_groupContext):
expression: QueryNode = self.stack.pop()
self.stack.append(ExpressionGroup(expression))
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitExpression_group: stack=%s", self.stack)
return super().exitExpression_group(ctx)
[docs] def enterExpression_or(self, ctx: FCSParser.Expression_orContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterExpression_or: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
self.stack_Expression_or.append(len(self.stack))
return super().enterExpression_or(ctx)
[docs] def exitExpression_or(self, ctx: FCSParser.Expression_orContext):
pos = self.stack_Expression_or.pop()
if len(self.stack) > pos:
children: List[QueryNode] = list()
while len(self.stack) > pos:
children.insert(0, self.stack.pop())
self.stack.append(ExpressionOr(children))
else:
raise ExpressionTreeBuilderException("exitExpression_or is empty")
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitExpression_or: stack=%s", self.stack)
return super().exitExpression_or(ctx)
[docs] def enterExpression_and(self, ctx: FCSParser.Expression_andContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterExpression_and: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
self.stack_Expression_and.append(len(self.stack))
return super().enterExpression_and(ctx)
[docs] def exitExpression_and(self, ctx: FCSParser.Expression_andContext):
pos = self.stack_Expression_and.pop()
if len(self.stack) > pos:
children: List[QueryNode] = list()
while len(self.stack) > pos:
children.insert(0, self.stack.pop())
self.stack.append(ExpressionAnd(children))
else:
raise ExpressionTreeBuilderException("exitExpression_and is empty")
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitExpression_and: stack=%s", self.stack)
return super().exitExpression_and(ctx)
# TODO: check, or exit
[docs] def enterAttribute(self, ctx: FCSParser.AttributeContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterAttribute: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
# handle optional qualifier
q_ctx = ctx.getChild(0, FCSParser.QualifierContext)
qualifier = q_ctx.getText() if q_ctx is not None else EMPTY_STRING
self.stack.append(ctx.getChild(0, FCSParser.IdentifierContext).getText())
self.stack.append(qualifier)
return super().enterAttribute(ctx)
[docs] def exitAttribute(self, ctx: FCSParser.AttributeContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitAttribute: stack=%s", self.stack)
return super().exitAttribute(ctx)
# TODO: check, or exit
[docs] def enterRegexp(self, ctx: FCSParser.RegexpContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterRegexp: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
p_ctx = ctx.getChild(0, FCSParser.Regexp_patternContext)
regex = ExpressionTreeBuilder.stripQuotes(p_ctx.getText())
# process escape sequences, if present
if "\\" in regex:
regex = ExpressionTreeBuilder.unescapeString(regex)
# perform unicode normalization, if requested
if self.parser.unicode_normalization_form:
regex = unicodedata.normalize(self.parser.unicode_normalization_form, regex)
# FIXME: validate regex?
self.stack.append(regex)
# handle regex flags, if any
f_ctx = ctx.getChild(0, FCSParser.Regexp_flagContext)
if f_ctx:
val = f_ctx.getText()
flags: Set[RegexFlag] = set()
for i in range(len(val)):
flag = val[i]
if flag in ("i", "c"):
flags.add(RegexFlag.CASE_INSENSITIVE)
elif flag in ("I", "C"):
flags.add(RegexFlag.CASE_SENSITIVE)
elif flag == "l":
flags.add(RegexFlag.LITERAL_MATCHING)
elif flag == "d":
flags.add(RegexFlag.IGNORE_DIACRITICS)
else:
raise ExpressionTreeBuilderException(
f"unknown regex modifier flag: {flag}"
)
# validate regex flags
if (
RegexFlag.CASE_SENSITIVE in flags
and RegexFlag.CASE_INSENSITIVE in RegexFlag.CASE_SENSITIVE
):
raise ExpressionTreeBuilderException(
"invalid combination of regex modifier flags: "
"'i' or 'c' and 'I' or 'C' are mutually exclusive"
)
if RegexFlag.LITERAL_MATCHING in flags and any(
flag in flags
for flag in {
RegexFlag.CASE_SENSITIVE,
RegexFlag.CASE_INSENSITIVE,
RegexFlag.IGNORE_DIACRITICS,
}
):
raise ExpressionTreeBuilderException(
"invalid combination of regex modifier flags: 'l' "
"is mutually exclusive with 'i', 'c', 'I', 'C' or 'd'"
)
self.stack.append(flags)
else:
# regex without flags, so push 'empty' flags on stack
self.stack.append(set())
return super().enterRegexp(ctx)
[docs] def exitRegexp(self, ctx: FCSParser.RegexpContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitRegexp: stack=%s", self.stack)
return super().exitRegexp(ctx)
# TODO: check, abortable, if also exit?
[docs] def enterWithin_part_simple(self, ctx: FCSParser.Within_part_simpleContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"enterWithin_part_simple: children=%s / cnt=%s / text=%s",
ctx.children,
ctx.getChildCount(),
ctx.getText(),
)
scope: SimpleWithinScope
val = ctx.getChild(0).getText()
if val in ("sentence", "s"):
scope = SimpleWithinScope.SENTENCE
elif val in ("utterance", "u"):
scope = SimpleWithinScope.UTTERANCE
elif val in ("paragraph", "p"):
scope = SimpleWithinScope.PARAGRAPH
elif val in ("turn", "t"):
scope = SimpleWithinScope.TURN
elif val == "text":
scope = SimpleWithinScope.TEXT
elif val == "session":
scope = SimpleWithinScope.SESSION
else:
raise ExpressionTreeBuilderException(
f"invalid scope for simple 'within' clause: {val}"
)
self.stack.append(SimpleWithin(scope))
return super().enterWithin_part_simple(ctx)
[docs] def exitWithin_part_simple(self, ctx: FCSParser.Within_part_simpleContext):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("exitWithin_part_simple: stack=%s", self.stack)
return super().exitWithin_part_simple(ctx)
# ----------------------------------------------------
[docs] @staticmethod
def processRepetition(ctx: FCSParser.QualifierContext) -> Tuple[int, int]:
tok: antlr4.Token = ctx.getChild(0, antlr4.TerminalNode).symbol
if tok.type == FCSParser.Q_ZERO_OR_MORE: # "*"
return REP_ZERO_OR_MORE
if tok.type == FCSParser.Q_ONE_OR_MORE: # "+"
return REP_ONE_OR_MORE
if tok.type == FCSParser.Q_ZERO_OR_ONE: # "?"
return REP_ZERO_OR_ONE
if tok.type == FCSParser.L_CURLY_BRACKET: # "{x, y}" variants
return ExpressionTreeBuilder.processRepetitionRange(ctx)
raise ExpressionTreeBuilderException(
f"unexpected symbol in repetition quantifier: {tok.text}"
)
[docs] @staticmethod
def processRepetitionRange(ctx: FCSParser.QuantifierContext) -> Tuple[int, int]:
comma_idx = ExpressionTreeBuilder.getChildIndex(ctx, 0, FCSParser.Q_COMMA)
int1_idx = ExpressionTreeBuilder.getChildIndex(ctx, 0, FCSParser.INTEGER)
int2_idx = ExpressionTreeBuilder.getChildIndex(
ctx, int1_idx + 1, FCSParser.INTEGER
)
min = 0
max = OCCURS_UNBOUNDED
if comma_idx != -1:
if int1_idx < comma_idx:
min = ExpressionTreeBuilder.parseInt(ctx.getChild(int1_idx).getText())
if comma_idx < int1_idx:
max = ExpressionTreeBuilder.parseInt(ctx.getChild(int1_idx).getText())
elif comma_idx < int2_idx:
max = ExpressionTreeBuilder.parseInt(ctx.getChild(int2_idx).getText())
else:
if int1_idx == -1:
raise ExpressionTreeBuilderException("int1_idx == -1")
min = max = ExpressionTreeBuilder.parseInt(ctx.getChild(int1_idx).getText())
if max != OCCURS_UNBOUNDED and min > max:
raise ExpressionTreeBuilderException(
f"bad qualifier: min > max ({min} > {max})"
)
return (min, max)
[docs] @staticmethod
def getChildIndex(ctx: ParserRuleContext, start: int, ttype: int) -> int:
if start >= 0 and start < ctx.getChildCount():
for idx in range(start, ctx.getChildCount()):
tree = ctx.getChild(idx)
if isinstance(tree, antlr4.TerminalNode):
if tree.symbol.type == ttype:
return idx
return -1
[docs] @staticmethod
def parseInt(val: str) -> int:
try:
return int(val)
except ValueError as ex:
raise ExpressionTreeBuilderException(f"invalid integer: {val}") from ex
[docs] @staticmethod
def stripQuotes(val: str) -> str:
if val.startswith('"'):
if val.endswith('"'):
val = val[1:-1]
else:
raise ExpressionTreeBuilderException(
"value not properly quoted; invalid closing quote"
)
elif val.startswith("'"):
if val.endswith("'"):
val = val[1:-1]
else:
raise ExpressionTreeBuilderException(
"value not properly quoted; invalid closing quote"
)
else:
raise ExpressionTreeBuilderException(
"value not properly quoted; expected \" (double quote) or ' (single qoute) character"
)
return val
[docs] @staticmethod
def unescapeString(val: str) -> str:
chars = list()
i = 0
while i < len(val):
cp = val[i]
if cp == "\\":
i += 1 # skip slash
cp = val[i]
if cp == "\\": # slash
chars.append("\\")
elif cp == '"': # double quote
chars.append('"')
elif cp == "'": # single quote
chars.append("'")
elif cp == "n": # new line
chars.append("\n")
elif cp == "t": # tabulator
chars.append("\t")
elif cp == ".": # regex: dot
chars.append("\\.")
elif cp == "^": # regex: caret
chars.append("\\^")
elif cp == "$": # regex: dollar
chars.append("\\$")
elif cp == "*": # regex: asterisk
chars.append("\\*")
elif cp == "+": # regex: plus
chars.append("\\+")
elif cp == "?": # regex: question mark
chars.append("\\?")
elif cp == "(": # regex: opening parenthesis
chars.append("\\(")
elif cp == ")": # regex: closing parenthesis
chars.append("\\)")
elif cp == "{": # regex: opening curly brace
chars.append("\\{")
elif cp == "[": # regex: opening square bracket
chars.append("\\[")
elif cp == "|": # regex: vertical bar
chars.append("\\|")
elif cp == "x": # x HEX HEX
chars.append(ExpressionTreeBuilder.unescapeUnicode(val, i, 2))
i += 2
elif cp == "u": # u HEX HEX HEX HEX
chars.append(ExpressionTreeBuilder.unescapeUnicode(val, i, 4))
i += 4
elif cp == "U": # U HEX HEX HEX HEX HEX HEX HEX HEX
# TODO: does this even work in python?
chars.append(ExpressionTreeBuilder.unescapeUnicode(val, i, 8))
i += 8
else:
raise ExpressionTreeBuilderException(
f"invalid escape sequence: \\{cp}"
)
else:
# no error should happen here (Python uses unicode by default)
# so no back-and-forth with codepoint conversions
chars.append(cp)
i += 1
return "".join(chars)
[docs] @staticmethod
def unescapeUnicode(val: str, i: int, size: int) -> str:
# NOTE: or simply: `return chr(int(val[i+1:i+size+1], 16))`
if (len(val) - i - 1) >= size:
cp = 0 # codepoint
for pos in range(size):
i += 1
if pos > 0:
cp <<= 4
cp |= ExpressionTreeBuilder.parseHexChar(val[i])
try:
return chr(cp)
except ValueError:
raise ExpressionTreeBuilderException(f"invalid codepoint: U+{cp:X}")
else:
raise ExpressionTreeBuilderException(
f"truncated escape sequence: \\{val[i]}"
)
[docs] @staticmethod
def parseHexChar(val: str) -> int:
try:
if len(val) != 1:
raise ValueError("length of string should be 1 for a single character")
return int(val, 16)
except ValueError:
# actually, this should never happen, as ANTLR's lexer should
# catch illegal HEX characters
raise ExpressionTreeBuilderException(f"invalud hex character: {val}")
[docs]class QueryParser:
"""A FCS-QL query parser that produces FCS-QL expression trees."""
def __init__(
self,
default_identifier: str = DEFAULT_IDENTIFIER,
default_operator: Operator = DEFAULT_OPERATOR,
unicode_normalization_form: Optional[str] = DEFAULT_UNICODE_NORMALIZATION_FORM,
) -> None:
"""[Constructor]
Args:
default_identifier: the default identifier to be used for simple expressions. Defaults to `DEFAULT_IDENTIFIER`.
default_operator: the default operator. Defaults to `DEFAULT_OPERATOR`.
unicode_normalization_form: the Unicode normalization form to be used or ``None`` to not perform normlization. Defaults to `DEFAULT_UNICODE_NORMALIZATION_FORM`.
""" # noqa: E501
self.default_identifier = default_identifier
self.default_operator = default_operator
self.unicode_normalization_form = unicode_normalization_form
[docs] def parse(self, query: str) -> QueryNode:
"""Parse query.
Args:
query: the raw FCS-QL query
Raises:
QueryParserException: if an error occurred
Returns:
QueryNode: a FCS-QL expression tree
"""
error_listener = ErrorListener(query)
try:
input_stream = InputStream(query)
lexer = FCSLexer(input_stream)
stream = CommonTokenStream(lexer)
parser = FCSParser(stream)
# clear (possible) default error listeners and set our own!
lexer.removeErrorListeners()
parser.removeErrorListeners()
lexer.addErrorListener(error_listener)
parser.addErrorListener(error_listener)
# ExceptionThrowingErrorListener ?
# commence parsing ...
tree: FCSParser.QueryContext = parser.query()
if (
not error_listener.has_errors()
and parser.getNumberOfSyntaxErrors() == 0
):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"ANTLR parse tree: %s", tree.toStringTree(FCSParser.ruleNames)
)
# now build the expression tree
builder = ExpressionTreeBuilder(self)
walker = ParseTreeWalker()
walker.walk(builder, tree)
return builder.stack.pop()
else:
if LOGGER.isEnabledFor(logging.DEBUG):
for msg in error_listener.errors:
LOGGER.debug("ERROR: %s", msg)
# FIXME: (include additional error information)
raise QueryParserException(
(error_listener.errors or ["unspecified error"])[0]
)
except ExpressionTreeBuilderException as ex:
raise QueryParserException(str(ex)) from ex
except QueryParserException:
raise
except Exception as ex:
raise QueryParserException(
"an unexpected exception occured while parsing"
) from ex
# ---------------------------------------------------------------------------