Source code for instrmcp.servers.jupyter_qcodes.active_cell_bridge

"""
Active Cell Bridge for Jupyter MCP Extension

Handles communication between JupyterLab frontend and kernel to capture
the currently editing cell content via Jupyter comm protocol.
"""

import copy
import time
import threading
import logging
from typing import Optional, Dict, Any, List
from IPython.core.getipython import get_ipython

logger = logging.getLogger(__name__)

# Global state with thread safety
_STATE_LOCK = threading.Lock()
_LAST_SNAPSHOT: Optional[Dict[str, Any]] = None
_LAST_TS = 0.0
# FIX: Changed from set() to Dict to track comm-to-kernel association
# This prevents broadcasting to ALL comms - only sends to the current kernel's comm
_KERNEL_COMM_MAP: Dict[str, Any] = {}  # kernel_id -> comm (one-to-one mapping)

# Cell outputs cache with timestamps for staleness detection
# Structure: {exec_count: {"data": output_data, "timestamp": float, "kernel_id": str}}
_CELL_OUTPUTS_CACHE: Dict[int, Dict[str, Any]] = {}

# Default cache TTL in seconds (60 seconds)
# Cached outputs older than this are considered stale and will be refreshed
CELL_OUTPUT_CACHE_TTL_SECONDS = 60.0

# Response waiting mechanism for operations that need frontend confirmation
# Maps request_id -> [threading.Event, response_dict or None]
_PENDING_REQUESTS: Dict[str, List] = {}


def _get_kernel_id() -> Optional[str]:
    """
    Get the kernel ID for the current IPython session.

    IMPORTANT: This must return the same ID that JupyterLab's frontend uses
    (kernel.id), which is the UUID from the connection file name.

    The connection file is typically named: kernel-<UUID>.json
    JupyterLab uses this UUID as kernel.id.

    Returns:
        Kernel ID string or None if not in IPython context
    """
    import re

    ip = get_ipython()
    if ip and hasattr(ip, "kernel") and ip.kernel:
        # Method 1: Extract UUID from connection file (matches JupyterLab's kernel.id)
        try:
            from ipykernel import get_connection_file

            connection_file = get_connection_file()
            # Connection file format: /path/to/kernel-<UUID>.json
            match = re.search(r"kernel-([a-f0-9-]+)\.json", connection_file)
            if match:
                return match.group(1)
        except Exception as e:
            logger.debug(f"Could not extract kernel ID from connection file: {e}")

        # Method 2: Try kernel's ident (some versions expose this)
        ident = getattr(ip.kernel, "ident", None)
        if ident:
            return str(ident)

        # Method 3: Fallback to session.session (may not match frontend)
        session = getattr(ip.kernel, "session", None)
        if session:
            session_id = getattr(session, "session", None)
            if session_id:
                logger.warning(
                    f"Using session.session as kernel_id ({session_id}). "
                    "This may not match frontend's kernel.id."
                )
                return str(session_id)

        # Last resort: use object id (will definitely not match frontend)
        logger.warning("Using object id as kernel_id - frontend matching will fail")
        return f"kernel_{id(ip.kernel)}"
    return None


def _on_comm_open(comm, open_msg):
    """
    Handle new comm connection from frontend.

    FIX: Now registers comm with kernel ID instead of adding to global set.
    This ensures operations only target the correct kernel's frontend.
    """
    logger.debug(f"🔌 NEW COMM OPENED: {comm.comm_id}")

    # Get kernel_id from open message (sent by frontend) or detect at runtime
    data = open_msg.get("content", {}).get("data", {})
    kernel_id = data.get("kernel_id") or _get_kernel_id()

    if not kernel_id:
        logger.warning(
            "⚠️ Cannot register comm: no kernel_id available. "
            "Operations may fail until kernel is identified."
        )
        # Still set up handlers but without kernel association
        kernel_id = f"unknown_{comm.comm_id}"

    with _STATE_LOCK:
        # Close existing comm for this kernel (clean replacement)
        old_comm = _KERNEL_COMM_MAP.get(kernel_id)
        if old_comm and old_comm != comm:
            logger.debug(f"♻️ Replacing existing comm for kernel {kernel_id}")
            try:
                old_comm.close()
            except Exception as e:
                logger.debug(f"Failed to close old comm: {e}")
        _KERNEL_COMM_MAP[kernel_id] = comm

    logger.debug(
        f"📊 Registered comm for kernel: {kernel_id} "
        f"(total kernels: {len(_KERNEL_COMM_MAP)})"
    )

    # Store kernel_id on the comm for use in close handler
    comm._mcp_kernel_id = kernel_id

    def _on_msg(msg):
        """Handle incoming messages from frontend."""
        data = msg.get("content", {}).get("data", {})
        msg_type = data.get("type")

        if msg_type == "snapshot":
            # Store the cell snapshot
            snapshot = {
                "notebook_path": data.get("path"),
                "cell_id": data.get("id"),
                "cell_index": data.get("index"),
                "cell_type": data.get("cell_type", "code"),
                "text": data.get("text", ""),
                "cursor": data.get("cursor"),
                "selection": data.get("selection"),
                "client_id": data.get("client_id"),
                "ts_ms": data.get("ts_ms", int(time.time() * 1000)),
            }

            with _STATE_LOCK:
                global _LAST_SNAPSHOT, _LAST_TS
                _LAST_SNAPSHOT = snapshot
                _LAST_TS = time.time()

            logger.debug(
                f"Received cell snapshot: {len(snapshot.get('text', ''))} chars"
            )

        elif msg_type == "pong":
            # Response to our ping request
            logger.debug("Received pong from frontend")

        elif msg_type == "get_cell_outputs_response":
            # Response from frontend with cell outputs
            from .image_utils import process_outputs_list

            outputs = data.get("outputs", {})
            current_time = time.time()
            kernel_id = getattr(comm, "_mcp_kernel_id", "unknown")

            # Store outputs in cache with timestamp for staleness detection
            with _STATE_LOCK:
                for cell_num_str, output_data in outputs.items():
                    try:
                        cell_num = int(cell_num_str)
                        # Process images: save to temp files, replace base64 with paths
                        cell_outputs = output_data.get("outputs", [])
                        if cell_outputs:
                            processed, image_paths = process_outputs_list(cell_outputs)
                            output_data = dict(output_data)
                            output_data["outputs"] = processed
                            if image_paths:
                                output_data["image_paths"] = image_paths
                        _CELL_OUTPUTS_CACHE[cell_num] = {
                            "data": output_data,
                            "timestamp": current_time,
                            "kernel_id": kernel_id,
                        }
                    except ValueError:
                        pass

            logger.debug(f"Cached outputs for {len(outputs)} cells (with timestamps)")

        elif msg_type in [
            "update_response",
            "execute_response",
            "add_cell_response",
            "delete_cell_response",
            "apply_patch_response",
            "move_cursor_response",
            "get_active_cell_output_response",
            "get_notebook_structure_response",
            "get_cells_by_index_response",
            "delete_cells_by_index_response",
        ]:
            # Response from frontend for our requests
            request_id = data.get("request_id")
            success = data.get("success", False)
            message = data.get("message", "")

            # Log additional info for move_cursor_response
            if msg_type == "move_cursor_response" and success:
                old_index = data.get("old_index")
                new_index = data.get("new_index")
                logger.debug(f"✅ CURSOR MOVED: {old_index}{new_index}")
            else:
                logger.debug(
                    f"✅ RECEIVED {msg_type} for request {request_id}: success={success}, message={message}"
                )

            # Resolve pending request if someone is waiting for the response
            if request_id:
                with _STATE_LOCK:
                    if request_id in _PENDING_REQUESTS:
                        event, _ = _PENDING_REQUESTS[request_id]
                        # Store the full response data
                        _PENDING_REQUESTS[request_id] = [event, data]
                        event.set()  # Wake up the waiting thread
                        logger.debug(f"✅ Resolved pending request {request_id}")

        else:
            # Unknown message type - log for debugging
            logger.warning(f"❓ UNKNOWN MESSAGE TYPE: {msg_type}, data: {data}")

    def _on_close(msg):
        """Handle comm close - remove from kernel mapping."""
        kernel_id = getattr(comm, "_mcp_kernel_id", None)
        logger.debug(f"Comm closed: {comm.comm_id} (kernel: {kernel_id})")

        with _STATE_LOCK:
            # Only remove if this comm is still the registered one for this kernel
            if kernel_id and _KERNEL_COMM_MAP.get(kernel_id) == comm:
                del _KERNEL_COMM_MAP[kernel_id]
                logger.debug(
                    f"🗑️ Removed comm for kernel {kernel_id} "
                    f"(remaining kernels: {len(_KERNEL_COMM_MAP)})"
                )

    comm.on_msg(_on_msg)
    comm.on_close(_on_close)


[docs] def register_comm_target(): """Register the comm target with IPython kernel.""" ip = get_ipython() if not ip or not hasattr(ip, "kernel"): logger.warning("No IPython kernel found, cannot register comm target") return try: ip.kernel.comm_manager.register_target("mcp:active_cell", _on_comm_open) logger.debug("Registered comm target 'mcp:active_cell'") except Exception as e: logger.error(f"Failed to register comm target: {e}")
[docs] def request_frontend_snapshot(): """Request fresh snapshot from current kernel's frontend.""" kernel_id = _get_kernel_id() if not kernel_id: logger.debug("Cannot request snapshot: no kernel_id") return with _STATE_LOCK: comm = _KERNEL_COMM_MAP.get(kernel_id) if not comm: logger.debug(f"Cannot request snapshot: no comm for kernel {kernel_id}") return try: comm.send({"type": "request_current"}) logger.debug(f"Sent request_current to comm {comm.comm_id}") except Exception as e: logger.debug(f"Failed to send request to comm {comm.comm_id}: {e}")
def _send_to_kernel(payload: Dict[str, Any]) -> Dict[str, Any]: """ Send a message to the current kernel's comm only. This is the core fix for bugs #1 and #2 - instead of broadcasting to ALL comms (which caused operations to repeat N times), we now send to only the comm associated with the current kernel. Args: payload: Message payload (type, request_id, and operation-specific data) Returns: Result dict with success status, request_id, kernel_id, or error """ import uuid kernel_id = _get_kernel_id() if not kernel_id: return { "success": False, "error": "Cannot identify current kernel - are you in a Jupyter notebook?", } with _STATE_LOCK: comm = _KERNEL_COMM_MAP.get(kernel_id) if not comm: return { "success": False, "error": f"No active frontend connection for kernel {kernel_id}", "kernel_id": kernel_id, "hint": "Ensure the JupyterLab extension is loaded and connected", } # Add request_id if not present if "request_id" not in payload: payload["request_id"] = str(uuid.uuid4()) try: comm.send(payload) return { "success": True, "message": f"{payload.get('type', 'unknown')} request sent", "request_id": payload["request_id"], "kernel_id": kernel_id, } except Exception as e: logger.error(f"Failed to send {payload.get('type')}: {e}") # Clean up dead comm with _STATE_LOCK: if _KERNEL_COMM_MAP.get(kernel_id) == comm: del _KERNEL_COMM_MAP[kernel_id] logger.debug(f"Removed dead comm for kernel {kernel_id}") return { "success": False, "error": f"Send failed: {e}", "kernel_id": kernel_id, } def _send_and_wait(payload: Dict[str, Any], timeout_s: float = 2.0) -> Dict[str, Any]: """ Send a message to the frontend and wait for the response. This function allows operations to wait for and return the actual frontend response, including error messages for failures like "cell not found". Args: payload: Message payload (type, and operation-specific data) timeout_s: How long to wait for response from frontend Returns: The actual response from frontend, or error dict on timeout/failure """ import uuid # Generate request_id if not present request_id = payload.get("request_id") or str(uuid.uuid4()) payload["request_id"] = request_id # Create event for waiting event = threading.Event() with _STATE_LOCK: _PENDING_REQUESTS[request_id] = [event, None] try: # Send the message using existing function (which will use our request_id) send_result = _send_to_kernel(payload) if not send_result["success"]: # Send failed, clean up and return the error with _STATE_LOCK: _PENDING_REQUESTS.pop(request_id, None) return send_result # Wait for response with timeout if event.wait(timeout=timeout_s): # Got response with _STATE_LOCK: _, response = _PENDING_REQUESTS.pop(request_id, [None, None]) if response: # Return the full response from frontend return { "success": response.get("success", False), "message": response.get("message", ""), "request_id": request_id, "kernel_id": send_result.get("kernel_id"), # Include any additional fields from frontend response **{ k: v for k, v in response.items() if k not in ["type", "request_id", "success", "message"] }, } else: return { "success": False, "error": "Response received but data was empty", "request_id": request_id, } else: # Timeout with _STATE_LOCK: _PENDING_REQUESTS.pop(request_id, None) return { "success": False, "error": f"Timeout waiting for frontend response after {timeout_s}s", "request_id": request_id, "kernel_id": send_result.get("kernel_id"), } except Exception as e: # Clean up on any error with _STATE_LOCK: _PENDING_REQUESTS.pop(request_id, None) logger.error(f"Error in _send_and_wait: {e}") return { "success": False, "error": f"Error waiting for response: {e}", "request_id": request_id, } def _get_current_comm() -> Optional[Any]: """ Get the comm for the current kernel (for read operations). Returns: Comm object or None if not available """ kernel_id = _get_kernel_id() if not kernel_id: return None with _STATE_LOCK: return _KERNEL_COMM_MAP.get(kernel_id) def _wrap_snapshot_with_metadata( snapshot: Optional[Dict[str, Any]], stale: bool, source: str, age_ms: float, stale_reason: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """ Wrap a snapshot with staleness metadata. Args: snapshot: The snapshot dict to wrap (will be deep copied) stale: Whether the data is stale source: "live" (fresh enough to use) or "cache" (stale/fallback). Note: "live" means data meets freshness threshold, not necessarily that it was just fetched from frontend. age_ms: Age of the snapshot in milliseconds stale_reason: Reason for staleness (e.g., "no_active_comms", "timeout") Returns: Deep copy of snapshot with metadata added, or None if snapshot is None """ if snapshot is None: return None # Use deepcopy to prevent mutations from corrupting the cached _LAST_SNAPSHOT result = copy.deepcopy(snapshot) result["stale"] = stale result["source"] = source result["age_ms"] = age_ms if stale_reason: result["stale_reason"] = stale_reason return result
[docs] def get_active_cell( fresh_ms: Optional[int] = None, timeout_s: float = 0.3 ) -> Optional[Dict[str, Any]]: """ Get the most recent active cell snapshot. Args: fresh_ms: If provided, require snapshot to be no older than this many milliseconds. If snapshot is too old, will request fresh data from frontend. timeout_s: How long to wait for fresh data from frontend (default 0.3s) Returns: Dictionary with cell information or None if no data available. Includes metadata fields: - stale (bool): Whether the data is stale - source (str): "live" or "cache" - age_ms (float): Age of the snapshot in milliseconds - stale_reason (str, optional): Reason for staleness if stale=True """ now = time.time() with _STATE_LOCK: if _LAST_SNAPSHOT is None: # No snapshot yet, try requesting from frontend pass else: age_ms = (now - _LAST_TS) * 1000 if _LAST_TS else None # Snapshot is fresh if: no freshness required OR (age is known AND within threshold) if fresh_ms is None or (age_ms is not None and age_ms <= fresh_ms): # Snapshot is fresh enough return _wrap_snapshot_with_metadata( _LAST_SNAPSHOT, stale=False, source="live", age_ms=age_ms ) # Need fresh data - request from current kernel's frontend comm = _get_current_comm() if not comm: logger.debug("No active comm available for fresh data request") with _STATE_LOCK: if _LAST_SNAPSHOT is None: return None age_ms = (time.time() - _LAST_TS) * 1000 if _LAST_TS else None return _wrap_snapshot_with_metadata( _LAST_SNAPSHOT, stale=True, source="cache", age_ms=age_ms, stale_reason="no_active_comms", ) # Request fresh data request_frontend_snapshot() # Wait for update with timeout start_time = time.time() while time.time() - start_time < timeout_s: time.sleep(0.05) # 50ms polling with _STATE_LOCK: if _LAST_SNAPSHOT is not None: age_ms = (time.time() - _LAST_TS) * 1000 if _LAST_TS else None if fresh_ms is None or (age_ms is not None and age_ms <= fresh_ms): return _wrap_snapshot_with_metadata( _LAST_SNAPSHOT, stale=False, source="live", age_ms=age_ms ) # Timeout - return what we have (marked as stale) with _STATE_LOCK: if _LAST_SNAPSHOT is None: return None age_ms = (time.time() - _LAST_TS) * 1000 if _LAST_TS else None return _wrap_snapshot_with_metadata( _LAST_SNAPSHOT, stale=True, source="cache", age_ms=age_ms, stale_reason="timeout", )
[docs] def get_bridge_status() -> Dict[str, Any]: """Get status information about the bridge.""" with _STATE_LOCK: return { "comm_target_registered": True, # If this function is called, target is registered "active_kernels": len(_KERNEL_COMM_MAP), "kernel_ids": list(_KERNEL_COMM_MAP.keys()), "current_kernel_id": _get_kernel_id(), "has_snapshot": _LAST_SNAPSHOT is not None, "last_snapshot_age_s": time.time() - _LAST_TS if _LAST_TS else None, "snapshot_summary": ( { "cell_type": ( _LAST_SNAPSHOT.get("cell_type") if _LAST_SNAPSHOT else None ), "text_length": ( len(_LAST_SNAPSHOT.get("text", "")) if _LAST_SNAPSHOT else 0 ), "notebook_path": ( _LAST_SNAPSHOT.get("notebook_path") if _LAST_SNAPSHOT else None ), } if _LAST_SNAPSHOT else None ), }
[docs] def update_active_cell(content: str, timeout_s: float = 2.0) -> Dict[str, Any]: """ Update the content of the currently active cell in JupyterLab frontend. FIX: Now sends to only the current kernel's comm instead of all comms. Args: content: New content to set in the active cell timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with update status and response details """ result = _send_to_kernel({"type": "update_cell", "content": content}) if result["success"]: result["content_length"] = len(content) return result
[docs] def execute_active_cell(timeout_s: float = 5.0) -> Dict[str, Any]: """ Execute the currently active cell in JupyterLab frontend. FIX: Now sends to only the current kernel's comm instead of all comms. Args: timeout_s: How long to wait for response from frontend (default 5.0s) Returns: Dictionary with execution status and response details """ result = _send_to_kernel({"type": "execute_cell"}) if result["success"]: result["warning"] = "UNSAFE: Code execution was requested in active cell" return result
[docs] def add_new_cell( cell_type: str = "code", position: str = "below", content: str = "", timeout_s: float = 2.0, ) -> Dict[str, Any]: """ Add a new cell relative to the currently active cell in JupyterLab frontend. FIX: Now sends to only the current kernel's comm instead of all comms and waits for the frontend response to report actual success/failure. Args: cell_type: Type of cell to create ("code", "markdown", "raw") position: Position relative to active cell ("above", "below", "end") "end" appends the cell at the very end of the notebook content: Initial content for the new cell timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with creation status and response details """ logger.debug( f"🚀 ADD_NEW_CELL called: type={cell_type}, position={position}, content_len={len(content)}" ) # Validate parameters valid_types = {"code", "markdown", "raw"} valid_positions = {"above", "below", "end"} if cell_type not in valid_types: return { "success": False, "error": f"Invalid cell_type '{cell_type}'. Must be one of: {', '.join(valid_types)}", } if position not in valid_positions: return { "success": False, "error": f"Invalid position '{position}'. Must be one of: {', '.join(valid_positions)}", } result = _send_and_wait( { "type": "add_cell", "cell_type": cell_type, "position": position, "content": content, }, timeout_s=timeout_s, ) if result["success"]: result["cell_type"] = cell_type result["position"] = position result["content_length"] = len(content) result["warning"] = "UNSAFE: New cell was added to notebook" return result
[docs] def delete_editing_cell(timeout_s: float = 2.0) -> Dict[str, Any]: """ Delete the currently active cell in JupyterLab frontend. FIX: Now sends to only the current kernel's comm instead of all comms. This is the primary fix for Bug #1 - previously this function would send delete requests to ALL connected frontends, causing 2-5 cells to be deleted. Args: timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with deletion status and response details """ result = _send_to_kernel({"type": "delete_cell"}) if result["success"]: result["warning"] = "UNSAFE: Cell was deleted from notebook" return result
[docs] def apply_patch(old_text: str, new_text: str, timeout_s: float = 2.0) -> Dict[str, Any]: """ Apply a simple text replacement patch to the currently active cell. FIX: Now sends to only the current kernel's comm instead of all comms. This is the primary fix for Bug #2 - previously this function would send patch requests to ALL connected frontends, causing the patch to apply multiple times (e.g., "y = 2" -> "y = 200" became "y = 2000000"). This function replaces the first occurrence of old_text with new_text in the active cell content. Args: old_text: Text to find and replace new_text: Text to replace with timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with patch status and response details """ if not old_text: return {"success": False, "error": "old_text parameter cannot be empty"} result = _send_to_kernel( { "type": "apply_patch", "old_text": old_text, "new_text": new_text, } ) if result["success"]: result["old_text_length"] = len(old_text) result["new_text_length"] = len(new_text) result["warning"] = "UNSAFE: Cell content was modified via patch" return result
[docs] def delete_cells_by_number( cell_numbers: List[int], timeout_s: float = 2.0 ) -> Dict[str, Any]: """ Delete multiple cells by their execution count numbers. FIX: Now sends to only the current kernel's comm instead of all comms. This function sends a request to the JupyterLab frontend to delete cells identified by their execution counts. Args: cell_numbers: List of execution count numbers to delete (e.g., [1, 2, 5]) timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with deletion status and detailed results for each cell """ if not isinstance(cell_numbers, list) or len(cell_numbers) == 0: return {"success": False, "error": "cell_numbers must be a non-empty list"} result = _send_to_kernel( { "type": "delete_cells_by_number", "cell_numbers": cell_numbers, } ) if result["success"]: result["cell_numbers"] = cell_numbers result["total_requested"] = len(cell_numbers) result["warning"] = ( "UNSAFE: Cells deletion requested - check notebook for results" ) return result
[docs] def get_cached_cell_output( cell_number: int, max_age_seconds: Optional[float] = None, ) -> Optional[Dict[str, Any]]: """ Get cached output for a specific cell from the frontend response cache. Implements timestamp-based cache validation to avoid returning stale error states that no longer reflect the current cell state. Args: cell_number: Execution count number of the cell max_age_seconds: Maximum age of cached data in seconds. If None, uses CELL_OUTPUT_CACHE_TTL_SECONDS (default 60s). If 0, returns cached data regardless of age. Returns: Dictionary with output data if available and not expired, None otherwise. The returned dict includes metadata: - "data": The actual output data - "timestamp": When the data was cached - "age_seconds": How old the cached data is - "stale": Whether the data exceeds max_age_seconds """ if max_age_seconds is None: max_age_seconds = CELL_OUTPUT_CACHE_TTL_SECONDS with _STATE_LOCK: cache_entry = _CELL_OUTPUTS_CACHE.get(cell_number) if cache_entry is None: return None current_time = time.time() cached_timestamp = cache_entry.get("timestamp", 0) age_seconds = current_time - cached_timestamp # Check if cache entry is expired (unless max_age is 0, meaning no expiry) is_stale = max_age_seconds > 0 and age_seconds > max_age_seconds if is_stale: # Remove stale entry from cache del _CELL_OUTPUTS_CACHE[cell_number] logger.debug( f"Cache entry for cell {cell_number} expired " f"(age: {age_seconds:.1f}s > TTL: {max_age_seconds}s)" ) return None # Return the cached data with metadata return { "data": cache_entry.get("data"), "timestamp": cached_timestamp, "age_seconds": age_seconds, "stale": False, "kernel_id": cache_entry.get("kernel_id"), }
[docs] def invalidate_cell_output_cache( cell_numbers: Optional[List[int]] = None, older_than_seconds: Optional[float] = None, ) -> Dict[str, Any]: """ Invalidate (clear) cached cell outputs. This function helps prevent stale error states from persisting in the cache. It can be called after cell re-execution to ensure fresh data is fetched. Args: cell_numbers: List of specific cell numbers to invalidate. If None, invalidates all cached cells. older_than_seconds: Only invalidate entries older than this many seconds. If None, invalidates regardless of age. Returns: Dictionary with invalidation results: - "invalidated_count": Number of cache entries removed - "remaining_count": Number of entries still in cache - "cell_numbers": List of cell numbers that were invalidated """ current_time = time.time() invalidated = [] with _STATE_LOCK: if cell_numbers is None: # Invalidate all or by age cells_to_check = list(_CELL_OUTPUTS_CACHE.keys()) else: cells_to_check = cell_numbers for cell_num in cells_to_check: if cell_num not in _CELL_OUTPUTS_CACHE: continue cache_entry = _CELL_OUTPUTS_CACHE[cell_num] should_invalidate = True if older_than_seconds is not None: cached_timestamp = cache_entry.get("timestamp", 0) age_seconds = current_time - cached_timestamp should_invalidate = age_seconds > older_than_seconds if should_invalidate: del _CELL_OUTPUTS_CACHE[cell_num] invalidated.append(cell_num) remaining_count = len(_CELL_OUTPUTS_CACHE) logger.debug( f"Invalidated {len(invalidated)} cache entries, {remaining_count} remaining" ) return { "invalidated_count": len(invalidated), "remaining_count": remaining_count, "cell_numbers": invalidated, }
[docs] def get_cell_outputs(cell_numbers: List[int], timeout_s: float = 2.0) -> Dict[str, Any]: """ Get outputs for specific cells from the JupyterLab frontend. FIX: Now sends to only the current kernel's comm instead of all comms. Retrieves cell outputs (stdout, stderr, execute_result, errors) from the notebook model in the JupyterLab frontend. Args: cell_numbers: List of execution count numbers to get outputs for (e.g., [1, 2, 5]) timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with outputs for each requested cell number """ if not isinstance(cell_numbers, list) or len(cell_numbers) == 0: return {"success": False, "error": "cell_numbers must be a non-empty list"} result = _send_to_kernel( { "type": "get_cell_outputs", "cell_numbers": cell_numbers, } ) if result["success"]: result["cell_numbers"] = cell_numbers return result
[docs] def move_cursor(target: str, timeout_s: float = 2.0) -> Dict[str, Any]: """ Move cursor to a different cell in the notebook. Waits for frontend response to return actual success/failure status, including errors when the target cell does not exist. Args: target: Where to move the cursor: - "above": Move to cell above current - "below": Move to cell below current - "bottom": Move to the last cell in the notebook (by file order) - "index:N": Move to cell at position N (0-indexed) - works for ALL cells timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with operation status, old index, and new index. Returns success=False with error message if target cell not found. """ # Validate target valid_targets = ["above", "below", "bottom"] if target not in valid_targets and not target.startswith("index:"): return { "success": False, "error": f"Invalid target '{target}'. Must be 'above', 'below', 'bottom', or 'index:N'", } # Use _send_and_wait to get actual frontend response result = _send_and_wait( { "type": "move_cursor", "target": str(target), }, timeout_s=timeout_s, ) if result["success"]: result["target"] = target return result
[docs] def get_active_cell_output(timeout_s: float = 10.0) -> Dict[str, Any]: """ Get the output of the currently active cell directly from JupyterLab frontend. This function retrieves the output from the cell that is currently selected in JupyterLab, avoiding stale state issues with IPython's In/Out history. FIX for Bug #10: The previous implementation used IPython's sys.last_value and Out history which can be stale. This directly queries the JupyterLab frontend for the active cell's current outputs. Args: timeout_s: How long to wait for response from frontend (default 10.0s) Returns: Dictionary with: - success (bool): Whether the operation succeeded - cell_type (str): Type of the active cell ("code", "markdown", etc.) - cell_index (int): Index of the active cell in the notebook - execution_count (int|None): Execution count for code cells - has_output (bool): Whether the cell has any output - has_error (bool): Whether the cell output contains an error - outputs (list): List of output objects (stream, execute_result, error, etc.) - image_paths (list): File paths of saved images (if any) - message (str): Status message """ from .image_utils import process_outputs_list result = _send_and_wait( {"type": "get_active_cell_output"}, timeout_s=timeout_s, ) # Process images: save to temp files, replace base64 with paths if result.get("success") and "outputs" in result: processed, image_paths = process_outputs_list(result["outputs"]) result["outputs"] = processed if image_paths: result["image_paths"] = image_paths return result
[docs] def get_notebook_structure(timeout_s: float = 2.0) -> Dict[str, Any]: """ Get lightweight notebook structure (metadata only, no source code). This function retrieves the list of all cells in the notebook with their metadata (type, position, execution count) but WITHOUT source code, making it fast for large notebooks. Args: timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with: - success (bool): Whether the operation succeeded - total_cells (int): Total number of cells in notebook - active_cell_index (int): Index of currently active cell - cells (list): List of cell metadata dicts with: - cell_id_notebook (int): Position in notebook (0-indexed) - cell_type (str): "code", "markdown", or "raw" - cell_execution_number (int|None): IPython counter or null - error (str): Error message if failed """ result = _send_and_wait( {"type": "get_notebook_structure"}, timeout_s=timeout_s, ) return result
[docs] def get_cells_by_index( cell_id_notebooks: List[int], timeout_s: float = 2.0 ) -> Dict[str, Any]: """ Get specific cells by position index (with source code). This function fetches specific cells from the notebook by their position, allowing targeted retrieval of cell content without fetching the entire notebook. Args: cell_id_notebooks: List of cell indices to fetch (0-indexed positions) timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with: - success (bool): Whether the operation succeeded - cells (list): List of cell dicts with: - cell_id_notebook (int): Position in notebook - cell_type (str): "code", "markdown", or "raw" - cell_execution_number (int|None): IPython counter or null - source (str): Cell source code - error (str): Error message if failed """ result = _send_and_wait( { "type": "get_cells_by_index", "cell_id_notebooks": cell_id_notebooks, }, timeout_s=timeout_s, ) return result
[docs] def delete_cells_by_index( cell_id_notebooks: List[int], timeout_s: float = 2.0 ) -> Dict[str, Any]: """ Delete cells by position index (works for ALL cells including unexecuted ones). This function deletes cells from the notebook by their position, allowing deletion of markdown cells or unexecuted code cells that don't have execution counts. IMPORTANT: This function also invalidates the output cache for any deleted cells that had execution counts, preventing stale data issues. Args: cell_id_notebooks: List of cell indices to delete (0-indexed positions) timeout_s: How long to wait for response from frontend (default 2.0s) Returns: Dictionary with: - success (bool): Whether the operation succeeded - deleted_count (int): Number of cells deleted - cleared_count (int): Number of cells cleared (if last cell) - invalidated_exec_counts (list): Execution counts that were invalidated - message (str): Status message """ result = _send_and_wait( { "type": "delete_cells_by_index", "cell_id_notebooks": cell_id_notebooks, }, timeout_s=timeout_s, ) # Invalidate cache entries for deleted cells if result.get("success") and result.get("invalidated_exec_counts"): with _STATE_LOCK: for exec_count in result["invalidated_exec_counts"]: _CELL_OUTPUTS_CACHE.pop(exec_count, None) logger.debug( f"Invalidated cache for {len(result['invalidated_exec_counts'])} cells" ) return result