本文目录导读:

我来详细解释一下Python沙箱的实现方案。
什么是Python沙箱?
沙箱是一个隔离的执行环境,用于安全地运行不可信代码,限制其对系统资源的访问。
基础沙箱实现方案
使用exec限制命名空间
def basic_sandbox(code, timeout=5):
"""
基础沙箱 - 限制可访问的模块和函数
"""
# 限制可用的内置函数
allowed_builtins = {
'print': print,
'len': len,
'range': range,
'int': int,
'str': str,
'list': list,
'dict': dict,
'tuple': tuple,
'True': True,
'False': False,
'None': None,
}
# 创建安全的命名空间
safe_globals = {
'__builtins__': allowed_builtins,
'__name__': '__sandbox__',
}
safe_locals = {}
try:
exec(code, safe_globals, safe_locals)
return safe_locals
except Exception as e:
return f"执行错误: {e}"
# 测试
user_code = """
result = [x**2 for x in range(10)]
print(result)
"""
basic_sandbox(user_code)
使用ast模块解析和验证
import ast
import sys
class SandboxVisitor(ast.NodeVisitor):
"""AST访问者,用于检查代码安全性"""
def __init__(self):
self.allowed_nodes = {
ast.Expr, ast.Assign, ast.Name, ast.Constant,
ast.BinOp, ast.UnaryOp, ast.Compare, ast.BoolOp,
ast.List, ast.Tuple, ast.Dict, ast.Set,
ast.Call, ast.Attribute, ast.Subscript,
ast.If, ast.For, ast.While, ast.Break,
ast.Continue, ast.Return, ast.FunctionDef,
ast.Lambda, ast.FormattedValue, ast.JoinedStr,
}
self.dangerous_attributes = [
'__import__', 'open', 'exec', 'eval', 'compile',
'__subclasses__', '__globals__', '__builtins__',
'os.', 'sys.', 'subprocess.', 'shutil.',
]
def visit_Call(self, node):
"""检查函数调用"""
if isinstance(node.func, ast.Name):
func_name = node.func.id
if func_name in ['eval', 'exec', 'compile', 'open', '__import__']:
raise SecurityError(f"禁止调用: {func_name}")
if isinstance(node.func, ast.Attribute):
full_name = self._get_full_name(node.func)
for dangerous in self.dangerous_attributes:
if dangerous in full_name.lower():
raise SecurityError(f"禁止访问: {full_name}")
self.generic_visit(node)
def _get_full_name(self, node):
"""获取属性访问的完整名称"""
if isinstance(node, ast.Name):
return node.id
elif isinstance(node, ast.Attribute):
return f"{self._get_full_name(node.value)}.{node.attr}"
return ""
class SecurityError(Exception):
pass
def ast_sandbox(code):
"""
使用AST进行代码安全检查
"""
try:
tree = ast.parse(code)
visitor = SandboxVisitor()
visitor.visit(tree)
# 编译并执行
compiled = compile(tree, '<sandbox>', 'exec')
safe_builtins = {
'print': print,
'range': range,
'len': len,
'int': int,
'float': float,
'str': str,
'bool': bool,
'list': list,
'dict': dict,
'tuple': tuple,
}
exec(compiled, {'__builtins__': safe_builtins})
except SecurityError as e:
return f"安全检查失败: {e}"
except Exception as e:
return f"执行错误: {e}"
# 测试
test_codes = [
"print('Hello, World!')",
"x = [i**2 for i in range(10)]",
"import os # 应该被阻止",
"__import__('os').system('ls')", # 应该被阻止
]
for code in test_codes:
print(f"执行: {code}")
print(f"结果: {ast_sandbox(code)}")
print("-" * 40)
限制资源使用
import resource
import signal
import threading
class ResourceLimitSandbox:
"""资源受限的沙箱"""
def __init__(self, cpu_time=1, memory_mb=100):
self.cpu_time = cpu_time
self.memory_bytes = memory_mb * 1024 * 1024
def run(self, code, globals_dict=None):
"""在资源限制下运行代码"""
if globals_dict is None:
globals_dict = {
'__builtins__': self._get_safe_builtins()
}
result = []
error = None
def execute():
nonlocal error
try:
# 设置资源限制
resource.setrlimit(resource.RLIMIT_CPU,
(self.cpu_time, self.cpu_time))
resource.setrlimit(resource.RLIMIT_AS,
(self.memory_bytes, self.memory_bytes))
exec(code, globals_dict)
result.append(True)
except Exception as e:
error = str(e)
result.append(False)
thread = threading.Thread(target=execute)
thread.daemon = True
thread.start()
thread.join(timeout=self.cpu_time)
if thread.is_alive():
return "超时: 代码执行时间过长"
if not result[0]:
return f"执行错误: {error}"
return "执行成功"
def _get_safe_builtins(self):
"""获取安全的内置函数"""
return {
'print': print,
'len': len,
'range': range,
'int': int,
'str': str,
'list': list,
'dict': dict,
'tuple': tuple,
'True': True,
'False': False,
'None': None,
}
# 测试
sandbox = ResourceLimitSandbox(cpu_time=1, memory_mb=100)
# 测试正常代码
print(sandbox.run("print('Hello')"))
# 测试无限循环
print(sandbox.run("while True: pass"))
# 测试内存溢出
print(sandbox.run("data = [0] * (1024**3)"))
使用子进程隔离
import subprocess
import json
import tempfile
import os
def subprocess_sandbox(code, timeout=5):
"""
使用子进程实现完全隔离
"""
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.py',
delete=False) as f:
f.write(f"""
import sys
import json
def safe_exec():
try:
{code}
except Exception as e:
print(f"Error: {{e}}", file=sys.stderr)
sys.exit(1)
safe_exec()
""")
temp_file = f.name
try:
# 在子进程中执行
result = subprocess.run(
[sys.executable, temp_file],
capture_output=True,
text=True,
timeout=timeout,
# 限制进程权限(Unix)
preexec_fn=lambda: os.setuid(65534) if os.name == 'posix' else None
)
if result.returncode == 0:
return result.stdout
else:
return f"错误: {result.stderr}"
except subprocess.TimeoutExpired:
return "超时: 代码执行时间过长"
except Exception as e:
return f"执行错误: {e}"
finally:
# 清理临时文件
try:
os.unlink(temp_file)
except:
pass
# 测试
print(subprocess_sandbox("print('Hello from subprocess!')"))
使用Docker容器隔离(高级方案)
import docker
import uuid
class DockerSandbox:
"""使用Docker容器实现完全隔离"""
def __init__(self, image='python:3.9-slim'):
self.client = docker.from_env()
self.image = image
def run_code(self, code, timeout=30):
"""在Docker容器中运行代码"""
container_name = f"sandbox_{uuid.uuid4().hex[:8]}"
# 创建代码文件
code_file = f"/tmp/{container_name}.py"
with open(code_file, 'w') as f:
f.write(code)
try:
container = self.client.containers.run(
self.image,
f"python /code/{container_name}.py",
volumes={
code_file: {
'bind': f'/code/{container_name}.py',
'mode': 'ro'
}
},
name=container_name,
detach=True,
# 限制资源
mem_limit='100m',
cpu_period=100000,
cpu_quota=50000, # 0.5 CPU
network_disabled=True, # 禁用网络
read_only=True, # 只读文件系统
remove=True, # 自动删除容器
)
# 等待结果
result = container.wait(timeout=timeout)
logs = container.logs(stdout=True, stderr=True)
if result['StatusCode'] == 0:
return logs.decode('utf-8')
else:
return f"错误: {logs.decode('utf-8')}"
except docker.errors.APIError as e:
return f"Docker错误: {e}"
except Exception as e:
return f"执行错误: {e}"
finally:
# 清理
try:
os.remove(code_file)
except:
pass
# 使用示例(需要安装docker和python-docker库)
# sandbox = DockerSandbox()
# print(sandbox.run_code("print('Hello from Docker!')"))
安全性检查清单
class SecurityChecklist:
"""安全检查清单"""
@staticmethod
def check_imports(code):
"""检查是否导入危险模块"""
dangerous_modules = [
'os', 'sys', 'subprocess', 'shutil',
'ctypes', 'socket', 'requests',
'pickle', 'marshal', 'shelve',
]
for module in dangerous_modules:
if f"import {module}" in code or f"from {module}" in code:
return False, f"禁止导入模块: {module}"
return True, "通过"
@staticmethod
def check_dangerous_functions(code):
"""检查危险函数调用"""
dangerous_funcs = [
'eval(', 'exec(', 'compile(',
'open(', '__import__(',
'getattr(', 'setattr(', 'delattr(',
]
for func in dangerous_funcs:
if func in code:
return False, f"禁止调用函数: {func}"
return True, "通过"
@staticmethod
def check_file_operations(code):
"""检查文件操作"""
file_operations = [
'file(', 'open(',
'write(', 'read(',
'os.remove', 'os.unlink',
'shutil.',
]
for op in file_operations:
if op in code:
return False, f"禁止文件操作: {op}"
return True, "通过"
# 综合沙箱
def comprehensive_sandbox(code, timeout=5, memory_limit=100):
"""综合安全检查沙箱"""
# 1. 静态代码检查
checks = [
SecurityChecklist.check_imports,
SecurityChecklist.check_dangerous_functions,
SecurityChecklist.check_file_operations,
]
for check in checks:
passed, message = check(code)
if not passed:
return f"安全拦截: {message}"
# 2. AST验证
try:
tree = ast.parse(code)
visitor = SandboxVisitor()
visitor.visit(tree)
except SyntaxError as e:
return f"语法错误: {e}"
except SecurityError as e:
return f"安全错误: {e}"
# 3. 资源限制执行
sandbox = ResourceLimitSandbox(cpu_time=timeout, memory_mb=memory_limit)
return sandbox.run(code)
# 使用示例
user_code = """
def calculate():
result = sum(range(100))
return result
print(f"计算结果: {calculate()}")
"""
print(comprehensive_sandbox(user_code))
注意事项
- 没有绝对安全:Python的沙箱很难做到100%安全
- 使用现成方案:考虑使用PyPy、pybox等成熟方案
- 生产环境:强烈建议使用容器化方案(Docker)
- 性能考虑:沙箱会带来额外性能开销
- 定制化需求:根据实际需求选择合适的隔离级别
选择哪种沙箱方案取决于你的具体需求:安全级别、性能要求、部署环境等,对于生产环境,推荐使用Docker等容器化方案。