diff --git a/.gitea/workflows/cd_workflows.yml b/.gitea/workflows/cd_workflows.yml new file mode 100644 index 0000000..0967ef4 --- /dev/null +++ b/.gitea/workflows/cd_workflows.yml @@ -0,0 +1 @@ +{} diff --git a/.gitea/workflows/ci_workflows.yml b/.gitea/workflows/ci_workflows.yml new file mode 100644 index 0000000..ef8e015 --- /dev/null +++ b/.gitea/workflows/ci_workflows.yml @@ -0,0 +1,62 @@ +name: CI Workflow + +on: + push: + branches: + - '*' + +jobs: + test: + name: Testing the Flow + runs-on: ubuntu-latest + steps: + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Set Up Python Environment + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Python Dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Execute Unit Tests + run: python -m unittest discover -s . -p 'test_*.py' + + build_and_push: + name: Containerize the Flow + runs-on: ubuntu-latest + needs: test + steps: + - name: Extract Repository Name + id: extract_repo + run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT + + - name: Checkout Codebase + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Configure Docker Buildx + uses: docker/setup-buildx-action@v3 + with: + driver: docker + + - name: Log In to Container Registry + uses: docker/login-action@v2 + with: + registry: centurion-version-control.default.svc.cluster.local:3000 + username: ${{ secrets.CI_USER }} + password: ${{ secrets.CI_USER_TOKEN }} + + - name: Build and Push Docker Image to Registry + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: | + centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }} + centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..dc7c238 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +# Use Python slim image as base +FROM python:3.10-slim AS base + +# Set up a directory for the application code +WORKDIR /app + +# Copy only the requirements file initially for better caching +COPY requirements.txt . + +# Install Workflow SDK and other dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application code +COPY . . 
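+
+# Illustrative local usage (values are placeholders, not part of the pipeline):
+# flow_wrapper.py below expects REPO_NAME, BRANCH_NAME, VERSION, NAMESPACE and
+# FLOWX_ENGINE_ADDRESS at runtime, so a manual smoke test could look like:
+#   docker build -t flow-worker:dev .
+#   docker run -e REPO_NAME=test_repo -e BRANCH_NAME=main -e VERSION=<commit-sha> \
+#     -e NAMESPACE=default -e FLOWX_ENGINE_ADDRESS=<temporal-host>:7233 flow-worker:dev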
+
+# Run the flow wrapper, which registers the generated workflow and starts the Temporal worker
+ENTRYPOINT ["python", "/app/flow_wrapper.py"]
diff --git a/expected_workflows/flow_hybrid_expected.py b/expected_workflows/flow_hybrid_expected.py
new file mode 100644
index 0000000..8a662f3
--- /dev/null
+++ b/expected_workflows/flow_hybrid_expected.py
@@ -0,0 +1,264 @@
+import temporalio.workflow
+from typing import Any, Dict, List, Callable, Awaitable
+import logging
+import asyncio
+import json
+import datetime
+import re
+import jmespath
+from temporalio.exceptions import ApplicationError
+
+# Configure logging
+logging.basicConfig(level=logging.INFO,
+                    format="%(asctime)s [%(levelname)s] %(message)s")
+logger = logging.getLogger(__name__)
+
+@temporalio.workflow.defn
+class test_repo_test_branch:
+    @temporalio.workflow.run
+    async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
+        workflow_info = temporalio.workflow.info()
+        workflow_output: Dict[str, Any] = {
+            "workflow_id": workflow_info.workflow_id,
+            "run_id": workflow_info.run_id,
+            "name": "test_repo_test_branch",
+            "status": "in_progress",
+            "blocks": [],
+            "root_input": root_inputs
+        }
+        try:
+            # Initialize results
+            results: Dict[str, Any] = {}
+
+            # Define task functions
+            task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
+            async def task_2():
+                node_id = "2"
+                block_name = "addition"
+                # Prepare inputs
+                input_params: Dict[str, Any] = {}
+                try:
+                    jsonpath_expr = jmespath.compile("a")
+                    value = jsonpath_expr.search(root_inputs)
+                    input_params["a"] = value
+                except Exception as e:
+                    logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
+                    input_params["a"] = None
+                try:
+                    jsonpath_expr = jmespath.compile("b")
+                    value = jsonpath_expr.search(root_inputs)
+                    input_params["b"] = value
+                except Exception as e:
+                    logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
+                    input_params["b"] = None
+                logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
+                try:
+                    # Convert timeouts and intervals from milliseconds to seconds
+                    schedule_to_close_timeout_value_ms = 0
+                    start_to_close_timeout_value_ms = 0
+                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
+                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
+                    initial_interval_value_ms = 1000
+                    maximum_interval_value_ms = 100000
+                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
+                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
+                    maximum_attempts_value = 0
+                    maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value
+
+                    result = await temporalio.workflow.execute_activity(
+                        "block_main_activity",
+                        input_params,
+                        schedule_to_close_timeout=schedule_to_close_timeout,
+                        start_to_close_timeout=start_to_close_timeout,
+                        task_queue="blocks_transformer_addition_97ec9e0d50",
+                        retry_policy=temporalio.common.RetryPolicy(
+                            maximum_attempts=maximum_attempts,
+                            initial_interval=initial_interval,
+                            backoff_coefficient=2,
+                            maximum_interval=maximum_interval
+                        )
+                    )
+                    logger.info(f"Completed 'addition' activity with result: %s", result)
+                    block_status = "completed"
+                    block_error = None
+                    results[node_id] = result
+                except Exception as e:
+                    logger.error(f"Activity 'addition' failed with error: {e}")
+                    result = None
+                    block_status = "failed"
+
block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 
'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_power_057645be0c", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'power' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'power' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr + + # Execute tasks according to execution steps + # Execution step 1 + tasks = [task_functions[node_id]() for node_id in ['2']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + # Execution step 2 + tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + else: + raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True) + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e diff --git a/expected_workflows/flow_parallel_expected.py b/expected_workflows/flow_parallel_expected.py new file mode 100644 index 0000000..805b10e --- /dev/null +++ b/expected_workflows/flow_parallel_expected.py @@ -0,0 +1,255 @@ +import temporalio.workflow 
+from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath +from temporalio.exceptions import ApplicationError + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class test_repo_test_branch: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "test_repo_test_branch", + "status": "in_progress", + "blocks": [], + "root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + async def task_2(): + node_id = "2" + block_name = "addition" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) + input_params["a"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") + input_params["a"] = None + try: + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) + input_params["b"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") + input_params["b"] = None + logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_addition_97ec9e0d50", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'addition' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'addition' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": 
result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(root_inputs) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(root_inputs) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else 
datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_power_057645be0c", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'power' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'power' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr + + # Execute tasks according to execution steps + # Execution step 1 + tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + else: + raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True) + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e diff --git a/expected_workflows/flow_sequential_expected.py b/expected_workflows/flow_sequential_expected.py new file mode 100644 index 0000000..33a58e8 --- /dev/null +++ b/expected_workflows/flow_sequential_expected.py @@ -0,0 +1,271 @@ +import temporalio.workflow +from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath +from temporalio.exceptions import ApplicationError + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class test_repo_test_branch: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "test_repo_test_branch", + "status": "in_progress", + "blocks": [], + 
"root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + async def task_2(): + node_id = "2" + block_name = "addition" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) + input_params["a"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") + input_params["a"] = None + try: + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) + input_params["b"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") + input_params["b"] = None + logger.info(f"Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_addition_97ec9e0d50", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'addition' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'addition' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["2"] = task_2 + async def task_m3aiq7ixuo6du35h8tr(): + node_id = "m3aiq7ixuo6du35h8tr" + block_name = "multiply" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("2", {}) + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) + input_params["sum"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") + input_params["sum"] = None + logger.info(f"Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + 
schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="blocks_transformer_multiply_db086f09c9", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient=2, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed 'multiply' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity 'multiply' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr + async def task_m3aiqkrv4k1y6654ymr(): + node_id = "m3aiqkrv4k1y6654ymr" + block_name = "power" + # Prepare inputs + input_params: Dict[str, Any] = {} + try: + source_data = results.get("m3aiq7ixuo6du35h8tr", {}) + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(source_data) + input_params["product"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}") + input_params["product"] = None + logger.info(f"Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = 0 + start_to_close_timeout_value_ms = 0 + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = 1000 + maximum_interval_value_ms = 100000 + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = 0 + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + 
+                        start_to_close_timeout=start_to_close_timeout,
+                        task_queue="blocks_transformer_power_057645be0c",
+                        retry_policy=temporalio.common.RetryPolicy(
+                            maximum_attempts=maximum_attempts,
+                            initial_interval=initial_interval,
+                            backoff_coefficient=2,
+                            maximum_interval=maximum_interval
+                        )
+                    )
+                    logger.info(f"Completed 'power' activity with result: %s", result)
+                    block_status = "completed"
+                    block_error = None
+                    results[node_id] = result
+                except Exception as e:
+                    logger.error(f"Activity 'power' failed with error: {e}")
+                    result = None
+                    block_status = "failed"
+                    block_error = {
+                        "code": type(e).__name__,
+                        "description": str(e),
+                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
+                    }
+                    workflow_output["status"] = "failed"
+                # Collect block output
+                workflow_output["blocks"].append({
+                    "activity_id": node_id,
+                    "name": block_name,
+                    "status": block_status,
+                    "input": input_params,
+                    "result": result,
+                    "error": block_error
+                })
+
+            task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
+
+            # Execute tasks according to execution steps
+            # Execution step 1
+            tasks = [task_functions[node_id]() for node_id in ['2']]
+            results_step = await asyncio.gather(*tasks, return_exceptions=True)
+            for result in results_step:
+                if isinstance(result, Exception):
+                    logger.error(f"Task failed with exception: {result}")
+                    workflow_output["status"] = "failed"
+            # Execution step 2
+            tasks = [task_functions[node_id]() for node_id in ['m3aiq7ixuo6du35h8tr']]
+            results_step = await asyncio.gather(*tasks, return_exceptions=True)
+            for result in results_step:
+                if isinstance(result, Exception):
+                    logger.error(f"Task failed with exception: {result}")
+                    workflow_output["status"] = "failed"
+            # Execution step 3
+            tasks = [task_functions[node_id]() for node_id in ['m3aiqkrv4k1y6654ymr']]
+            results_step = await asyncio.gather(*tasks, return_exceptions=True)
+            for result in results_step:
+                if isinstance(result, Exception):
+                    logger.error(f"Task failed with exception: {result}")
+                    workflow_output["status"] = "failed"
+
+            # Update workflow status to completed if not failed
+            if workflow_output["status"] != "failed":
+                workflow_output["status"] = "completed"
+            else:
+                raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
+
+            return workflow_output
+
+        except Exception as e:
+            logger.error(f"Workflow failed with error: {e}")
+            workflow_output["status"] = "failed"
+            raise temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e
diff --git a/flow_wrapper.py b/flow_wrapper.py
new file mode 100644
index 0000000..cd6adeb
--- /dev/null
+++ b/flow_wrapper.py
@@ -0,0 +1,83 @@
+import asyncio
+import os
+import re
+import sys
+import logging
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+# Ensure the generated workflow module is in the Python path
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+# Retrieve environment variables
+REPO_NAME = os.getenv('REPO_NAME')
+BRANCH_NAME = os.getenv('BRANCH_NAME')
+COMMIT_ID = os.getenv('VERSION')
+NAMESPACE = os.getenv('NAMESPACE')
+FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
+
+if not REPO_NAME or not BRANCH_NAME or not COMMIT_ID or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
+    raise ValueError("Environment variables REPO_NAME, BRANCH_NAME, VERSION, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")
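+
+# Worked example (illustrative values, not taken from this repo): with
+# REPO_NAME="test_repo", BRANCH_NAME="feature/math-blocks" and
+# VERSION="97ec9e0d50ca7308...", sanitize_name() below yields
+# flow_name_safe="test_repo_feature_math_blocks" and commit_id_safe="97ec9e0d50",
+# so the worker listens on task queue "test_repo_feature_math_blocks_97ec9e0d50".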
+
+
+# Shorten the commit ID to the first 10 characters
+COMMIT_ID_SHORT = COMMIT_ID[:10]
+
+# Sanitize flow name and commit ID to create a valid task queue name
+def sanitize_name(name):
+    # Replace non-alphanumeric characters or an invalid leading digit with underscores
+    sanitized = re.sub(r'\W|^(?=\d)', '_', name)
+    # Replace multiple consecutive underscores with a single underscore
+    sanitized = re.sub(r'_+', '_', sanitized)
+    # Strip leading and trailing underscores
+    return sanitized.strip('_')
+
+FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME
+flow_name_safe = sanitize_name(FLOW_NAME)
+commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
+
+# Construct the task queue name
+TASK_QUEUE = f"{flow_name_safe}_{commit_id_safe}"
+
+# Import the default workflow module
+workflow_module_name = "workflow"  # Hardcoded to the default module name 'workflow.py'
+try:
+    workflow_module = __import__(workflow_module_name)
+except ImportError as e:
+    raise ImportError(f"Failed to import workflow module '{workflow_module_name}': {e}")
+
+# Get the workflow class; the generator names it "<flow_name_safe>_<commit_id_safe>"
+workflow_class_name = f"{flow_name_safe}_{commit_id_safe}"
+workflow_class = getattr(workflow_module, workflow_class_name, None)
+
+if not workflow_class:
+    raise AttributeError(f"Workflow class '{workflow_class_name}' not found in module '{workflow_module_name}'.")
+
+async def main():
+    """
+    Initialize and run the worker for the generated workflow.
+    """
+    try:
+        client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
+        # No activities are registered since they are in separate containers
+        worker = Worker(
+            client,
+            task_queue=TASK_QUEUE,
+            workflows=[workflow_class],
+        )
+        logger.info("Worker starting for %s, listening to task queue: %s", workflow_class_name, TASK_QUEUE)
+        await worker.run()
+    except Exception as e:
+        logger.critical("Worker failed to start: %s", e)
+        raise
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/generator.py b/generator.py
new file mode 100644
index 0000000..9c1b702
--- /dev/null
+++ b/generator.py
@@ -0,0 +1,160 @@
+import json
+import os
+import sys
+from jinja2 import Environment, FileSystemLoader
+import networkx as nx
+import re  # Import the re module for regular expressions
+import argparse  # Import the argparse module for parsing arguments
+
+# Define paths for templates and output
+TEMPLATE_DIR = 'templates'
+
+# Load Jinja environment
+env = Environment(loader=FileSystemLoader(TEMPLATE_DIR),
+                  extensions=["jinja2.ext.do"])
+
+# Add regex_replace filter
+def regex_replace(s, pattern, repl):
+    return re.sub(pattern, repl, s)
+
+env.filters['regex_replace'] = regex_replace
+
+# Retrieve environment variables
+REPO_NAME = os.getenv('REPO_NAME')
+BRANCH_NAME = os.getenv('BRANCH_NAME')
+COMMIT_ID = os.getenv('VERSION')
+NAMESPACE = os.getenv('NAMESPACE')
+
+if not REPO_NAME or not BRANCH_NAME or not COMMIT_ID or not NAMESPACE:
+    raise ValueError("Environment variables REPO_NAME, BRANCH_NAME, VERSION and NAMESPACE must be set.")
+
+# Shorten the commit ID to the first 10 characters
+COMMIT_ID_SHORT = COMMIT_ID[:10]
+
+# Sanitize flow name and commit ID to create a valid task queue name
+def sanitize_name(name):
+    # Replace non-alphanumeric characters or an invalid leading digit with underscores
+    sanitized = re.sub(r'\W|^(?=\d)', '_', name)
+    # Replace multiple consecutive underscores with a single underscore
+    sanitized = re.sub(r'_+', '_', sanitized)
+    # Strip leading and trailing
underscores + return sanitized.strip('_') + +FLOW_NAME = REPO_NAME + "_" + BRANCH_NAME +flow_name_safe = sanitize_name(FLOW_NAME) +commit_id_safe = sanitize_name(COMMIT_ID_SHORT) + +workflow_class_name = f"{flow_name_safe}_{commit_id_safe}" + +def load_flow_definition(file_path): + """Load the flow definition from a JSON file.""" + try: + with open(file_path, "r") as f: + return json.load(f) + except json.JSONDecodeError as e: + print(f"Error loading JSON file '{file_path}': {e}") + sys.exit(1) + +def determine_flow_type(flow_definition): + """Determine the flow type based on the edges.""" + nodes = {node["id"]: node for node in flow_definition["nodes"]} + edges = flow_definition["edges"] + has_parallel = False + for node_id in nodes: + outgoing_edges = [e for e in edges if e["source"] == node_id] + if len(outgoing_edges) > 1: + has_parallel = True + break + if has_parallel: + return "hybrid" + elif len(edges) > 0: + return "sequential" + else: + return "parallel" + +def collect_root_input_keys(flow_definition): + """Collect all unique root input keys from the flow definition.""" + root_input_keys = set() + for node in flow_definition.get("nodes", []): # Safeguard for missing nodes + properties = node.get("data", {}).get("nodeConfig", {}).get("schema", {}).get("properties", {}) + for key, value in properties.items(): + source = value.get("source") + if isinstance(source, str) and source.startswith("$root."): + root_input_keys.add(source[6:]) # Adjusted to capture full path after $root. + return list(root_input_keys) + +def build_execution_graph(flow_definition): + """Builds an execution graph from the flow definition using networkx.""" + G = nx.DiGraph() + nodes = {node["id"]: node for node in flow_definition["nodes"]} + edges = flow_definition["edges"] + + # Add nodes + for node_id, node in nodes.items(): + G.add_node(node_id, node=node) + + # Add edges + for edge in edges: + G.add_edge(edge["source"], edge["target"]) + + return G + +def get_execution_steps(G): + """Returns a list of execution steps, each containing nodes that can be run in parallel.""" + try: + levels = list(nx.topological_generations(G)) + execution_steps = [list(level) for level in levels] + return execution_steps + except nx.NetworkXUnfeasible: + print("Error: Workflow graph has cycles.") + sys.exit(1) + +def generate_workflow(flow_definition, output_file): + """Generate the workflow code using the Jinja template.""" + template = env.get_template('workflow_template.py.j2') + flow_type = determine_flow_type(flow_definition) + root_input_keys = collect_root_input_keys(flow_definition) + + # Filter out requestNode from nodes + filtered_nodes = {node["id"]: node for node in flow_definition["nodes"] if node["type"] != "requestNode"} + + # Filter edges to exclude connections to or from filtered nodes + filtered_edges = [ + edge for edge in flow_definition["edges"] + if edge["source"] in filtered_nodes and edge["target"] in filtered_nodes + ] + + # Build execution graph and steps + filtered_flow_definition = { + "nodes": list(filtered_nodes.values()), + "edges": filtered_edges, + } + G = build_execution_graph(filtered_flow_definition) + execution_steps = get_execution_steps(G) + + # Render the workflow template + workflow_code = template.render( + workflow_class_name=workflow_class_name, + flow_type=flow_type, + root_input_keys=root_input_keys, + execution_steps=execution_steps, + nodes=filtered_nodes + ) + with open(output_file, "w") as f: + f.write(workflow_code) + print(f"Generated workflow: {output_file}") + +if __name__ 
== "__main__": + # Parse command-line arguments + parser = argparse.ArgumentParser(description="Generate Temporal workflow from JSON flow definition.") + parser.add_argument("--input-file", type=str, required=True, + help="Path to the flow definition JSON file.") + parser.add_argument("--output-file", type=str, required=True, + help="Path to the generated workflow output file.") + args = parser.parse_args() + + # Load the flow definition and generate the workflow + flow_file = args.input_file + output_file = args.output_file + flow_def = load_flow_definition(flow_file) + generate_workflow(flow_def, output_file) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7d33792 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +temporalio==1.6.0 +jinja2==3.1.4 +pytest==8.3.3 +networkx==3.4.2 +jmespath==1.0.1 diff --git a/sample_flows/flow_hybrid.json b/sample_flows/flow_hybrid.json new file mode 100644 index 0000000..ba24c68 --- /dev/null +++ b/sample_flows/flow_hybrid.json @@ -0,0 +1,222 @@ +{ + "nodes": [ + { + "id": "2", + "position": { + "x": 0, + "y": 0 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "addition", + "commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "a": { + "type": "integer", + "description": "First number to add.", + "source": "$root.a" + }, + "b": { + "type": "integer", + "description": "Second number to add.", + "source": "$root.b" + } + }, + "required": [ + "a", + "b" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum of the two numbers." 
+ } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 0 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiq7ixuo6du35h8tr", + "position": { + "x": 0, + "y": 150 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "multiply", + "commitId": "db086f09c9df60f622c9be47734422454f5e181f", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum from the previous block.", + "source": "$2.sum" + } + }, + "required": [ + "sum" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product of the sum and the multiplier." + } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 150 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiqkrv4k1y6654ymr", + "position": { + "x": 0, + "y": 300 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "power", + "commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product from the previous block.", + "source": "$2.sum" + } + }, + "required": [ + "product" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "power": { + "type": "integer", + "description": "Result of raising the product to the power." 
+ } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 300 + }, + "width": 260, + "height": 79 + } + ], + "edges": [ + { + "id": "2=>m3aiq7ixuo6du35h8tr", + "source": "2", + "target": "m3aiq7ixuo6du35h8tr", + "type": "workflow" + }, + { + "id": "2=>m3aiqkrv4k1y6654ymr", + "source": "2", + "target": "m3aiqkrv4k1y6654ymr", + "type": "workflow" + } + ] +} diff --git a/sample_flows/flow_parallel.json b/sample_flows/flow_parallel.json new file mode 100644 index 0000000..45f7f9e --- /dev/null +++ b/sample_flows/flow_parallel.json @@ -0,0 +1,209 @@ +{ + "nodes": [ + { + "id": "2", + "position": { + "x": 0, + "y": 0 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "addition", + "commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "a": { + "type": "integer", + "description": "First number to add.", + "source": "$root.a" + }, + "b": { + "type": "integer", + "description": "Second number to add.", + "source": "$root.b" + } + }, + "required": [ + "a", + "b" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum of the two numbers." + } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 0 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiq7ixuo6du35h8tr", + "position": { + "x": 0, + "y": 150 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "multiply", + "commitId": "db086f09c9df60f622c9be47734422454f5e181f", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum from the previous block.", + "source": "$root.sum" + } + }, + "required": [ + "sum" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product of the sum and the multiplier." 
+ } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 150 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiqkrv4k1y6654ymr", + "position": { + "x": 0, + "y": 300 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "power", + "commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product from the previous block.", + "source": "$root.product" + } + }, + "required": [ + "product" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "power": { + "type": "integer", + "description": "Result of raising the product to the power." + } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 300 + }, + "width": 260, + "height": 79 + } + ], + "edges": [] +} diff --git a/sample_flows/flow_sequential.json b/sample_flows/flow_sequential.json new file mode 100644 index 0000000..63473dd --- /dev/null +++ b/sample_flows/flow_sequential.json @@ -0,0 +1,222 @@ +{ + "nodes": [ + { + "id": "2", + "position": { + "x": 0, + "y": 0 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "addition", + "commitId": "97ec9e0d50ca7308347b28f8c006a475357eb096", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "a": { + "type": "integer", + "description": "First number to add.", + "source": "$root.a" + }, + "b": { + "type": "integer", + "description": "Second number to add.", + "source": "$root.b" + } + }, + "required": [ + "a", + "b" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum of the two numbers." 
+ } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 0 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiq7ixuo6du35h8tr", + "position": { + "x": 0, + "y": 150 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "multiply", + "commitId": "db086f09c9df60f622c9be47734422454f5e181f", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "sum": { + "type": "integer", + "description": "Sum from the previous block.", + "source": "$2.sum" + } + }, + "required": [ + "sum" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product of the sum and the multiplier." + } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 150 + }, + "width": 260, + "height": 79 + }, + { + "id": "m3aiqkrv4k1y6654ymr", + "position": { + "x": 0, + "y": 300 + }, + "data": { + "nodeConfig": { + "activityConfig": { + "retryPolicy": { + "maximumAttempts": 0, + "initialInterval": 1000, + "backoffCoefficient": 2, + "maximumInterval": 100000 + }, + "timeouts": { + "startToCloseTimeout": 0, + "scheduleToStartTimeout": 0, + "scheduleToCloseTimeout": 0, + "heartbeatTimeout": 0 + }, + "taskQueue": { + "taskQueueName": "" + }, + "advancedSettings": { + "cancellationType": "TRY_CANCEL", + "heartbeatEnabled": false + } + }, + "blockName": "power", + "commitId": "057645be0cace46e3ea08d65f26fbc6dfd9348bd", + "repo_url": "http://centurion-version-control.default.svc.cluster.local:3000/Centurion/blocks_transformer.git", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "product": { + "type": "integer", + "description": "Product from the previous block.", + "source": "$m3aiq7ixuo6du35h8tr.product" + } + }, + "required": [ + "product" + ] + } + }, + "outputSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "power": { + "type": "integer", + "description": "Result of raising the product to the power." 
+ } + } + } + }, + "type": "transformerNode", + "positionAbsolute": { + "x": 0, + "y": 300 + }, + "width": 260, + "height": 79 + } + ], + "edges": [ + { + "id": "2=>m3aiq7ixuo6du35h8tr", + "source": "2", + "target": "m3aiq7ixuo6du35h8tr", + "type": "workflow" + }, + { + "id": "m3aiq7ixuo6du35h8tr=>m3aiqkrv4k1y6654ymr", + "source": "m3aiq7ixuo6du35h8tr", + "target": "m3aiqkrv4k1y6654ymr", + "type": "workflow" + } + ] +} diff --git a/templates/workflow_template.py.j2 b/templates/workflow_template.py.j2 new file mode 100644 index 0000000..4f8dd1b --- /dev/null +++ b/templates/workflow_template.py.j2 @@ -0,0 +1,153 @@ +import temporalio.workflow +from typing import Any, Dict, List, Callable, Awaitable +import logging +import asyncio +import json +import datetime +import re +import jmespath +from temporalio.exceptions import ApplicationError + +# Configure logging +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +@temporalio.workflow.defn +class {{ workflow_class_name }}: + @temporalio.workflow.run + async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]: + workflow_info = temporalio.workflow.info() + workflow_output: Dict[str, Any] = { + "workflow_id": workflow_info.workflow_id, + "run_id": workflow_info.run_id, + "name": "{{ workflow_class_name }}", + "status": "in_progress", + "blocks": [], + "root_input": root_inputs + } + try: + # Initialize results + results: Dict[str, Any] = {} + + # Define task functions + task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {} + + {%- for node_id, node in nodes.items() %} + {%- set block_name = node['data']['nodeConfig']['blockName'] %} + {%- set commit_id = node['data']['nodeConfig'].get('commitId', '') %} + {%- set commit_id_short = commit_id[:10] %} + {%- set repo_url = node['data']['nodeConfig']['repo_url'] %} + {%- set repo_name_1 = repo_url.split('/')[-1].split('.')[0] %} + {%- set repo_name = repo_name_1|regex_replace('[^A-Za-z0-9_]', '_') %} + {%- set block_name_safe = block_name|regex_replace('[^A-Za-z0-9_]', '_') %} + {%- set commit_id_short_safe = commit_id_short|regex_replace('[^A-Za-z0-9_]', '_') %} + {%- set task_queue_name = repo_name + '_' + block_name_safe + '_' + commit_id_short_safe %} + async def task_{{ node_id }}(): + node_id = "{{ node_id }}" + block_name = "{{ block_name }}" + # Prepare inputs + input_params: Dict[str, Any] = {} + {%- for param, details in node['data']['nodeConfig']['schema']['properties'].items() %} + {%- set source = details.get("source", "") %} + {%- if source and source.startswith("$root.") %} + try: + jsonpath_expr = jmespath.compile("{{ source[6:] }}") + value = jsonpath_expr.search(root_inputs) + input_params["{{ param }}"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath '{{ source[6:] }}' for parameter '{{ param }}': {e}") + input_params["{{ param }}"] = None + {%- elif source and source.startswith("$") %} + {%- set source_parts = source[1:].split('.', 1) %} + {%- set source_node_id = source_parts[0] %} + {%- set source_path = source_parts[1] if source_parts|length > 1 else '' %} + try: + source_data = results.get("{{ source_node_id }}", {}) + jsonpath_expr = jmespath.compile("{{ source_path }}") + value = jsonpath_expr.search(source_data) + input_params["{{ param }}"] = value + except Exception as e: + logger.error(f"Error parsing jsonpath '{{ source_path }}' for parameter '{{ param }}' from node '{{ source_node_id }}': {e}") + input_params["{{ param }}"] = None + {%- else %} + 
input_params["{{ param }}"] = None + {%- endif %} + {%- endfor %} + logger.info(f"Starting '{{ block_name }}' activity on task queue '{{ task_queue_name }}' with inputs: %s", input_params) + try: + # Convert timeouts and intervals from milliseconds to seconds + schedule_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['scheduleToCloseTimeout'] }} + start_to_close_timeout_value_ms = {{ node['data']['nodeConfig']['activityConfig']['timeouts']['startToCloseTimeout'] }} + schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0) + start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0) + initial_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['initialInterval'] }} + maximum_interval_value_ms = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumInterval'] }} + initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0) + maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0) + maximum_attempts_value = {{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['maximumAttempts'] }} + maximum_attempts = None if maximum_attempts_value == 0 else maximum_attempts_value + + result = await temporalio.workflow.execute_activity( + "block_main_activity", + input_params, + schedule_to_close_timeout=schedule_to_close_timeout, + start_to_close_timeout=start_to_close_timeout, + task_queue="{{ task_queue_name }}", + retry_policy=temporalio.common.RetryPolicy( + maximum_attempts=maximum_attempts, + initial_interval=initial_interval, + backoff_coefficient={{ node['data']['nodeConfig']['activityConfig']['retryPolicy']['backoffCoefficient'] }}, + maximum_interval=maximum_interval + ) + ) + logger.info(f"Completed '{{ block_name }}' activity with result: %s", result) + block_status = "completed" + block_error = None + results[node_id] = result + except Exception as e: + logger.error(f"Activity '{{ block_name }}' failed with error: {e}") + result = None + block_status = "failed" + block_error = { + "code": type(e).__name__, + "description": str(e), + "details": {"cause": str(getattr(e, "cause", "No additional details"))} + } + workflow_output["status"] = "failed" + # Collect block output + workflow_output["blocks"].append({ + "activity_id": node_id, + "name": block_name, + "status": block_status, + "input": input_params, + "result": result, + "error": block_error + }) + + task_functions["{{ node_id }}"] = task_{{ node_id }} + {%- endfor %} + + # Execute tasks according to execution steps + {%- for step in execution_steps %} + # Execution step {{ loop.index }} + tasks = [task_functions[node_id]() for node_id in {{ step }}] + results_step = await asyncio.gather(*tasks, return_exceptions=True) + for result in results_step: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + workflow_output["status"] = "failed" + {%- endfor %} + + # Update workflow status to completed if not failed + if workflow_output["status"] != "failed": + workflow_output["status"] = "completed" + else: + raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True) + + return workflow_output + + except Exception as e: + logger.error(f"Workflow failed with error: {e}") + workflow_output["status"] = "failed" + raise 
temporalio.exceptions.ApplicationError("Workflow failed",workflow_output,str(e),type="WorkflowError",non_retryable=True) from e diff --git a/test_generator.py b/test_generator.py new file mode 100644 index 0000000..29a0485 --- /dev/null +++ b/test_generator.py @@ -0,0 +1,62 @@ +import os +import filecmp +import unittest + + +class TestWorkflowGeneration(unittest.TestCase): + """ + Unit tests for verifying workflow generation. + """ + + def setUp(self): + # Ensure required directories exist + self.output_dir = "generated_workflows" + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + + # Set required environment variables + os.environ["REPO_NAME"] = "test_repo" + os.environ["BRANCH_NAME"] = "test_branch" + os.environ["VERSION"] = "1234567890" + os.environ["NAMESPACE"] = "test_namespace" + + def run_test_case(self, flow_file, expected_output): + """ + Helper function to run a test case. + + Args: + flow_file (str): The input flow definition JSON file. + expected_output (str): The path to the expected output file. + """ + output_file = os.path.join(self.output_dir, "workflow.py") + + # Run the generator with the specified flow file and output file + os.system(f"python generator.py --input-file {flow_file} --output-file {output_file}") + + # Compare the generated file with the expected output + self.assertTrue( + os.path.exists(output_file), f"Generated file not found: {output_file}" + ) + self.assertTrue( + filecmp.cmp(output_file, expected_output, shallow=False), + f"Generated file {output_file} does not match expected output {expected_output}", + ) + + def test_sequential_flow(self): + self.run_test_case( + "sample_flows/flow_sequential.json", "expected_workflows/flow_sequential_expected.py" + ) + + def test_parallel_flow(self): + self.run_test_case( + "sample_flows/flow_parallel.json", "expected_workflows/flow_parallel_expected.py" + ) + + def test_hybrid_flow(self): + self.run_test_case( + "sample_flows/flow_hybrid.json", "expected_workflows/flow_hybrid_expected.py" + ) + + +if __name__ == "__main__": + unittest.main()