feat: PostScript stream processing with account code injection
This commit is contained in:
parent
e938e90284
commit
ca895889f9
@ -24,5 +24,24 @@ def get_account_code(options):
|
||||
return code.zfill(8)
|
||||
|
||||
|
||||
def process_stream(lines, account_code):
|
||||
if not account_code:
|
||||
yield from lines
|
||||
return
|
||||
|
||||
inject = f"({account_code}) statusdict /setmanagementnumber get exec\n".encode()
|
||||
injected = False
|
||||
|
||||
for line in lines:
|
||||
stripped = line.rstrip(b'\r\n')
|
||||
if not injected and (stripped == b'%%EndSetup' or stripped.startswith(b'%%Page:')):
|
||||
yield inject
|
||||
injected = True
|
||||
yield line
|
||||
|
||||
if not injected:
|
||||
sys.stderr.write("kyofilter: WARNING: no injection point found in PostScript stream\n")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
|
||||
@ -37,3 +37,45 @@ class TestGetAccountCode:
|
||||
|
||||
def test_non_numeric_code_returns_none(self):
|
||||
assert kyofilter.get_account_code({"KmManagment": "MGabc45"}) is None
|
||||
|
||||
|
||||
class TestProcessStream:
|
||||
def _stream(self, text):
|
||||
return iter(line.encode() for line in text.splitlines(keepends=True))
|
||||
|
||||
def _collect(self, text, code):
|
||||
return b"".join(kyofilter.process_stream(self._stream(text), code))
|
||||
|
||||
def test_passthrough_when_no_code(self):
|
||||
ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n"
|
||||
assert self._collect(ps, None) == ps.encode()
|
||||
|
||||
def test_injects_before_end_setup(self):
|
||||
ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert b"(00012345) statusdict /setmanagementnumber get exec\n%%EndSetup\n" in result
|
||||
|
||||
def test_end_setup_preserved_after_injection(self):
|
||||
ps = "%!PS\n%%BeginSetup\n%%EndSetup\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert b"%%EndSetup\n" in result
|
||||
|
||||
def test_fallback_injects_before_first_page_when_no_end_setup(self):
|
||||
ps = "%!PS\n%%Page: 1 1\nshowpage\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert b"(00012345) statusdict /setmanagementnumber get exec\n%%Page: 1 1\n" in result
|
||||
|
||||
def test_injects_only_once(self):
|
||||
ps = "%!PS\n%%BeginSetup\n%%EndSetup\n%%Page: 1 1\n%%Page: 2 1\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert result.count(b"setmanagementnumber") == 1
|
||||
|
||||
def test_content_before_injection_point_preserved(self):
|
||||
ps = "%!PS\n%%BeginSetup\n/mydict 10 dict def\n%%EndSetup\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert b"/mydict 10 dict def\n" in result
|
||||
|
||||
def test_content_after_injection_point_preserved(self):
|
||||
ps = "%!PS\n%%BeginSetup\n%%EndSetup\nshowpage\n"
|
||||
result = self._collect(ps, "00012345")
|
||||
assert b"showpage\n" in result
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user