diff --git a/player/main.s b/player/main.s index 92f6541..d1e3b4c 100644 --- a/player/main.s +++ b/player/main.s @@ -58,6 +58,7 @@ zpdummy = $08 dummy = $ffff ptr = $06 ; TODO: we only use this for connection retry count +HGRZP = $E6 ; ZP location used by HGR internals to track page to clear ; soft-switches KBD = $C000 @@ -74,7 +75,7 @@ DHIRESON = $C05E ; MONITOR SUBROUTINES HGR = $F3E2 -HGR0 = $F3EA ; internal entry point within HGR that doesn't set soft-switches +HGR0 = $F3F2 ; internal entry point within HGR that doesn't set soft-switches COUT = $FDED PRBYTE = $FDDA PRNTAX = $F941 @@ -311,30 +312,35 @@ op_header: ; Initialize (D)HGR in the CODE segment so we don't accidentally toast ourselves when ; erasing HGR _op_header_hgr: + ; Co-opt HGR internals to clear screen without displaying it. + ; nukes the startup code we placed in HGR segment + STA HIRESON + STA FULLSCR + + LDA #$20 + STA HGRZP ; ZP location used by HGR to track page to clear + JSR HGR0 + LDA WDATA ; Video mode BEQ @1 ; 0 = HGR mode - ; TODO: clear screen before displaying it to look cleaner - ; DHGR mode - STA TEXTOFF - STA HIRESON - STA DHIRESON - STA COL80ON STA STORE80ON ; Clear aux screen STA PAGE2ON ; AUX memory active ; Co-opt HGR internals to clear AUX for us. LDA #$20 + STA HGRZP JSR HGR0 - STA PAGE2OFF ; MAIN memory active + STA TEXTOFF ; now display empty (D)HGR screen. Doing this before the next instruction to make sure we don't see 80-column text garbage momentarily. + STA COL80ON + STA DHIRESON @1: - JSR HGR ; nukes the startup code we placed in HGR segment - STA FULLSCR + STA TEXTOFF ; now display empty HGR screen (NOP for DHGR since we've already done it) ; establish invariants expected by decode loop LDY #>RXBASE ; High byte of socket 0 receive buffer @@ -391,7 +397,7 @@ _op_header_hgr: ; Y register has the high byte of the W5100 address pointer in the RX socket code, so we ; can't trash this until we are ready to point back there. checkrecv: - BIT TICK ; 4 + STA TICK ; 4 LDA #3 pad cycles between tick pair; can't branch to tail STA @D+2 ; 4 @@ -999,7 +1005,7 @@ tickident page, 8 .macro op_tick_56 page ; 4+(4+4+5+4+5+4+5+4+5+4+4+4+4)+2+4+4+3 .ident (.concat ("op_tick_56_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1014,7 +1020,7 @@ tickident page, 8 STA @D+2 ; 4 STA dummy ; 4 - BIT TICK ; 4 + STA TICK ; 4 ; used >3 pad cycles between tick pair; can't branch to tail NOP ; 2 @@ -1028,7 +1034,7 @@ tickident page, 8 .macro op_tick_58 page ; pattern repeats from op_tick_40 ;4+(4+4+5+4+5+4+5+4+5+4+4+3+3+4)+4+4+3 .ident (.concat ("op_tick_58_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1044,7 +1050,7 @@ tickident page, 8 STA zpdummy ; 3 STA zpdummy ; 3 - BIT TICK ; 4 + STA TICK ; 4 ; used >3 pad cycles between tick pair; can't branch to tail LDA WDATA ; 4 @@ -1056,7 +1062,7 @@ tickident page, 8 .macro op_tick_60 page ; 4+(4+4+5+4+5+4+5+4+5+4+4+4+4+4)+2+4+3 .ident (.concat ("op_tick_60_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1073,7 +1079,7 @@ tickident page, 8 LDA WDATA ; 4 STA dummy ; 4 - BIT TICK ; 4 + STA TICK ; 4 ; used >3 pad cycles between tick pair; can't branch to tail NOP ; 2 @@ -1085,7 +1091,7 @@ tickident page, 8 .macro op_tick_62 page ;4+(4+4+5+4+5+4+5+4+5+4+4+4+3+3+4)+4+3 .ident (.concat ("op_tick_62_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1102,7 +1108,7 @@ tickident page, 8 STA zpdummy ; 3 STA zpdummy ; 3 - BIT TICK ; 4 + STA TICK ; 4 ; used >3 pad cycles between tick pair; can't branch to tail STA @D+1 ; 4 @@ -1113,7 +1119,7 @@ tickident page, 8 .macro op_tick_64 page ;4+(4+4+5+4+5+4+5+4+5+4+4+4+4+4+4)+2+3 .ident (.concat ("op_tick_64_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1131,7 +1137,7 @@ tickident page, 8 STA @D+1 ; 4 STA dummy ; 4 - BIT TICK ; 4 + STA TICK ; 4 NOP ; 2 @D: @@ -1141,7 +1147,7 @@ tickident page, 8 .macro op_tick_66 page ; pattern repeats from op_tick_8 ; 4+(4+4+5+4+5+4+5+4+5+4+4+4+3+4+3+4)+3 .ident (.concat ("op_tick_66_page_", .string(page))): - BIT TICK ; 4 + STA TICK ; 4 LDA WDATA ; 4 LDY WDATA ; 4 STA page << 8,Y ; 5 @@ -1160,7 +1166,7 @@ tickident page, 8 STA zpdummy ; 3 STA zpdummy ; 3 - BIT TICK ; 4 + STA TICK ; 4 @D: JMP op_nop ; 3 @@ -1278,6 +1284,7 @@ op_terminate: LDA KBD BPL @0 @1: ; key pressed + LDA KBDSTRB ; clear strobe JMP exit ; Manage W5100 socket buffer and ACK TCP stream. @@ -1286,7 +1293,7 @@ op_terminate: ; the last 4 bytes in a 2K "TCP frame". i.e. we can assume that we need to consume ; exactly 2K from the W5100 socket buffer. op_ack: - BIT TICK ; 4 + STA TICK ; 4 ; allow flip-flopping the PAGE1/PAGE2 soft switches to steer writes to MAIN/AUX screens ; actually this allows touching any $C0XX soft-switch, in case that is useful somehow @@ -1307,7 +1314,7 @@ op_ack: LDX # LSB), which is opposite to screen + order (LSB -> MSB is ordered left-to-right on the screen) + + Note that these are right-rotated from the HGR mapping, because of a + 1-tick phase difference in the colour reference signal for DHGR vs HGR + """ + BLACK = 0b0000 + MAGENTA = 0b0001 + BROWN = 0b1000 + ORANGE = 0b1001 # HGR colour + DARK_GREEN = 0b0100 + GREY1 = 0b0101 + GREEN = 0b1100 # HGR colour + YELLOW = 0b1101 + DARK_BLUE = 0b0010 + VIOLET = 0b0011 # HGR colour + GREY2 = 0b1010 + PINK = 0b1011 + MED_BLUE = 0b0110 # HGR colour + LIGHT_BLUE = 0b0111 + AQUA = 0b1110 + WHITE = 0b1111 + + +class DHGRColours(NominalColours): + """Map from 4-bit dot representation to DHGR pixel colours. + + Dots are in memory bit order (MSB -> LSB), which is opposite to screen + order (LSB -> MSB is ordered left-to-right on the screen) + + Note that these are right-rotated from the HGR mapping, because of a + 1-tick phase difference in the colour reference signal for DHGR vs HGR + """ + + # representation. + BLACK = 0b0000 + MAGENTA = 0b1000 + BROWN = 0b0100 + ORANGE = 0b1100 # HGR colour + DARK_GREEN = 0b0010 + GREY1 = 0b1010 + GREEN = 0b0110 # HGR colour + YELLOW = 0b1110 + DARK_BLUE = 0b0001 + VIOLET = 0b1001 # HGR colour + GREY2 = 0b0101 + PINK = 0b1101 + MED_BLUE = 0b0011 # HGR colour + LIGHT_BLUE = 0b1011 + AQUA = 0b0111 + WHITE = 0b1111 + + +def ror(int4: int, howmany: int) -> int: + """Rotate-right an int4 some number of times.""" + res = int4 + for _ in range(howmany): + res = _ror(res) + + return res + + +def _ror(int4: int) -> int: + return ((int4 & 0b1110) >> 1) ^ ((int4 & 0b0001) << 3) + + +def rol(int4: int, howmany: int) -> int: + """Rotate-left an int4 some number of times.""" + res = int4 + for _ in range(howmany): + res = _rol(res) + + return res + + +def _rol(int4: int) -> int: + return ((int4 & 0b0111) << 1) ^ ((int4 & 0b1000) >> 3) + + +@functools.lru_cache(None) +def dots_to_nominal_colour_pixels( + num_bits: int, + dots: int, + colours: Type[NominalColours], + init_phase: int = 1 # Such that phase = 0 at start of body +) -> Tuple[NominalColours]: + """Sequence of num_bits nominal colour pixels via sliding 4-bit window. + + Includes the 3-bit header that represents the trailing 3 bits of the + previous tuple body. e.g. for DHGR, storing a byte in aux even columns + will also influence the colours of the previous main odd column. + + This naively models (approximates) the NTSC colour artifacting. + + TODO: Use a more careful analogue colour composition model to produce + effective pixel colours. + + TODO: DHGR vs HGR colour differences can be modeled by changing init_phase + """ + res = [] + + shifted = dots + phase = init_phase + + for i in range(num_bits): + colour = rol(shifted & 0b1111, phase) + res.append(colours(colour)) + + shifted >>= 1 + phase += 1 + if phase == 4: + phase = 0 + + return tuple(res) + + +@functools.lru_cache(None) +def dots_to_nominal_colour_pixel_values( + num_bits: int, + dots: int, + colours: Type[NominalColours], + init_phase: int = 1 # Such that phase = 0 at start of body +) -> Tuple[int]: + """"Sequence of num_bits nominal colour values via sliding 4-bit window.""" + + return tuple(p.value for p in dots_to_nominal_colour_pixels( + num_bits, dots, colours, init_phase + )) + diff --git a/transcoder/colours_test.py b/transcoder/colours_test.py new file mode 100644 index 0000000..887703c --- /dev/null +++ b/transcoder/colours_test.py @@ -0,0 +1,113 @@ +import unittest + +import colours + +HGRColours = colours.HGRColours + + +class TestColours(unittest.TestCase): + + def test_dots_to_pixels(self): + self.assertEqual( + ( + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.DARK_BLUE, + HGRColours.MED_BLUE, + HGRColours.AQUA, + HGRColours.AQUA, + HGRColours.GREEN, + HGRColours.BROWN, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK + ), + colours.dots_to_nominal_colour_pixels( + 31, 0b00000000000000000000111000000000, HGRColours, init_phase=0 + ) + ) + + self.assertEqual( + ( + HGRColours.BLACK, + HGRColours.MAGENTA, + HGRColours.VIOLET, + HGRColours.LIGHT_BLUE, + HGRColours.WHITE, + HGRColours.AQUA, + HGRColours.GREEN, + HGRColours.BROWN, + HGRColours.BLACK, + HGRColours.MAGENTA, + HGRColours.VIOLET, + HGRColours.LIGHT_BLUE, + HGRColours.WHITE, + HGRColours.AQUA, + HGRColours.GREEN, + HGRColours.BROWN, + HGRColours.BLACK, + HGRColours.MAGENTA, + HGRColours.VIOLET, + HGRColours.LIGHT_BLUE, + HGRColours.WHITE, + HGRColours.AQUA, + HGRColours.GREEN, + HGRColours.BROWN, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK, + HGRColours.BLACK + ), + colours.dots_to_nominal_colour_pixels( + 31, 0b0000111100001111000011110000, HGRColours, init_phase=0 + ) + ) + + +class TestRolRoR(unittest.TestCase): + def testRolOne(self): + self.assertEqual(0b1111, colours.rol(0b1111, 1)) + self.assertEqual(0b0001, colours.rol(0b1000, 1)) + self.assertEqual(0b1010, colours.rol(0b0101, 1)) + + def testRolMany(self): + self.assertEqual(0b1111, colours.rol(0b1111, 3)) + self.assertEqual(0b0010, colours.rol(0b1000, 2)) + self.assertEqual(0b0101, colours.rol(0b0101, 2)) + + def testRorOne(self): + self.assertEqual(0b1111, colours.ror(0b1111, 1)) + self.assertEqual(0b1000, colours.ror(0b0001, 1)) + self.assertEqual(0b0101, colours.ror(0b1010, 1)) + + def testRoRMany(self): + self.assertEqual(0b1111, colours.ror(0b1111, 3)) + self.assertEqual(0b1000, colours.ror(0b0010, 2)) + self.assertEqual(0b0101, colours.ror(0b0101, 2)) + + +if __name__ == "__main__": + unittest.main() diff --git a/transcoder/data/.gitattributes b/transcoder/data/.gitattributes new file mode 100644 index 0000000..7e1ef73 --- /dev/null +++ b/transcoder/data/.gitattributes @@ -0,0 +1 @@ +*.bz2 filter=lfs diff=lfs merge=lfs -text diff --git a/transcoder/data/DHGR_palette_0_edit_distance.pickle.bz2 b/transcoder/data/DHGR_palette_0_edit_distance.pickle.bz2 new file mode 100644 index 0000000..e669108 --- /dev/null +++ b/transcoder/data/DHGR_palette_0_edit_distance.pickle.bz2 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b47eadfdf8c8e16c6539f9a16ed0b5a393b17e0cbd03831aacda7f659e9522d6 +size 120830327 diff --git a/transcoder/data/DHGR_palette_5_edit_distance.pickle.bz2 b/transcoder/data/DHGR_palette_5_edit_distance.pickle.bz2 new file mode 100644 index 0000000..b06628c --- /dev/null +++ b/transcoder/data/DHGR_palette_5_edit_distance.pickle.bz2 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8c245981f91ffa89b47abdd1c9d646c2e79499a0c82c38c91234be0a59e52f1f +size 118832545 diff --git a/transcoder/data/HGR_palette_0_edit_distance.pickle.bz2 b/transcoder/data/HGR_palette_0_edit_distance.pickle.bz2 new file mode 100644 index 0000000..ed26b42 --- /dev/null +++ b/transcoder/data/HGR_palette_0_edit_distance.pickle.bz2 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3fd52feb08eb6f99b267a1050c68905f25d0d106ad7c2c63473cc0a0f6aa1b25 +size 224334626 diff --git a/transcoder/data/HGR_palette_5_edit_distance.pickle.bz2 b/transcoder/data/HGR_palette_5_edit_distance.pickle.bz2 new file mode 100644 index 0000000..9ca1922 --- /dev/null +++ b/transcoder/data/HGR_palette_5_edit_distance.pickle.bz2 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dbf83e3d0b6c7867ccf7ae1d55a6ed4e906409b08043dec514e1104cec95f0fc +size 220565577 diff --git a/transcoder/edit_distance.py b/transcoder/edit_distance.py deleted file mode 100644 index b5ff7d8..0000000 --- a/transcoder/edit_distance.py +++ /dev/null @@ -1,310 +0,0 @@ -"""Computes visual differences between screen image data. - -This is the core of the video encoding, for three reasons: - -- The edit distance between old and new frames is used to prioritize which - screen bytes to send - -- When deciding which other offset bytes to send along with a chosen screen -byte, we minimize the error introduced by sending this (probably non-optimal) -byte instead of the actual target screen byte. This needs to account for the -colour artifacts introduced by this byte as well as weighting perceived -errors introduced (e.g. long runs of colour) - -- The byte_screen_error_distance function is on the critical path of the encoding. -""" - -import functools - -import numpy as np -import weighted_levenshtein - - -@functools.lru_cache(None) -def byte_to_nominal_colour_string(b: int, is_odd_offset: bool) -> str: - """Compute nominal pixel colours for a byte. - - This ignores any fringing/colour combining effects, as well as - half-ignoring what happens to the colour pixel that crosses the byte - boundary. - - A better implementation of this might be to consider neighbouring (even, - odd) column bytes together since this will allow correctly colouring the - split pixel in the middle. - - There are also even weirder colour artifacts that happen when - neighbouring bytes have mismatched colour palettes, which also cross the - odd/even boundary. But these may not be worth worrying about. - - :param b: byte to encode - :param is_odd_offset: whether byte is at an odd screen column - :return: string encoding nominal colour of pixels in the byte, with "0" - or "1" for the "hanging" bit that spans the neighbouring byte. - """ - pixels = [] - - idx = 0 - if is_odd_offset: - pixels.append("01"[b & 0x01]) - idx += 1 - - # K = black - # G = green - # V = violet - # W = white - palettes = ( - ( - "K", # 0x00 - "V", # 0x01 - "G", # 0x10 - "W" # 0x11 - ), ( - "K", # 0x00 - "B", # 0x01 - "O", # 0x10 - "W" # 0x11 - ) - ) - palette = palettes[(b & 0x80) != 0] - - for _ in range(3): - pixel = palette[(b >> idx) & 0b11] - pixels.append(pixel) - idx += 2 - - if not is_odd_offset: - pixels.append("01"[(b & 0x40) != 0]) - idx += 1 - - return "".join(pixels) - - -@functools.lru_cache(None) -def byte_to_colour_string_with_white_coalescing( - b: int, is_odd_offset: bool) -> str: - """Model the combining of neighbouring 1 bits to produce white. - - The output is a string of length 7 representing the 7 display dots that now - have colour. - - Attempt to model the colour artifacting that consecutive runs of - 1 bits are coerced to white. This isn't quite correct since: - - a) it doesn't operate across byte boundaries (see note on - byte_to_nominal_colour_string) - - b) a sequence like WVV appears more like WWWVVV or WWVVVV rather than WWWKVV - (at least on the //gs) - - It also ignores other colour fringing e.g. from NTSC artifacts. - - TODO: this needs more work. - - :param b: - :param is_odd_offset: - :return: - """ - - pixels = [] - - fringing = { - "1V": "WWK", # 110 - "1W": "WWW", # 111 - - "1B": "WWB", # 110 - - "WV": "WWWK", # 1110 - "WB": "WWWK", # 1110 - - "GV": "KWWK", # 0110 - "OB": "KWWK", # 0110 - - "GW": "KWWW", # 0111 - "OW": "KWWW", # 0111 - - "W1": "WWW", # 111 - "G1": "KWW", # 011 - "O1": "KWW", # 011 - } - - nominal = byte_to_nominal_colour_string(b, is_odd_offset) - for idx in range(3): - pair = nominal[idx:idx + 2] - effective = fringing.get(pair) - if not effective: - e = [] - if pair[0] in {"0", "1"}: - e.append(pair[0]) - else: - e.extend([pair[0], pair[0]]) - if pair[1] in {"0", "1"}: - e.append(pair[1]) - else: - e.extend([pair[1], pair[1]]) - effective = "".join(e) - - if pixels: - pixels.append(effective[2:]) - else: - pixels.append(effective) - - return "".join(pixels) - - -substitute_costs = np.ones((128, 128), dtype=np.float64) - -# Substitution costs to use when evaluating other potential offsets at which -# to store a content byte. We penalize more harshly for introducing -# errors that alter pixel colours, since these tend to be very -# noticeable as visual noise. -error_substitute_costs = np.ones((128, 128), dtype=np.float64) - -# Penalty for turning on/off a black bit -for c in "01GVWOB": - substitute_costs[(ord('K'), ord(c))] = 1 - substitute_costs[(ord(c), ord('K'))] = 1 - error_substitute_costs[(ord('K'), ord(c))] = 5 - error_substitute_costs[(ord(c), ord('K'))] = 5 - -# Penalty for changing colour -for c in "01GVWOB": - for d in "01GVWOB": - substitute_costs[(ord(c), ord(d))] = 1 - substitute_costs[(ord(d), ord(c))] = 1 - error_substitute_costs[(ord(c), ord(d))] = 5 - error_substitute_costs[(ord(d), ord(c))] = 5 - -insert_costs = np.ones(128, dtype=np.float64) * 1000 -delete_costs = np.ones(128, dtype=np.float64) * 1000 - - -def _edit_weight(a: int, b: int, is_odd_offset: bool, error: bool): - """ - - :param a: - :param b: - :param is_odd_offset: - :param error: - :return: - """ - a_pixels = byte_to_colour_string_with_white_coalescing(a, is_odd_offset) - b_pixels = byte_to_colour_string_with_white_coalescing(b, is_odd_offset) - - dist = weighted_levenshtein.dam_lev( - a_pixels, b_pixels, - insert_costs=insert_costs, - delete_costs=delete_costs, - substitute_costs=error_substitute_costs if error else substitute_costs, - ) - return np.int64(dist) - - -@functools.lru_cache(None) -def _edit_weight_matrices(error: bool) -> np.array: - """ - - :param error: - :return: - """ - ewm = np.zeros(shape=(256, 256, 2), dtype=np.int64) - for a in range(256): - for b in range(256): - for is_odd_offset in (False, True): - ewm[a, b, int(is_odd_offset)] = _edit_weight( - a, b, is_odd_offset, error) - - return ewm - - -@functools.lru_cache(None) -def edit_weight(a: int, b: int, is_odd_offset: bool, error: bool): - """ - - :param a: first content value - :param b: second content value - :param is_odd_offset: whether this content byte is at an odd screen - byte offset - :param error: whether to compute error distance or edit distance - :return: the corresponding distance value - """ - return _edit_weight_matrices(error)[a, b, int(is_odd_offset)] - - -_even_ewm = {} -_odd_ewm = {} -_even_error_ewm = {} -_odd_error_ewm = {} -for a in range(256): - for b in range(256): - _even_ewm[(a << 8) + b] = edit_weight(a, b, False, False) - _odd_ewm[(a << 8) + b] = edit_weight(a, b, True, False) - - _even_error_ewm[(a << 8) + b] = edit_weight(a, b, False, True) - _odd_error_ewm[(a << 8) + b] = edit_weight(a, b, True, True) - - -@functools.lru_cache(None) -def _constant_array(content: int, shape) -> np.array: - """ - - :param content: - :param shape: - :return: - """ - return np.ones(shape, dtype=np.uint16) * content - - -def byte_screen_error_distance(content: int, b: np.array) -> np.array: - """ - - :param content: byte for which to compute error distance - :param b: np.array of size (32, 256) representing existing screen memory. - :return: np.array of size (32, 256) representing error distance from - content byte to each byte of b - """ - assert b.shape == (32, 256), b.shape - - # Extract even and off column offsets (128,) - even_b = b[:, ::2] - odd_b = b[:, 1::2] - - a = _constant_array(content << 8, even_b.shape) - - even = a + even_b - odd = a + odd_b - - even_weights = np.vectorize(_even_error_ewm.__getitem__)(even) - odd_weights = np.vectorize(_odd_error_ewm.__getitem__)(odd) - - res = np.ndarray(shape=b.shape, dtype=np.int64) - res[:, ::2] = even_weights - res[:, 1::2] = odd_weights - - return res - - -def screen_edit_distance(a: np.array, b: np.array) -> np.array: - """ - - :param a: - :param b: - :return: - """ - # Extract even and off column offsets (32, 128) - even_a = a[:, ::2] - odd_a = a[:, 1::2] - - even_b = b[:, ::2] - odd_b = b[:, 1::2] - - even = (even_a.astype(np.uint16) << 8) + even_b - odd = (odd_a.astype(np.uint16) << 8) + odd_b - - even_weights = np.vectorize(_even_ewm.__getitem__)(even) - odd_weights = np.vectorize(_odd_ewm.__getitem__)(odd) - - res = np.ndarray(shape=a.shape, dtype=np.int64) - res[:, ::2] = even_weights - res[:, 1::2] = odd_weights - - return res diff --git a/transcoder/edit_distance_test.py b/transcoder/edit_distance_test.py deleted file mode 100644 index c9e432a..0000000 --- a/transcoder/edit_distance_test.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Tests for the edit_distance module.""" - -import unittest - -import edit_distance - - -class TestByteToNominalColourString(unittest.TestCase): - def testEncoding(self): - self.assertEqual( - "KKK0", - edit_distance.byte_to_nominal_colour_string( - 0, is_odd_offset=False)) - self.assertEqual( - "0KKK", - edit_distance.byte_to_nominal_colour_string( - 0, is_odd_offset=True)) - - self.assertEqual( - "WWW1", edit_distance.byte_to_nominal_colour_string( - 0xff, is_odd_offset=False)) - self.assertEqual( - "1WWW", edit_distance.byte_to_nominal_colour_string( - 0xff, is_odd_offset=True)) - - self.assertEqual( - "GGG0", edit_distance.byte_to_nominal_colour_string( - 0x2a, is_odd_offset=False)) - self.assertEqual( - "1GGG", edit_distance.byte_to_nominal_colour_string( - 0x55, is_odd_offset=True)) - - self.assertEqual( - "OOO0", edit_distance.byte_to_nominal_colour_string( - 0xaa, is_odd_offset=False)) - self.assertEqual( - "1OOO", edit_distance.byte_to_nominal_colour_string( - 0xd5, is_odd_offset=True)) - - -class TestEditWeight(unittest.TestCase): - def testTransposition(self): - self.assertEqual("WKK0", edit_distance.byte_to_nominal_colour_string( - 0b00000011, is_odd_offset=False)) - self.assertEqual("KWK0", edit_distance.byte_to_nominal_colour_string( - 0b00001100, is_odd_offset=False)) - self.assertEqual( - 1, edit_distance.edit_weight(0b00000011, 0b00001100, - is_odd_offset=False) - ) - - self.assertEqual("OWK1", edit_distance.byte_to_nominal_colour_string( - 0b11001110, is_odd_offset=False)) - self.assertEqual("OKW1", edit_distance.byte_to_nominal_colour_string( - 0b11110010, is_odd_offset=False)) - self.assertEqual( - 1, edit_distance.edit_weight( - 0b11001110, 0b11110010, is_odd_offset=False) - ) - - def testSubstitution(self): - # Black has cost 5 - self.assertEqual("WKK0", edit_distance.byte_to_nominal_colour_string( - 0b00000011, is_odd_offset=False)) - self.assertEqual("KKK0", edit_distance.byte_to_nominal_colour_string( - 0b00000000, is_odd_offset=False)) - self.assertEqual( - 5, edit_distance.edit_weight( - 0b00000011, 0b00000000, is_odd_offset=False) - ) - self.assertEqual( - 5, edit_distance.edit_weight( - 0b00000000, 0b00000011, is_odd_offset=False) - ) - - # Other colour has cost 1 - self.assertEqual( - 1, edit_distance.edit_weight( - 0b00000010, 0b00000011, is_odd_offset=False) - ) - self.assertEqual( - 1, edit_distance.edit_weight( - 0b00000011, 0b00000010, is_odd_offset=False) - ) - - -if __name__ == '__main__': - unittest.main() diff --git a/transcoder/frame_grabber.py b/transcoder/frame_grabber.py new file mode 100644 index 0000000..2940f1d --- /dev/null +++ b/transcoder/frame_grabber.py @@ -0,0 +1,147 @@ +"""Extracts sequence of still images from input video stream.""" + +import os +import queue +import subprocess +import threading +from typing import Iterator + +import numpy as np +import skvideo.io +from PIL import Image + +import screen +from palette import Palette +from video_mode import VideoMode + + +class FrameGrabber: + def __init__(self, mode: VideoMode): + self.video_mode = mode + self.input_frame_rate = 30 + + def frames(self) -> Iterator[screen.MemoryMap]: + raise NotImplementedError + + +class FileFrameGrabber(FrameGrabber): + def __init__(self, filename, mode: VideoMode, palette: Palette): + super(FileFrameGrabber, self).__init__(mode) + + self.filename = filename # type: str + self.palette = palette # type: Palette + self._reader = skvideo.io.FFmpegReader(filename) + + # Compute frame rate from input video + # TODO: possible to compute time offset for each frame instead? + data = skvideo.io.ffprobe(self.filename)['video'] + rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001 + self.input_frame_rate = float( + rate_data[0]) / float(rate_data[1]) # type: float + + def _frame_grabber(self) -> Iterator[Image.Image]: + for frame_array in self._reader.nextFrame(): + yield Image.fromarray(frame_array) + + @staticmethod + def _output_dir(filename, video_mode, palette) -> str: + return "%s/%s/%s" % ( + ".".join(filename.split(".")[:-1]), + video_mode.name, + palette.name) + + def _palette_arg(self) -> str: + return "P%d" % self.palette.value + + def frames(self) -> Iterator[screen.MemoryMap]: + """Encode frame to (D)HGR using bmp2dhr. + + We do the encoding in a background thread to parallelize. + """ + + frame_dir = self._output_dir( + self.filename, self.video_mode, self.palette) + os.makedirs(frame_dir, exist_ok=True) + + q = queue.Queue(maxsize=10) + + def _hgr_decode(_idx, _frame): + outfile = "%s/%08dC.BIN" % (frame_dir, _idx) + bmpfile = "%s/%08d.bmp" % (frame_dir, _idx) + + try: + os.stat(outfile) + except FileNotFoundError: + _frame = _frame.resize((280, 192), resample=Image.LANCZOS) + _frame.save(bmpfile) + + subprocess.call([ + "/usr/local/bin/bmp2dhr", bmpfile, "hgr", + self._palette_arg(), + "D9" # Buckels dither + ]) + + os.remove(bmpfile) + + _main = np.fromfile(outfile, dtype=np.uint8) + + return _main, None + + def _dhgr_decode(_idx, _frame): + mainfile = "%s/%08d.BIN" % (frame_dir, _idx) + auxfile = "%s/%08d.AUX" % (frame_dir, _idx) + + bmpfile = "%s/%08d.bmp" % (frame_dir, _idx) + + try: + os.stat(mainfile) + os.stat(auxfile) + except FileNotFoundError: + _frame = _frame.resize((280, 192), resample=Image.LANCZOS) + _frame.save(bmpfile) + + subprocess.call([ + "/usr/local/bin/bmp2dhr", bmpfile, "dhgr", # "v", + self._palette_arg(), + "A", # Output separate .BIN and .AUX files + "D9" # Buckels dither + ]) + + os.remove(bmpfile) + + _main = np.fromfile(mainfile, dtype=np.uint8) + _aux = np.fromfile(auxfile, dtype=np.uint8) + + return _main, _aux + + def worker(): + """Invoke bmp2dhr to encode input image frames and push to queue.""" + + decode = ( + _dhgr_decode if self.video_mode == VideoMode.DHGR else + _hgr_decode + ) + for _idx, _frame in enumerate(self._frame_grabber()): + q.put(decode(_idx, _frame)) + + q.put((None, None)) + + t = threading.Thread(target=worker, daemon=True) + t.start() + + while True: + main, aux = q.get() + if main is None: + break + + main_map = screen.FlatMemoryMap( + screen_page=1, data=main).to_memory_map() + if aux is None: + aux_map = None + else: + aux_map = screen.FlatMemoryMap( + screen_page=1, data=aux).to_memory_map() + yield (main_map, aux_map) + q.task_done() + + t.join() diff --git a/transcoder/frame_grabber_test.py b/transcoder/frame_grabber_test.py new file mode 100644 index 0000000..aee23ec --- /dev/null +++ b/transcoder/frame_grabber_test.py @@ -0,0 +1,37 @@ +import unittest + +import frame_grabber +import palette +import video_mode + + +class TestFileFrameGrabber(unittest.TestCase): + def test_output_dir(self): + self.assertEqual( + "/foo/bar/DHGR/NTSC", + frame_grabber.FileFrameGrabber._output_dir( + "/foo/bar.mp4", video_mode.VideoMode.DHGR, palette.Palette.NTSC + ) + ) + + self.assertEqual( + "/foo/bar.blee/HGR/IIGS", + frame_grabber.FileFrameGrabber._output_dir( + "/foo/bar.blee.mp4", + video_mode.VideoMode.HGR, + palette.Palette.IIGS + ) + ) + + self.assertEqual( + "/foo/bar blee/DHGR/IIGS", + frame_grabber.FileFrameGrabber._output_dir( + "/foo/bar blee.mp4", + video_mode.VideoMode.DHGR, + palette.Palette.IIGS + ) + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/transcoder/machine.py b/transcoder/machine.py index 6826530..c07c2e9 100644 --- a/transcoder/machine.py +++ b/transcoder/machine.py @@ -2,25 +2,12 @@ from typing import Iterator -import numpy as np - -import screen +# TODO: screen memory changes should happen via Machine while emitting opcodes? class Machine: """Represents Apple II and player virtual machine state.""" - def __init__( - self, - memmap: screen.MemoryMap, - update_priority: np.array - ): - self.page = 0x20 # type: int - self.content = 0x7f # type: int - - self.memmap = memmap # type: screen.MemoryMap - self.update_priority = update_priority # type: np.array - def emit(self, opcode: "Opcode") -> Iterator[int]: """ diff --git a/transcoder/main.py b/transcoder/main.py index 2579ffd..b816322 100644 --- a/transcoder/main.py +++ b/transcoder/main.py @@ -1,12 +1,13 @@ -"""Transcodes an input video file to ][Vision format.""" +"""Transcodes an input video file to ][-Vision format.""" import argparse import movie -import video +import palette +import video_mode parser = argparse.ArgumentParser( - description='Transcode videos to ][Vision format.') + description='Transcode videos to ][-Vision format.') parser.add_argument( 'input', help='Path to input video file.') parser.add_argument( @@ -25,9 +26,15 @@ parser.add_argument( 'frame rate, which may give better quality for some videos.' ) parser.add_argument( - '--video_mode', type=str, choices=video.Mode.__members__.keys(), + '--video_mode', type=str, choices=video_mode.VideoMode.__members__.keys(), + default=video_mode.VideoMode.DHGR.name, help='Video display mode to encode for (HGR/DHGR)' ) +parser.add_argument( + '--palette', type=str, choices=palette.Palette.__members__.keys(), + default=palette.Palette.NTSC.name, + help='Video palette to encode for (default=NTSC)' +) def main(args): @@ -37,10 +44,13 @@ def main(args): every_n_video_frames=args.every_n_video_frames, audio_normalization=args.audio_normalization, max_bytes_out=1024. * 1024 * args.max_output_mb, - video_mode=video.Mode[args.video_mode] + video_mode=video_mode.VideoMode[args.video_mode], + palette=palette.Palette[args.palette], ) - print("Input frame rate = %f" % m.video.input_frame_rate) + print("Palette %s" % args.palette) + + print("Input frame rate = %f" % m.frame_grabber.input_frame_rate) if args.output: out_filename = args.output diff --git a/transcoder/make_data_tables.py b/transcoder/make_data_tables.py new file mode 100644 index 0000000..8aa78a8 --- /dev/null +++ b/transcoder/make_data_tables.py @@ -0,0 +1,204 @@ +import bz2 +import functools +import pickle +import sys +from typing import Iterable, Type + +import colormath.color_conversions +import colormath.color_diff +import colormath.color_objects +import numpy as np +import weighted_levenshtein +from etaprogress.progress import ProgressBar + +import colours +import palette +import screen + + +PIXEL_CHARS = "0123456789ABCDEF" + + +def pixel_char(i: int) -> str: + return PIXEL_CHARS[i] + + +@functools.lru_cache(None) +def pixel_string(pixels: Iterable[int]) -> str: + return "".join(pixel_char(p) for p in pixels) + + +class EditDistanceParams: + """Data class for parameters to Damerau-Levenshtein edit distance.""" + + # Don't even consider insertions and deletions into the string, they don't + # make sense for comparing pixel strings + insert_costs = np.ones(128, dtype=np.float64) * 100000 + delete_costs = np.ones(128, dtype=np.float64) * 100000 + + # Smallest substitution value is ~20 from palette.diff_matrices, i.e. + # we always prefer to transpose 2 pixels rather than substituting colours. + # TODO: is quality really better allowing transposes? + transpose_costs = np.ones((128, 128), dtype=np.float64) * 100000 # 10 + + # These will be filled in later + substitute_costs = np.zeros((128, 128), dtype=np.float64) + + # Substitution costs to use when evaluating other potential offsets at which + # to store a content byte. We penalize more harshly for introducing + # errors that alter pixel colours, since these tend to be very + # noticeable as visual noise. + # + # TODO: currently unused + error_substitute_costs = np.zeros((128, 128), dtype=np.float64) + + +def compute_diff_matrix(pal: Type[palette.BasePalette]): + """Compute matrix of perceptual distance between colour pairs. + + Specifically CIE2000 delta values for this palette. + """ + dm = np.ndarray(shape=(16, 16), dtype=np.int) + + for colour1, a in pal.RGB.items(): + alab = colormath.color_conversions.convert_color( + a, colormath.color_objects.LabColor) + for colour2, b in pal.RGB.items(): + blab = colormath.color_conversions.convert_color( + b, colormath.color_objects.LabColor) + dm[colour1.value, colour2.value] = int( + colormath.color_diff.delta_e_cie2000(alab, blab)) + return dm + + +def compute_substitute_costs(pal: Type[palette.BasePalette]): + """Compute costs for substituting one colour pixel for another.""" + + edp = EditDistanceParams() + + diff_matrix = compute_diff_matrix(pal) + + # Penalty for changing colour + for i, c in enumerate(PIXEL_CHARS): + for j, d in enumerate(PIXEL_CHARS): + cost = diff_matrix[i, j] + edp.substitute_costs[(ord(c), ord(d))] = cost + edp.substitute_costs[(ord(d), ord(c))] = cost + edp.error_substitute_costs[(ord(c), ord(d))] = 5 * cost + edp.error_substitute_costs[(ord(d), ord(c))] = 5 * cost + + return edp + + +def edit_distance( + edp: EditDistanceParams, + a: str, + b: str, + error: bool) -> np.float64: + """Damerau-Levenshtein edit distance between two pixel strings.""" + res = weighted_levenshtein.dam_lev( + a, b, + insert_costs=edp.insert_costs, + delete_costs=edp.delete_costs, + substitute_costs=( + edp.error_substitute_costs if error else edp.substitute_costs), + ) + + # Make sure result can fit in a uint16 + assert (0 <= res < 2 ** 16), res + return res + + +def compute_edit_distance( + edp: EditDistanceParams, + bitmap_cls: Type[screen.Bitmap], + nominal_colours: Type[colours.NominalColours] +): + """Computes edit distance matrix between all pairs of pixel strings. + + Enumerates all possible values of the masked bit representation from + bitmap_cls (assuming it is contiguous, i.e. we enumerate all + 2**bitmap_cls.MASKED_BITS values). These are mapped to the dot + representation, turned into coloured pixel strings, and we compute the + edit distance. + + The effect of this is that we precompute the effect of storing all possible + byte values against all possible screen backgrounds (e.g. as + influencing/influenced by neighbouring bytes). + """ + + bits = bitmap_cls.MASKED_BITS + + bitrange = np.uint64(2 ** bits) + + edit = [] + for _ in range(len(bitmap_cls.BYTE_MASKS)): + edit.append( + np.zeros(shape=np.uint64(bitrange * bitrange), dtype=np.uint16)) + + # Matrix is symmetrical with zero diagonal so only need to compute upper + # triangle + bar = ProgressBar((bitrange * (bitrange - 1)) / 2, max_width=80) + + num_dots = bitmap_cls.MASKED_DOTS + + cnt = 0 + for i in range(np.uint64(bitrange)): + for j in range(i): + cnt += 1 + + if cnt % 10000 == 0: + bar.numerator = cnt + print(bar, end='\r') + sys.stdout.flush() + + pair = (np.uint64(i) << bits) + np.uint64(j) + + for o, ph in enumerate(bitmap_cls.PHASES): + first_dots = bitmap_cls.to_dots(i, byte_offset=o) + second_dots = bitmap_cls.to_dots(j, byte_offset=o) + + first_pixels = pixel_string( + colours.dots_to_nominal_colour_pixel_values( + num_dots, first_dots, nominal_colours, + init_phase=ph) + ) + second_pixels = pixel_string( + colours.dots_to_nominal_colour_pixel_values( + num_dots, second_dots, nominal_colours, + init_phase=ph) + ) + edit[o][pair] = edit_distance( + edp, first_pixels, second_pixels, error=False) + + return edit + + +def make_edit_distance( + pal: Type[palette.BasePalette], + edp: EditDistanceParams, + bitmap_cls: Type[screen.Bitmap], + nominal_colours: Type[colours.NominalColours] +): + """Write file containing (D)HGR edit distance matrix for a palette.""" + + dist = compute_edit_distance(edp, bitmap_cls, nominal_colours) + data = "transcoder/data/%s_palette_%d_edit_distance.pickle.bz2" % ( + bitmap_cls.NAME, pal.ID.value) + with bz2.open(data, "wb", compresslevel=9) as out: + pickle.dump(dist, out, protocol=pickle.HIGHEST_PROTOCOL) + + +def main(): + for p in palette.PALETTES.values(): + print("Processing palette %s" % p) + edp = compute_substitute_costs(p) + + # TODO: still worth using error distance matrices? + + make_edit_distance(p, edp, screen.HGRBitmap, colours.HGRColours) + make_edit_distance(p, edp, screen.DHGRBitmap, colours.DHGRColours) + + +if __name__ == "__main__": + main() diff --git a/transcoder/make_data_tables_test.py b/transcoder/make_data_tables_test.py new file mode 100644 index 0000000..92c4bc8 --- /dev/null +++ b/transcoder/make_data_tables_test.py @@ -0,0 +1,99 @@ +import sys +import unittest + +import numpy as np +from etaprogress.progress import ProgressBar + +import make_data_tables +import screen +from colours import HGRColours +from palette import PALETTES + + +class TestMakeDataTables(unittest.TestCase): + def test_pixel_string(self): + pixels = (HGRColours.BLACK, HGRColours.WHITE, HGRColours.ORANGE) + self.assertEqual("0FC", make_data_tables.pixel_string(pixels)) + + def test_edit_distances_dhgr(self): + """Assert invariants and symmetries of the edit distance matrices.""" + for p in PALETTES: + ed = screen.DHGRBitmap.edit_distances(p) + print(p) + + bar = ProgressBar((4 * 2 ** 13 * (2 ** 13 - 1)) / 2, max_width=80) + + cnt = 0 + for ph in range(3): + + # Only zero entries should be on diagonal, i.e. of form + # i << 13 + i + zeros = np.arange(len(ed[ph]))[ed[ph] == 0] + for z in zeros: + z1 = z & (2 ** 13 - 1) + z2 = (z >> 13) & (2 ** 13 - 1) + self.assertEqual(z1, z2) + + # Assert that matrix is symmetrical + for i in range(2 ** 13): + for j in range(i): + cnt += 1 + + if cnt % 10000 == 0: + bar.numerator = cnt + print(bar, end='\r') + sys.stdout.flush() + + self.assertEqual( + ed[ph][(i << 13) + j], + ed[ph][(j << 13) + i], + ) + + # Matrix is positive definite + self.assertGreaterEqual(ed[ph][(i << 13) + j], 0) + + def test_edit_distances_hgr(self): + """Assert invariants and symmetries of the edit distance matrices.""" + + for p in PALETTES: + ed = screen.HGRBitmap.edit_distances(p) + print(p) + + bar = ProgressBar((4 * 2 ** 14 * (2 ** 14 - 1)) / 2, max_width=80) + + cnt = 0 + for ph in range(2): + + # TODO: for HGR this invariant isn't true, all-0 and all-1 + # values for header/footer/body with/without palette bit can + # also have zero difference + # # Only zero entries should be on diagonal, i.e. of form + # # i << 14 + i + # zeros = np.arange(len(ed[ph]))[ed[ph] == 0] + # for z in zeros: + # z1 = z & (2**14-1) + # z2 = (z >> 14) & (2**14-1) + # if z1 != z2: + # self.assertEqual(z1, z2) + + # Assert that matrix is symmetrical + for i in range(2 ** 14): + for j in range(i): + cnt += 1 + + if cnt % 10000 == 0: + bar.numerator = cnt + print(bar, end='\r') + sys.stdout.flush() + + self.assertEqual( + ed[ph][(i << 14) + j], + ed[ph][(j << 14) + i], + ) + + # Matrix is positive definite + self.assertGreaterEqual(ed[ph][(i << 14) + j], 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/transcoder/movie.py b/transcoder/movie.py index e6af86b..263fd80 100644 --- a/transcoder/movie.py +++ b/transcoder/movie.py @@ -3,9 +3,12 @@ from typing import Iterable, Iterator import audio +import frame_grabber import machine import opcodes import video +from palette import Palette +from video_mode import VideoMode class Movie: @@ -14,29 +17,37 @@ class Movie: every_n_video_frames: int = 1, audio_normalization: float = None, max_bytes_out: int = None, - video_mode: video.Mode = video.Mode.HGR, + video_mode: VideoMode = VideoMode.HGR, + palette: Palette = Palette.NTSC, ): self.filename = filename # type: str self.every_n_video_frames = every_n_video_frames # type: int self.max_bytes_out = max_bytes_out # type: int - self.video_mode = video_mode # type: video.Mode + self.video_mode = video_mode # type: VideoMode + self.palette = palette # type: Palette self.audio = audio.Audio( filename, normalization=audio_normalization) # type: audio.Audio + + self.frame_grabber = frame_grabber.FileFrameGrabber( + filename, mode=video_mode, palette=self.palette) self.video = video.Video( - filename, mode=video_mode, - ticks_per_second=self.audio.sample_rate + self.frame_grabber, + ticks_per_second=self.audio.sample_rate, + mode=video_mode, + palette=self.palette ) # type: video.Video + # Byte offset within TCP stream self.stream_pos = 0 # type: int + # Current audio tick opcode count within movie stream. self.ticks = 0 # type: int - self.state = machine.Machine( - self.video.memory_map, - self.video.update_priority - ) + # Tracks internal state of player virtual machine + self.state = machine.Machine() + # Currently operating on AUX memory bank? self.aux_memory_bank = False def encode(self) -> Iterator[opcodes.Opcode]: @@ -44,7 +55,7 @@ class Movie: :return: """ - video_frames = self.video.frames() + video_frames = self.frame_grabber.frames() main_seq = None aux_seq = None @@ -61,13 +72,10 @@ class Movie: if ((self.video.frame_number - 1) % self.every_n_video_frames == 0): print("Starting frame %d" % self.video.frame_number) - main_seq = self.video.encode_frame( - main, self.video.memory_map, self.video.update_priority) + main_seq = self.video.encode_frame(main, is_aux=False) if aux: - aux_seq = self.video.encode_frame( - aux, self.video.aux_memory_map, - self.video.aux_update_priority) + aux_seq = self.video.encode_frame(aux, is_aux=True) # au has range -15 .. 16 (step=1) # Tick cycles are units of 2 @@ -75,22 +83,24 @@ class Movie: tick += 34 # 4 .. 66 (step=2) (page, content, offsets) = next( - aux_seq if self.aux_memory_bank else main_seq) + aux_seq if self.aux_memory_bank else main_seq) yield opcodes.TICK_OPCODES[(tick, page)](content, offsets) - def _emit_bytes(self, _op): - """ + def _emit_bytes(self, _op: opcodes.Opcode) -> Iterable[int]: + """Emit compiled bytes corresponding to a player opcode. - :param _op: - :return: + Also tracks byte stream position. """ for b in self.state.emit(_op): yield b self.stream_pos += 1 def emit_stream(self, ops: Iterable[opcodes.Opcode]) -> Iterator[int]: - """ + """Emit compiled byte stream corresponding to opcode stream. + + Inserts padding opcodes at 2KB stream boundaries, to instruct player + to manage the TCP socket buffer. :param ops: :return: @@ -107,7 +117,7 @@ class Movie: if socket_pos >= 2044: # 2 op_ack address bytes + 2 payload bytes from ACK must # terminate 2K stream frame - if self.video_mode == video.Mode.DHGR: + if self.video_mode == VideoMode.DHGR: # Flip-flop between MAIN and AUX banks self.aux_memory_bank = not self.aux_memory_bank @@ -117,7 +127,7 @@ class Movie: yield from self.done() def done(self) -> Iterator[int]: - """Terminate opcode stream. + """Terminate byte stream by emitting terminal opcode and padding to 2KB. :return: """ diff --git a/transcoder/opcodes.py b/transcoder/opcodes.py index 4fa8d07..6c9b7bd 100644 --- a/transcoder/opcodes.py +++ b/transcoder/opcodes.py @@ -4,6 +4,7 @@ import enum from typing import Iterator, Tuple import symbol_table +import video_mode from machine import Machine @@ -64,7 +65,7 @@ class Header(Opcode): """Video header opcode.""" COMMAND = OpcodeCommand.HEADER - def __init__(self, mode: "video.Mode"): + def __init__(self, mode: video_mode.VideoMode): self.video_mode = mode def __data_eq__(self, other): diff --git a/transcoder/palette.py b/transcoder/palette.py new file mode 100644 index 0000000..7076eaf --- /dev/null +++ b/transcoder/palette.py @@ -0,0 +1,84 @@ +"""RGB palette values for rendering NominalColour pixels.""" + +import enum +from typing import Dict, Type + +import colormath.color_objects + +from colours import HGRColours + +# Type annotation +RGB = colormath.color_objects.sRGBColor + + +def rgb(r, g, b): + return RGB(r, g, b, is_upscaled=True) + + +class Palette(enum.Enum): + """BMP2DHR palette numbers.""" + + UNKNOWN = -1 + IIGS = 0 + NTSC = 5 + + +class BasePalette: + ID = Palette.UNKNOWN # type: Palette + + # Palette RGB map + RGB = {} # type: Dict[HGRColours: RGB] + + +class NTSCPalette(BasePalette): + ID = Palette.NTSC + + # Palette RGB values taken from BMP2DHGR's default NTSC palette + RGB = { + HGRColours.BLACK: rgb(0, 0, 0), + HGRColours.MAGENTA: rgb(148, 12, 125), + HGRColours.BROWN: rgb(99, 77, 0), + HGRColours.ORANGE: rgb(249, 86, 29), + HGRColours.DARK_GREEN: rgb(51, 111, 0), + HGRColours.GREY1: rgb(126, 126, 126), + HGRColours.GREEN: rgb(67, 200, 0), + HGRColours.YELLOW: rgb(221, 206, 23), + HGRColours.DARK_BLUE: rgb(32, 54, 212), + HGRColours.VIOLET: rgb(188, 55, 255), + HGRColours.GREY2: rgb(126, 126, 126), + HGRColours.PINK: rgb(255, 129, 236), + HGRColours.MED_BLUE: rgb(7, 168, 225), + HGRColours.LIGHT_BLUE: rgb(158, 172, 255), + HGRColours.AQUA: rgb(93, 248, 133), + HGRColours.WHITE: rgb(255, 255, 255) + } + + +class IIGSPalette(BasePalette): + ID = Palette.IIGS + + # Palette RGB values taken from BMP2DHGR's KEGS32 palette + RGB = { + HGRColours.BLACK: rgb(0, 0, 0), + HGRColours.MAGENTA: rgb(221, 0, 51), + HGRColours.BROWN: rgb(136, 85, 34), + HGRColours.ORANGE: rgb(255, 102, 0), + HGRColours.DARK_GREEN: rgb(0, 119, 0), + HGRColours.GREY1: rgb(85, 85, 85), + HGRColours.GREEN: rgb(0, 221, 0), + HGRColours.YELLOW: rgb(255, 255, 0), + HGRColours.DARK_BLUE: rgb(0, 0, 153), + HGRColours.VIOLET: rgb(221, 0, 221), + HGRColours.GREY2: rgb(170, 170, 170), + HGRColours.PINK: rgb(255, 153, 136), + HGRColours.MED_BLUE: rgb(34, 34, 255), + HGRColours.LIGHT_BLUE: rgb(102, 170, 255), + HGRColours.AQUA: rgb(0, 255, 153), + HGRColours.WHITE: rgb(255, 255, 255) + } + + +PALETTES = { + Palette.IIGS: IIGSPalette, + Palette.NTSC: NTSCPalette +} # type: Dict[Palette, Type[BasePalette]] diff --git a/transcoder/screen.py b/transcoder/screen.py index 5c03e9e..5029363 100644 --- a/transcoder/screen.py +++ b/transcoder/screen.py @@ -1,20 +1,20 @@ """Various representations of Apple II video display.""" +import bz2 +import functools +import pickle +from typing import Union, List, Optional, Tuple + import numpy as np +import palette as pal -# TODO: support DHGR - - -def bitmap_similarity(a1: np.array, a2: np.array) -> float: - """Measure bitwise % similarity between two bitmap arrays""" - bits_different = np.sum(np.logical_xor(a1, a2)).item() - - return 1. - (bits_different / (np.shape(a1)[0] * np.shape(a1)[1])) +# Type annotation for cases where we may process either an int or a numpy array. +IntOrArray = Union[np.uint64, np.ndarray] def y_to_base_addr(y: int, page: int = 0) -> int: - """Maps y coordinate to base address on given screen page""" + """Maps y coordinate to base address on given screen page.""" a = y // 64 d = y - 64 * a b = d // 8 @@ -30,6 +30,7 @@ Y_TO_BASE_ADDR = [ ] # Array mapping (page, offset) to x (byte) and y coords respectively +# TODO: is np.dtype(int) faster for these? PAGE_OFFSET_TO_X = np.zeros((32, 256), dtype=np.uint8) PAGE_OFFSET_TO_Y = np.zeros((32, 256), dtype=np.uint8) @@ -68,111 +69,6 @@ def _populate_mappings(): _populate_mappings() -class Bytemap: - """Bitmap array with horizontal pixels packed into bytes.""" - - def __init__(self, bytemap: np.array = None): - self.bytemap = None # type: np.array - if bytemap is not None: - if bytemap.shape != (192, 40): - raise ValueError("Unexpected shape: %r" % (bytemap.shape,)) - self.bytemap = bytemap - else: - self.bytemap = np.zeros((192, 40), dtype=np.uint8) - - def to_memory_map(self, screen_page: int) -> "MemoryMap": - # Numpy magic that constructs a new array indexed by (page, offset) - # instead of (y, x). - mmap = self.bytemap[PAGE_OFFSET_TO_Y, PAGE_OFFSET_TO_X] - # Reset whatever values ended up in the screen holes after this mapping - # (which came from default 0 values in PAGE_OFFSET_TO_X) - mmap[SCREEN_HOLES] = 0 - return MemoryMap(screen_page, mmap) - - -class Bitmap: - XMAX = None # type: int - YMAX = None # type: int - - def __init__(self, bitmap: np.array = None): - if bitmap is None: - self.bitmap = np.zeros((self.YMAX, self.XMAX), dtype=bool) - else: - if bitmap.shape != (self.YMAX, self.XMAX): - raise ValueError("Unexpected shape: %r" % (bitmap.shape,)) - self.bitmap = bitmap - - def randomize(self) -> None: - self.bitmap = np.random.randint( - 2, size=(self.YMAX, self.XMAX), dtype=bool) - - @staticmethod - def _to_bytemap(bitmap) -> Bytemap: - # Insert zero column after every 7 - pixels = bitmap.copy() - for i in range(pixels.shape[1] // 7 - 1, -1, -1): - pixels = np.insert(pixels, (i + 1) * 7, False, axis=1) - - # packbits is big-endian so we flip the array before and after to - # invert this - return Bytemap( - np.flip(np.packbits(np.flip(pixels, axis=1), axis=1), axis=1)) - - def to_bytemap(self) -> Bytemap: - return self._to_bytemap(self.bitmap) - - def to_memory_map(self, screen_page: int) -> "MemoryMap": - return self.to_bytemap().to_memory_map(screen_page) - - @staticmethod - def _from_bytemap(bytemap: Bytemap) -> np.array: - bm = np.unpackbits(bytemap.bytemap, axis=1) - bm = np.delete(bm, np.arange(0, bm.shape[1], 8), axis=1) - - # Need to flip each 7-bit sequence - reorder_cols = [] - for i in range(bm.shape[1] // 7): - for j in range((i + 1) * 7 - 1, i * 7 - 1, -1): - reorder_cols.append(j) - bm = bm[:, reorder_cols] - - return np.array(bm, dtype=np.bool) - - @classmethod - def from_bytemap(cls, bytemap: Bytemap) -> "Bitmap": - return cls(cls._from_bytemap(bytemap)) - - -class HGR140Bitmap(Bitmap): - XMAX = 140 # double-wide pixels to not worry about colour effects - YMAX = 192 - - def to_bytemap(self) -> Bytemap: - # Double each pixel horizontally - return self._to_bytemap(np.repeat(self.bitmap, 2, axis=1)) - - @classmethod - def from_bytemap(cls, bytemap: Bytemap) -> "HGR140Bitmap": - # Undouble pixels - bitmap = cls._from_bytemap(bytemap) - bitmap = np.array( - np.delete(bitmap, np.arange(0, bitmap.shape[1], 2), axis=1), - dtype=np.bool - ) - - return HGR140Bitmap(bitmap) - - -class HGRBitmap(Bitmap): - XMAX = 280 - YMAX = 192 - - -class DHGRBitmap(Bitmap): - XMAX = 560 - YMAX = 192 - - class FlatMemoryMap: """Linear 8K representation of HGR screen memory.""" @@ -223,11 +119,837 @@ class MemoryMap: def to_flat_memory_map(self) -> FlatMemoryMap: return FlatMemoryMap(self.screen_page, self.page_offset.reshape(8192)) - def to_bytemap(self) -> Bytemap: - bytemap = self.page_offset[X_Y_TO_PAGE, X_Y_TO_OFFSET] - return Bytemap(bytemap) - def write(self, page: int, offset: int, val: int) -> None: """Updates screen image to set (page, offset)=val (inc. screen holes)""" self.page_offset[page - self._page_start][offset] = val + + +class Bitmap: + """Packed bitmap representation of (D)HGR screen memory. + + Maintains a page-based array whose entries contain a packed representation + of multiple screen bytes, in a representation that supports efficiently + determining the visual effect of storing bytes at arbitrary screen offsets. + """ + + # NOTE: See https://github.com/numpy/numpy/issues/2524 and related issues + # for why we have to cast things explicitly to np.uint64 - type promotion + # to uint64 is broken in numpy :( + + # Name of bitmap type + NAME = None # type: str + + # Size of packed representation, consisting of header + body + footer + HEADER_BITS = None # type: np.uint64 + BODY_BITS = None # type: np.uint64 + FOOTER_BITS = None # type: np.uint64 + + # How many bits of packed representation are necessary to determine the + # effect of storing a memory byte, e.g. because they influence pixel + # colour or are influenced by other bits. + MASKED_BITS = None # type: np.uint64 + + # How many coloured screen pixels we can extract from MASKED_BITS. Note + # that this does not include the last 3 dots represented by the footer, + # since we don't have enough information to determine their colour (we + # would fall off the end of the 4-bit sliding window) + MASKED_DOTS = None # type: np.uint64 + + # List of bitmasks for extracting the subset of packed data corresponding + # to bits influencing/influenced by a given byte offset. These must be + # a contiguous bit mask, i.e. so that after shifting they are enumerated + # by 0..2**MASKED_BITS-1 + BYTE_MASKS = None # type: List[np.uint64] + BYTE_SHIFTS = None # type: List[np.uint64] + + # NTSC clock phase at first masked bit + PHASES = None # type: List[int] + + def __init__( + self, + palette: pal.Palette, + main_memory: MemoryMap, + aux_memory: Optional[MemoryMap] + ): + self.palette = palette # type: pal.Palette + self.main_memory = main_memory # type: MemoryMap + self.aux_memory = aux_memory # type: Optional[MemoryMap] + + self.PACKED_BITS = ( + self.HEADER_BITS + self.BODY_BITS + self.FOOTER_BITS + ) # type: np.uint64 + + # How many screen bytes we pack into a single scalar + self.SCREEN_BYTES = np.uint64(len(self.BYTE_MASKS)) # type: np.uint64 + + self.packed = np.empty( + shape=(32, 128), dtype=np.uint64) # type: np.ndarray + self._pack() + + # TODO: don't leak headers/footers across screen rows. We should be using + # x-y representation rather than page-offset + + @staticmethod + def _make_header(col: IntOrArray) -> IntOrArray: + """Extract values to use as header of next column.""" + raise NotImplementedError + + def _body(self) -> np.ndarray: + """Pack related screen bytes into an efficient representation.""" + raise NotImplementedError + + @staticmethod + def _make_footer(col: IntOrArray) -> IntOrArray: + """Extract values to use as footer of previous column.""" + raise NotImplementedError + + def _pack(self) -> None: + """Pack MemoryMap into efficient representation for diffing.""" + + body = self._body() + + # Prepend last 3 bits of previous odd byte so we can correctly + # decode the effective colours at the beginning of the 22-bit tuple + prev_col = np.roll(body, 1, axis=1).astype(np.uint64) + header = self._make_header(prev_col) + # Don't leak header across page boundaries + header[:, 0] = 0 + + # Append first 3 bits of next even byte so we can correctly + # decode the effective colours at the end of the 22-bit tuple + next_col = np.roll(body, -1, axis=1).astype(np.uint64) + footer = self._make_footer(next_col) + # Don't leak footer across page boundaries + footer[:, -1] = 0 + + self.packed = header ^ body ^ footer + + @staticmethod + def masked_update( + byte_offset: int, + old_value: IntOrArray, + new_value: np.uint8) -> IntOrArray: + """Update int/array to store new value at byte_offset in every entry. + + Does not patch up headers/footers of neighbouring columns. + """ + raise NotImplementedError + + @staticmethod + @functools.lru_cache(None) + def byte_offset(page_offset: int, is_aux: bool) -> int: + """Map screen offset for aux/main into offset within packed data.""" + raise NotImplementedError + + @staticmethod + @functools.lru_cache(None) + def _byte_offsets(is_aux: bool) -> Tuple[int, int]: + """Return byte offsets within packed data for AUX/MAIN memory.""" + raise NotImplementedError + + @classmethod + def to_dots(cls, masked_val: int, byte_offset: int) -> int: + """Convert masked representation to bit sequence of display dots.""" + raise NotImplementedError + + def apply( + self, + page: int, + offset: int, + is_aux: bool, + value: np.uint8) -> None: + """Update packed representation of changing main/aux memory.""" + + byte_offset = self.byte_offset(offset, is_aux) + packed_offset = offset // 2 + + self.packed[page, packed_offset] = self.masked_update( + byte_offset, self.packed[page, packed_offset], value) + self._fix_scalar_neighbours(page, packed_offset, byte_offset) + + def _fix_scalar_neighbours( + self, + page: int, + offset: int, + byte_offset: int) -> None: + """Fix up column headers/footers when updating a (page, offset).""" + + if byte_offset == 0 and offset > 0: + self.packed[page, offset - 1] = self._fix_column_left( + self.packed[page, offset - 1], + self.packed[page, offset] + ) + elif byte_offset == (self.SCREEN_BYTES - 1) and offset < 127: + # Need to also update the 3-bit header of the next column + self.packed[page, offset + 1] = self._fix_column_right( + self.packed[page, offset + 1], + self.packed[page, offset] + ) + + def _fix_column_left( + self, + column_left: IntOrArray, + column: IntOrArray + ) -> IntOrArray: + """Patch up the footer of the column to the left.""" + + # Mask out footer(s) + column_left &= np.uint64(2 ** (self.HEADER_BITS + self.BODY_BITS) - 1) + column_left ^= self._make_footer(column) + + return column_left + + def _fix_column_right( + self, + column_right: IntOrArray, + column: IntOrArray + ) -> IntOrArray: + """Patch up the header of the column to the right.""" + + # Mask out header(s) + column_right &= np.uint64( + (2 ** (self.BODY_BITS + self.FOOTER_BITS) - 1)) << self.HEADER_BITS + column_right ^= self._make_header(column) + + return column_right + + def _fix_array_neighbours( + self, + ary: np.ndarray, + byte_offset: int + ) -> None: + """Fix up column headers/footers for all array entries.""" + + # TODO: don't leak header/footer across page boundaries + + # Propagate new value into neighbouring byte headers/footers if + # necessary + if byte_offset == 0: + # Need to also update the footer of the preceding column + shifted_left = np.roll(ary, -1, axis=1) + self._fix_column_left(ary, shifted_left) + + elif byte_offset == (self.SCREEN_BYTES - 1): + # Need to also update the header of the next column + shifted_right = np.roll(ary, 1, axis=1) + self._fix_column_right(ary, shifted_right) + + @classmethod + @functools.lru_cache(None) + def edit_distances(cls, palette_id: pal.Palette) -> List[np.ndarray]: + """Load edit distance matrices for masked, shifted byte values.""" + + data = "transcoder/data/%s_palette_%d_edit_distance.pickle.bz2" % ( + cls.NAME, + palette_id.value + ) + with bz2.open(data, "rb") as ed: + dist = pickle.load(ed) # type: List[np.ndarray] + + # dist is an upper-triangular matrix of edit_distance(a, b) + # encoded as dist[(a << N) + b] = edit_distance(a, b) + # Because the distance metric is reflexive, + # edit_distance(b, a) = edit_distance(a, b) + + identity = np.arange(2 ** (2 * cls.MASKED_BITS), dtype=np.uint64) + # Swap values of form a << N + b to b << N + a + transpose = (identity >> cls.MASKED_BITS) + ( + (identity & np.uint64(2 ** cls.MASKED_BITS - 1)) << + cls.MASKED_BITS) + + for i in range(len(dist)): + dist[i][transpose] += dist[i][identity] + + return dist + + @classmethod + def mask_and_shift_data( + cls, + data: IntOrArray, + byte_offset: int) -> IntOrArray: + """Masks and shifts packed data into the MASKED_BITS range.""" + res = (data & cls.BYTE_MASKS[byte_offset]) >> ( + cls.BYTE_SHIFTS[byte_offset]) + assert np.all(res <= 2 ** cls.MASKED_BITS) + return res + + # Can't cache all possible values but this seems to give a good enough hit + # rate without costing too much memory + # TODO: unit tests + @functools.lru_cache(10 ** 6) + def byte_pair_difference( + self, + byte_offset: int, + old_packed: np.uint64, + content: np.uint8 + ) -> np.uint16: + """Compute effect of storing a new content byte within packed data.""" + + old_pixels = self.mask_and_shift_data(old_packed, byte_offset) + new_pixels = self.mask_and_shift_data( + self.masked_update(byte_offset, old_packed, content), byte_offset) + + pair = (old_pixels << self.MASKED_BITS) + new_pixels + + return self.edit_distances(self.palette)[byte_offset][pair] + + def diff_weights( + self, + source: "Bitmap", + is_aux: bool + ) -> np.ndarray: + """Compute edit distance matrix from source bitmap.""" + return self._diff_weights(source.packed, is_aux) + + # TODO: unit test + def _diff_weights( + self, + source_packed: np.ndarray, + is_aux: bool, + content: np.uint8 = None + ) -> np.ndarray: + """Computes edit distance matrix from source_packed to self.packed + + If content is set, the distance will be computed as if this value + was stored into each offset position of source_packed, i.e. to + allow evaluating which offsets (if any) should be chosen for storing + this content byte. + """ + + diff = np.ndarray((32, 256), dtype=np.int) + + offsets = self._byte_offsets(is_aux) + + dists = [] + for o in offsets: + if content is not None: + compare_packed = self.masked_update(o, source_packed, content) + self._fix_array_neighbours(compare_packed, o) + else: + compare_packed = source_packed + + # Pixels influenced by byte offset o + source_pixels = self.mask_and_shift_data(compare_packed, o) + target_pixels = self.mask_and_shift_data(self.packed, o) + + # Concatenate N-bit source and target into 2N-bit values + pair = (source_pixels << self.MASKED_BITS) + target_pixels + dist = self.edit_distances(self.palette)[o][pair].reshape( + pair.shape) + dists.append(dist) + + # Interleave even/odd columns + diff[:, 0::2] = dists[0] + diff[:, 1::2] = dists[1] + + return diff + + def _check_consistency(self): + """Sanity check that headers and footers are consistent.""" + + headers = np.roll(self._make_header(self.packed), 1, axis=1).astype( + np.uint64) + + footers = np.roll(self._make_footer(self.packed), -1, axis=1).astype( + np.uint64) + + mask_hf = np.uint64(0b1110000000000000000000000000000111) + + res = (self.packed ^ headers ^ footers) & mask_hf + nz = np.transpose(np.nonzero(res)) + + ok = True + if nz.size != 0: + for p, o in nz.tolist(): + if o == 0 or o == 127: + continue + ok = False + print(p, o, bin(self.packed[p, o - 1]), + bin(headers[p, o]), + bin(self.packed[p, o]), + bin(self.packed[p, o + 1]), bin(footers[p, o]), + bin(res[p, o]) + ) + assert ok + + # TODO: unit tests + def compute_delta( + self, + content: int, + diff_weights: np.ndarray, + is_aux: bool + ) -> np.ndarray: + """Compute which content stores introduce the least additional error. + + We compute the effect of storing content at all possible offsets + within self.packed, and then subtract the previous diff weights. + + Negative values indicate that the new content value is closer to the + target than the current content. + """ + # TODO: use error edit distance? + + new_diff = self._diff_weights(self.packed, is_aux, content) + + # TODO: try different weightings + return (new_diff * 5) - diff_weights + + +class HGRBitmap(Bitmap): + """Packed bitmap representation of HGR screen memory. + + The HGR display is encoded in a somewhat complicated way, so we have to + do a bit of work to turn it into a useful format. + + Each screen byte consists of a palette bit (7) and 6 data bits (0..6) + + Each non-palette bit turns on two consecutive display dots, with bit 6 + repeated a third time. This third dot may or may not be overwritten by the + effect of the next byte. + + Turning on the palette bit shifts that byte's dots right by one + position. + + Given two neighbouring screen bytes Aaaaaaaa, Bbbbbbbb (at even and odd + offsets), where capital letter indicates the position of the palette bit, + we use the following 22-bit packed representation: + + 2211111111110000000000 <-- bit position in uint22 + 1098765432109876543210 + ffFbbbbbbbBAaaaaaaaHhh + + h and f are headers/footers derived from the neighbouring screen bytes. + + Since our colour artifact model (see colours.py) uses a sliding 4-bit window + onto the dot string, we need to also include a 3-bit header and footer + to account for the influence from/on neighbouring bytes, i.e. adjacent + packed values. These are just the low/high 2 data bits of the 16-bit + body of those neighbouring columns, plus the corresponding palette bit. + + This 22-bit packed representation is sufficient to compute the effects + (on pixel colours) of storing a byte at even or odd offsets. From it we + can extract the bit stream of displayed HGR dots, and the mapping to pixel + colours follows the HGRColours bitmap, see colours.py. + + We put the two A/B palette bits next to each other so that we can + mask a contiguous range of bits whose colours influence/are influenced by + storing a byte at a given offset. + + We need to mask out bit subsequences of size 3+8+3=14, i.e. the 8-bits + corresponding to the byte being stored, plus the neighbouring 3 bits that + influence it/are influenced by it. + + Note that the masked representation has the same size for both offsets ( + 14 bits), but different meaning, since the palette bit is in a different + position. + + With this masked representation, we can precompute an edit distance for the + pixel changes resulting from all possible HGR byte stores, see + make_edit_distance.py. + + The edit distance matrix is encoded by concatenating the 14-bit source + and target masked values into a 28-bit pair, which indexes into the + edit_distance array to give the corresponding edit distance. + """ + NAME = 'HGR' + + # Size of packed representation, consisting of header + body + footer + HEADER_BITS = np.uint64(3) + # 2x 8-bit screen bytes + BODY_BITS = np.uint64(16) + FOOTER_BITS = np.uint64(3) + + # How many bits of packed representation are necessary to determine the + # effect of storing a memory byte, e.g. because they influence pixel + # colour or are influenced by other bits. + MASKED_BITS = np.uint64(14) # 3 + 8 + 3 + + # How many coloured screen pixels we can extract from MASKED_BITS. Note + # that this does not include the last 3 dots represented by the footer, + # since we don't have enough information to determine their colour (we + # would fall off the end of the 4-bit sliding window) + # + # From header: 3 bits (2 HGR pixels but might be shifted right by palette) + # From body: 7 bits doubled, plus possible shift from palette bit + MASKED_DOTS = np.uint64(18) # 3 + 7 + 7 + + # List of bitmasks for extracting the subset of packed data corresponding + # to bits influencing/influenced by a given byte offset. These must be + # a contiguous bit mask, i.e. so that after shifting they are enumerated + # by 0..2**MASKED_BITS-1 + BYTE_MASKS = [ + np.uint64(0b0000000011111111111111), + np.uint64(0b1111111111111100000000) + ] + BYTE_SHIFTS = [np.uint64(0), np.uint64(8)] + + # NTSC clock phase at first masked bit + # + # Each HGR byte offset has the same range of uint14 possible + # values and nominal colour pixels, but with different initial + # phases: + # even: 0 (1 at start of 3-bit header) + # odd: 2 (3) + PHASES = [1, 3] + + def __init__(self, palette: pal.Palette, main_memory: MemoryMap): + super(HGRBitmap, self).__init__(palette, main_memory, None) + + @staticmethod + def _make_header(col: IntOrArray) -> IntOrArray: + """Extract values to use as header of next column. + + Header format is bits 5,6,0 of previous screen byte + i.e. offsets 17, 18, 11 in packed representation + """ + + return ( + (col & np.uint64(0b1 << 11)) >> np.uint64(9) ^ ( + (col & np.uint64(0b11 << 17)) >> np.uint64(17)) + ) + + def _body(self) -> np.ndarray: + """Pack related screen bytes into an efficient representation. + + Body is of the form: + bbbbbbbBAaaaaaaa + + where capital indicates the palette bit. + """ + + even = self.main_memory.page_offset[:, 0::2].astype(np.uint64) + odd = self.main_memory.page_offset[:, 1::2].astype(np.uint64) + + return ( + (even << 3) + ((odd & 0x7f) << 12) + ((odd & 0x80) << 4) + ) + + @staticmethod + def _make_footer(col: IntOrArray) -> IntOrArray: + """Extract values to use as footer of previous column. + + Footer format is bits 7,0,1 of next screen byte + i.e. offsets 10,3,4 in packed representation + """ + + return ( + (col & np.uint64(0b1 << 10)) >> np.uint64(10) ^ ( + (col & np.uint64(0b11 << 3)) >> np.uint64(2)) + ) << np.uint64(19) + + @staticmethod + @functools.lru_cache(None) + def byte_offset(page_offset: int, is_aux: bool) -> int: + """Returns 0..1 offset in packed representation for page_offset.""" + + assert not is_aux + is_odd = page_offset % 2 == 1 + + return 1 if is_odd else 0 + + @staticmethod + @functools.lru_cache(None) + def _byte_offsets(is_aux: bool) -> Tuple[int, int]: + """Return byte offsets within packed data for AUX/MAIN memory.""" + + assert not is_aux + return 0, 1 + + @staticmethod + @functools.lru_cache(None) + def _double_pixels(int7: int) -> int: + """Each bit 0..6 controls two hires dots. + + Input bit 6 is repeated 3 times in case the neighbouring byte is + delayed (right-shifted by one dot) due to the palette bit being set, + which means the effect of this byte is "extended" by an extra dot. + + Care needs to be taken to mask this out when overwriting. + """ + double = ( + # Bit pos 6 + ((int7 & 0x40) << 8) + ((int7 & 0x40) << 7) + ( + (int7 & 0x40) << 6) + + # Bit pos 5 + ((int7 & 0x20) << 6) + ((int7 & 0x20) << 5) + + # Bit pos 4 + ((int7 & 0x10) << 5) + ((int7 & 0x10) << 4) + + # Bit pos 3 + ((int7 & 0x08) << 4) + ((int7 & 0x08) << 3) + + # Bit pos 2 + ((int7 & 0x04) << 3) + ((int7 & 0x04) << 2) + + # Bit pos 1 + ((int7 & 0x02) << 2) + ((int7 & 0x02) << 1) + + # Bit pos 0 + ((int7 & 0x01) << 1) + (int7 & 0x01) + ) + + return double + + @classmethod + def to_dots(cls, masked_val: int, byte_offset: int) -> int: + """Convert masked representation to bit sequence of display dots. + + Packed representation is of the form: + ffFbbbbbbbBAaaaaaaaHhh + + where capital indicates the palette bit. + + Each non-palette bit turns on two display dots, with bit 6 repeated + a third time. This may or may not be overwritten by the next byte. + + Turning on the palette bit shifts that byte's dots right by one + position. + """ + + # Assert 14-bit representation + assert (masked_val & (2 ** 14 - 1)) == masked_val + + # Take top 3 bits from header (plus duplicated MSB) not 4, because if it + # is palette-shifted then we don't know what is in bit 0 + h = (masked_val & 0b111) << 5 + hp = (h & 0x80) >> 7 + res = cls._double_pixels(h & 0x7f) >> (11 - hp) + + if byte_offset == 0: + # Offset 0: bbBAaaaaaaaHhh + b = (masked_val >> 3) & 0xff + bp = (b & 0x80) >> 7 + else: + # Offset 1: ffFbbbbbbbBAaa + bp = (masked_val >> 3) & 0x01 + b = ((masked_val >> 4) & 0x7f) ^ (bp << 7) + + # Mask out current contents in case we are overwriting the extended + # high bit from previous screen byte + res &= ~((2 ** 14 - 1) << (3 + bp)) + res ^= cls._double_pixels(b & 0x7f) << (3 + bp) + + f = ((masked_val >> 12) & 0b11) ^ ( + (masked_val >> 11) & 0b01) << 7 + fp = (f & 0x80) >> 7 + + # Mask out current contents in case we are overwriting the extended + # high bit from previous screen byte + res &= ~((2 ** 4 - 1) << (17 + fp)) + res ^= cls._double_pixels(f & 0x7f) << (17 + fp) + return res & (2 ** 21 - 1) + + @staticmethod + def masked_update( + byte_offset: int, + old_value: IntOrArray, + new_value: np.uint8) -> IntOrArray: + """Update int/array to store new value at byte_offset in every entry. + + Does not patch up headers/footers of neighbouring columns. + """ + + if byte_offset == 0: + # Mask out 8-bit value where update will go + masked_value = old_value & (~np.uint64(0xff << 3)) + + update = np.uint64(new_value) << np.uint64(3) + return masked_value ^ update + else: + # Mask out 8-bit value where update will go + masked_value = old_value & (~np.uint64(0xff << 11)) + + # shift palette bit into position 0 + shifted_new_value = ( + (new_value & 0x7f) << 1) ^ ( + (new_value & 0x80) >> 7) + update = np.uint64(shifted_new_value) << np.uint64(11) + return masked_value ^ update + + +class DHGRBitmap(Bitmap): + """Packed bitmap representation of DHGR screen memory. + + The DHGR display encodes 7 pixels across interleaved 4-byte sequences + of AUX and MAIN memory, as follows: + + PBBBAAAA PDDCCCCB PFEEEEDD PGGGGFFF + Aux N Main N Aux N+1 Main N+1 (N even) + + Where A..G are the pixels, and P represents the (unused) palette bit. + + This layout makes more sense when written as a (little-endian) 32-bit + integer: + + 33222222222211111111110000000000 <- bit pos in uint32 + 10987654321098765432109876543210 + PGGGGFFFPFEEEEDDPDDCCCCBPBBBAAAA + + i.e. apart from the palette bits this is a linear ordering of pixels, + when read from LSB to MSB (i.e. right-to-left). i.e. the screen layout + order of bits is opposite to the usual binary representation ordering. + + We can simplify things by stripping out the palette bit and packing + down to a 28-bit integer representation: + + 33222222222211111111110000000000 <- bit pos in uint32 + 10987654321098765432109876543210 + + GGGGFFFFEEEEDDDDCCCCBBBBAAAA <- pixel A..G + 3210321032103210321032103210 <- bit pos in A..G pixel + + 3333333222222211111110000000 <- byte offset 0.3 + + Since our colour artifact model (see colours.py) uses a sliding 4-bit window + onto the dot string, we need to also include a 3-bit header and footer + to account for the influence from/on neighbouring bytes, i.e. adjacent + packed values. These are just the low/high 3 bits of the 28-bit body of + those neighbouring columns. + + This gives a 34-bit packed representation that is sufficient to compute + the effects (on pixel colours) of storing a byte at one of the 0..3 offsets. + + Note that this representation is also 1:1 with the actual displayed + DHGR dots. The mapping to pixel colours follows the DHGRColours + bitmap, see colours.py. + + Because the packed representation is contiguous, we need to mask out bit + subsequences of size 3+7+3=13, i.e. the 7-bits corresponding to the + byte being stored, plus the neighbouring 3 bits that influence it/are + influenced by it. + + With this masked representation, we can precompute an edit distance for the + pixel changes resulting from all possible DHGR byte stores, see + make_edit_distance.py. + + The edit distance matrix is encoded by concatenating the 13-bit source + and target masked values into a 26-bit pair, which indexes into the + edit_distance array to give the corresponding edit distance. + """ + + NAME = 'DHGR' + + # Packed representation is 3 + 28 + 3 = 34 bits + HEADER_BITS = np.uint64(3) + BODY_BITS = np.uint64(28) + FOOTER_BITS = np.uint64(3) + + # Masked representation selecting the influence of each byte offset + MASKED_BITS = np.uint64(13) # 7-bit body + 3-bit header + 3-bit footer + + # Masking is 1:1 with screen dots, but we can't compute the colour of the + # last 3 dots because we fall off the end of the 4-bit sliding window + MASKED_DOTS = np.uint64(10) + + # 3-bit header + 28-bit body + 3-bit footer + BYTE_MASKS = [ + # 3333222222222211111111110000000000 <- bit pos in uint64 + # 3210987654321098765432109876543210 + # tttGGGGFFFFEEEEDDDDCCCCBBBBAAAAhhh <- pixel A..G + # 3210321032103210321032103210 <- bit pos in A..G pixel + # + # 3333333222222211111110000000 <- byte offset 0.3 + np.uint64(0b0000000000000000000001111111111111), # byte 0 uint13 mask + np.uint64(0b0000000000000011111111111110000000), # byte 1 uint13 mask + np.uint64(0b0000000111111111111100000000000000), # byte 2 uint13 mask + np.uint64(0b1111111111111000000000000000000000), # byte 3 uint13 mask + ] + + # How much to right-shift bits after masking, to bring into uint13 range + BYTE_SHIFTS = [np.uint64(0), np.uint64(7), np.uint64(14), np.uint64(21)] + + # NTSC clock phase at first masked bit + # + # Each DHGR byte offset has the same range of uint13 possible + # values and nominal colour pixels, but with different initial + # phases: + # AUX 0: 0 (1 at start of 3-bit header) + # MAIN 0: 3 (0) + # AUX 1: 2 (3) + # MAIN 1: 1 (2) + PHASES = [1, 0, 3, 2] + + @staticmethod + def _make_header(col: IntOrArray) -> IntOrArray: + """Extract upper 3 bits of body for header of next column.""" + return (col & np.uint64(0b111 << 28)) >> np.uint64(28) + + def _body(self) -> np.ndarray: + """Pack related screen bytes into an efficient representation. + + For DHGR we first strip off the (unused) palette bit to produce + 7-bit values, then interleave aux and main memory columns and pack + these 7-bit values into 28-bits. This sequentially encodes 7 4-bit + DHGR pixels, which is the "repeating unit" of the DHGR screen, and + in a form that is convenient to operate on. + + We also shift to make room for the 3-bit header. + """ + + # Palette bit is unused for DHGR so mask it out + aux = (self.aux_memory.page_offset & 0x7f).astype(np.uint64) + main = (self.main_memory.page_offset & 0x7f).astype(np.uint64) + + return ( + (aux[:, 0::2] << 3) + + (main[:, 0::2] << 10) + + (aux[:, 1::2] << 17) + + (main[:, 1::2] << 24) + ) + + @staticmethod + def _make_footer(col: IntOrArray) -> IntOrArray: + """Extract lower 3 bits of body for footer of previous column.""" + return (col & np.uint64(0b111 << 3)) << np.uint64(28) + + @staticmethod + @functools.lru_cache(None) + def byte_offset(page_offset: int, is_aux: bool) -> int: + """Returns 0..3 packed byte offset for a given page_offset and is_aux""" + + is_odd = page_offset % 2 == 1 + if is_aux: + if is_odd: + return 2 + return 0 + else: # main memory + if is_odd: + return 3 + else: + return 1 + + @staticmethod + @functools.lru_cache(None) + def _byte_offsets(is_aux: bool) -> Tuple[int, int]: + """Return byte offsets within packed data for AUX/MAIN memory.""" + + if is_aux: + offsets = (0, 2) + else: + offsets = (1, 3) + + return offsets + + @classmethod + def to_dots(cls, masked_val: int, byte_offset: int) -> int: + """Convert masked representation to bit sequence of display dots. + + For DHGR the 13-bit masked value is already a 13-bit dot sequence + so no need to transform it. + """ + + return masked_val + + @staticmethod + def masked_update( + byte_offset: int, + old_value: IntOrArray, + new_value: np.uint8) -> IntOrArray: + """Update int/array to store new value at byte_offset in every entry. + + Does not patch up headers/footers of neighbouring columns. + """ + # Mask out 7-bit value where update will go + masked_value = old_value & ( + ~np.uint64(0x7f << (7 * byte_offset + 3))) + + update = (new_value & np.uint64(0x7f)) << np.uint64( + 7 * byte_offset + 3) + return masked_value ^ update diff --git a/transcoder/screen_test.py b/transcoder/screen_test.py new file mode 100644 index 0000000..2bcbb5b --- /dev/null +++ b/transcoder/screen_test.py @@ -0,0 +1,1133 @@ +"""Tests for the screen module.""" + +import unittest + +import numpy as np + +import screen +import colours +from palette import Palette + + +def binary(a): + return np.vectorize("{:032b}".format)(a) + + +class TestDHGRBitmap(unittest.TestCase): + def setUp(self) -> None: + self.aux = screen.MemoryMap(screen_page=1) + self.main = screen.MemoryMap(screen_page=1) + + def test_make_header(self): + """Header extracted correctly from packed representation.""" + + self.assertEqual( + 0b100, + screen.DHGRBitmap._make_header( + np.uint64(0b0001000011111010110000111110101000)) + ) + + def test_make_footer(self): + """Footer extracted correctly from packed representation.""" + + self.assertEqual( + 0b1010000000000000000000000000000000, + screen.DHGRBitmap._make_footer( + np.uint64(0b0001000011111010110000111110101000)) + ) + + def test_pixel_packing_offset_0(self): + """Screen byte packing happens correctly at offset 0.""" + + # PBBBAAAA + self.aux.page_offset[0, 0] = 0b11110101 + # PDDCCCCB + self.main.page_offset[0, 0] = 0b01000011 + # PFEEEEDD + self.aux.page_offset[0, 1] = 0b11110101 + # PGGGGFFF + self.main.page_offset[0, 1] = 0b01000011 + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + self.assertEqual( + 0b0001000011111010110000111110101000, + dhgr.packed[0, 0] + ) + + # Check header on neighbouring byte + self.assertEqual( + 0b0000000000000000000000000000000100, + dhgr.packed[0, 1] + ) + + # No other entries should be set, in particular no footer since we + # are at packed offset 0 + self.assertEqual(2, np.count_nonzero(dhgr.packed)) + + def test_pixel_packing_offset_1(self): + """Screen byte packing happens correctly at offset 1.""" + + # PBBBAAAA + self.aux.page_offset[0, 2] = 0b11110101 + # PDDCCCCB + self.main.page_offset[0, 2] = 0b01000011 + # PFEEEEDD + self.aux.page_offset[0, 3] = 0b11110101 + # PGGGGFFF + self.main.page_offset[0, 3] = 0b01000011 + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + self.assertEqual( + 0b0001000011111010110000111110101000, + dhgr.packed[0, 1] + ) + + # Check footer on neighbouring byte + self.assertEqual( + 0b1010000000000000000000000000000000, + dhgr.packed[0, 0] + ) + + # Check header on neighbouring byte + self.assertEqual( + 0b0000000000000000000000000000000100, + dhgr.packed[0, 2] + ) + + # No other entries should be set + self.assertEqual(3, np.count_nonzero(dhgr.packed)) + + def test_pixel_packing_offset_127(self): + """Screen byte packing happens correctly at offset 127.""" + + # PBBBAAAA + self.aux.page_offset[0, 254] = 0b11110101 + # PDDCCCCB + self.main.page_offset[0, 254] = 0b01000011 + # PFEEEEDD + self.aux.page_offset[0, 255] = 0b11110101 + # PGGGGFFF + self.main.page_offset[0, 255] = 0b01000011 + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + self.assertEqual( + 0b0001000011111010110000111110101000, + dhgr.packed[0, 127] + ) + + # Check footer on neighbouring byte + self.assertEqual( + 0b1010000000000000000000000000000000, + dhgr.packed[0, 126] + ) + + # No other entries should be set, in particular header should not + # propagate to next row + self.assertEqual(2, np.count_nonzero(dhgr.packed)) + + def test_byte_offset(self): + """Test the byte_offset behaviour.""" + + self.assertEqual(0, screen.DHGRBitmap.byte_offset(0, is_aux=True)) + self.assertEqual(1, screen.DHGRBitmap.byte_offset(0, is_aux=False)) + self.assertEqual(2, screen.DHGRBitmap.byte_offset(1, is_aux=True)) + self.assertEqual(3, screen.DHGRBitmap.byte_offset(1, is_aux=False)) + + def test_byte_offsets(self): + """Test the _byte_offsets behaviour.""" + + self.assertEqual((0, 2), screen.DHGRBitmap._byte_offsets(is_aux=True)) + self.assertEqual((1, 3), screen.DHGRBitmap._byte_offsets(is_aux=False)) + + def test_mask_and_shift_data(self): + """Verify that mask_and_shift_data extracts the right bit positions.""" + + int13_max = np.uint64(2 ** 13 - 1) + int34_max = np.uint64(2 ** 34 - 1) + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + for o in range(3): + self.assertEqual( + int13_max, + dhgr.mask_and_shift_data( + screen.DHGRBitmap.BYTE_MASKS[o], o + ) + ) + + # Now check complement, i.e. no bits taken from outside expected + # range + self.assertEqual( + 0, + dhgr.mask_and_shift_data( + ~screen.DHGRBitmap.BYTE_MASKS[o] & int34_max, o + ) + ) + + def test_masked_update(self): + """Verify that masked_update updates the expected bit positions.""" + + self.assertEqual( + 0b0000000000000000000000001111111000, + screen.DHGRBitmap.masked_update( + 0, np.uint64(0), np.uint8(0xff)) + ) + self.assertEqual( + 0b0000000000000000011111110000000000, + screen.DHGRBitmap.masked_update( + 1, np.uint64(0), np.uint8(0xff)) + ) + self.assertEqual( + 0b0000000000111111100000000000000000, + screen.DHGRBitmap.masked_update( + 2, np.uint64(0), np.uint8(0xff)) + ) + self.assertEqual( + 0b0001111111000000000000000000000000, + screen.DHGRBitmap.masked_update( + 3, np.uint64(0), np.uint8(0xff)) + ) + + # Now test masking out existing values + + int34_max = np.uint64(2 ** 34 - 1) + + self.assertEqual( + 0b1111111111111111111111110000000111, + screen.DHGRBitmap.masked_update(0, int34_max, np.uint8(0x00)) + ) + self.assertEqual( + 0b1111111111111111100000001111111111, + screen.DHGRBitmap.masked_update(1, int34_max, np.uint8(0x00)) + ) + self.assertEqual( + 0b1111111111000000011111111111111111, + screen.DHGRBitmap.masked_update(2, int34_max, np.uint8(0x00)) + ) + self.assertEqual( + 0b1110000000111111111111111111111111, + screen.DHGRBitmap.masked_update(3, int34_max, np.uint8(0x00)) + ) + + # Test that masked_update can broadcast to numpy arrays + ary = np.zeros((2, 2), dtype=np.uint64) + + elt = np.uint64(0b1111111000) + self.assertTrue(np.array_equal( + np.array([[elt, elt], [elt, elt]], dtype=np.uint64), + screen.DHGRBitmap.masked_update(0, ary, np.uint8(0xff)) + )) + + def test_apply(self): + """Test that apply() correctly updates neighbours.""" + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + dhgr.apply(page=0, offset=0, is_aux=True, value=np.uint8(0xff)) + self.assertEqual(0b1111111000, dhgr.packed[0, 0]) + + dhgr.apply(page=12, offset=36, is_aux=True, value=np.uint8(0xff)) + # Neighbouring header + self.assertEqual( + 0, + dhgr.packed[12, 19]) + # Body + self.assertEqual( + 0b1111111000, + dhgr.packed[12, 18]) + # Neighbouring footer + self.assertEqual( + 0b1110000000000000000000000000000000, + dhgr.packed[12, 17]) + + # Now update the next aux offset in same uint64 + dhgr.apply(page=12, offset=37, is_aux=True, value=np.uint8(0xff)) + # Neighbouring header + self.assertEqual( + 0, + dhgr.packed[12, 19]) + # Body + self.assertEqual( + 0b0000000111111100000001111111000, + dhgr.packed[12, 18] + ) + # Neighbouring footer + self.assertEqual( + 0b1110000000000000000000000000000000, + dhgr.packed[12, 17]) + + # Update offset 3, should propagate to next header + dhgr.apply(page=12, offset=37, is_aux=False, value=np.uint8(0b1010101)) + self.assertEqual( + 0b101, + dhgr.packed[12, 19]) + self.assertEqual( + 0b1010101111111100000001111111000, + dhgr.packed[12, 18] + ) + self.assertEqual( + 0b1110000000000000000000000000000000, + dhgr.packed[12, 17]) + + dhgr.apply(page=12, offset=36, is_aux=False, value=np.uint8(0b0001101)) + self.assertEqual( + 0b101, + dhgr.packed[12, 19]) + self.assertEqual( + 0b1010101111111100011011111111000, + dhgr.packed[12, 18] + ) + self.assertEqual( + 0b1110000000000000000000000000000000, + dhgr.packed[12, 17]) + + # Change offset 0, should propagate to neighbouring footer + dhgr.apply(page=12, offset=36, is_aux=True, value=np.uint8(0b0001101)) + # Neighbouring header + self.assertEqual( + 0b101, + dhgr.packed[12, 19]) + self.assertEqual( + 0b1010101111111100011010001101000, + dhgr.packed[12, 18] + ) + # Neighbouring footer + self.assertEqual( + 0b1010000000000000000000000000000000, + dhgr.packed[12, 17]) + + # Now propagate new header from neighbour onto (12, 18) + dhgr.apply(page=12, offset=35, is_aux=False, value=np.uint8(0b1010101)) + self.assertEqual( + 0b1010101111111100011010001101101, + dhgr.packed[12, 18] + ) + # Neighbouring footer + self.assertEqual( + 0b1011010101000000000000000000000000, + dhgr.packed[12, 17]) + + def test_fix_array_neighbours(self): + """Test that _fix_array_neighbours DTRT after masked_update.""" + + dhgr = screen.DHGRBitmap( + main_memory=self.main, aux_memory=self.aux, palette=Palette.NTSC) + + packed = dhgr.masked_update(0, dhgr.packed, np.uint8(0x7f)) + dhgr._fix_array_neighbours(packed, 0) + + # Should propagate to all footers + self.assertEqual( + 0, np.count_nonzero( + packed[packed != 0b1110000000000000000000001111111000] + ) + ) + + # Should not change headers/footers + packed = dhgr.masked_update(1, packed, np.uint8(0b1010101)) + dhgr._fix_array_neighbours(packed, 1) + + self.assertEqual( + 0, np.count_nonzero( + packed[packed != 0b1110000000000000010101011111111000] + ) + ) + + # Should propagate to all headers + packed = dhgr.masked_update(3, packed, np.uint8(0b0110110)) + dhgr._fix_array_neighbours(packed, 3) + + self.assertEqual( + 0, np.count_nonzero( + packed[packed != 0b1110110110000000010101011111111011] + ) + ) + + +class TestHGRBitmap(unittest.TestCase): + def setUp(self) -> None: + self.main = screen.MemoryMap(screen_page=1) + + def test_make_header(self): + """Header extracted correctly from packed representation.""" + + self.assertEqual( + 0b111, + screen.HGRBitmap._make_header( + np.uint64(0b0001100000100000000000)) + ) + + # Now check palette bit ends up in right spot + self.assertEqual( + 0b100, + screen.HGRBitmap._make_header( + np.uint64(0b0000000000100000000000)) + ) + + def test_make_footer(self): + """Footer extracted correctly from packed representation.""" + + self.assertEqual( + 0b1110000000000000000000, + screen.HGRBitmap._make_footer( + np.uint64(0b0000000000010000011000)) + ) + + # Now check palette bit ends up in right spot + self.assertEqual( + 0b0010000000000000000000, + screen.HGRBitmap._make_footer( + np.uint64(0b0000000000010000000000)) + ) + + def test_pixel_packing_p0_p0(self): + """Screen byte packing happens correctly with P=0, P=0 palette bits.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b01000011 + # PGGFFEED + self.main.page_offset[0, 1] = 0b01000011 + + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + want = 0b0001000011001000011000 + got = hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_pixel_packing_p0_p1(self): + """Screen byte packing happens correctly with P=0, P=1 palette bits.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b01000011 + # PGGFFEED + self.main.page_offset[0, 1] = 0b11000011 + + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + want = 0b0001000011101000011000 + got = hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_pixel_packing_p1_p0(self): + """Screen byte packing happens correctly with P=1, P=0 palette bits.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b11000011 + # PGGFFEED + self.main.page_offset[0, 1] = 0b01000011 + + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + want = 0b0001000011011000011000 + got = hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_pixel_packing_p1_p1(self): + """Screen byte packing happens correctly with P=1, P=1 palette bits.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b11000011 + # PGGFFEED + self.main.page_offset[0, 1] = 0b11000011 + + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + want = 0b1000011111000011000 + got = hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_apply(self): + """Test that header, body and footer are placed correctly.""" + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + hgr.apply(0, 0, False, 0b11000011) + hgr.apply(0, 1, False, 0b11000011) + + want = 0b1000011111000011000 + got = hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Now check with 4 consecutive bytes, i.e. even/odd pair plus the + # neighbouring header/footer. + hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + hgr.apply(1, 197, False, 128) + hgr.apply(1, 198, False, 143) + hgr.apply(1, 199, False, 192) + hgr.apply(1, 200, False, 128) + + want = 0b0011000000110001111100 + got = hgr.packed[1, 199 // 2] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_double_pixels(self): + """Verify behaviour of _double_pixels.""" + + want = 0b111001100110011 + got = screen.HGRBitmap._double_pixels(0b1010101) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_to_dots_offset_0(self): + """Verify to_dots behaviour with byte_offset=0""" + + # Header has P=0, Body has P=0 + want = 0b00000000000000000111 + got = screen.HGRBitmap.to_dots(0b00000000000011, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=0 - cuts off + want = 0b00000000000000000111 + got = screen.HGRBitmap.to_dots(0b00000000000111, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1 + want = 0b00000000000000001111 + got = screen.HGRBitmap.to_dots(0b00010000000111, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=0 - cuts off body + want = 0b00010011001100111111 + got = screen.HGRBitmap.to_dots(0b00011010101111, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=1 + want = 0b00110011001100111111 + got = screen.HGRBitmap.to_dots(0b00111010101111, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=1 + want = 0b100110011001100111111 + got = screen.HGRBitmap.to_dots(0b10111010101111, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=0, body has P=0, footer has P=1 + want = 0b100000000000000000000 + got = screen.HGRBitmap.to_dots(0b10100000000000, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=0, body has P=0, footer has P=0 + want = 0b110000000000000000000 + got = screen.HGRBitmap.to_dots(0b10000000000000, 0) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + def test_to_dots_offset_1(self): + """Verify to_dots behaviour with byte_offset=1""" + + # Header has P=0, Body has P=0 + want = 0b000000000000000000111 + got = screen.HGRBitmap.to_dots(0b00000000000011, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=0 - cuts off + want = 0b000000000000000000111 + got = screen.HGRBitmap.to_dots(0b00000000000111, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1 + want = 0b000000000000000001111 + got = screen.HGRBitmap.to_dots(0b00000000001111, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=0 - cuts off body + want = 0b000010011001100111111 + got = screen.HGRBitmap.to_dots(0b00010101011111, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=1 + want = 0b000110011001100111111 + got = screen.HGRBitmap.to_dots(0b00110101011111, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=1, body has P=1, footer has P=1 + want = 0b100110011001100111111 + got = screen.HGRBitmap.to_dots(0b10110101011111, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=0, body has P=0, footer has P=1 + want = 0b100000000000000000000 + got = screen.HGRBitmap.to_dots(0b10100000000000, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + # Header has P=0, body has P=0, footer has P=0 + want = 0b110000000000000000000 + got = screen.HGRBitmap.to_dots(0b10000000000000, 1) + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + +class TestNominalColours(unittest.TestCase): + """Tests that screen pixel values produce expected colour sequences.""" + + def setUp(self) -> None: + self.main = screen.MemoryMap(screen_page=1) + + self.maxDiff = None + + def test_nominal_colours(self): + # PDCCBBAA + self.main.page_offset[0, 0] = 0b01010101 + # PGGFFEED + self.main.page_offset[0, 1] = 0b00101010 + # PDCCBBAA + self.main.page_offset[0, 2] = 0b01010101 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + want = 0b0100101010001010101000 + got = self.hgr.packed[0, 0] + + self.assertEqual( + want, got, "\n%s\n%s" % (binary(want), binary(got)) + ) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + self.assertEqual( + ( + colours.HGRColours.MAGENTA, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + # Now check byte offset 1 + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + self.assertEqual( + ( + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + # The following tests check for the extended/truncated behaviour across + # byte boundaries when mismatching palette bits. See Figure 8.15 from + # Sather, "Understanding the Apple IIe" + + def test_nominal_colours_sather_even_1(self): + """Extend violet into light blue.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b01000000 + # PGGFFEED + self.main.page_offset[0, 1] = 0b10000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.MAGENTA, # 1000 + colours.HGRColours.VIOLET, # 1100 + colours.HGRColours.LIGHT_BLUE, # 1110 + colours.HGRColours.LIGHT_BLUE, # 1110 + colours.HGRColours.MED_BLUE, # 0110 + # last repeated bit from byte 0 + colours.HGRColours.DARK_GREEN, # 0010 + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + def test_nominal_colours_sather_even_2(self): + """Cut off blue with black to produce dark blue.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b11000000 + # PGGFFEED + self.main.page_offset[0, 1] = 0b00000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.DARK_BLUE, # 0100 + colours.HGRColours.DARK_BLUE, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.BLACK, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + def test_nominal_colours_sather_even_3(self): + """Cut off blue with green to produce aqua.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b11000000 + # PGGFFEED + self.main.page_offset[0, 1] = 0b00000001 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.MED_BLUE, + colours.HGRColours.AQUA, + colours.HGRColours.AQUA, + colours.HGRColours.GREEN, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + def test_nominal_colours_sather_even_4(self): + """Cut off white with black to produce pink.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b11100000 + # PGGFFEED + self.main.page_offset[0, 1] = 0b00000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BROWN, + colours.HGRColours.ORANGE, + colours.HGRColours.PINK, + colours.HGRColours.PINK, + colours.HGRColours.VIOLET, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.BLACK, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + def test_nominal_colours_sather_even_5(self): + """Cut off orange-black with green to produce bright green. + + "Bright" here is because the sequence of pixels has high intensity + Orange-Orange-Yellow-Yellow-Green-Green.""" + + # PDCCBBAA + self.main.page_offset[0, 0] = 0b10100000 + # PGGFFEED + self.main.page_offset[0, 1] = 0b00000001 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=0)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=0) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BROWN, # 0001 + colours.HGRColours.ORANGE, # 1001 + colours.HGRColours.ORANGE, # 1001 + colours.HGRColours.YELLOW, # 1011 + colours.HGRColours.YELLOW, # 1011 + colours.HGRColours.GREEN, # 0011 + colours.HGRColours.GREEN, # 0011 + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[0]) + ) + + def test_nominal_colours_sather_odd_1(self): + """Extend green into light brown.""" + + # PDCCBBAA + self.main.page_offset[0, 1] = 0b01000000 + # PGGFFEED + self.main.page_offset[0, 2] = 0b10000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.DARK_GREEN, + colours.HGRColours.GREEN, + colours.HGRColours.YELLOW, + colours.HGRColours.YELLOW, + colours.HGRColours.ORANGE, + colours.HGRColours.MAGENTA, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + def test_nominal_colours_sather_odd_2(self): + """Cut off orange with black to produce dark brown.""" + + # PDCCBBAA + self.main.page_offset[0, 1] = 0b11000000 + # PGGFFEED + self.main.page_offset[0, 2] = 0b00000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BROWN, + colours.HGRColours.BROWN, + colours.HGRColours.BROWN, + colours.HGRColours.BROWN, + colours.HGRColours.BLACK, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + def test_nominal_colours_sather_odd_3(self): + """Cut off orange with violet to produce pink.""" + + # PDCCBBAA + self.main.page_offset[0, 1] = 0b11000000 + # PGGFFEED + self.main.page_offset[0, 2] = 0b00000001 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BROWN, + colours.HGRColours.ORANGE, + colours.HGRColours.PINK, + colours.HGRColours.PINK, + colours.HGRColours.VIOLET, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + def test_nominal_colours_sather_odd_4(self): + """Cut off white with black to produce aqua.""" + + # PDCCBBAA + self.main.page_offset[0, 1] = 0b11100000 + # PGGFFEED + self.main.page_offset[0, 2] = 0b00000000 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.MED_BLUE, + colours.HGRColours.AQUA, + colours.HGRColours.AQUA, + colours.HGRColours.GREEN, + colours.HGRColours.BROWN, + colours.HGRColours.BLACK, + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + def test_nominal_colours_sather_odd_5(self): + """Cut off blue-black with violet to produce bright violet. + + "Bright" here is because the sequence of pixels has high intensity + Blue-Blue-Light Blue-Light Blue-Violet-Violet. + """ + + # PDCCBBAA + self.main.page_offset[0, 1] = 0b10100000 + # PGGFFEED + self.main.page_offset[0, 2] = 0b00000001 + + self.hgr = screen.HGRBitmap(main_memory=self.main, palette=Palette.NTSC) + + masked = int(screen.HGRBitmap.mask_and_shift_data( + self.hgr.packed[0, 0], byte_offset=1)) + dots = screen.HGRBitmap.to_dots(masked, byte_offset=1) + + self.assertEqual( + ( + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.BLACK, + colours.HGRColours.DARK_BLUE, + colours.HGRColours.MED_BLUE, + colours.HGRColours.MED_BLUE, + colours.HGRColours.LIGHT_BLUE, + colours.HGRColours.LIGHT_BLUE, + colours.HGRColours.VIOLET, + colours.HGRColours.VIOLET + ), + colours.dots_to_nominal_colour_pixels( + 18, dots, colours.HGRColours, + init_phase=screen.HGRBitmap.PHASES[1]) + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/transcoder/video.py b/transcoder/video.py index 4214719..b8ffd12 100644 --- a/transcoder/video.py +++ b/transcoder/video.py @@ -1,56 +1,38 @@ """Encode a sequence of images as an optimized stream of screen changes.""" -import enum import heapq -import os -import queue import random -import subprocess -import threading from typing import List, Iterator, Tuple -# import hitherdither import numpy as np -import skvideo.io -from PIL import Image -import edit_distance import opcodes import screen - - -class Mode(enum.Enum): - HGR = 0 - DHGR = 1 +from frame_grabber import FrameGrabber +from palette import Palette +from video_mode import VideoMode class Video: - """Apple II screen memory map encoding a bitmapped frame.""" + """Encodes sequence of images into prioritized screen byte changes.""" CLOCK_SPEED = 1024 * 1024 # type: int def __init__( self, - filename: str, + frame_grabber: FrameGrabber, ticks_per_second: float, - mode: Mode = Mode.HGR, + mode: VideoMode = VideoMode.HGR, + palette: Palette = Palette.NTSC ): - self.filename = filename # type: str - self.mode = mode # type: Mode + self.mode = mode # type: VideoMode + self.frame_grabber = frame_grabber # type: FrameGrabber self.ticks_per_second = ticks_per_second # type: float - - self._reader = skvideo.io.FFmpegReader(filename) - - # Compute frame rate from input video - # TODO: possible to compute time offset for each frame instead? - data = skvideo.io.ffprobe(self.filename)['video'] - rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001 - self.input_frame_rate = float( - rate_data[0]) / float(rate_data[1]) # type: float - self.ticks_per_frame = ( - self.ticks_per_second / self.input_frame_rate) # type: float + self.ticks_per_second / frame_grabber.input_frame_rate + ) # type: float self.frame_number = 0 # type: int + self.palette = palette # type: Palette # Initialize empty screen self.memory_map = screen.MemoryMap( @@ -59,181 +41,88 @@ class Video: self.aux_memory_map = screen.MemoryMap( screen_page=1) # type: screen.MemoryMap + self.pixelmap = screen.DHGRBitmap( + palette=palette, + main_memory=self.memory_map, + aux_memory=self.aux_memory_map + ) + else: + self.pixelmap = screen.HGRBitmap( + palette=palette, + main_memory=self.memory_map, + ) + # Accumulates pending edit weights across frames - self.update_priority = np.zeros((32, 256), dtype=np.int64) + self.update_priority = np.zeros((32, 256), dtype=np.int) if self.mode == mode.DHGR: - self.aux_update_priority = np.zeros((32, 256), dtype=np.int64) + self.aux_update_priority = np.zeros((32, 256), dtype=np.int) def tick(self, ticks: int) -> bool: + """Keep track of when it is time for a new image frame.""" + if ticks >= (self.ticks_per_frame * self.frame_number): self.frame_number += 1 return True return False - def _frame_grabber(self) -> Iterator[Image.Image]: - for frame_array in self._reader.nextFrame(): - yield Image.fromarray(frame_array) - - @staticmethod - def _rgb(r, g, b): - return (r << 16) + (g << 8) + b - - # def dither_framesframes(self) -> Iterator[screen.MemoryMap]: - # palette = hitherdither.palette.Palette( - # [ - # self._rgb(0,0,0), # black */ - # self._rgb(148,12,125), # red - hgr 0*/ - # self._rgb(32,54,212), # dk blue - hgr 0 */ - # self._rgb(188,55,255), # purple - default HGR overlay color */ - # self._rgb(51,111,0), # dk green - hgr 0 */ - # self._rgb(126,126,126), # gray - hgr 0 */ - # self._rgb(7,168,225), # med blue - alternate HGR overlay - # # color */ - # self._rgb(158,172,255), # lt blue - hgr 0 */ - # self._rgb(99,77,0), # brown - hgr 0 */ - # self._rgb(249,86,29), # orange */ - # self._rgb(126,126,126), # grey - hgr 0 */ - # self._rgb(255,129,236), # pink - hgr 0 */ - # self._rgb(67,200,0), # lt green */ - # self._rgb(221,206,23), # yellow - hgr 0 */ - # self._rgb(93,248,133), # aqua - hgr 0 */ - # self._rgb(255,255,255) # white - # ] - # ) - # for _idx, _frame in enumerate(self._frame_grabber()): - # if _idx % 60 == 0: - # img_dithered = hitherdither.ordered.yliluoma.yliluomas_1_ordered_dithering( - # _frame.resize((280,192), resample=Image.NEAREST), - # palette, order=8) - # - # yield img_dithered - - def frames(self) -> Iterator[screen.MemoryMap]: - """Encode frame to HGR using bmp2dhr. - - We do the encoding in a background thread to parallelize. - """ - - frame_dir = self.filename.split(".")[0] - try: - os.mkdir(frame_dir) - except FileExistsError: - pass - - q = queue.Queue(maxsize=10) - - def _hgr_decode(_idx, _frame): - outfile = "%s/%08dC.BIN" % (frame_dir, _idx) - bmpfile = "%s/%08d.bmp" % (frame_dir, _idx) - - try: - os.stat(outfile) - except FileNotFoundError: - _frame = _frame.resize((280, 192), resample=Image.LANCZOS) - _frame.save(bmpfile) - - # TODO: parametrize palette - subprocess.call([ - "/usr/local/bin/bmp2dhr", bmpfile, "hgr", - "P0", # Kegs32 RGB Color palette(for //gs playback) - "D9" # Buckels dither - ]) - - os.remove(bmpfile) - - _main = np.fromfile(outfile, dtype=np.uint8) - - return _main, None - - def _dhgr_decode(_idx, _frame): - mainfile = "%s/%08d.BIN" % (frame_dir, _idx) - auxfile = "%s/%08d.AUX" % (frame_dir, _idx) - - bmpfile = "%s/%08d.bmp" % (frame_dir, _idx) - - try: - os.stat(mainfile) - os.stat(auxfile) - except FileNotFoundError: - _frame = _frame.resize((280, 192), resample=Image.LANCZOS) - _frame.save(bmpfile) - - # TODO: parametrize palette - subprocess.call([ - "/usr/local/bin/bmp2dhr", bmpfile, "dhgr", - "P0", # Kegs32 RGB Color palette (for //gs playback) - "A", # Output separate .BIN and .AUX files - "D9" # Buckels dither - ]) - - os.remove(bmpfile) - - _main = np.fromfile(mainfile, dtype=np.uint8) - _aux = np.fromfile(auxfile, dtype=np.uint8) - - return _main, _aux - - def worker(): - """Invoke bmp2dhr to encode input image frames and push to queue.""" - for _idx, _frame in enumerate(self._frame_grabber()): - if self.mode == Mode.DHGR: - res = _dhgr_decode(_idx, _frame) - else: - res = _hgr_decode(_idx, _frame) - q.put(res) - - q.put((None, None)) - - t = threading.Thread(target=worker, daemon=True) - t.start() - - while True: - - main, aux = q.get() - if main is None: - break - - main_map = screen.FlatMemoryMap( - screen_page=1, data=main).to_memory_map() - if aux is None: - aux_map = None - else: - aux_map = screen.FlatMemoryMap( - screen_page=1, data=aux).to_memory_map() - yield (main_map, aux_map) - q.task_done() - - t.join() - def encode_frame( - self, target: screen.MemoryMap, - memory_map: screen.MemoryMap, - update_priority: np.array, + self, + target: screen.MemoryMap, + is_aux: bool, ) -> Iterator[opcodes.Opcode]: - """Update to match content of frame within provided budget.""" + """Converge towards target frame in priority order of edit distance.""" + + if is_aux: + memory_map = self.aux_memory_map + update_priority = self.aux_update_priority + else: + memory_map = self.memory_map + update_priority = self.update_priority + + # Make sure nothing is leaking into screen holes + assert np.count_nonzero( + memory_map.page_offset[screen.SCREEN_HOLES]) == 0 print("Similarity %f" % (update_priority.mean())) - yield from self._index_changes(memory_map, target, update_priority) + + yield from self._index_changes( + memory_map, target, update_priority, is_aux) def _index_changes( self, source: screen.MemoryMap, target: screen.MemoryMap, - update_priority: np.array + update_priority: np.array, + is_aux: True ) -> Iterator[Tuple[int, int, List[int]]]: """Transform encoded screen to sequence of change tuples.""" - diff_weights = self._diff_weights(source, target) + if self.mode == VideoMode.DHGR: + if is_aux: + target_pixelmap = screen.DHGRBitmap( + main_memory=self.memory_map, + aux_memory=target, + palette=self.palette + ) + else: + target_pixelmap = screen.DHGRBitmap( + main_memory=target, + aux_memory=self.aux_memory_map, + palette=self.palette + ) + else: + target_pixelmap = screen.HGRBitmap( + main_memory=target, + palette=self.palette + ) + + diff_weights = target_pixelmap.diff_weights(self.pixelmap, is_aux) + # Don't bother storing into screen holes + diff_weights[screen.SCREEN_HOLES] = 0 # Clear any update priority entries that have resolved themselves # with new frame update_priority[diff_weights == 0] = 0 - - # Halve existing weights to increase bias to new diffs. - # In particular this means that existing updates with diff 1 will - # become diff 0, i.e. will only be prioritized if they are still - # diffs in the new frame. - # self.update_priority >>= 1 update_priority += diff_weights priorities = self._heapify_priorities(update_priority) @@ -241,7 +130,12 @@ class Video: content_deltas = {} while priorities: - _, _, page, offset = heapq.heappop(priorities) + pri, _, page, offset = heapq.heappop(priorities) + + assert not screen.SCREEN_HOLES[page, offset], ( + "Attempted to store into screen hole at (%d, %d)" % ( + page, offset)) + # Check whether we've already cleared this diff while processing # an earlier opcode if update_priority[page, offset] == 0: @@ -249,100 +143,139 @@ class Video: offsets = [offset] content = target.page_offset[page, offset] + if self.mode == VideoMode.DHGR: + # DHGR palette bit not expected to be set + assert content < 0x80 # Clear priority for the offset we're emitting update_priority[page, offset] = 0 - source.page_offset[page, offset] = content diff_weights[page, offset] = 0 + # Update memory maps + source.page_offset[page, offset] = content + self.pixelmap.apply(page, offset, is_aux, content) + # Make sure we don't emit this offset as a side-effect of some # other offset later. for cd in content_deltas.values(): cd[page, offset] = 0 + # TODO: what if we add another content_deltas entry later? + # We might clobber it again # Need to find 3 more offsets to fill this opcode - for o in self._compute_error( + for err, o in self._compute_error( page, content, - target, + target_pixelmap, diff_weights, - content_deltas + content_deltas, + is_aux ): - offsets.append(o) + assert o != offset + assert not screen.SCREEN_HOLES[page, o], ( + "Attempted to store into screen hole at (%d, %d)" % ( + page, o)) - # Compute new edit distance between new content and target - # byte, so we can reinsert with this value - p = edit_distance.edit_weight( - content, target.page_offset[page, o], o % 2 == 1, - error=False) + if update_priority[page, o] == 0: + # Someone already resolved this diff. + continue + + # Make sure we don't end up considering this (page, offset) + # again until the next image frame. Even if a better match + # comes along, it's probably better to fix up some other byte. + # TODO: or should we recompute it with new error? + for cd in content_deltas.values(): + cd[page, o] = 0 + + byte_offset = target_pixelmap.byte_offset(o, is_aux) + old_packed = target_pixelmap.packed[page, o // 2] + + p = target_pixelmap.byte_pair_difference( + byte_offset, old_packed, content) # Update priority for the offset we're emitting - update_priority[page, o] = p # 0 + update_priority[page, o] = p source.page_offset[page, o] = content + self.pixelmap.apply(page, o, is_aux, content) if p: # This content byte introduced an error, so put back on the # heap in case we can get back to fixing it exactly # during this frame. Otherwise we'll get to it later. heapq.heappush( - priorities, (-p, random.random(), page, offset)) + priorities, (-p, random.getrandbits(8), page, o)) + + offsets.append(o) + if len(offsets) == 3: + break # Pad to 4 if we didn't find enough for _ in range(len(offsets), 4): offsets.append(offsets[0]) - yield (page + 32, content, offsets) + # # TODO: there is still a bug causing residual diffs when we have + # # apparently run out of work to do + if not np.array_equal(source.page_offset, target.page_offset): + diffs = np.nonzero(source.page_offset != target.page_offset) + for i in range(len(diffs[0])): + diff_p = diffs[0][i] + diff_o = diffs[1][i] + + # For HGR, 0x00 or 0x7f may be visually equivalent to the same + # bytes with high bit set (depending on neighbours), so skip + # them + if (source.page_offset[diff_p, diff_o] & 0x7f) == 0 and \ + (target.page_offset[diff_p, diff_o] & 0x7f) == 0: + continue + + if (source.page_offset[diff_p, diff_o] & 0x7f) == 0x7f and \ + (target.page_offset[diff_p, diff_o] & 0x7f) == 0x7f: + continue + + print("Diff at (%d, %d): %d != %d" % ( + diff_p, diff_o, source.page_offset[diff_p, diff_o], + target.page_offset[diff_p, diff_o] + )) + # assert False + # If we run out of things to do, pad forever - content = target.page_offset[(0, 0)] + content = target.page_offset[0, 0] while True: yield (32, content, [0, 0, 0, 0]) @staticmethod - def _diff_weights( - source: screen.MemoryMap, - target: screen.MemoryMap - ): - return edit_distance.screen_edit_distance( - source.page_offset, target.page_offset) - - def _heapify_priorities(self, update_priority: np.array) -> List: - priorities = [] - it = np.nditer(update_priority, flags=['multi_index']) - while not it.finished: - priority = it[0] - if not priority: - it.iternext() - continue - - page, offset = it.multi_index + def _heapify_priorities(update_priority: np.array) -> List: + """Build priority queue of (page, offset) ordered by update priority.""" + # Use numpy vectorization to efficiently compute the list of + # (priority, random nonce, page, offset) tuples to be heapified. + pages, offsets = update_priority.nonzero() + priorities = [tuple(data) for data in np.stack(( + -update_priority[pages, offsets], # Don't use deterministic order for page, offset - nonce = random.random() - priorities.append((-priority, nonce, page, offset)) - it.iternext() + np.random.randint(0, 2 ** 8, size=pages.shape[0]), + pages, + offsets) + ).T.tolist()] heapq.heapify(priorities) return priorities - @staticmethod - def _compute_delta(content, target, old): - """ - This function is the critical path for the video encoding. - """ - return edit_distance.byte_screen_error_distance(content, target) - old - _OFFSETS = np.arange(256) - def _compute_error(self, page, content, target, old_error, content_deltas): - offsets = [] + def _compute_error(self, page, content, target_pixelmap, diff_weights, + content_deltas, is_aux): + """Build priority queue of other offsets at which to store content. + Ordered by offsets which are closest to the target content value. + """ # TODO: move this up into parent delta_screen = content_deltas.get(content) if delta_screen is None: - delta_screen = self._compute_delta( - content, target.page_offset, old_error) + delta_screen = target_pixelmap.compute_delta( + content, diff_weights, is_aux) content_deltas[content] = delta_screen delta_page = delta_screen[page] @@ -350,23 +283,15 @@ class Video: candidate_offsets = self._OFFSETS[cond] priorities = delta_page[cond] - l = [ - (priorities[i], random.random(), candidate_offsets[i]) + deltas = [ + (priorities[i], random.getrandbits(8), candidate_offsets[i]) for i in range(len(candidate_offsets)) ] - heapq.heapify(l) + heapq.heapify(deltas) - while l: - _, _, o = heapq.heappop(l) - offsets.append(o) + while deltas: + pri, _, o = heapq.heappop(deltas) + assert pri < 0 + assert o <= 255 - # Make sure we don't end up considering this (page, offset) again - # until the next image frame. Even if a better match comes along, - # it's probably better to fix up some other byte. - for cd in content_deltas.values(): - cd[page, o] = 0 - - if len(offsets) == 3: - break - - return offsets + yield -pri, o diff --git a/transcoder/video_mode.py b/transcoder/video_mode.py new file mode 100644 index 0000000..a243088 --- /dev/null +++ b/transcoder/video_mode.py @@ -0,0 +1,8 @@ +"""Enum representing video encoding mode.""" + +import enum + + +class VideoMode(enum.Enum): + HGR = 0 # Hi-Res + DHGR = 1 # Double Hi-Res diff --git a/transcoder/video_test.py b/transcoder/video_test.py new file mode 100644 index 0000000..eefa04f --- /dev/null +++ b/transcoder/video_test.py @@ -0,0 +1,83 @@ +"""Tests for the video module.""" + +import unittest + +import frame_grabber +import palette +import screen +import video +import video_mode + + +class TestVideo(unittest.TestCase): + def test_diff_weights(self): + fs = frame_grabber.FrameGrabber(mode=video_mode.VideoMode.DHGR) + v = video.Video( + fs, ticks_per_second=10000., + mode=video_mode.VideoMode.DHGR) + + frame = screen.MemoryMap(screen_page=1) + frame.page_offset[0, 0] = 0b1111111 + frame.page_offset[0, 1] = 0b1010101 + + target_pixelmap = screen.DHGRBitmap( + palette=palette.Palette.NTSC, + main_memory=v.memory_map, + aux_memory=frame + ) + self.assertEqual( + 0b0000000000101010100000001111111000, + target_pixelmap.packed[0, 0]) + + pal = palette.NTSCPalette + + diff = target_pixelmap.diff_weights(v.pixelmap, is_aux=True) + + # Expect byte 0 to map to 0b0001111111000 + expect0 = target_pixelmap.edit_distances(pal.ID)[0][0b0001111111000] + + # Expect byte 2 to map to 0b0001010101000 + expect2 = target_pixelmap.edit_distances(pal.ID)[2][0b0001010101000] + + self.assertEqual(expect0, diff[0, 0]) + self.assertEqual(expect2, diff[0, 1]) + + # Update aux frame + v.aux_memory_map.page_offset = frame.page_offset + v.pixelmap._pack() + self.assertEqual( + 0b0000000000101010100000001111111000, + v.pixelmap.packed[0, 0] + ) + + # Encode new aux frame + frame = screen.MemoryMap(screen_page=1) + frame.page_offset[0, 0] = 0b1101101 + frame.page_offset[0, 1] = 0b0110110 + + target_pixelmap = screen.DHGRBitmap( + main_memory=v.memory_map, + aux_memory=frame, + palette=pal.ID + ) + self.assertEqual( + 0b0000000000011011000000001101101000, + target_pixelmap.packed[0, 0] + ) + + diff = target_pixelmap.diff_weights(v.pixelmap, is_aux=True) + + # Masked offset 0 changes from 0001111111000 to 0001101101000 + expect0 = target_pixelmap.edit_distances(pal.ID)[0][ + 0b00011111110000001101101000] + + # Masked offset 2 changes from 0001010101000 to 0000110110000 + expect2 = target_pixelmap.edit_distances(pal.ID)[2][ + 0b00010101010000000110110000] + + self.assertEqual(expect0, diff[0, 0]) + self.assertEqual(expect2, diff[0, 1]) + + +if __name__ == '__main__': + unittest.main()