petri_net_nn.streaming¶
streaming ¶
Streaming anomaly evaluation over a live event source.
The offline :func:petri_net_nn.traces.anomaly_score expects a
finished trace, with every event known up front. Production
deployments usually don't have that shape: cases arrive event by
event from a Kafka topic, a webhook, or a file-tail of a workflow
engine's audit log. This module lets PETRA consume those streams
in a case-aware way and emit anomaly scores in real time, without
ever seeing a "complete" trace.
The runtime model is small and case-aware:
- Per-case state is held in a dict: each open case carries the
  list of events seen so far plus a merged attribute dict
  (latest-event-wins on key conflicts). New cases are created
  implicitly on the first event for an unseen case_id.
- Scoring is on-demand. Two policies are supported:
  - on close (the default): call :meth:close_case when the case is known to be finished; the evaluator scores once, returns the result, and frees the case's state.
  - on every event: set score_on_every_event=True and every :meth:on_event returns a fresh score against the partial trace as currently buffered. Useful for live dashboards; costlier because each event triggers a full forward pass.
- Each scoring call delegates to the existing offline :func:anomaly_score, treating the partial event list as a :class:XESTrace. The semantics line up exactly with how PETRA already scores offline anomalies, so streaming and batch share the same correctness story.
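The per-case bookkeeping described above can be sketched in a few lines. This is an illustration under stated assumptions, not the module's actual code: `CaseState` and `record_event` are hypothetical names, and events are reduced to bare activity names.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CaseState:
    """Hypothetical per-case bucket: buffered event names plus merged attributes."""
    events: List[str] = field(default_factory=list)
    attributes: Dict[str, Any] = field(default_factory=dict)


def record_event(cases: Dict[str, CaseState], case_id: str, name: str,
                 attributes: Dict[str, Any]) -> CaseState:
    # setdefault creates the bucket implicitly on the first event
    # for an unseen case_id.
    state = cases.setdefault(case_id, CaseState())
    state.events.append(name)
    # dict.update overwrites existing keys, which gives latest-event-wins.
    state.attributes.update(attributes)
    return state


cases: Dict[str, CaseState] = {}
record_event(cases, "order-1", "create", {"channel": "web", "amount": 10})
record_event(cases, "order-1", "approve", {"amount": 25})
record_event(cases, "order-2", "create", {"channel": "phone"})
```

After these three calls, `order-1` holds two events and its `amount` reflects the later event, while `channel` survives from the first one.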
Two API shapes are offered for symmetry with how streams get plugged in:
- Push: :meth:on_event is called from whatever source the user wires up. They handle threading, back-pressure, and error recovery; the evaluator just maintains state.
- Pull: :meth:process_stream consumes an iterator and yields evaluations as they're produced. Useful for testing and for batch backfill of a stored log.
Threading: instances are not thread-safe. A real deployment that reads from a multi-threaded source should serialise calls through a single consumer thread, a queue, or a lock.
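One way to serialise calls is to guard them with a lock. The sketch below is a generic pattern, not part of this module: `SerialisedCalls` is a hypothetical wrapper, and a plain list stands in for the evaluator's state.

```python
import threading
from typing import Any, Callable, List


class SerialisedCalls:
    """Hypothetical wrapper: funnels every call through one lock."""

    def __init__(self, on_event: Callable[[Any], Any]) -> None:
        self._on_event = on_event
        self._lock = threading.Lock()

    def on_event(self, event: Any) -> Any:
        # Only one thread at a time may reach the wrapped callable.
        with self._lock:
            return self._on_event(event)


seen: List[int] = []  # stand-in for evaluator state
guarded = SerialisedCalls(seen.append)


def producer(start: int) -> None:
    for i in range(start, start + 100):
        guarded.on_event(i)


# Four producer threads push disjoint ranges through the guarded entry point.
threads = [threading.Thread(target=producer, args=(k * 100,)) for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A single consumer thread draining a `queue.Queue` achieves the same effect and also gives a natural place to apply back-pressure.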
StreamingEvent dataclass ¶
One event arriving on the stream.
Attributes¶
case_id
Identifier the events get grouped by. Different cases'
events are scored independently and held in separate
state buckets inside the evaluator.
name
The activity / transition label, matched against the
net's :attr:PetriNet.transition_labels map.
timestamp
Optional epoch-seconds timestamp. Carried through but
not interpreted by the scoring code — it's there so
downstream consumers can attach it to the emitted
evaluation. None is acceptable for sources that
don't carry a timestamp.
attributes
Per-event attributes (channel, amount, sensor reading,
whatever the source carries). Merged into the case's
attribute dict on arrival: latest-event-wins when keys
collide with earlier events for the same case.
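As an illustration of the shape described by these attributes, the sketch below defines a stand-in dataclass and maps a raw record onto it. `EventSketch`, `from_record`, the field order, the defaults, and the record keys (`case`, `activity`, `ts`) are all assumptions made for the example, not the library's definitions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class EventSketch:
    """Stand-in with the four documented attributes (order and defaults assumed)."""
    case_id: str
    name: str
    timestamp: Optional[float] = None
    attributes: Dict[str, Any] = field(default_factory=dict)


def from_record(record: Dict[str, Any]) -> EventSketch:
    """Map a JSON-like record with hypothetical key names to an event."""
    reserved = {"case", "activity", "ts"}
    return EventSketch(
        case_id=str(record["case"]),
        name=record["activity"],
        # Sources without a timestamp simply yield None here.
        timestamp=record.get("ts"),
        # Everything else travels as a per-event attribute.
        attributes={k: v for k, v in record.items() if k not in reserved},
    )


event = from_record({"case": 42, "activity": "approve", "amount": 25})
```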
StreamingEvaluation dataclass ¶
One scoring outcome emitted by the evaluator.
Attributes¶
case_id
Which case this score is about.
n_events
How many events have been observed for this case so far,
including the one that triggered this evaluation (if
applicable).
trace_score
Trace-level scalar anomaly score — the sum of all
per-transition residuals. Higher = more anomalous. Same
semantics as :func:petri_net_nn.traces.trace_anomaly_score.
per_transition_residuals
Per-transition |predicted − observed| residuals. Pinned
to transition IDs; consumers join against
:attr:PetriNet.transition_labels for human-readable
names.
closed
True if this evaluation was produced by
:meth:close_case (case is done, state freed),
False if it was produced by an in-flight
:meth:on_event or :meth:score_case call.
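A consumer typically wants the residuals ranked and labelled. The sketch below assumes, for illustration only, that `per_transition_residuals` is a dict keyed by transition ID and that `transition_labels` maps the same IDs to display names; the values are made up.

```python
from typing import Dict, List, Tuple

# Made-up data in the assumed shapes.
residuals: Dict[str, float] = {"t1": 0.05, "t2": 0.70, "t3": 0.25}
labels: Dict[str, str] = {"t1": "create", "t2": "approve", "t3": "ship"}

# The trace-level score is the sum of the per-transition residuals.
trace_score = sum(residuals.values())

# Join residuals against labels, falling back to the raw ID when a
# label is missing, and rank from most to least anomalous.
ranked: List[Tuple[str, float]] = sorted(
    ((labels.get(tid, tid), value) for tid, value in residuals.items()),
    key=lambda pair: pair[1],
    reverse=True,
)
```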
StreamingEvaluator ¶
StreamingEvaluator(module, *, attribute_to_marking, attribute_to_values=None, transitions=None, score_on_every_event=False)
Maintains per-case state and scores cases against a trained module as events arrive.
Parameters¶
module
The trained :class:PetriNetModule to score against.
attribute_to_marking
Same role as in :func:anomaly_score — maps a
:class:XESTrace (assembled here from the per-case
events + merged attributes) to the place-activation
marking the forward pass expects.
attribute_to_values
Optional coloured-Petri-net value channel; passed
through to the underlying offline scorer when supplied.
transitions
Optional explicit list of transition IDs to score
against. Defaults match the offline anomaly_score:
every transition whose label doesn't contain ->
(i.e. skip auto-generated gateway labels).
score_on_every_event
When True, every :meth:on_event call returns a
:class:StreamingEvaluation against the partial trace.
When False (default), :meth:on_event returns
None and the case is only scored on demand via
:meth:score_case or :meth:close_case.
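The default for `transitions` amounts to a simple label filter. The sketch below is a reading of the rule as stated, assuming `transition_labels` is a dict from transition ID to label; `default_transitions` is a hypothetical helper, not a function exported by the package.

```python
from typing import Dict, List


def default_transitions(transition_labels: Dict[str, str]) -> List[str]:
    """Keep every transition whose label does not contain '->'."""
    return [tid for tid, label in transition_labels.items() if "->" not in label]


# 't2' carries an auto-generated gateway label and is skipped.
example_labels = {"t1": "create", "t2": "create->approve", "t3": "approve"}
selected = default_transitions(example_labels)
```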
Source code in petri_net_nn/streaming.py
on_event ¶
Receive one event. Creates the case if necessary, appends the event to that case's state, and merges the event's attributes into the case attribute dict.
Returns a :class:StreamingEvaluation iff
score_on_every_event was set at construction; else
returns None. The case stays open either way.
score_case ¶
Compute and return the score for the case as currently
seen. Does not close the case — state stays in memory and
further :meth:on_event calls can add to it.
Returns None if no events have been seen for
case_id.
close_case ¶
Score the case one last time, mark it closed, free its
state. Returns the final score, or None if no events
had been seen for case_id.
After this call the case_id is forgotten — the next
:meth:on_event for that case_id (should one arrive)
starts a fresh state bucket.
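The contracts of on_event, score_case, and close_case can be exercised with a small stand-in. `MiniEvaluator` and `toy_scorer` below are hypothetical and heavily simplified: events are plain `(case_id, name)` arguments, results are dicts, and a toy counting function replaces the real forward pass.

```python
from typing import Callable, Dict, List, Optional


def toy_scorer(events: List[str]) -> float:
    # Toy stand-in for the offline scorer: counts events outside an expected set.
    expected = {"create", "approve"}
    return float(sum(name not in expected for name in events))


class MiniEvaluator:
    """Hypothetical stand-in mirroring the documented method contracts."""

    def __init__(self, scorer: Callable[[List[str]], float],
                 score_on_every_event: bool = False) -> None:
        self._scorer = scorer
        self._every = score_on_every_event
        self._cases: Dict[str, List[str]] = {}

    def _evaluate(self, case_id: str, closed: bool) -> Optional[dict]:
        events = self._cases.get(case_id)
        if not events:
            return None  # nothing seen for this case_id
        return {"case_id": case_id, "n_events": len(events),
                "trace_score": self._scorer(events), "closed": closed}

    def on_event(self, case_id: str, name: str) -> Optional[dict]:
        self._cases.setdefault(case_id, []).append(name)
        return self._evaluate(case_id, closed=False) if self._every else None

    def score_case(self, case_id: str) -> Optional[dict]:
        return self._evaluate(case_id, closed=False)  # case stays open

    def close_case(self, case_id: str) -> Optional[dict]:
        result = self._evaluate(case_id, closed=True)
        self._cases.pop(case_id, None)  # forget the case
        return result

    def open_cases(self) -> List[str]:
        return sorted(self._cases)


ev = MiniEvaluator(toy_scorer)
first = ev.on_event("c1", "create")   # default policy: no score per event
ev.on_event("c1", "refund")
midway = ev.score_case("c1")          # in-flight score, case stays open
final = ev.close_case("c1")           # final score, state freed
again = ev.close_case("c1")           # already forgotten
```

Here `midway` and `final` carry the same score but differ in `closed`, and the second `close_case` returns `None` because the case has been forgotten.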
process_stream ¶
Consume an iterator of events; yield each evaluation
produced by :meth:on_event.
Useful for testing and for batch backfill of a stored
log: feed events in the order they happened and observe
the same evaluations a live deployment would emit. Does
NOT call :meth:close_case at end-of-stream — the user
decides when a case is done.
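The pull shape is essentially a generator wrapped around the push entry point. The sketch below is a guess at that structure, not the module's source: `process_stream_sketch` and `toy_on_event` are hypothetical, events are `(case_id, name)` tuples, and skipping `None` results is an assumption.

```python
from typing import Callable, Dict, Iterable, Iterator, Optional, Tuple

Event = Tuple[str, str]  # (case_id, name), simplified for the example


def process_stream_sketch(on_event: Callable[[Event], Optional[dict]],
                          events: Iterable[Event]) -> Iterator[dict]:
    for event in events:
        result = on_event(event)
        if result is not None:  # assumption: None results are not yielded
            yield result
    # Deliberately no close step here: the caller decides when a case is done.


counts: Dict[str, int] = {}


def toy_on_event(event: Event) -> Optional[dict]:
    # Stand-in that "scores" on every event by reporting the running count.
    case_id, _name = event
    counts[case_id] = counts.get(case_id, 0) + 1
    return {"case_id": case_id, "n_events": counts[case_id]}


log = [("a", "create"), ("b", "create"), ("a", "approve")]
emitted = list(process_stream_sketch(toy_on_event, log))
still_open = sorted(counts)  # both cases remain open after the backfill
```

For a backfill, the caller would typically walk the remaining open cases afterwards and close each one explicitly.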
open_cases ¶
Case IDs currently held in state, in deterministic sorted order so callers iterating for diagnostics get a stable view.