API Service Component
The API Service component is where you add custom business logic to your system design. While optional for simple CRUD operations, it becomes essential when you need validation, data transformation, or complex processing before database operations.
Overview
Think of the API Service as middleware between your User Request and Database. It intercepts requests, runs your custom Python code, and can modify, validate, or even reject operations before they reach the database.
In a URL shortener like bit.ly:
- User sends POST request with a long URL
- API Service generates a short code, validates the URL, checks for duplicates
- Database stores both the short code and long URL
- User can later GET the long URL using the short code
Without API Service, you'd just store whatever data comes in without any processing.
When to Use API Service
Use API Service when you need:
- ✅ Data validation (email format, required fields, etc.)
- ✅ Data transformation (hashing passwords, generating IDs)
- ✅ Custom IDs (short URLs, user-friendly slugs)
- ✅ Business logic (checking inventory, calculating prices)
- ✅ Duplicate detection (checking existing records)
- ✅ Error handling (custom validation messages)
Skip API Service if you only need:
- ❌ Simple CRUD operations
- ❌ Direct pass-through from request to database
- ❌ No data processing or validation
Component Features
Description Field
Add a description to document what your API Service does:
"Validates email format and checks for duplicate users before registration"
"Generates short URL codes and stores mappings"
"Processes payment and updates order status"
This helps you and others understand your system design at a glance.
Custom Code Editor
Click "Edit Code" or "Add Custom Code" to open a full-featured code editor with:
- Syntax highlighting for Python
- Auto-completion
- Line numbers
- Dark/light mode support
The process_request Function
Every API Service must define a process_request function. This is your entry point where all logic begins.
Function Signature
def process_request(input_data):
"""
Process the incoming request with access to existing database records
Args:
input_data (dict): Request data containing:
- method: HTTP method (e.g., "POST", "GET", "PUT", "DELETE")
- endpoint: API endpoint (e.g., "users", "urls")
- data: Request body data (e.g., {"email": "user@example.com"})
- existing_records: List of all existing records in the target table
Returns:
dict: Response object with operation, table, columns, and data
"""
Input Data Structure
The input_data parameter contains:
{
"method": "POST", # HTTP method from User Request
"endpoint": "users", # Endpoint name (becomes table name)
"data": { # Request body from User Request properties
"name": "John Doe",
"email": "john@example.com"
},
"existing_records": [ # All records currently in this table
{
"record_id": "abc123",
"name": "Jane Smith",
"email": "jane@example.com",
"created_at": "2024-01-15T10:00:00Z"
}
]
}
Accessing Input Data:
method = input_data.get('method') # "POST", "GET", "PUT", "DELETE"
endpoint = input_data.get('endpoint') # "users", "posts", etc.
data = input_data.get('data', {}) # Request body
existing = input_data.get('existing_records', []) # Previous records
Return Structure
Your function must return a dictionary with these keys:
{
"operation": "INSERT", # Required: INSERT, UPDATE, or NONE
"table": "users", # Optional: defaults to endpoint if not provided
"columns": [ # Required for INSERT/UPDATE
{"name": "record_id", "type": "TEXT"}, # Optional: for custom IDs
{"name": "name", "type": "TEXT"},
{"name": "email", "type": "TEXT"},
{"name": "age", "type": "INTEGER"}
],
"data": { # Required for INSERT/UPDATE
"record_id": "custom_id_123", # Optional: custom ID
"name": "John Doe",
"email": "john@example.com",
"age": 27
}
}
Operations
INSERT Operation
Creates a new record in the database.
When to use:
- POST requests creating new records
- After validation passes
- When generating new data
Example:
def process_request(input_data):
data = input_data.get('data', {})
# Validate required fields
if 'email' not in data:
return {
'operation': 'NONE',
'error': 'Email is required'
}
return {
'operation': 'INSERT',
'table': 'users',
'columns': [
{'name': 'name', 'type': 'TEXT'},
{'name': 'email', 'type': 'TEXT'}
],
'data': {
'name': data.get('name'),
'email': data.get('email')
}
}
UPDATE Operation
Modifies an existing record in the database.
When to use:
- PUT requests updating existing records
- When record ID is known
- Modifying specific fields
Example:
def process_request(input_data):
data = input_data.get('data', {})
return {
'operation': 'UPDATE',
'columns': [
{'name': 'status', 'type': 'TEXT'},
{'name': 'updated_at', 'type': 'TIMESTAMP'}
],
'data': {
'status': 'active',
'updated_at': '2024-01-15T10:00:00Z'
}
}
NONE Operation
Stops processing without database operation. Used for validation failures or when returning custom errors.
When to use:
- Validation fails
- Duplicate detection
- Conditional logic that prevents database changes
- Custom error messages
Example:
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
# Check for duplicate email
email = data.get('email')
for record in existing:
if record.get('email') == email:
return {
'operation': 'NONE',
'error': 'Email already exists'
}
# Continue with INSERT...
Custom Record IDs
By default, the system auto-generates unique IDs for all records. However, you can override this with custom IDs - essential for systems like URL shorteners or user-friendly slugs.
Why Custom IDs?
Problem without custom IDs:
Long URL: https://example.com/very/long/path
Auto-generated ID: "a7b3c9d2-8f1e-4a6b-9c3d-2e5f8a1b4c7d"
Access via: /api/urls/a7b3c9d2-8f1e-4a6b-9c3d-2e5f8a1b4c7d ❌ Not user-friendly
Solution with custom IDs:
Long URL: https://example.com/very/long/path
Custom ID: "abc123"
Access via: /api/urls/abc123 ✅ Clean and short!
How to Set Custom IDs
Step 1: Add record_id to columns array:
columns = [
{'name': 'record_id', 'type': 'TEXT'}, # This enables custom IDs
{'name': 'long_url', 'type': 'TEXT'},
{'name': 'short_code', 'type': 'TEXT'}
]
Step 2: Add record_id to data:
data = {
'record_id': 'abc123', # Your custom ID
'long_url': 'https://example.com/very/long/path',
'short_code': 'abc123'
}
Complete Example: URL Shortener
import random
import string
def generate_short_code(length=6):
"""Generate a random short code"""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
# Validate long URL
long_url = data.get('long_url')
if not long_url:
return {
'operation': 'NONE',
'error': 'Long URL is required'
}
# Generate unique short code
while True:
short_code = generate_short_code()
# Check if code already exists
if not any(r.get('record_id') == short_code for r in existing):
break
return {
'operation': 'INSERT',
'table': 'urls',
'columns': [
{'name': 'record_id', 'type': 'TEXT'}, # Enable custom ID
{'name': 'long_url', 'type': 'TEXT'},
{'name': 'short_code', 'type': 'TEXT'},
{'name': 'clicks', 'type': 'INTEGER'}
],
'data': {
'record_id': short_code, # Use short code as ID
'long_url': long_url,
'short_code': short_code,
'clicks': 0
}
}
Now you can retrieve the URL with:
GET /api/urls/abc123
Instead of:
GET /api/urls/a7b3c9d2-8f1e-4a6b-9c3d-2e5f8a1b4c7d
If you don't provide record_id in both columns and data, the system will auto-generate an ID. This is fine for most use cases, but required for systems where the ID has meaning (URL shorteners, slugs, etc.).
Returning Custom Errors
Stop processing and return error messages to the user.
Error Structure
return {
'operation': 'NONE',
'error': 'Your error message here'
}
Error Examples
Validation Error:
if 'email' not in data or '@' not in data['email']:
return {
'operation': 'NONE',
'error': 'Valid email address is required'
}
Duplicate Detection:
email = data.get('email')
for record in existing:
if record.get('email') == email:
return {
'operation': 'NONE',
'error': f'User with email {email} already exists'
}
Business Logic:
age = data.get('age', 0)
if age < 18:
return {
'operation': 'NONE',
'error': 'Must be 18 or older to register'
}
Column Types
When defining columns, use these supported types:
| Type | Python Type | Use For |
|---|---|---|
TEXT | str | Strings, emails, URLs, descriptions, dates |
INTEGER | int | Whole numbers, counts, IDs |
REAL | float | Decimals, prices, measurements |
BOOLEAN | bool | True/false flags |
Example:
columns = [
{'name': 'username', 'type': 'TEXT'},
{'name': 'age', 'type': 'INTEGER'},
{'name': 'balance', 'type': 'REAL'},
{'name': 'is_active', 'type': 'BOOLEAN'},
{'name': 'created_at', 'type': 'TEXT'} # Store ISO format dates as TEXT
]
Working with Existing Records
Access previously inserted records to check for duplicates, calculate aggregates, or implement business logic.
Structure of Existing Records
existing_records = [
{
"record_id": "abc123",
"name": "John Doe",
"email": "john@example.com",
"created_at": "2024-01-15T10:00:00Z",
"updated_at": "2024-01-15T10:00:00Z"
},
{
"record_id": "def456",
"name": "Jane Smith",
"email": "jane@example.com",
"created_at": "2024-01-14T09:30:00Z",
"updated_at": "2024-01-14T09:30:00Z"
}
]
Example: Check for Duplicates
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
email = data.get('email')
# Check if email already exists
for record in existing:
if record.get('email') == email:
return {
'operation': 'NONE',
'error': f'Email {email} is already registered'
}
# Email is unique, proceed with insert
return {
'operation': 'INSERT',
'columns': [
{'name': 'email', 'type': 'TEXT'},
{'name': 'name', 'type': 'TEXT'}
],
'data': data
}
Example: Count Records
def process_request(input_data):
existing = input_data.get('existing_records', [])
# Count active users
active_count = sum(1 for r in existing if r.get('is_active'))
# Limit to 100 active users
if active_count >= 100:
return {
'operation': 'NONE',
'error': 'Maximum number of active users reached'
}
# Continue with insert...
Example: Generate Sequential IDs
def process_request(input_data):
existing = input_data.get('existing_records', [])
# Find highest order number
max_order = 0
for record in existing:
order_num = record.get('order_number', 0)
if order_num > max_order:
max_order = order_num
new_order_number = max_order + 1
return {
'operation': 'INSERT',
'columns': [
{'name': 'order_number', 'type': 'INTEGER'},
{'name': 'customer', 'type': 'TEXT'}
],
'data': {
'order_number': new_order_number,
'customer': input_data['data'].get('customer')
}
}
Available Python Libraries
Your custom code runs in a sandboxed Python environment with access to:
Standard Library:
json- JSON parsingdatetime,timedelta- Date/time operationsrandom- Random number generationstring- String constants and operationsmath- Mathematical functions- Other standard Python modules
Example using datetime:
from datetime import datetime, timedelta
def process_request(input_data):
# Calculate expiration date (30 days from now)
expiry = datetime.now() + timedelta(days=30)
return {
'operation': 'INSERT',
'columns': [
{'name': 'token', 'type': 'TEXT'},
{'name': 'expires_at', 'type': 'TEXT'}
],
'data': {
'token': 'abc123',
'expires_at': expiry.isoformat()
}
}
You can use print() statements for debugging your code, but you must remove them before running the simulation.
Print statements will cause your code to fail with an error:
def process_request(input_data):
print("Debug: Processing request") # ❌ This will cause an error
# ... rest of code
Why? Print output interferes with the structured return format the system expects.
How to debug:
- Add print statements temporarily
- Review the error output to see what's happening
- Remove all print statements
- Run simulation successfully
Alternative: Use the error return format instead:
def process_request(input_data):
# For debugging, return diagnostic info as an error
return {
'operation': 'NONE',
'error': f'Debug: data = {input_data.get("data")}'
}
Third-party libraries (like requests, numpy, etc.) are not currently available.
Complete Examples
Example 1: User Registration with Validation
import re
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
# Validate email format
email = data.get('email', '')
if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', email):
return {
'operation': 'NONE',
'error': 'Invalid email format'
}
# Check for duplicate email
for record in existing:
if record.get('email') == email:
return {
'operation': 'NONE',
'error': 'Email already registered'
}
# Validate password strength (if provided)
password = data.get('password', '')
if len(password) < 8:
return {
'operation': 'NONE',
'error': 'Password must be at least 8 characters'
}
# Create user
return {
'operation': 'INSERT',
'table': 'users',
'columns': [
{'name': 'email', 'type': 'TEXT'},
{'name': 'name', 'type': 'TEXT'},
{'name': 'is_active', 'type': 'BOOLEAN'},
{'name': 'created_at', 'type': 'TIMESTAMP'}
],
'data': {
'email': email,
'name': data.get('name'),
'is_active': True,
'created_at': datetime.now().isoformat()
}
}
Example 2: Rate Limiter
from datetime import datetime, timedelta
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
user_id = data.get('user_id')
current_time = datetime.now()
one_minute_ago = current_time - timedelta(minutes=1)
# Count requests in last minute for this user
recent_requests = 0
for record in existing:
if record.get('user_id') == user_id:
request_time = datetime.fromisoformat(record.get('timestamp', ''))
if request_time > one_minute_ago:
recent_requests += 1
# Limit: 10 requests per minute
if recent_requests >= 10:
return {
'operation': 'NONE',
'error': 'Rate limit exceeded. Try again later.'
}
# Log this request
return {
'operation': 'INSERT',
'table': 'rate_limits',
'columns': [
{'name': 'user_id', 'type': 'TEXT'},
{'name': 'timestamp', 'type': 'TIMESTAMP'}
],
'data': {
'user_id': user_id,
'timestamp': current_time.isoformat()
}
}
Example 3: Cache System
import hashlib
from datetime import datetime, timedelta
def process_request(input_data):
data = input_data.get('data', {})
existing = input_data.get('existing_records', [])
# Generate cache key from URL
url = data.get('url')
cache_key = hashlib.md5(url.encode()).hexdigest()
# Check if cached (and not expired)
current_time = datetime.now()
for record in existing:
if record.get('record_id') == cache_key:
expiry = datetime.fromisoformat(record.get('expires_at'))
if current_time < expiry:
return {
'operation': 'NONE',
'error': 'Cache hit - data already exists'
}
# Cache miss - store new data
expiry_time = current_time + timedelta(hours=1)
return {
'operation': 'INSERT',
'table': 'cache',
'columns': [
{'name': 'record_id', 'type': 'TEXT'},
{'name': 'url', 'type': 'TEXT'},
{'name': 'content', 'type': 'TEXT'},
{'name': 'expires_at', 'type': 'TIMESTAMP'}
],
'data': {
'record_id': cache_key,
'url': url,
'content': data.get('content'),
'expires_at': expiry_time.isoformat()
}
}
Best Practices
- Always validate input data - Never trust incoming data
- Use meaningful variable names - Make your code readable
- Return clear error messages - Help users understand what went wrong
- Check existing_records when needed - Prevent duplicates and conflicts
- Use appropriate column types - INTEGER for numbers, TEXT for strings, etc.
- Document your logic - Add comments explaining complex operations
- Test incrementally - Start simple, add complexity gradually
- Handle edge cases - What if data is missing? What if it's invalid?
Troubleshooting
"Invalid code output structure"
- Cause: Return dictionary missing required keys
- Solution: Ensure you return
operation, and includecolumns/datafor INSERT/UPDATE
"Error processing the request"
- Cause: Python syntax error or runtime exception
- Solution: Check the error output, verify your Python syntax
"Missing 'operation' field"
- Cause: Forgot to include
operationin return dictionary - Solution: Always include
'operation': 'INSERT','UPDATE', or'NONE'
Custom ID not working
- Cause: Forgot to add
record_idto columns or data - Solution: Add
{'name': 'record_id', 'type': 'TEXT'}to columns AND'record_id': 'your_id'to data
Can't access existing records
- Cause: Variable name typo or wrong key
- Solution: Use
input_data.get('existing_records', [])exactly
Next Steps
- Learn about Database Component for data persistence
- Explore Load Balancer Component for scaling
- Review User Request Component for sending requests