Source code for instrmcp.servers.jupyter_qcodes.core.notebook_tools

"""
Jupyter notebook tool registrar.

Registers tools for interacting with Jupyter notebook variables and cells.
"""

import json
import time
from typing import List, Optional

from mcp.types import TextContent
from ..active_cell_bridge import (  # noqa: F401 - invalidate_cell_output_cache exposed for unsafe tools
    get_cell_outputs,
    get_cached_cell_output,
    get_active_cell_output,
    get_notebook_structure,
    get_cells_by_index,
    invalidate_cell_output_cache,
)
from instrmcp.utils.logging_config import get_logger
from instrmcp.utils.mcptool_logger import log_tool_call

logger = get_logger("tools.notebook")


[docs] class NotebookToolRegistrar: """Registers Jupyter notebook tools with the MCP server."""
[docs] def __init__( self, mcp_server, tools, ipython, safe_mode=True, dangerous_mode=False, enabled_options=None, ): """ Initialize the notebook tool registrar. Args: mcp_server: FastMCP server instance tools: QCodesReadOnlyTools instance ipython: IPython instance for direct notebook access safe_mode: Whether server is in safe mode (read-only) dangerous_mode: Whether server is in dangerous mode (auto-approve consents) enabled_options: Set of enabled optional features (measureit, database, etc.) """ self.mcp = mcp_server self.tools = tools self.ipython = ipython self.safe_mode = safe_mode self.dangerous_mode = dangerous_mode self.enabled_options = enabled_options or set()
# ===== Concise mode helpers ===== def _to_concise_variable_info(self, info: dict) -> dict: """Convert full variable info to concise format. Concise: name, type, qcodes_instrument flag, brief repr (first 10 chars). """ repr_full = info.get("repr", "") brief_repr = repr_full[:10] + "..." if len(repr_full) > 10 else repr_full return { "name": info.get("name"), "type": info.get("type"), "qcodes_instrument": info.get("qcodes_instrument", False), "repr": brief_repr, } def _to_concise_editing_cell(self, result: dict) -> dict: """Convert full editing cell info to concise format. Concise: cell_type, cell_index, cell_content. """ return { "cell_type": result.get("cell_type"), "cell_index": result.get("cell_index"), "cell_content": result.get("cell_content"), } def _to_detailed_editing_cell(self, result: dict) -> dict: """Filter editing cell info for detailed mode. Removes internal/debug fields: cell_id, notebook_path, client_id, captured, age_seconds, timestamp_ms, source, fresh_requested, fresh_threshold_ms. """ exclude_keys = { "cell_id", "notebook_path", "client_id", "captured", "age_seconds", "timestamp_ms", "source", "fresh_requested", "fresh_threshold_ms", } return {k: v for k, v in result.items() if k not in exclude_keys} def _to_concise_editing_cell_output(self, info: dict) -> dict: """Convert full editing cell output to concise format. Concise: status, message, has_output, has_error, output_summary (truncated), plus error_type/error_message if has_error is true. Removes verbose outputs array and provides brief summary instead. """ # Extract a brief summary of output (first 100 chars) output_summary = None outputs = info.get("outputs") or info.get("output") if outputs: if isinstance(outputs, str): output_summary = ( outputs[:100] + "..." if len(outputs) > 100 else outputs ) elif isinstance(outputs, list) and len(outputs) > 0: # Get first text output from outputs array for out in outputs: if isinstance(out, dict): text = out.get("text") or out.get("data", {}).get( "text/plain", "" ) if text: if isinstance(text, list): text = "".join(text) output_summary = ( text[:100] + "..." if len(text) > 100 else text ) break result = { "cell_id_notebook": info.get("cell_id_notebook") or info.get("cell_index"), "executed": info.get("executed", False), "status": info.get("status"), "message": info.get("message"), "has_output": info.get("has_output", False), "has_error": info.get("has_error", False), "output_summary": output_summary, } if info.get("has_error") and info.get("error"): result["error_type"] = info["error"].get("type") result["error_message"] = info["error"].get("message") if info.get("image_paths"): result["image_paths"] = info["image_paths"] return result def _to_concise_notebook_cells( self, result: dict, include_output: bool = True ) -> dict: """Convert full notebook cells to concise format. Concise: recent cells with cell_id_notebook, cell_type, executed, source (truncated). If include_output=True, also includes has_output, has_error, status. """ concise_cells = [] for cell in result.get("cells", []): # Support both old field name (input) and new field name (source) source_text = cell.get("source") or cell.get("input", "") truncated_source = ( source_text[:100] + "..." if len(source_text) > 100 else source_text ) concise_cell = { "cell_id_notebook": cell.get("cell_id_notebook"), "cell_type": cell.get("cell_type"), "executed": cell.get("cell_execution_number") is not None, "source": truncated_source, } if include_output: concise_cell["has_output"] = cell.get("has_output", False) concise_cell["has_error"] = cell.get("has_error", False) concise_cell["status"] = cell.get("status") if cell.get("image_paths"): concise_cell["image_paths"] = cell["image_paths"] concise_cells.append(concise_cell) return { "total_cells": result.get("total_cells"), "cells": concise_cells, "count": len(concise_cells), } def _to_concise_move_cursor(self, result: dict) -> dict: """Convert full move cursor result to concise format. Concise: success only. """ return {"success": result.get("success", False)} # ===== End concise mode helpers ===== def _is_valid_frontend_output(self, frontend_output: dict) -> bool: """Check if frontend response is valid cell output data (not a failure response). Valid responses have 'has_output' field or 'outputs' array. Failure responses like {success: false, message: "..."} are not valid. """ if not frontend_output or not isinstance(frontend_output, dict): return False # Valid cell output has 'has_output' field or 'outputs' array return "has_output" in frontend_output or "outputs" in frontend_output def _get_frontend_output( self, cell_number: int, timeout_s: float = 0.5 ) -> Optional[dict]: """ Request and retrieve cell output from JupyterLab frontend. Uses timestamp-based cache validation to avoid returning stale error states that no longer reflect the current cell state. Args: cell_number: Execution count of the cell timeout_s: Timeout for waiting for response Returns: Dictionary with output data or None if not available/expired """ # First check cache with TTL validation (default 60 seconds) # This prevents stale error states from persisting cached = get_cached_cell_output(cell_number) if cached and cached.get("data"): # Extract just the data portion, not the metadata wrapper return cached.get("data") # Request fresh data from frontend result = get_cell_outputs([cell_number], timeout_s=timeout_s) if not result.get("success"): return None # Wait a bit for response to arrive and be cached time.sleep(0.1) # Check cache again and extract data cached = get_cached_cell_output(cell_number) if cached and cached.get("data"): return cached.get("data") return None
[docs] def register_all(self): """Register all notebook tools.""" self._register_list_variables() self._register_read_variable() self._register_read_active_cell() self._register_read_active_cell_output() self._register_read_content() self._register_move_cursor() self._register_server_status()
def _register_list_variables(self): """Register the notebook/list_variables tool.""" @self.mcp.tool( name="notebook_list_variables", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def list_variables( type_filter: Optional[str] = None, detailed: bool = False, ) -> List[TextContent]: # Description loaded from metadata_baseline.yaml start = time.perf_counter() try: variables = await self.tools.list_variables(type_filter) duration = (time.perf_counter() - start) * 1000 log_tool_call( "notebook_list_variables", {"type_filter": type_filter}, duration, "success", ) return [TextContent(type="text", text=json.dumps(variables, indent=2))] except Exception as e: duration = (time.perf_counter() - start) * 1000 log_tool_call( "notebook_list_variables", {"type_filter": type_filter}, duration, "error", str(e), ) logger.error(f"Error in notebook/list_variables: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ] def _register_read_variable(self): """Register the notebook/read_variable tool.""" @self.mcp.tool( name="notebook_read_variable", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def get_variable_info( name: str, detailed: bool = False ) -> List[TextContent]: # Description loaded from metadata_baseline.yaml start = time.perf_counter() try: info = await self.tools.get_variable_info(name) duration = (time.perf_counter() - start) * 1000 log_tool_call( "notebook_read_variable", {"name": name, "detailed": detailed}, duration, "success", ) # Apply concise mode filtering if not detailed: info = self._to_concise_variable_info(info) return [TextContent(type="text", text=json.dumps(info, indent=2))] except Exception as e: duration = (time.perf_counter() - start) * 1000 log_tool_call( "notebook_read_variable", {"name": name, "detailed": detailed}, duration, "error", str(e), ) logger.error(f"Error in notebook/get_variable_info: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ] def _register_read_active_cell(self): """Register the notebook/read_active_cell tool.""" @self.mcp.tool( name="notebook_read_active_cell", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def get_editing_cell( fresh_ms: int = 1000, line_start: Optional[int] = None, line_end: Optional[int] = None, max_lines: int = 200, detailed: bool = False, ) -> List[TextContent]: # Description loaded from metadata_baseline.yaml start = time.perf_counter() args = { "fresh_ms": fresh_ms, "line_start": line_start, "line_end": line_end, "max_lines": max_lines, "detailed": detailed, } try: result = await self.tools.get_editing_cell( fresh_ms=fresh_ms, line_start=line_start, line_end=line_end, max_lines=max_lines, ) duration = (time.perf_counter() - start) * 1000 log_tool_call("notebook_read_active_cell", args, duration, "success") # Apply mode filtering if detailed: result = self._to_detailed_editing_cell(result) else: result = self._to_concise_editing_cell(result) return [ TextContent( type="text", text=json.dumps(result, indent=2, default=str) ) ] except Exception as e: duration = (time.perf_counter() - start) * 1000 log_tool_call( "notebook_read_active_cell", args, duration, "error", str(e) ) logger.error(f"Error in notebook/get_editing_cell: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ] def _register_read_active_cell_output(self): """Register the notebook/read_active_cell_output tool.""" @self.mcp.tool( name="notebook_read_active_cell_output", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def get_editing_cell_output(detailed: bool = False) -> List[TextContent]: # Description loaded from metadata_baseline.yaml def format_response(info: dict) -> List[TextContent]: """Helper to format response with optional concise filtering.""" if not detailed: info = self._to_concise_editing_cell_output(info) return [TextContent(type="text", text=json.dumps(info, indent=2))] try: # FIX for Bug #10: Use direct frontend query instead of IPython history # This gets output from the currently selected cell in JupyterLab, # avoiding stale state issues with sys.last_* and Out history. frontend_result = get_active_cell_output(timeout_s=2.0) if frontend_result.get("success"): # Frontend returned the active cell's output directly outputs = frontend_result.get("outputs", []) has_output = frontend_result.get("has_output", False) has_error = frontend_result.get("has_error", False) execution_count = frontend_result.get("execution_count") cell_type = frontend_result.get("cell_type", "code") cell_index = frontend_result.get("cell_index") # Handle non-code cells if cell_type != "code": cell_info = { "status": "not_code_cell", "message": f"Active cell is a {cell_type} cell (no outputs)", "cell_type": cell_type, "cell_index": cell_index, "executed": False, "has_output": False, "has_error": False, } return format_response(cell_info) # Handle unexecuted code cells if execution_count is None: cell_info = { "status": "not_executed", "message": "Active cell has not been executed yet", "cell_index": cell_index, "executed": False, "has_output": False, "has_error": False, } return format_response(cell_info) # Extract error details if present error_info = None if has_error: for out in outputs: if out.get("type") == "error": error_info = { "type": out.get("ename", "UnknownError"), "message": out.get("evalue", ""), "traceback": "\n".join(out.get("traceback", [])), } break # Build response if has_error: status = "error" message = "Cell raised an exception" elif has_output: status = "completed" message = None else: status = "completed_no_output" message = "Cell executed successfully but produced no output" cell_info = { "cell_id_notebook": cell_index, "executed": True, "status": status, # Include outputs if there's output OR if there's an error # (error details are in the outputs array) "outputs": outputs if (has_output or has_error) else None, "has_output": has_output, "has_error": has_error, } if message: cell_info["message"] = message if error_info: cell_info["error"] = error_info if frontend_result.get("image_paths"): cell_info["image_paths"] = frontend_result["image_paths"] return format_response(cell_info) else: # Frontend request failed - fall back to IPython history # Note: _send_and_wait uses 'message' for errors, not 'error' error_msg = frontend_result.get("error") or frontend_result.get( "message" ) logger.debug( f"Frontend request failed: {error_msg}, " "falling back to IPython history" ) return await self._get_output_from_ipython_history(format_response) except Exception as e: logger.error(f"Error in get_editing_cell_output: {e}") return [ TextContent( type="text", text=json.dumps({"status": "error", "error": str(e)}, indent=2), ) ] async def _get_output_from_ipython_history(self, format_response): # Description loaded from metadata_baseline.yaml import sys import traceback if hasattr(self.ipython, "user_ns"): In = self.ipython.user_ns.get("In", []) Out = self.ipython.user_ns.get("Out", {}) current_execution_count = getattr(self.ipython, "execution_count", 0) if len(In) > 1: # In[0] is empty latest_cell_num = len(In) - 1 # Check if the latest cell is currently running if ( latest_cell_num not in Out and latest_cell_num == current_execution_count and In[latest_cell_num] ): cell_info = { "cell_number": latest_cell_num, "execution_count": latest_cell_num, "input": In[latest_cell_num], "status": "running", "message": "Cell is currently executing - no output available yet", "has_output": False, "has_error": False, "output": None, } return format_response(cell_info) # Find the most recent completed cell for i in range(len(In) - 1, 0, -1): if In[i]: # Skip empty entries # Check Out dictionary if i in Out: cell_info = { "cell_number": i, "execution_count": i, "input": In[i], "status": "completed", "output": str(Out[i]), "has_output": True, "has_error": False, } return format_response(cell_info) elif i < current_execution_count: # Cell was executed but produced no output has_error = False error_info = None # Check sys.last_* for error info if ( hasattr(sys, "last_type") and hasattr(sys, "last_value") and hasattr(sys, "last_traceback") and sys.last_type is not None and i == latest_cell_num ): has_error = True error_info = { "type": sys.last_type.__name__, "message": str(sys.last_value), "traceback": "".join( traceback.format_exception( sys.last_type, sys.last_value, sys.last_traceback, ) ), } if has_error: cell_info = { "cell_number": i, "execution_count": i, "input": In[i], "status": "error", "message": "Cell raised an exception", "output": None, "has_output": False, "has_error": True, "error": error_info, } else: cell_info = { "cell_number": i, "execution_count": i, "input": In[i], "status": "completed_no_output", "message": "Cell executed successfully but produced no output", "output": None, "has_output": False, "has_error": False, } return format_response(cell_info) # Fallback: no recent executed cells result = { "status": "no_cells", "error": "No recently executed cells found", "message": "Execute a cell first to see its output", "has_output": False, "has_error": False, } return format_response(result) def _register_read_content(self): """Register the notebook/read_content tool.""" @self.mcp.tool( name="notebook_read_content", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def get_notebook_cells( num_cells: int = 2, include_output: bool = True, cell_id_notebooks: Optional[str] = None, detailed: bool = False, ) -> List[TextContent]: # Description loaded from metadata_baseline.yaml # NEW TWO-PHASE APPROACH: Uses frontend to access ALL cells including unexecuted ones try: import sys import traceback # Parse cell_id_notebooks if provided (JSON array of indices) specific_indices = None if cell_id_notebooks: try: parsed = json.loads(cell_id_notebooks) if isinstance(parsed, list): specific_indices = [int(i) for i in parsed] elif isinstance(parsed, int): specific_indices = [parsed] else: return [ TextContent( type="text", text=json.dumps( { "error": "cell_id_notebooks must be an integer or list of integers" }, indent=2, ), ) ] except (json.JSONDecodeError, ValueError) as e: return [ TextContent( type="text", text=json.dumps( {"error": f"Invalid cell_id_notebooks format: {e}"}, indent=2, ), ) ] # PHASE 1: Get notebook structure (lightweight - no source code) structure = get_notebook_structure(timeout_s=2.0) if not structure.get("success"): # Frontend unavailable - check if position-based access was requested if specific_indices is not None: # Position-based access requires frontend - return explicit error logger.warning( f"Frontend unavailable: {structure.get('error')}. " "Cannot use cell_id_notebooks without frontend." ) return [ TextContent( type="text", text=json.dumps( { "error": "cell_id_notebooks requires JupyterLab frontend connection", "detail": "Position-based cell access is only available when the frontend bridge is connected. " "Use num_cells parameter instead, or ensure the JupyterLab extension is loaded.", "frontend_error": structure.get("error"), }, indent=2, ), ) ] # Fallback to IPython-based approach only for num_cells mode logger.warning( f"Frontend unavailable: {structure.get('error')}. " "Falling back to IPython history." ) return await self._get_cells_from_ipython( num_cells, include_output, detailed ) total_cells = structure.get("total_cells", 0) structure_cells = structure.get("cells", []) if total_cells == 0: result = { "total_cells": 0, "cells": [], "count": 0, "error_count": 0, } return [TextContent(type="text", text=json.dumps(result, indent=2))] # PHASE 2: Determine which cells to fetch if specific_indices is not None: # Fetch specific cells by index indices_to_fetch = [ i for i in specific_indices if 0 <= i < total_cells ] else: # Fetch cells around the active cell (cursor position) active_index = structure.get("active_cell_index", total_cells - 1) half = num_cells // 2 start = max(0, active_index - half) end = min(total_cells, start + num_cells) # Adjust start if we hit the end boundary if end == total_cells: start = max(0, total_cells - num_cells) indices_to_fetch = list(range(start, end)) # Get cells with source code cells_result = get_cells_by_index(indices_to_fetch, timeout_s=2.0) if not cells_result.get("success"): logger.warning( f"Failed to get cells by index: {cells_result.get('error')}" ) return await self._get_cells_from_ipython( num_cells, include_output, detailed ) fetched_cells = cells_result.get("cells", []) # PHASE 3: Process cells and add outputs for executed cells cells = [] for cell_data in fetched_cells: # Use execution number internally to fetch outputs (not exposed in return) exec_num = cell_data.get("cell_execution_number") cell_info = { "cell_id_notebook": cell_data.get("cell_id_notebook"), "cell_type": cell_data.get("cell_type"), "executed": exec_num is not None, "source": cell_data.get("source", ""), "has_output": False, "has_error": False, } if include_output and exec_num is not None: # Only fetch output for executed cells try: frontend_output = self._get_frontend_output(exec_num) if frontend_output and self._is_valid_frontend_output( frontend_output ): outputs = frontend_output.get("outputs", []) has_output = frontend_output.get("has_output", False) # Check for errors in outputs has_error_output = any( out.get("type") == "error" or out.get("output_type") == "error" for out in outputs ) cell_info["has_output"] = has_output cell_info["has_error"] = has_error_output cell_info["outputs"] = outputs if frontend_output.get("image_paths"): cell_info["image_paths"] = frontend_output[ "image_paths" ] if has_error_output: cell_info["status"] = "error" # Extract error details for out in outputs: if ( out.get("type") == "error" or out.get("output_type") == "error" ): cell_info["error"] = { "type": out.get( "ename", "UnknownError" ), "message": out.get("evalue", ""), "traceback": "\n".join( out.get("traceback", []) ), } break elif has_output: cell_info["status"] = "completed" else: cell_info["status"] = "completed_no_output" else: # Fallback to IPython Out cache Out = self.ipython.user_ns.get("Out", {}) if exec_num in Out: cell_info["output"] = str(Out[exec_num]) cell_info["has_output"] = True cell_info["status"] = "completed" else: cell_info["status"] = "completed_no_output" except Exception as e: logger.warning( f"Error getting output for cell {exec_num}: {e}" ) cell_info["status"] = "output_fetch_failed" elif exec_num is None: # Unexecuted cell (markdown or code that hasn't been run) cell_info["status"] = "not_executed" else: cell_info["status"] = "completed_no_output" cells.append(cell_info) # Count cells with errors error_count = sum(1 for cell in cells if cell.get("has_error", False)) result = { "total_cells": total_cells, "cells": cells, "count": len(cells), "requested": len(indices_to_fetch), "error_count": error_count, } # Apply concise mode filtering if not detailed: result = self._to_concise_notebook_cells(result, include_output) return [TextContent(type="text", text=json.dumps(result, indent=2))] except Exception as e: logger.error(f"Error in get_notebook_cells: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ] async def _get_cells_from_ipython( self, num_cells: int, include_output: bool, detailed: bool ) -> List[TextContent]: """Fallback method to get cells from IPython history when frontend unavailable.""" import sys import traceback cells = [] current_execution_count = getattr(self.ipython, "execution_count", 0) if hasattr(self.ipython, "user_ns"): In = self.ipython.user_ns.get("In", []) Out = self.ipython.user_ns.get("Out", {}) if len(In) > 1: # In[0] is empty start_idx = max(1, len(In) - num_cells) latest_executed = len(In) - 1 for i in range(start_idx, len(In)): if i < len(In) and In[i]: cell_info = { "cell_id_notebook": None, # Unknown for IPython fallback "cell_type": "code", # IPython only tracks code cells "executed": True, # IPython In[] only contains executed cells "source": In[i], "has_error": False, } if include_output: if i in Out: cell_info["output"] = str(Out[i]) cell_info["has_output"] = True cell_info["status"] = "completed" elif i < current_execution_count: cell_info["has_output"] = False # Check sys.last_* for latest cell errors if i == latest_executed: if ( hasattr(sys, "last_type") and hasattr(sys, "last_value") and hasattr(sys, "last_traceback") and sys.last_type is not None ): cell_info["has_error"] = True cell_info["error"] = { "type": sys.last_type.__name__, "message": str(sys.last_value), "traceback": "".join( traceback.format_exception( sys.last_type, sys.last_value, sys.last_traceback, ) ), } cell_info["status"] = "error" else: cell_info["status"] = "completed_no_output" else: cell_info["status"] = "completed_no_output" else: cell_info["has_output"] = False cell_info["status"] = "not_executed" else: cell_info["has_output"] = False cells.append(cell_info) error_count = sum(1 for cell in cells if cell.get("has_error", False)) result = { "total_cells": None, # Unknown for IPython fallback "cells": cells, "count": len(cells), "requested": num_cells, "error_count": error_count, "note": "Using IPython fallback - only executed code cells shown", } if not detailed: result = self._to_concise_notebook_cells(result, include_output) return [TextContent(type="text", text=json.dumps(result, indent=2))] def _register_move_cursor(self): """Register the notebook/move_cursor tool.""" @self.mcp.tool( name="notebook_move_cursor", annotations={ "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False, }, ) async def move_cursor(target: str, detailed: bool = False) -> List[TextContent]: # Description loaded from metadata_baseline.yaml try: result = await self.tools.move_cursor(target) # Apply concise mode filtering if not detailed: result = self._to_concise_move_cursor(result) return [ TextContent( type="text", text=json.dumps(result, indent=2, default=str) ) ] except Exception as e: logger.error(f"Error in notebook/move_cursor: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ] def _register_server_status(self): """Register the notebook/server_status tool.""" @self.mcp.tool( name="notebook_server_status", annotations={ "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def server_status(detailed: bool = False) -> List[TextContent]: # Description loaded from metadata_baseline.yaml try: # Get list of registered tools from FastMCP registered_tools = [] if hasattr(self.mcp, "_tools"): registered_tools = list(self.mcp._tools.keys()) # Determine mode: dangerous > unsafe > safe if self.dangerous_mode: mode = "dangerous" elif self.safe_mode: mode = "safe" else: mode = "unsafe" status = { "status": "running", "mode": mode, "enabled_options": sorted(list(self.enabled_options)), "dynamic_tools_count": len(registered_tools), "tools": registered_tools[:20], # Limit to first 20 for readability } return [TextContent(type="text", text=json.dumps(status, indent=2))] except Exception as e: logger.error(f"Error in server_status: {e}") return [ TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) ) ]