Compare commits
7 Commits
main
...
flows-wrap
| Author | SHA1 | Date | |
|---|---|---|---|
| 6ce236ba8b | |||
| 428f732a84 | |||
| ba9263a779 | |||
| eb16868469 | |||
| 3ff0cd3e20 | |||
| f27327cea9 | |||
| a72efd6c0b |
1
.gitea/workflows/cd_workflows.yml
Normal file
1
.gitea/workflows/cd_workflows.yml
Normal file
@ -0,0 +1 @@
|
|||||||
|
{}
|
||||||
62
.gitea/workflows/ci_workflows.yml
Normal file
62
.gitea/workflows/ci_workflows.yml
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
name: CI Workflow
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- '*'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: Testing the Flow
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout Code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Set Up Python Environment
|
||||||
|
uses: actions/setup-python@v4
|
||||||
|
with:
|
||||||
|
python-version: '3.10'
|
||||||
|
|
||||||
|
- name: Install Python Dependencies
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
- name: Execute Unit Tests
|
||||||
|
run: python -m unittest discover -s . -p 'test_*.py'
|
||||||
|
|
||||||
|
build_and_push:
|
||||||
|
name: Containerize the Flow
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
needs: test
|
||||||
|
steps:
|
||||||
|
- name: Extract Repository Name
|
||||||
|
id: extract_repo
|
||||||
|
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
|
- name: Checkout Codebase
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 1
|
||||||
|
|
||||||
|
- name: Configure Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
with:
|
||||||
|
driver: docker
|
||||||
|
|
||||||
|
- name: Log In to Container Registry
|
||||||
|
uses: docker/login-action@v2
|
||||||
|
with:
|
||||||
|
registry: centurion-version-control.default.svc.cluster.local:3000
|
||||||
|
username: ${{ secrets.CI_USER }}
|
||||||
|
password: ${{ secrets.CI_USER_TOKEN }}
|
||||||
|
|
||||||
|
- name: Build and Push Docker Image to Registry
|
||||||
|
uses: docker/build-push-action@v4
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
|
||||||
|
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest
|
||||||
17
Dockerfile
Normal file
17
Dockerfile
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
# Use Python slim image as base
|
||||||
|
FROM python:3.10-slim AS base
|
||||||
|
|
||||||
|
# Set up a directory for the application code
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy only the requirements file initially for better caching
|
||||||
|
COPY requirements.txt .
|
||||||
|
|
||||||
|
# Install Workflow SDK and other dependencies
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
|
||||||
|
# Copy the rest of the application code
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# No entrypoint as this is a preparation image
|
||||||
|
ENTRYPOINT ["python", "/app/flow_wrapper.py"]
|
||||||
264
expected_workflows/flow_hybrid_expected.py
Normal file
264
expected_workflows/flow_hybrid_expected.py
Normal file
@ -0,0 +1,264 @@
|
|||||||
|
import temporalio.workflow
|
||||||
|
from typing import Any, Dict, List, Callable, Awaitable
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
import jmespath
|
||||||
|
from temporalio.exceptions import ApplicationError
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@temporalio.workflow.defn
|
||||||
|
class test_repo_test_branch:
|
||||||
|
@temporalio.workflow.run
|
||||||
|
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
workflow_info = temporalio.workflow.info()
|
||||||
|
workflow_output: Dict[str, Any] = {
|
||||||
|
"workflow_id": workflow_info.workflow_id,
|
||||||
|
"run_id": workflow_info.run_id,
|
||||||
|
"name": "test_repo_test_branch",
|
||||||
|
"status": "in_progress",
|
||||||
|
"blocks": [],
|
||||||
|
"root_input": root_inputs
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
# Initialize results
|
||||||
|
results: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Define task functions
|
||||||
|
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||||
|
async def task_2():
|
||||||
|
node_id = "2"
|
||||||
|
block_name = "addition"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("a")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["a"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||||
|
input_params["a"] = None
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("b")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["b"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||||
|
input_params["b"] = None
|
||||||
|
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["2"] = task_2
|
||||||
|
async def task_m3aiq7ixuo6du35h8tr():
|
||||||
|
node_id = "m3aiq7ixuo6du35h8tr"
|
||||||
|
block_name = "multiply"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
source_data = results.get("2", {})
|
||||||
|
jsonpath_expr = jmespath.compile("sum")
|
||||||
|
value = jsonpath_expr.search(source_data)
|
||||||
|
input_params["sum"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
|
||||||
|
input_params["sum"] = None
|
||||||
|
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||||
|
async def task_m3aiqkrv4k1y6654ymr():
|
||||||
|
node_id = "m3aiqkrv4k1y6654ymr"
|
||||||
|
block_name = "power"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
source_data = results.get("2", {})
|
||||||
|
jsonpath_expr = jmespath.compile("sum")
|
||||||
|
value = jsonpath_expr.search(source_data)
|
||||||
|
input_params["product"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}")
|
||||||
|
input_params["product"] = None
|
||||||
|
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_power_057645be0c",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'power' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||||
|
|
||||||
|
# Execute tasks according to execution steps
|
||||||
|
# Execution step 1
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['2']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Execution step 2
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
|
||||||
|
# Update workflow status to completed if not failed
|
||||||
|
if workflow_output["status"] != "failed":
|
||||||
|
workflow_output["status"] = "completed"
|
||||||
|
else:
|
||||||
|
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||||
|
|
||||||
|
return workflow_output
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Workflow failed with error: {e}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||||
255
expected_workflows/flow_parallel_expected.py
Normal file
255
expected_workflows/flow_parallel_expected.py
Normal file
@ -0,0 +1,255 @@
|
|||||||
|
import temporalio.workflow
|
||||||
|
from typing import Any, Dict, List, Callable, Awaitable
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
import jmespath
|
||||||
|
from temporalio.exceptions import ApplicationError
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@temporalio.workflow.defn
|
||||||
|
class test_repo_test_branch:
|
||||||
|
@temporalio.workflow.run
|
||||||
|
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
workflow_info = temporalio.workflow.info()
|
||||||
|
workflow_output: Dict[str, Any] = {
|
||||||
|
"workflow_id": workflow_info.workflow_id,
|
||||||
|
"run_id": workflow_info.run_id,
|
||||||
|
"name": "test_repo_test_branch",
|
||||||
|
"status": "in_progress",
|
||||||
|
"blocks": [],
|
||||||
|
"root_input": root_inputs
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
# Initialize results
|
||||||
|
results: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Define task functions
|
||||||
|
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||||
|
async def task_2():
|
||||||
|
node_id = "2"
|
||||||
|
block_name = "addition"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("a")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["a"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||||
|
input_params["a"] = None
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("b")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["b"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||||
|
input_params["b"] = None
|
||||||
|
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["2"] = task_2
|
||||||
|
async def task_m3aiq7ixuo6du35h8tr():
|
||||||
|
node_id = "m3aiq7ixuo6du35h8tr"
|
||||||
|
block_name = "multiply"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("sum")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["sum"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}")
|
||||||
|
input_params["sum"] = None
|
||||||
|
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||||
|
async def task_m3aiqkrv4k1y6654ymr():
|
||||||
|
node_id = "m3aiqkrv4k1y6654ymr"
|
||||||
|
block_name = "power"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("product")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["product"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}")
|
||||||
|
input_params["product"] = None
|
||||||
|
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_power_057645be0c",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'power' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||||
|
|
||||||
|
# Execute tasks according to execution steps
|
||||||
|
# Execution step 1
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
|
||||||
|
# Update workflow status to completed if not failed
|
||||||
|
if workflow_output["status"] != "failed":
|
||||||
|
workflow_output["status"] = "completed"
|
||||||
|
else:
|
||||||
|
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||||
|
|
||||||
|
return workflow_output
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Workflow failed with error: {e}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||||
271
expected_workflows/flow_sequential_expected.py
Normal file
271
expected_workflows/flow_sequential_expected.py
Normal file
@ -0,0 +1,271 @@
|
|||||||
|
import temporalio.workflow
|
||||||
|
from typing import Any, Dict, List, Callable, Awaitable
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
import jmespath
|
||||||
|
from temporalio.exceptions import ApplicationError
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@temporalio.workflow.defn
|
||||||
|
class test_repo_test_branch:
|
||||||
|
@temporalio.workflow.run
|
||||||
|
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
workflow_info = temporalio.workflow.info()
|
||||||
|
workflow_output: Dict[str, Any] = {
|
||||||
|
"workflow_id": workflow_info.workflow_id,
|
||||||
|
"run_id": workflow_info.run_id,
|
||||||
|
"name": "test_repo_test_branch",
|
||||||
|
"status": "in_progress",
|
||||||
|
"blocks": [],
|
||||||
|
"root_input": root_inputs
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
# Initialize results
|
||||||
|
results: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Define task functions
|
||||||
|
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||||
|
async def task_2():
|
||||||
|
node_id = "2"
|
||||||
|
block_name = "addition"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("a")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["a"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||||
|
input_params["a"] = None
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("b")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["b"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||||
|
input_params["b"] = None
|
||||||
|
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["2"] = task_2
|
||||||
|
async def task_m3aiq7ixuo6du35h8tr():
|
||||||
|
node_id = "m3aiq7ixuo6du35h8tr"
|
||||||
|
block_name = "multiply"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
source_data = results.get("2", {})
|
||||||
|
jsonpath_expr = jmespath.compile("sum")
|
||||||
|
value = jsonpath_expr.search(source_data)
|
||||||
|
input_params["sum"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
|
||||||
|
input_params["sum"] = None
|
||||||
|
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||||
|
async def task_m3aiqkrv4k1y6654ymr():
|
||||||
|
node_id = "m3aiqkrv4k1y6654ymr"
|
||||||
|
block_name = "power"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
source_data = results.get("m3aiq7ixuo6du35h8tr", {})
|
||||||
|
jsonpath_expr = jmespath.compile("product")
|
||||||
|
value = jsonpath_expr.search(source_data)
|
||||||
|
input_params["product"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}")
|
||||||
|
input_params["product"] = None
|
||||||
|
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = 0
|
||||||
|
start_to_close_timeout_value_ms = 0
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = 1000
|
||||||
|
maximum_interval_value_ms = 100000
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = 0
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="blocks_transformer_power_057645be0c",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient=2,
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity 'power' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||||
|
|
||||||
|
# Execute tasks according to execution steps
|
||||||
|
# Execution step 1
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['2']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Execution step 2
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Execution step 3
|
||||||
|
tasks = [task_functions[node_id]() for node_id in ['m3aiqkrv4k1y6654ymr']]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
|
||||||
|
# Update workflow status to completed if not failed
|
||||||
|
if workflow_output["status"] != "failed":
|
||||||
|
workflow_output["status"] = "completed"
|
||||||
|
else:
|
||||||
|
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||||
|
|
||||||
|
return workflow_output
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Workflow failed with error: {e}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||||
85
flow_wrapper.py
Normal file
85
flow_wrapper.py
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
from temporalio.client import Client
|
||||||
|
from temporalio.worker import Worker
|
||||||
|
|
||||||
|
# Ensure the generated workflow module is in the Python path
|
||||||
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Retrieve environment variables
|
||||||
|
REPO_NAME = os.getenv('REPO_NAME')
|
||||||
|
BRANCH_NAME = os.getenv('BRANCH_NAME')
|
||||||
|
COMMIT_ID = os.getenv('VERSION')
|
||||||
|
NAMESPACE = os.getenv('NAMESPACE')
|
||||||
|
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
|
||||||
|
|
||||||
|
if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
|
||||||
|
raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")
|
||||||
|
|
||||||
|
|
||||||
|
# Shorten the commit ID to the first 10 characters
|
||||||
|
COMMIT_ID_SHORT = COMMIT_ID[:10]
|
||||||
|
|
||||||
|
# Sanitize flow name and commit ID to create a valid task queue name
|
||||||
|
def sanitize_name(name):
|
||||||
|
# Replace non-alphanumeric characters or invalid start with underscores
|
||||||
|
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
|
||||||
|
# Replace multiple consecutive underscores with a single underscore
|
||||||
|
sanitized = re.sub(r'_+', '_', sanitized)
|
||||||
|
# Remove trailing underscores
|
||||||
|
return sanitized.strip('_')
|
||||||
|
|
||||||
|
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
|
||||||
|
flow_name_safe = sanitize_name(FLOW_NAME)
|
||||||
|
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
|
||||||
|
|
||||||
|
# Construct the task queue name
|
||||||
|
# TASK_QUEUE = f"{flow_name_safe}_{commit_id_safe}"
|
||||||
|
TASK_QUEUE = flow_name_safe
|
||||||
|
|
||||||
|
# Import the default workflow module
|
||||||
|
workflow_module_name = "workflow" # Hardcoded to the default module name 'workflow.py'
|
||||||
|
try:
|
||||||
|
workflow_module = __import__(workflow_module_name)
|
||||||
|
except ImportError as e:
|
||||||
|
raise ImportError(f"Failed to import workflow module '{workflow_module_name}': {e}")
|
||||||
|
|
||||||
|
# Get the workflow class
|
||||||
|
# Assuming the workflow class is named as <FlowName>Workflow, e.g., HybridWorkflow
|
||||||
|
# workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
|
||||||
|
workflow_class_name = flow_name_safe
|
||||||
|
workflow_class = getattr(workflow_module, workflow_class_name, None)
|
||||||
|
|
||||||
|
if not workflow_class:
|
||||||
|
raise AttributeError(f"Workflow class '{workflow_class_name}' not found in module '{workflow_module_name}'.")
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""
|
||||||
|
Initialize and run the worker with the activity.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
|
||||||
|
# No activities are registered since they are in separate containers
|
||||||
|
worker = Worker(
|
||||||
|
client,
|
||||||
|
task_queue=TASK_QUEUE,
|
||||||
|
workflows=[workflow_class],
|
||||||
|
)
|
||||||
|
logger.info("Worker starting for %s, listening to task queue: %s", workflow_class_name, TASK_QUEUE)
|
||||||
|
await worker.run()
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical("Worker failed to start: %s", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
161
generator.py
Normal file
161
generator.py
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from jinja2 import Environment, FileSystemLoader
|
||||||
|
import networkx as nx
|
||||||
|
import re # Import the re module for regular expressions
|
||||||
|
import argparse # Import the argparse module for parsing arguments
|
||||||
|
|
||||||
|
# Define paths for templates and output
|
||||||
|
TEMPLATE_DIR = 'templates'
|
||||||
|
|
||||||
|
# Load Jinja environment
|
||||||
|
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR),
|
||||||
|
extensions=["jinja2.ext.do"])
|
||||||
|
|
||||||
|
# Add regex_replace filter
|
||||||
|
def regex_replace(s, pattern, repl):
|
||||||
|
return re.sub(pattern, repl, s)
|
||||||
|
|
||||||
|
env.filters['regex_replace'] = regex_replace
|
||||||
|
|
||||||
|
# Retrieve environment variables
|
||||||
|
REPO_NAME = os.getenv('REPO_NAME')
|
||||||
|
BRANCH_NAME = os.getenv('BRANCH_NAME')
|
||||||
|
COMMIT_ID = os.getenv('VERSION')
|
||||||
|
NAMESPACE = os.getenv('NAMESPACE')
|
||||||
|
|
||||||
|
if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE:
|
||||||
|
raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE must be set.")
|
||||||
|
|
||||||
|
# Shorten the commit ID to the first 10 characters
|
||||||
|
COMMIT_ID_SHORT = COMMIT_ID[:10]
|
||||||
|
|
||||||
|
# Sanitize flow name and commit ID to create a valid task queue name
|
||||||
|
def sanitize_name(name):
|
||||||
|
# Replace non-alphanumeric characters or invalid start with underscores
|
||||||
|
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
|
||||||
|
# Replace multiple consecutive underscores with a single underscore
|
||||||
|
sanitized = re.sub(r'_+', '_', sanitized)
|
||||||
|
# Remove trailing underscores
|
||||||
|
return sanitized.strip('_')
|
||||||
|
|
||||||
|
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
|
||||||
|
flow_name_safe = sanitize_name(FLOW_NAME)
|
||||||
|
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
|
||||||
|
|
||||||
|
# workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
|
||||||
|
workflow_class_name = flow_name_safe
|
||||||
|
|
||||||
|
def load_flow_definition(file_path):
|
||||||
|
"""Load the flow definition from a JSON file."""
|
||||||
|
try:
|
||||||
|
with open(file_path, "r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
print(f"Error loading JSON file '{file_path}': {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def determine_flow_type(flow_definition):
|
||||||
|
"""Determine the flow type based on the edges."""
|
||||||
|
nodes = {node["id"]: node for node in flow_definition["nodes"]}
|
||||||
|
edges = flow_definition["edges"]
|
||||||
|
has_parallel = False
|
||||||
|
for node_id in nodes:
|
||||||
|
outgoing_edges = [e for e in edges if e["source"] == node_id]
|
||||||
|
if len(outgoing_edges) > 1:
|
||||||
|
has_parallel = True
|
||||||
|
break
|
||||||
|
if has_parallel:
|
||||||
|
return "hybrid"
|
||||||
|
elif len(edges) > 0:
|
||||||
|
return "sequential"
|
||||||
|
else:
|
||||||
|
return "parallel"
|
||||||
|
|
||||||
|
def collect_root_input_keys(flow_definition):
|
||||||
|
"""Collect all unique root input keys from the flow definition."""
|
||||||
|
root_input_keys = set()
|
||||||
|
for node in flow_definition.get("nodes", []): # Safeguard for missing nodes
|
||||||
|
properties = node.get("data", {}).get("nodeConfig", {}).get("schema", {}).get("properties", {})
|
||||||
|
for key, value in properties.items():
|
||||||
|
source = value.get("source")
|
||||||
|
if isinstance(source, str) and source.startswith("$root."):
|
||||||
|
root_input_keys.add(source[6:]) # Adjusted to capture full path after $root.
|
||||||
|
return list(root_input_keys)
|
||||||
|
|
||||||
|
def build_execution_graph(flow_definition):
|
||||||
|
"""Builds an execution graph from the flow definition using networkx."""
|
||||||
|
G = nx.DiGraph()
|
||||||
|
nodes = {node["id"]: node for node in flow_definition["nodes"]}
|
||||||
|
edges = flow_definition["edges"]
|
||||||
|
|
||||||
|
# Add nodes
|
||||||
|
for node_id, node in nodes.items():
|
||||||
|
G.add_node(node_id, node=node)
|
||||||
|
|
||||||
|
# Add edges
|
||||||
|
for edge in edges:
|
||||||
|
G.add_edge(edge["source"], edge["target"])
|
||||||
|
|
||||||
|
return G
|
||||||
|
|
||||||
|
def get_execution_steps(G):
|
||||||
|
"""Returns a list of execution steps, each containing nodes that can be run in parallel."""
|
||||||
|
try:
|
||||||
|
levels = list(nx.topological_generations(G))
|
||||||
|
execution_steps = [list(level) for level in levels]
|
||||||
|
return execution_steps
|
||||||
|
except nx.NetworkXUnfeasible:
|
||||||
|
print("Error: Workflow graph has cycles.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def generate_workflow(flow_definition, output_file):
|
||||||
|
"""Generate the workflow code using the Jinja template."""
|
||||||
|
template = env.get_template('workflow_template.py.j2')
|
||||||
|
flow_type = determine_flow_type(flow_definition)
|
||||||
|
root_input_keys = collect_root_input_keys(flow_definition)
|
||||||
|
|
||||||
|
# Filter out requestNode from nodes
|
||||||
|
filtered_nodes = {node["id"]: node for node in flow_definition["nodes"] if node["type"] != "requestNode"}
|
||||||
|
|
||||||
|
# Filter edges to exclude connections to or from filtered nodes
|
||||||
|
filtered_edges = [
|
||||||
|
edge for edge in flow_definition["edges"]
|
||||||
|
if edge["source"] in filtered_nodes and edge["target"] in filtered_nodes
|
||||||
|
]
|
||||||
|
|
||||||
|
# Build execution graph and steps
|
||||||
|
filtered_flow_definition = {
|
||||||
|
"nodes": list(filtered_nodes.values()),
|
||||||
|
"edges": filtered_edges,
|
||||||
|
}
|
||||||
|
G = build_execution_graph(filtered_flow_definition)
|
||||||
|
execution_steps = get_execution_steps(G)
|
||||||
|
|
||||||
|
# Render the workflow template
|
||||||
|
workflow_code = template.render(
|
||||||
|
workflow_class_name=workflow_class_name,
|
||||||
|
flow_type=flow_type,
|
||||||
|
root_input_keys=root_input_keys,
|
||||||
|
execution_steps=execution_steps,
|
||||||
|
nodes=filtered_nodes
|
||||||
|
)
|
||||||
|
with open(output_file, "w") as f:
|
||||||
|
f.write(workflow_code)
|
||||||
|
print(f"Generated workflow: {output_file}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Parse command-line arguments
|
||||||
|
parser = argparse.ArgumentParser(description="Generate Temporal workflow from JSON flow definition.")
|
||||||
|
parser.add_argument("--input-file", type=str, required=True,
|
||||||
|
help="Path to the flow definition JSON file.")
|
||||||
|
parser.add_argument("--output-file", type=str, required=True,
|
||||||
|
help="Path to the generated workflow output file.")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Load the flow definition and generate the workflow
|
||||||
|
flow_file = args.input_file
|
||||||
|
output_file = args.output_file
|
||||||
|
flow_def = load_flow_definition(flow_file)
|
||||||
|
generate_workflow(flow_def, output_file)
|
||||||
5
requirements.txt
Normal file
5
requirements.txt
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
temporalio==1.6.0
|
||||||
|
jinja2==3.1.4
|
||||||
|
pytest==8.3.3
|
||||||
|
networkx==3.4.2
|
||||||
|
jmespath==1.0.1
|
||||||
222
sample_flows/flow_hybrid.json
Normal file
222
sample_flows/flow_hybrid.json
Normal file
@ -0,0 +1,222 @@
|
|||||||
|
{
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": "2",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "addition",
|
||||||
|
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"a": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "First number to add.",
|
||||||
|
"source": "$root.a"
|
||||||
|
},
|
||||||
|
"b": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Second number to add.",
|
||||||
|
"source": "$root.b"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"a",
|
||||||
|
"b"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum of the two numbers."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "multiply",
|
||||||
|
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum from the previous block.",
|
||||||
|
"source": "$2.sum"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"sum"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product of the sum and the multiplier."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiqkrv4k1y6654ymr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "power",
|
||||||
|
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product from the previous block.",
|
||||||
|
"source": "$2.sum"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"product"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"power": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Result of raising the product to the power."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": [
|
||||||
|
{
|
||||||
|
"id": "2=>m3aiq7ixuo6du35h8tr",
|
||||||
|
"source": "2",
|
||||||
|
"target": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"type": "workflow"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "2=>m3aiqkrv4k1y6654ymr",
|
||||||
|
"source": "2",
|
||||||
|
"target": "m3aiqkrv4k1y6654ymr",
|
||||||
|
"type": "workflow"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
209
sample_flows/flow_parallel.json
Normal file
209
sample_flows/flow_parallel.json
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
{
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": "2",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "addition",
|
||||||
|
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"a": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "First number to add.",
|
||||||
|
"source": "$root.a"
|
||||||
|
},
|
||||||
|
"b": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Second number to add.",
|
||||||
|
"source": "$root.b"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"a",
|
||||||
|
"b"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum of the two numbers."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "multiply",
|
||||||
|
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum from the previous block.",
|
||||||
|
"source": "$root.sum"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"sum"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product of the sum and the multiplier."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiqkrv4k1y6654ymr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "power",
|
||||||
|
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product from the previous block.",
|
||||||
|
"source": "$root.product"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"product"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"power": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Result of raising the product to the power."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": []
|
||||||
|
}
|
||||||
222
sample_flows/flow_sequential.json
Normal file
222
sample_flows/flow_sequential.json
Normal file
@ -0,0 +1,222 @@
|
|||||||
|
{
|
||||||
|
"nodes": [
|
||||||
|
{
|
||||||
|
"id": "2",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "addition",
|
||||||
|
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"a": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "First number to add.",
|
||||||
|
"source": "$root.a"
|
||||||
|
},
|
||||||
|
"b": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Second number to add.",
|
||||||
|
"source": "$root.b"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"a",
|
||||||
|
"b"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum of the two numbers."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "multiply",
|
||||||
|
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"sum": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Sum from the previous block.",
|
||||||
|
"source": "$2.sum"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"sum"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product of the sum and the multiplier."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 150
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiqkrv4k1y6654ymr",
|
||||||
|
"position": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"data": {
|
||||||
|
"nodeConfig": {
|
||||||
|
"activityConfig": {
|
||||||
|
"retryPolicy": {
|
||||||
|
"maximumAttempts": 0,
|
||||||
|
"initialInterval": 1000,
|
||||||
|
"backoffCoefficient": 2,
|
||||||
|
"maximumInterval": 100000
|
||||||
|
},
|
||||||
|
"timeouts": {
|
||||||
|
"startToCloseTimeout": 0,
|
||||||
|
"scheduleToStartTimeout": 0,
|
||||||
|
"scheduleToCloseTimeout": 0,
|
||||||
|
"heartbeatTimeout": 0
|
||||||
|
},
|
||||||
|
"taskQueue": {
|
||||||
|
"taskQueueName": ""
|
||||||
|
},
|
||||||
|
"advancedSettings": {
|
||||||
|
"cancellationType": "TRY_CANCEL",
|
||||||
|
"heartbeatEnabled": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"blockName": "power",
|
||||||
|
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||||
|
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||||
|
"schema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"product": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Product from the previous block.",
|
||||||
|
"source": "$m3aiq7ixuo6du35h8tr.product"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"product"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"outputSchema": {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"power": {
|
||||||
|
"type": "integer",
|
||||||
|
"description": "Result of raising the product to the power."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "transformerNode",
|
||||||
|
"positionAbsolute": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 300
|
||||||
|
},
|
||||||
|
"width": 260,
|
||||||
|
"height": 79
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"edges": [
|
||||||
|
{
|
||||||
|
"id": "2=>m3aiq7ixuo6du35h8tr",
|
||||||
|
"source": "2",
|
||||||
|
"target": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"type": "workflow"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "m3aiq7ixuo6du35h8tr=>m3aiqkrv4k1y6654ymr",
|
||||||
|
"source": "m3aiq7ixuo6du35h8tr",
|
||||||
|
"target": "m3aiqkrv4k1y6654ymr",
|
||||||
|
"type": "workflow"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
153
templates/workflow_template.py.j2
Normal file
153
templates/workflow_template.py.j2
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
import temporalio.workflow
|
||||||
|
from typing import Any, Dict, List, Callable, Awaitable
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
import jmespath
|
||||||
|
from temporalio.exceptions import ApplicationError
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@temporalio.workflow.defn
|
||||||
|
class {{ workflow_class_name }}:
|
||||||
|
@temporalio.workflow.run
|
||||||
|
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
workflow_info = temporalio.workflow.info()
|
||||||
|
workflow_output: Dict[str, Any] = {
|
||||||
|
"workflow_id": workflow_info.workflow_id,
|
||||||
|
"run_id": workflow_info.run_id,
|
||||||
|
"name": "{{ workflow_class_name }}",
|
||||||
|
"status": "in_progress",
|
||||||
|
"blocks": [],
|
||||||
|
"root_input": root_inputs
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
# Initialize results
|
||||||
|
results: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Define task functions
|
||||||
|
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||||
|
|
||||||
|
{%- for node_id, node in nodes.items() %}
|
||||||
|
{%- set block_name = node['data']['nodeConfig']['blockName'] %}
|
||||||
|
{%- set commit_id = node['data']['nodeConfig'].get('commitId', '') %}
|
||||||
|
{%- set commit_id_short = commit_id[:10] %}
|
||||||
|
{%- set repo_url = node['data']['nodeConfig']['repo_url'] %}
|
||||||
|
{%- set repo_name_1 = repo_url.split('/')[-1].split('.')[0] %}
|
||||||
|
{%- set repo_name = repo_name_1|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||||
|
{%- set block_name_safe = block_name|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||||
|
{%- set commit_id_short_safe = commit_id_short|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||||
|
{%- set task_queue_name = repo_name + '_' + block_name_safe + '_' + commit_id_short_safe %}
|
||||||
|
async def task_{{ node_id }}():
|
||||||
|
node_id = "{{ node_id }}"
|
||||||
|
block_name = "{{ block_name }}"
|
||||||
|
# Prepare inputs
|
||||||
|
input_params: Dict[str, Any] = {}
|
||||||
|
{%- for param, details in node['data']['nodeConfig']['schema']['properties'].items() %}
|
||||||
|
{%- set source = details.get("source", "") %}
|
||||||
|
{%- if source and source.startswith("$root.") %}
|
||||||
|
try:
|
||||||
|
jsonpath_expr = jmespath.compile("{{ source[6:] }}")
|
||||||
|
value = jsonpath_expr.search(root_inputs)
|
||||||
|
input_params["{{ param }}"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath '{{ source[6:] }}' for parameter '{{ param }}': {e}")
|
||||||
|
input_params["{{ param }}"] = None
|
||||||
|
{%- elif source and source.startswith("$") %}
|
||||||
|
{%- set source_parts = source[1:].split('.', 1) %}
|
||||||
|
{%- set source_node_id = source_parts[0] %}
|
||||||
|
{%- set source_path = source_parts[1] if source_parts|length > 1 else '' %}
|
||||||
|
try:
|
||||||
|
source_data = results.get("{{ source_node_id }}", {})
|
||||||
|
jsonpath_expr = jmespath.compile("{{ source_path }}")
|
||||||
|
value = jsonpath_expr.search(source_data)
|
||||||
|
input_params["{{ param }}"] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing jsonpath '{{ source_path }}' for parameter '{{ param }}' from node '{{ source_node_id }}': {e}")
|
||||||
|
input_params["{{ param }}"] = None
|
||||||
|
{%- else %}
|
||||||
|
input_params["{{ param }}"] = None
|
||||||
|
{%- endif %}
|
||||||
|
{%- endfor %}
|
||||||
|
logger.info(f"Starting '{{ block_name }}' activity on task queue '{{ task_queue_name }}' with inputs: %s", input_params)
|
||||||
|
try:
|
||||||
|
# Convert timeouts and intervals from milliseconds to seconds
|
||||||
|
schedule_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['scheduleToCloseTimeout'] }}
|
||||||
|
start_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['startToCloseTimeout'] }}
|
||||||
|
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||||
|
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||||
|
initial_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['initialInterval'] }}
|
||||||
|
maximum_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumInterval'] }}
|
||||||
|
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||||
|
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||||
|
maximum_attempts_value = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumAttempts'] }}
|
||||||
|
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||||
|
|
||||||
|
result = await temporalio.workflow.execute_activity(
|
||||||
|
"block_main_activity",
|
||||||
|
input_params,
|
||||||
|
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||||
|
start_to_close_timeout=start_to_close_timeout,
|
||||||
|
task_queue="{{ task_queue_name }}",
|
||||||
|
retry_policy=temporalio.common.RetryPolicy(
|
||||||
|
maximum_attempts=maximum_attempts,
|
||||||
|
initial_interval=initial_interval,
|
||||||
|
backoff_coefficient={{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['backoffCoefficient'] }},
|
||||||
|
maximum_interval=maximum_interval
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Completed '{{ block_name }}' activity with result: %s", result)
|
||||||
|
block_status = "completed"
|
||||||
|
block_error = None
|
||||||
|
results[node_id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Activity '{{ block_name }}' failed with error: {e}")
|
||||||
|
result = None
|
||||||
|
block_status = "failed"
|
||||||
|
block_error = {
|
||||||
|
"code": type(e).__name__,
|
||||||
|
"description": str(e),
|
||||||
|
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||||
|
}
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
# Collect block output
|
||||||
|
workflow_output["blocks"].append({
|
||||||
|
"activity_id": node_id,
|
||||||
|
"name": block_name,
|
||||||
|
"status": block_status,
|
||||||
|
"input": input_params,
|
||||||
|
"result": result,
|
||||||
|
"error": block_error
|
||||||
|
})
|
||||||
|
|
||||||
|
task_functions["{{ node_id }}"] = task_{{ node_id }}
|
||||||
|
{%- endfor %}
|
||||||
|
|
||||||
|
# Execute tasks according to execution steps
|
||||||
|
{%- for step in execution_steps %}
|
||||||
|
# Execution step {{ loop.index }}
|
||||||
|
tasks = [task_functions[node_id]() for node_id in {{ step }}]
|
||||||
|
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
for result in results_step:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.error(f"Task failed with exception: {result}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
{%- endfor %}
|
||||||
|
|
||||||
|
# Update workflow status to completed if not failed
|
||||||
|
if workflow_output["status"] != "failed":
|
||||||
|
workflow_output["status"] = "completed"
|
||||||
|
else:
|
||||||
|
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||||
|
|
||||||
|
return workflow_output
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Workflow failed with error: {e}")
|
||||||
|
workflow_output["status"] = "failed"
|
||||||
|
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||||
Loading…
x
Reference in New Issue
Block a user