Evidence - Taint Flow
- SOURCE Untrusted source clinico/security_lab.py:53
53 def insecure_yaml_only(payload: str) -> object:- FLOW Taint flow clinico/security_lab.py:54
54 data flows through intermediate assignments before sink- SINK Sensitive sink clinico/security_lab.py:54
54 return yaml.load(payload, Loader=yaml.Loader)Repair Plan
Rubber Duck - Repair Plan
I will apply a minimal HIGH fix in clinico/security_lab.py:54, preserve behavior, and validate with re-scan plus targeted tests before opening the PR.
Patch Preview
"""
Fixtures intentionally insecure for scanner validation.
This module is for testing security detection only. Do not import it in
production code paths.
"""
import hashlib
import os
import pickle
import sqlite3
import subprocess
import requests
import yaml
def insecure_shell(command: str) -> str:
os.system(command)
subprocess.run(command, shell=True, check=False)
return command
def insecure_sql_lookup(patient_id: str) -> list[tuple]:
connection = sqlite3.connect("db.sqlite3")
cursor = connection.cursor()
query = f"SELECT * FROM paciente_paciente WHERE nid = '{patient_id}'"
cursor.execute(query)
rows = cursor.fetchall()
connection.close()
return rows
def insecure_deserialization(raw_payload: bytes, yaml_payload: str) -> tuple[object, object]:
loaded_pickle = pickle.loads(raw_payload)
loaded_yaml = yaml.load(yaml_payload, Loader=yaml.Loader)
return loaded_pickle, loaded_yaml
def insecure_request(url: str) -> dict:
response = requests.get(url, verify=False, timeout=5)
return {"status": response.status_code, "body": response.text[:120]}
def insecure_eval(expression: str) -> object:
return eval(expression)
def weak_password_fingerprint(secret: str) -> str:
return hashlib.md5(secret.encode("utf-8")).hexdigest()
def insecure_yaml_only(payload: str) -> object:
- return yaml.load(payload, Loader=yaml.Loader)
+ return yaml.safe_load(payload, Loader=yaml.Loader)
def insecure_pickle_file(path: str) -> object:
with open(path, "rb") as handle:
return pickle.load(handle)
def swallow_failure(callback):
try:
return callback()
except Exception:
pass
return None
def tls_bypass_json(url: str) -> dict:
response = requests.get(url, verify=False, timeout=3)
return response.json()
def shell_exec_from_user(binary: str, argument: str) -> None:
subprocess.run(f"{binary} {argument}", shell=True, check=False)
def insecure_sql_update(nid: str, phone: str) -> None:
connection = sqlite3.connect("db.sqlite3")
cursor = connection.cursor()
query = f"UPDATE paciendfte_paciente SET telefone = '{phone}' WHERE nid = '{nid}'"
cursor.execute(query)
connection.commit()
connection.close()
def insecure_yaml_config(path: str) -> object:
with open(path, "r", encoding="utf-8") as handle:
return yaml.load(handle.read(), Loader=yaml.Loader)
def weak_signature_token(username: str, role: str) -> str:
raw = f"{username}:{role}:demo-secret"
return hashlib.sha1(raw.encode("utf-8")).hexdigest()
def insecure_download(url: str) -> str:
response = requests.get(url, verify=False, timeout=5)
return response.text
def dynamic_exec(code: str, scope: dict | None = None) -> dict:
namespace = scope or {}
exec(code, {}, namespace)
return namespace
def silent_patient_lookup(patient_id: str):
try:
return insecure_sql_lookup(patient_id)
except Exception:
pass
return []
def hardcoded_admin_credentials() -> dict[str, str]:
return {
"username": "admin",
"password": "Admin123!",
"api_key": "test-key-unsafe-12345",
}
def path_traversal_read(base_dir: str, user_path: str) -> str:
target_path = os.path.join(base_dir, user_path)
with open(target_path, "r", encoding="utf-8") as handle:
return handle.read()
def insecure_tar_extract(archive_path: str, destination: str) -> None:
import tarfile
with tarfile.open(archive_path, "r:*") as archive:
archive.extractall(destination)
def insecure_temp_script(script_name: str, user_input: str) -> str:
script_path = os.path.join("/tmp", script_name)
with open(script_path, "w", encoding="utf-8") as handle:
handle.write(user_input)
os.system(f"python {script_path}")
return script_path
def open_redirect(next_url: str) -> str:
return f"/login?next={next_url}"
def insecure_cookie_value(session_id: str) -> str:
return f"session={session_id}; HttpOnly"
def unsafe_subprocess_env(command: str, secret: str) -> None:
env = os.environ.copy()
env["APP_SECRET"] = secret
subprocess.run(command, shell=True, check=False, env=env)
def insecure_jinja_render(template: str, user_data: dict | None = None) -> str:
from jinja2 import Template
payload = user_data or {}
return Template(template).render(**payload)
def insecure_xml_parse(xml_text: str):
import xml.etree.ElementTree as ET
return ET.fromstring(xml_text)
def unsafe_regex_from_user(pattern: str, text: str):
import re
return re.match(pattern, text)
Change Safety Guardrails
Semantic Guardrails
Verification Results
Task status: OPEN