From fba827a9480f2c95a9897dbcbeb427bb9ac5bcef Mon Sep 17 00:00:00 2001 From: gitea_admin_user Date: Wed, 9 Apr 2025 16:45:34 +0000 Subject: [PATCH] Add initial files --- .gitea/workflows/cd_workflows.yml | 1 + .gitea/workflows/ci_workflows.yml | 62 ++++++++++++ Dockerfile | 17 ++++ README.md | 37 ++++++- block_wrapper.py | 161 ++++++++++++++++++++++++++++++ requirements.txt | 2 + test_block_wrapper.py | 113 +++++++++++++++++++++ 7 files changed, 392 insertions(+), 1 deletion(-) create mode 100644 .gitea/workflows/cd_workflows.yml create mode 100644 .gitea/workflows/ci_workflows.yml create mode 100644 Dockerfile create mode 100644 block_wrapper.py create mode 100644 requirements.txt create mode 100644 test_block_wrapper.py diff --git a/.gitea/workflows/cd_workflows.yml b/.gitea/workflows/cd_workflows.yml new file mode 100644 index 0000000..0967ef4 --- /dev/null +++ b/.gitea/workflows/cd_workflows.yml @@ -0,0 +1 @@ +{} diff --git a/.gitea/workflows/ci_workflows.yml b/.gitea/workflows/ci_workflows.yml new file mode 100644 index 0000000..99d8693 --- /dev/null +++ b/.gitea/workflows/ci_workflows.yml @@ -0,0 +1,62 @@ +name: CI Workflow + +on: + push: + branches: + - '*' + +jobs: + test: + ame: Testing the Block + runs-on: ubuntu-latest + steps: + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Set Up Python Environment + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Python Dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Execute Unit Tests + run: python -m unittest discover -s . -p 'test_*.py' + + build_and_push: + name: Containerize the Block + runs-on: ubuntu-latest + needs: test + steps: + - name: Extract Repository Name + id: extract_repo + run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT + + - name: Checkout Codebase + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Configure Docker Buildx + uses: docker/setup-buildx-action@v3 + with: + driver: docker + + - name: Log In to Container Registry + uses: docker/login-action@v2 + with: + registry: centurion-version-control.default.svc.cluster.local:3000 + username: ${{ secrets.CI_USER }} + password: ${{ secrets.CI_USER_TOKEN }} + + - name: Build and Push Docker Image to Registry + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: | + centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }} + centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ef40d55 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +# Use Python slim image as base +FROM python:3.10-slim AS base + +# Set up a directory for the application code +WORKDIR /app + +# Copy only the requirements file initially for better caching +COPY requirements.txt . + +# Install Workflow SDK and other dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application code +COPY . . + +# Set entrypoint for the worker +ENTRYPOINT ["python", "/app/block_wrapper.py"] diff --git a/README.md b/README.md index 59a3efc..fb96806 100644 --- a/README.md +++ b/README.md @@ -1 +1,36 @@ -**Hello world!!!** +# Activity Block Wrapper +## Overview +This project includes a setup to wrap a user-defined `block.py` file into an activity block that runs within a managed worker environment. The setup uses `entrypoint.py` as a wrapper to dynamically load and execute the logic defined in `block.py` and make it available as an activity block. +This README explains the purpose and functionality of `entrypoint.py`, the `activity_block_wrapper` base image, and how it all comes together within the Dockerfile to create a production-ready, deployable worker. + +## Files +### 1. `entrypoint.py` +`entrypoint.py` is the main entry script for the activity block wrapper. It provides the following functionality: +- **Dynamic Loading**: Imports `block.py` and retrieves its `__main__` function dynamically. This function is wrapped and managed within the worker environment. +- **Logging and Error Handling**: Includes logging and error handling to ensure issues are properly recorded and managed, which is essential for production environments. +- **Worker Management**: Starts a worker instance that manages the execution of the activity block, making the `block.py` logic available as an activity in the specified task queue and namespace. + +#### Configuration +- `TASK_QUEUE`: The task queue name for the worker. Set via environment variable. +- `NAMESPACE`: The namespace for the worker to operate within. Set via environment variable. + +#### Example Usage +To configure and run `entrypoint.py` in your environment: +1. Set the `TASK_QUEUE` and `NAMESPACE` environment variables. +2. Run `entrypoint.py`, and it will automatically wrap `block.py` as an activity block within the worker. + +### 2. Dockerfile +The Dockerfile is structured to create an image that: +- Uses `activity_block_wrapper` as a base, which includes `entrypoint.py` and pre-installed dependencies. +- Copies the user’s `block.py` and other project-specific files into the container. +- Creates a symbolic link to `entrypoint.py` as `block.py`, making it appear as if `block.py` is the entry point, while actually running `entrypoint.py` to manage the activity block. + +#### Key Dockerfile Commands +- **FROM activity_block_wrapper:latest**: Builds on top of the `activity_block_wrapper` image, which includes the necessary runtime environment and `entrypoint.py`. +- **COPY**: Copies the user’s `block.py` and other related files into the container. +- **RUN ln -s /app/entrypoint.py /app/block.py**: Creates a symbolic link from `entrypoint.py` to `block.py`, allowing `entrypoint.py` to manage the activity block while maintaining `block.py` as the visible entry point. + +### Example Usage with Docker +1. **Build the Base Image**: + ```bash + docker build -f Dockerfile.base -t activity_block_wrapper:latest . diff --git a/block_wrapper.py b/block_wrapper.py new file mode 100644 index 0000000..170469a --- /dev/null +++ b/block_wrapper.py @@ -0,0 +1,161 @@ +import importlib +import json +import asyncio +import logging +import os +import re +import sys +from temporalio import activity +from temporalio.exceptions import ApplicationError +from jsonschema import validate, ValidationError +from temporalio.client import Client +from temporalio.worker import Worker + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Automatically determine if in a test environment +IS_TEST_ENVIRONMENT = "unittest" in sys.modules + +# Retrieve environment variables +REPO_NAME = os.getenv('REPO_NAME') +BRANCH_NAME = os.getenv('BRANCH_NAME') +COMMIT_ID = os.getenv('VERSION') +NAMESPACE = os.getenv('NAMESPACE') +FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS') + +if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE or not FLOWX_ENGINE_ADDRESS: + raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.") + + +# Shorten the commit ID to the first 10 characters +COMMIT_ID_SHORT = COMMIT_ID[:10] + +# Sanitize flow name and commit ID to create a valid task queue name +def sanitize_name(name): + # Replace non-alphanumeric characters or invalid start with underscores + sanitized = re.sub(r'\W|^(?=\d)', '_', name) + # Replace multiple consecutive underscores with a single underscore + sanitized = re.sub(r'_+', '_', sanitized) + # Remove trailing underscores + return sanitized.strip('_') + +BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME +block_name_safe = sanitize_name(BLOCK_NAME) +commit_id_safe = sanitize_name(COMMIT_ID_SHORT) + +# Construct the task queue name +TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}" + +# Load schemas for input validation and output validation +def load_schema(schema_path): + try: + with open(schema_path, 'r') as schema_file: + return json.load(schema_file) + except Exception as e: + logger.error("Failed to load schema from %s: %s", schema_path, e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"Schema loading failed: {e}") + else: + raise ValueError(f"Schema loading failed: {e}") + +# Load block.py dynamically and get the main function +def load_block_main(): + try: + block_module = importlib.import_module("block") + if not hasattr(block_module, "__main__"): + raise AttributeError("The module block.py does not have a __main__ function") + logger.info("Successfully loaded __main__ function from block.py") + return block_module.__main__ + except ImportError as e: + logger.error("Failed to import block.py: %s", e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"block.py import failed: {e}") + else: + raise ValueError(f"block.py import failed: {e}") + except AttributeError as e: + logger.error("block.py does not contain a __main__ function: %s", e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"__main__ function not found in block.py: {e}") + else: + raise ValueError(f"__main__ function not found in block.py: {e}") + +# Validate input data against request schema +def validate_input(input_data): + request_schema = load_schema("/app/request_schema.json") + try: + validate(instance=input_data, schema=request_schema) + logger.info("Input data validated successfully") + except ValidationError as e: + logger.error("Input validation failed: %s", e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"Input validation error: {e}") + else: + raise ValueError(f"Input validation error: {e}") + +# Validate output data against response schema +def validate_output(output_data): + response_schema = load_schema("/app/response_schema.json") + try: + validate(instance=output_data, schema=response_schema) + logger.info("Output data validated successfully") + except ValidationError as e: + logger.error("Output validation failed: %s", e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"Output validation error: {e}") + else: + raise ValueError(f"Output validation error: {e}") + +# Registering activity +@activity.defn +async def block_main_activity(input_data): + """ + Wraps the main function from block.py as an activity. + Validates input data against request schema and output data against response schema. + """ + # Validate the input + validate_input(input_data) + + # Load the main function and call it with validated input + main_func = load_block_main() + try: + # Pass input data as keyword arguments to the main function + result = main_func(**input_data) + logger.info("block.py executed successfully with result: %s", result) + + # Validate output against response schema + validate_output(result) + return result + except Exception as e: + logger.error("Error executing block.py: %s", e) + if not IS_TEST_ENVIRONMENT: + raise ApplicationError(f"Error during block execution: {e}") from e + else: + raise RuntimeError("Error during block.py execution") from e + +# Worker function +async def main(): + """ + Initialize and run the worker with the activity. + """ + try: + client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE) + worker = Worker( + client, + task_queue=TASK_QUEUE, + activities=[block_main_activity], + ) + logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE) + await worker.run() + except Exception as e: + logger.critical("Worker failed to start: %s", e) + raise + +# Main function to initialize worker +if __name__ == "__main__": + # Run the worker + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0873d1e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +temporalio==1.6.0 +jsonschema==4.23.0 diff --git a/test_block_wrapper.py b/test_block_wrapper.py new file mode 100644 index 0000000..0932ff3 --- /dev/null +++ b/test_block_wrapper.py @@ -0,0 +1,113 @@ +import unittest +from unittest.mock import patch, MagicMock, mock_open +import json +import asyncio +from jsonschema import ValidationError +with patch.dict('os.environ', { + "REPO_NAME": "test_repo", + "BRANCH_NAME": "test_branch", + "VERSION": "test_version", + "NAMESPACE": "test_namespace", + "FLOWX_ENGINE_ADDRESS": "test_address" +}): + from block_wrapper import block_main_activity, validate_input, validate_output + + +class TestBlockWrapper(unittest.TestCase): + + def setUp(self): + # Mock schemas to use for testing + self.mock_request_schema = { + "type": "object", + "properties": { + "a": {"type": "integer"}, + "b": {"type": "integer"} + }, + "required": ["a", "b"] + } + self.mock_response_schema = { + "type": "object", + "properties": { + "sum": {"type": "integer"} + }, + "required": ["sum"] + } + + # Mock the contents of request_schema.json and response_schema.json using different patchers + self.mock_open_request = mock_open(read_data=json.dumps(self.mock_request_schema)) + self.mock_open_response = mock_open(read_data=json.dumps(self.mock_response_schema)) + + # Mock load_block_main to return a mock main function + self.load_block_main_patcher = patch("block_wrapper.load_block_main", return_value=MagicMock(return_value={"sum": 3})) + self.mock_load_block_main = self.load_block_main_patcher.start() + + def tearDown(self): + # Stop all patches + self.load_block_main_patcher.stop() + + @patch("block_wrapper.load_schema") + def test_validate_input_success(self, mock_load_schema): + # Set up load_schema to return request schema for validate_input + mock_load_schema.return_value = self.mock_request_schema + input_data = {"a": 1, "b": 2} + validate_input(input_data) # Should pass without errors + + @patch("block_wrapper.load_schema") + def test_validate_input_failure(self, mock_load_schema): + # Set up load_schema to return request schema for validate_input + mock_load_schema.return_value = self.mock_request_schema + input_data = {"a": 1} # Missing 'b' + with self.assertRaises(ValueError): + validate_input(input_data) + + @patch("block_wrapper.load_schema") + def test_validate_output_success(self, mock_load_schema): + # Set up load_schema to return response schema for validate_output + mock_load_schema.return_value = self.mock_response_schema + output_data = {"sum": 3} + validate_output(output_data) # Should pass without errors + + @patch("block_wrapper.load_schema") + def test_validate_output_failure(self, mock_load_schema): + # Set up load_schema to return response schema for validate_output + mock_load_schema.return_value = self.mock_response_schema + output_data = {} # Missing 'sum' + with self.assertRaises(ValueError): + validate_output(output_data) + + @patch("block_wrapper.load_schema") + async def test_block_main_activity_success(self, mock_load_schema): + # Set up load_schema to return request and response schemas in order + mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema] + input_data = {"a": 1, "b": 2} + result = await block_main_activity(input_data) + self.assertEqual(result, {"sum": 3}) + + @patch("block_wrapper.load_schema") + async def test_block_main_activity_failure(self, mock_load_schema): + # Set up load_schema to return request and response schemas in order + mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema] + # Cause an exception in main function + self.mock_load_block_main.side_effect = Exception("Unexpected error") + input_data = {"a": 1, "b": 2} + with self.assertRaises(RuntimeError): + await block_main_activity(input_data) + + @patch("block_wrapper.load_schema") + async def test_block_main_activity_input_validation_failure(self, mock_load_schema): + # Mock validate_input to raise ValidationError + with patch("block_wrapper.validate_input", side_effect=ValidationError("Invalid input")): + input_data = {"a": 1} # Missing 'b' + with self.assertRaises(ValueError): + await block_main_activity(input_data) + + @patch("block_wrapper.load_schema") + async def test_block_main_activity_output_validation_failure(self, mock_load_schema): + # Mock validate_output to raise ValidationError + with patch("block_wrapper.validate_output", side_effect=ValidationError("Invalid output")): + input_data = {"a": 1, "b": 2} + with self.assertRaises(ValueError): + await block_main_activity(input_data) + +if __name__ == "__main__": + unittest.main()