Skip to content

Commit 2ecabea

Browse files
committed
Adding more test examples for unmanaged control
1 parent c57629e commit 2ecabea

File tree

5 files changed

+111
-1
lines changed

5 files changed

+111
-1
lines changed

.github/workflows/run_examples.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ echo "Using examples directory: $EXAMPLES_DIR"
8282
echo "With Mongo? ${WITH_MONGO}"
8383

8484
# Define the test cases
85-
default_tests=("instrumented_simple_example.py" "instrumented_loop_example.py" "distributed_consumer_example.py" "dask_example.py" "mlflow_example.py" "tensorboard_example.py" "single_layer_perceptron_example.py" "llm_complex/llm_main_example.py")
85+
default_tests=("instrumented_simple_example.py" "instrumented_loop_example.py" "distributed_consumer_example.py" "dask_example.py" "mlflow_example.py" "tensorboard_example.py" "single_layer_perceptron_example.py" "llm_complex/llm_main_example.py" "unmanaged/main.py")
8686

8787
# Use the third argument if provided, otherwise use default tests
8888
if [[ -n "$3" ]]; then

examples/unmanaged/main.py

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import uuid
2+
import subprocess
3+
import os
4+
from flowcept.flowcept_api.flowcept_controller import Flowcept
5+
from pathlib import Path
6+
7+
def start_workflow():
8+
"""Initialize and start the Flowcept workflow."""
9+
workflow_id = str(uuid.uuid4())
10+
flowcept_instance = Flowcept(
11+
workflow_id=workflow_id,
12+
workflow_name="Test-Workflow",
13+
bundle_exec_id=workflow_id
14+
)
15+
flowcept_instance.start()
16+
return workflow_id, flowcept_instance
17+
18+
19+
def run_task(workflow_id, script="simple_task.py"):
20+
"""Run an external Python script with the given workflow ID."""
21+
process = subprocess.Popen(
22+
["python", script, workflow_id],
23+
stdout=subprocess.PIPE,
24+
stderr=subprocess.PIPE,
25+
text=True
26+
)
27+
28+
for line in iter(process.stdout.readline, ""):
29+
print(line, end="")
30+
31+
process.stdout.close()
32+
process.wait()
33+
34+
35+
def main():
36+
"""Main function to manage workflow execution."""
37+
38+
parent_dir = Path(__file__).resolve().parent
39+
script = os.path.join(parent_dir, "simple_task.py")
40+
41+
workflow_id, f = start_workflow()
42+
43+
try:
44+
run_task(workflow_id, script=script)
45+
finally:
46+
f.stop()
47+
48+
return workflow_id
49+
50+
51+
if __name__ == "__main__":
52+
workflow_id = main()
53+
tasks = Flowcept.db.query({"workflow_id": workflow_id})
54+
assert len(tasks) == 1
55+
print(f"There is one task for the workflow {workflow_id}.")

examples/unmanaged/simple_task.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import argparse
2+
3+
4+
def run_task(workflow_id):
5+
print(f"Task ran successfully for the workflow_id {workflow_id}.")
6+
7+
8+
def parse_args():
9+
# Set up argument parser
10+
parser = argparse.ArgumentParser(description="Process a workflow by ID.")
11+
parser.add_argument("workflow_id", type=str, help="The ID of the workflow to process")
12+
13+
# Parse arguments
14+
args = parser.parse_args()
15+
return args.workflow_id
16+
17+
18+
if __name__ == "__main__":
19+
workflow_id = parse_args()
20+
from flowcept import Flowcept, FlowceptTask
21+
f = Flowcept(workflow_id=workflow_id,
22+
bundle_exec_id=workflow_id,
23+
start_persistence=False, save_workflow=False)
24+
f.start()
25+
t = FlowceptTask(workflow_id=workflow_id, activity_id="test_task")
26+
run_task(workflow_id)
27+
t.end()
28+
f.stop()
29+

resources/redis_consumer_test.py

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import redis
2+
import msgpack
3+
from flowcept.configs import MQ_HOST, MQ_PORT, MQ_CHANNEL
4+
5+
# Connect to Redis
6+
redis_client = redis.Redis(host=MQ_HOST, port=MQ_PORT, db=0)
7+
8+
# Subscribe to a channel
9+
pubsub = redis_client.pubsub()
10+
pubsub.subscribe(MQ_CHANNEL)
11+
12+
print("Listening for messages...")
13+
14+
15+
for message in pubsub.listen():
16+
print("Received a message!")
17+
if message["type"] in {"psubscribe"}:
18+
continue
19+
20+
if isinstance(message["data"], int):
21+
message["data"] = str(message["data"]).encode() # Convert to string and encode to bytes
22+
23+
msg_obj = msgpack.loads(message["data"], strict_map_key=False)
24+
print(msg_obj)

src/flowcept/instrumentation/task_capture.py

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(
5757
task_id: str = None,
5858
workflow_id: str = None,
5959
campaign_id: str = None,
60+
activity_id: str = None,
6061
used: Dict = None,
6162
custom_metadata: Dict = None,
6263
):
@@ -65,6 +66,7 @@ def __init__(
6566
return
6667
self._task = TaskObject()
6768
self._task.telemetry_at_start = FlowceptTask._interceptor.telemetry_capture.capture()
69+
self._task.activity_id = activity_id
6870
self._task.started_at = time()
6971
self._task.task_id = task_id or str(self._task.started_at)
7072
self._task.workflow_id = workflow_id or Flowcept.current_workflow_id

0 commit comments

Comments
 (0)