Files
SuperCharged-Claude-Code-Up…/.venv/lib/python3.11/site-packages/mcp/server/auth/handlers/register.py
uroma 55aafbae9a Fix project isolation: Make loadChatHistory respect active project sessions
- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 14:43:05 +00:00

137 lines
5.8 KiB
Python

import secrets
import time
from dataclasses import dataclass
from typing import Any
from uuid import uuid4
from pydantic import BaseModel, RootModel, ValidationError
from starlette.requests import Request
from starlette.responses import Response
from mcp.server.auth.errors import stringify_pydantic_error
from mcp.server.auth.json_response import PydanticJSONResponse
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, RegistrationError, RegistrationErrorCode
from mcp.server.auth.settings import ClientRegistrationOptions
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata
class RegistrationRequest(RootModel[OAuthClientMetadata]):
# this wrapper is a no-op; it's just to separate out the types exposed to the
# provider from what we use in the HTTP handler
root: OAuthClientMetadata
class RegistrationErrorResponse(BaseModel):
error: RegistrationErrorCode
error_description: str | None
@dataclass
class RegistrationHandler:
provider: OAuthAuthorizationServerProvider[Any, Any, Any]
options: ClientRegistrationOptions
async def handle(self, request: Request) -> Response:
# Implements dynamic client registration as defined in https://datatracker.ietf.org/doc/html/rfc7591#section-3.1
try:
# Parse request body as JSON
body = await request.json()
client_metadata = OAuthClientMetadata.model_validate(body)
# Scope validation is handled below
except ValidationError as validation_error:
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_client_metadata",
error_description=stringify_pydantic_error(validation_error),
),
status_code=400,
)
client_id = str(uuid4())
# If auth method is None, default to client_secret_post
if client_metadata.token_endpoint_auth_method is None:
client_metadata.token_endpoint_auth_method = "client_secret_post"
client_secret = None
if client_metadata.token_endpoint_auth_method != "none": # pragma: no branch
# cryptographically secure random 32-byte hex string
client_secret = secrets.token_hex(32)
if client_metadata.scope is None and self.options.default_scopes is not None:
client_metadata.scope = " ".join(self.options.default_scopes)
elif client_metadata.scope is not None and self.options.valid_scopes is not None:
requested_scopes = set(client_metadata.scope.split())
valid_scopes = set(self.options.valid_scopes)
if not requested_scopes.issubset(valid_scopes): # pragma: no branch
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_client_metadata",
error_description="Requested scopes are not valid: "
f"{', '.join(requested_scopes - valid_scopes)}",
),
status_code=400,
)
if not {"authorization_code", "refresh_token"}.issubset(set(client_metadata.grant_types)):
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_client_metadata",
error_description="grant_types must be authorization_code and refresh_token",
),
status_code=400,
)
# The MCP spec requires servers to use the authorization `code` flow
# with PKCE
if "code" not in client_metadata.response_types:
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_client_metadata",
error_description="response_types must include 'code' for authorization_code grant",
),
status_code=400,
)
client_id_issued_at = int(time.time())
client_secret_expires_at = (
client_id_issued_at + self.options.client_secret_expiry_seconds
if self.options.client_secret_expiry_seconds is not None
else None
)
client_info = OAuthClientInformationFull(
client_id=client_id,
client_id_issued_at=client_id_issued_at,
client_secret=client_secret,
client_secret_expires_at=client_secret_expires_at,
# passthrough information from the client request
redirect_uris=client_metadata.redirect_uris,
token_endpoint_auth_method=client_metadata.token_endpoint_auth_method,
grant_types=client_metadata.grant_types,
response_types=client_metadata.response_types,
client_name=client_metadata.client_name,
client_uri=client_metadata.client_uri,
logo_uri=client_metadata.logo_uri,
scope=client_metadata.scope,
contacts=client_metadata.contacts,
tos_uri=client_metadata.tos_uri,
policy_uri=client_metadata.policy_uri,
jwks_uri=client_metadata.jwks_uri,
jwks=client_metadata.jwks,
software_id=client_metadata.software_id,
software_version=client_metadata.software_version,
)
try:
# Register client
await self.provider.register_client(client_info)
# Return client information
return PydanticJSONResponse(content=client_info, status_code=201)
except RegistrationError as e:
# Handle registration errors as defined in RFC 7591 Section 3.2.2
return PydanticJSONResponse(
content=RegistrationErrorResponse(error=e.error, error_description=e.error_description),
status_code=400,
)