Creating Custom Abilities¶
Learn how to create custom abilities that extend your assistant's capabilities.
Overview¶
Abilities are pluggable skills that your assistant can execute. They follow a simple pattern: 1. Define metadata (name, description, parameters) 2. Implement execution logic 3. Return structured responses
Basic Ability¶
Step 1: Extend BaseAbility¶
from bruno_core.base import BaseAbility
from bruno_core.models import (
AbilityMetadata,
AbilityParameter,
AbilityRequest,
AbilityResponse,
)
class WeatherAbility(BaseAbility):
"""Get weather information for locations."""
def get_metadata(self) -> AbilityMetadata:
return AbilityMetadata(
name="weather",
description="Get current weather and forecasts",
version="1.0.0",
parameters=[
AbilityParameter(
name="location",
type="string",
description="City or location name",
required=True,
),
AbilityParameter(
name="units",
type="string",
description="Temperature units",
required=False,
allowed_values=["celsius", "fahrenheit"],
default_value="celsius",
),
],
examples=[
"What's the weather in London?",
"Get weather for New York",
"Weather forecast for Tokyo",
],
)
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
"""Execute the weather lookup."""
location = request.parameters.get("location")
units = request.parameters.get("units", "celsius")
try:
# Your implementation here
weather_data = await self._fetch_weather(location, units)
return AbilityResponse(
request_id=request.id,
ability_name="weather",
action=request.action,
success=True,
message=f"Weather in {location}: {weather_data['description']}",
data=weather_data,
)
except Exception as e:
return AbilityResponse(
request_id=request.id,
ability_name="weather",
action=request.action,
success=False,
error=str(e),
)
def get_supported_actions(self) -> list[str]:
"""Actions this ability can handle."""
return ["get_weather", "weather_forecast", "current_weather"]
async def _fetch_weather(self, location: str, units: str) -> dict:
"""Fetch weather from API."""
# Implementation details
pass
Step 2: Register the Ability¶
from bruno_core.base import BaseAssistant
# Create and register
weather = WeatherAbility()
await assistant.register_ability(weather)
Ability Lifecycle¶
Initialization¶
class MyAbility(BaseAbility):
async def initialize(self):
"""Called when ability is registered."""
await super().initialize()
# Your setup code
self.api_client = ApiClient()
self.cache = {}
Shutdown¶
async def shutdown(self):
"""Called when ability is unregistered."""
# Cleanup code
await self.api_client.close()
await super().shutdown()
Health Check¶
async def health_check(self) -> dict:
"""Check if ability is healthy."""
health = await super().health_check()
health["api_status"] = await self.api_client.ping()
return health
Advanced Features¶
Custom Validation¶
class ValidatedAbility(BaseAbility):
def validate_request(self, request: AbilityRequest) -> bool:
"""Custom validation logic."""
if not super().validate_request(request):
return False
# Custom checks
location = request.parameters.get("location")
if not location or len(location) < 2:
return False
return True
Rollback Support¶
class TransactionalAbility(BaseAbility):
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
# Store rollback info
self.last_action = {
"type": request.action,
"data": {"old_value": current_value},
}
# Execute
result = await self._do_action(request)
return result
async def rollback(self, request: AbilityRequest) -> None:
"""Undo the last action."""
if self.last_action:
await self._restore(self.last_action["data"])
State Management¶
class StatefulAbility(BaseAbility):
def __init__(self):
super().__init__()
self.state = {}
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
user_id = request.user_id
# Get user-specific state
user_state = self.state.get(user_id, {})
# Update state
user_state["last_action"] = request.action
self.state[user_id] = user_state
# Execute
return await self._process(request, user_state)
Ability Patterns¶
API Integration¶
import aiohttp
class ApiAbility(BaseAbility):
def __init__(self, api_key: str):
super().__init__()
self.api_key = api_key
self.session = None
async def initialize(self):
await super().initialize()
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.api_key}"}
)
async def shutdown(self):
if self.session:
await self.session.close()
await super().shutdown()
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
async with self.session.get(self.api_url) as resp:
data = await resp.json()
return AbilityResponse(
request_id=request.id,
ability_name=self._metadata.name,
action=request.action,
success=True,
data=data,
)
Database Operations¶
import asyncpg
class DatabaseAbility(BaseAbility):
def __init__(self, db_url: str):
super().__init__()
self.db_url = db_url
self.pool = None
async def initialize(self):
await super().initialize()
self.pool = await asyncpg.create_pool(self.db_url)
async def shutdown(self):
if self.pool:
await self.pool.close()
await super().shutdown()
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
query = request.parameters.get("query")
async with self.pool.acquire() as conn:
result = await conn.fetch(query)
return AbilityResponse(
request_id=request.id,
ability_name=self._metadata.name,
action=request.action,
success=True,
data={"rows": [dict(r) for r in result]},
)
File Operations¶
import aiofiles
class FileAbility(BaseAbility):
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
action = request.action
file_path = request.parameters.get("path")
if action == "read":
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
return self._success_response(request, {"content": content})
elif action == "write":
content = request.parameters.get("content")
async with aiofiles.open(file_path, 'w') as f:
await f.write(content)
return self._success_response(request, {"written": True})
Scheduled Tasks¶
import asyncio
class ScheduledAbility(BaseAbility):
def __init__(self):
super().__init__()
self.tasks = {}
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
action = request.action
if action == "schedule":
delay = request.parameters.get("delay", 60)
task_id = str(uuid.uuid4())
task = asyncio.create_task(self._run_after_delay(delay, request))
self.tasks[task_id] = task
return self._success_response(request, {"task_id": task_id})
elif action == "cancel":
task_id = request.parameters.get("task_id")
task = self.tasks.get(task_id)
if task:
task.cancel()
return self._success_response(request, {"cancelled": True})
async def _run_after_delay(self, delay: int, original_request: AbilityRequest):
await asyncio.sleep(delay)
# Execute the scheduled action
await self._do_scheduled_action(original_request)
Plugin Registration¶
Via Entry Points¶
# setup.py
from setuptools import setup
setup(
name="my-bruno-abilities",
packages=["my_abilities"],
entry_points={
"bruno.abilities": [
"weather = my_abilities.weather:WeatherAbility",
"timer = my_abilities.timer:TimerAbility",
"notes = my_abilities.notes:NotesAbility",
]
}
)
Manual Registration¶
from bruno_core.registry import AbilityRegistry
registry = AbilityRegistry()
registry.register(
name="weather",
plugin_class=WeatherAbility,
version="1.0.0",
metadata={"category": "utilities"}
)
Testing Abilities¶
import pytest
from bruno_core.models import AbilityRequest
@pytest.mark.asyncio
async def test_weather_ability():
ability = WeatherAbility()
await ability.initialize()
request = AbilityRequest(
ability_name="weather",
action="get_weather",
parameters={"location": "London", "units": "celsius"},
user_id="test-user",
)
response = await ability.execute(request)
assert response.success is True
assert "weather" in response.data
await ability.shutdown()
Best Practices¶
1. Use Type Hints¶
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
location: str = request.parameters.get("location")
units: str = request.parameters.get("units", "celsius")
2. Validate Input¶
def validate_request(self, request: AbilityRequest) -> bool:
if not super().validate_request(request):
return False
location = request.parameters.get("location")
if not location or not isinstance(location, str):
return False
return True
3. Handle Errors Gracefully¶
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
try:
result = await self._do_work(request)
return self._success_response(request, result)
except ApiError as e:
logger.error("api_error", error=str(e))
return self._error_response(request, f"API error: {str(e)}")
except Exception as e:
logger.error("unexpected_error", error=str(e))
return self._error_response(request, "An unexpected error occurred")
4. Log Appropriately¶
from bruno_core.utils.logging import get_logger
logger = get_logger(__name__)
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
logger.info("ability_executing",
ability=self._metadata.name,
action=request.action,
user_id=request.user_id)
result = await self._do_work(request)
logger.info("ability_executed",
ability=self._metadata.name,
success=True)
return self._success_response(request, result)
5. Document Thoroughly¶
class DocumentedAbility(BaseAbility):
"""
A well-documented ability.
This ability does X, Y, and Z. It requires API credentials
and has the following limitations:
- Rate limit: 100 requests/hour
- Max payload: 1MB
Example:
>>> ability = DocumentedAbility(api_key="...")
>>> await ability.initialize()
>>> response = await ability.execute(request)
"""
async def execute_action(self, request: AbilityRequest) -> AbilityResponse:
"""
Execute the ability action.
Args:
request: Ability request with action and parameters
Returns:
AbilityResponse with success status and data
Raises:
ApiError: If API call fails
ValidationError: If parameters are invalid
"""
pass
Examples¶
See the examples directory for complete, working examples of: - Basic ability - API integration ability - Database ability - File operations ability - Scheduled task ability
Troubleshooting¶
Ability Not Detected¶
Ensure: 1. Ability is registered with assistant 2. get_supported_actions() returns correct action names 3. Action keywords appear in user messages
Validation Failures¶
Check: 1. Required parameters are present 2. Parameter types match metadata 3. allowed_values constraints are met
Execution Errors¶
Debug by: 1. Adding logging statements 2. Checking health_check() output 3. Testing with simple inputs first 4. Reviewing error messages in response
For more examples, see the examples directory.