Compare commits

..

1 Commits

Author SHA1 Message Date
fba827a948 Add initial files
All checks were successful
CI Workflow / test (push) Successful in 1m52s
CI Workflow / Containerize the Block (push) Successful in 1m38s
2025-04-09 16:45:34 +00:00
7 changed files with 392 additions and 1 deletions

View File

@ -0,0 +1 @@
{}

View File

@ -0,0 +1,62 @@
name: CI Workflow
on:
push:
branches:
- '*'
jobs:
test:
ame: Testing the Block
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
- name: Set Up Python Environment
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install Python Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Execute Unit Tests
run: python -m unittest discover -s . -p 'test_*.py'
build_and_push:
name: Containerize the Block
runs-on: ubuntu-latest
needs: test
steps:
- name: Extract Repository Name
id: extract_repo
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
- name: Checkout Codebase
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Configure Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker
- name: Log In to Container Registry
uses: docker/login-action@v2
with:
registry: centurion-version-control.default.svc.cluster.local:3000
username: ${{ secrets.CI_USER }}
password: ${{ secrets.CI_USER_TOKEN }}
- name: Build and Push Docker Image to Registry
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: |
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest

17
Dockerfile Normal file
View File

@ -0,0 +1,17 @@
# Use Python slim image as base
FROM python:3.10-slim AS base
# Set up a directory for the application code
WORKDIR /app
# Copy only the requirements file initially for better caching
COPY requirements.txt .
# Install Workflow SDK and other dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY . .
# Set entrypoint for the worker
ENTRYPOINT ["python", "/app/block_wrapper.py"]

View File

@ -1 +1,36 @@
**Hello world!!!**
# Activity Block Wrapper
## Overview
This project includes a setup to wrap a user-defined `block.py` file into an activity block that runs within a managed worker environment. The setup uses `entrypoint.py` as a wrapper to dynamically load and execute the logic defined in `block.py` and make it available as an activity block.
This README explains the purpose and functionality of `entrypoint.py`, the `activity_block_wrapper` base image, and how it all comes together within the Dockerfile to create a production-ready, deployable worker.
## Files
### 1. `entrypoint.py`
`entrypoint.py` is the main entry script for the activity block wrapper. It provides the following functionality:
- **Dynamic Loading**: Imports `block.py` and retrieves its `__main__` function dynamically. This function is wrapped and managed within the worker environment.
- **Logging and Error Handling**: Includes logging and error handling to ensure issues are properly recorded and managed, which is essential for production environments.
- **Worker Management**: Starts a worker instance that manages the execution of the activity block, making the `block.py` logic available as an activity in the specified task queue and namespace.
#### Configuration
- `TASK_QUEUE`: The task queue name for the worker. Set via environment variable.
- `NAMESPACE`: The namespace for the worker to operate within. Set via environment variable.
#### Example Usage
To configure and run `entrypoint.py` in your environment:
1. Set the `TASK_QUEUE` and `NAMESPACE` environment variables.
2. Run `entrypoint.py`, and it will automatically wrap `block.py` as an activity block within the worker.
### 2. Dockerfile
The Dockerfile is structured to create an image that:
- Uses `activity_block_wrapper` as a base, which includes `entrypoint.py` and pre-installed dependencies.
- Copies the users `block.py` and other project-specific files into the container.
- Creates a symbolic link to `entrypoint.py` as `block.py`, making it appear as if `block.py` is the entry point, while actually running `entrypoint.py` to manage the activity block.
#### Key Dockerfile Commands
- **FROM activity_block_wrapper:latest**: Builds on top of the `activity_block_wrapper` image, which includes the necessary runtime environment and `entrypoint.py`.
- **COPY**: Copies the users `block.py` and other related files into the container.
- **RUN ln -s /app/entrypoint.py /app/block.py**: Creates a symbolic link from `entrypoint.py` to `block.py`, allowing `entrypoint.py` to manage the activity block while maintaining `block.py` as the visible entry point.
### Example Usage with Docker
1. **Build the Base Image**:
```bash
docker build -f Dockerfile.base -t activity_block_wrapper:latest .

161
block_wrapper.py Normal file
View File

@ -0,0 +1,161 @@
import importlib
import json
import asyncio
import logging
import os
import re
import sys
from temporalio import activity
from temporalio.exceptions import ApplicationError
from jsonschema import validate, ValidationError
from temporalio.client import Client
from temporalio.worker import Worker
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Automatically determine if in a test environment
IS_TEST_ENVIRONMENT = "unittest" in sys.modules
# Retrieve environment variables
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
if not BRANCH_NAME or not COMMIT_ID or not BRANCH_NAME or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
raise ValueError("Environment variables BRANCH_NAME, VERSION, BRANCH_NAME, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")
# Shorten the commit ID to the first 10 characters
COMMIT_ID_SHORT = COMMIT_ID[:10]
# Sanitize flow name and commit ID to create a valid task queue name
def sanitize_name(name):
# Replace non-alphanumeric characters or invalid start with underscores
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
# Replace multiple consecutive underscores with a single underscore
sanitized = re.sub(r'_+', '_', sanitized)
# Remove trailing underscores
return sanitized.strip('_')
BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME
block_name_safe = sanitize_name(BLOCK_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
# Construct the task queue name
TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}"
# Load schemas for input validation and output validation
def load_schema(schema_path):
try:
with open(schema_path, 'r') as schema_file:
return json.load(schema_file)
except Exception as e:
logger.error("Failed to load schema from %s: %s", schema_path, e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Schema loading failed: {e}")
else:
raise ValueError(f"Schema loading failed: {e}")
# Load block.py dynamically and get the main function
def load_block_main():
try:
block_module = importlib.import_module("block")
if not hasattr(block_module, "__main__"):
raise AttributeError("The module block.py does not have a __main__ function")
logger.info("Successfully loaded __main__ function from block.py")
return block_module.__main__
except ImportError as e:
logger.error("Failed to import block.py: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"block.py import failed: {e}")
else:
raise ValueError(f"block.py import failed: {e}")
except AttributeError as e:
logger.error("block.py does not contain a __main__ function: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"__main__ function not found in block.py: {e}")
else:
raise ValueError(f"__main__ function not found in block.py: {e}")
# Validate input data against request schema
def validate_input(input_data):
request_schema = load_schema("/app/request_schema.json")
try:
validate(instance=input_data, schema=request_schema)
logger.info("Input data validated successfully")
except ValidationError as e:
logger.error("Input validation failed: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Input validation error: {e}")
else:
raise ValueError(f"Input validation error: {e}")
# Validate output data against response schema
def validate_output(output_data):
response_schema = load_schema("/app/response_schema.json")
try:
validate(instance=output_data, schema=response_schema)
logger.info("Output data validated successfully")
except ValidationError as e:
logger.error("Output validation failed: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Output validation error: {e}")
else:
raise ValueError(f"Output validation error: {e}")
# Registering activity
@activity.defn
async def block_main_activity(input_data):
"""
Wraps the main function from block.py as an activity.
Validates input data against request schema and output data against response schema.
"""
# Validate the input
validate_input(input_data)
# Load the main function and call it with validated input
main_func = load_block_main()
try:
# Pass input data as keyword arguments to the main function
result = main_func(**input_data)
logger.info("block.py executed successfully with result: %s", result)
# Validate output against response schema
validate_output(result)
return result
except Exception as e:
logger.error("Error executing block.py: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Error during block execution: {e}") from e
else:
raise RuntimeError("Error during block.py execution") from e
# Worker function
async def main():
"""
Initialize and run the worker with the activity.
"""
try:
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
worker = Worker(
client,
task_queue=TASK_QUEUE,
activities=[block_main_activity],
)
logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE)
await worker.run()
except Exception as e:
logger.critical("Worker failed to start: %s", e)
raise
# Main function to initialize worker
if __name__ == "__main__":
# Run the worker
asyncio.run(main())

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
temporalio==1.6.0
jsonschema==4.23.0

113
test_block_wrapper.py Normal file
View File

@ -0,0 +1,113 @@
import unittest
from unittest.mock import patch, MagicMock, mock_open
import json
import asyncio
from jsonschema import ValidationError
with patch.dict('os.environ', {
"REPO_NAME": "test_repo",
"BRANCH_NAME": "test_branch",
"VERSION": "test_version",
"NAMESPACE": "test_namespace",
"FLOWX_ENGINE_ADDRESS": "test_address"
}):
from block_wrapper import block_main_activity, validate_input, validate_output
class TestBlockWrapper(unittest.TestCase):
def setUp(self):
# Mock schemas to use for testing
self.mock_request_schema = {
"type": "object",
"properties": {
"a": {"type": "integer"},
"b": {"type": "integer"}
},
"required": ["a", "b"]
}
self.mock_response_schema = {
"type": "object",
"properties": {
"sum": {"type": "integer"}
},
"required": ["sum"]
}
# Mock the contents of request_schema.json and response_schema.json using different patchers
self.mock_open_request = mock_open(read_data=json.dumps(self.mock_request_schema))
self.mock_open_response = mock_open(read_data=json.dumps(self.mock_response_schema))
# Mock load_block_main to return a mock main function
self.load_block_main_patcher = patch("block_wrapper.load_block_main", return_value=MagicMock(return_value={"sum": 3}))
self.mock_load_block_main = self.load_block_main_patcher.start()
def tearDown(self):
# Stop all patches
self.load_block_main_patcher.stop()
@patch("block_wrapper.load_schema")
def test_validate_input_success(self, mock_load_schema):
# Set up load_schema to return request schema for validate_input
mock_load_schema.return_value = self.mock_request_schema
input_data = {"a": 1, "b": 2}
validate_input(input_data) # Should pass without errors
@patch("block_wrapper.load_schema")
def test_validate_input_failure(self, mock_load_schema):
# Set up load_schema to return request schema for validate_input
mock_load_schema.return_value = self.mock_request_schema
input_data = {"a": 1} # Missing 'b'
with self.assertRaises(ValueError):
validate_input(input_data)
@patch("block_wrapper.load_schema")
def test_validate_output_success(self, mock_load_schema):
# Set up load_schema to return response schema for validate_output
mock_load_schema.return_value = self.mock_response_schema
output_data = {"sum": 3}
validate_output(output_data) # Should pass without errors
@patch("block_wrapper.load_schema")
def test_validate_output_failure(self, mock_load_schema):
# Set up load_schema to return response schema for validate_output
mock_load_schema.return_value = self.mock_response_schema
output_data = {} # Missing 'sum'
with self.assertRaises(ValueError):
validate_output(output_data)
@patch("block_wrapper.load_schema")
async def test_block_main_activity_success(self, mock_load_schema):
# Set up load_schema to return request and response schemas in order
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
input_data = {"a": 1, "b": 2}
result = await block_main_activity(input_data)
self.assertEqual(result, {"sum": 3})
@patch("block_wrapper.load_schema")
async def test_block_main_activity_failure(self, mock_load_schema):
# Set up load_schema to return request and response schemas in order
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
# Cause an exception in main function
self.mock_load_block_main.side_effect = Exception("Unexpected error")
input_data = {"a": 1, "b": 2}
with self.assertRaises(RuntimeError):
await block_main_activity(input_data)
@patch("block_wrapper.load_schema")
async def test_block_main_activity_input_validation_failure(self, mock_load_schema):
# Mock validate_input to raise ValidationError
with patch("block_wrapper.validate_input", side_effect=ValidationError("Invalid input")):
input_data = {"a": 1} # Missing 'b'
with self.assertRaises(ValueError):
await block_main_activity(input_data)
@patch("block_wrapper.load_schema")
async def test_block_main_activity_output_validation_failure(self, mock_load_schema):
# Mock validate_output to raise ValidationError
with patch("block_wrapper.validate_output", side_effect=ValidationError("Invalid output")):
input_data = {"a": 1, "b": 2}
with self.assertRaises(ValueError):
await block_main_activity(input_data)
if __name__ == "__main__":
unittest.main()