Turn a Kibana JSON log export into a runnable pytest suite using the secure-log2test CLI. Use when the user has a Kibana or Elasticsearch JSON export of API traffic and wants a regression suite from production logs, when extracting test cases from staging traffic, when scrubbing auth headers or secret-looking body fields before logs leave the laptop, when bridging Kibana-captured requests into a pytest-based suite for CI, when the user mentions Kibana logs, Elasticsearch JSON export, log-to-test conversion, log replay tests, auth header redaction, PII in logs, or regression tests from production traffic.
92
100%
Does it follow best practices?
Impact
93%
1.00xAverage score across 2 eval scenarios
Passed
No known issues
"""Edge cases: header redaction, empty bodies, server error codes."""
import json
import pytest
from secure_log2test.core.parser import (
KibanaLogEntry,
KibanaLogParser,
REDACTED,
redact_body,
redact_headers,
)
# ---------------------------------------------------------------------------
# Header redaction
# ---------------------------------------------------------------------------
def test_authorization_header_redacted():
cleaned = redact_headers({"Authorization": "Bearer secret-token-xyz"})
assert cleaned == {"Authorization": REDACTED}
def test_cookie_header_redacted():
cleaned = redact_headers({"Cookie": "session=abc123"})
assert cleaned["Cookie"] == REDACTED
def test_redaction_is_case_insensitive():
cleaned = redact_headers(
{
"AUTHORIZATION": "Bearer x",
"x-api-key": "k1",
"X-Auth-Token": "t1",
}
)
assert cleaned["AUTHORIZATION"] == REDACTED
assert cleaned["x-api-key"] == REDACTED
assert cleaned["X-Auth-Token"] == REDACTED
def test_dpop_and_webhook_signature_headers_redacted():
# DPoP (RFC 9449) carries a signed proof JWT; webhook signature headers
# carry an HMAC derived from a shared secret. Neither name contains the
# substrings the regex looks for, so they need explicit coverage.
cleaned = redact_headers(
{
"DPoP": "eyJhbGciOiJFUzI1Ni.eyJodHUiOiJodHRwczovL2FwaSJ9.signaturepart",
"X-Hub-Signature-256": "sha256=8f3a9c0deadbeef",
"X-Hub-Signature": "sha1=0123456789abcdef",
}
)
assert cleaned["DPoP"] == REDACTED
assert cleaned["X-Hub-Signature-256"] == REDACTED
assert cleaned["X-Hub-Signature"] == REDACTED
def test_safe_headers_preserved():
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "pytest/7",
}
assert redact_headers(headers) == headers
def test_redaction_does_not_mutate_input():
original = {"Authorization": "Bearer x", "Accept": "*/*"}
snapshot = dict(original)
redact_headers(original)
assert original == snapshot
def test_empty_headers_return_empty_dict():
assert redact_headers({}) == {}
assert redact_headers(None) == {}
def test_entry_redacts_headers_at_construction():
entry = KibanaLogEntry(
method="GET",
url="/api/v1/users",
status=200,
headers={"Authorization": "Bearer leaky", "Accept": "json"},
)
assert entry.headers["Authorization"] == REDACTED
assert entry.headers["Accept"] == "json"
def test_entry_redaction_on_parser_output(tmp_path):
payload = {
"hits": {
"hits": [
{
"_source": {
"method": "POST",
"url": "/api/v1/login",
"status": 200,
"headers": {"Authorization": "Bearer leaked"},
}
},
]
}
}
src = tmp_path / "with_auth.json"
src.write_text(json.dumps(payload))
entries = KibanaLogParser(src).parse()
assert len(entries) == 1
assert entries[0].headers["Authorization"] == REDACTED
# ---------------------------------------------------------------------------
# Empty / missing body
# ---------------------------------------------------------------------------
def test_body_defaults_to_none():
entry = KibanaLogEntry(method="GET", url="/", status=200)
assert entry.body is None
def test_empty_string_body_preserved():
entry = KibanaLogEntry(method="POST", url="/api/echo", status=200, body="")
assert entry.body == ""
def test_empty_dict_body_preserved():
entry = KibanaLogEntry(method="POST", url="/api/echo", status=200, body={})
assert entry.body == {}
def test_parser_handles_entries_without_body(tmp_path):
payload = {
"hits": {
"hits": [
{"_source": {"method": "GET", "url": "/a", "status": 200}},
{"_source": {"method": "GET", "url": "/b", "status": 200, "body": ""}},
{
"_source": {
"method": "GET",
"url": "/c",
"status": 200,
"body": None,
}
},
]
}
}
src = tmp_path / "no_body.json"
src.write_text(json.dumps(payload))
entries = KibanaLogParser(src).parse()
assert len(entries) == 3
assert [e.body for e in entries] == [None, "", None]
# ---------------------------------------------------------------------------
# Server-error status codes (5xx)
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("code", [500, 502, 503, 504, 599])
def test_5xx_codes_accepted(code):
entry = KibanaLogEntry(method="GET", url="/api/health", status=code)
assert entry.status == code
def test_parser_preserves_5xx_codes(tmp_path):
payload = {
"hits": {
"hits": [
{"_source": {"method": "GET", "url": "/x", "status": 500}},
{"_source": {"method": "GET", "url": "/y", "status": 503}},
{"_source": {"method": "GET", "url": "/z", "status": 504}},
]
}
}
src = tmp_path / "errors.json"
src.write_text(json.dumps(payload))
entries = KibanaLogParser(src).parse()
statuses = sorted(e.status for e in entries)
assert statuses == [500, 503, 504]
# ---------------------------------------------------------------------------
# Hardened header redaction (regex pattern fallback)
# ---------------------------------------------------------------------------
def test_custom_token_header_caught_by_pattern():
cleaned = redact_headers({"X-Custom-Token": "secret"})
assert cleaned["X-Custom-Token"] == REDACTED
def test_csrf_token_redacted():
cleaned = redact_headers({"X-CSRF-Token": "abc"})
assert cleaned["X-CSRF-Token"] == REDACTED
def test_refresh_token_header_redacted():
cleaned = redact_headers({"Refresh-Token": "rt-1"})
assert cleaned["Refresh-Token"] == REDACTED
# ---------------------------------------------------------------------------
# Body redaction walker
# ---------------------------------------------------------------------------
def test_redact_body_password_field():
assert redact_body({"username": "u", "password": "p"}) == {
"username": "u",
"password": REDACTED,
}
def test_redact_body_oauth_refresh_token():
assert redact_body({"refresh_token": "rt", "expires_in": 3600}) == {
"refresh_token": REDACTED,
"expires_in": 3600,
}
def test_redact_body_nested_dict():
cleaned = redact_body({"auth": {"bearer": "x"}, "data": {"value": 1}})
assert cleaned["auth"] == REDACTED # whole subtree redacted at the "auth" key
assert cleaned["data"] == {"value": 1}
def test_redact_body_list_of_dicts():
payload = [
{"name": "Alice", "api_key": "k1"},
{"name": "Bob", "secret": "s2"},
]
cleaned = redact_body(payload)
assert cleaned[0] == {"name": "Alice", "api_key": REDACTED}
assert cleaned[1] == {"name": "Bob", "secret": REDACTED}
def test_redact_body_non_dict_passthrough():
assert redact_body("plain string") == "plain string"
assert redact_body(42) == 42
assert redact_body(None) is None
def test_log_entry_body_validator_redacts():
entry = KibanaLogEntry(
method="POST",
url="/login",
status=200,
body={"username": "u", "password": "p"},
)
assert entry.body == {"username": "u", "password": REDACTED}.tessl-plugin
evals
scenario-1
scenario-2
secure_log2test
tests