Compare commits

7 Commits

SHA1 Message Checks Date
6ce236ba8b Upload files to "/"
All checks were successful
CI Workflow / Testing the Flow (push) Successful in 7s
CI Workflow / Containerize the Flow (push) Successful in 14s
2025-04-09 20:08:58 +00:00
428f732a84 Upload files to "/"
All checks were successful
CI Workflow / Testing the Flow (push) Successful in 14s
CI Workflow / Containerize the Flow (push) Successful in 28s
2025-04-09 20:03:23 +00:00
ba9263a779 Delete test_generator.py
All checks were successful
CI Workflow / Testing the Flow (push) Successful in 8s
CI Workflow / Containerize the Flow (push) Successful in 37s
2025-04-09 17:11:58 +00:00
eb16868469 Update expected_workflows/flow_sequential_expected.py
Some checks failed
CI Workflow / Testing the Flow (push) Failing after 16s
CI Workflow / Containerize the Flow (push) Has been skipped
2025-04-09 17:04:49 +00:00
3ff0cd3e20 Update expected_workflows/flow_parallel_expected.py
Some checks are pending
CI Workflow / Testing the Flow (push) Waiting to run
CI Workflow / Containerize the Flow (push) Blocked by required conditions
2025-04-09 17:04:36 +00:00
f27327cea9 Update expected_workflows/flow_hybrid_expected.py
Some checks are pending
CI Workflow / Testing the Flow (push) Waiting to run
CI Workflow / Containerize the Flow (push) Blocked by required conditions
2025-04-09 17:04:22 +00:00
a72efd6c0b Add initial files
Some checks are pending
CI Workflow / Testing the Flow (push) Waiting to run
CI Workflow / Containerize the Flow (push) Blocked by required conditions
2025-04-09 16:45:29 +00:00
13 changed files with 1927 additions and 0 deletions

@@ -0,0 +1 @@
{}

@@ -0,0 +1,62 @@
name: CI Workflow
on:
push:
branches:
- '*'
jobs:
test:
name: Testing the Flow
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
- name: Set Up Python Environment
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install Python Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Execute Unit Tests
run: python -m unittest discover -s . -p 'test_*.py'
build_and_push:
name: Containerize the Flow
runs-on: ubuntu-latest
needs: test
steps:
- name: Extract Repository Name
id: extract_repo
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
- name: Checkout Codebase
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Configure Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker
- name: Log In to Container Registry
uses: docker/login-action@v2
with:
registry: centurion-version-control.default.svc.cluster.local:3000
username: ${{ secrets.CI_USER }}
password: ${{ secrets.CI_USER_TOKEN }}
- name: Build and Push Docker Image to Registry
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: |
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest
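For reference, a sketch of how the two tag expressions above expand. The repo and branch names ("test_repo", "test_branch") are assumptions inferred from the generated workflow class name below; the commit SHA is the latest one in the compare list above.

# Hypothetical tag expansion (values assumed, not taken from CI logs)
registry = "centurion-version-control.default.svc.cluster.local:3000"
repo_name = "Centurion/test_repo".split("/")[-1]  # mirrors ${GITHUB_REPOSITORY##*/}
ref_name, sha = "test_branch", "6ce236ba8b"
print(f"{registry}/centurion/{repo_name}/{ref_name}:{sha}")
print(f"{registry}/centurion/{repo_name}/{ref_name}:latest")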

Dockerfile (new file, 17 lines)

@@ -0,0 +1,17 @@
# Use Python slim image as base
FROM python:3.10-slim AS base
# Set up a directory for the application code
WORKDIR /app
# Copy only the requirements file initially for better caching
COPY requirements.txt .
# Install Workflow SDK and other dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY . .
# Run the flow wrapper worker as the container entrypoint
ENTRYPOINT ["python", "/app/flow_wrapper.py"]

@@ -0,0 +1,264 @@
import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.exceptions import ApplicationError
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class test_repo_test_branch:
@temporalio.workflow.run
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
workflow_info = temporalio.workflow.info()
workflow_output: Dict[str, Any] = {
"workflow_id": workflow_info.workflow_id,
"run_id": workflow_info.run_id,
"name": "test_repo_test_branch",
"status": "in_progress",
"blocks": [],
"root_input": root_inputs
}
try:
# Initialize results
results: Dict[str, Any] = {}
# Define task functions
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
async def task_2():
node_id = "2"
block_name = "addition"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("a")
value = jsonpath_expr.search(root_inputs)
input_params["a"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
input_params["a"] = None
try:
jsonpath_expr = jmespath.compile("b")
value = jsonpath_expr.search(root_inputs)
input_params["b"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
input_params["b"] = None
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_addition_97ec9e0d50",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'addition' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'addition' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["2"] = task_2
async def task_m3aiq7ixuo6du35h8tr():
node_id = "m3aiq7ixuo6du35h8tr"
block_name = "multiply"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("2", {})
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(source_data)
input_params["sum"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
input_params["sum"] = None
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_multiply_db086f09c9",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'multiply' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'multiply' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
async def task_m3aiqkrv4k1y6654ymr():
node_id = "m3aiqkrv4k1y6654ymr"
block_name = "power"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("2", {})
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(source_data)
input_params["product"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}")
input_params["product"] = None
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_power_057645be0c",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'power' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'power' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
# Execute tasks according to execution steps
# Execution step 1
tasks = [task_functions[node_id]() for node_id in ['2']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Execution step 2
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Update workflow status to completed if not failed
if workflow_output["status"] != "failed":
workflow_output["status"] = "completed"
else:
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
return workflow_output
except Exception as e:
logger.error(f"Workflow failed with error: {e}")
workflow_output["status"] = "failed"
raise ApplicationError("Workflow failed", workflow_output, str(e), type="WorkflowError", non_retryable=True) from e
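A minimal client sketch (not part of this changeset) showing how the generated workflow above could be started. The engine address and workflow id are assumptions; the task queue matches flow_wrapper.py below, which listens on the sanitized flow name.

import asyncio
from temporalio.client import Client

async def start_flow():
    # Address and namespace are placeholders for FLOWX_ENGINE_ADDRESS / NAMESPACE
    client = await Client.connect("localhost:7233", namespace="default")
    output = await client.execute_workflow(
        "test_repo_test_branch",             # workflow type from @temporalio.workflow.defn above
        {"a": 2, "b": 3},                    # root_inputs, consumed via $root.a and $root.b
        id="test_repo_test_branch-demo",     # hypothetical workflow id
        task_queue="test_repo_test_branch",  # TASK_QUEUE computed in flow_wrapper.py
    )
    print(output["status"], [block["name"] for block in output["blocks"]])

if __name__ == "__main__":
    asyncio.run(start_flow())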

@@ -0,0 +1,255 @@
import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.exceptions import ApplicationError
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class test_repo_test_branch:
@temporalio.workflow.run
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
workflow_info = temporalio.workflow.info()
workflow_output: Dict[str, Any] = {
"workflow_id": workflow_info.workflow_id,
"run_id": workflow_info.run_id,
"name": "test_repo_test_branch",
"status": "in_progress",
"blocks": [],
"root_input": root_inputs
}
try:
# Initialize results
results: Dict[str, Any] = {}
# Define task functions
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
async def task_2():
node_id = "2"
block_name = "addition"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("a")
value = jsonpath_expr.search(root_inputs)
input_params["a"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
input_params["a"] = None
try:
jsonpath_expr = jmespath.compile("b")
value = jsonpath_expr.search(root_inputs)
input_params["b"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
input_params["b"] = None
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_addition_97ec9e0d50",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'addition' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'addition' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["2"] = task_2
async def task_m3aiq7ixuo6du35h8tr():
node_id = "m3aiq7ixuo6du35h8tr"
block_name = "multiply"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(root_inputs)
input_params["sum"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}")
input_params["sum"] = None
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_multiply_db086f09c9",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'multiply' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'multiply' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
async def task_m3aiqkrv4k1y6654ymr():
node_id = "m3aiqkrv4k1y6654ymr"
block_name = "power"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("product")
value = jsonpath_expr.search(root_inputs)
input_params["product"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}")
input_params["product"] = None
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_power_057645be0c",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'power' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'power' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
# Execute tasks according to execution steps
# Execution step 1
tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Update workflow status to completed if not failed
if workflow_output["status"] != "failed":
workflow_output["status"] = "completed"
else:
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
return workflow_output
except Exception as e:
logger.error(f"Workflow failed with error: {e}")
workflow_output["status"] = "failed"
raise ApplicationError("Workflow failed", workflow_output, str(e), type="WorkflowError", non_retryable=True) from e

@@ -0,0 +1,271 @@
import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.exceptions import ApplicationError
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class test_repo_test_branch:
@temporalio.workflow.run
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
workflow_info = temporalio.workflow.info()
workflow_output: Dict[str, Any] = {
"workflow_id": workflow_info.workflow_id,
"run_id": workflow_info.run_id,
"name": "test_repo_test_branch",
"status": "in_progress",
"blocks": [],
"root_input": root_inputs
}
try:
# Initialize results
results: Dict[str, Any] = {}
# Define task functions
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
async def task_2():
node_id = "2"
block_name = "addition"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
jsonpath_expr = jmespath.compile("a")
value = jsonpath_expr.search(root_inputs)
input_params["a"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
input_params["a"] = None
try:
jsonpath_expr = jmespath.compile("b")
value = jsonpath_expr.search(root_inputs)
input_params["b"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
input_params["b"] = None
logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_addition_97ec9e0d50",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'addition' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'addition' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["2"] = task_2
async def task_m3aiq7ixuo6du35h8tr():
node_id = "m3aiq7ixuo6du35h8tr"
block_name = "multiply"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("2", {})
jsonpath_expr = jmespath.compile("sum")
value = jsonpath_expr.search(source_data)
input_params["sum"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}")
input_params["sum"] = None
logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_multiply_db086f09c9",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'multiply' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'multiply' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
async def task_m3aiqkrv4k1y6654ymr():
node_id = "m3aiqkrv4k1y6654ymr"
block_name = "power"
# Prepare inputs
input_params: Dict[str, Any] = {}
try:
source_data = results.get("m3aiq7ixuo6du35h8tr", {})
jsonpath_expr = jmespath.compile("product")
value = jsonpath_expr.search(source_data)
input_params["product"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}")
input_params["product"] = None
logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = 0
start_to_close_timeout_value_ms = 0
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = 1000
maximum_interval_value_ms = 100000
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = 0
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="blocks_transformer_power_057645be0c",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient=2,
maximum_interval=maximum_interval
)
)
logger.info(f"Completed 'power' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity 'power' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
# Execute tasks according to execution steps
# Execution step 1
tasks = [task_functions[node_id]() for node_id in ['2']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Execution step 2
tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Execution step 3
tasks = [task_functions[node_id]() for node_id in ['m3aiqkrv4k1y6654ymr']]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
# Update workflow status to completed if not failed
if workflow_output["status"] != "failed":
workflow_output["status"] = "completed"
else:
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
return workflow_output
except Exception as e:
logger.error(f"Workflow failed with error: {e}")
workflow_output["status"] = "failed"
raise ApplicationError("Workflow failed", workflow_output, str(e), type="WorkflowError", non_retryable=True) from e

flow_wrapper.py (new file, 85 lines)

@@ -0,0 +1,85 @@
import asyncio
import os
import re
import sys
import logging
from temporalio.client import Client
from temporalio.worker import Worker
# Ensure the generated workflow module is in the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Retrieve environment variables
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
if not REPO_NAME or not BRANCH_NAME or not COMMIT_ID or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
raise ValueError("Environment variables REPO_NAME, BRANCH_NAME, VERSION, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")
# Shorten the commit ID to the first 10 characters
COMMIT_ID_SHORT = COMMIT_ID[:10]
# Sanitize flow name and commit ID to create a valid task queue name
def sanitize_name(name):
# Replace non-alphanumeric characters or invalid start with underscores
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
# Replace multiple consecutive underscores with a single underscore
sanitized = re.sub(r'_+', '_', sanitized)
# Remove trailing underscores only, so the underscore guarding a leading digit is kept
return sanitized.rstrip('_')
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
flow_name_safe = sanitize_name(FLOW_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
# Construct the task queue name
# TASK_QUEUE = f"{flow_name_safe}_{commit_id_safe}"
TASK_QUEUE = flow_name_safe
# Import the default workflow module
workflow_module_name = "workflow" # Hardcoded to the default module name 'workflow.py'
try:
workflow_module = __import__(workflow_module_name)
except ImportError as e:
raise ImportError(f"Failed to import workflow module '{workflow_module_name}': {e}")
# Get the workflow class
# The workflow class is named after the sanitized flow name, e.g. test_repo_test_branch
# workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
workflow_class_name = flow_name_safe
workflow_class = getattr(workflow_module, workflow_class_name, None)
if not workflow_class:
raise AttributeError(f"Workflow class '{workflow_class_name}' not found in module '{workflow_module_name}'.")
async def main():
"""
Connect to the engine and run a worker for the generated workflow.
"""
try:
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
# No activities are registered since they are in separate containers
worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[workflow_class],
)
logger.info("Worker starting for %s, listening to task queue: %s", workflow_class_name, TASK_QUEUE)
await worker.run()
except Exception as e:
logger.critical("Worker failed to start: %s", e)
raise
if __name__ == "__main__":
asyncio.run(main())
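A quick illustration of sanitize_name's behavior on assumed inputs (the function is inlined here so the snippet runs standalone; importing flow_wrapper would trigger its environment checks):

import re

def sanitize_name(name):  # copied from flow_wrapper.py above
    sanitized = re.sub(r'\W|^(?=\d)', '_', name)
    sanitized = re.sub(r'_+', '_', sanitized)
    return sanitized.rstrip('_')

print(sanitize_name("test repo_test-branch"))   # -> test_repo_test_branch
print(sanitize_name("feature/JIRA-123..wip!"))  # -> feature_JIRA_123_wip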

generator.py (new file, 161 lines)

@@ -0,0 +1,161 @@
import json
import os
import sys
from jinja2 import Environment, FileSystemLoader
import networkx as nx
import re # Import the re module for regular expressions
import argparse # Import the argparse module for parsing arguments
# Define paths for templates and output
TEMPLATE_DIR = 'templates'
# Load Jinja environment
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR),
extensions=["jinja2.ext.do"])
# Add regex_replace filter
def regex_replace(s, pattern, repl):
return re.sub(pattern, repl, s)
env.filters['regex_replace'] = regex_replace
# Retrieve environment variables
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
if not REPO_NAME or not BRANCH_NAME or not COMMIT_ID or not NAMESPACE:
raise ValueError("Environment variables REPO_NAME, BRANCH_NAME, VERSION and NAMESPACE must be set.")
# Shorten the commit ID to the first 10 characters
COMMIT_ID_SHORT = COMMIT_ID[:10]
# Sanitize flow name and commit ID to create a valid task queue name
def sanitize_name(name):
# Replace non-alphanumeric characters or invalid start with underscores
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
# Replace multiple consecutive underscores with a single underscore
sanitized = re.sub(r'_+', '_', sanitized)
# Remove trailing underscores only, so the underscore guarding a leading digit is kept
return sanitized.rstrip('_')
FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
flow_name_safe = sanitize_name(FLOW_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
# workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
workflow_class_name = flow_name_safe
def load_flow_definition(file_path):
"""Load the flow definition from a JSON file."""
try:
with open(file_path, "r") as f:
return json.load(f)
except json.JSONDecodeError as e:
print(f"Error loading JSON file '{file_path}': {e}")
sys.exit(1)
def determine_flow_type(flow_definition):
"""Determine the flow type based on the edges."""
nodes = {node["id"]: node for node in flow_definition["nodes"]}
edges = flow_definition["edges"]
has_parallel = False
for node_id in nodes:
outgoing_edges = [e for e in edges if e["source"] == node_id]
if len(outgoing_edges) > 1:
has_parallel = True
break
if has_parallel:
return "hybrid"
elif len(edges) > 0:
return "sequential"
else:
return "parallel"
def collect_root_input_keys(flow_definition):
"""Collect all unique root input keys from the flow definition."""
root_input_keys = set()
for node in flow_definition.get("nodes", []): # Safeguard for missing nodes
properties = node.get("data", {}).get("nodeConfig", {}).get("schema", {}).get("properties", {})
for key, value in properties.items():
source = value.get("source")
if isinstance(source, str) and source.startswith("$root."):
root_input_keys.add(source[6:]) # Adjusted to capture full path after $root.
return list(root_input_keys)
def build_execution_graph(flow_definition):
"""Builds an execution graph from the flow definition using networkx."""
G = nx.DiGraph()
nodes = {node["id"]: node for node in flow_definition["nodes"]}
edges = flow_definition["edges"]
# Add nodes
for node_id, node in nodes.items():
G.add_node(node_id, node=node)
# Add edges
for edge in edges:
G.add_edge(edge["source"], edge["target"])
return G
def get_execution_steps(G):
"""Returns a list of execution steps, each containing nodes that can be run in parallel."""
try:
levels = list(nx.topological_generations(G))
execution_steps = [list(level) for level in levels]
return execution_steps
except nx.NetworkXUnfeasible:
print("Error: Workflow graph has cycles.")
sys.exit(1)
def generate_workflow(flow_definition, output_file):
"""Generate the workflow code using the Jinja template."""
template = env.get_template('workflow_template.py.j2')
flow_type = determine_flow_type(flow_definition)
root_input_keys = collect_root_input_keys(flow_definition)
# Filter out requestNode from nodes
filtered_nodes = {node["id"]: node for node in flow_definition["nodes"] if node["type"] != "requestNode"}
# Filter edges to exclude connections to or from filtered nodes
filtered_edges = [
edge for edge in flow_definition["edges"]
if edge["source"] in filtered_nodes and edge["target"] in filtered_nodes
]
# Build execution graph and steps
filtered_flow_definition = {
"nodes": list(filtered_nodes.values()),
"edges": filtered_edges,
}
G = build_execution_graph(filtered_flow_definition)
execution_steps = get_execution_steps(G)
# Render the workflow template
workflow_code = template.render(
workflow_class_name=workflow_class_name,
flow_type=flow_type,
root_input_keys=root_input_keys,
execution_steps=execution_steps,
nodes=filtered_nodes
)
with open(output_file, "w") as f:
f.write(workflow_code)
print(f"Generated workflow: {output_file}")
if __name__ == "__main__":
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Generate Temporal workflow from JSON flow definition.")
parser.add_argument("--input-file", type=str, required=True,
help="Path to the flow definition JSON file.")
parser.add_argument("--output-file", type=str, required=True,
help="Path to the generated workflow output file.")
args = parser.parse_args()
# Load the flow definition and generate the workflow
flow_file = args.input_file
output_file = args.output_file
flow_def = load_flow_definition(flow_file)
generate_workflow(flow_def, output_file)
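To make the step computation concrete, a small sketch of what build_execution_graph and get_execution_steps yield for the hybrid flow definition below, where node "2" fans out to two downstream nodes:

import networkx as nx

G = nx.DiGraph()
G.add_edges_from([("2", "m3aiq7ixuo6du35h8tr"), ("2", "m3aiqkrv4k1y6654ymr")])
# topological_generations groups nodes that can run in parallel
print([sorted(level) for level in nx.topological_generations(G)])
# -> [['2'], ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]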

requirements.txt (new file, 5 lines)

@@ -0,0 +1,5 @@
temporalio==1.6.0
jinja2==3.1.4
pytest==8.3.3
networkx==3.4.2
jmespath==1.0.1

@@ -0,0 +1,222 @@
{
"nodes": [
{
"id": "2",
"position": {
"x": 0,
"y": 0
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "addition",
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"a": {
"type": "integer",
"description": "First number to add.",
"source": "$root.a"
},
"b": {
"type": "integer",
"description": "Second number to add.",
"source": "$root.b"
}
},
"required": [
"a",
"b"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum of the two numbers."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 0
},
"width": 260,
"height": 79
},
{
"id": "m3aiq7ixuo6du35h8tr",
"position": {
"x": 0,
"y": 150
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "multiply",
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum from the previous block.",
"source": "$2.sum"
}
},
"required": [
"sum"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product of the sum and the multiplier."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 150
},
"width": 260,
"height": 79
},
{
"id": "m3aiqkrv4k1y6654ymr",
"position": {
"x": 0,
"y": 300
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "power",
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product from the previous block.",
"source": "$2.sum"
}
},
"required": [
"product"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"power": {
"type": "integer",
"description": "Result of raising the product to the power."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 300
},
"width": 260,
"height": 79
}
],
"edges": [
{
"id": "2=>m3aiq7ixuo6du35h8tr",
"source": "2",
"target": "m3aiq7ixuo6du35h8tr",
"type": "workflow"
},
{
"id": "2=>m3aiqkrv4k1y6654ymr",
"source": "2",
"target": "m3aiqkrv4k1y6654ymr",
"type": "workflow"
}
]
}
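A short sketch of how a "source" such as "$2.sum" in this definition is resolved at run time, mirroring the generated task code above: the "$2" prefix selects the upstream node's stored result, and the remainder is evaluated as a JMESPath expression against it.

import jmespath

results = {"2": {"sum": 5}}  # stored output of the "addition" block
value = jmespath.search("sum", results.get("2", {}))
print(value)  # 5 -> becomes input_params["sum"] for the "multiply" block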

@@ -0,0 +1,209 @@
{
"nodes": [
{
"id": "2",
"position": {
"x": 0,
"y": 0
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "addition",
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"a": {
"type": "integer",
"description": "First number to add.",
"source": "$root.a"
},
"b": {
"type": "integer",
"description": "Second number to add.",
"source": "$root.b"
}
},
"required": [
"a",
"b"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum of the two numbers."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 0
},
"width": 260,
"height": 79
},
{
"id": "m3aiq7ixuo6du35h8tr",
"position": {
"x": 0,
"y": 150
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "multiply",
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum from the previous block.",
"source": "$root.sum"
}
},
"required": [
"sum"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product of the sum and the multiplier."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 150
},
"width": 260,
"height": 79
},
{
"id": "m3aiqkrv4k1y6654ymr",
"position": {
"x": 0,
"y": 300
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "power",
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product from the previous block.",
"source": "$root.product"
}
},
"required": [
"product"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"power": {
"type": "integer",
"description": "Result of raising the product to the power."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 300
},
"width": 260,
"height": 79
}
],
"edges": []
}

@@ -0,0 +1,222 @@
{
"nodes": [
{
"id": "2",
"position": {
"x": 0,
"y": 0
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "addition",
"commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"a": {
"type": "integer",
"description": "First number to add.",
"source": "$root.a"
},
"b": {
"type": "integer",
"description": "Second number to add.",
"source": "$root.b"
}
},
"required": [
"a",
"b"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum of the two numbers."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 0
},
"width": 260,
"height": 79
},
{
"id": "m3aiq7ixuo6du35h8tr",
"position": {
"x": 0,
"y": 150
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "multiply",
"commitId": "db086f09c9df60f622c9be47734422454f5e181f",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"sum": {
"type": "integer",
"description": "Sum from the previous block.",
"source": "$2.sum"
}
},
"required": [
"sum"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product of the sum and the multiplier."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 150
},
"width": 260,
"height": 79
},
{
"id": "m3aiqkrv4k1y6654ymr",
"position": {
"x": 0,
"y": 300
},
"data": {
"nodeConfig": {
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 0,
"initialInterval": 1000,
"backoffCoefficient": 2,
"maximumInterval": 100000
},
"timeouts": {
"startToCloseTimeout": 0,
"scheduleToStartTimeout": 0,
"scheduleToCloseTimeout": 0,
"heartbeatTimeout": 0
},
"taskQueue": {
"taskQueueName": ""
},
"advancedSettings": {
"cancellationType": "TRY_CANCEL",
"heartbeatEnabled": false
}
},
"blockName": "power",
"commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd",
"repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"product": {
"type": "integer",
"description": "Product from the previous block.",
"source": "$m3aiq7ixuo6du35h8tr.product"
}
},
"required": [
"product"
]
}
},
"outputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"power": {
"type": "integer",
"description": "Result of raising the product to the power."
}
}
}
},
"type": "transformerNode",
"positionAbsolute": {
"x": 0,
"y": 300
},
"width": 260,
"height": 79
}
],
"edges": [
{
"id": "2=>m3aiq7ixuo6du35h8tr",
"source": "2",
"target": "m3aiq7ixuo6du35h8tr",
"type": "workflow"
},
{
"id": "m3aiq7ixuo6du35h8tr=>m3aiqkrv4k1y6654ymr",
"source": "m3aiq7ixuo6du35h8tr",
"target": "m3aiqkrv4k1y6654ymr",
"type": "workflow"
}
]
}

@@ -0,0 +1,153 @@
import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.exceptions import ApplicationError
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class {{ workflow_class_name }}:
@temporalio.workflow.run
async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
workflow_info = temporalio.workflow.info()
workflow_output: Dict[str, Any] = {
"workflow_id": workflow_info.workflow_id,
"run_id": workflow_info.run_id,
"name": "{{ workflow_class_name }}",
"status": "in_progress",
"blocks": [],
"root_input": root_inputs
}
try:
# Initialize results
results: Dict[str, Any] = {}
# Define task functions
task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
{%- for node_id, node in nodes.items() %}
{%- set block_name = node['data']['nodeConfig']['blockName'] %}
{%- set commit_id = node['data']['nodeConfig'].get('commitId', '') %}
{%- set commit_id_short = commit_id[:10] %}
{%- set repo_url = node['data']['nodeConfig']['repo_url'] %}
{%- set repo_name_1 = repo_url.split('/')[-1].split('.')[0] %}
{%- set repo_name = repo_name_1|regex_replace('[^A-Za-z0-9_]', '_') %}
{%- set block_name_safe = block_name|regex_replace('[^A-Za-z0-9_]', '_') %}
{%- set commit_id_short_safe = commit_id_short|regex_replace('[^A-Za-z0-9_]', '_') %}
{%- set task_queue_name = repo_name + '_' + block_name_safe + '_' + commit_id_short_safe %}
async def task_{{ node_id }}():
node_id = "{{ node_id }}"
block_name = "{{ block_name }}"
# Prepare inputs
input_params: Dict[str, Any] = {}
{%- for param, details in node['data']['nodeConfig']['schema']['properties'].items() %}
{%- set source = details.get("source", "") %}
{%- if source and source.startswith("$root.") %}
try:
jsonpath_expr = jmespath.compile("{{ source[6:] }}")
value = jsonpath_expr.search(root_inputs)
input_params["{{ param }}"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath '{{ source[6:] }}' for parameter '{{ param }}': {e}")
input_params["{{ param }}"] = None
{%- elif source and source.startswith("$") %}
{%- set source_parts = source[1:].split('.', 1) %}
{%- set source_node_id = source_parts[0] %}
{%- set source_path = source_parts[1] if source_parts|length > 1 else '' %}
try:
source_data = results.get("{{ source_node_id }}", {})
jsonpath_expr = jmespath.compile("{{ source_path }}")
value = jsonpath_expr.search(source_data)
input_params["{{ param }}"] = value
except Exception as e:
logger.error(f"Error parsing jsonpath '{{ source_path }}' for parameter '{{ param }}' from node '{{ source_node_id }}': {e}")
input_params["{{ param }}"] = None
{%- else %}
input_params["{{ param }}"] = None
{%- endif %}
{%- endfor %}
logger.info(f"Starting '{{ block_name }}' activity on task queue '{{ task_queue_name }}' with inputs: %s", input_params)
try:
# Convert timeouts and intervals from milliseconds to seconds
schedule_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['scheduleToCloseTimeout'] }}
start_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['startToCloseTimeout'] }}
schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
initial_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['initialInterval'] }}
maximum_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumInterval'] }}
initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
maximum_attempts_value = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumAttempts'] }}
maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
result = await temporalio.workflow.execute_activity(
"block_main_activity",
input_params,
schedule_to_close_timeout=schedule_to_close_timeout,
start_to_close_timeout=start_to_close_timeout,
task_queue="{{ task_queue_name }}",
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=maximum_attempts,
initial_interval=initial_interval,
backoff_coefficient={{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['backoffCoefficient'] }},
maximum_interval=maximum_interval
)
)
logger.info(f"Completed '{{ block_name }}' activity with result: %s", result)
block_status = "completed"
block_error = None
results[node_id] = result
except Exception as e:
logger.error(f"Activity '{{ block_name }}' failed with error: {e}")
result = None
block_status = "failed"
block_error = {
"code": type(e).__name__,
"description": str(e),
"details": {"cause": str(getattr(e, "cause", "No additional details"))}
}
workflow_output["status"] = "failed"
# Collect block output
workflow_output["blocks"].append({
"activity_id": node_id,
"name": block_name,
"status": block_status,
"input": input_params,
"result": result,
"error": block_error
})
task_functions["{{ node_id }}"] = task_{{ node_id }}
{%- endfor %}
# Execute tasks according to execution steps
{%- for step in execution_steps %}
# Execution step {{ loop.index }}
tasks = [task_functions[node_id]() for node_id in {{ step }}]
results_step = await asyncio.gather(*tasks, return_exceptions=True)
for result in results_step:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
workflow_output["status"] = "failed"
{%- endfor %}
# Update workflow status to completed if not failed
if workflow_output["status"] != "failed":
workflow_output["status"] = "completed"
else:
raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
return workflow_output
except Exception as e:
logger.error(f"Workflow failed with error: {e}")
workflow_output["status"] = "failed"
raise ApplicationError("Workflow failed", workflow_output, str(e), type="WorkflowError", non_retryable=True) from e