CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

golikovichev/secure-log2test

Turn a Kibana JSON log export into a runnable pytest suite using the secure-log2test CLI. Use when the user has a Kibana or Elasticsearch JSON export of API traffic and wants a regression suite from production logs, when extracting test cases from staging traffic, when scrubbing auth headers or secret-looking body fields before logs leave the laptop, when bridging Kibana-captured requests into a pytest-based suite for CI, when the user mentions Kibana logs, Elasticsearch JSON export, log-to-test conversion, log replay tests, auth header redaction, PII in logs, or regression tests from production traffic.

92

1.00x
Quality

100%

Does it follow best practices?

Impact

93%

1.00x

Average score across 2 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

tests/test_edge_cases.py

"""Edge cases: header redaction, empty bodies, server error codes."""

import json

import pytest

from secure_log2test.core.parser import (
    KibanaLogEntry,
    KibanaLogParser,
    REDACTED,
    redact_body,
    redact_headers,
)


# ---------------------------------------------------------------------------
# Header redaction
# ---------------------------------------------------------------------------


def test_authorization_header_redacted():
    """An ``Authorization`` header value is replaced by ``REDACTED``."""
    raw_headers = {"Authorization": "Bearer secret-token-xyz"}
    result = redact_headers(raw_headers)
    assert result == {"Authorization": REDACTED}


def test_cookie_header_redacted():
    """A ``Cookie`` header value is replaced by ``REDACTED``."""
    result = redact_headers({"Cookie": "session=abc123"})
    masked_cookie = result["Cookie"]
    assert masked_cookie == REDACTED


def test_redaction_is_case_insensitive():
    """Sensitive header names are redacted in upper, lower and mixed case."""
    mixed_case_headers = {
        "AUTHORIZATION": "Bearer x",
        "x-api-key": "k1",
        "X-Auth-Token": "t1",
    }
    result = redact_headers(mixed_case_headers)
    # Look each name up with the exact spelling that was passed in.
    for header_name in ("AUTHORIZATION", "x-api-key", "X-Auth-Token"):
        assert result[header_name] == REDACTED


def test_dpop_and_webhook_signature_headers_redacted():
    """``DPoP`` and ``X-Hub-Signature*`` header values are redacted."""
    # DPoP (RFC 9449) carries a signed proof JWT; webhook signature headers
    # carry an HMAC derived from a shared secret. Neither name contains the
    # substrings the regex looks for, so they need explicit coverage.
    proof_and_signature_headers = {
        "DPoP": "eyJhbGciOiJFUzI1Ni.eyJodHUiOiJodHRwczovL2FwaSJ9.signaturepart",
        "X-Hub-Signature-256": "sha256=8f3a9c0deadbeef",
        "X-Hub-Signature": "sha1=0123456789abcdef",
    }
    result = redact_headers(proof_and_signature_headers)
    for header_name in ("DPoP", "X-Hub-Signature-256", "X-Hub-Signature"):
        assert result[header_name] == REDACTED


def test_safe_headers_preserved():
    """Headers with non-sensitive names come back with their values intact.

    The expected mapping is kept separate from the dict handed to
    ``redact_headers`` so the comparison cannot pass vacuously if the
    function were to modify its argument and return that same object.
    """
    expected = {
        "Content-Type": "application/json",
        "Accept": "*/*",
        "User-Agent": "pytest/7",
    }
    # Pass a copy so the result is compared against an untouched mapping.
    assert redact_headers(dict(expected)) == expected


def test_redaction_does_not_mutate_input():
    """Calling ``redact_headers`` leaves the caller's dict unchanged."""
    original = {"Authorization": "Bearer x", "Accept": "*/*"}
    before_call = original.copy()
    # The return value is deliberately ignored; only the argument matters here.
    redact_headers(original)
    assert original == before_call


def test_empty_headers_return_empty_dict():
    """Both an empty dict and ``None`` produce an empty dict."""
    for empty_input in ({}, None):
        assert redact_headers(empty_input) == {}


def test_entry_redacts_headers_at_construction():
    """A ``KibanaLogEntry`` built directly exposes already-redacted headers."""
    raw_headers = {"Authorization": "Bearer leaky", "Accept": "json"}
    entry = KibanaLogEntry(
        method="GET",
        url="/api/v1/users",
        status=200,
        headers=raw_headers,
    )
    stored_headers = entry.headers
    assert stored_headers["Authorization"] == REDACTED
    assert stored_headers["Accept"] == "json"


def test_entry_redaction_on_parser_output(tmp_path):
    """Entries produced by ``KibanaLogParser.parse`` have redacted headers."""
    logged_request = {
        "method": "POST",
        "url": "/api/v1/login",
        "status": 200,
        "headers": {"Authorization": "Bearer leaked"},
    }
    # Wrap the single request in the hits.hits[]._source export envelope.
    export = {"hits": {"hits": [{"_source": logged_request}]}}
    export_file = tmp_path / "with_auth.json"
    export_file.write_text(json.dumps(export))

    parsed = KibanaLogParser(export_file).parse()

    assert len(parsed) == 1
    assert parsed[0].headers["Authorization"] == REDACTED


# ---------------------------------------------------------------------------
# Empty / missing body
# ---------------------------------------------------------------------------


def test_body_defaults_to_none():
    """An entry built without a ``body`` argument has ``body`` set to None."""
    bodiless_entry = KibanaLogEntry(method="GET", url="/", status=200)
    assert bodiless_entry.body is None


def test_empty_string_body_preserved():
    """An empty-string body is stored as an empty string."""
    echo_entry = KibanaLogEntry(
        method="POST",
        url="/api/echo",
        status=200,
        body="",
    )
    assert echo_entry.body == ""


def test_empty_dict_body_preserved():
    """An empty-dict body is stored as an empty dict."""
    echo_entry = KibanaLogEntry(
        method="POST",
        url="/api/echo",
        status=200,
        body={},
    )
    assert echo_entry.body == {}


def test_parser_handles_entries_without_body(tmp_path):
    """The parser accepts hits whose body is absent, empty, or null."""
    logged_requests = [
        {"method": "GET", "url": "/a", "status": 200},  # no "body" key
        {"method": "GET", "url": "/b", "status": 200, "body": ""},
        {"method": "GET", "url": "/c", "status": 200, "body": None},
    ]
    export = {
        "hits": {"hits": [{"_source": request} for request in logged_requests]}
    }
    export_file = tmp_path / "no_body.json"
    export_file.write_text(json.dumps(export))

    parsed = KibanaLogParser(export_file).parse()

    assert len(parsed) == 3
    bodies = [entry.body for entry in parsed]
    assert bodies == [None, "", None]


# ---------------------------------------------------------------------------
# Server-error status codes (5xx)
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("code", [500, 502, 503, 504, 599])
def test_5xx_codes_accepted(code):
    """``KibanaLogEntry`` stores each parametrized 5xx status unchanged."""
    health_entry = KibanaLogEntry(
        method="GET",
        url="/api/health",
        status=code,
    )
    assert health_entry.status == code


def test_parser_preserves_5xx_codes(tmp_path):
    """5xx statuses in an export appear on the parsed entries."""
    url_status_pairs = (("/x", 500), ("/y", 503), ("/z", 504))
    hits = [
        {"_source": {"method": "GET", "url": url, "status": status}}
        for url, status in url_status_pairs
    ]
    export_file = tmp_path / "errors.json"
    export_file.write_text(json.dumps({"hits": {"hits": hits}}))

    parsed = KibanaLogParser(export_file).parse()

    # Sorted before comparing, so entry order is not part of this check.
    seen_statuses = [entry.status for entry in parsed]
    seen_statuses.sort()
    assert seen_statuses == [500, 503, 504]


# ---------------------------------------------------------------------------
# Hardened header redaction (regex pattern fallback)
# ---------------------------------------------------------------------------


def test_custom_token_header_caught_by_pattern():
    """An ``X-Custom-Token`` header value is replaced by ``REDACTED``."""
    result = redact_headers({"X-Custom-Token": "secret"})
    masked_value = result["X-Custom-Token"]
    assert masked_value == REDACTED


def test_csrf_token_redacted():
    """An ``X-CSRF-Token`` header value is replaced by ``REDACTED``."""
    result = redact_headers({"X-CSRF-Token": "abc"})
    masked_value = result["X-CSRF-Token"]
    assert masked_value == REDACTED


def test_refresh_token_header_redacted():
    """A ``Refresh-Token`` header value is replaced by ``REDACTED``."""
    result = redact_headers({"Refresh-Token": "rt-1"})
    masked_value = result["Refresh-Token"]
    assert masked_value == REDACTED


# ---------------------------------------------------------------------------
# Body redaction walker
# ---------------------------------------------------------------------------


def test_redact_body_password_field():
    """``password`` is redacted while the sibling ``username`` is kept."""
    result = redact_body({"username": "u", "password": "p"})
    expected = {"username": "u", "password": REDACTED}
    assert result == expected


def test_redact_body_oauth_refresh_token():
    """``refresh_token`` is redacted while ``expires_in`` keeps its value."""
    result = redact_body({"refresh_token": "rt", "expires_in": 3600})
    expected = {"refresh_token": REDACTED, "expires_in": 3600}
    assert result == expected


def test_redact_body_nested_dict():
    """A dict under the ``auth`` key is redacted as a whole."""
    nested_body = {"auth": {"bearer": "x"}, "data": {"value": 1}}
    result = redact_body(nested_body)
    # The whole subtree is redacted at the "auth" key, not just its leaves.
    assert result["auth"] == REDACTED
    assert result["data"] == {"value": 1}


def test_redact_body_list_of_dicts():
    """Dicts inside a top-level list have their secret-looking keys redacted."""
    records = [
        {"name": "Alice", "api_key": "k1"},
        {"name": "Bob", "secret": "s2"},
    ]
    expected_by_index = (
        (0, {"name": "Alice", "api_key": REDACTED}),
        (1, {"name": "Bob", "secret": REDACTED}),
    )
    result = redact_body(records)
    for index, expected_record in expected_by_index:
        assert result[index] == expected_record


def test_redact_body_non_dict_passthrough():
    """A string, an int and ``None`` are returned as-is by ``redact_body``."""
    for scalar in ("plain string", 42):
        assert redact_body(scalar) == scalar
    # None is checked by identity rather than equality.
    assert redact_body(None) is None


def test_log_entry_body_validator_redacts():
    """A dict body passed to ``KibanaLogEntry`` is stored with ``password`` redacted."""
    login_body = {"username": "u", "password": "p"}
    entry = KibanaLogEntry(
        method="POST",
        url="/login",
        status=200,
        body=login_body,
    )
    expected = {"username": "u", "password": REDACTED}
    assert entry.body == expected

CHANGELOG.md

CONTRIBUTING.md

README.md

REFERENCE.md

RELEASING.md

requirements.txt

SECURITY.md

SKILL.md

tessl.json

tile.json