|
| 1 | +"""Complex example using Node and Edge class instances. |
| 2 | +
|
| 3 | +Demonstrates: |
| 4 | +- ``Node`` / ``Edge`` subclass instances in ``ReactFlow`` |
| 5 | +- Per-instance ``__panel__`` node views |
| 6 | +- Per-instance custom editors via ``editor(...)`` |
| 7 | +- Node/edge event hooks (``on_data_change``, ``on_selection_changed``) |
| 8 | +- Programmatic updates with ``patch_node_data`` / ``patch_edge_data`` |
| 9 | +""" |
| 10 | + |
| 11 | +import random |
| 12 | + |
| 13 | +import panel as pn |
| 14 | +import panel_material_ui as pmui |
| 15 | +import param |
| 16 | + |
| 17 | +from panel_reactflow import Edge, Node, ReactFlow |
| 18 | + |
| 19 | +pn.extension() |
| 20 | + |
| 21 | + |
| 22 | +class PipelineNode(Node): |
| 23 | + status = param.Selector(default="idle", objects=["idle", "running", "done", "failed"], precedence=0) |
| 24 | + retries = param.Integer(default=0, bounds=(0, None), precedence=0) |
| 25 | + owner = param.String(default="ops", precedence=0) |
| 26 | + notes = param.String(default="", precedence=0) |
| 27 | + |
| 28 | + def __init__(self, **params): |
| 29 | + params.setdefault("type", "pipeline") |
| 30 | + super().__init__(**params) |
| 31 | + self._summary = pn.pane.Markdown(margin=(0, 0, 6, 0)) |
| 32 | + self._activity = pn.pane.Markdown("", styles={"font-size": "12px", "opacity": "0.8"}) |
| 33 | + self.param.watch(self._refresh_view, ["status", "owner", "retries", "label"]) |
| 34 | + self._refresh_view() |
| 35 | + |
| 36 | + def _refresh_view(self, *_): |
| 37 | + self._summary.object = ( |
| 38 | + f"**{self.label}** \n" |
| 39 | + f"Status: `{self.status}` \n" |
| 40 | + f"Owner: `{self.owner}` \n" |
| 41 | + f"Retries: `{self.retries}`" |
| 42 | + ) |
| 43 | + |
| 44 | + def __panel__(self): |
| 45 | + return pn.Column(self._summary, self._activity, margin=0, sizing_mode="stretch_width") |
| 46 | + |
| 47 | + def editor(self, data, schema, *, id, type, on_patch): |
| 48 | + status = pmui.Select.from_param(self.param.status, name="Status") |
| 49 | + retries = pmui.IntInput.from_param(self.param.retries, name="Retries") |
| 50 | + owner = pmui.TextInput.from_param(self.param.owner, name="Owner") |
| 51 | + notes = pmui.TextAreaInput.from_param(self.param.notes, name="Notes", height=80) |
| 52 | + return pn.Column(status, retries, owner, notes, sizing_mode="stretch_width") |
| 53 | + |
| 54 | + def on_data_change(self, payload, flow): |
| 55 | + if payload.get("node_id") == self.id: |
| 56 | + self._activity.object = f"Last patch: `{payload.get('patch', {})}`" |
| 57 | + |
| 58 | + def on_selection_changed(self, payload, flow): |
| 59 | + selected = self.id in (payload.get("nodes") or []) |
| 60 | + if selected: |
| 61 | + self._activity.object = "Selected in canvas" |
| 62 | + |
| 63 | + |
| 64 | +class WeightedEdge(Edge): |
| 65 | + weight = param.Number(default=0.5, bounds=(0, 1), precedence=0) |
| 66 | + channel = param.Selector(default="main", objects=["main", "backup", "shadow"], precedence=0) |
| 67 | + enabled = param.Boolean(default=True, precedence=0) |
| 68 | + |
| 69 | + def __init__(self, **params): |
| 70 | + params.setdefault("type", "weighted") |
| 71 | + super().__init__(**params) |
| 72 | + |
| 73 | + def editor(self, data, schema, *, id, type, on_patch): |
| 74 | + weight = pmui.FloatSlider.from_param(self.param.weight, name="Weight", step=0.01) |
| 75 | + channel = pmui.Select.from_param(self.param.channel, name="Channel") |
| 76 | + enabled = pmui.Checkbox.from_param(self.param.enabled, name="Enabled") |
| 77 | + return pn.Column(weight, channel, enabled, sizing_mode="stretch_width") |
| 78 | + |
| 79 | + |
| 80 | +nodes = [ |
| 81 | + PipelineNode(id="extract", label="Extract", position={"x": 0, "y": 40}), |
| 82 | + PipelineNode(id="transform", label="Transform", position={"x": 300, "y": 160}, status="running", retries=1, owner="ml", notes="Batch window"), |
| 83 | + PipelineNode(id="load", label="Load", position={"x": 600, "y": 40}, owner="platform"), |
| 84 | +] |
| 85 | + |
| 86 | +edges = [ |
| 87 | + WeightedEdge(id="e1", source="extract", target="transform", weight=0.72), |
| 88 | + WeightedEdge(id="e2", source="transform", target="load", weight=0.63, channel="backup"), |
| 89 | +] |
| 90 | + |
| 91 | +event_log = pmui.TextAreaInput(name="Events", value="", disabled=True, height=180, sizing_mode="stretch_width") |
| 92 | +last_event = pn.pane.Markdown("**Last event:** _none_") |
| 93 | + |
| 94 | +flow = ReactFlow( |
| 95 | + nodes=nodes, |
| 96 | + edges=edges, |
| 97 | + editor_mode="side", |
| 98 | + sizing_mode="stretch_both", |
| 99 | +) |
| 100 | + |
| 101 | +def _log_event(payload): |
| 102 | + event_type = payload.get("type", "unknown") |
| 103 | + last_event.object = f"**Last event:** `{event_type}`" |
| 104 | + snippet = str(payload) |
| 105 | + event_log.value = f"{event_log.value}\n{event_type}: {snippet}"[-6000:] |
| 106 | + |
| 107 | + |
| 108 | +flow.on("*", _log_event) |
| 109 | + |
| 110 | + |
| 111 | +def _advance_nodes(_): |
| 112 | + order = {"idle": "running", "running": "done", "done": "done", "failed": "idle"} |
| 113 | + for node in nodes: |
| 114 | + current = node.status |
| 115 | + flow.patch_node_data(node.id, {"status": order.get(current, "idle")}) |
| 116 | + |
| 117 | + |
| 118 | +def _randomize_weights(_): |
| 119 | + for edge in edges: |
| 120 | + flow.patch_edge_data(edge.id, {"weight": round(random.uniform(0.05, 0.95), 2)}) |
| 121 | + |
| 122 | + |
| 123 | +advance_btn = pmui.Button(name="Advance pipeline") |
| 124 | +advance_btn.on_click(_advance_nodes) |
| 125 | + |
| 126 | +weights_btn = pmui.Button(name="Randomize edge weights") |
| 127 | +weights_btn.on_click(_randomize_weights) |
| 128 | + |
| 129 | +controls = pn.Row(advance_btn, weights_btn, sizing_mode="stretch_width") |
| 130 | + |
| 131 | +pn.Column( |
| 132 | + pn.pane.Markdown("## Node/Edge Instance Workflow"), |
| 133 | + controls, |
| 134 | + last_event, |
| 135 | + flow, |
| 136 | + event_log, |
| 137 | + sizing_mode="stretch_both", |
| 138 | +).servable() |
0 commit comments