Module pygmlparser.Parser

Expand source code
from typing import cast
from typing import List
from typing import NewType
from typing import Tuple
from typing import Union

from logging import Logger
from logging import getLogger

from pygmlparser.Edge import Edge
from pygmlparser.Graph import Graph
from pygmlparser.Node import Node

from pygmlparser.graphics.Point import Point
from pygmlparser.graphics.EdgeGraphics import EdgeGraphics
from pygmlparser.graphics.NodeGraphics import NodeGraphics

from pygmlparser.exceptions.GMLParseException import GMLParseException


class Parser:

    AttrObjectType = NewType('AttrObjectType', Union[Node, Edge, NodeGraphics, EdgeGraphics, Point])
    LineType       = NewType('LineType', Tuple[Point, ...])

    GRAPH_TOKEN: str = 'graph'

    ID_TOKEN:   str = 'id'
    NODE_TOKEN: str = 'node'
    EDGE_TOKEN: str = 'edge'

    SOURCE_ID_TOKEN: str = 'source'
    TARGET_ID_TOKEN: str = 'target'

    GRAPHICS_TOKEN: str = 'graphics'
    START_TOKEN:    str = '['
    END_TOKEN:      str = ']'

    QUOTE_TOKEN: str = '"'

    LINE_DEFINITION_TOKEN:  str = 'Line'
    POINT_DEFINITION_TOKEN: str = 'point'

    def __init__(self):
        self.logger: Logger    = getLogger(__name__)
        self._raw:   List[str] = []
        """
        raw GML data (raw string split on whitespace)
        """
        self._i: int = 0
        """
        position (index) in self._raw
        """

        self.graph: Graph = cast(Graph, None)

    def loadGML(self, path: str):
        """
        First method to call after instantiating a Graph object

        Args:
            path: The fully qualified path to the .gml file

        """
        with open(path) as infile:
            # NOTE: the split will destroy any spaces in string attributes
            self._raw = infile.read().strip().split()

        self._i    = 0
        self.graph = Graph()

    def parse(self):
        """
        The second method to call after the parser loads the .gml file.  After this
        method completes extract the graph from `org.hasii.pygmlparser.Graph`

        """
        if len(self._raw) == 0:
            raise GMLParseException('Mot loaded you must call load_gml before parse')

        self._parseGraph()

    def _currentToken(self) -> str:
        if self._i >= len(self._raw):
            raise GMLParseException(f'[pos {self._i}] unexpected end of file')

        return self._raw[self._i]

    def _increment(self):
        self._i += 1

    def _parseGraph(self):
        self._parseOpenWithKeyword(Parser.GRAPH_TOKEN)

        while self._currentToken() != Parser.END_TOKEN:
            currentToken = self._currentToken()
            self.logger.debug(f'currentToken: {currentToken}')
            if currentToken == Parser.NODE_TOKEN:
                self._parseNode()
            elif currentToken == Parser.EDGE_TOKEN:
                self._parseEdge()
            else:
                self._parseAttribute(self.graph)
        self._increment()

    def _parseNode(self):
        self._parseOpenWithKeyword(Parser.NODE_TOKEN)

        node = Node()

        while self._currentToken() != Parser.END_TOKEN:
            try:
                current: str = self._currentToken()
                if current == Parser.GRAPHICS_TOKEN:
                    self._parseNodeGraphics(node)
                else:
                    self._parseAttribute(node)
            except GMLParseException:
                self.logger.error(f'current: {self._currentToken()}')
                continue
        self.logger.debug(f'Current index: {self._i}')
        self._increment()

        node.validate(rawIdx=self._i)
        nid = node.id
        self.graph.validate(rawIdx=self._i, nodeId=nid)

        self.logger.info(f'Parsed Node: {node}')
        self.graph.graphNodes[nid] = node

    def _parseEdge(self):
        self._parseOpenWithKeyword(Parser.EDGE_TOKEN)

        edge: Edge = Edge()

        while self._currentToken() != Parser.END_TOKEN:
            current: str = self._currentToken()
            if current == Parser.GRAPHICS_TOKEN:
                self._parseEdgeGraphics(edge)
            else:
                self._parseAttribute(edge)
        self._increment()
        edge.validate(rawIdx=self._i)

        for nid in (edge.source, edge.target):
            if nid not in self.graph.graphNodes:
                node: Node = Node()
                node.is_anon = True
                node.id = nid
                self.graph.graphNodes[nid] = node

        edge.source_node = self.graph.graphNodes[edge.source]
        edge.target_node = self.graph.graphNodes[edge.target]

        edge.source_node.forward_edges.append(edge)
        edge.target_node.backward_edges.append(edge)

        self.logger.info(f'Parsed Edge: {edge}')
        self.graph.graphEdges.append(edge)

    def _parseNodeGraphics(self, node: Node):

        self._parseOpenWithKeyword(Parser.GRAPHICS_TOKEN)
        graphics: NodeGraphics = NodeGraphics()

        while self._currentToken() != Parser.END_TOKEN:
            self._parseAttribute(graphics)

        self._increment()
        self.logger.debug(f'Current index: {self._i}')
        node.graphics = graphics

    def _parseEdgeGraphics(self, edge: Edge) -> Edge:

        self._parseOpenWithKeyword(Parser.GRAPHICS_TOKEN)
        graphics: EdgeGraphics = EdgeGraphics()
        while self._currentToken() != Parser.END_TOKEN:

            current: str = self._currentToken()
            if current == Parser.LINE_DEFINITION_TOKEN:
                graphics = self._parseLineDefinition(graphics)
            else:
                graphics = self._parseAttribute(graphics)

        self._increment()
        edge.graphics = graphics

        return edge

    def _parseLineDefinition(self, graphics: EdgeGraphics) -> EdgeGraphics:

        self._parseOpenWithKeyword(Parser.LINE_DEFINITION_TOKEN)

        #
        # We'll use a List because of the vagaries of Python data classes
        #
        lineList: List[Point] = []
        while self._currentToken() != Parser.END_TOKEN:
            current: str = self._currentToken()
            self.logger.debug(f'current: {current}')

            lineList = self._parsePointDefinition(lineList)

        # But the data classes save a line definition as a Tuple of points
        graphics.line = tuple(lineList)

        self._increment()
        return graphics

    def _parsePointDefinition(self, lineList: List[Point]) -> List[Point]:

        self._parseOpenWithKeyword(Parser.POINT_DEFINITION_TOKEN)

        point: Point = Point()
        while self._currentToken() != Parser.END_TOKEN:
            self.logger.debug(f'point current: {self._currentToken()}')
            point = self._parseAttribute(point)

        lineList.append(point)

        self._increment()
        return lineList

    def _parseAttribute(self, obj: AttrObjectType) -> AttrObjectType:
        """

        Args:
            obj: The object we update attributes on


        Returns:
            The update object
        """
        name = self._currentToken()
        if not name.isalnum():
            raise GMLParseException(f'[pos {self._i}] attribute name is not alphanumeric: {name}')
        self._increment()

        val = self._currentToken()
        try:
            # try to parse val as int
            val = int(val, 10)
            self._increment()
            setattr(obj, name, val)
        except ValueError:
            # Try float
            try:
                val = float(val)
                self._increment()
                setattr(obj, name, val)
            except ValueError:
                # otherwise try to parse val as string
                if not val.startswith(f'{Parser.QUOTE_TOKEN}'):
                    raise GMLParseException(f'[pos {self._i}] attribute name is not alphanumeric: {name}')

                val_l = []
                while not self._currentToken().endswith(f'{Parser.QUOTE_TOKEN}'):
                    val_l.append(self._currentToken())
                    self._increment()
                val_l.append(self._currentToken())  # capture closing one

                self._increment()

                val = ' '.join(val_l)  # unify
                val = val.strip(f'{Parser.QUOTE_TOKEN}')
                setattr(obj, name, val)

        return obj

    def _parseOpenWithKeyword(self, kw: str):
        if self._currentToken() != kw:
            raise GMLParseException(f'[pos {self._i}] expected `{kw}` keyword, found: {self._currentToken()}')
        self._increment()

        if self._currentToken() != Parser.START_TOKEN:
            raise GMLParseException(f'[pos {self._i}] expected opening `[`, found: {self._currentToken()}')
        self._increment()

Classes

class Parser
Expand source code
class Parser:

    AttrObjectType = NewType('AttrObjectType', Union[Node, Edge, NodeGraphics, EdgeGraphics, Point])
    LineType       = NewType('LineType', Tuple[Point, ...])

    GRAPH_TOKEN: str = 'graph'

    ID_TOKEN:   str = 'id'
    NODE_TOKEN: str = 'node'
    EDGE_TOKEN: str = 'edge'

    SOURCE_ID_TOKEN: str = 'source'
    TARGET_ID_TOKEN: str = 'target'

    GRAPHICS_TOKEN: str = 'graphics'
    START_TOKEN:    str = '['
    END_TOKEN:      str = ']'

    QUOTE_TOKEN: str = '"'

    LINE_DEFINITION_TOKEN:  str = 'Line'
    POINT_DEFINITION_TOKEN: str = 'point'

    def __init__(self):
        self.logger: Logger    = getLogger(__name__)
        self._raw:   List[str] = []
        """
        raw GML data (raw string split on whitespace)
        """
        self._i: int = 0
        """
        position (index) in self._raw
        """

        self.graph: Graph = cast(Graph, None)

    def loadGML(self, path: str):
        """
        First method to call after instantiating a Graph object

        Args:
            path: The fully qualified path to the .gml file

        """
        with open(path) as infile:
            # NOTE: the split will destroy any spaces in string attributes
            self._raw = infile.read().strip().split()

        self._i    = 0
        self.graph = Graph()

    def parse(self):
        """
        The second method to call after the parser loads the .gml file.  After this
        method completes extract the graph from `org.hasii.pygmlparser.Graph`

        """
        if len(self._raw) == 0:
            raise GMLParseException('Mot loaded you must call load_gml before parse')

        self._parseGraph()

    def _currentToken(self) -> str:
        if self._i >= len(self._raw):
            raise GMLParseException(f'[pos {self._i}] unexpected end of file')

        return self._raw[self._i]

    def _increment(self):
        self._i += 1

    def _parseGraph(self):
        self._parseOpenWithKeyword(Parser.GRAPH_TOKEN)

        while self._currentToken() != Parser.END_TOKEN:
            currentToken = self._currentToken()
            self.logger.debug(f'currentToken: {currentToken}')
            if currentToken == Parser.NODE_TOKEN:
                self._parseNode()
            elif currentToken == Parser.EDGE_TOKEN:
                self._parseEdge()
            else:
                self._parseAttribute(self.graph)
        self._increment()

    def _parseNode(self):
        self._parseOpenWithKeyword(Parser.NODE_TOKEN)

        node = Node()

        while self._currentToken() != Parser.END_TOKEN:
            try:
                current: str = self._currentToken()
                if current == Parser.GRAPHICS_TOKEN:
                    self._parseNodeGraphics(node)
                else:
                    self._parseAttribute(node)
            except GMLParseException:
                self.logger.error(f'current: {self._currentToken()}')
                continue
        self.logger.debug(f'Current index: {self._i}')
        self._increment()

        node.validate(rawIdx=self._i)
        nid = node.id
        self.graph.validate(rawIdx=self._i, nodeId=nid)

        self.logger.info(f'Parsed Node: {node}')
        self.graph.graphNodes[nid] = node

    def _parseEdge(self):
        self._parseOpenWithKeyword(Parser.EDGE_TOKEN)

        edge: Edge = Edge()

        while self._currentToken() != Parser.END_TOKEN:
            current: str = self._currentToken()
            if current == Parser.GRAPHICS_TOKEN:
                self._parseEdgeGraphics(edge)
            else:
                self._parseAttribute(edge)
        self._increment()
        edge.validate(rawIdx=self._i)

        for nid in (edge.source, edge.target):
            if nid not in self.graph.graphNodes:
                node: Node = Node()
                node.is_anon = True
                node.id = nid
                self.graph.graphNodes[nid] = node

        edge.source_node = self.graph.graphNodes[edge.source]
        edge.target_node = self.graph.graphNodes[edge.target]

        edge.source_node.forward_edges.append(edge)
        edge.target_node.backward_edges.append(edge)

        self.logger.info(f'Parsed Edge: {edge}')
        self.graph.graphEdges.append(edge)

    def _parseNodeGraphics(self, node: Node):

        self._parseOpenWithKeyword(Parser.GRAPHICS_TOKEN)
        graphics: NodeGraphics = NodeGraphics()

        while self._currentToken() != Parser.END_TOKEN:
            self._parseAttribute(graphics)

        self._increment()
        self.logger.debug(f'Current index: {self._i}')
        node.graphics = graphics

    def _parseEdgeGraphics(self, edge: Edge) -> Edge:

        self._parseOpenWithKeyword(Parser.GRAPHICS_TOKEN)
        graphics: EdgeGraphics = EdgeGraphics()
        while self._currentToken() != Parser.END_TOKEN:

            current: str = self._currentToken()
            if current == Parser.LINE_DEFINITION_TOKEN:
                graphics = self._parseLineDefinition(graphics)
            else:
                graphics = self._parseAttribute(graphics)

        self._increment()
        edge.graphics = graphics

        return edge

    def _parseLineDefinition(self, graphics: EdgeGraphics) -> EdgeGraphics:

        self._parseOpenWithKeyword(Parser.LINE_DEFINITION_TOKEN)

        #
        # We'll use a List because of the vagaries of Python data classes
        #
        lineList: List[Point] = []
        while self._currentToken() != Parser.END_TOKEN:
            current: str = self._currentToken()
            self.logger.debug(f'current: {current}')

            lineList = self._parsePointDefinition(lineList)

        # But the data classes save a line definition as a Tuple of points
        graphics.line = tuple(lineList)

        self._increment()
        return graphics

    def _parsePointDefinition(self, lineList: List[Point]) -> List[Point]:

        self._parseOpenWithKeyword(Parser.POINT_DEFINITION_TOKEN)

        point: Point = Point()
        while self._currentToken() != Parser.END_TOKEN:
            self.logger.debug(f'point current: {self._currentToken()}')
            point = self._parseAttribute(point)

        lineList.append(point)

        self._increment()
        return lineList

    def _parseAttribute(self, obj: AttrObjectType) -> AttrObjectType:
        """

        Args:
            obj: The object we update attributes on


        Returns:
            The update object
        """
        name = self._currentToken()
        if not name.isalnum():
            raise GMLParseException(f'[pos {self._i}] attribute name is not alphanumeric: {name}')
        self._increment()

        val = self._currentToken()
        try:
            # try to parse val as int
            val = int(val, 10)
            self._increment()
            setattr(obj, name, val)
        except ValueError:
            # Try float
            try:
                val = float(val)
                self._increment()
                setattr(obj, name, val)
            except ValueError:
                # otherwise try to parse val as string
                if not val.startswith(f'{Parser.QUOTE_TOKEN}'):
                    raise GMLParseException(f'[pos {self._i}] attribute name is not alphanumeric: {name}')

                val_l = []
                while not self._currentToken().endswith(f'{Parser.QUOTE_TOKEN}'):
                    val_l.append(self._currentToken())
                    self._increment()
                val_l.append(self._currentToken())  # capture closing one

                self._increment()

                val = ' '.join(val_l)  # unify
                val = val.strip(f'{Parser.QUOTE_TOKEN}')
                setattr(obj, name, val)

        return obj

    def _parseOpenWithKeyword(self, kw: str):
        if self._currentToken() != kw:
            raise GMLParseException(f'[pos {self._i}] expected `{kw}` keyword, found: {self._currentToken()}')
        self._increment()

        if self._currentToken() != Parser.START_TOKEN:
            raise GMLParseException(f'[pos {self._i}] expected opening `[`, found: {self._currentToken()}')
        self._increment()

Class variables

var EDGE_TOKEN : str
var END_TOKEN : str
var GRAPHICS_TOKEN : str
var GRAPH_TOKEN : str
var ID_TOKEN : str
var LINE_DEFINITION_TOKEN : str
var NODE_TOKEN : str
var POINT_DEFINITION_TOKEN : str
var QUOTE_TOKEN : str
var SOURCE_ID_TOKEN : str
var START_TOKEN : str
var TARGET_ID_TOKEN : str

Methods

def AttrObjectType(x)
Expand source code
def new_type(x):
    return x
def LineType(x)
Expand source code
def new_type(x):
    return x
def loadGML(self, path: str)

First method to call after instantiating a Graph object

Args

path
The fully qualified path to the .gml file
Expand source code
def loadGML(self, path: str):
    """
    First method to call after instantiating a Graph object

    Args:
        path: The fully qualified path to the .gml file

    """
    with open(path) as infile:
        # NOTE: the split will destroy any spaces in string attributes
        self._raw = infile.read().strip().split()

    self._i    = 0
    self.graph = Graph()
def parse(self)

The second method to call after the parser loads the .gml file. After this method completes extract the graph from org.hasii.pygmlparser.Graph

Expand source code
def parse(self):
    """
    The second method to call after the parser loads the .gml file.  After this
    method completes extract the graph from `org.hasii.pygmlparser.Graph`

    """
    if len(self._raw) == 0:
        raise GMLParseException('Mot loaded you must call load_gml before parse')

    self._parseGraph()