Compare commits

..

1 Commits

Author SHA1 Message Date
9a8753f490 Add initial files
All checks were successful
CI Workflow / Testing the Block (push) Successful in 1m43s
CI Workflow / Containerize the Block (push) Successful in 1m42s
2025-04-09 16:45:38 +00:00
6 changed files with 591 additions and 1 deletions

View File

@ -0,0 +1,65 @@
name: CI Workflow
on:
push:
branches:
- '*'
jobs:
test:
name: Testing the Block
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
- name: Set Up Python Environment
uses: actions/setup-python@v4
with:
python-version: '3.10'
# - name: Install Python Dependencies
# run: |
# python -m pip install --upgrade pip
# pip install -r requirements.txt
# - name: Execute Unit Tests
# run: python -m unittest discover -s . -p 'test_*.py'
build_and_push:
name: Containerize the Block
runs-on: ubuntu-latest
needs: test
steps:
- name: Extract Repository Name
id: extract_repo
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
- name: Checkout Codebase
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Configure Docker Buildx
uses: docker/setup-buildx-action@v3
with:
config-inline: |
[registry."centurion-version-control.default.svc.cluster.local:3000"]
http = true
insecure = true
- name: Log In to Container Registry
uses: docker/login-action@v2
with:
registry: centurion-version-control.default.svc.cluster.local:3000
username: ${{ secrets.CI_USER }}
password: ${{ secrets.CI_USER_TOKEN }}
- name: Build and Push Docker Image to Registry
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: |
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest

17
Dockerfile Normal file
View File

@ -0,0 +1,17 @@
# Use Python 3.10 slim as base
FROM python:3.10-slim AS base
# Set a working directory
WORKDIR /app
# Copy only the requirements file first
COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application
COPY . .
# Final entrypoint
ENTRYPOINT ["python", "/app/block_wrapper.py"]

View File

@ -1 +1,7 @@
**Hello world!!!**
# Activity Block Wrapper
### Example Usage with Docker
1. **Build the Base Image**:
```bash
docker build -f Dockerfile.base -t activity_block_wrapper:latest .
```

258
block_wrapper.py Normal file
View File

@ -0,0 +1,258 @@
import importlib
import json
import asyncio
import logging
import os
import re
import sys
import requests
from temporalio import activity
from temporalio.exceptions import ApplicationError
from jsonschema import validate, ValidationError
from temporalio.client import Client
from temporalio.worker import Worker
import time
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Automatically determine if in a test environment
IS_TEST_ENVIRONMENT = "unittest" in sys.modules
# Environment variables
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
SQLPAD_API_URL = os.getenv('SQLPAD_API_URL')
if not BRANCH_NAME or not COMMIT_ID or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
raise ValueError("Missing required environment variables.")
COMMIT_ID_SHORT = COMMIT_ID[:10]
# Sanitize name function
def sanitize_name(name):
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
sanitized = re.sub(r'_+', '_', sanitized)
return sanitized.strip('_')
BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME
block_name_safe = sanitize_name(BLOCK_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
# Construct the task queue name
TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}"
# Load JSON schema
def load_schema(schema_path):
try:
with open(schema_path, 'r') as schema_file:
return json.load(schema_file)
except Exception as e:
logger.error("Failed to load schema from %s: %s", schema_path, e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Schema loading failed: {e}")
else:
raise ValueError(f"Schema loading failed: {e}")
# Validate input against request schema
def validate_input(input_data):
request_schema = load_schema("/app/request_schema.json")
try:
validate(instance=input_data, schema=request_schema)
logger.info("Input data validated successfully")
except ValidationError as e:
logger.error("Input validation failed: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Input validation error: {e}")
else:
raise ValueError(f"Input validation error: {e}")
# Validate output against response schema
def validate_output(output_data):
response_schema = load_schema("/app/response_schema.json")
try:
validate(instance=output_data, schema=response_schema)
logger.info("Output data validated successfully")
except ValidationError as e:
logger.error("Output validation failed: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Output validation error: {e}")
else:
raise ValueError(f"Output validation error: {e}")
# Get the connection ID from config.json
def get_connection_id(namespace):
response_schema = load_schema("/app/config.json")
for item in response_schema:
if item.get("namespace") == namespace:
logger.info("Got the connectionID")
return item.get("connectionId")
logger.error("Provided Namespace not found.")
raise ValueError(f"Namespace '{namespace}' not found")
# Read SQL file and replace placeholders
def construct_sql(input_data):
try:
with open("/app/main.sql", "r") as sql_file:
sql_template = sql_file.read()
for key, value in input_data.items():
placeholder = f"${key}"
if value is None:
replacement = "NULL"
elif isinstance(value, bool):
replacement = "TRUE" if value else "FALSE"
elif isinstance(value, str):
replacement = f"'{value}'"
else:
replacement = str(value)
sql_template = sql_template.replace(placeholder, replacement)
logger.info("SQL query constructed.")
return sql_template.strip()
except Exception as e:
logger.error("Error processing SQL template: %s", e)
raise ApplicationError(f"SQL template error: {e}")
def get_batch_results(batch_id, retry_interval=0.05, max_retries=5):
retries = 0
while retries < max_retries:
try:
response = requests.get(f"{SQLPAD_API_URL}/api/batches/{batch_id}")
response.raise_for_status()
batch_status = response.json()
status = batch_status.get("status")
if status in ["finished", "error"]:
statements = batch_status.get("statements", [])
if not statements:
raise ApplicationError("No statements found in batch response.")
statement = statements[0]
statement_id = statement.get("id")
error = statement.get("error")
columns = statement.get("columns", None)
sql_text = batch_status.get("batchText", "").strip().lower()
logger.info(f"statements: {statements}")
logger.info(f"error from batches result {error}, statement: {statement_id}, columns: {columns}")
if error:
raise ApplicationError(f"SQL execution failed: {error}")
is_select_query = sql_text.startswith("select") or (
sql_text.startswith("with") and "select" in sql_text
)
if is_select_query and not columns:
raise ApplicationError("SELECT query did not return columns, cannot process data.")
return status, statement_id, error, columns, is_select_query
time.sleep(retry_interval)
retries += 1
except requests.RequestException as e:
logger.error("Failed to fetch batch results: %s", e)
raise ApplicationError(f"Failed to fetch batch results: {e}")
raise ApplicationError("SQLPad batch execution timed out.")
def execute_sqlpad_query(connection_id, sql_query):
payload = {
"connectionId": connection_id,
"name": "",
"batchText": sql_query,
"selectedText": ""
}
try:
response = requests.post(f"{SQLPAD_API_URL}/api/batches", json=payload)
response.raise_for_status()
batch_response = response.json()
batch_id = batch_response.get("statements", [{}])[0].get("batchId")
logger.info(f"Batch ID from the batches API response {batch_id}")
if not batch_id:
raise ApplicationError("Batch ID not found in SQLPad response.")
status, statement_id, error, columns, is_select_query = get_batch_results(batch_id)
if not is_select_query:
return {"status": status, "error": error}
result_response = requests.get(f"{SQLPAD_API_URL}/api/statements/{statement_id}/results")
result_response.raise_for_status()
result_data = result_response.json()
type_mapping = {
"number": float,
"string": str,
"date": str,
"boolean": bool,
"timestamp": str,
}
column_names_list = [col["name"] for col in columns]
column_types_list = [col["datatype"] for col in columns]
converted_data = [
[
type_mapping.get(dtype, str)(value) if value is not None else None
for dtype, value in zip(column_types_list, row)
]
for row in result_data
]
results_dict_list = [dict(zip(column_names_list, row)) for row in converted_data]
logger.info(f"results_dict_list: {results_dict_list}")
return {"results": results_dict_list}
except requests.RequestException as e:
logger.error("SQLPad API request failed: %s", e)
raise ApplicationError(f"SQLPad API request failed: {e}")
@activity.defn
async def block_main_activity(input_data):
validate_input(input_data)
try:
sql_query = construct_sql(input_data)
logger.info(f"constructed sql query: {sql_query}")
connection_id = get_connection_id(NAMESPACE)
if connection_id:
logger.info(f"connection id exists {connection_id}")
result = execute_sqlpad_query(connection_id, sql_query)
validate_output(result)
logger.info(f"final result for the query: {result}")
return result
else:
logger.error("connection id not exists, please add the connection id according to the namespace.")
raise ApplicationError("connection id not exists, please add the connection id according to the namespace.")
except Exception as e:
logger.error("Error executing query execution: %s", e)
if not IS_TEST_ENVIRONMENT:
raise ApplicationError(f"Error during block execution: {e}") from e
else:
raise RuntimeError("Error during query execution") from e
async def main():
try:
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
worker = Worker(
client,
task_queue=TASK_QUEUE,
activities=[block_main_activity],
)
logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE)
await worker.run()
except Exception as e:
logger.critical("Worker failed to start: %s", e)
raise
if __name__ == "__main__":
asyncio.run(main())

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
temporalio==1.6.0
jsonschema==4.23.0
requests==2.32.3

241
test_block_wrapper.py Normal file
View File

@ -0,0 +1,241 @@
import unittest
from unittest.mock import patch, MagicMock, mock_open
import json
import asyncio
from jsonschema import ValidationError
import os
with patch.dict('os.environ', {
"REPO_NAME": "test_repo",
"BRANCH_NAME": "test_branch",
"VERSION": "test_version",
"NAMESPACE": "test_namespace",
"FLOWX_ENGINE_ADDRESS": "test_address"
}):
from block_wrapper import block_main_activity, validate_input, validate_output, construct_sql, get_connection_id
class TestBlockWrapper(unittest.TestCase):
def setUp(self):
# Mock schemas to use for testing
self.mock_request_schema = {
"type": "object",
"properties": {
"salary": {"type": "number"},
"department": {"type": "string"}
},
"required": ["salary", "department"]
}
self.mock_response_schema = {
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema",
"properties": {
"id": {
"type": "integer"
},
"first_name": {
"type": "string"
},
"last_name": {
"type": "string"
},
"email": {
"type": "string",
"format": "email"
},
"phone_number": {
"type": "string"
},
"hire_date": {
"type": "string",
"format": "date-time"
},
"job_title": {
"type": "string"
},
"salary": {
"type": "number"
},
"department": {
"type": "string"
}
},
"required": ["id","first_name","last_name","email","phone_number","hire_date","job_title","salary","department"]
}
self.mock_config_schema = [
{
"namespace": "staging",
"connectionId": "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b"
},
{
"namespace": "production",
"connectionId": "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu"
}
]
self.mock_main_sql = "SELECT * FROM public.employee WHERE salary=$salary and department=$department;"
# Mock the contents of request_schema.json and response_schema.json using different patchers
self.mock_open_request = mock_open(read_data=json.dumps(self.mock_request_schema))
self.mock_open_response = mock_open(read_data=json.dumps(self.mock_response_schema))
self.mock_open_config = mock_open(read_data=json.dumps(self.mock_config_schema))
self.mock_open_main_sql = mock_open(read_data=self.mock_main_sql)
self.open_main_sql_patcher = patch("builtins.open", self.mock_open_main_sql)
self.open_main_sql_patcher.start()
# Mock execute_sqlpad_query to return a known result
self.load_block_main_patcher = patch(
"block_wrapper.execute_sqlpad_query",
return_value=MagicMock(
return_value={
"id": 4,
"first_name": "Bob",
"last_name": "Brown",
"email": "bob.brown@example.com",
"phone_number": "444-222-1111",
"hire_date": "2020-07-25",
"job_title": "Marketing Specialist",
"salary": 60000.00,
"department": "Marketing"
}
)
)
self.mock_load_block_main = self.load_block_main_patcher.start()
def tearDown(self):
# Stop all patches
self.load_block_main_patcher.stop()
self.open_main_sql_patcher.stop()
@patch("block_wrapper.load_schema")
def test_validate_input_success(self, mock_load_schema):
# Set up load_schema to return request schema for validate_input
mock_load_schema.return_value = self.mock_request_schema
input_data = {"salary": 20000.0, "department": "Marketing"}
validate_input(input_data) # Should pass without errors
@patch("block_wrapper.load_schema")
def test_validate_input_failure(self, mock_load_schema):
# Set up load_schema to return request schema for validate_input
mock_load_schema.return_value = self.mock_request_schema
input_data = {"salary": 20000.00} # Missing 'department'
with self.assertRaises(ValueError):
validate_input(input_data)
@patch("block_wrapper.load_schema")
def test_validate_output_success(self, mock_load_schema):
# Set up load_schema to return response schema for validate_output
mock_load_schema.return_value = self.mock_response_schema
output_data = {
"id": 4,
"first_name": "Bob",
"last_name": "Brown",
"email": "bob.brown@example.com",
"phone_number": "444-222-1111",
"hire_date": "2020-07-25",
"job_title": "Marketing Specialist",
"salary": 60000.00,
"department": "Marketing"
}
validate_output(output_data) # Should pass without errors
@patch("block_wrapper.load_schema")
def test_validate_output_failure(self, mock_load_schema):
# Set up load_schema to return response schema for validate_output
mock_load_schema.return_value = self.mock_response_schema
# Missing department
output_data = {
"id": 4,
"first_name": "Bob",
"last_name": "Brown",
"email": "bob.brown@example.com",
"phone_number": "444-222-1111",
"hire_date": "2020-07-25",
"job_title": "Marketing Specialist",
"salary": 60000.00
}
with self.assertRaises(ValueError):
validate_output(output_data)
@patch("block_wrapper.load_schema")
async def test_block_main_activity_success(self, mock_load_schema):
# Set up load_schema to return request and response schemas in order
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
input_data = {"salary": 20000.0, "department": "Marketing"}
result = await block_main_activity(input_data)
self.assertEqual(
result,
{
"id": 4,
"first_name": "Bob",
"last_name": "Brown",
"email": "bob.brown@example.com",
"phone_number": "444-222-1111",
"hire_date": "2020-07-25",
"job_title": "Marketing Specialist",
"salary": 60000.00,
"department": "Marketing"
}
)
@patch("block_wrapper.load_schema")
async def test_block_main_activity_failure(self, mock_load_schema):
# Set up load_schema to return request and response schemas in order
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
# Cause an exception in main function
self.mock_load_block_main.side_effect = Exception("Unexpected error")
input_data = {"salary": 20000.0, "department": "Marketing"}
with self.assertRaises(RuntimeError):
await block_main_activity(input_data)
@patch("block_wrapper.validate_input", side_effect=ValidationError("Invalid input"))
async def test_block_main_activity_input_validation_failure(self, mock_validate):
input_data = {"salary": 20000.00} # Missing 'department'
with self.assertRaises(ValueError):
await block_main_activity(input_data)
@patch("block_wrapper.validate_output", side_effect=ValidationError("Invalid output"))
async def test_block_main_activity_output_validation_failure(self, mock_validate):
input_data = {"salary": 20000.0, "department": "Marketing"}
with self.assertRaises(ValueError):
await block_main_activity(input_data)
@patch.dict(os.environ, {"NAMESPACE": "staging"})
@patch("block_wrapper.load_schema")
def test_get_connection_id_staging(self, mock_load_schema):
"""Test fetching connectionId for 'staging' namespace"""
mock_load_schema.return_value = self.mock_config_schema
connection_id = get_connection_id(os.environ["NAMESPACE"])
self.assertEqual(connection_id, "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b")
@patch.dict(os.environ, {"NAMESPACE": "production"})
@patch("block_wrapper.load_schema")
def test_get_connection_id_production(self, mock_load_schema):
"""Test fetching connectionId for 'production' namespace"""
mock_load_schema.return_value = self.mock_config_schema
connection_id = get_connection_id(os.environ["NAMESPACE"])
self.assertEqual(connection_id, "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu")
@patch("block_wrapper.load_schema")
def test_get_connection_id_invalid_namespace(self, mock_load_schema):
"""Test handling of invalid namespace"""
mock_load_schema.return_value = self.mock_config_schema
with self.assertRaises(ValueError) as context:
get_connection_id("development")
self.assertIn("Namespace 'development' not found", str(context.exception))
@patch("block_wrapper.load_schema")
def test_valid_sql_replacement(self, mock_load_schema):
mock_load_schema.return_value = self.mock_main_sql
input_data = {"salary": 20000.0, "department": "Marketing"}
expected_sql = "SELECT * FROM public.employee WHERE salary=20000.0 and department='Marketing';"
result = construct_sql(input_data)
self.assertEqual(result, expected_sql)
if __name__ == "__main__":
unittest.main()