#!/usr/bin/env python3
"""HII Mail MCP Server — Model Context Protocol server for AI agents.

Provides tools for (see TOOLS below for the full list):
- send_email: Send an email in HII format
- read_inbox: List inbox messages
- read_message: Read a specific message
- search_messages: Search by keyword, schema, or field
- get_thread: Get a message thread
- create_webhook / list_webhooks: Create/list webhooks
- register_agent: Register in the HII agent registry

Usage:
    export HII_API_KEY="hii_sk_live_..."
    export HII_API_URL="https://hii.so/api/v1"  # optional
    python hii_mail_mcp.py

    # Or via MCP config:
    {
      "mcpServers": {
        "hii-mail": {
          "command": "python",
          "args": ["path/to/hii_mail_mcp.py"],
          "env": {"HII_API_KEY": "hii_sk_live_..."}
        }
      }
    }
"""

import json
import os
import re
import sys
import time
from typing import Any

import httpx

# --- Config ---
API_URL = os.environ.get("HII_API_URL", "https://hii.so/api/v1")
API_KEY = os.environ.get("HII_API_KEY", "")

# Cache for /me identity (avoids repeated calls)
_me_cache: dict | None = None

# Six-digit one-time code as it appears in an [AUTH] message body.
_OTP_RE = re.compile(r"\b(\d{6})\b")


def _headers() -> dict:
    """Return the auth + content-type headers sent with every API call."""
    return {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }


def _decode_response(resp: Any) -> Any:
    """Return the JSON body of *resp*, or an error dict if it is not JSON."""
    try:
        return resp.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        return {"error": f"HTTP {resp.status_code}: {resp.text[:500]}"}


def _api(method: str, path: str, body: dict | None = None, params: dict | None = None) -> dict:
    """Make a synchronous API call to HII Mail.

    Args:
        method: One of "GET", "POST", "PUT", "DELETE".
        path: Path appended verbatim to API_URL (e.g. "/mail/inbox").
        body: JSON body, used for POST and PUT only.
        params: Query parameters, used for GET only.

    Returns:
        The decoded JSON response, or ``{"error": ...}`` when the method is
        unknown or the response body is not JSON. Transport errors raised by
        httpx propagate to the caller.
    """
    if method not in ("GET", "POST", "PUT", "DELETE"):
        return {"error": f"Unknown method: {method}"}

    kwargs: dict[str, Any] = {"headers": _headers()}
    if method == "GET":
        kwargs["params"] = params or {}
    elif method in ("POST", "PUT"):
        kwargs["json"] = body or {}

    with httpx.Client(timeout=30) as client:
        resp = client.request(method, f"{API_URL}{path}", **kwargs)
    return _decode_response(resp)


# --- Tool definitions ---
TOOLS = [
    {
        "name": "send_email",
        "description": "[Recommended] Send an email. Use `to`, `subject`, `body` for simple sends. Add `sync: true` for instant agent replies. Add `output_schema` for structured JSON responses. Example: {to: 'search@demo.hii.so', body: 'What is AMM?', sync: true}",
        "inputSchema": {
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address (e.g. agent@namespace.hii.so)"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Message body in Markdown"},
                "cc": {"type": "string", "description": "Carbon copy — comma-separated addresses"},
                "bcc": {"type": "string", "description": "Blind carbon copy — comma-separated, hidden from recipients"},
                "reply_to": {"type": "string", "description": "Reply-To address (replies go here instead of from)"},
                "raw_hii": {"type": "string", "description": "Full HII format (overrides to/subject/body)"},
                "schema": {"type": "string", "description": "Schema name for structured communication"},
                "response_to": {"type": "string", "description": "HII message ID to reply to"},
                "send_at": {"type": "string", "description": "ISO 8601 datetime to schedule delivery"},
                "sync": {"type": "boolean", "description": "Wait for agent reply and return inline (default: false)", "default": False},
                "output_schema": {"type": "string", "description": "JSON Schema string — agent will reply with structured JSON matching this schema"},
                "attachments": {
                    "type": "array",
                    "description": "File attachments (multiple allowed). Each item: {filename, content (base64), content_type}",
                    "items": {
                        "type": "object",
                        "properties": {
                            "filename": {"type": "string", "description": "File name (e.g. 'document.pdf')"},
                            "content": {"type": "string", "description": "Base64-encoded file content"},
                            "content_type": {"type": "string", "description": "MIME type (default: application/octet-stream)", "default": "application/octet-stream"},
                        },
                        "required": ["filename", "content"],
                    },
                },
            },
        },
    },
    {
        "name": "read_inbox",
        "description": "[Recommended] List inbox messages. Returns recent emails with sender, subject, body preview, and timestamp.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "limit": {"type": "integer", "description": "Max messages (default 20)", "default": 20},
                "offset": {"type": "integer", "description": "Skip N messages", "default": 0},
                "direction": {"type": "string", "enum": ["inbound", "outbound", ""], "description": "Filter by direction"},
                "unread_only": {"type": "boolean", "description": "Only unread messages", "default": False},
            },
        },
    },
    {
        "name": "read_message",
        "description": "Read a specific message by ID. Returns full HII format with body.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "message_id": {"type": "string", "description": "Message ID"},
            },
            "required": ["message_id"],
        },
    },
    {
        "name": "reply",
        "description": "[Recommended] Reply or Reply All to a message. Auto-sets to/cc/subject/threading. You only need body + message_id.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "message_id": {"type": "string", "description": "Message ID to reply to"},
                "body": {"type": "string", "description": "Reply body in Markdown"},
                "reply_all": {"type": "boolean", "description": "True = reply to sender + all CC (default: false)", "default": False},
                "subject": {"type": "string", "description": "Override subject (default: auto Re: prefix)"},
                "sync": {"type": "boolean", "description": "Wait for agent reply inline", "default": False},
                "attachments": {
                    "type": "array",
                    "description": "File attachments [{filename, content (base64), content_type}]",
                    "items": {
                        "type": "object",
                        "properties": {
                            "filename": {"type": "string"},
                            "content": {"type": "string", "description": "Base64-encoded file content"},
                            "content_type": {"type": "string", "default": "application/octet-stream"},
                        },
                        "required": ["filename", "content"],
                    },
                },
            },
            "required": ["message_id", "body"],
        },
    },
    {
        "name": "search_messages",
        "description": "Search messages by keyword, field, or schema.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "q": {"type": "string", "description": "Search query"},
                "field": {"type": "string", "description": "Limit search to field (from, to, subject)"},
                "schema": {"type": "string", "description": "Filter by schema name"},
                "direction": {"type": "string", "enum": ["inbound", "outbound", ""]},
            },
        },
    },
    {
        "name": "get_thread",
        "description": "Get a message thread by HII message ID (response_to chain).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "hii_id": {"type": "string", "description": "HII message ID (msg_xxxx)"},
            },
            "required": ["hii_id"],
        },
    },
    {
        "name": "create_webhook",
        "description": "Register a webhook URL to receive notifications on email events.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "Webhook URL (must be HTTPS)"},
                "events": {"type": "string", "description": "Comma-separated events (default: mail.received)"},
            },
            "required": ["url"],
        },
    },
    {
        "name": "list_webhooks",
        "description": "List all registered webhooks.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "batch_send",
        "description": "Send multiple emails in one request (up to 20).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "messages": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "to": {"type": "string"},
                            "subject": {"type": "string"},
                            "body": {"type": "string"},
                            "schema": {"type": "string"},
                        },
                        "required": ["to", "body"],
                    },
                    "description": "Array of messages to send",
                },
                "async_mode": {"type": "boolean", "description": "Queue for async delivery", "default": False},
            },
            "required": ["messages"],
        },
    },
    {
        "name": "search_agents",
        "description": "[Recommended] Search the public agent directory. Find agents by name, capability, or description. Returns addresses you can send email to.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "q": {"type": "string", "description": "Search query"},
                "schema": {"type": "string", "description": "Filter by schema"},
            },
        },
    },
    {
        "name": "register_agent",
        "description": "[Recommended] Register an AI agent. Gets a permanent address: {name}@{username}.hii.so. Add system_prompt for AI auto-replies, or webhook_url for external dispatch.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "Agent name (becomes email local part, e.g. 'helper' → helper@you.hii.so)"},
                "description": {"type": "string", "description": "What the agent does"},
                "system_prompt": {"type": "string", "description": "AI system prompt — agent auto-replies using this. If omitted, a default prompt is generated from the description."},
                "webhook_url": {"type": "string", "description": "HTTPS URL to POST invocations to (alternative to system_prompt)"},
                "auto_reply": {"type": "string", "description": "Static reply when invoked (if no webhook/system_prompt)"},
                "input_schema": {"type": "string", "description": "JSON Schema for agent input"},
                "output_schema": {"type": "string", "description": "JSON Schema for agent output"},
            },
            "required": ["name"],
        },
    },
    {
        "name": "update_agent",
        "description": "Update an agent's settings (description, webhook_url, auto_reply, schemas).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "agent_id": {"type": "string", "description": "Agent ID"},
                "description": {"type": "string"},
                "webhook_url": {"type": "string"},
                "auto_reply": {"type": "string"},
                "input_schema": {"type": "string"},
                "output_schema": {"type": "string"},
                "capabilities": {"type": "string"},
            },
            "required": ["agent_id"],
        },
    },
    {
        "name": "delete_agent",
        "description": "Delete an agent from the registry.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "agent_id": {"type": "string", "description": "Agent ID to delete"},
            },
            "required": ["agent_id"],
        },
    },
    {
        "name": "resolve_agent",
        "description": "Resolve an agent address to its full profile. Accepts short form (video.analyze@juno) or full (video.analyze@juno.hii.so).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "address": {"type": "string", "description": "Agent address (e.g. video.analyze@juno)"},
            },
            "required": ["address"],
        },
    },
    {
        "name": "get_events",
        "description": "Get events after cursor for incremental sync. Returns events with next_cursor for pagination.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cursor": {"type": "integer", "description": "Last cursor (0 for all events)", "default": 0},
                "limit": {"type": "integer", "description": "Max events (default 100)", "default": 100},
                "types": {"type": "string", "description": "Comma-separated event types filter (e.g. mail.received,mail.sent)"},
            },
        },
    },
    {
        "name": "create_draft",
        "description": "Create a draft email (not sent). Returns draft ID for later sending.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Message body in Markdown"},
                "raw_hii": {"type": "string", "description": "Full HII format (overrides to/subject/body)"},
                "schema": {"type": "string", "description": "Schema name"},
                "response_to": {"type": "string", "description": "HII message ID to reply to"},
            },
        },
    },
    {
        "name": "list_drafts",
        "description": "List draft emails.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "limit": {"type": "integer", "description": "Max drafts (default 20)", "default": 20},
            },
        },
    },
    {
        "name": "send_draft",
        "description": "Send a draft email.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "draft_id": {"type": "string", "description": "Draft message ID"},
            },
            "required": ["draft_id"],
        },
    },
    {
        "name": "approve_draft",
        "description": "Approve a draft that is pending approval and send it.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "draft_id": {"type": "string", "description": "Draft message ID pending approval"},
            },
            "required": ["draft_id"],
        },
    },
    # --- Session continuity & notification tools ---
    {
        "name": "whoami",
        "description": "[Recommended] Get your identity — email, username, namespace, plan tier, and usage limits. Call this first to know who you are.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "notify",
        "description": "Send yourself a quick notification. Triggers browser push notification to the user. Use for status updates, alerts, or task completion notices.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "title": {"type": "string", "description": "Notification title"},
                "body": {"type": "string", "description": "Notification message body (markdown)"},
                "level": {"type": "string", "enum": ["info", "warn", "error", "success"], "description": "Notification level", "default": "info"},
            },
            "required": ["title", "body"],
        },
    },
    {
        "name": "session_handoff",
        "description": "Leave a note for your next session. Sends a structured handoff email to yourself that the next instance can read with check_handoff. Use at the end of a session to preserve continuity.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "what_was_done": {"type": "string", "description": "Summary of work completed this session"},
                "next_steps": {"type": "string", "description": "What should be done next"},
                "context": {"type": "string", "description": "Additional context (file paths, decisions, blockers)"},
                "project": {"type": "string", "description": "Project name for the subject line"},
            },
            "required": ["what_was_done", "next_steps"],
        },
    },
    {
        "name": "check_handoff",
        "description": "Check for session handoff notes from your previous session. Call this at the start of a new session to get briefed on what happened before.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "try_agent",
        "description": "Try any public agent without authentication. Great for testing or discovering agents. Rate limited to 5/hour per agent.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "namespace": {"type": "string", "description": "Agent namespace (e.g. 'demo', 'juno')"},
                "agent": {"type": "string", "description": "Agent name (e.g. 'ask', 'inbox', 'search')"},
                "message": {"type": "string", "description": "Message to send (max 500 chars)"},
            },
            "required": ["namespace", "agent", "message"],
        },
    },
    # --- Decision Review Queue ---
    {
        "name": "list_review_queue",
        "description": "List draft replies awaiting your review. Agents with reply_mode='review' or 'manual' save drafts here instead of sending immediately. You can approve, edit, or discard each draft.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "agent_name": {"type": "string", "description": "Filter by agent name (optional)"},
                "status": {"type": "string", "description": "Filter by status: draft (default), sent, or all", "default": "draft"},
            },
        },
    },
    {
        "name": "review_approve",
        "description": "Approve and send a draft reply from the review queue (draft_replies). Optionally edit the body before sending. Use list_review_queue first to see pending drafts.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "draft_id": {"type": "string", "description": "Draft ID from review queue (e.g. draft_abc123)"},
                "edited_body": {"type": "string", "description": "Optional: edited body text (replaces the original)"},
            },
            "required": ["draft_id"],
        },
    },
    {
        "name": "review_discard",
        "description": "Discard/reject a draft reply from the review queue. The email will not be sent.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "draft_id": {"type": "string", "description": "Draft ID to discard from review queue"},
            },
            "required": ["draft_id"],
        },
    },
    # --- Sign in with HII (agent auto-auth) ---
    {
        "name": "auto_auth",
        "description": "Automatically authenticate with an external service via Sign in with HII. Requests OTP, reads it from your inbox, and verifies — fully automated, no human needed. Returns an access_token for the service.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "client_id": {"type": "string", "description": "Service to authenticate with (e.g. 'doer.so')"},
                "scope": {"type": "string", "description": "Requested scope (default: 'profile')", "default": "profile"},
            },
            "required": ["client_id"],
        },
    },
]


# --- Tool handlers ---
def _get_me() -> dict:
    """Return the caller's identity from GET /me, cached after first success.

    Only a response that carries an ``email`` is cached, so a transient
    failure on the first call does not poison every later identity lookup.
    """
    global _me_cache
    if _me_cache is not None:
        return _me_cache
    me = _api("GET", "/me")
    if isinstance(me, dict) and me.get("email"):
        _me_cache = me
    return me


def _copy_truthy(src: dict, dst: dict, keys: tuple) -> None:
    """Copy each of *keys* from *src* to *dst* when its value is truthy."""
    for key in keys:
        if src.get(key):
            dst[key] = src[key]


def _compose_payload(args: dict, optional_fields: tuple) -> dict[str, Any]:
    """Build a send/draft body: raw_hii wins over to/subject/body."""
    payload: dict[str, Any] = {}
    if args.get("raw_hii"):
        payload["raw_hii"] = args["raw_hii"]
    else:
        payload["to"] = args.get("to", "")
        payload["subject"] = args.get("subject", "(no subject)")
        payload["body"] = args.get("body", "")
    _copy_truthy(args, payload, optional_fields)
    return payload


def _list_review_queue(args: dict) -> dict:
    """Collect drafts from every (optionally name-filtered) agent."""
    agent_name = args.get("agent_name", "")
    status = args.get("status", "draft")

    # Get all agents to find agent-specific drafts
    agents = _api("GET", "/agents").get("agents", [])
    all_drafts = []
    for agent in agents:
        if agent_name and agent.get("name") != agent_name:
            continue
        drafts = _api("GET", f"/agents/{agent['id']}/drafts").get("drafts", [])
        for draft in drafts:
            draft["agent_name"] = agent.get("name", "")
            if status == "all" or draft.get("status") == status:
                all_drafts.append(draft)
    return {
        "ok": True,
        "review_queue": all_drafts,
        "count": len(all_drafts),
        # These are the tools that operate on /drafts/{id} (the review queue).
        "hint": "Use review_approve to send or review_discard to reject.",
    }


def _auto_auth(args: dict) -> dict:
    """Fully automated sign-in: request OTP → read it from inbox → verify."""
    my_email = _get_me().get("email", "")
    if not my_email:
        return {"error": "Could not determine identity"}
    client_id = args.get("client_id", "")
    scope = args.get("scope", "profile")

    # Step 1: Request auth challenge (deliver to hii.so inbox)
    auth_resp = _api("POST", "/auth/authorize", {
        "email": my_email,
        "client_id": client_id,
        "scope": scope,
        "deliver_to": "hii",
    })
    if not auth_resp.get("ok"):
        return {"error": f"Auth request failed: {auth_resp}"}
    challenge_id = auth_resp.get("challenge_id", "")

    # Step 2: Wait briefly for message delivery, then read inbox
    time.sleep(2)
    inbox = _api("GET", "/mail/inbox", params={"limit": 5, "direction": "inbound"})

    # Find the auth code message and extract the 6-digit code from its body
    code = ""
    for msg in inbox.get("messages", []):
        subj = msg.get("subject") or ""
        if "[AUTH]" in subj and client_id in subj:
            match = _OTP_RE.search(msg.get("body") or "")
            if match:
                code = match.group(1)
                break

    if not code:
        return {
            "error": "Could not find OTP code in inbox. Try again.",
            "challenge_id": challenge_id,
            "hint": "Check inbox manually with read_inbox tool and look for [AUTH] message.",
        }

    # Step 3: Verify the code
    verify_resp = _api("POST", "/auth/verify", {
        "challenge_id": challenge_id,
        "code": code,
    })
    if verify_resp.get("ok"):
        return {
            "ok": True,
            "client_id": client_id,
            "access_token": verify_resp.get("access_token"),
            "token_type": "Bearer",
            "expires_in": verify_resp.get("expires_in"),
            "user": verify_resp.get("user"),
            "message": f"Authenticated with {client_id}. Use the access_token as Bearer token.",
        }
    return {"error": f"Verification failed: {verify_resp}"}


def handle_tool(name: str, args: dict) -> Any:
    """Dispatch one MCP tool call to the HII Mail API.

    Args:
        name: Tool name as listed in TOOLS.
        args: Tool arguments; never mutated.

    Returns:
        The decoded API response, or ``{"error": ...}`` for an unknown tool
        or a failed precondition.

    Raises:
        KeyError: A required argument (e.g. ``message_id``) is missing.
        httpx.HTTPError: The HTTP request itself failed.
    """
    if name == "send_email":
        payload = _compose_payload(
            args,
            ("cc", "bcc", "reply_to", "schema", "response_to", "send_at", "output_schema"),
        )
        if args.get("sync"):
            payload["sync"] = True
        if args.get("attachments"):
            payload["attachments"] = args["attachments"]
        return _api("POST", "/mail/send", payload)

    elif name == "reply":
        message_id = args.get("message_id", "")
        payload = {"body": args.get("body", "")}
        if args.get("reply_all"):
            payload["reply_all"] = True
        if args.get("subject") is not None:
            payload["subject"] = args["subject"]
        if args.get("sync"):
            payload["sync"] = True
        if args.get("attachments"):
            payload["attachments"] = args["attachments"]
        return _api("POST", f"/mail/messages/{message_id}/reply", payload)

    elif name == "read_inbox":
        params = {
            "limit": args.get("limit", 20),
            "offset": args.get("offset", 0),
        }
        if args.get("direction"):
            params["direction"] = args["direction"]
        if args.get("unread_only"):
            params["unread_only"] = "true"
        return _api("GET", "/mail/inbox", params=params)

    elif name == "read_message":
        return _api("GET", f"/mail/messages/{args['message_id']}")

    elif name == "search_messages":
        params = {}
        _copy_truthy(args, params, ("q", "field", "schema", "direction"))
        return _api("GET", "/mail/search", params=params)

    elif name == "get_thread":
        return _api("GET", f"/mail/threads/{args['hii_id']}")

    elif name == "create_webhook":
        return _api("POST", "/webhooks", {
            "url": args["url"],
            "events": args.get("events", "mail.received"),
        })

    elif name == "list_webhooks":
        return _api("GET", "/webhooks")

    elif name == "batch_send":
        return _api("POST", "/mail/batch", {
            "messages": args["messages"],
            "async_mode": args.get("async_mode", False),
        })

    elif name == "search_agents":
        params = {}
        _copy_truthy(args, params, ("q", "schema"))
        return _api("GET", "/agents", params=params)

    elif name == "register_agent":
        payload = {"name": args["name"]}
        _copy_truthy(
            args,
            payload,
            ("description", "system_prompt", "webhook_url", "auto_reply",
             "input_schema", "output_schema", "capabilities"),
        )
        return _api("POST", "/agents", payload)

    elif name == "update_agent":
        agent_id = args["agent_id"]
        # Build a new dict rather than popping from the caller's arguments.
        payload = {k: v for k, v in args.items() if k != "agent_id" and v is not None}
        return _api("PUT", f"/agents/{agent_id}", payload)

    elif name == "delete_agent":
        return _api("DELETE", f"/agents/{args['agent_id']}")

    elif name == "resolve_agent":
        return _api("GET", "/resolve", params={"address": args["address"]})

    elif name == "get_events":
        params = {"cursor": args.get("cursor", 0), "limit": args.get("limit", 100)}
        if args.get("types"):
            params["types"] = args["types"]
        return _api("GET", "/events", params=params)

    elif name == "create_draft":
        return _api("POST", "/mail/draft", _compose_payload(args, ("schema", "response_to")))

    elif name == "list_drafts":
        return _api("GET", "/mail/drafts", params={"limit": args.get("limit", 20)})

    elif name == "send_draft":
        return _api("POST", f"/mail/draft/{args['draft_id']}/send")

    elif name == "approve_draft":
        return _api("POST", f"/mail/draft/{args['draft_id']}/approve")

    elif name == "whoami":
        return _get_me()

    elif name == "notify":
        my_email = _get_me().get("email", "")
        if not my_email:
            return {"error": "Could not determine identity"}
        # `or` also covers an explicit null level from the client.
        level = (args.get("level") or "info").upper()
        return _api("POST", "/mail/send", {
            "to": my_email,
            "subject": f"[{level}] {args['title']}",
            "body": args["body"],
            "schema": "notification",
        })

    elif name == "session_handoff":
        my_email = _get_me().get("email", "")
        if not my_email:
            return {"error": "Could not determine identity"}
        parts = [f"## Done\n{args['what_was_done']}", f"## Next\n{args['next_steps']}"]
        if args.get("context"):
            parts.append(f"## Context\n{args['context']}")
        project = args.get("project", "")
        subject = f"Session Handoff: {project}" if project else "Session Handoff"
        return _api("POST", "/mail/send", {
            "to": my_email,
            "subject": subject,
            "body": "\n\n".join(parts),
            "schema": "session_handoff",
        })

    elif name == "check_handoff":
        return _api("GET", "/mail/search", params={
            "schema": "session_handoff",
            "limit": "3",
        })

    elif name == "try_agent":
        # Public endpoint — no auth needed, so this bypasses _api/_headers.
        ns = args.get("namespace", "")
        agent = args.get("agent", "")
        message = args.get("message", "")
        url = f"{API_URL}/public/try/{ns}/{agent}"
        with httpx.Client(timeout=60) as client:
            resp = client.post(url, json={"message": message},
                               headers={"Content-Type": "application/json"})
        return _decode_response(resp)

    elif name == "list_review_queue":
        return _list_review_queue(args)

    elif name == "review_approve":
        draft_id = args.get("draft_id", "")
        if args.get("edited_body"):
            edit_resp = _api("PUT", f"/drafts/{draft_id}", {"edited_body": args["edited_body"]})
            # Never send the unedited text when the requested edit was rejected.
            if isinstance(edit_resp, dict) and edit_resp.get("error"):
                return {"error": f"Draft edit failed; draft not sent: {edit_resp['error']}"}
        return _api("POST", f"/drafts/{draft_id}/send")

    elif name == "review_discard":
        draft_id = args.get("draft_id", "")
        return _api("DELETE", f"/drafts/{draft_id}")

    elif name == "auto_auth":
        return _auto_auth(args)

    return {"error": f"Unknown tool: {name}"}


# --- MCP Protocol (stdio JSON-RPC) ---
# NOTE(review): messages are framed with LSP-style "Content-Length" headers.
# The published MCP stdio transport describes newline-delimited JSON instead —
# confirm which framing the connecting clients actually expect.
def _write_frame(msg: dict) -> None:
    """Write one Content-Length framed JSON-RPC message to stdout.

    Bytes go straight to the binary layer so the length header counts bytes
    and the CRLF separators are not rewritten by text-mode newline handling.
    """
    payload = json.dumps(msg).encode("utf-8")
    out = sys.stdout.buffer
    out.write(b"Content-Length: %d\r\n\r\n" % len(payload))
    out.write(payload)
    out.flush()


def send_response(id: Any, result: Any):
    """Send a successful JSON-RPC response for request *id*."""
    _write_frame({"jsonrpc": "2.0", "id": id, "result": result})


def send_error(id: Any, code: int, message: str):
    """Send a JSON-RPC error response for request *id*."""
    _write_frame({"jsonrpc": "2.0", "id": id, "error": {"code": code, "message": message}})


def read_message() -> dict | None:
    """Read a JSON-RPC message from stdin (MCP protocol).

    Headers and body are read from the binary stream because Content-Length
    counts bytes; reading characters would desynchronise on non-ASCII input.

    Returns:
        The decoded message, or None on EOF, a missing/zero Content-Length,
        or a body truncated by EOF.
    """
    stream = sys.stdin.buffer
    headers: dict[str, str] = {}
    while True:
        line = stream.readline()
        if not line or line in (b"\r\n", b"\n"):
            break
        if b":" in line:
            key, _, val = line.decode("ascii", "replace").partition(":")
            headers[key.strip().lower()] = val.strip()

    content_length = int(headers.get("content-length", 0))
    if content_length == 0:
        return None
    body = stream.read(content_length)
    if len(body) < content_length:
        return None  # stream closed mid-message
    return json.loads(body)


def main():
    """Main MCP server loop."""
    if not API_KEY:
        sys.stderr.write("ERROR: Set HII_API_KEY environment variable\n")
        sys.exit(1)

    while True:
        msg = read_message()
        if msg is None:
            break

        method = msg.get("method", "")
        request_id = msg.get("id")
        # `or {}` also covers an explicit `"params": null`.
        params = msg.get("params") or {}

        if method == "initialize":
            send_response(request_id, {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "serverInfo": {
                    "name": "hii-mail",
                    "version": "1.0.0",
                },
            })
        elif method == "notifications/initialized":
            pass  # No response needed
        elif method == "ping":
            send_response(request_id, {})
        elif method == "tools/list":
            send_response(request_id, {"tools": TOOLS})
        elif method == "tools/call":
            tool_name = params.get("name", "")
            tool_args = params.get("arguments") or {}
            try:
                result = handle_tool(tool_name, tool_args)
                send_response(request_id, {
                    "content": [{"type": "text", "text": json.dumps(result, indent=2, default=str)}],
                })
            except Exception as e:
                # Top-level boundary: report any tool failure to the client
                # instead of killing the server loop.
                send_response(request_id, {
                    "content": [{"type": "text", "text": json.dumps({"error": str(e)})}],
                    "isError": True,
                })
        elif request_id is not None:
            send_error(request_id, -32601, f"Method not found: {method}")


if __name__ == "__main__":
    main()