import importlib import json import asyncio import logging import os import re import sys from temporalio import activity from temporalio.exceptions import ApplicationError from jsonschema import validate, ValidationError from temporalio.client import Client from temporalio.worker import Worker # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", ) logger = logging.getLogger(__name__) # Automatically determine if in a test environment IS_TEST_ENVIRONMENT = "unittest" in sys.modules # Retrieve environment variables REPO_NAME = os.getenv('REPO_NAME') BRANCH_NAME = os.getenv('BRANCH_NAME') COMMIT_ID = os.getenv('VERSION') NAMESPACE = os.getenv('NAMESPACE') FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS') if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE or not FLOWX_ENGINE_ADDRESS: raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.") # Shorten the commit ID to the first 10 characters COMMIT_ID_SHORT = COMMIT_ID[:10] # Sanitize flow name and commit ID to create a valid task queue name def sanitize_name(name): # Replace non-alphanumeric characters or invalid start with underscores sanitized = re.sub(r'\W|^(?=\d)', '_', name) # Replace multiple consecutive underscores with a single underscore sanitized = re.sub(r'_+', '_', sanitized) # Remove trailing underscores return sanitized.strip('_') BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME block_name_safe = sanitize_name(BLOCK_NAME) commit_id_safe = sanitize_name(COMMIT_ID_SHORT) # Construct the task queue name TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}" # Load schemas for input validation and output validation def load_schema(schema_path): try: with open(schema_path, 'r') as schema_file: return json.load(schema_file) except Exception as e: logger.error("Failed to load schema from %s: %s", schema_path, e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"Schema loading failed: {e}") else: raise ValueError(f"Schema loading failed: {e}") # Load block.py dynamically and get the main function def load_block_main(): try: block_module = importlib.import_module("block") if not hasattr(block_module, "__main__"): raise AttributeError("The module block.py does not have a __main__ function") logger.info("Successfully loaded __main__ function from block.py") return block_module.__main__ except ImportError as e: logger.error("Failed to import block.py: %s", e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"block.py import failed: {e}") else: raise ValueError(f"block.py import failed: {e}") except AttributeError as e: logger.error("block.py does not contain a __main__ function: %s", e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"__main__ function not found in block.py: {e}") else: raise ValueError(f"__main__ function not found in block.py: {e}") # Validate input data against request schema def validate_input(input_data): request_schema = load_schema("/app/request_schema.json") try: validate(instance=input_data, schema=request_schema) logger.info("Input data validated successfully") except ValidationError as e: logger.error("Input validation failed: %s", e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"Input validation error: {e}") else: raise ValueError(f"Input validation error: {e}") # Validate output data against response schema def validate_output(output_data): response_schema = load_schema("/app/response_schema.json") try: validate(instance=output_data, schema=response_schema) logger.info("Output data validated successfully") except ValidationError as e: logger.error("Output validation failed: %s", e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"Output validation error: {e}") else: raise ValueError(f"Output validation error: {e}") # Registering activity @activity.defn async def block_main_activity(input_data): """ Wraps the main function from block.py as an activity. Validates input data against request schema and output data against response schema. """ # Validate the input validate_input(input_data) # Load the main function and call it with validated input main_func = load_block_main() try: # Pass input data as keyword arguments to the main function result = main_func(**input_data) logger.info("block.py executed successfully with result: %s", result) # Validate output against response schema validate_output(result) return result except Exception as e: logger.error("Error executing block.py: %s", e) if not IS_TEST_ENVIRONMENT: raise ApplicationError(f"Error during block execution: {e}") from e else: raise RuntimeError("Error during block.py execution") from e # Worker function async def main(): """ Initialize and run the worker with the activity. """ try: client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE) worker = Worker( client, task_queue=TASK_QUEUE, activities=[block_main_activity], ) logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE) await worker.run() except Exception as e: logger.critical("Worker failed to start: %s", e) raise # Main function to initialize worker if __name__ == "__main__": # Run the worker asyncio.run(main())