Add initial files
This commit is contained in:
parent
2f456e94d4
commit
a72efd6c0b
1
.gitea/workflows/cd_workflows.yml
Normal file
1
.gitea/workflows/cd_workflows.yml
Normal file
@ -0,0 +1 @@
|
||||
{}
|
||||
62
.gitea/workflows/ci_workflows.yml
Normal file
62
.gitea/workflows/ci_workflows.yml
Normal file
@ -0,0 +1,62 @@
|
||||
name: CI Workflow
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- '*'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Testing the Flow
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout Code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set Up Python Environment
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
- name: Install Python Dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements.txt
|
||||
|
||||
- name: Execute Unit Tests
|
||||
run: python -m unittest discover -s . -p 'test_*.py'
|
||||
|
||||
build_and_push:
|
||||
name: Containerize the Flow
|
||||
runs-on: ubuntu-latest
|
||||
needs: test
|
||||
steps:
|
||||
- name: Extract Repository Name
|
||||
id: extract_repo
|
||||
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Checkout Codebase
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Configure Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
driver: docker
|
||||
|
||||
- name: Log In to Container Registry
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
registry: centurion-version-control.default.svc.cluster.local:3000
|
||||
username: ${{ secrets.CI_USER }}
|
||||
password: ${{ secrets.CI_USER_TOKEN }}
|
||||
|
||||
- name: Build and Push Docker Image to Registry
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: |
|
||||
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
|
||||
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest
|
||||
17
Dockerfile
Normal file
17
Dockerfile
Normal file
@ -0,0 +1,17 @@
|
||||
# Use Python slim image as base
|
||||
FROM python:3.10-slim AS base
|
||||
|
||||
# Set up a directory for the application code
|
||||
WORKDIR /app
|
||||
|
||||
# Copy only the requirements file initially for better caching
|
||||
COPY requirements.txt .
|
||||
|
||||
# Install Workflow SDK and other dependencies
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Copy the rest of the application code
|
||||
COPY . .
|
||||
|
||||
# No entrypoint as this is a preparation image
|
||||
ENTRYPOINT ["python", "/app/flow_wrapper.py"]
|
||||
264
expected_workflows/flow_hybrid_expected.py
Normal file
264
expected_workflows/flow_hybrid_expected.py
Normal file
@ -0,0 +1,264 @@
|
||||
import temporalio.workflow
|
||||
from typing import Any, Dict, List, Callable, Awaitable
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import datetime
|
||||
import re
|
||||
import jmespath
|
||||
from temporalio.exceptions import ApplicationError
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@temporalio.workflow.defn
|
||||
class test_repo_test_branch:
|
||||
@temporalio.workflow.run
|
||||
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
workflow_info = temporalio.workflow.info()
|
||||
workflow_output: Dict[str, Any] = {
|
||||
"workflow_id": workflow_info.workflow_id,
|
||||
"run_id": workflow_info.run_id,
|
||||
"name": "test_repo_test_branch",
|
||||
"status": "in_progress",
|
||||
"blocks": [],
|
||||
"root_input": root_inputs
|
||||
}
|
||||
try:
|
||||
# Initialize results
|
||||
results: Dict[str, Any] = {}
|
||||
|
||||
# Define task functions
|
||||
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||
async def task_2():
|
||||
node_id = "2"
|
||||
block_name = "addition"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("a")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["a"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||
input_params["a"] = None
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("b")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["b"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||
input_params["b"] = None
|
||||
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["2"] = task_2
|
||||
async def task_m3aiq7ixuo6du35h8tr():
|
||||
node_id = "m3aiq7ixuo6du35h8tr"
|
||||
block_name = "multiply"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
source_data = results.get("2", {})
|
||||
jsonpath_expr = jmespath.compile("sum")
|
||||
value = jsonpath_expr.search(source_data)
|
||||
input_params["sum"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
|
||||
input_params["sum"] = None
|
||||
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||
async def task_m3aiqkrv4k1y6654ymr():
|
||||
node_id = "m3aiqkrv4k1y6654ymr"
|
||||
block_name = "power"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
source_data = results.get("2", {})
|
||||
jsonpath_expr = jmespath.compile("sum")
|
||||
value = jsonpath_expr.search(source_data)
|
||||
input_params["product"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}")
|
||||
input_params["product"] = None
|
||||
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_power_057645be0c",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'power' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||
|
||||
# Execute tasks according to execution steps
|
||||
# Execution step 1
|
||||
tasks = [task_functions[node_id]() for node_id in ['2']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
# Execution step 2
|
||||
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
|
||||
# Update workflow status to completed if not failed
|
||||
if workflow_output["status"] != "failed":
|
||||
workflow_output["status"] = "completed"
|
||||
else:
|
||||
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||
|
||||
return workflow_output
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Workflow failed with error: {e}")
|
||||
workflow_output["status"] = "failed"
|
||||
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||
255
expected_workflows/flow_parallel_expected.py
Normal file
255
expected_workflows/flow_parallel_expected.py
Normal file
@ -0,0 +1,255 @@
|
||||
import temporalio.workflow
|
||||
from typing import Any, Dict, List, Callable, Awaitable
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import datetime
|
||||
import re
|
||||
import jmespath
|
||||
from temporalio.exceptions import ApplicationError
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@temporalio.workflow.defn
|
||||
class test_repo_test_branch:
|
||||
@temporalio.workflow.run
|
||||
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
workflow_info = temporalio.workflow.info()
|
||||
workflow_output: Dict[str, Any] = {
|
||||
"workflow_id": workflow_info.workflow_id,
|
||||
"run_id": workflow_info.run_id,
|
||||
"name": "test_repo_test_branch",
|
||||
"status": "in_progress",
|
||||
"blocks": [],
|
||||
"root_input": root_inputs
|
||||
}
|
||||
try:
|
||||
# Initialize results
|
||||
results: Dict[str, Any] = {}
|
||||
|
||||
# Define task functions
|
||||
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||
async def task_2():
|
||||
node_id = "2"
|
||||
block_name = "addition"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("a")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["a"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||
input_params["a"] = None
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("b")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["b"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||
input_params["b"] = None
|
||||
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["2"] = task_2
|
||||
async def task_m3aiq7ixuo6du35h8tr():
|
||||
node_id = "m3aiq7ixuo6du35h8tr"
|
||||
block_name = "multiply"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("sum")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["sum"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}")
|
||||
input_params["sum"] = None
|
||||
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||
async def task_m3aiqkrv4k1y6654ymr():
|
||||
node_id = "m3aiqkrv4k1y6654ymr"
|
||||
block_name = "power"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("product")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["product"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}")
|
||||
input_params["product"] = None
|
||||
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_power_057645be0c",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'power' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||
|
||||
# Execute tasks according to execution steps
|
||||
# Execution step 1
|
||||
tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
|
||||
# Update workflow status to completed if not failed
|
||||
if workflow_output["status"] != "failed":
|
||||
workflow_output["status"] = "completed"
|
||||
else:
|
||||
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||
|
||||
return workflow_output
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Workflow failed with error: {e}")
|
||||
workflow_output["status"] = "failed"
|
||||
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||
271
expected_workflows/flow_sequential_expected.py
Normal file
271
expected_workflows/flow_sequential_expected.py
Normal file
@ -0,0 +1,271 @@
|
||||
import temporalio.workflow
|
||||
from typing import Any, Dict, List, Callable, Awaitable
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import datetime
|
||||
import re
|
||||
import jmespath
|
||||
from temporalio.exceptions import ApplicationError
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@temporalio.workflow.defn
|
||||
class test_repo_test_branch:
|
||||
@temporalio.workflow.run
|
||||
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
workflow_info = temporalio.workflow.info()
|
||||
workflow_output: Dict[str, Any] = {
|
||||
"workflow_id": workflow_info.workflow_id,
|
||||
"run_id": workflow_info.run_id,
|
||||
"name": "test_repo_test_branch",
|
||||
"status": "in_progress",
|
||||
"blocks": [],
|
||||
"root_input": root_inputs
|
||||
}
|
||||
try:
|
||||
# Initialize results
|
||||
results: Dict[str, Any] = {}
|
||||
|
||||
# Define task functions
|
||||
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||
async def task_2():
|
||||
node_id = "2"
|
||||
block_name = "addition"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("a")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["a"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
|
||||
input_params["a"] = None
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("b")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["b"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
|
||||
input_params["b"] = None
|
||||
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_addition_97ec9e0d50",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'addition' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'addition' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["2"] = task_2
|
||||
async def task_m3aiq7ixuo6du35h8tr():
|
||||
node_id = "m3aiq7ixuo6du35h8tr"
|
||||
block_name = "multiply"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
source_data = results.get("2", {})
|
||||
jsonpath_expr = jmespath.compile("sum")
|
||||
value = jsonpath_expr.search(source_data)
|
||||
input_params["sum"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
|
||||
input_params["sum"] = None
|
||||
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_multiply_db086f09c9",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'multiply' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'multiply' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
|
||||
async def task_m3aiqkrv4k1y6654ymr():
|
||||
node_id = "m3aiqkrv4k1y6654ymr"
|
||||
block_name = "power"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
try:
|
||||
source_data = results.get("m3aiq7ixuo6du35h8tr", {})
|
||||
jsonpath_expr = jmespath.compile("product")
|
||||
value = jsonpath_expr.search(source_data)
|
||||
input_params["product"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}")
|
||||
input_params["product"] = None
|
||||
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = 0
|
||||
start_to_close_timeout_value_ms = 0
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = 1000
|
||||
maximum_interval_value_ms = 100000
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = 0
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="blocks_transformer_power_057645be0c",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient=2,
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed 'power' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity 'power' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
|
||||
|
||||
# Execute tasks according to execution steps
|
||||
# Execution step 1
|
||||
tasks = [task_functions[node_id]() for node_id in ['2']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
# Execution step 2
|
||||
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
# Execution step 3
|
||||
tasks = [task_functions[node_id]() for node_id in ['m3aiqkrv4k1y6654ymr']]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
|
||||
# Update workflow status to completed if not failed
|
||||
if workflow_output["status"] != "failed":
|
||||
workflow_output["status"] = "completed"
|
||||
else:
|
||||
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||
|
||||
return workflow_output
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Workflow failed with error: {e}")
|
||||
workflow_output["status"] = "failed"
|
||||
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||
83
flow_wrapper.py
Normal file
83
flow_wrapper.py
Normal file
@ -0,0 +1,83 @@
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import logging
|
||||
from temporalio.client import Client
|
||||
from temporalio.worker import Worker
|
||||
|
||||
# Ensure the generated workflow module is in the Python path
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Retrieve environment variables
|
||||
REPO_NAME = os.getenv('REPO_NAME')
|
||||
BRANCH_NAME = os.getenv('BRANCH_NAME')
|
||||
COMMIT_ID = os.getenv('VERSION')
|
||||
NAMESPACE = os.getenv('NAMESPACE')
|
||||
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
|
||||
|
||||
if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
|
||||
raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")
|
||||
|
||||
|
||||
# Shorten the commit ID to the first 10 characters
|
||||
COMMIT_ID_SHORT = COMMIT_ID[:10]
|
||||
|
||||
# Sanitize flow name and commit ID to create a valid task queue name
|
||||
def sanitize_name(name):
|
||||
# Replace non-alphanumeric characters or invalid start with underscores
|
||||
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
|
||||
# Replace multiple consecutive underscores with a single underscore
|
||||
sanitized = re.sub(r'_+', '_', sanitized)
|
||||
# Remove trailing underscores
|
||||
return sanitized.strip('_')
|
||||
|
||||
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
|
||||
flow_name_safe = sanitize_name(FLOW_NAME)
|
||||
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
|
||||
|
||||
# Construct the task queue name
|
||||
TASK_QUEUE = f"{flow_name_safe}_{commit_id_safe}"
|
||||
|
||||
# Import the default workflow module
|
||||
workflow_module_name = "workflow" # Hardcoded to the default module name 'workflow.py'
|
||||
try:
|
||||
workflow_module = __import__(workflow_module_name)
|
||||
except ImportError as e:
|
||||
raise ImportError(f"Failed to import workflow module '{workflow_module_name}': {e}")
|
||||
|
||||
# Get the workflow class
|
||||
# Assuming the workflow class is named as <FlowName>Workflow, e.g., HybridWorkflow
|
||||
workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
|
||||
workflow_class = getattr(workflow_module, workflow_class_name, None)
|
||||
|
||||
if not workflow_class:
|
||||
raise AttributeError(f"Workflow class '{workflow_class_name}' not found in module '{workflow_module_name}'.")
|
||||
|
||||
async def main():
|
||||
"""
|
||||
Initialize and run the worker with the activity.
|
||||
"""
|
||||
try:
|
||||
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
|
||||
# No activities are registered since they are in separate containers
|
||||
worker = Worker(
|
||||
client,
|
||||
task_queue=TASK_QUEUE,
|
||||
workflows=[workflow_class],
|
||||
)
|
||||
logger.info("Worker starting for %s, listening to task queue: %s", workflow_class_name, TASK_QUEUE)
|
||||
await worker.run()
|
||||
except Exception as e:
|
||||
logger.critical("Worker failed to start: %s", e)
|
||||
raise
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
160
generator.py
Normal file
160
generator.py
Normal file
@ -0,0 +1,160 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
import networkx as nx
|
||||
import re # Import the re module for regular expressions
|
||||
import argparse # Import the argparse module for parsing arguments
|
||||
|
||||
# Define paths for templates and output
|
||||
TEMPLATE_DIR = 'templates'
|
||||
|
||||
# Load Jinja environment
|
||||
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR),
|
||||
extensions=["jinja2.ext.do"])
|
||||
|
||||
# Add regex_replace filter
|
||||
def regex_replace(s, pattern, repl):
|
||||
return re.sub(pattern, repl, s)
|
||||
|
||||
env.filters['regex_replace'] = regex_replace
|
||||
|
||||
# Retrieve environment variables
|
||||
REPO_NAME = os.getenv('REPO_NAME')
|
||||
BRANCH_NAME = os.getenv('BRANCH_NAME')
|
||||
COMMIT_ID = os.getenv('VERSION')
|
||||
NAMESPACE = os.getenv('NAMESPACE')
|
||||
|
||||
if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE:
|
||||
raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE must be set.")
|
||||
|
||||
# Shorten the commit ID to the first 10 characters
|
||||
COMMIT_ID_SHORT = COMMIT_ID[:10]
|
||||
|
||||
# Sanitize flow name and commit ID to create a valid task queue name
|
||||
def sanitize_name(name):
|
||||
# Replace non-alphanumeric characters or invalid start with underscores
|
||||
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
|
||||
# Replace multiple consecutive underscores with a single underscore
|
||||
sanitized = re.sub(r'_+', '_', sanitized)
|
||||
# Remove trailing underscores
|
||||
return sanitized.strip('_')
|
||||
|
||||
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
|
||||
flow_name_safe = sanitize_name(FLOW_NAME)
|
||||
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
|
||||
|
||||
workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
|
||||
|
||||
def load_flow_definition(file_path):
|
||||
"""Load the flow definition from a JSON file."""
|
||||
try:
|
||||
with open(file_path, "r") as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error loading JSON file '{file_path}': {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def determine_flow_type(flow_definition):
|
||||
"""Determine the flow type based on the edges."""
|
||||
nodes = {node["id"]: node for node in flow_definition["nodes"]}
|
||||
edges = flow_definition["edges"]
|
||||
has_parallel = False
|
||||
for node_id in nodes:
|
||||
outgoing_edges = [e for e in edges if e["source"] == node_id]
|
||||
if len(outgoing_edges) > 1:
|
||||
has_parallel = True
|
||||
break
|
||||
if has_parallel:
|
||||
return "hybrid"
|
||||
elif len(edges) > 0:
|
||||
return "sequential"
|
||||
else:
|
||||
return "parallel"
|
||||
|
||||
def collect_root_input_keys(flow_definition):
|
||||
"""Collect all unique root input keys from the flow definition."""
|
||||
root_input_keys = set()
|
||||
for node in flow_definition.get("nodes", []): # Safeguard for missing nodes
|
||||
properties = node.get("data", {}).get("nodeConfig", {}).get("schema", {}).get("properties", {})
|
||||
for key, value in properties.items():
|
||||
source = value.get("source")
|
||||
if isinstance(source, str) and source.startswith("$root."):
|
||||
root_input_keys.add(source[6:]) # Adjusted to capture full path after $root.
|
||||
return list(root_input_keys)
|
||||
|
||||
def build_execution_graph(flow_definition):
|
||||
"""Builds an execution graph from the flow definition using networkx."""
|
||||
G = nx.DiGraph()
|
||||
nodes = {node["id"]: node for node in flow_definition["nodes"]}
|
||||
edges = flow_definition["edges"]
|
||||
|
||||
# Add nodes
|
||||
for node_id, node in nodes.items():
|
||||
G.add_node(node_id, node=node)
|
||||
|
||||
# Add edges
|
||||
for edge in edges:
|
||||
G.add_edge(edge["source"], edge["target"])
|
||||
|
||||
return G
|
||||
|
||||
def get_execution_steps(G):
|
||||
"""Returns a list of execution steps, each containing nodes that can be run in parallel."""
|
||||
try:
|
||||
levels = list(nx.topological_generations(G))
|
||||
execution_steps = [list(level) for level in levels]
|
||||
return execution_steps
|
||||
except nx.NetworkXUnfeasible:
|
||||
print("Error: Workflow graph has cycles.")
|
||||
sys.exit(1)
|
||||
|
||||
def generate_workflow(flow_definition, output_file):
|
||||
"""Generate the workflow code using the Jinja template."""
|
||||
template = env.get_template('workflow_template.py.j2')
|
||||
flow_type = determine_flow_type(flow_definition)
|
||||
root_input_keys = collect_root_input_keys(flow_definition)
|
||||
|
||||
# Filter out requestNode from nodes
|
||||
filtered_nodes = {node["id"]: node for node in flow_definition["nodes"] if node["type"] != "requestNode"}
|
||||
|
||||
# Filter edges to exclude connections to or from filtered nodes
|
||||
filtered_edges = [
|
||||
edge for edge in flow_definition["edges"]
|
||||
if edge["source"] in filtered_nodes and edge["target"] in filtered_nodes
|
||||
]
|
||||
|
||||
# Build execution graph and steps
|
||||
filtered_flow_definition = {
|
||||
"nodes": list(filtered_nodes.values()),
|
||||
"edges": filtered_edges,
|
||||
}
|
||||
G = build_execution_graph(filtered_flow_definition)
|
||||
execution_steps = get_execution_steps(G)
|
||||
|
||||
# Render the workflow template
|
||||
workflow_code = template.render(
|
||||
workflow_class_name=workflow_class_name,
|
||||
flow_type=flow_type,
|
||||
root_input_keys=root_input_keys,
|
||||
execution_steps=execution_steps,
|
||||
nodes=filtered_nodes
|
||||
)
|
||||
with open(output_file, "w") as f:
|
||||
f.write(workflow_code)
|
||||
print(f"Generated workflow: {output_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Parse command-line arguments
|
||||
parser = argparse.ArgumentParser(description="Generate Temporal workflow from JSON flow definition.")
|
||||
parser.add_argument("--input-file", type=str, required=True,
|
||||
help="Path to the flow definition JSON file.")
|
||||
parser.add_argument("--output-file", type=str, required=True,
|
||||
help="Path to the generated workflow output file.")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load the flow definition and generate the workflow
|
||||
flow_file = args.input_file
|
||||
output_file = args.output_file
|
||||
flow_def = load_flow_definition(flow_file)
|
||||
generate_workflow(flow_def, output_file)
|
||||
5
requirements.txt
Normal file
5
requirements.txt
Normal file
@ -0,0 +1,5 @@
|
||||
temporalio==1.6.0
|
||||
jinja2==3.1.4
|
||||
pytest==8.3.3
|
||||
networkx==3.4.2
|
||||
jmespath==1.0.1
|
||||
222
sample_flows/flow_hybrid.json
Normal file
222
sample_flows/flow_hybrid.json
Normal file
@ -0,0 +1,222 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "2",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "addition",
|
||||
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"a": {
|
||||
"type": "integer",
|
||||
"description": "First number to add.",
|
||||
"source": "$root.a"
|
||||
},
|
||||
"b": {
|
||||
"type": "integer",
|
||||
"description": "Second number to add.",
|
||||
"source": "$root.b"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"a",
|
||||
"b"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum of the two numbers."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiq7ixuo6du35h8tr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "multiply",
|
||||
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum from the previous block.",
|
||||
"source": "$2.sum"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"sum"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product of the sum and the multiplier."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiqkrv4k1y6654ymr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "power",
|
||||
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product from the previous block.",
|
||||
"source": "$2.sum"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"product"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"power": {
|
||||
"type": "integer",
|
||||
"description": "Result of raising the product to the power."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"id": "2=>m3aiq7ixuo6du35h8tr",
|
||||
"source": "2",
|
||||
"target": "m3aiq7ixuo6du35h8tr",
|
||||
"type": "workflow"
|
||||
},
|
||||
{
|
||||
"id": "2=>m3aiqkrv4k1y6654ymr",
|
||||
"source": "2",
|
||||
"target": "m3aiqkrv4k1y6654ymr",
|
||||
"type": "workflow"
|
||||
}
|
||||
]
|
||||
}
|
||||
209
sample_flows/flow_parallel.json
Normal file
209
sample_flows/flow_parallel.json
Normal file
@ -0,0 +1,209 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "2",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "addition",
|
||||
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"a": {
|
||||
"type": "integer",
|
||||
"description": "First number to add.",
|
||||
"source": "$root.a"
|
||||
},
|
||||
"b": {
|
||||
"type": "integer",
|
||||
"description": "Second number to add.",
|
||||
"source": "$root.b"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"a",
|
||||
"b"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum of the two numbers."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiq7ixuo6du35h8tr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "multiply",
|
||||
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum from the previous block.",
|
||||
"source": "$root.sum"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"sum"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product of the sum and the multiplier."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiqkrv4k1y6654ymr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "power",
|
||||
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product from the previous block.",
|
||||
"source": "$root.product"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"product"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"power": {
|
||||
"type": "integer",
|
||||
"description": "Result of raising the product to the power."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
}
|
||||
],
|
||||
"edges": []
|
||||
}
|
||||
222
sample_flows/flow_sequential.json
Normal file
222
sample_flows/flow_sequential.json
Normal file
@ -0,0 +1,222 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "2",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "addition",
|
||||
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"a": {
|
||||
"type": "integer",
|
||||
"description": "First number to add.",
|
||||
"source": "$root.a"
|
||||
},
|
||||
"b": {
|
||||
"type": "integer",
|
||||
"description": "Second number to add.",
|
||||
"source": "$root.b"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"a",
|
||||
"b"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum of the two numbers."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiq7ixuo6du35h8tr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "multiply",
|
||||
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sum": {
|
||||
"type": "integer",
|
||||
"description": "Sum from the previous block.",
|
||||
"source": "$2.sum"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"sum"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product of the sum and the multiplier."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 150
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
},
|
||||
{
|
||||
"id": "m3aiqkrv4k1y6654ymr",
|
||||
"position": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"data": {
|
||||
"nodeConfig": {
|
||||
"activityConfig": {
|
||||
"retryPolicy": {
|
||||
"maximumAttempts": 0,
|
||||
"initialInterval": 1000,
|
||||
"backoffCoefficient": 2,
|
||||
"maximumInterval": 100000
|
||||
},
|
||||
"timeouts": {
|
||||
"startToCloseTimeout": 0,
|
||||
"scheduleToStartTimeout": 0,
|
||||
"scheduleToCloseTimeout": 0,
|
||||
"heartbeatTimeout": 0
|
||||
},
|
||||
"taskQueue": {
|
||||
"taskQueueName": ""
|
||||
},
|
||||
"advancedSettings": {
|
||||
"cancellationType": "TRY_CANCEL",
|
||||
"heartbeatEnabled": false
|
||||
}
|
||||
},
|
||||
"blockName": "power",
|
||||
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
|
||||
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"product": {
|
||||
"type": "integer",
|
||||
"description": "Product from the previous block.",
|
||||
"source": "$m3aiq7ixuo6du35h8tr.product"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"product"
|
||||
]
|
||||
}
|
||||
},
|
||||
"outputSchema": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"power": {
|
||||
"type": "integer",
|
||||
"description": "Result of raising the product to the power."
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "transformerNode",
|
||||
"positionAbsolute": {
|
||||
"x": 0,
|
||||
"y": 300
|
||||
},
|
||||
"width": 260,
|
||||
"height": 79
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"id": "2=>m3aiq7ixuo6du35h8tr",
|
||||
"source": "2",
|
||||
"target": "m3aiq7ixuo6du35h8tr",
|
||||
"type": "workflow"
|
||||
},
|
||||
{
|
||||
"id": "m3aiq7ixuo6du35h8tr=>m3aiqkrv4k1y6654ymr",
|
||||
"source": "m3aiq7ixuo6du35h8tr",
|
||||
"target": "m3aiqkrv4k1y6654ymr",
|
||||
"type": "workflow"
|
||||
}
|
||||
]
|
||||
}
|
||||
153
templates/workflow_template.py.j2
Normal file
153
templates/workflow_template.py.j2
Normal file
@ -0,0 +1,153 @@
|
||||
import temporalio.workflow
|
||||
from typing import Any, Dict, List, Callable, Awaitable
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import datetime
|
||||
import re
|
||||
import jmespath
|
||||
from temporalio.exceptions import ApplicationError
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@temporalio.workflow.defn
|
||||
class {{ workflow_class_name }}:
|
||||
@temporalio.workflow.run
|
||||
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
workflow_info = temporalio.workflow.info()
|
||||
workflow_output: Dict[str, Any] = {
|
||||
"workflow_id": workflow_info.workflow_id,
|
||||
"run_id": workflow_info.run_id,
|
||||
"name": "{{ workflow_class_name }}",
|
||||
"status": "in_progress",
|
||||
"blocks": [],
|
||||
"root_input": root_inputs
|
||||
}
|
||||
try:
|
||||
# Initialize results
|
||||
results: Dict[str, Any] = {}
|
||||
|
||||
# Define task functions
|
||||
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
|
||||
|
||||
{%- for node_id, node in nodes.items() %}
|
||||
{%- set block_name = node['data']['nodeConfig']['blockName'] %}
|
||||
{%- set commit_id = node['data']['nodeConfig'].get('commitId', '') %}
|
||||
{%- set commit_id_short = commit_id[:10] %}
|
||||
{%- set repo_url = node['data']['nodeConfig']['repo_url'] %}
|
||||
{%- set repo_name_1 = repo_url.split('/')[-1].split('.')[0] %}
|
||||
{%- set repo_name = repo_name_1|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||
{%- set block_name_safe = block_name|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||
{%- set commit_id_short_safe = commit_id_short|regex_replace('[^A-Za-z0-9_]', '_') %}
|
||||
{%- set task_queue_name = repo_name + '_' + block_name_safe + '_' + commit_id_short_safe %}
|
||||
async def task_{{ node_id }}():
|
||||
node_id = "{{ node_id }}"
|
||||
block_name = "{{ block_name }}"
|
||||
# Prepare inputs
|
||||
input_params: Dict[str, Any] = {}
|
||||
{%- for param, details in node['data']['nodeConfig']['schema']['properties'].items() %}
|
||||
{%- set source = details.get("source", "") %}
|
||||
{%- if source and source.startswith("$root.") %}
|
||||
try:
|
||||
jsonpath_expr = jmespath.compile("{{ source[6:] }}")
|
||||
value = jsonpath_expr.search(root_inputs)
|
||||
input_params["{{ param }}"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath '{{ source[6:] }}' for parameter '{{ param }}': {e}")
|
||||
input_params["{{ param }}"] = None
|
||||
{%- elif source and source.startswith("$") %}
|
||||
{%- set source_parts = source[1:].split('.', 1) %}
|
||||
{%- set source_node_id = source_parts[0] %}
|
||||
{%- set source_path = source_parts[1] if source_parts|length > 1 else '' %}
|
||||
try:
|
||||
source_data = results.get("{{ source_node_id }}", {})
|
||||
jsonpath_expr = jmespath.compile("{{ source_path }}")
|
||||
value = jsonpath_expr.search(source_data)
|
||||
input_params["{{ param }}"] = value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing jsonpath '{{ source_path }}' for parameter '{{ param }}' from node '{{ source_node_id }}': {e}")
|
||||
input_params["{{ param }}"] = None
|
||||
{%- else %}
|
||||
input_params["{{ param }}"] = None
|
||||
{%- endif %}
|
||||
{%- endfor %}
|
||||
logger.info(f"Starting '{{ block_name }}' activity on task queue '{{ task_queue_name }}' with inputs: %s", input_params)
|
||||
try:
|
||||
# Convert timeouts and intervals from milliseconds to seconds
|
||||
schedule_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['scheduleToCloseTimeout'] }}
|
||||
start_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['startToCloseTimeout'] }}
|
||||
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
|
||||
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
|
||||
initial_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['initialInterval'] }}
|
||||
maximum_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumInterval'] }}
|
||||
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
|
||||
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
|
||||
maximum_attempts_value = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumAttempts'] }}
|
||||
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
|
||||
|
||||
result = await temporalio.workflow.execute_activity(
|
||||
"block_main_activity",
|
||||
input_params,
|
||||
schedule_to_close_timeout=schedule_to_close_timeout,
|
||||
start_to_close_timeout=start_to_close_timeout,
|
||||
task_queue="{{ task_queue_name }}",
|
||||
retry_policy=temporalio.common.RetryPolicy(
|
||||
maximum_attempts=maximum_attempts,
|
||||
initial_interval=initial_interval,
|
||||
backoff_coefficient={{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['backoffCoefficient'] }},
|
||||
maximum_interval=maximum_interval
|
||||
)
|
||||
)
|
||||
logger.info(f"Completed '{{ block_name }}' activity with result: %s", result)
|
||||
block_status = "completed"
|
||||
block_error = None
|
||||
results[node_id] = result
|
||||
except Exception as e:
|
||||
logger.error(f"Activity '{{ block_name }}' failed with error: {e}")
|
||||
result = None
|
||||
block_status = "failed"
|
||||
block_error = {
|
||||
"code": type(e).__name__,
|
||||
"description": str(e),
|
||||
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
|
||||
}
|
||||
workflow_output["status"] = "failed"
|
||||
# Collect block output
|
||||
workflow_output["blocks"].append({
|
||||
"activity_id": node_id,
|
||||
"name": block_name,
|
||||
"status": block_status,
|
||||
"input": input_params,
|
||||
"result": result,
|
||||
"error": block_error
|
||||
})
|
||||
|
||||
task_functions["{{ node_id }}"] = task_{{ node_id }}
|
||||
{%- endfor %}
|
||||
|
||||
# Execute tasks according to execution steps
|
||||
{%- for step in execution_steps %}
|
||||
# Execution step {{ loop.index }}
|
||||
tasks = [task_functions[node_id]() for node_id in {{ step }}]
|
||||
results_step = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for result in results_step:
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Task failed with exception: {result}")
|
||||
workflow_output["status"] = "failed"
|
||||
{%- endfor %}
|
||||
|
||||
# Update workflow status to completed if not failed
|
||||
if workflow_output["status"] != "failed":
|
||||
workflow_output["status"] = "completed"
|
||||
else:
|
||||
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
|
||||
|
||||
return workflow_output
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Workflow failed with error: {e}")
|
||||
workflow_output["status"] = "failed"
|
||||
raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
|
||||
62
test_generator.py
Normal file
62
test_generator.py
Normal file
@ -0,0 +1,62 @@
|
||||
import os
|
||||
import filecmp
|
||||
import unittest
|
||||
|
||||
|
||||
class TestWorkflowGeneration(unittest.TestCase):
|
||||
"""
|
||||
Unit tests for verifying workflow generation.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
# Ensure required directories exist
|
||||
self.output_dir = "generated_workflows"
|
||||
if not os.path.exists(self.output_dir):
|
||||
os.makedirs(self.output_dir)
|
||||
|
||||
# Set required environment variables
|
||||
os.environ["REPO_NAME"] = "test_repo"
|
||||
os.environ["BRANCH_NAME"] = "test_branch"
|
||||
os.environ["VERSION"] = "1234567890"
|
||||
os.environ["NAMESPACE"] = "test_namespace"
|
||||
|
||||
def run_test_case(self, flow_file, expected_output):
|
||||
"""
|
||||
Helper function to run a test case.
|
||||
|
||||
Args:
|
||||
flow_file (str): The input flow definition JSON file.
|
||||
expected_output (str): The path to the expected output file.
|
||||
"""
|
||||
output_file = os.path.join(self.output_dir, "workflow.py")
|
||||
|
||||
# Run the generator with the specified flow file and output file
|
||||
os.system(f"python generator.py --input-file {flow_file} --output-file {output_file}")
|
||||
|
||||
# Compare the generated file with the expected output
|
||||
self.assertTrue(
|
||||
os.path.exists(output_file), f"Generated file not found: {output_file}"
|
||||
)
|
||||
self.assertTrue(
|
||||
filecmp.cmp(output_file, expected_output, shallow=False),
|
||||
f"Generated file {output_file} does not match expected output {expected_output}",
|
||||
)
|
||||
|
||||
def test_sequential_flow(self):
|
||||
self.run_test_case(
|
||||
"sample_flows/flow_sequential.json", "expected_workflows/flow_sequential_expected.py"
|
||||
)
|
||||
|
||||
def test_parallel_flow(self):
|
||||
self.run_test_case(
|
||||
"sample_flows/flow_parallel.json", "expected_workflows/flow_parallel_expected.py"
|
||||
)
|
||||
|
||||
def test_hybrid_flow(self):
|
||||
self.run_test_case(
|
||||
"sample_flows/flow_hybrid.json", "expected_workflows/flow_hybrid_expected.py"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user