Compare commits
No commits in common. "db-activity-wrapper" and "main" have entirely different histories.
db-activit
...
main
@ -1,65 +0,0 @@
|
|||||||
name: CI Workflow
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- '*'
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
test:
|
|
||||||
name: Testing the Block
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout Code
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Set Up Python Environment
|
|
||||||
uses: actions/setup-python@v4
|
|
||||||
with:
|
|
||||||
python-version: '3.10'
|
|
||||||
|
|
||||||
# - name: Install Python Dependencies
|
|
||||||
# run: |
|
|
||||||
# python -m pip install --upgrade pip
|
|
||||||
# pip install -r requirements.txt
|
|
||||||
|
|
||||||
# - name: Execute Unit Tests
|
|
||||||
# run: python -m unittest discover -s . -p 'test_*.py'
|
|
||||||
|
|
||||||
build_and_push:
|
|
||||||
name: Containerize the Block
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
needs: test
|
|
||||||
steps:
|
|
||||||
- name: Extract Repository Name
|
|
||||||
id: extract_repo
|
|
||||||
run: echo "repo_name=${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
- name: Checkout Codebase
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
with:
|
|
||||||
fetch-depth: 1
|
|
||||||
|
|
||||||
- name: Configure Docker Buildx
|
|
||||||
uses: docker/setup-buildx-action@v3
|
|
||||||
with:
|
|
||||||
config-inline: |
|
|
||||||
[registry."centurion-version-control.default.svc.cluster.local:3000"]
|
|
||||||
http = true
|
|
||||||
insecure = true
|
|
||||||
|
|
||||||
- name: Log In to Container Registry
|
|
||||||
uses: docker/login-action@v2
|
|
||||||
with:
|
|
||||||
registry: centurion-version-control.default.svc.cluster.local:3000
|
|
||||||
username: ${{ secrets.CI_USER }}
|
|
||||||
password: ${{ secrets.CI_USER_TOKEN }}
|
|
||||||
|
|
||||||
- name: Build and Push Docker Image to Registry
|
|
||||||
uses: docker/build-push-action@v4
|
|
||||||
with:
|
|
||||||
context: .
|
|
||||||
push: true
|
|
||||||
tags: |
|
|
||||||
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:${{ github.sha }}
|
|
||||||
centurion-version-control.default.svc.cluster.local:3000/centurion/${{ steps.extract_repo.outputs.repo_name }}/${{ github.ref_name }}:latest
|
|
||||||
17
Dockerfile
17
Dockerfile
@ -1,17 +0,0 @@
|
|||||||
# Use Python 3.10 slim as base
|
|
||||||
FROM python:3.10-slim AS base
|
|
||||||
|
|
||||||
# Set a working directory
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Copy only the requirements file first
|
|
||||||
COPY requirements.txt .
|
|
||||||
|
|
||||||
# Install dependencies
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
# Copy the rest of the application
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
# Final entrypoint
|
|
||||||
ENTRYPOINT ["python", "/app/block_wrapper.py"]
|
|
||||||
@ -1,7 +1 @@
|
|||||||
# Activity Block Wrapper
|
**Hello world!!!**
|
||||||
|
|
||||||
### Example Usage with Docker
|
|
||||||
1. **Build the Base Image**:
|
|
||||||
```bash
|
|
||||||
docker build -f Dockerfile.base -t activity_block_wrapper:latest .
|
|
||||||
```
|
|
||||||
|
|||||||
258
block_wrapper.py
258
block_wrapper.py
@ -1,258 +0,0 @@
|
|||||||
import importlib
|
|
||||||
import json
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
import requests
|
|
||||||
from temporalio import activity
|
|
||||||
from temporalio.exceptions import ApplicationError
|
|
||||||
from jsonschema import validate, ValidationError
|
|
||||||
from temporalio.client import Client
|
|
||||||
from temporalio.worker import Worker
|
|
||||||
import time
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO,
|
|
||||||
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
|
|
||||||
)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Automatically determine if in a test environment
|
|
||||||
IS_TEST_ENVIRONMENT = "unittest" in sys.modules
|
|
||||||
|
|
||||||
# Environment variables
|
|
||||||
REPO_NAME = os.getenv('REPO_NAME')
|
|
||||||
BRANCH_NAME = os.getenv('BRANCH_NAME')
|
|
||||||
COMMIT_ID = os.getenv('VERSION')
|
|
||||||
NAMESPACE = os.getenv('NAMESPACE')
|
|
||||||
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')
|
|
||||||
SQLPAD_API_URL = os.getenv('SQLPAD_API_URL')
|
|
||||||
|
|
||||||
|
|
||||||
if not BRANCH_NAME or not COMMIT_ID or not NAMESPACE or not FLOWX_ENGINE_ADDRESS:
|
|
||||||
raise ValueError("Missing required environment variables.")
|
|
||||||
|
|
||||||
COMMIT_ID_SHORT = COMMIT_ID[:10]
|
|
||||||
|
|
||||||
# Sanitize name function
|
|
||||||
def sanitize_name(name):
|
|
||||||
sanitized = re.sub(r'\W|^(?=\d)', '_', name)
|
|
||||||
sanitized = re.sub(r'_+', '_', sanitized)
|
|
||||||
return sanitized.strip('_')
|
|
||||||
|
|
||||||
BLOCK_NAME = REPO_NAME + "_" + BRANCH_NAME
|
|
||||||
block_name_safe = sanitize_name(BLOCK_NAME)
|
|
||||||
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)
|
|
||||||
|
|
||||||
# Construct the task queue name
|
|
||||||
TASK_QUEUE = f"{block_name_safe}_{commit_id_safe}"
|
|
||||||
|
|
||||||
# Load JSON schema
|
|
||||||
def load_schema(schema_path):
|
|
||||||
try:
|
|
||||||
with open(schema_path, 'r') as schema_file:
|
|
||||||
return json.load(schema_file)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Failed to load schema from %s: %s", schema_path, e)
|
|
||||||
if not IS_TEST_ENVIRONMENT:
|
|
||||||
raise ApplicationError(f"Schema loading failed: {e}")
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Schema loading failed: {e}")
|
|
||||||
|
|
||||||
# Validate input against request schema
|
|
||||||
def validate_input(input_data):
|
|
||||||
request_schema = load_schema("/app/request_schema.json")
|
|
||||||
try:
|
|
||||||
validate(instance=input_data, schema=request_schema)
|
|
||||||
logger.info("Input data validated successfully")
|
|
||||||
except ValidationError as e:
|
|
||||||
logger.error("Input validation failed: %s", e)
|
|
||||||
if not IS_TEST_ENVIRONMENT:
|
|
||||||
raise ApplicationError(f"Input validation error: {e}")
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Input validation error: {e}")
|
|
||||||
|
|
||||||
# Validate output against response schema
|
|
||||||
def validate_output(output_data):
|
|
||||||
response_schema = load_schema("/app/response_schema.json")
|
|
||||||
try:
|
|
||||||
validate(instance=output_data, schema=response_schema)
|
|
||||||
logger.info("Output data validated successfully")
|
|
||||||
except ValidationError as e:
|
|
||||||
logger.error("Output validation failed: %s", e)
|
|
||||||
if not IS_TEST_ENVIRONMENT:
|
|
||||||
raise ApplicationError(f"Output validation error: {e}")
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Output validation error: {e}")
|
|
||||||
|
|
||||||
# Get the connection ID from config.json
|
|
||||||
def get_connection_id(namespace):
|
|
||||||
response_schema = load_schema("/app/config.json")
|
|
||||||
for item in response_schema:
|
|
||||||
if item.get("namespace") == namespace:
|
|
||||||
logger.info("Got the connectionID")
|
|
||||||
return item.get("connectionId")
|
|
||||||
logger.error("Provided Namespace not found.")
|
|
||||||
raise ValueError(f"Namespace '{namespace}' not found")
|
|
||||||
|
|
||||||
# Read SQL file and replace placeholders
|
|
||||||
def construct_sql(input_data):
|
|
||||||
try:
|
|
||||||
with open("/app/main.sql", "r") as sql_file:
|
|
||||||
sql_template = sql_file.read()
|
|
||||||
|
|
||||||
for key, value in input_data.items():
|
|
||||||
placeholder = f"${key}"
|
|
||||||
|
|
||||||
if value is None:
|
|
||||||
replacement = "NULL"
|
|
||||||
elif isinstance(value, bool):
|
|
||||||
replacement = "TRUE" if value else "FALSE"
|
|
||||||
elif isinstance(value, str):
|
|
||||||
replacement = f"'{value}'"
|
|
||||||
else:
|
|
||||||
replacement = str(value)
|
|
||||||
|
|
||||||
sql_template = sql_template.replace(placeholder, replacement)
|
|
||||||
|
|
||||||
logger.info("SQL query constructed.")
|
|
||||||
return sql_template.strip()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error processing SQL template: %s", e)
|
|
||||||
raise ApplicationError(f"SQL template error: {e}")
|
|
||||||
|
|
||||||
def get_batch_results(batch_id, retry_interval=0.05, max_retries=5):
|
|
||||||
retries = 0
|
|
||||||
while retries < max_retries:
|
|
||||||
try:
|
|
||||||
response = requests.get(f"{SQLPAD_API_URL}/api/batches/{batch_id}")
|
|
||||||
response.raise_for_status()
|
|
||||||
batch_status = response.json()
|
|
||||||
|
|
||||||
status = batch_status.get("status")
|
|
||||||
if status in ["finished", "error"]:
|
|
||||||
statements = batch_status.get("statements", [])
|
|
||||||
|
|
||||||
if not statements:
|
|
||||||
raise ApplicationError("No statements found in batch response.")
|
|
||||||
|
|
||||||
statement = statements[0]
|
|
||||||
statement_id = statement.get("id")
|
|
||||||
error = statement.get("error")
|
|
||||||
columns = statement.get("columns", None)
|
|
||||||
sql_text = batch_status.get("batchText", "").strip().lower()
|
|
||||||
logger.info(f"statements: {statements}")
|
|
||||||
logger.info(f"error from batches result {error}, statement: {statement_id}, columns: {columns}")
|
|
||||||
|
|
||||||
if error:
|
|
||||||
raise ApplicationError(f"SQL execution failed: {error}")
|
|
||||||
|
|
||||||
is_select_query = sql_text.startswith("select") or (
|
|
||||||
sql_text.startswith("with") and "select" in sql_text
|
|
||||||
)
|
|
||||||
if is_select_query and not columns:
|
|
||||||
raise ApplicationError("SELECT query did not return columns, cannot process data.")
|
|
||||||
|
|
||||||
return status, statement_id, error, columns, is_select_query
|
|
||||||
|
|
||||||
time.sleep(retry_interval)
|
|
||||||
retries += 1
|
|
||||||
except requests.RequestException as e:
|
|
||||||
logger.error("Failed to fetch batch results: %s", e)
|
|
||||||
raise ApplicationError(f"Failed to fetch batch results: {e}")
|
|
||||||
|
|
||||||
raise ApplicationError("SQLPad batch execution timed out.")
|
|
||||||
|
|
||||||
def execute_sqlpad_query(connection_id, sql_query):
|
|
||||||
payload = {
|
|
||||||
"connectionId": connection_id,
|
|
||||||
"name": "",
|
|
||||||
"batchText": sql_query,
|
|
||||||
"selectedText": ""
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
response = requests.post(f"{SQLPAD_API_URL}/api/batches", json=payload)
|
|
||||||
response.raise_for_status()
|
|
||||||
batch_response = response.json()
|
|
||||||
|
|
||||||
batch_id = batch_response.get("statements", [{}])[0].get("batchId")
|
|
||||||
logger.info(f"Batch ID from the batches API response {batch_id}")
|
|
||||||
if not batch_id:
|
|
||||||
raise ApplicationError("Batch ID not found in SQLPad response.")
|
|
||||||
|
|
||||||
status, statement_id, error, columns, is_select_query = get_batch_results(batch_id)
|
|
||||||
if not is_select_query:
|
|
||||||
return {"status": status, "error": error}
|
|
||||||
|
|
||||||
result_response = requests.get(f"{SQLPAD_API_URL}/api/statements/{statement_id}/results")
|
|
||||||
result_response.raise_for_status()
|
|
||||||
result_data = result_response.json()
|
|
||||||
|
|
||||||
type_mapping = {
|
|
||||||
"number": float,
|
|
||||||
"string": str,
|
|
||||||
"date": str,
|
|
||||||
"boolean": bool,
|
|
||||||
"timestamp": str,
|
|
||||||
}
|
|
||||||
column_names_list = [col["name"] for col in columns]
|
|
||||||
column_types_list = [col["datatype"] for col in columns]
|
|
||||||
converted_data = [
|
|
||||||
[
|
|
||||||
type_mapping.get(dtype, str)(value) if value is not None else None
|
|
||||||
for dtype, value in zip(column_types_list, row)
|
|
||||||
]
|
|
||||||
for row in result_data
|
|
||||||
]
|
|
||||||
results_dict_list = [dict(zip(column_names_list, row)) for row in converted_data]
|
|
||||||
logger.info(f"results_dict_list: {results_dict_list}")
|
|
||||||
|
|
||||||
return {"results": results_dict_list}
|
|
||||||
|
|
||||||
except requests.RequestException as e:
|
|
||||||
logger.error("SQLPad API request failed: %s", e)
|
|
||||||
raise ApplicationError(f"SQLPad API request failed: {e}")
|
|
||||||
|
|
||||||
@activity.defn
|
|
||||||
async def block_main_activity(input_data):
|
|
||||||
validate_input(input_data)
|
|
||||||
try:
|
|
||||||
sql_query = construct_sql(input_data)
|
|
||||||
logger.info(f"constructed sql query: {sql_query}")
|
|
||||||
connection_id = get_connection_id(NAMESPACE)
|
|
||||||
if connection_id:
|
|
||||||
logger.info(f"connection id exists {connection_id}")
|
|
||||||
result = execute_sqlpad_query(connection_id, sql_query)
|
|
||||||
validate_output(result)
|
|
||||||
logger.info(f"final result for the query: {result}")
|
|
||||||
return result
|
|
||||||
else:
|
|
||||||
logger.error("connection id not exists, please add the connection id according to the namespace.")
|
|
||||||
raise ApplicationError("connection id not exists, please add the connection id according to the namespace.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error executing query execution: %s", e)
|
|
||||||
if not IS_TEST_ENVIRONMENT:
|
|
||||||
raise ApplicationError(f"Error during block execution: {e}") from e
|
|
||||||
else:
|
|
||||||
raise RuntimeError("Error during query execution") from e
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
try:
|
|
||||||
client = await Client.connect(FLOWX_ENGINE_ADDRESS, namespace=NAMESPACE)
|
|
||||||
worker = Worker(
|
|
||||||
client,
|
|
||||||
task_queue=TASK_QUEUE,
|
|
||||||
activities=[block_main_activity],
|
|
||||||
)
|
|
||||||
logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE)
|
|
||||||
await worker.run()
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical("Worker failed to start: %s", e)
|
|
||||||
raise
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main())
|
|
||||||
@ -1,3 +0,0 @@
|
|||||||
temporalio==1.6.0
|
|
||||||
jsonschema==4.23.0
|
|
||||||
requests==2.32.3
|
|
||||||
@ -1,241 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest.mock import patch, MagicMock, mock_open
|
|
||||||
import json
|
|
||||||
import asyncio
|
|
||||||
from jsonschema import ValidationError
|
|
||||||
import os
|
|
||||||
|
|
||||||
with patch.dict('os.environ', {
|
|
||||||
"REPO_NAME": "test_repo",
|
|
||||||
"BRANCH_NAME": "test_branch",
|
|
||||||
"VERSION": "test_version",
|
|
||||||
"NAMESPACE": "test_namespace",
|
|
||||||
"FLOWX_ENGINE_ADDRESS": "test_address"
|
|
||||||
}):
|
|
||||||
from block_wrapper import block_main_activity, validate_input, validate_output, construct_sql, get_connection_id
|
|
||||||
|
|
||||||
|
|
||||||
class TestBlockWrapper(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
# Mock schemas to use for testing
|
|
||||||
self.mock_request_schema = {
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"salary": {"type": "number"},
|
|
||||||
"department": {"type": "string"}
|
|
||||||
},
|
|
||||||
"required": ["salary", "department"]
|
|
||||||
}
|
|
||||||
|
|
||||||
self.mock_response_schema = {
|
|
||||||
"type": "object",
|
|
||||||
"$schema": "http://json-schema.org/draft-07/schema",
|
|
||||||
"properties": {
|
|
||||||
"id": {
|
|
||||||
"type": "integer"
|
|
||||||
},
|
|
||||||
"first_name": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"last_name": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"email": {
|
|
||||||
"type": "string",
|
|
||||||
"format": "email"
|
|
||||||
},
|
|
||||||
"phone_number": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"hire_date": {
|
|
||||||
"type": "string",
|
|
||||||
"format": "date-time"
|
|
||||||
},
|
|
||||||
"job_title": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"salary": {
|
|
||||||
"type": "number"
|
|
||||||
},
|
|
||||||
"department": {
|
|
||||||
"type": "string"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"required": ["id","first_name","last_name","email","phone_number","hire_date","job_title","salary","department"]
|
|
||||||
}
|
|
||||||
|
|
||||||
self.mock_config_schema = [
|
|
||||||
{
|
|
||||||
"namespace": "staging",
|
|
||||||
"connectionId": "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"namespace": "production",
|
|
||||||
"connectionId": "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
self.mock_main_sql = "SELECT * FROM public.employee WHERE salary=$salary and department=$department;"
|
|
||||||
|
|
||||||
# Mock the contents of request_schema.json and response_schema.json using different patchers
|
|
||||||
self.mock_open_request = mock_open(read_data=json.dumps(self.mock_request_schema))
|
|
||||||
self.mock_open_response = mock_open(read_data=json.dumps(self.mock_response_schema))
|
|
||||||
self.mock_open_config = mock_open(read_data=json.dumps(self.mock_config_schema))
|
|
||||||
self.mock_open_main_sql = mock_open(read_data=self.mock_main_sql)
|
|
||||||
|
|
||||||
self.open_main_sql_patcher = patch("builtins.open", self.mock_open_main_sql)
|
|
||||||
self.open_main_sql_patcher.start()
|
|
||||||
|
|
||||||
# Mock execute_sqlpad_query to return a known result
|
|
||||||
self.load_block_main_patcher = patch(
|
|
||||||
"block_wrapper.execute_sqlpad_query",
|
|
||||||
return_value=MagicMock(
|
|
||||||
return_value={
|
|
||||||
"id": 4,
|
|
||||||
"first_name": "Bob",
|
|
||||||
"last_name": "Brown",
|
|
||||||
"email": "bob.brown@example.com",
|
|
||||||
"phone_number": "444-222-1111",
|
|
||||||
"hire_date": "2020-07-25",
|
|
||||||
"job_title": "Marketing Specialist",
|
|
||||||
"salary": 60000.00,
|
|
||||||
"department": "Marketing"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.mock_load_block_main = self.load_block_main_patcher.start()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
# Stop all patches
|
|
||||||
self.load_block_main_patcher.stop()
|
|
||||||
self.open_main_sql_patcher.stop()
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_validate_input_success(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return request schema for validate_input
|
|
||||||
mock_load_schema.return_value = self.mock_request_schema
|
|
||||||
input_data = {"salary": 20000.0, "department": "Marketing"}
|
|
||||||
validate_input(input_data) # Should pass without errors
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_validate_input_failure(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return request schema for validate_input
|
|
||||||
mock_load_schema.return_value = self.mock_request_schema
|
|
||||||
input_data = {"salary": 20000.00} # Missing 'department'
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
validate_input(input_data)
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_validate_output_success(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return response schema for validate_output
|
|
||||||
mock_load_schema.return_value = self.mock_response_schema
|
|
||||||
output_data = {
|
|
||||||
"id": 4,
|
|
||||||
"first_name": "Bob",
|
|
||||||
"last_name": "Brown",
|
|
||||||
"email": "bob.brown@example.com",
|
|
||||||
"phone_number": "444-222-1111",
|
|
||||||
"hire_date": "2020-07-25",
|
|
||||||
"job_title": "Marketing Specialist",
|
|
||||||
"salary": 60000.00,
|
|
||||||
"department": "Marketing"
|
|
||||||
}
|
|
||||||
validate_output(output_data) # Should pass without errors
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_validate_output_failure(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return response schema for validate_output
|
|
||||||
mock_load_schema.return_value = self.mock_response_schema
|
|
||||||
# Missing department
|
|
||||||
output_data = {
|
|
||||||
"id": 4,
|
|
||||||
"first_name": "Bob",
|
|
||||||
"last_name": "Brown",
|
|
||||||
"email": "bob.brown@example.com",
|
|
||||||
"phone_number": "444-222-1111",
|
|
||||||
"hire_date": "2020-07-25",
|
|
||||||
"job_title": "Marketing Specialist",
|
|
||||||
"salary": 60000.00
|
|
||||||
}
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
validate_output(output_data)
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
async def test_block_main_activity_success(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return request and response schemas in order
|
|
||||||
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
|
|
||||||
input_data = {"salary": 20000.0, "department": "Marketing"}
|
|
||||||
result = await block_main_activity(input_data)
|
|
||||||
self.assertEqual(
|
|
||||||
result,
|
|
||||||
{
|
|
||||||
"id": 4,
|
|
||||||
"first_name": "Bob",
|
|
||||||
"last_name": "Brown",
|
|
||||||
"email": "bob.brown@example.com",
|
|
||||||
"phone_number": "444-222-1111",
|
|
||||||
"hire_date": "2020-07-25",
|
|
||||||
"job_title": "Marketing Specialist",
|
|
||||||
"salary": 60000.00,
|
|
||||||
"department": "Marketing"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
async def test_block_main_activity_failure(self, mock_load_schema):
|
|
||||||
# Set up load_schema to return request and response schemas in order
|
|
||||||
mock_load_schema.side_effect = [self.mock_request_schema, self.mock_response_schema]
|
|
||||||
# Cause an exception in main function
|
|
||||||
self.mock_load_block_main.side_effect = Exception("Unexpected error")
|
|
||||||
input_data = {"salary": 20000.0, "department": "Marketing"}
|
|
||||||
with self.assertRaises(RuntimeError):
|
|
||||||
await block_main_activity(input_data)
|
|
||||||
|
|
||||||
@patch("block_wrapper.validate_input", side_effect=ValidationError("Invalid input"))
|
|
||||||
async def test_block_main_activity_input_validation_failure(self, mock_validate):
|
|
||||||
input_data = {"salary": 20000.00} # Missing 'department'
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
await block_main_activity(input_data)
|
|
||||||
|
|
||||||
@patch("block_wrapper.validate_output", side_effect=ValidationError("Invalid output"))
|
|
||||||
async def test_block_main_activity_output_validation_failure(self, mock_validate):
|
|
||||||
input_data = {"salary": 20000.0, "department": "Marketing"}
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
await block_main_activity(input_data)
|
|
||||||
|
|
||||||
@patch.dict(os.environ, {"NAMESPACE": "staging"})
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_get_connection_id_staging(self, mock_load_schema):
|
|
||||||
"""Test fetching connectionId for 'staging' namespace"""
|
|
||||||
mock_load_schema.return_value = self.mock_config_schema
|
|
||||||
connection_id = get_connection_id(os.environ["NAMESPACE"])
|
|
||||||
self.assertEqual(connection_id, "8d7341b4-53a5-41b8-8c9d-5133fafb5d7b")
|
|
||||||
|
|
||||||
@patch.dict(os.environ, {"NAMESPACE": "production"})
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_get_connection_id_production(self, mock_load_schema):
|
|
||||||
"""Test fetching connectionId for 'production' namespace"""
|
|
||||||
mock_load_schema.return_value = self.mock_config_schema
|
|
||||||
connection_id = get_connection_id(os.environ["NAMESPACE"])
|
|
||||||
self.assertEqual(connection_id, "4b1437d8-53a5-41b8-8c9d-5133fafbtyuu")
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_get_connection_id_invalid_namespace(self, mock_load_schema):
|
|
||||||
"""Test handling of invalid namespace"""
|
|
||||||
mock_load_schema.return_value = self.mock_config_schema
|
|
||||||
with self.assertRaises(ValueError) as context:
|
|
||||||
get_connection_id("development")
|
|
||||||
self.assertIn("Namespace 'development' not found", str(context.exception))
|
|
||||||
|
|
||||||
@patch("block_wrapper.load_schema")
|
|
||||||
def test_valid_sql_replacement(self, mock_load_schema):
|
|
||||||
mock_load_schema.return_value = self.mock_main_sql
|
|
||||||
input_data = {"salary": 20000.0, "department": "Marketing"}
|
|
||||||
expected_sql = "SELECT * FROM public.employee WHERE salary=20000.0 and department='Marketing';"
|
|
||||||
result = construct_sql(input_data)
|
|
||||||
self.assertEqual(result, expected_sql)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
Loading…
x
Reference in New Issue
Block a user