Source code for instrmcp.servers.jupyter_qcodes.mcp_server

"""
FastMCP server implementation for Jupyter QCoDeS integration.

This server provides read-only access to QCoDeS instruments and
Jupyter notebook functionality through MCP tools.
"""

import asyncio
import secrets
import sys
import threading
from typing import Dict, Any, Optional

from fastmcp import FastMCP

from .tools import QCodesReadOnlyTools
from .core.notebook_unsafe_tools import UnsafeToolRegistrar
from .core import (
    QCodesToolRegistrar,
    NotebookToolRegistrar,
    ResourceRegistrar,
)

# Dynamic tool integration (optional, requires dangerous mode + dynamictool option)
try:
    from .options.dynamic_tool import DynamicToolRegistrar

    DYNAMICTOOL_AVAILABLE = True
except ImportError:
    DynamicToolRegistrar = None
    DYNAMICTOOL_AVAILABLE = False
from instrmcp.utils.logging_config import get_logger
from instrmcp.utils.metadata_config import (
    load_config as load_metadata_config,
    validate_config_against_server,
    MetadataConfig,
)

# Tool transformation imports
try:
    from fastmcp.tools.tool_transform import ToolTransformConfig, ArgTransformConfig

    TOOL_TRANSFORM_AVAILABLE = True
except ImportError:
    ToolTransformConfig = None  # type: ignore[misc, assignment]
    ArgTransformConfig = None  # type: ignore[misc, assignment]
    TOOL_TRANSFORM_AVAILABLE = False

# MeasureIt integration (optional)
try:
    from .options import measureit as measureit_module
    from .options.measureit import MeasureItToolRegistrar

    MEASUREIT_AVAILABLE = True
except ImportError:
    measureit_module = None
    MeasureItToolRegistrar = None
    MEASUREIT_AVAILABLE = False

# Database integration (optional)
try:
    from .options import database as db_integration
    from .options.database import DatabaseToolRegistrar

    DATABASE_AVAILABLE = True
except ImportError:
    db_integration = None
    DatabaseToolRegistrar = None
    DATABASE_AVAILABLE = False

logger = get_logger("server")


[docs] class JupyterMCPServer: """MCP server for Jupyter QCoDeS integration."""
[docs] def __init__( self, ipython, host: str = "127.0.0.1", port: int = 8123, safe_mode: bool = True, dangerous_mode: bool = False, enabled_options: set = None, ): self.ipython = ipython self.host = host self.port = port self.safe_mode = safe_mode self.dangerous_mode = dangerous_mode self.enabled_options = enabled_options or set() self.running = False # Thread-isolated server state self._server_thread: Optional[threading.Thread] = None self._server_loop: Optional[asyncio.AbstractEventLoop] = None self._uvicorn_server = None self._ready_event = threading.Event() self._thread_error: Optional[Exception] = None self._server_started = False # Own flag, not dependent on uvicorn internals # Generate a random token for basic security self.token = secrets.token_urlsafe(32) # Load metadata config early (needed for resource descriptions) self.metadata_config = self._load_metadata_config() # Initialize tools self.tools = QCodesReadOnlyTools(ipython) # Create FastMCP server server_name = ( f"Jupyter QCoDeS MCP Server ({'Safe' if safe_mode else 'Unsafe'} Mode)" ) self.mcp = FastMCP(server_name) self._register_resources() self._register_tools() # Apply metadata overrides from config file (tool transformations) self._apply_metadata_overrides() mode_status = "safe" if safe_mode else "unsafe" logger.debug( f"Jupyter MCP Server initialized on {host}:{port} in {mode_status} mode" )
def _load_metadata_config(self) -> Optional[MetadataConfig]: """Load metadata configuration (baseline + user overrides). Returns MetadataConfig instance or None if loading fails. """ try: return load_metadata_config() except ImportError: # PyYAML not installed logger.debug("PyYAML not installed, skipping metadata config") return None except ValueError as e: logger.error(f"Invalid metadata config: {e}") return None def _register_resources(self): """Register MCP resources using the ResourceRegistrar.""" resource_registrar = ResourceRegistrar( self.mcp, self.tools, enabled_options=self.enabled_options, measureit_module=measureit_module if MEASUREIT_AVAILABLE else None, db_module=db_integration if DATABASE_AVAILABLE else None, metadata_config=self.metadata_config, ) resource_registrar.register_all() def _register_tools(self): """Register all MCP tools using core.""" # QCodes instrument tools qcodes_registrar = QCodesToolRegistrar(self.mcp, self.tools) qcodes_registrar.register_all() # Notebook tools (read-only) notebook_registrar = NotebookToolRegistrar( self.mcp, self.tools, self.ipython, safe_mode=self.safe_mode, dangerous_mode=self.dangerous_mode, enabled_options=self.enabled_options, ) notebook_registrar.register_all() # Unsafe mode tools (if enabled) # Create consent manager for unsafe tools if not self.safe_mode: from instrmcp.servers.jupyter_qcodes.security.consent import ConsentManager # Use infinite timeout for consent requests # User will wait as long as needed to review and approve # In dangerous mode, bypass all consent dialogs consent_manager_for_unsafe = ConsentManager( self.ipython, timeout_seconds=None, bypass_mode=self.dangerous_mode ) unsafe_registrar = UnsafeToolRegistrar( self.mcp, self.tools, consent_manager_for_unsafe ) unsafe_registrar.register_all() # Optional: MeasureIt tools if MEASUREIT_AVAILABLE and "measureit" in self.enabled_options: measureit_registrar = MeasureItToolRegistrar(self.mcp, self.tools) measureit_registrar.register_all() # Optional: Database tools if DATABASE_AVAILABLE and "database" in self.enabled_options: database_registrar = DatabaseToolRegistrar( self.mcp, db_integration, tools=self.tools, safe_mode=self.safe_mode, ) database_registrar.register_all() # Dynamic tool creation (meta-tools) # Requires dangerous mode AND explicit opt-in via %mcp_option dynamictool if self.dangerous_mode and "dynamictool" in self.enabled_options: auto_correct_json = "auto_correct_json" in self.enabled_options # Consent is enabled by default, can be bypassed via INSTRMCP_CONSENT_BYPASS=1 # In dangerous mode, bypass all consent dialogs require_consent = True dynamic_registrar = DynamicToolRegistrar( self.mcp, self.ipython, auto_correct_json=auto_correct_json, require_consent=require_consent, bypass_consent=self.dangerous_mode, ) dynamic_registrar.register_all() # Commented out: Parameter subscription tools (future feature) # @self.mcp.tool() # async def subscribe_parameter(instrument: str, parameter: str, interval_s: float = 1.0): # """Subscribe to periodic parameter updates.""" # pass # Commented out: System tools (future feature) # @self.mcp.tool() # async def get_cache_stats(): # """Get parameter cache statistics.""" # pass def _apply_metadata_overrides(self) -> None: """Apply tool and resource metadata overrides from config. Uses self.metadata_config (baseline + user overrides) to apply: - Tool overrides via FastMCP's add_tool_transformation() - Resource overrides via direct FunctionResource attribute modification Note: Resource descriptions are already set during registration via ResourceRegistrar. This method handles tool transformations and updates FunctionResource wrappers for resources/list responses. """ if self.metadata_config is None: logger.debug("No metadata config loaded, skipping overrides") return config = self.metadata_config # Skip if config is empty (no overrides) if not config.tools and not config.resources and not config.resource_templates: logger.debug("No metadata overrides configured") return # Apply tool overrides self._apply_tool_overrides(config) # Apply resource overrides (updates FunctionResource wrappers) self._apply_resource_overrides(config) logger.info( f"Applied metadata overrides: {len(config.tools)} tools, " f"{len(config.resources) + len(config.resource_templates)} resources" ) def _apply_tool_overrides(self, config: MetadataConfig) -> None: """Apply tool metadata overrides using FastMCP's transformation API.""" if not TOOL_TRANSFORM_AVAILABLE: if config.tools: logger.warning( "Tool transformations not available (FastMCP < 2.8.0), " "skipping tool overrides" ) return # Get registered tools for validation # Note: mcp.get_tools() is async, but we're in sync context during __init__ # We'll validate tool names lazily - invalid names will be caught by FastMCP for tool_name, overrides in config.tools.items(): try: # Build argument transformations arg_transforms: Dict[str, Any] = {} for arg_name, arg_config in overrides.arguments.items(): if arg_config.description: arg_transforms[arg_name] = ArgTransformConfig( description=arg_config.description ) # Build tool transformation config transform = ToolTransformConfig( title=overrides.title, description=overrides.description, arguments=arg_transforms if arg_transforms else {}, ) # Apply transformation self.mcp.add_tool_transformation(tool_name, transform) logger.debug(f"Applied metadata override for tool: {tool_name}") except Exception as e: if config.strict: logger.error( f"Failed to apply tool override for '{tool_name}': {e}" ) else: logger.warning( f"Failed to apply tool override for '{tool_name}': {e} (skipped)" ) def _apply_resource_overrides(self, config: MetadataConfig) -> None: """Apply resource metadata overrides via direct attribute modification. FastMCP has no resource transformation API, so we modify FunctionResource attributes directly after registration. """ # Get registered resources # FastMCP.get_resources() is async, so we need to run it # In Jupyter there's usually already a running event loop try: try: loop = asyncio.get_running_loop() # Already in an event loop (e.g., Jupyter) - use nest_asyncio or thread import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, self.mcp.get_resources()) registered = future.result(timeout=5.0) except RuntimeError: # No running loop - safe to use asyncio.run() registered = asyncio.run(self.mcp.get_resources()) logger.debug( f"Got {len(registered)} registered resources: {list(registered.keys())}" ) except Exception as e: logger.warning(f"Could not get registered resources: {e}") return # Combine resources and resource_templates for processing all_overrides = {**config.resources, **config.resource_templates} for uri, overrides in all_overrides.items(): if uri not in registered: if config.strict: logger.error(f"Config references unknown resource: {uri}") else: logger.warning( f"Config references unknown resource: {uri} (skipped)" ) continue try: resource = registered[uri] # Apply name override if overrides.name: resource.name = overrides.name # Compose and apply description override composed_desc = overrides.compose_description() if composed_desc: resource.description = composed_desc logger.debug(f"Applied metadata override for resource: {uri}") except Exception as e: if config.strict: logger.error(f"Failed to apply resource override for '{uri}': {e}") else: logger.warning( f"Failed to apply resource override for '{uri}': {e} (skipped)" ) def _cancel_all_tasks(self, loop: asyncio.AbstractEventLoop) -> None: """Cancel all pending tasks in the event loop. This ensures proper cleanup of FastMCP's embedded Docket worker and any other background tasks before closing the loop. """ try: # Get all tasks for this loop pending = asyncio.all_tasks(loop) if not pending: return logger.debug(f"Cancelling {len(pending)} pending tasks") # Cancel all tasks for task in pending: task.cancel() # Wait for all tasks to complete their cancellation # Use a timeout to avoid hanging forever loop.run_until_complete(asyncio.wait(pending, timeout=2.0)) # Log any tasks that didn't cancel cleanly for task in pending: if not task.done(): logger.warning(f"Task {task.get_name()} did not cancel in time") elif task.cancelled(): logger.debug(f"Task {task.get_name()} cancelled successfully") elif task.exception(): # Suppress CancelledError, log others exc = task.exception() if not isinstance(exc, asyncio.CancelledError): logger.debug(f"Task {task.get_name()} raised: {exc}") except Exception as e: logger.debug(f"Error during task cancellation: {e}") def _run_server_in_thread(self): """Thread target: runs uvicorn with its own event loop. This runs in a dedicated background thread, isolated from IPython's main event loop. This allows %gui qt and other event loop changes to not affect the HTTP server. """ # Windows: create a selector-based loop for THIS thread only # Don't use set_event_loop_policy() - that's process-global and would # affect IPython/qasync in the main thread if sys.platform == "win32": policy = asyncio.WindowsSelectorEventLoopPolicy() self._server_loop = policy.new_event_loop() else: self._server_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._server_loop) try: self._server_loop.run_until_complete(self._async_serve()) except Exception as e: self._thread_error = e self._ready_event.set() # Unblock start_sync even on failure logger.error(f"Server thread error: {e}") finally: self._server_started = False # Cancel all pending tasks before closing the loop # This ensures FastMCP's Docket worker and other background tasks # get a chance to clean up properly if self._server_loop and not self._server_loop.is_closed(): self._cancel_all_tasks(self._server_loop) try: self._server_loop.close() except Exception: pass self._server_loop = None async def _async_serve(self): """Async server runner within dedicated thread. Uses uvicorn.Server.serve() which properly initializes all internal state (lifespan, servers, etc.) before starting. We wrap it with a startup detection task to signal readiness. """ import uvicorn app = self.mcp.http_app() config = uvicorn.Config( app, host=self.host, port=self.port, log_level="info", access_log=True, ) self._uvicorn_server = uvicorn.Server(config) # Threads can't install signal handlers self._uvicorn_server.install_signal_handlers = lambda: None # Create a task to detect when server is ready async def signal_ready(): # Poll until uvicorn reports started (checks internal state) while not self._uvicorn_server.started: if self._uvicorn_server.should_exit: return # Aborted before ready await asyncio.sleep(0.05) self._server_started = True self._ready_event.set() logger.debug(f"Uvicorn startup complete on {self.host}:{self.port}") # Start readiness monitor as background task ready_task = asyncio.create_task(signal_ready()) try: # serve() handles all lifecycle: startup, main_loop, shutdown await self._uvicorn_server.serve() finally: self._server_started = False ready_task.cancel() try: await ready_task except asyncio.CancelledError: pass
[docs] def start_sync(self): """Synchronous start - works from any context (including after %gui qt). This is the primary method for starting the server. It creates a dedicated background thread for uvicorn, making it immune to event loop changes in the main thread. """ if self._server_thread and self._server_thread.is_alive(): logger.debug("Server thread already running") return logger.debug(f"Starting Jupyter MCP server on {self.host}:{self.port}") # Clear state from previous runs self._ready_event.clear() self._thread_error = None self._uvicorn_server = None self._server_started = False self._server_thread = threading.Thread( target=self._run_server_in_thread, daemon=True, name="MCP-Server" ) self._server_thread.start() # Wait for server to be ready (or fail) ready = self._ready_event.wait(timeout=5.0) # Check for startup failure if self._thread_error: # Thread failed with error - it should have exited, but ensure cleanup self._abort_orphaned_thread() raise RuntimeError(f"Server startup failed: {self._thread_error}") if not ready: # Timeout - thread may still be starting up, could bind port later # Signal it to stop and wait briefly to prevent port race self._abort_orphaned_thread() raise RuntimeError("Server startup timed out") self.running = True logger.debug("MCP server started successfully")
def _abort_orphaned_thread(self): """Signal orphaned thread to stop and wait briefly. Called when start_sync times out or encounters an error. Prevents the thread from binding the port after we've given up. """ if self._uvicorn_server: self._uvicorn_server.should_exit = True if self._server_thread and self._server_thread.is_alive(): # Give it a brief moment to notice the stop flag self._server_thread.join(timeout=1.0) if self._server_thread.is_alive(): logger.warning( "Orphaned server thread still running after abort - " "may cause port conflicts on next start" ) # Clear references regardless - we've done our best self._server_thread = None self._uvicorn_server = None self._server_started = False self.running = False
[docs] def stop_sync(self) -> bool: """Synchronous stop - works from any context (including after %gui qt). This is the primary method for stopping the server. It signals uvicorn to exit, waits for the thread to finish, and cleans up resources. Returns: True if server stopped successfully, False if timeout occurred. """ if not self._server_thread or not self._server_thread.is_alive(): # Already stopped or never started - clean up any stale state self._server_thread = None self._uvicorn_server = None self._server_started = False self.running = False # Cleanup tools even if server wasn't running (handles leaked resources) self._cleanup_tools_sync() return True logger.debug("Stopping MCP server...") # Phase 1: Request graceful shutdown if self._uvicorn_server: self._uvicorn_server.should_exit = True # Wait briefly for graceful shutdown (allows clean client disconnection) self._server_thread.join(timeout=0.5) # Phase 2: Force exit if still running (kills active connections) if self._server_thread.is_alive(): logger.debug("Forcing server shutdown (active connections)") if self._uvicorn_server: self._uvicorn_server.force_exit = True # Force-stop the event loop to ensure thread exits if self._server_loop: try: self._server_loop.call_soon_threadsafe(self._server_loop.stop) except RuntimeError: # Loop already closed or stopping pass # Wait for thread to finish after force-stop self._server_thread.join(timeout=1.5) # Check if thread actually stopped if self._server_thread.is_alive(): logger.warning("Server thread did not stop within timeout") # Even on timeout, we've force-stopped the loop, so clear state # to allow restart attempt (port should be released soon) # Clear state - server is down or will be momentarily self._server_thread = None self._uvicorn_server = None self._server_started = False self.running = False logger.debug("MCP server stopped") # Cleanup tools AFTER server is down to avoid conflicts self._cleanup_tools_sync() return True
def _cleanup_tools_sync(self): """Run async tools cleanup synchronously. Strategy: 1. Try new_event_loop() - works when cleanup doesn't touch IPython APIs 2. If RuntimeError (IPython API access), try run_coroutine_threadsafe on server loop if still alive 3. If both fail, log warning and continue (don't block server stop) """ # First try: fresh event loop (simplest, works for most cleanup) try: loop = asyncio.new_event_loop() try: loop.run_until_complete(self.tools.cleanup()) logger.debug("Tools cleanup completed (new loop)") return finally: loop.close() except RuntimeError as e: # May fail if cleanup touches IPython APIs expecting kernel loop logger.debug(f"new_event_loop cleanup failed: {e}, trying server loop") except Exception as e: logger.warning(f"Tools cleanup failed (new loop): {e}") return # Fallback: use server loop if still alive if self._server_loop and self._server_thread and self._server_thread.is_alive(): try: future = asyncio.run_coroutine_threadsafe( self.tools.cleanup(), self._server_loop ) # Wait with timeout to avoid blocking indefinitely future.result(timeout=2.0) logger.debug("Tools cleanup completed (server loop)") except Exception as e: logger.warning(f"Tools cleanup failed (server loop): {e}")
[docs] def is_running(self) -> bool: """Thread-safe running check. Uses our own _server_started flag rather than uvicorn internals. This avoids dependency on uvicorn's internal API which may change. """ thread_alive = ( self._server_thread is not None and self._server_thread.is_alive() ) return thread_alive and self._server_started
[docs] async def start(self): """Start the MCP server (async wrapper for start_sync). This async method is kept for backward compatibility but internally uses the synchronous thread-based approach. """ self.start_sync() print(f"🚀 QCoDeS MCP Server running on http://{self.host}:{self.port}") print(f"🔑 Access token: {self.token}")
[docs] async def stop(self): """Stop the MCP server (async wrapper for stop_sync). This async method is kept for backward compatibility but internally uses the synchronous thread-based approach. Cleanup is handled by stop_sync() so we don't duplicate it here. """ success = self.stop_sync() if success: print("🛑 QCoDeS MCP Server stopped") else: print("⚠️ Server stop timed out - server may still be running")
[docs] def set_safe_mode(self, safe_mode: bool) -> Dict[str, Any]: """Change the server's safe mode setting. Note: This requires server restart to take effect for tool registration. Args: safe_mode: True for safe mode, False for unsafe mode Returns: Dictionary with status information """ old_mode = self.safe_mode self.safe_mode = safe_mode mode_status = "safe" if safe_mode else "unsafe" old_mode_status = "safe" if old_mode else "unsafe" logger.debug(f"MCP server mode changed from {old_mode_status} to {mode_status}") return { "old_mode": old_mode_status, "new_mode": mode_status, "server_running": self.running, "restart_required": True, "message": f"Server mode changed to {mode_status}. Restart required for tool changes to take effect.", }
[docs] def set_enabled_options(self, enabled_options: set) -> Dict[str, Any]: """Change the server's enabled options. Note: This requires server restart to take effect for resource registration. Args: enabled_options: Set of enabled option names Returns: Dictionary with status information """ old_options = self.enabled_options.copy() self.enabled_options = enabled_options.copy() added = enabled_options - old_options removed = old_options - enabled_options logger.debug(f"MCP server options changed: added={added}, removed={removed}") return { "old_options": sorted(old_options), "new_options": sorted(enabled_options), "added_options": sorted(added), "removed_options": sorted(removed), "server_running": self.running, "restart_required": True, "message": "Server options updated. Restart required for resource changes to take effect.", }