system/templates/workflow_template.py.j2
gitea_admin_user a72efd6c0b
Some checks are pending
CI Workflow / Testing the Flow (push) Waiting to run
CI Workflow / Containerize the Flow (push) Blocked by required conditions
Add initial files
2025-04-09 16:45:29 +00:00

154 lines
8.2 KiB
Django/Jinja

import temporalio.workflow
from typing import Any, Dict, List, Callable, Awaitable
import logging
import asyncio
import json
import datetime
import re
import jmespath
from temporalio.exceptions import ApplicationError
# Root logging setup: INFO threshold, "timestamp [LEVEL] message" line layout.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Module-scoped logger shared by the code below.
logger = logging.getLogger(__name__)
@temporalio.workflow.defn
class {{ workflow_class_name }}:
    """Temporal workflow rendered from a node graph.

    Every node becomes one call to the ``block_main_activity`` activity on a
    task queue named after the node's repository, block name and short commit
    id. Nodes are run step by step; the nodes of one step run concurrently.
    """

    @temporalio.workflow.run
    async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run all nodes and return the collected workflow report.

        Args:
            root_inputs: Workflow input; parameters whose source starts with
                ``$root.`` are resolved against it with JMESPath.

        Returns:
            The report dict (workflow/run id, name, status, per-block
            input/result/error entries, and the root input) with status
            ``"completed"``.

        Raises:
            ApplicationError: Non-retryable, type ``"WorkflowError"``, when
                any block failed or anything else raised; its details carry
                the report and the original error text.
        """
        workflow_info = temporalio.workflow.info()
        workflow_output: Dict[str, Any] = {
            "workflow_id": workflow_info.workflow_id,
            "run_id": workflow_info.run_id,
            "name": "{{ workflow_class_name }}",
            "status": "in_progress",
            "blocks": [],
            "root_input": root_inputs,
        }
        try:
            # Outputs of successfully completed nodes, keyed by node id.
            results: Dict[str, Any] = {}
            # One zero-argument coroutine function per node, keyed by node id.
            task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
            {%- for node_id, node in nodes.items() %}
            {%- set node_config = node['data']['nodeConfig'] %}
            {%- set block_name = node_config['blockName'] %}
            {%- set commit_id = node_config.get('commitId', '') %}
            {%- set commit_id_short = commit_id[:10] %}
            {%- set repo_url = node_config['repo_url'] %}
            {%- set repo_name_1 = repo_url.split('/')[-1].split('.')[0] %}
            {%- set repo_name = repo_name_1|regex_replace('[^A-Za-z0-9_]', '_') %}
            {%- set block_name_safe = block_name|regex_replace('[^A-Za-z0-9_]', '_') %}
            {%- set commit_id_short_safe = commit_id_short|regex_replace('[^A-Za-z0-9_]', '_') %}
            {%- set task_queue_name = repo_name + '_' + block_name_safe + '_' + commit_id_short_safe %}
            {%- set timeouts = node_config['activityConfig']['timeouts'] %}
            {%- set retry_policy = node_config['activityConfig']['retryPolicy'] %}

            async def task_{{ node_id }}():
                node_id = "{{ node_id }}"
                # Template strings are emitted via tojson so quotes or
                # backslashes in them cannot break the generated source.
                block_name = {{ block_name|tojson }}
                # Resolve every schema property from its declared source.
                input_params: Dict[str, Any] = {}
                {%- for param, details in node_config['schema']['properties'].items() %}
                {%- set source = details.get("source", "") %}
                {%- if source and source.startswith("$root.") %}
                {%- set root_path = source[6:] %}
                try:
                    expression = jmespath.compile({{ root_path|tojson }})
                    input_params[{{ param|tojson }}] = expression.search(root_inputs)
                except Exception as e:
                    logger.error("Error parsing jsonpath '%s' for parameter '%s': %s", {{ root_path|tojson }}, {{ param|tojson }}, e)
                    input_params[{{ param|tojson }}] = None
                {%- elif source and source.startswith("$") %}
                {#- NOTE(review): a source that is only "$node" leaves source_path empty; the lookup then presumably fails and the parameter becomes None. Confirm whether whole-output references should be supported. #}
                {%- set source_parts = source[1:].split('.', 1) %}
                {%- set source_node_id = source_parts[0] %}
                {%- set source_path = source_parts[1] if source_parts|length > 1 else '' %}
                try:
                    # A node that has not produced a result is looked up as {}.
                    source_data = results.get({{ source_node_id|tojson }}, {})
                    expression = jmespath.compile({{ source_path|tojson }})
                    input_params[{{ param|tojson }}] = expression.search(source_data)
                except Exception as e:
                    logger.error("Error parsing jsonpath '%s' for parameter '%s' from node '%s': %s", {{ source_path|tojson }}, {{ param|tojson }}, {{ source_node_id|tojson }}, e)
                    input_params[{{ param|tojson }}] = None
                {%- else %}
                input_params[{{ param|tojson }}] = None
                {%- endif %}
                {%- endfor %}
                logger.info("Starting '%s' activity on task queue '%s' with inputs: %s", block_name, "{{ task_queue_name }}", input_params)
                try:
                    # Configured durations are in milliseconds; a timeout of 0
                    # is passed to Temporal as None.
                    schedule_to_close_timeout_value_ms = {{ timeouts['scheduleToCloseTimeout'] }}
                    start_to_close_timeout_value_ms = {{ timeouts['startToCloseTimeout'] }}
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    initial_interval_value_ms = {{ retry_policy['initialInterval'] }}
                    maximum_interval_value_ms = {{ retry_policy['maximumInterval'] }}
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    # RetryPolicy.maximum_attempts is an int and 0 already
                    # means "no limit"; None is rejected by the policy's
                    # validation, so the configured value is passed as-is.
                    maximum_attempts = {{ retry_policy['maximumAttempts'] }}
                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="{{ task_queue_name }}",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient={{ retry_policy['backoffCoefficient'] }},
                            maximum_interval=maximum_interval,
                        ),
                    )
                    logger.info("Completed '%s' activity with result: %s", block_name, result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    # A failed block is recorded rather than re-raised, so the
                    # other tasks of the same step still run to completion.
                    logger.error("Activity '%s' failed with error: %s", block_name, e)
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))},
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error,
                })

            task_functions["{{ node_id }}"] = task_{{ node_id }}
            {%- endfor %}

            # Execute tasks according to execution steps
            {%- for step in execution_steps %}
            # Execution step {{ loop.index }}
            tasks = [task_functions[node_id]() for node_id in {{ step }}]
            results_step = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results_step:
                if isinstance(result, Exception):
                    logger.error("Task failed with exception: %s", result)
                    workflow_output["status"] = "failed"
            {%- endfor %}

            # Any failed block fails the whole run; this error is re-wrapped
            # by the handler below.
            if workflow_output["status"] == "failed":
                raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
            workflow_output["status"] = "completed"
            return workflow_output
        except Exception as e:
            logger.error("Workflow failed with error: %s", e)
            workflow_output["status"] = "failed"
            # The report and the error text travel as the error's details.
            raise ApplicationError(
                "Workflow failed",
                workflow_output,
                str(e),
                type="WorkflowError",
                non_retryable=True,
            ) from e