diff --git a/filter/kyofilter b/filter/kyofilter index 545bf4a..5045a7c 100755 --- a/filter/kyofilter +++ b/filter/kyofilter @@ -24,5 +24,24 @@ def get_account_code(options): return code.zfill(8) +def process_stream(lines, account_code): + if not account_code: + yield from lines + return + + inject = f"({account_code}) statusdict /setmanagementnumber get exec\n".encode() + injected = False + + for line in lines: + stripped = line.rstrip(b'\r\n') + if not injected and (stripped == b'%%EndSetup' or stripped.startswith(b'%%Page:')): + yield inject + injected = True + yield line + + if not injected: + sys.stderr.write("kyofilter: WARNING: no injection point found in PostScript stream\n") + + if __name__ == '__main__': pass diff --git a/tests/test_kyofilter.py b/tests/test_kyofilter.py index df1727e..dc66f2c 100644 --- a/tests/test_kyofilter.py +++ b/tests/test_kyofilter.py @@ -37,3 +37,45 @@ class TestGetAccountCode: def test_non_numeric_code_returns_none(self): assert kyofilter.get_account_code({"KmManagment": "MGabc45"}) is None + + +class TestProcessStream: + def _stream(self, text): + return iter(line.encode() for line in text.splitlines(keepends=True)) + + def _collect(self, text, code): + return b"".join(kyofilter.process_stream(self._stream(text), code)) + + def test_passthrough_when_no_code(self): + ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n" + assert self._collect(ps, None) == ps.encode() + + def test_injects_before_end_setup(self): + ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n" + result = self._collect(ps, "00012345") + assert b"(00012345) statusdict /setmanagementnumber get exec\n%%EndSetup\n" in result + + def test_end_setup_preserved_after_injection(self): + ps = "%!PS\n%%BeginSetup\n%%EndSetup\n" + result = self._collect(ps, "00012345") + assert b"%%EndSetup\n" in result + + def test_fallback_injects_before_first_page_when_no_end_setup(self): + ps = "%!PS\n%%Page: 1 1\nshowpage\n" + result = self._collect(ps, "00012345") + assert b"(00012345) statusdict /setmanagementnumber get exec\n%%Page: 1 1\n" in result + + def test_injects_only_once(self): + ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n%%Page: 2 1\n" + result = self._collect(ps, "00012345") + assert result.count(b"setmanagementnumber") == 1 + + def test_content_before_injection_point_preserved(self): + ps = "%!PS\n%%BeginSetup\n/mydict 10 dict def\n%%EndSetup\n" + result = self._collect(ps, "00012345") + assert b"/mydict 10 dict def\n" in result + + def test_content_after_injection_point_preserved(self): + ps = "%!PS\n%%BeginSetup\n%%EndSetup\nshowpage\n" + result = self._collect(ps, "00012345") + assert b"showpage\n" in result