# Source code for fsa4streams.fsa

"""
Finite State Automata
"""
from __future__ import unicode_literals

from copy import deepcopy
from itertools import chain
import json
import logging
from uuid import uuid4

from .matcher import DIRECTORY as matcher_directory
from .state import State

LOG = logging.getLogger(__name__)

class FSA(object):
    """A Finite State Automaton."""

    # FSA construction

    def __init__(self, structure, check_structure=True):
        """
        DON'T USE THIS CONSTRUCTOR DIRECTLY.
        YOU SHOULD RATHER USE ONE OF THE from_* STATIC METHODS.
        """
        self._structure = structure
        self._tokens = {
            'clock': None,
            'running': {},
            'pending': {},
        }
        if check_structure:
            self.check_structure(True)

    @classmethod
    def from_str(cls, json_str):
        return cls(json.loads(json_str))
    @classmethod
    def from_file(cls, json_filelike):
        return cls(json.load(json_filelike))
    @classmethod
    def from_dict(cls, dictobj):
        return cls(deepcopy(dictobj))
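
    # A hedged sketch of the JSON structure the from_* constructors accept,
    # inferred from check_structure() and feed() below: a 'states' mapping
    # with a mandatory 'start' state, per-state 'transitions' whose 'target'
    # names another state and whose 'condition' is compared to events by the
    # default matcher, and at least one state marked 'terminal'. Any key not
    # checked in this module is an assumption, not a specification.
    #
    #     fsa = FSA.from_str("""
    #         {
    #             "states": {
    #                 "start": {
    #                     "transitions": [{"condition": "a", "target": "s1"}]
    #                 },
    #                 "s1": {
    #                     "transitions": [{"condition": "b", "target": "end"}]
    #                 },
    #                 "end": {"terminal": true, "transitions": []}
    #             }
    #         }
    #     """)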
    @classmethod
    def make_empty(cls, **kw):
        """TODO doc: must call :meth:`check_structure` at the end"""
        data = {'states': {}}
        data.update(kw)
        return cls(data, False)
    def add_state(self, stateid, **kw):
        """TODO doc

        IMPORTANT: 'target' may not exist yet, so it is not checked.
        Hence, :meth:`check_structure` should be called at the end.
        """
        if stateid in self._structure['states']:
            raise ValueError("State %r already present in FSA" % stateid)
        self._structure['states'][stateid] = data = {'transitions': []}
        data.update(kw)
        return State(self, stateid)
    def check_structure(self, raises=True):
        problems = []
        default_matcher = self._structure.get('default_matcher')
        if default_matcher is not None \
        and default_matcher not in matcher_directory:
            problems.append("Unsupported default matcher %r"
                            % default_matcher)
        states = self._structure['states']
        if 'start' not in states:
            problems.append("No start state")
        terminals = 0
        for stateid in states:
            state = State(self, stateid)
            terminal = state.terminal
            transitions = list(state.transitions)
            deftrans = state.default_transition
            if terminal:
                terminals += 1
            if not transitions and not deftrans and not terminal:
                problems.append("Non-terminal state %r has no transition"
                                % stateid)
                # NB: we do not check if it has only self-targeted
                # transitions, which would be equally bad...
                # TODO Should we?
            if deftrans:
                if state.max_noise != 0:
                    problems.append("State %r cannot have both a default "
                                    "transition and max_noise > 0" % stateid)
                transitions = chain(transitions, [deftrans])
            for transition in transitions:
                if transition['target'] not in states:
                    problems.append("Transition to non-existing state %r"
                                    % transition['target'])
                matcher = transition.get('matcher')
                if matcher is not None \
                and matcher not in matcher_directory:
                    problems.append("Unsupported matcher %r" % matcher)
        if terminals == 0:
            problems.append("No terminal state")
            # NB: we do not check that a terminal state is reachable
            # from the start state...
            # TODO Should we?
        if problems and raises:
            raise ValueError("\n".join(problems))
        return problems
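
    # The same FSA can be built programmatically; a minimal sketch, assuming
    # that the keyword arguments of add_state() (e.g. 'terminal') are stored
    # verbatim in the state structure, as the update() call above suggests:
    #
    #     fsa = FSA.make_empty()
    #     fsa.add_state('start',
    #                   transitions=[{'condition': 'a', 'target': 'end'}])
    #     fsa.add_state('end', terminal=True)
    #     problems = fsa.check_structure(raises=False)  # [] when consistent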
    # access to structure

    def __getitem__(self, stateid):
        return State(self, stateid)

    @property
    def allow_overlap(self):
        return self._structure.get('allow_overlap', False)

    @allow_overlap.setter
    def allow_overlap(self, value):
        if type(value) is not bool:
            raise ValueError('FSA.allow_overlap must be a bool')
        self._structure['allow_overlap'] = value

    @allow_overlap.deleter
    def allow_overlap(self):
        del self._structure['allow_overlap']

    @property
    def default_matcher(self):
        return self._structure.get('default_matcher')

    @default_matcher.setter
    def default_matcher(self, value):
        if value not in matcher_directory:
            raise ValueError('FSA.default_matcher must be in '
                             'matcher.DIRECTORY')
        self._structure['default_matcher'] = value

    @default_matcher.deleter
    def default_matcher(self):
        del self._structure['default_matcher']
    def export_structure_as_dict(self):
        return deepcopy(self._structure)
    def export_structure_as_string(self, *args, **kw):
        return json.dumps(self._structure, *args, **kw)
    def export_structure_to_file(self, fp, *args, **kw):
        return json.dump(self._structure, fp, *args, **kw)
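
    # The export_structure_* methods mirror the from_* constructors, so a
    # structure can be round-tripped; a brief sketch (extra positional and
    # keyword arguments are passed straight to json.dumps/json.dump):
    #
    #     clone = FSA.from_dict(fsa.export_structure_as_dict())
    #     clone = FSA.from_str(fsa.export_structure_as_string(indent=4))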
    # tokens management

    def reset(self):
        self._tokens['running'].clear()
        self._tokens['pending'].clear()
        self._tokens['clock'] = None
    def is_busy(self):
        return len(self._tokens['running']) > 0 \
            or len(self._tokens['pending']) > 0
    def load_tokens_from_str(self, json_str, force=False):
        if self.is_busy() and not force:
            raise ValueError('Cannot load tokens on a busy FSA')
        self._tokens = json.loads(json_str)
        self._check_tokens()

    def load_tokens_from_file(self, json_filelike, force=False):
        if self.is_busy() and not force:
            raise ValueError('Cannot load tokens on a busy FSA')
        self._tokens = json.load(json_filelike)
        self._check_tokens()

    def load_tokens_from_dict(self, dictobj, force=False):
        if self.is_busy() and not force:
            raise ValueError('Cannot load tokens on a busy FSA')
        self._tokens = deepcopy(dictobj)
        self._check_tokens()
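
    # Together with the export_tokens_* methods below, this allows matching
    # to be suspended and resumed across process restarts; a sketch
    # (force=True bypasses the busy check, at the risk of discarding
    # in-progress tokens):
    #
    #     saved = fsa.export_tokens_as_dict()
    #     # ...later, on a fresh FSA built from the same structure...
    #     fsa.load_tokens_from_dict(saved)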
    def _check_tokens(self):
        states = self._structure['states']
        all_tokens = chain(self._tokens['running'].iteritems(),
                           self._tokens['pending'].iteritems())
        for tokenid, token in all_tokens:
            if token['state'] not in states:
                raise ValueError("Inconsistent position "
                                 "(non-existing state %r)" % token['state'])
    def export_tokens_as_dict(self):
        return deepcopy(self._tokens)

    def export_tokens_as_string(self, *args, **kw):
        return json.dumps(self._tokens, *args, **kw)

    def export_tokens_to_file(self, fp, *args, **kw):
        return json.dump(self._tokens, fp, *args, **kw)
    # running the FSA

    def match(self, transition, event, token):
        transition_type = transition.get('matcher') or self.default_matcher
        if transition_type is None:
            return transition['condition'] == event
        else:
            return matcher_directory[transition_type](transition, event,
                                                      token, self)
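
    # Custom matchers are looked up in matcher.DIRECTORY, either per
    # transition (its 'matcher' key) or FSA-wide (default_matcher). From the
    # call above, a matcher receives (transition, event, token, fsa) and
    # returns a truthy value on match. A hypothetical sketch, registered by
    # hand since the matcher module's own registration API is not shown here:
    #
    #     def match_prefix(transition, event, token, fsa):
    #         # match any event starting with the transition's condition
    #         return event.startswith(transition['condition'])
    #     matcher_directory['prefix'] = match_prefix
    #     # then, in the structure:
    #     #     {"condition": "err", "matcher": "prefix", "target": "error"}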
    def _delete_token(self, tokenid, token, token_state, running, pending,
                      matches, allow_match=True):
        del running[tokenid]
        is_match = token_state.terminal
        if is_match and allow_match:
            LOG.debug('    matched')
            matches.append(token)
        else:
            LOG.debug('    was dropped')
        otherid = token.get('inhibits')
        if otherid is None:
            return
        other = pending.get(otherid)
        if other is None:
            # it has already been deleted
            return
        LOG.debug('    it was inhibiting a pending match in %r',
                  other['state'])
        must_delete = is_match and (
            other['state'] == token['state']
            or other['state'] in token['history_states']
        )
        if must_delete:
            del pending[otherid]
        else:
            if not [t for t in running.itervalues()
                    if t.get('inhibits') == otherid]:
                # no one else was inhibiting 'other',
                # so 'other' is not inhibited anymore
                del pending[otherid]
                matches.append(other)
                LOG.debug('    making it an actual match')
    def feed(self, event, timestamp=None):
        """
        I ingest `event`, and I return a list of matching tokens.

        A matching token is a token that has reached a final state,
        and cannot progress further in the FSA.

        If provided, ``timestamp`` must be an integer greater than or equal
        to all previous timestamps; the default value is the previous
        timestamp + 1. Timestamps are used to check ``max_duration``
        constraints.
        """
        clock = self._tokens['clock']
        running = self._tokens['running']
        pending = self._tokens['pending']
        skip_create = False
        matches = []

        if timestamp is None:
            if clock is None:
                timestamp = 0
            else:
                timestamp = clock + 1
        else:
            assert clock is None or timestamp >= clock
        LOG.debug('event %r at timestamp %s', event, timestamp)

        # Make each running token take the event into account:
        # - some of them will walk through a transition,
        # - some of them will stay in place (if their noise counter allows it),
        # - the others will be removed.
        # A token removed from a final state is a match,
        # else it is plainly discarded.
        for tokenid, token in running.items():
            oldstateid = token['state']
            LOG.debug('  token in %r', oldstateid)
            oldstate = self[oldstateid]

            # delete token if a max_duration has expired
            if oldstate.max_total_duration \
            and timestamp - token['created'] > oldstate.max_total_duration \
            or oldstate.max_duration \
            and timestamp - token['updated'] > oldstate.max_duration:
                LOG.debug('    was dropped because it exceeded max_duration')
                self._delete_token(tokenid, token, oldstate,
                                   running, pending, matches)
                continue

            possible_transitions = [t for t in oldstate.transitions
                                    if self.match(t, event, token)]
            if not possible_transitions:
                deftrans = oldstate.default_transition
                if deftrans:
                    possible_transitions = [deftrans]

            # deleting token (it may or may not match)
            if not possible_transitions:
                LOG.debug('    added noise')
                token['noise_state'] += 1
                token['noise_total'] += 1
                if token['noise_state'] > oldstate.max_noise \
                or oldstate.max_total_noise is not None \
                and token['noise_total'] > oldstate.max_total_noise:
                    self._delete_token(tokenid, token, oldstate,
                                       running, pending, matches)
                continue

            if oldstateid == 'start':
                # an old token is on start,
                # so any new token starting there would be redundant
                skip_create = True

            # this token *might* be a match
            # if no further transition leads to a match
            if oldstate.terminal:
                LOG.debug('    keeping a pending match')
                otherid = token.pop('inhibits', None)
                if otherid:
                    # pending token is overridden by this new match
                    previous = pending.pop(otherid, None)
                    # NB: inhibited token may have been deleted already
                    if previous is not None:
                        LOG.debug('    and dropping previous pending match'
                                  ' in %r', previous['state'])
                # create new pending token
                newid = uuid4().hex
                pending_token = deepcopy(token)
                pending[newid] = pending_token
                token['inhibits'] = newid

            # pushing token through first transition
            newstateid = possible_transitions[0]['target']
            LOG.debug('    moved to %r', newstateid)
            token['state'] = newstateid
            token['noise_state'] = 0
            token['updated'] = timestamp
            if not possible_transitions[0].get('silent'):
                token['history_events'].append(event)
                token['history_states'].append(oldstateid)
            else:
                LOG.debug('      (silently)')
            newstate = self[newstateid]
            if newstate.max_total_noise is not None \
            and token['noise_total'] > newstate.max_total_noise:
                LOG.debug('    and was dropped (max_total_noise exceeded)')
                self._delete_token(tokenid, token, newstate,
                                   running, pending, matches, False)
                continue  # the token is gone, do not clone it below
            if newstate.max_total_duration is not None \
            and timestamp - token['created'] > newstate.max_total_duration:
                LOG.debug('    and was dropped (max_total_duration exceeded)')
                self._delete_token(tokenid, token, newstate,
                                   running, pending, matches, False)
                continue  # the token is gone, do not clone it below

            # cloning token through other transitions (non-deterministic FSA)
            for transition in possible_transitions[1:]:
                LOG.debug('    also moved to %r', transition['target'])
                newtoken = deepcopy(token)
                newtoken['state'] = transition['target']
                newid = uuid4().hex
                running[newid] = newtoken

        # Create a new token for each transition of the 'start' state
        # that is satisfied by the current event
        # (unless an existing token just left the start state).
        if not skip_create:
            starting_transitions = [t for t in self['start'].transitions
                                    if self.match(t, event, None)]
            if starting_transitions:
                forbidden = set(t['state'] for t in running.itervalues()
                                if t['noise_state'] == 0)
                for transition in starting_transitions:
                    if transition['target'] in forbidden:
                        continue
                    LOG.debug('  new token in %r', transition['target'])
                    newtoken = {
                        "state": transition['target'],
                        "created": timestamp,
                        "updated": timestamp,
                        "noise_state": 0,
                        "noise_total": 0,
                        "history_events": [event],
                        "history_states": [],
                    }
                    newid = uuid4().hex
                    running[newid] = newtoken

        # Immediately handle tokens on a final state with no transition
        # (no need to wait for the next event to do that...).
        # This may not result in an immediate match if another running token
        # has the exact same history...
        for tokenid, token in running.items():
            token_state = self[token['state']]
            if token_state.terminal and len(token_state.transitions) == 0:
                LOG.debug('  token now in %r (final)', token['state'])
                inhibited = False
                for otherid, other in running.items():
                    if other is token:
                        continue
                    if other['history_events'] == token['history_events']:
                        LOG.debug('    is kept pending'
                                  ' (inhibited by token in %r)',
                                  other['state'])
                        inhibitedid = other.get('inhibits')
                        if inhibitedid is not None:
                            dropped = pending.pop(inhibitedid, None)
                            if dropped:
                                if dropped['state'] == token['state'] \
                                or dropped['state'] in token['history_states']:
                                    LOG.debug('    dropping older pending'
                                              ' token in %r',
                                              dropped['state'])
                                else:
                                    matches.append(dropped)
                                    LOG.debug('    freeing older pending'
                                              ' token in %r to match',
                                              dropped['state'])
                        other['inhibits'] = tokenid
                        inhibited = True
                if inhibited:
                    pending[tokenid] = token
                    del running[tokenid]
                else:
                    self._delete_token(tokenid, token, token_state,
                                       running, pending, matches)

        self._tokens['clock'] = timestamp

        if matches and not self.allow_overlap:
            # drop all remaining tokens overlapping with matches
            max_updated = max(match['updated'] for match in matches)
            for d in (running, pending):
                for tokenid, token in d.items():
                    if token['created'] <= max_updated:
                        del d[tokenid]
                        LOG.debug('  dropping token %r to prevent overlap',
                                  tokenid)
            # drop all matches that are overlapped by another match
            if len(matches) > 1:
                min_created = min(match['created'] for match in matches)
                matches = [match for match in matches
                           if match['created'] == min_created]
                if len(matches) > 1:
                    matches = matches[:1]
                    LOG.debug('to prevent overlap, only 1 match kept')

        return matches
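
    # Feeding the example FSA above one event at a time; matches are token
    # dicts whose 'history_events' hold the matched subsequence (a sketch,
    # assuming the structure sketched next to the from_* methods):
    #
    #     fsa.feed('a')            # -> [] (a token is created in 's1')
    #     matches = fsa.feed('b')  # -> [token]; token['history_events']
    #                              #    == ['a', 'b']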
    def feed_all(self, iterable, finish=True):
        ret = []
        for event in iterable:
            ret += self.feed(event)
        if finish:
            ret += self.finish()
            self.reset()
        return ret
    def feed_all_timestamps(self, iterable, finish=True):
        ret = []
        for event, timestamp in iterable:
            ret += self.feed(event, timestamp)
        if finish:
            ret += self.finish()
            self.reset()
        return ret
    def finish(self):
        LOG.debug('finishing')
        running = self._tokens['running']
        pending = self._tokens['pending']
        matches = []
        for tokenid, token in running.items():
            LOG.debug('  token in %r', token['state'])
            token_state = self[token['state']]
            self._delete_token(tokenid, token, token_state,
                               running, pending, matches)
        if matches and not self.allow_overlap:
            min_created = min(match['created'] for match in matches)
            matches = [match for match in matches
                       if match['created'] == min_created]
        self.reset()
        LOG.debug('  returns %s match(es)', len(matches))
        return matches
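
# A minimal self-contained demo, runnable as a script; it relies only on the
# public API defined above and on the structure keys exercised by
# check_structure(), so treat it as an illustrative sketch rather than part
# of the library.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    fsa = FSA.from_dict({
        'states': {
            'start': {'transitions': [{'condition': 'a', 'target': 's1'}]},
            's1': {'transitions': [{'condition': 'b', 'target': 'end'}]},
            'end': {'terminal': True, 'transitions': []},
        },
    })
    # 'x' events are ignored; 'a' then 'b' drive a token to the terminal state
    for match in fsa.feed_all('xabx'):
        print(match['history_events'])  # expected: ['a', 'b']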