Merge pull request #2 from KrisKennaway/dhgr

Dhgr
This commit is contained in:
KrisKennaway 2019-07-11 23:56:51 +01:00 committed by GitHub
commit 326ca62075
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3215 additions and 881 deletions

View File

@ -58,6 +58,7 @@ zpdummy = $08
dummy = $ffff
ptr = $06 ; TODO: we only use this for connection retry count
HGRZP = $E6 ; ZP location used by HGR internals to track page to clear
; soft-switches
KBD = $C000
@ -74,7 +75,7 @@ DHIRESON = $C05E
; MONITOR SUBROUTINES
HGR = $F3E2
HGR0 = $F3EA ; internal entry point within HGR that doesn't set soft-switches
HGR0 = $F3F2 ; internal entry point within HGR that doesn't set soft-switches
COUT = $FDED
PRBYTE = $FDDA
PRNTAX = $F941
@ -311,30 +312,35 @@ op_header:
; Initialize (D)HGR in the CODE segment so we don't accidentally toast ourselves when
; erasing HGR
_op_header_hgr:
; Co-opt HGR internals to clear screen without displaying it.
; nukes the startup code we placed in HGR segment
STA HIRESON
STA FULLSCR
LDA #$20
STA HGRZP ; ZP location used by HGR to track page to clear
JSR HGR0
LDA WDATA ; Video mode
BEQ @1 ; 0 = HGR mode
; TODO: clear screen before displaying it to look cleaner
; DHGR mode
STA TEXTOFF
STA HIRESON
STA DHIRESON
STA COL80ON
STA STORE80ON
; Clear aux screen
STA PAGE2ON ; AUX memory active
; Co-opt HGR internals to clear AUX for us.
LDA #$20
STA HGRZP
JSR HGR0
STA PAGE2OFF ; MAIN memory active
STA TEXTOFF ; now display empty (D)HGR screen. Doing this before the next instruction to make sure we don't see 80-column text garbage momentarily.
STA COL80ON
STA DHIRESON
@1:
JSR HGR ; nukes the startup code we placed in HGR segment
STA FULLSCR
STA TEXTOFF ; now display empty HGR screen (NOP for DHGR since we've already done it)
; establish invariants expected by decode loop
LDY #>RXBASE ; High byte of socket 0 receive buffer
@ -391,7 +397,7 @@ _op_header_hgr:
; Y register has the high byte of the W5100 address pointer in the RX socket code, so we
; can't trash this until we are ready to point back there.
checkrecv:
BIT TICK ; 4
STA TICK ; 4
LDA #<S0RXRSR ; 2 Socket 0 Received Size register
STA WADRL ; 4
@ -434,7 +440,7 @@ recv: ; 15 cycles so far
; X will usually already be 0 from op_ack except during first frame when reading
; header but reset it unconditionally
LDX #$00 ; 2
BIT TICK ; 4 ; 36
STA TICK ; 4 ; 36
NOP ; 2
STA dummy ; 4
@ -458,7 +464,7 @@ op_nop:
; - read 2 bytes from the stream as address of next opcode
;
; Each opcode has 6 cycles of padding, which is necessary to support reordering things to
; get the second "BIT TICK" at the right cycle offset.
; get the second "STA TICK" at the right cycle offset.
;
; Where possible we share code by JMPing to a common tail instruction sequence in one of the
; earlier opcodes. This is critical for reducing code size enough to fit.
@ -474,8 +480,8 @@ op_nop:
.macro op_tick_4 page
;4+(4)+2+4+4+4+5+4+5+4+5+4+5+4+4+4+4+3=73
.ident (.concat ("op_tick_4_page_", .string(page))):
BIT TICK ; 4
BIT TICK ; 4
STA TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
STA zpdummy ; 3
@ -518,9 +524,9 @@ tickident page, 7
.macro op_tick_6 page
;4+(2+4)+3+4+4+5+4+5+4+5+4+5+4+4+4+5+3
.ident (.concat ("op_tick_6_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
@ -563,9 +569,9 @@ tickident page, 8
.macro op_tick_8 page
;4+(4+4)+3+3+55
.ident (.concat ("op_tick_8_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_55")) ; 3 + 55
@ -574,10 +580,10 @@ tickident page, 8
.macro op_tick_10 page
;4+(4+2+4)+3+56
.ident (.concat ("op_tick_10_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_56")) ; 3 + 56
.endmacro
@ -585,10 +591,10 @@ tickident page, 8
.macro op_tick_12 page
;4+(4+4+4)+3+3+51
.ident (.concat ("op_tick_12_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_51")) ; 3 + 51
@ -597,11 +603,11 @@ tickident page, 8
.macro op_tick_14 page
;4+(4+4+2+4)+3+52
.ident (.concat ("op_tick_14_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_52")) ; 3+52
.endmacro
@ -609,14 +615,14 @@ tickident page, 8
.macro op_tick_16 page
; 4+(4+4+4+4)+5+2+3+43
.ident (.concat ("op_tick_16_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
; This temporarily violates X=0 invariant required by tick_6, but lets us share a
; common opcode tail; otherwise we need a dummy 4-cycle opcode between the ticks, which
; doesn't leave enough to JMP with.
LDX WDATA ; 4
LDY WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA page << 8,x ; 5
LDX #$00 ; 2 restore X=0 invariant
@ -627,14 +633,14 @@ tickident page, 8
.macro op_tick_18 page
; 4 + (4+4+4+2+4)+5+5+2+2+4+5+4+5+4+4+4+4+3
.ident (.concat ("op_tick_18_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
; lets us reorder the 5-cycle STA page << 8,y outside of tick loop.
; This temporarily violates X=0 invariant required by tick_6
LDX WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
STA page << 8,Y ; 5
STA page << 8,X ; 5
@ -661,12 +667,12 @@ tickident page, 8
.macro op_tick_20 page
;4+(4+4+5+3+4)+3+46=73
.ident (.concat ("op_tick_20_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_46"))
.endmacro
@ -675,12 +681,12 @@ tickident page, 8
.macro op_tick_22 page
; 4+(4+4+5+4+4)+3+3+42
.ident (.concat ("op_tick_22_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_42")) ; 3 + 42
@ -689,13 +695,13 @@ tickident page, 8
.macro op_tick_24 page
;4+(4+4+5+4+3+4)+3+42
.ident (.concat ("op_tick_24_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_42"))
.endmacro
@ -703,13 +709,13 @@ tickident page, 8
.macro op_tick_26 page ; pattern repeats from op_tick_8
; 4+(4+4+5+4+5+4)+3+37
.ident (.concat ("op_tick_26_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
STA page << 8,Y ; 5
BIT TICK; 4
STA TICK; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_37")) ; 3 + 37
@ -718,14 +724,14 @@ tickident page, 8
.macro op_tick_28 page ; pattern repeats from op_tick_10
; 4+(4+2+4+5+4+5+4)+3+38
.ident (.concat ("op_tick_28_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
STA page << 8,Y ; 5
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_38"))
.endmacro
@ -733,14 +739,14 @@ tickident page, 8
.macro op_tick_30 page ; pattern repeats from op_tick_12
;4+(4+4+5+4+5+4+4)+3+3+33
.ident (.concat ("op_tick_30_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_33")) ; 3 + 33
@ -749,7 +755,7 @@ tickident page, 8
.macro op_tick_32 page ; pattern repeats from op_tick_14
;4+(4+4+5+4+5+4+2+4)+3+34
.ident (.concat ("op_tick_32_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -757,7 +763,7 @@ tickident page, 8
STA page << 8,Y ; 5
LDY WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_34"))
.endmacro
@ -765,7 +771,7 @@ tickident page, 8
.macro op_tick_34 page ; pattern repeats from op_tick_16
; 4+(4+4+5+4+5+4+4+4)+2+5+5+3+20
.ident (.concat ("op_tick_34_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -773,7 +779,7 @@ tickident page, 8
STA page << 8,Y ; 5
LDY WDATA ; 4
LDX WDATA ; 4 ; allows reordering STA ...,X outside ticks
BIT TICK ; 4
STA TICK ; 4
STA page << 8,Y ; 5
STA page << 8,X ; 5
@ -786,7 +792,7 @@ tickident page, 8
.macro op_tick_36 page ; pattern repeats from op_tick_18
;4+(4+4+5+4+5+4+4+2+4)+5+5+2+2+4+4+4+4+3
.ident (.concat ("op_tick_36_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -795,7 +801,7 @@ tickident page, 8
LDY WDATA ; 4
LDX WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
STA page << 8,Y ; 5
STA page << 8,X ; 5
@ -814,7 +820,7 @@ tickident page, 8
.macro op_tick_38 page ; pattern repeats from op_tick_20
; 4 + (4+4+5+4+5+4+5+3+4)+3+28
.ident (.concat ("op_tick_38_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -823,7 +829,7 @@ tickident page, 8
LDY WDATA ; 4
STA page << 8,Y ; 5
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_28")) ; 3 + 28
.endmacro
@ -832,7 +838,7 @@ tickident page, 8
.macro op_tick_40 page ; pattern repeats from op_tick_22
;4+(4+4+5+4+5+4+5+4+4)+3+3+24
.ident (.concat ("op_tick_40_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -841,7 +847,7 @@ tickident page, 8
LDY WDATA ; 4
STA page << 8,Y ; 5
LDY WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_24"))
@ -850,7 +856,7 @@ tickident page, 8
.macro op_tick_42 page ; pattern repeats from op_tick_24
;4+(4+4+5+4+5+4+5+4+3+4)+3+24
.ident (.concat ("op_tick_42_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -860,7 +866,7 @@ tickident page, 8
STA page << 8,Y ; 5
LDY WDATA ; 4
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_24")) ; 3 + 24
.endmacro
@ -868,7 +874,7 @@ tickident page, 8
.macro op_tick_44 page ; pattern repeats from op_tick_26
; 4 + (4+4+5+4+5+4+5+4+5+4)+3+3+19
.ident (.concat ("op_tick_44_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -878,7 +884,7 @@ tickident page, 8
STA page << 8,Y ; 5
LDY WDATA ; 4
STA page << 8,Y ; 5
BIT TICK; 4
STA TICK; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_19")) ; 3 + 19
@ -887,7 +893,7 @@ tickident page, 8
.macro op_tick_46 page ; pattern repeats from op_tick_28
;4+(4+2+4+5+4+5+4+5+4+5+4)+3+20
.ident (.concat ("op_tick_46_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -898,7 +904,7 @@ tickident page, 8
LDY WDATA ; 4
STA page << 8,Y ; 5
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_20"))
.endmacro
@ -906,7 +912,7 @@ tickident page, 8
.macro op_tick_48 page ; pattern repeats from op_tick_30
;4+(4+4+5+4+5+4+5+4+5+4+4)+3+3+15
.ident (.concat ("op_tick_48_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -918,7 +924,7 @@ tickident page, 8
STA page << 8,Y ; 5
LDA WDATA ; 4
BIT TICK ; 4
STA TICK ; 4
STA zpdummy ; 3
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_15")) ; 3 + 15
@ -927,7 +933,7 @@ tickident page, 8
.macro op_tick_50 page ; pattern repeats from op_tick_32
;4+(4+4+5+4+5+4+5+4+5+4+2+4)+3+16
.ident (.concat ("op_tick_50_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -940,7 +946,7 @@ tickident page, 8
LDA WDATA ; 4
NOP ; 2
BIT TICK ; 4
STA TICK ; 4
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_16"))
.endmacro
@ -948,7 +954,7 @@ tickident page, 8
.macro op_tick_52 page ; pattern repeats from op_tick_34
;4+(4+4+5+4+5+4+5+4+5+4+4+4)+2+3+12
.ident (.concat ("op_tick_52_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -961,7 +967,7 @@ tickident page, 8
LDA WDATA ; 4
STA .ident (.concat ("_op_tick_6_page_", .string(page), "_jmp"))+2 ; 4
BIT TICK ; 4
STA TICK ; 4
NOP ; 2
JMP .ident(.concat("_op_tick_page_", .string(page), "_tail_12"))
@ -970,7 +976,7 @@ tickident page, 8
.macro op_tick_54 page ; pattern repeats from op_tick_36
; 4 + (4+4+5+4+5+4+5+3+3+4+5+4+4)+4+4+4+3
.ident (.concat ("op_tick_54_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -986,7 +992,7 @@ tickident page, 8
STA zpdummy ; 3
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
; used >3 pad cycles between tick pair; can't branch to tail
STA @D+2 ; 4
@ -999,7 +1005,7 @@ tickident page, 8
.macro op_tick_56 page
; 4+(4+4+5+4+5+4+5+4+5+4+4+4+4)+2+4+4+3
.ident (.concat ("op_tick_56_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1014,7 +1020,7 @@ tickident page, 8
STA @D+2 ; 4
STA dummy ; 4
BIT TICK ; 4
STA TICK ; 4
; used >3 pad cycles between tick pair; can't branch to tail
NOP ; 2
@ -1028,7 +1034,7 @@ tickident page, 8
.macro op_tick_58 page ; pattern repeats from op_tick_40
;4+(4+4+5+4+5+4+5+4+5+4+4+3+3+4)+4+4+3
.ident (.concat ("op_tick_58_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1044,7 +1050,7 @@ tickident page, 8
STA zpdummy ; 3
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
; used >3 pad cycles between tick pair; can't branch to tail
LDA WDATA ; 4
@ -1056,7 +1062,7 @@ tickident page, 8
.macro op_tick_60 page
; 4+(4+4+5+4+5+4+5+4+5+4+4+4+4+4)+2+4+3
.ident (.concat ("op_tick_60_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1073,7 +1079,7 @@ tickident page, 8
LDA WDATA ; 4
STA dummy ; 4
BIT TICK ; 4
STA TICK ; 4
; used >3 pad cycles between tick pair; can't branch to tail
NOP ; 2
@ -1085,7 +1091,7 @@ tickident page, 8
.macro op_tick_62 page
;4+(4+4+5+4+5+4+5+4+5+4+4+4+3+3+4)+4+3
.ident (.concat ("op_tick_62_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1102,7 +1108,7 @@ tickident page, 8
STA zpdummy ; 3
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
; used >3 pad cycles between tick pair; can't branch to tail
STA @D+1 ; 4
@ -1113,7 +1119,7 @@ tickident page, 8
.macro op_tick_64 page
;4+(4+4+5+4+5+4+5+4+5+4+4+4+4+4+4)+2+3
.ident (.concat ("op_tick_64_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1131,7 +1137,7 @@ tickident page, 8
STA @D+1 ; 4
STA dummy ; 4
BIT TICK ; 4
STA TICK ; 4
NOP ; 2
@D:
@ -1141,7 +1147,7 @@ tickident page, 8
.macro op_tick_66 page ; pattern repeats from op_tick_8
; 4+(4+4+5+4+5+4+5+4+5+4+4+4+3+4+3+4)+3
.ident (.concat ("op_tick_66_page_", .string(page))):
BIT TICK ; 4
STA TICK ; 4
LDA WDATA ; 4
LDY WDATA ; 4
STA page << 8,Y ; 5
@ -1160,7 +1166,7 @@ tickident page, 8
STA zpdummy ; 3
STA zpdummy ; 3
BIT TICK ; 4
STA TICK ; 4
@D:
JMP op_nop ; 3
@ -1278,6 +1284,7 @@ op_terminate:
LDA KBD
BPL @0
@1: ; key pressed
LDA KBDSTRB ; clear strobe
JMP exit
; Manage W5100 socket buffer and ACK TCP stream.
@ -1286,7 +1293,7 @@ op_terminate:
; the last 4 bytes in a 2K "TCP frame". i.e. we can assume that we need to consume
; exactly 2K from the W5100 socket buffer.
op_ack:
BIT TICK ; 4
STA TICK ; 4
; allow flip-flopping the PAGE1/PAGE2 soft switches to steer writes to MAIN/AUX screens
; actually this allows touching any $C0XX soft-switch, in case that is useful somehow
@ -1307,7 +1314,7 @@ op_ack:
LDX #<S0RXRD ; 2
STX WADRL ; 4
BIT TICK ; 4 (36)
STA TICK ; 4 (36)
LDA WDATA ; 4 Read high byte
; No need to read low byte since it's guaranteed to be 0 since we're at the end of a 2K frame.
@ -1332,7 +1339,7 @@ op_ack:
; - used as the low byte for resetting the W5100 address pointer when we're ready to start processing more data
LDX #$00 ; 2 restore invariant for dispatch loop
JMP checkrecv ; 3 (37 with following BIT TICK)
JMP checkrecv ; 3 (37 with following STA TICK)
; Quit to ProDOS
exit:

149
transcoder/colours.py Normal file
View File

@ -0,0 +1,149 @@
"""Apple II nominal display colours, represented by 4-bit dot sequences.
These are the "asymptotic" colours as displayed in e.g. continuous runs of
pixels. The effective colours that are actually displayed are not discrete,
due to NTSC artifacting being a continuous process.
"""
from typing import Tuple, Type
import enum
import functools
class NominalColours(enum.Enum):
pass
class HGRColours(NominalColours):
"""Map from 4-bit dot representation to DHGR pixel colours.
Dots are in memory bit order (MSB -> LSB), which is opposite to screen
order (LSB -> MSB is ordered left-to-right on the screen)
Note that these are right-rotated from the HGR mapping, because of a
1-tick phase difference in the colour reference signal for DHGR vs HGR
"""
BLACK = 0b0000
MAGENTA = 0b0001
BROWN = 0b1000
ORANGE = 0b1001 # HGR colour
DARK_GREEN = 0b0100
GREY1 = 0b0101
GREEN = 0b1100 # HGR colour
YELLOW = 0b1101
DARK_BLUE = 0b0010
VIOLET = 0b0011 # HGR colour
GREY2 = 0b1010
PINK = 0b1011
MED_BLUE = 0b0110 # HGR colour
LIGHT_BLUE = 0b0111
AQUA = 0b1110
WHITE = 0b1111
class DHGRColours(NominalColours):
"""Map from 4-bit dot representation to DHGR pixel colours.
Dots are in memory bit order (MSB -> LSB), which is opposite to screen
order (LSB -> MSB is ordered left-to-right on the screen)
Note that these are right-rotated from the HGR mapping, because of a
1-tick phase difference in the colour reference signal for DHGR vs HGR
"""
# representation.
BLACK = 0b0000
MAGENTA = 0b1000
BROWN = 0b0100
ORANGE = 0b1100 # HGR colour
DARK_GREEN = 0b0010
GREY1 = 0b1010
GREEN = 0b0110 # HGR colour
YELLOW = 0b1110
DARK_BLUE = 0b0001
VIOLET = 0b1001 # HGR colour
GREY2 = 0b0101
PINK = 0b1101
MED_BLUE = 0b0011 # HGR colour
LIGHT_BLUE = 0b1011
AQUA = 0b0111
WHITE = 0b1111
def ror(int4: int, howmany: int) -> int:
"""Rotate-right an int4 some number of times."""
res = int4
for _ in range(howmany):
res = _ror(res)
return res
def _ror(int4: int) -> int:
return ((int4 & 0b1110) >> 1) ^ ((int4 & 0b0001) << 3)
def rol(int4: int, howmany: int) -> int:
"""Rotate-left an int4 some number of times."""
res = int4
for _ in range(howmany):
res = _rol(res)
return res
def _rol(int4: int) -> int:
return ((int4 & 0b0111) << 1) ^ ((int4 & 0b1000) >> 3)
@functools.lru_cache(None)
def dots_to_nominal_colour_pixels(
num_bits: int,
dots: int,
colours: Type[NominalColours],
init_phase: int = 1 # Such that phase = 0 at start of body
) -> Tuple[NominalColours]:
"""Sequence of num_bits nominal colour pixels via sliding 4-bit window.
Includes the 3-bit header that represents the trailing 3 bits of the
previous tuple body. e.g. for DHGR, storing a byte in aux even columns
will also influence the colours of the previous main odd column.
This naively models (approximates) the NTSC colour artifacting.
TODO: Use a more careful analogue colour composition model to produce
effective pixel colours.
TODO: DHGR vs HGR colour differences can be modeled by changing init_phase
"""
res = []
shifted = dots
phase = init_phase
for i in range(num_bits):
colour = rol(shifted & 0b1111, phase)
res.append(colours(colour))
shifted >>= 1
phase += 1
if phase == 4:
phase = 0
return tuple(res)
@functools.lru_cache(None)
def dots_to_nominal_colour_pixel_values(
num_bits: int,
dots: int,
colours: Type[NominalColours],
init_phase: int = 1 # Such that phase = 0 at start of body
) -> Tuple[int]:
""""Sequence of num_bits nominal colour values via sliding 4-bit window."""
return tuple(p.value for p in dots_to_nominal_colour_pixels(
num_bits, dots, colours, init_phase
))

113
transcoder/colours_test.py Normal file
View File

@ -0,0 +1,113 @@
import unittest
import colours
HGRColours = colours.HGRColours
class TestColours(unittest.TestCase):
def test_dots_to_pixels(self):
self.assertEqual(
(
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.DARK_BLUE,
HGRColours.MED_BLUE,
HGRColours.AQUA,
HGRColours.AQUA,
HGRColours.GREEN,
HGRColours.BROWN,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK
),
colours.dots_to_nominal_colour_pixels(
31, 0b00000000000000000000111000000000, HGRColours, init_phase=0
)
)
self.assertEqual(
(
HGRColours.BLACK,
HGRColours.MAGENTA,
HGRColours.VIOLET,
HGRColours.LIGHT_BLUE,
HGRColours.WHITE,
HGRColours.AQUA,
HGRColours.GREEN,
HGRColours.BROWN,
HGRColours.BLACK,
HGRColours.MAGENTA,
HGRColours.VIOLET,
HGRColours.LIGHT_BLUE,
HGRColours.WHITE,
HGRColours.AQUA,
HGRColours.GREEN,
HGRColours.BROWN,
HGRColours.BLACK,
HGRColours.MAGENTA,
HGRColours.VIOLET,
HGRColours.LIGHT_BLUE,
HGRColours.WHITE,
HGRColours.AQUA,
HGRColours.GREEN,
HGRColours.BROWN,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK,
HGRColours.BLACK
),
colours.dots_to_nominal_colour_pixels(
31, 0b0000111100001111000011110000, HGRColours, init_phase=0
)
)
class TestRolRoR(unittest.TestCase):
def testRolOne(self):
self.assertEqual(0b1111, colours.rol(0b1111, 1))
self.assertEqual(0b0001, colours.rol(0b1000, 1))
self.assertEqual(0b1010, colours.rol(0b0101, 1))
def testRolMany(self):
self.assertEqual(0b1111, colours.rol(0b1111, 3))
self.assertEqual(0b0010, colours.rol(0b1000, 2))
self.assertEqual(0b0101, colours.rol(0b0101, 2))
def testRorOne(self):
self.assertEqual(0b1111, colours.ror(0b1111, 1))
self.assertEqual(0b1000, colours.ror(0b0001, 1))
self.assertEqual(0b0101, colours.ror(0b1010, 1))
def testRoRMany(self):
self.assertEqual(0b1111, colours.ror(0b1111, 3))
self.assertEqual(0b1000, colours.ror(0b0010, 2))
self.assertEqual(0b0101, colours.ror(0b0101, 2))
if __name__ == "__main__":
unittest.main()

1
transcoder/data/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
*.bz2 filter=lfs diff=lfs merge=lfs -text

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:b47eadfdf8c8e16c6539f9a16ed0b5a393b17e0cbd03831aacda7f659e9522d6
size 120830327

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:8c245981f91ffa89b47abdd1c9d646c2e79499a0c82c38c91234be0a59e52f1f
size 118832545

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3fd52feb08eb6f99b267a1050c68905f25d0d106ad7c2c63473cc0a0f6aa1b25
size 224334626

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:dbf83e3d0b6c7867ccf7ae1d55a6ed4e906409b08043dec514e1104cec95f0fc
size 220565577

View File

@ -1,310 +0,0 @@
"""Computes visual differences between screen image data.
This is the core of the video encoding, for three reasons:
- The edit distance between old and new frames is used to prioritize which
screen bytes to send
- When deciding which other offset bytes to send along with a chosen screen
byte, we minimize the error introduced by sending this (probably non-optimal)
byte instead of the actual target screen byte. This needs to account for the
colour artifacts introduced by this byte as well as weighting perceived
errors introduced (e.g. long runs of colour)
- The byte_screen_error_distance function is on the critical path of the encoding.
"""
import functools
import numpy as np
import weighted_levenshtein
@functools.lru_cache(None)
def byte_to_nominal_colour_string(b: int, is_odd_offset: bool) -> str:
"""Compute nominal pixel colours for a byte.
This ignores any fringing/colour combining effects, as well as
half-ignoring what happens to the colour pixel that crosses the byte
boundary.
A better implementation of this might be to consider neighbouring (even,
odd) column bytes together since this will allow correctly colouring the
split pixel in the middle.
There are also even weirder colour artifacts that happen when
neighbouring bytes have mismatched colour palettes, which also cross the
odd/even boundary. But these may not be worth worrying about.
:param b: byte to encode
:param is_odd_offset: whether byte is at an odd screen column
:return: string encoding nominal colour of pixels in the byte, with "0"
or "1" for the "hanging" bit that spans the neighbouring byte.
"""
pixels = []
idx = 0
if is_odd_offset:
pixels.append("01"[b & 0x01])
idx += 1
# K = black
# G = green
# V = violet
# W = white
palettes = (
(
"K", # 0x00
"V", # 0x01
"G", # 0x10
"W" # 0x11
), (
"K", # 0x00
"B", # 0x01
"O", # 0x10
"W" # 0x11
)
)
palette = palettes[(b & 0x80) != 0]
for _ in range(3):
pixel = palette[(b >> idx) & 0b11]
pixels.append(pixel)
idx += 2
if not is_odd_offset:
pixels.append("01"[(b & 0x40) != 0])
idx += 1
return "".join(pixels)
@functools.lru_cache(None)
def byte_to_colour_string_with_white_coalescing(
b: int, is_odd_offset: bool) -> str:
"""Model the combining of neighbouring 1 bits to produce white.
The output is a string of length 7 representing the 7 display dots that now
have colour.
Attempt to model the colour artifacting that consecutive runs of
1 bits are coerced to white. This isn't quite correct since:
a) it doesn't operate across byte boundaries (see note on
byte_to_nominal_colour_string)
b) a sequence like WVV appears more like WWWVVV or WWVVVV rather than WWWKVV
(at least on the //gs)
It also ignores other colour fringing e.g. from NTSC artifacts.
TODO: this needs more work.
:param b:
:param is_odd_offset:
:return:
"""
pixels = []
fringing = {
"1V": "WWK", # 110
"1W": "WWW", # 111
"1B": "WWB", # 110
"WV": "WWWK", # 1110
"WB": "WWWK", # 1110
"GV": "KWWK", # 0110
"OB": "KWWK", # 0110
"GW": "KWWW", # 0111
"OW": "KWWW", # 0111
"W1": "WWW", # 111
"G1": "KWW", # 011
"O1": "KWW", # 011
}
nominal = byte_to_nominal_colour_string(b, is_odd_offset)
for idx in range(3):
pair = nominal[idx:idx + 2]
effective = fringing.get(pair)
if not effective:
e = []
if pair[0] in {"0", "1"}:
e.append(pair[0])
else:
e.extend([pair[0], pair[0]])
if pair[1] in {"0", "1"}:
e.append(pair[1])
else:
e.extend([pair[1], pair[1]])
effective = "".join(e)
if pixels:
pixels.append(effective[2:])
else:
pixels.append(effective)
return "".join(pixels)
substitute_costs = np.ones((128, 128), dtype=np.float64)
# Substitution costs to use when evaluating other potential offsets at which
# to store a content byte. We penalize more harshly for introducing
# errors that alter pixel colours, since these tend to be very
# noticeable as visual noise.
error_substitute_costs = np.ones((128, 128), dtype=np.float64)
# Penalty for turning on/off a black bit
for c in "01GVWOB":
substitute_costs[(ord('K'), ord(c))] = 1
substitute_costs[(ord(c), ord('K'))] = 1
error_substitute_costs[(ord('K'), ord(c))] = 5
error_substitute_costs[(ord(c), ord('K'))] = 5
# Penalty for changing colour
for c in "01GVWOB":
for d in "01GVWOB":
substitute_costs[(ord(c), ord(d))] = 1
substitute_costs[(ord(d), ord(c))] = 1
error_substitute_costs[(ord(c), ord(d))] = 5
error_substitute_costs[(ord(d), ord(c))] = 5
insert_costs = np.ones(128, dtype=np.float64) * 1000
delete_costs = np.ones(128, dtype=np.float64) * 1000
def _edit_weight(a: int, b: int, is_odd_offset: bool, error: bool):
"""
:param a:
:param b:
:param is_odd_offset:
:param error:
:return:
"""
a_pixels = byte_to_colour_string_with_white_coalescing(a, is_odd_offset)
b_pixels = byte_to_colour_string_with_white_coalescing(b, is_odd_offset)
dist = weighted_levenshtein.dam_lev(
a_pixels, b_pixels,
insert_costs=insert_costs,
delete_costs=delete_costs,
substitute_costs=error_substitute_costs if error else substitute_costs,
)
return np.int64(dist)
@functools.lru_cache(None)
def _edit_weight_matrices(error: bool) -> np.array:
"""
:param error:
:return:
"""
ewm = np.zeros(shape=(256, 256, 2), dtype=np.int64)
for a in range(256):
for b in range(256):
for is_odd_offset in (False, True):
ewm[a, b, int(is_odd_offset)] = _edit_weight(
a, b, is_odd_offset, error)
return ewm
@functools.lru_cache(None)
def edit_weight(a: int, b: int, is_odd_offset: bool, error: bool):
"""
:param a: first content value
:param b: second content value
:param is_odd_offset: whether this content byte is at an odd screen
byte offset
:param error: whether to compute error distance or edit distance
:return: the corresponding distance value
"""
return _edit_weight_matrices(error)[a, b, int(is_odd_offset)]
_even_ewm = {}
_odd_ewm = {}
_even_error_ewm = {}
_odd_error_ewm = {}
for a in range(256):
for b in range(256):
_even_ewm[(a << 8) + b] = edit_weight(a, b, False, False)
_odd_ewm[(a << 8) + b] = edit_weight(a, b, True, False)
_even_error_ewm[(a << 8) + b] = edit_weight(a, b, False, True)
_odd_error_ewm[(a << 8) + b] = edit_weight(a, b, True, True)
@functools.lru_cache(None)
def _constant_array(content: int, shape) -> np.array:
"""
:param content:
:param shape:
:return:
"""
return np.ones(shape, dtype=np.uint16) * content
def byte_screen_error_distance(content: int, b: np.array) -> np.array:
"""
:param content: byte for which to compute error distance
:param b: np.array of size (32, 256) representing existing screen memory.
:return: np.array of size (32, 256) representing error distance from
content byte to each byte of b
"""
assert b.shape == (32, 256), b.shape
# Extract even and off column offsets (128,)
even_b = b[:, ::2]
odd_b = b[:, 1::2]
a = _constant_array(content << 8, even_b.shape)
even = a + even_b
odd = a + odd_b
even_weights = np.vectorize(_even_error_ewm.__getitem__)(even)
odd_weights = np.vectorize(_odd_error_ewm.__getitem__)(odd)
res = np.ndarray(shape=b.shape, dtype=np.int64)
res[:, ::2] = even_weights
res[:, 1::2] = odd_weights
return res
def screen_edit_distance(a: np.array, b: np.array) -> np.array:
"""
:param a:
:param b:
:return:
"""
# Extract even and off column offsets (32, 128)
even_a = a[:, ::2]
odd_a = a[:, 1::2]
even_b = b[:, ::2]
odd_b = b[:, 1::2]
even = (even_a.astype(np.uint16) << 8) + even_b
odd = (odd_a.astype(np.uint16) << 8) + odd_b
even_weights = np.vectorize(_even_ewm.__getitem__)(even)
odd_weights = np.vectorize(_odd_ewm.__getitem__)(odd)
res = np.ndarray(shape=a.shape, dtype=np.int64)
res[:, ::2] = even_weights
res[:, 1::2] = odd_weights
return res

View File

@ -1,88 +0,0 @@
"""Tests for the edit_distance module."""
import unittest
import edit_distance
class TestByteToNominalColourString(unittest.TestCase):
def testEncoding(self):
self.assertEqual(
"KKK0",
edit_distance.byte_to_nominal_colour_string(
0, is_odd_offset=False))
self.assertEqual(
"0KKK",
edit_distance.byte_to_nominal_colour_string(
0, is_odd_offset=True))
self.assertEqual(
"WWW1", edit_distance.byte_to_nominal_colour_string(
0xff, is_odd_offset=False))
self.assertEqual(
"1WWW", edit_distance.byte_to_nominal_colour_string(
0xff, is_odd_offset=True))
self.assertEqual(
"GGG0", edit_distance.byte_to_nominal_colour_string(
0x2a, is_odd_offset=False))
self.assertEqual(
"1GGG", edit_distance.byte_to_nominal_colour_string(
0x55, is_odd_offset=True))
self.assertEqual(
"OOO0", edit_distance.byte_to_nominal_colour_string(
0xaa, is_odd_offset=False))
self.assertEqual(
"1OOO", edit_distance.byte_to_nominal_colour_string(
0xd5, is_odd_offset=True))
class TestEditWeight(unittest.TestCase):
def testTransposition(self):
self.assertEqual("WKK0", edit_distance.byte_to_nominal_colour_string(
0b00000011, is_odd_offset=False))
self.assertEqual("KWK0", edit_distance.byte_to_nominal_colour_string(
0b00001100, is_odd_offset=False))
self.assertEqual(
1, edit_distance.edit_weight(0b00000011, 0b00001100,
is_odd_offset=False)
)
self.assertEqual("OWK1", edit_distance.byte_to_nominal_colour_string(
0b11001110, is_odd_offset=False))
self.assertEqual("OKW1", edit_distance.byte_to_nominal_colour_string(
0b11110010, is_odd_offset=False))
self.assertEqual(
1, edit_distance.edit_weight(
0b11001110, 0b11110010, is_odd_offset=False)
)
def testSubstitution(self):
# Black has cost 5
self.assertEqual("WKK0", edit_distance.byte_to_nominal_colour_string(
0b00000011, is_odd_offset=False))
self.assertEqual("KKK0", edit_distance.byte_to_nominal_colour_string(
0b00000000, is_odd_offset=False))
self.assertEqual(
5, edit_distance.edit_weight(
0b00000011, 0b00000000, is_odd_offset=False)
)
self.assertEqual(
5, edit_distance.edit_weight(
0b00000000, 0b00000011, is_odd_offset=False)
)
# Other colour has cost 1
self.assertEqual(
1, edit_distance.edit_weight(
0b00000010, 0b00000011, is_odd_offset=False)
)
self.assertEqual(
1, edit_distance.edit_weight(
0b00000011, 0b00000010, is_odd_offset=False)
)
if __name__ == '__main__':
unittest.main()

147
transcoder/frame_grabber.py Normal file
View File

@ -0,0 +1,147 @@
"""Extracts sequence of still images from input video stream."""
import os
import queue
import subprocess
import threading
from typing import Iterator
import numpy as np
import skvideo.io
from PIL import Image
import screen
from palette import Palette
from video_mode import VideoMode
class FrameGrabber:
def __init__(self, mode: VideoMode):
self.video_mode = mode
self.input_frame_rate = 30
def frames(self) -> Iterator[screen.MemoryMap]:
raise NotImplementedError
class FileFrameGrabber(FrameGrabber):
def __init__(self, filename, mode: VideoMode, palette: Palette):
super(FileFrameGrabber, self).__init__(mode)
self.filename = filename # type: str
self.palette = palette # type: Palette
self._reader = skvideo.io.FFmpegReader(filename)
# Compute frame rate from input video
# TODO: possible to compute time offset for each frame instead?
data = skvideo.io.ffprobe(self.filename)['video']
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
self.input_frame_rate = float(
rate_data[0]) / float(rate_data[1]) # type: float
def _frame_grabber(self) -> Iterator[Image.Image]:
for frame_array in self._reader.nextFrame():
yield Image.fromarray(frame_array)
@staticmethod
def _output_dir(filename, video_mode, palette) -> str:
return "%s/%s/%s" % (
".".join(filename.split(".")[:-1]),
video_mode.name,
palette.name)
def _palette_arg(self) -> str:
return "P%d" % self.palette.value
def frames(self) -> Iterator[screen.MemoryMap]:
"""Encode frame to (D)HGR using bmp2dhr.
We do the encoding in a background thread to parallelize.
"""
frame_dir = self._output_dir(
self.filename, self.video_mode, self.palette)
os.makedirs(frame_dir, exist_ok=True)
q = queue.Queue(maxsize=10)
def _hgr_decode(_idx, _frame):
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(outfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
self._palette_arg(),
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(outfile, dtype=np.uint8)
return _main, None
def _dhgr_decode(_idx, _frame):
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(mainfile)
os.stat(auxfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr", # "v",
self._palette_arg(),
"A", # Output separate .BIN and .AUX files
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(mainfile, dtype=np.uint8)
_aux = np.fromfile(auxfile, dtype=np.uint8)
return _main, _aux
def worker():
"""Invoke bmp2dhr to encode input image frames and push to queue."""
decode = (
_dhgr_decode if self.video_mode == VideoMode.DHGR else
_hgr_decode
)
for _idx, _frame in enumerate(self._frame_grabber()):
q.put(decode(_idx, _frame))
q.put((None, None))
t = threading.Thread(target=worker, daemon=True)
t.start()
while True:
main, aux = q.get()
if main is None:
break
main_map = screen.FlatMemoryMap(
screen_page=1, data=main).to_memory_map()
if aux is None:
aux_map = None
else:
aux_map = screen.FlatMemoryMap(
screen_page=1, data=aux).to_memory_map()
yield (main_map, aux_map)
q.task_done()
t.join()

View File

@ -0,0 +1,37 @@
import unittest
import frame_grabber
import palette
import video_mode
class TestFileFrameGrabber(unittest.TestCase):
def test_output_dir(self):
self.assertEqual(
"/foo/bar/DHGR/NTSC",
frame_grabber.FileFrameGrabber._output_dir(
"/foo/bar.mp4", video_mode.VideoMode.DHGR, palette.Palette.NTSC
)
)
self.assertEqual(
"/foo/bar.blee/HGR/IIGS",
frame_grabber.FileFrameGrabber._output_dir(
"/foo/bar.blee.mp4",
video_mode.VideoMode.HGR,
palette.Palette.IIGS
)
)
self.assertEqual(
"/foo/bar blee/DHGR/IIGS",
frame_grabber.FileFrameGrabber._output_dir(
"/foo/bar blee.mp4",
video_mode.VideoMode.DHGR,
palette.Palette.IIGS
)
)
if __name__ == '__main__':
unittest.main()

View File

@ -2,25 +2,12 @@
from typing import Iterator
import numpy as np
import screen
# TODO: screen memory changes should happen via Machine while emitting opcodes?
class Machine:
"""Represents Apple II and player virtual machine state."""
def __init__(
self,
memmap: screen.MemoryMap,
update_priority: np.array
):
self.page = 0x20 # type: int
self.content = 0x7f # type: int
self.memmap = memmap # type: screen.MemoryMap
self.update_priority = update_priority # type: np.array
def emit(self, opcode: "Opcode") -> Iterator[int]:
"""

View File

@ -1,12 +1,13 @@
"""Transcodes an input video file to ][Vision format."""
"""Transcodes an input video file to ][-Vision format."""
import argparse
import movie
import video
import palette
import video_mode
parser = argparse.ArgumentParser(
description='Transcode videos to ][Vision format.')
description='Transcode videos to ][-Vision format.')
parser.add_argument(
'input', help='Path to input video file.')
parser.add_argument(
@ -25,9 +26,15 @@ parser.add_argument(
'frame rate, which may give better quality for some videos.'
)
parser.add_argument(
'--video_mode', type=str, choices=video.Mode.__members__.keys(),
'--video_mode', type=str, choices=video_mode.VideoMode.__members__.keys(),
default=video_mode.VideoMode.DHGR.name,
help='Video display mode to encode for (HGR/DHGR)'
)
parser.add_argument(
'--palette', type=str, choices=palette.Palette.__members__.keys(),
default=palette.Palette.NTSC.name,
help='Video palette to encode for (default=NTSC)'
)
def main(args):
@ -37,10 +44,13 @@ def main(args):
every_n_video_frames=args.every_n_video_frames,
audio_normalization=args.audio_normalization,
max_bytes_out=1024. * 1024 * args.max_output_mb,
video_mode=video.Mode[args.video_mode]
video_mode=video_mode.VideoMode[args.video_mode],
palette=palette.Palette[args.palette],
)
print("Input frame rate = %f" % m.video.input_frame_rate)
print("Palette %s" % args.palette)
print("Input frame rate = %f" % m.frame_grabber.input_frame_rate)
if args.output:
out_filename = args.output

View File

@ -0,0 +1,204 @@
import bz2
import functools
import pickle
import sys
from typing import Iterable, Type
import colormath.color_conversions
import colormath.color_diff
import colormath.color_objects
import numpy as np
import weighted_levenshtein
from etaprogress.progress import ProgressBar
import colours
import palette
import screen
PIXEL_CHARS = "0123456789ABCDEF"
def pixel_char(i: int) -> str:
return PIXEL_CHARS[i]
@functools.lru_cache(None)
def pixel_string(pixels: Iterable[int]) -> str:
return "".join(pixel_char(p) for p in pixels)
class EditDistanceParams:
"""Data class for parameters to Damerau-Levenshtein edit distance."""
# Don't even consider insertions and deletions into the string, they don't
# make sense for comparing pixel strings
insert_costs = np.ones(128, dtype=np.float64) * 100000
delete_costs = np.ones(128, dtype=np.float64) * 100000
# Smallest substitution value is ~20 from palette.diff_matrices, i.e.
# we always prefer to transpose 2 pixels rather than substituting colours.
# TODO: is quality really better allowing transposes?
transpose_costs = np.ones((128, 128), dtype=np.float64) * 100000 # 10
# These will be filled in later
substitute_costs = np.zeros((128, 128), dtype=np.float64)
# Substitution costs to use when evaluating other potential offsets at which
# to store a content byte. We penalize more harshly for introducing
# errors that alter pixel colours, since these tend to be very
# noticeable as visual noise.
#
# TODO: currently unused
error_substitute_costs = np.zeros((128, 128), dtype=np.float64)
def compute_diff_matrix(pal: Type[palette.BasePalette]):
"""Compute matrix of perceptual distance between colour pairs.
Specifically CIE2000 delta values for this palette.
"""
dm = np.ndarray(shape=(16, 16), dtype=np.int)
for colour1, a in pal.RGB.items():
alab = colormath.color_conversions.convert_color(
a, colormath.color_objects.LabColor)
for colour2, b in pal.RGB.items():
blab = colormath.color_conversions.convert_color(
b, colormath.color_objects.LabColor)
dm[colour1.value, colour2.value] = int(
colormath.color_diff.delta_e_cie2000(alab, blab))
return dm
def compute_substitute_costs(pal: Type[palette.BasePalette]):
"""Compute costs for substituting one colour pixel for another."""
edp = EditDistanceParams()
diff_matrix = compute_diff_matrix(pal)
# Penalty for changing colour
for i, c in enumerate(PIXEL_CHARS):
for j, d in enumerate(PIXEL_CHARS):
cost = diff_matrix[i, j]
edp.substitute_costs[(ord(c), ord(d))] = cost
edp.substitute_costs[(ord(d), ord(c))] = cost
edp.error_substitute_costs[(ord(c), ord(d))] = 5 * cost
edp.error_substitute_costs[(ord(d), ord(c))] = 5 * cost
return edp
def edit_distance(
edp: EditDistanceParams,
a: str,
b: str,
error: bool) -> np.float64:
"""Damerau-Levenshtein edit distance between two pixel strings."""
res = weighted_levenshtein.dam_lev(
a, b,
insert_costs=edp.insert_costs,
delete_costs=edp.delete_costs,
substitute_costs=(
edp.error_substitute_costs if error else edp.substitute_costs),
)
# Make sure result can fit in a uint16
assert (0 <= res < 2 ** 16), res
return res
def compute_edit_distance(
edp: EditDistanceParams,
bitmap_cls: Type[screen.Bitmap],
nominal_colours: Type[colours.NominalColours]
):
"""Computes edit distance matrix between all pairs of pixel strings.
Enumerates all possible values of the masked bit representation from
bitmap_cls (assuming it is contiguous, i.e. we enumerate all
2**bitmap_cls.MASKED_BITS values). These are mapped to the dot
representation, turned into coloured pixel strings, and we compute the
edit distance.
The effect of this is that we precompute the effect of storing all possible
byte values against all possible screen backgrounds (e.g. as
influencing/influenced by neighbouring bytes).
"""
bits = bitmap_cls.MASKED_BITS
bitrange = np.uint64(2 ** bits)
edit = []
for _ in range(len(bitmap_cls.BYTE_MASKS)):
edit.append(
np.zeros(shape=np.uint64(bitrange * bitrange), dtype=np.uint16))
# Matrix is symmetrical with zero diagonal so only need to compute upper
# triangle
bar = ProgressBar((bitrange * (bitrange - 1)) / 2, max_width=80)
num_dots = bitmap_cls.MASKED_DOTS
cnt = 0
for i in range(np.uint64(bitrange)):
for j in range(i):
cnt += 1
if cnt % 10000 == 0:
bar.numerator = cnt
print(bar, end='\r')
sys.stdout.flush()
pair = (np.uint64(i) << bits) + np.uint64(j)
for o, ph in enumerate(bitmap_cls.PHASES):
first_dots = bitmap_cls.to_dots(i, byte_offset=o)
second_dots = bitmap_cls.to_dots(j, byte_offset=o)
first_pixels = pixel_string(
colours.dots_to_nominal_colour_pixel_values(
num_dots, first_dots, nominal_colours,
init_phase=ph)
)
second_pixels = pixel_string(
colours.dots_to_nominal_colour_pixel_values(
num_dots, second_dots, nominal_colours,
init_phase=ph)
)
edit[o][pair] = edit_distance(
edp, first_pixels, second_pixels, error=False)
return edit
def make_edit_distance(
pal: Type[palette.BasePalette],
edp: EditDistanceParams,
bitmap_cls: Type[screen.Bitmap],
nominal_colours: Type[colours.NominalColours]
):
"""Write file containing (D)HGR edit distance matrix for a palette."""
dist = compute_edit_distance(edp, bitmap_cls, nominal_colours)
data = "transcoder/data/%s_palette_%d_edit_distance.pickle.bz2" % (
bitmap_cls.NAME, pal.ID.value)
with bz2.open(data, "wb", compresslevel=9) as out:
pickle.dump(dist, out, protocol=pickle.HIGHEST_PROTOCOL)
def main():
for p in palette.PALETTES.values():
print("Processing palette %s" % p)
edp = compute_substitute_costs(p)
# TODO: still worth using error distance matrices?
make_edit_distance(p, edp, screen.HGRBitmap, colours.HGRColours)
make_edit_distance(p, edp, screen.DHGRBitmap, colours.DHGRColours)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,99 @@
import sys
import unittest
import numpy as np
from etaprogress.progress import ProgressBar
import make_data_tables
import screen
from colours import HGRColours
from palette import PALETTES
class TestMakeDataTables(unittest.TestCase):
def test_pixel_string(self):
pixels = (HGRColours.BLACK, HGRColours.WHITE, HGRColours.ORANGE)
self.assertEqual("0FC", make_data_tables.pixel_string(pixels))
def test_edit_distances_dhgr(self):
"""Assert invariants and symmetries of the edit distance matrices."""
for p in PALETTES:
ed = screen.DHGRBitmap.edit_distances(p)
print(p)
bar = ProgressBar((4 * 2 ** 13 * (2 ** 13 - 1)) / 2, max_width=80)
cnt = 0
for ph in range(3):
# Only zero entries should be on diagonal, i.e. of form
# i << 13 + i
zeros = np.arange(len(ed[ph]))[ed[ph] == 0]
for z in zeros:
z1 = z & (2 ** 13 - 1)
z2 = (z >> 13) & (2 ** 13 - 1)
self.assertEqual(z1, z2)
# Assert that matrix is symmetrical
for i in range(2 ** 13):
for j in range(i):
cnt += 1
if cnt % 10000 == 0:
bar.numerator = cnt
print(bar, end='\r')
sys.stdout.flush()
self.assertEqual(
ed[ph][(i << 13) + j],
ed[ph][(j << 13) + i],
)
# Matrix is positive definite
self.assertGreaterEqual(ed[ph][(i << 13) + j], 0)
def test_edit_distances_hgr(self):
"""Assert invariants and symmetries of the edit distance matrices."""
for p in PALETTES:
ed = screen.HGRBitmap.edit_distances(p)
print(p)
bar = ProgressBar((4 * 2 ** 14 * (2 ** 14 - 1)) / 2, max_width=80)
cnt = 0
for ph in range(2):
# TODO: for HGR this invariant isn't true, all-0 and all-1
# values for header/footer/body with/without palette bit can
# also have zero difference
# # Only zero entries should be on diagonal, i.e. of form
# # i << 14 + i
# zeros = np.arange(len(ed[ph]))[ed[ph] == 0]
# for z in zeros:
# z1 = z & (2**14-1)
# z2 = (z >> 14) & (2**14-1)
# if z1 != z2:
# self.assertEqual(z1, z2)
# Assert that matrix is symmetrical
for i in range(2 ** 14):
for j in range(i):
cnt += 1
if cnt % 10000 == 0:
bar.numerator = cnt
print(bar, end='\r')
sys.stdout.flush()
self.assertEqual(
ed[ph][(i << 14) + j],
ed[ph][(j << 14) + i],
)
# Matrix is positive definite
self.assertGreaterEqual(ed[ph][(i << 14) + j], 0)
if __name__ == '__main__':
unittest.main()

View File

@ -3,9 +3,12 @@
from typing import Iterable, Iterator
import audio
import frame_grabber
import machine
import opcodes
import video
from palette import Palette
from video_mode import VideoMode
class Movie:
@ -14,29 +17,37 @@ class Movie:
every_n_video_frames: int = 1,
audio_normalization: float = None,
max_bytes_out: int = None,
video_mode: video.Mode = video.Mode.HGR,
video_mode: VideoMode = VideoMode.HGR,
palette: Palette = Palette.NTSC,
):
self.filename = filename # type: str
self.every_n_video_frames = every_n_video_frames # type: int
self.max_bytes_out = max_bytes_out # type: int
self.video_mode = video_mode # type: video.Mode
self.video_mode = video_mode # type: VideoMode
self.palette = palette # type: Palette
self.audio = audio.Audio(
filename, normalization=audio_normalization) # type: audio.Audio
self.frame_grabber = frame_grabber.FileFrameGrabber(
filename, mode=video_mode, palette=self.palette)
self.video = video.Video(
filename, mode=video_mode,
ticks_per_second=self.audio.sample_rate
self.frame_grabber,
ticks_per_second=self.audio.sample_rate,
mode=video_mode,
palette=self.palette
) # type: video.Video
# Byte offset within TCP stream
self.stream_pos = 0 # type: int
# Current audio tick opcode count within movie stream.
self.ticks = 0 # type: int
self.state = machine.Machine(
self.video.memory_map,
self.video.update_priority
)
# Tracks internal state of player virtual machine
self.state = machine.Machine()
# Currently operating on AUX memory bank?
self.aux_memory_bank = False
def encode(self) -> Iterator[opcodes.Opcode]:
@ -44,7 +55,7 @@ class Movie:
:return:
"""
video_frames = self.video.frames()
video_frames = self.frame_grabber.frames()
main_seq = None
aux_seq = None
@ -61,13 +72,10 @@ class Movie:
if ((self.video.frame_number - 1) % self.every_n_video_frames
== 0):
print("Starting frame %d" % self.video.frame_number)
main_seq = self.video.encode_frame(
main, self.video.memory_map, self.video.update_priority)
main_seq = self.video.encode_frame(main, is_aux=False)
if aux:
aux_seq = self.video.encode_frame(
aux, self.video.aux_memory_map,
self.video.aux_update_priority)
aux_seq = self.video.encode_frame(aux, is_aux=True)
# au has range -15 .. 16 (step=1)
# Tick cycles are units of 2
@ -75,22 +83,24 @@ class Movie:
tick += 34 # 4 .. 66 (step=2)
(page, content, offsets) = next(
aux_seq if self.aux_memory_bank else main_seq)
aux_seq if self.aux_memory_bank else main_seq)
yield opcodes.TICK_OPCODES[(tick, page)](content, offsets)
def _emit_bytes(self, _op):
"""
def _emit_bytes(self, _op: opcodes.Opcode) -> Iterable[int]:
"""Emit compiled bytes corresponding to a player opcode.
:param _op:
:return:
Also tracks byte stream position.
"""
for b in self.state.emit(_op):
yield b
self.stream_pos += 1
def emit_stream(self, ops: Iterable[opcodes.Opcode]) -> Iterator[int]:
"""
"""Emit compiled byte stream corresponding to opcode stream.
Inserts padding opcodes at 2KB stream boundaries, to instruct player
to manage the TCP socket buffer.
:param ops:
:return:
@ -107,7 +117,7 @@ class Movie:
if socket_pos >= 2044:
# 2 op_ack address bytes + 2 payload bytes from ACK must
# terminate 2K stream frame
if self.video_mode == video.Mode.DHGR:
if self.video_mode == VideoMode.DHGR:
# Flip-flop between MAIN and AUX banks
self.aux_memory_bank = not self.aux_memory_bank
@ -117,7 +127,7 @@ class Movie:
yield from self.done()
def done(self) -> Iterator[int]:
"""Terminate opcode stream.
"""Terminate byte stream by emitting terminal opcode and padding to 2KB.
:return:
"""

View File

@ -4,6 +4,7 @@ import enum
from typing import Iterator, Tuple
import symbol_table
import video_mode
from machine import Machine
@ -64,7 +65,7 @@ class Header(Opcode):
"""Video header opcode."""
COMMAND = OpcodeCommand.HEADER
def __init__(self, mode: "video.Mode"):
def __init__(self, mode: video_mode.VideoMode):
self.video_mode = mode
def __data_eq__(self, other):

84
transcoder/palette.py Normal file
View File

@ -0,0 +1,84 @@
"""RGB palette values for rendering NominalColour pixels."""
import enum
from typing import Dict, Type
import colormath.color_objects
from colours import HGRColours
# Type annotation
RGB = colormath.color_objects.sRGBColor
def rgb(r, g, b):
return RGB(r, g, b, is_upscaled=True)
class Palette(enum.Enum):
"""BMP2DHR palette numbers."""
UNKNOWN = -1
IIGS = 0
NTSC = 5
class BasePalette:
ID = Palette.UNKNOWN # type: Palette
# Palette RGB map
RGB = {} # type: Dict[HGRColours: RGB]
class NTSCPalette(BasePalette):
ID = Palette.NTSC
# Palette RGB values taken from BMP2DHGR's default NTSC palette
RGB = {
HGRColours.BLACK: rgb(0, 0, 0),
HGRColours.MAGENTA: rgb(148, 12, 125),
HGRColours.BROWN: rgb(99, 77, 0),
HGRColours.ORANGE: rgb(249, 86, 29),
HGRColours.DARK_GREEN: rgb(51, 111, 0),
HGRColours.GREY1: rgb(126, 126, 126),
HGRColours.GREEN: rgb(67, 200, 0),
HGRColours.YELLOW: rgb(221, 206, 23),
HGRColours.DARK_BLUE: rgb(32, 54, 212),
HGRColours.VIOLET: rgb(188, 55, 255),
HGRColours.GREY2: rgb(126, 126, 126),
HGRColours.PINK: rgb(255, 129, 236),
HGRColours.MED_BLUE: rgb(7, 168, 225),
HGRColours.LIGHT_BLUE: rgb(158, 172, 255),
HGRColours.AQUA: rgb(93, 248, 133),
HGRColours.WHITE: rgb(255, 255, 255)
}
class IIGSPalette(BasePalette):
ID = Palette.IIGS
# Palette RGB values taken from BMP2DHGR's KEGS32 palette
RGB = {
HGRColours.BLACK: rgb(0, 0, 0),
HGRColours.MAGENTA: rgb(221, 0, 51),
HGRColours.BROWN: rgb(136, 85, 34),
HGRColours.ORANGE: rgb(255, 102, 0),
HGRColours.DARK_GREEN: rgb(0, 119, 0),
HGRColours.GREY1: rgb(85, 85, 85),
HGRColours.GREEN: rgb(0, 221, 0),
HGRColours.YELLOW: rgb(255, 255, 0),
HGRColours.DARK_BLUE: rgb(0, 0, 153),
HGRColours.VIOLET: rgb(221, 0, 221),
HGRColours.GREY2: rgb(170, 170, 170),
HGRColours.PINK: rgb(255, 153, 136),
HGRColours.MED_BLUE: rgb(34, 34, 255),
HGRColours.LIGHT_BLUE: rgb(102, 170, 255),
HGRColours.AQUA: rgb(0, 255, 153),
HGRColours.WHITE: rgb(255, 255, 255)
}
PALETTES = {
Palette.IIGS: IIGSPalette,
Palette.NTSC: NTSCPalette
} # type: Dict[Palette, Type[BasePalette]]

View File

@ -1,20 +1,20 @@
"""Various representations of Apple II video display."""
import bz2
import functools
import pickle
from typing import Union, List, Optional, Tuple
import numpy as np
import palette as pal
# TODO: support DHGR
def bitmap_similarity(a1: np.array, a2: np.array) -> float:
"""Measure bitwise % similarity between two bitmap arrays"""
bits_different = np.sum(np.logical_xor(a1, a2)).item()
return 1. - (bits_different / (np.shape(a1)[0] * np.shape(a1)[1]))
# Type annotation for cases where we may process either an int or a numpy array.
IntOrArray = Union[np.uint64, np.ndarray]
def y_to_base_addr(y: int, page: int = 0) -> int:
"""Maps y coordinate to base address on given screen page"""
"""Maps y coordinate to base address on given screen page."""
a = y // 64
d = y - 64 * a
b = d // 8
@ -30,6 +30,7 @@ Y_TO_BASE_ADDR = [
]
# Array mapping (page, offset) to x (byte) and y coords respectively
# TODO: is np.dtype(int) faster for these?
PAGE_OFFSET_TO_X = np.zeros((32, 256), dtype=np.uint8)
PAGE_OFFSET_TO_Y = np.zeros((32, 256), dtype=np.uint8)
@ -68,111 +69,6 @@ def _populate_mappings():
_populate_mappings()
class Bytemap:
"""Bitmap array with horizontal pixels packed into bytes."""
def __init__(self, bytemap: np.array = None):
self.bytemap = None # type: np.array
if bytemap is not None:
if bytemap.shape != (192, 40):
raise ValueError("Unexpected shape: %r" % (bytemap.shape,))
self.bytemap = bytemap
else:
self.bytemap = np.zeros((192, 40), dtype=np.uint8)
def to_memory_map(self, screen_page: int) -> "MemoryMap":
# Numpy magic that constructs a new array indexed by (page, offset)
# instead of (y, x).
mmap = self.bytemap[PAGE_OFFSET_TO_Y, PAGE_OFFSET_TO_X]
# Reset whatever values ended up in the screen holes after this mapping
# (which came from default 0 values in PAGE_OFFSET_TO_X)
mmap[SCREEN_HOLES] = 0
return MemoryMap(screen_page, mmap)
class Bitmap:
XMAX = None # type: int
YMAX = None # type: int
def __init__(self, bitmap: np.array = None):
if bitmap is None:
self.bitmap = np.zeros((self.YMAX, self.XMAX), dtype=bool)
else:
if bitmap.shape != (self.YMAX, self.XMAX):
raise ValueError("Unexpected shape: %r" % (bitmap.shape,))
self.bitmap = bitmap
def randomize(self) -> None:
self.bitmap = np.random.randint(
2, size=(self.YMAX, self.XMAX), dtype=bool)
@staticmethod
def _to_bytemap(bitmap) -> Bytemap:
# Insert zero column after every 7
pixels = bitmap.copy()
for i in range(pixels.shape[1] // 7 - 1, -1, -1):
pixels = np.insert(pixels, (i + 1) * 7, False, axis=1)
# packbits is big-endian so we flip the array before and after to
# invert this
return Bytemap(
np.flip(np.packbits(np.flip(pixels, axis=1), axis=1), axis=1))
def to_bytemap(self) -> Bytemap:
return self._to_bytemap(self.bitmap)
def to_memory_map(self, screen_page: int) -> "MemoryMap":
return self.to_bytemap().to_memory_map(screen_page)
@staticmethod
def _from_bytemap(bytemap: Bytemap) -> np.array:
bm = np.unpackbits(bytemap.bytemap, axis=1)
bm = np.delete(bm, np.arange(0, bm.shape[1], 8), axis=1)
# Need to flip each 7-bit sequence
reorder_cols = []
for i in range(bm.shape[1] // 7):
for j in range((i + 1) * 7 - 1, i * 7 - 1, -1):
reorder_cols.append(j)
bm = bm[:, reorder_cols]
return np.array(bm, dtype=np.bool)
@classmethod
def from_bytemap(cls, bytemap: Bytemap) -> "Bitmap":
return cls(cls._from_bytemap(bytemap))
class HGR140Bitmap(Bitmap):
XMAX = 140 # double-wide pixels to not worry about colour effects
YMAX = 192
def to_bytemap(self) -> Bytemap:
# Double each pixel horizontally
return self._to_bytemap(np.repeat(self.bitmap, 2, axis=1))
@classmethod
def from_bytemap(cls, bytemap: Bytemap) -> "HGR140Bitmap":
# Undouble pixels
bitmap = cls._from_bytemap(bytemap)
bitmap = np.array(
np.delete(bitmap, np.arange(0, bitmap.shape[1], 2), axis=1),
dtype=np.bool
)
return HGR140Bitmap(bitmap)
class HGRBitmap(Bitmap):
XMAX = 280
YMAX = 192
class DHGRBitmap(Bitmap):
XMAX = 560
YMAX = 192
class FlatMemoryMap:
"""Linear 8K representation of HGR screen memory."""
@ -223,11 +119,837 @@ class MemoryMap:
def to_flat_memory_map(self) -> FlatMemoryMap:
return FlatMemoryMap(self.screen_page, self.page_offset.reshape(8192))
def to_bytemap(self) -> Bytemap:
bytemap = self.page_offset[X_Y_TO_PAGE, X_Y_TO_OFFSET]
return Bytemap(bytemap)
def write(self, page: int, offset: int, val: int) -> None:
"""Updates screen image to set (page, offset)=val (inc. screen holes)"""
self.page_offset[page - self._page_start][offset] = val
class Bitmap:
"""Packed bitmap representation of (D)HGR screen memory.
Maintains a page-based array whose entries contain a packed representation
of multiple screen bytes, in a representation that supports efficiently
determining the visual effect of storing bytes at arbitrary screen offsets.
"""
# NOTE: See https://github.com/numpy/numpy/issues/2524 and related issues
# for why we have to cast things explicitly to np.uint64 - type promotion
# to uint64 is broken in numpy :(
# Name of bitmap type
NAME = None # type: str
# Size of packed representation, consisting of header + body + footer
HEADER_BITS = None # type: np.uint64
BODY_BITS = None # type: np.uint64
FOOTER_BITS = None # type: np.uint64
# How many bits of packed representation are necessary to determine the
# effect of storing a memory byte, e.g. because they influence pixel
# colour or are influenced by other bits.
MASKED_BITS = None # type: np.uint64
# How many coloured screen pixels we can extract from MASKED_BITS. Note
# that this does not include the last 3 dots represented by the footer,
# since we don't have enough information to determine their colour (we
# would fall off the end of the 4-bit sliding window)
MASKED_DOTS = None # type: np.uint64
# List of bitmasks for extracting the subset of packed data corresponding
# to bits influencing/influenced by a given byte offset. These must be
# a contiguous bit mask, i.e. so that after shifting they are enumerated
# by 0..2**MASKED_BITS-1
BYTE_MASKS = None # type: List[np.uint64]
BYTE_SHIFTS = None # type: List[np.uint64]
# NTSC clock phase at first masked bit
PHASES = None # type: List[int]
def __init__(
self,
palette: pal.Palette,
main_memory: MemoryMap,
aux_memory: Optional[MemoryMap]
):
self.palette = palette # type: pal.Palette
self.main_memory = main_memory # type: MemoryMap
self.aux_memory = aux_memory # type: Optional[MemoryMap]
self.PACKED_BITS = (
self.HEADER_BITS + self.BODY_BITS + self.FOOTER_BITS
) # type: np.uint64
# How many screen bytes we pack into a single scalar
self.SCREEN_BYTES = np.uint64(len(self.BYTE_MASKS)) # type: np.uint64
self.packed = np.empty(
shape=(32, 128), dtype=np.uint64) # type: np.ndarray
self._pack()
# TODO: don't leak headers/footers across screen rows. We should be using
# x-y representation rather than page-offset
@staticmethod
def _make_header(col: IntOrArray) -> IntOrArray:
"""Extract values to use as header of next column."""
raise NotImplementedError
def _body(self) -> np.ndarray:
"""Pack related screen bytes into an efficient representation."""
raise NotImplementedError
@staticmethod
def _make_footer(col: IntOrArray) -> IntOrArray:
"""Extract values to use as footer of previous column."""
raise NotImplementedError
def _pack(self) -> None:
"""Pack MemoryMap into efficient representation for diffing."""
body = self._body()
# Prepend last 3 bits of previous odd byte so we can correctly
# decode the effective colours at the beginning of the 22-bit tuple
prev_col = np.roll(body, 1, axis=1).astype(np.uint64)
header = self._make_header(prev_col)
# Don't leak header across page boundaries
header[:, 0] = 0
# Append first 3 bits of next even byte so we can correctly
# decode the effective colours at the end of the 22-bit tuple
next_col = np.roll(body, -1, axis=1).astype(np.uint64)
footer = self._make_footer(next_col)
# Don't leak footer across page boundaries
footer[:, -1] = 0
self.packed = header ^ body ^ footer
@staticmethod
def masked_update(
byte_offset: int,
old_value: IntOrArray,
new_value: np.uint8) -> IntOrArray:
"""Update int/array to store new value at byte_offset in every entry.
Does not patch up headers/footers of neighbouring columns.
"""
raise NotImplementedError
@staticmethod
@functools.lru_cache(None)
def byte_offset(page_offset: int, is_aux: bool) -> int:
"""Map screen offset for aux/main into offset within packed data."""
raise NotImplementedError
@staticmethod
@functools.lru_cache(None)
def _byte_offsets(is_aux: bool) -> Tuple[int, int]:
"""Return byte offsets within packed data for AUX/MAIN memory."""
raise NotImplementedError
@classmethod
def to_dots(cls, masked_val: int, byte_offset: int) -> int:
"""Convert masked representation to bit sequence of display dots."""
raise NotImplementedError
def apply(
self,
page: int,
offset: int,
is_aux: bool,
value: np.uint8) -> None:
"""Update packed representation of changing main/aux memory."""
byte_offset = self.byte_offset(offset, is_aux)
packed_offset = offset // 2
self.packed[page, packed_offset] = self.masked_update(
byte_offset, self.packed[page, packed_offset], value)
self._fix_scalar_neighbours(page, packed_offset, byte_offset)
def _fix_scalar_neighbours(
self,
page: int,
offset: int,
byte_offset: int) -> None:
"""Fix up column headers/footers when updating a (page, offset)."""
if byte_offset == 0 and offset > 0:
self.packed[page, offset - 1] = self._fix_column_left(
self.packed[page, offset - 1],
self.packed[page, offset]
)
elif byte_offset == (self.SCREEN_BYTES - 1) and offset < 127:
# Need to also update the 3-bit header of the next column
self.packed[page, offset + 1] = self._fix_column_right(
self.packed[page, offset + 1],
self.packed[page, offset]
)
def _fix_column_left(
self,
column_left: IntOrArray,
column: IntOrArray
) -> IntOrArray:
"""Patch up the footer of the column to the left."""
# Mask out footer(s)
column_left &= np.uint64(2 ** (self.HEADER_BITS + self.BODY_BITS) - 1)
column_left ^= self._make_footer(column)
return column_left
def _fix_column_right(
self,
column_right: IntOrArray,
column: IntOrArray
) -> IntOrArray:
"""Patch up the header of the column to the right."""
# Mask out header(s)
column_right &= np.uint64(
(2 ** (self.BODY_BITS + self.FOOTER_BITS) - 1)) << self.HEADER_BITS
column_right ^= self._make_header(column)
return column_right
def _fix_array_neighbours(
self,
ary: np.ndarray,
byte_offset: int
) -> None:
"""Fix up column headers/footers for all array entries."""
# TODO: don't leak header/footer across page boundaries
# Propagate new value into neighbouring byte headers/footers if
# necessary
if byte_offset == 0:
# Need to also update the footer of the preceding column
shifted_left = np.roll(ary, -1, axis=1)
self._fix_column_left(ary, shifted_left)
elif byte_offset == (self.SCREEN_BYTES - 1):
# Need to also update the header of the next column
shifted_right = np.roll(ary, 1, axis=1)
self._fix_column_right(ary, shifted_right)
@classmethod
@functools.lru_cache(None)
def edit_distances(cls, palette_id: pal.Palette) -> List[np.ndarray]:
"""Load edit distance matrices for masked, shifted byte values."""
data = "transcoder/data/%s_palette_%d_edit_distance.pickle.bz2" % (
cls.NAME,
palette_id.value
)
with bz2.open(data, "rb") as ed:
dist = pickle.load(ed) # type: List[np.ndarray]
# dist is an upper-triangular matrix of edit_distance(a, b)
# encoded as dist[(a << N) + b] = edit_distance(a, b)
# Because the distance metric is reflexive,
# edit_distance(b, a) = edit_distance(a, b)
identity = np.arange(2 ** (2 * cls.MASKED_BITS), dtype=np.uint64)
# Swap values of form a << N + b to b << N + a
transpose = (identity >> cls.MASKED_BITS) + (
(identity & np.uint64(2 ** cls.MASKED_BITS - 1)) <<
cls.MASKED_BITS)
for i in range(len(dist)):
dist[i][transpose] += dist[i][identity]
return dist
@classmethod
def mask_and_shift_data(
cls,
data: IntOrArray,
byte_offset: int) -> IntOrArray:
"""Masks and shifts packed data into the MASKED_BITS range."""
res = (data & cls.BYTE_MASKS[byte_offset]) >> (
cls.BYTE_SHIFTS[byte_offset])
assert np.all(res <= 2 ** cls.MASKED_BITS)
return res
# Can't cache all possible values but this seems to give a good enough hit
# rate without costing too much memory
# TODO: unit tests
@functools.lru_cache(10 ** 6)
def byte_pair_difference(
self,
byte_offset: int,
old_packed: np.uint64,
content: np.uint8
) -> np.uint16:
"""Compute effect of storing a new content byte within packed data."""
old_pixels = self.mask_and_shift_data(old_packed, byte_offset)
new_pixels = self.mask_and_shift_data(
self.masked_update(byte_offset, old_packed, content), byte_offset)
pair = (old_pixels << self.MASKED_BITS) + new_pixels
return self.edit_distances(self.palette)[byte_offset][pair]
def diff_weights(
self,
source: "Bitmap",
is_aux: bool
) -> np.ndarray:
"""Compute edit distance matrix from source bitmap."""
return self._diff_weights(source.packed, is_aux)
# TODO: unit test
def _diff_weights(
self,
source_packed: np.ndarray,
is_aux: bool,
content: np.uint8 = None
) -> np.ndarray:
"""Computes edit distance matrix from source_packed to self.packed
If content is set, the distance will be computed as if this value
was stored into each offset position of source_packed, i.e. to
allow evaluating which offsets (if any) should be chosen for storing
this content byte.
"""
diff = np.ndarray((32, 256), dtype=np.int)
offsets = self._byte_offsets(is_aux)
dists = []
for o in offsets:
if content is not None:
compare_packed = self.masked_update(o, source_packed, content)
self._fix_array_neighbours(compare_packed, o)
else:
compare_packed = source_packed
# Pixels influenced by byte offset o
source_pixels = self.mask_and_shift_data(compare_packed, o)
target_pixels = self.mask_and_shift_data(self.packed, o)
# Concatenate N-bit source and target into 2N-bit values
pair = (source_pixels << self.MASKED_BITS) + target_pixels
dist = self.edit_distances(self.palette)[o][pair].reshape(
pair.shape)
dists.append(dist)
# Interleave even/odd columns
diff[:, 0::2] = dists[0]
diff[:, 1::2] = dists[1]
return diff
def _check_consistency(self):
"""Sanity check that headers and footers are consistent."""
headers = np.roll(self._make_header(self.packed), 1, axis=1).astype(
np.uint64)
footers = np.roll(self._make_footer(self.packed), -1, axis=1).astype(
np.uint64)
mask_hf = np.uint64(0b1110000000000000000000000000000111)
res = (self.packed ^ headers ^ footers) & mask_hf
nz = np.transpose(np.nonzero(res))
ok = True
if nz.size != 0:
for p, o in nz.tolist():
if o == 0 or o == 127:
continue
ok = False
print(p, o, bin(self.packed[p, o - 1]),
bin(headers[p, o]),
bin(self.packed[p, o]),
bin(self.packed[p, o + 1]), bin(footers[p, o]),
bin(res[p, o])
)
assert ok
# TODO: unit tests
def compute_delta(
self,
content: int,
diff_weights: np.ndarray,
is_aux: bool
) -> np.ndarray:
"""Compute which content stores introduce the least additional error.
We compute the effect of storing content at all possible offsets
within self.packed, and then subtract the previous diff weights.
Negative values indicate that the new content value is closer to the
target than the current content.
"""
# TODO: use error edit distance?
new_diff = self._diff_weights(self.packed, is_aux, content)
# TODO: try different weightings
return (new_diff * 5) - diff_weights
class HGRBitmap(Bitmap):
"""Packed bitmap representation of HGR screen memory.
The HGR display is encoded in a somewhat complicated way, so we have to
do a bit of work to turn it into a useful format.
Each screen byte consists of a palette bit (7) and 6 data bits (0..6)
Each non-palette bit turns on two consecutive display dots, with bit 6
repeated a third time. This third dot may or may not be overwritten by the
effect of the next byte.
Turning on the palette bit shifts that byte's dots right by one
position.
Given two neighbouring screen bytes Aaaaaaaa, Bbbbbbbb (at even and odd
offsets), where capital letter indicates the position of the palette bit,
we use the following 22-bit packed representation:
2211111111110000000000 <-- bit position in uint22
1098765432109876543210
ffFbbbbbbbBAaaaaaaaHhh
h and f are headers/footers derived from the neighbouring screen bytes.
Since our colour artifact model (see colours.py) uses a sliding 4-bit window
onto the dot string, we need to also include a 3-bit header and footer
to account for the influence from/on neighbouring bytes, i.e. adjacent
packed values. These are just the low/high 2 data bits of the 16-bit
body of those neighbouring columns, plus the corresponding palette bit.
This 22-bit packed representation is sufficient to compute the effects
(on pixel colours) of storing a byte at even or odd offsets. From it we
can extract the bit stream of displayed HGR dots, and the mapping to pixel
colours follows the HGRColours bitmap, see colours.py.
We put the two A/B palette bits next to each other so that we can
mask a contiguous range of bits whose colours influence/are influenced by
storing a byte at a given offset.
We need to mask out bit subsequences of size 3+8+3=14, i.e. the 8-bits
corresponding to the byte being stored, plus the neighbouring 3 bits that
influence it/are influenced by it.
Note that the masked representation has the same size for both offsets (
14 bits), but different meaning, since the palette bit is in a different
position.
With this masked representation, we can precompute an edit distance for the
pixel changes resulting from all possible HGR byte stores, see
make_edit_distance.py.
The edit distance matrix is encoded by concatenating the 14-bit source
and target masked values into a 28-bit pair, which indexes into the
edit_distance array to give the corresponding edit distance.
"""
NAME = 'HGR'
# Size of packed representation, consisting of header + body + footer
HEADER_BITS = np.uint64(3)
# 2x 8-bit screen bytes
BODY_BITS = np.uint64(16)
FOOTER_BITS = np.uint64(3)
# How many bits of packed representation are necessary to determine the
# effect of storing a memory byte, e.g. because they influence pixel
# colour or are influenced by other bits.
MASKED_BITS = np.uint64(14) # 3 + 8 + 3
# How many coloured screen pixels we can extract from MASKED_BITS. Note
# that this does not include the last 3 dots represented by the footer,
# since we don't have enough information to determine their colour (we
# would fall off the end of the 4-bit sliding window)
#
# From header: 3 bits (2 HGR pixels but might be shifted right by palette)
# From body: 7 bits doubled, plus possible shift from palette bit
MASKED_DOTS = np.uint64(18) # 3 + 7 + 7
# List of bitmasks for extracting the subset of packed data corresponding
# to bits influencing/influenced by a given byte offset. These must be
# a contiguous bit mask, i.e. so that after shifting they are enumerated
# by 0..2**MASKED_BITS-1
BYTE_MASKS = [
np.uint64(0b0000000011111111111111),
np.uint64(0b1111111111111100000000)
]
BYTE_SHIFTS = [np.uint64(0), np.uint64(8)]
# NTSC clock phase at first masked bit
#
# Each HGR byte offset has the same range of uint14 possible
# values and nominal colour pixels, but with different initial
# phases:
# even: 0 (1 at start of 3-bit header)
# odd: 2 (3)
PHASES = [1, 3]
def __init__(self, palette: pal.Palette, main_memory: MemoryMap):
super(HGRBitmap, self).__init__(palette, main_memory, None)
@staticmethod
def _make_header(col: IntOrArray) -> IntOrArray:
"""Extract values to use as header of next column.
Header format is bits 5,6,0 of previous screen byte
i.e. offsets 17, 18, 11 in packed representation
"""
return (
(col & np.uint64(0b1 << 11)) >> np.uint64(9) ^ (
(col & np.uint64(0b11 << 17)) >> np.uint64(17))
)
def _body(self) -> np.ndarray:
"""Pack related screen bytes into an efficient representation.
Body is of the form:
bbbbbbbBAaaaaaaa
where capital indicates the palette bit.
"""
even = self.main_memory.page_offset[:, 0::2].astype(np.uint64)
odd = self.main_memory.page_offset[:, 1::2].astype(np.uint64)
return (
(even << 3) + ((odd & 0x7f) << 12) + ((odd & 0x80) << 4)
)
@staticmethod
def _make_footer(col: IntOrArray) -> IntOrArray:
"""Extract values to use as footer of previous column.
Footer format is bits 7,0,1 of next screen byte
i.e. offsets 10,3,4 in packed representation
"""
return (
(col & np.uint64(0b1 << 10)) >> np.uint64(10) ^ (
(col & np.uint64(0b11 << 3)) >> np.uint64(2))
) << np.uint64(19)
@staticmethod
@functools.lru_cache(None)
def byte_offset(page_offset: int, is_aux: bool) -> int:
"""Returns 0..1 offset in packed representation for page_offset."""
assert not is_aux
is_odd = page_offset % 2 == 1
return 1 if is_odd else 0
@staticmethod
@functools.lru_cache(None)
def _byte_offsets(is_aux: bool) -> Tuple[int, int]:
"""Return byte offsets within packed data for AUX/MAIN memory."""
assert not is_aux
return 0, 1
@staticmethod
@functools.lru_cache(None)
def _double_pixels(int7: int) -> int:
"""Each bit 0..6 controls two hires dots.
Input bit 6 is repeated 3 times in case the neighbouring byte is
delayed (right-shifted by one dot) due to the palette bit being set,
which means the effect of this byte is "extended" by an extra dot.
Care needs to be taken to mask this out when overwriting.
"""
double = (
# Bit pos 6
((int7 & 0x40) << 8) + ((int7 & 0x40) << 7) + (
(int7 & 0x40) << 6) +
# Bit pos 5
((int7 & 0x20) << 6) + ((int7 & 0x20) << 5) +
# Bit pos 4
((int7 & 0x10) << 5) + ((int7 & 0x10) << 4) +
# Bit pos 3
((int7 & 0x08) << 4) + ((int7 & 0x08) << 3) +
# Bit pos 2
((int7 & 0x04) << 3) + ((int7 & 0x04) << 2) +
# Bit pos 1
((int7 & 0x02) << 2) + ((int7 & 0x02) << 1) +
# Bit pos 0
((int7 & 0x01) << 1) + (int7 & 0x01)
)
return double
@classmethod
def to_dots(cls, masked_val: int, byte_offset: int) -> int:
"""Convert masked representation to bit sequence of display dots.
Packed representation is of the form:
ffFbbbbbbbBAaaaaaaaHhh
where capital indicates the palette bit.
Each non-palette bit turns on two display dots, with bit 6 repeated
a third time. This may or may not be overwritten by the next byte.
Turning on the palette bit shifts that byte's dots right by one
position.
"""
# Assert 14-bit representation
assert (masked_val & (2 ** 14 - 1)) == masked_val
# Take top 3 bits from header (plus duplicated MSB) not 4, because if it
# is palette-shifted then we don't know what is in bit 0
h = (masked_val & 0b111) << 5
hp = (h & 0x80) >> 7
res = cls._double_pixels(h & 0x7f) >> (11 - hp)
if byte_offset == 0:
# Offset 0: bbBAaaaaaaaHhh
b = (masked_val >> 3) & 0xff
bp = (b & 0x80) >> 7
else:
# Offset 1: ffFbbbbbbbBAaa
bp = (masked_val >> 3) & 0x01
b = ((masked_val >> 4) & 0x7f) ^ (bp << 7)
# Mask out current contents in case we are overwriting the extended
# high bit from previous screen byte
res &= ~((2 ** 14 - 1) << (3 + bp))
res ^= cls._double_pixels(b & 0x7f) << (3 + bp)
f = ((masked_val >> 12) & 0b11) ^ (
(masked_val >> 11) & 0b01) << 7
fp = (f & 0x80) >> 7
# Mask out current contents in case we are overwriting the extended
# high bit from previous screen byte
res &= ~((2 ** 4 - 1) << (17 + fp))
res ^= cls._double_pixels(f & 0x7f) << (17 + fp)
return res & (2 ** 21 - 1)
@staticmethod
def masked_update(
byte_offset: int,
old_value: IntOrArray,
new_value: np.uint8) -> IntOrArray:
"""Update int/array to store new value at byte_offset in every entry.
Does not patch up headers/footers of neighbouring columns.
"""
if byte_offset == 0:
# Mask out 8-bit value where update will go
masked_value = old_value & (~np.uint64(0xff << 3))
update = np.uint64(new_value) << np.uint64(3)
return masked_value ^ update
else:
# Mask out 8-bit value where update will go
masked_value = old_value & (~np.uint64(0xff << 11))
# shift palette bit into position 0
shifted_new_value = (
(new_value & 0x7f) << 1) ^ (
(new_value & 0x80) >> 7)
update = np.uint64(shifted_new_value) << np.uint64(11)
return masked_value ^ update
class DHGRBitmap(Bitmap):
"""Packed bitmap representation of DHGR screen memory.
The DHGR display encodes 7 pixels across interleaved 4-byte sequences
of AUX and MAIN memory, as follows:
PBBBAAAA PDDCCCCB PFEEEEDD PGGGGFFF
Aux N Main N Aux N+1 Main N+1 (N even)
Where A..G are the pixels, and P represents the (unused) palette bit.
This layout makes more sense when written as a (little-endian) 32-bit
integer:
33222222222211111111110000000000 <- bit pos in uint32
10987654321098765432109876543210
PGGGGFFFPFEEEEDDPDDCCCCBPBBBAAAA
i.e. apart from the palette bits this is a linear ordering of pixels,
when read from LSB to MSB (i.e. right-to-left). i.e. the screen layout
order of bits is opposite to the usual binary representation ordering.
We can simplify things by stripping out the palette bit and packing
down to a 28-bit integer representation:
33222222222211111111110000000000 <- bit pos in uint32
10987654321098765432109876543210
GGGGFFFFEEEEDDDDCCCCBBBBAAAA <- pixel A..G
3210321032103210321032103210 <- bit pos in A..G pixel
3333333222222211111110000000 <- byte offset 0.3
Since our colour artifact model (see colours.py) uses a sliding 4-bit window
onto the dot string, we need to also include a 3-bit header and footer
to account for the influence from/on neighbouring bytes, i.e. adjacent
packed values. These are just the low/high 3 bits of the 28-bit body of
those neighbouring columns.
This gives a 34-bit packed representation that is sufficient to compute
the effects (on pixel colours) of storing a byte at one of the 0..3 offsets.
Note that this representation is also 1:1 with the actual displayed
DHGR dots. The mapping to pixel colours follows the DHGRColours
bitmap, see colours.py.
Because the packed representation is contiguous, we need to mask out bit
subsequences of size 3+7+3=13, i.e. the 7-bits corresponding to the
byte being stored, plus the neighbouring 3 bits that influence it/are
influenced by it.
With this masked representation, we can precompute an edit distance for the
pixel changes resulting from all possible DHGR byte stores, see
make_edit_distance.py.
The edit distance matrix is encoded by concatenating the 13-bit source
and target masked values into a 26-bit pair, which indexes into the
edit_distance array to give the corresponding edit distance.
"""
NAME = 'DHGR'
# Packed representation is 3 + 28 + 3 = 34 bits
HEADER_BITS = np.uint64(3)
BODY_BITS = np.uint64(28)
FOOTER_BITS = np.uint64(3)
# Masked representation selecting the influence of each byte offset
MASKED_BITS = np.uint64(13) # 7-bit body + 3-bit header + 3-bit footer
# Masking is 1:1 with screen dots, but we can't compute the colour of the
# last 3 dots because we fall off the end of the 4-bit sliding window
MASKED_DOTS = np.uint64(10)
# 3-bit header + 28-bit body + 3-bit footer
BYTE_MASKS = [
# 3333222222222211111111110000000000 <- bit pos in uint64
# 3210987654321098765432109876543210
# tttGGGGFFFFEEEEDDDDCCCCBBBBAAAAhhh <- pixel A..G
# 3210321032103210321032103210 <- bit pos in A..G pixel
#
# 3333333222222211111110000000 <- byte offset 0.3
np.uint64(0b0000000000000000000001111111111111), # byte 0 uint13 mask
np.uint64(0b0000000000000011111111111110000000), # byte 1 uint13 mask
np.uint64(0b0000000111111111111100000000000000), # byte 2 uint13 mask
np.uint64(0b1111111111111000000000000000000000), # byte 3 uint13 mask
]
# How much to right-shift bits after masking, to bring into uint13 range
BYTE_SHIFTS = [np.uint64(0), np.uint64(7), np.uint64(14), np.uint64(21)]
# NTSC clock phase at first masked bit
#
# Each DHGR byte offset has the same range of uint13 possible
# values and nominal colour pixels, but with different initial
# phases:
# AUX 0: 0 (1 at start of 3-bit header)
# MAIN 0: 3 (0)
# AUX 1: 2 (3)
# MAIN 1: 1 (2)
PHASES = [1, 0, 3, 2]
@staticmethod
def _make_header(col: IntOrArray) -> IntOrArray:
"""Extract upper 3 bits of body for header of next column."""
return (col & np.uint64(0b111 << 28)) >> np.uint64(28)
def _body(self) -> np.ndarray:
"""Pack related screen bytes into an efficient representation.
For DHGR we first strip off the (unused) palette bit to produce
7-bit values, then interleave aux and main memory columns and pack
these 7-bit values into 28-bits. This sequentially encodes 7 4-bit
DHGR pixels, which is the "repeating unit" of the DHGR screen, and
in a form that is convenient to operate on.
We also shift to make room for the 3-bit header.
"""
# Palette bit is unused for DHGR so mask it out
aux = (self.aux_memory.page_offset & 0x7f).astype(np.uint64)
main = (self.main_memory.page_offset & 0x7f).astype(np.uint64)
return (
(aux[:, 0::2] << 3) +
(main[:, 0::2] << 10) +
(aux[:, 1::2] << 17) +
(main[:, 1::2] << 24)
)
@staticmethod
def _make_footer(col: IntOrArray) -> IntOrArray:
"""Extract lower 3 bits of body for footer of previous column."""
return (col & np.uint64(0b111 << 3)) << np.uint64(28)
@staticmethod
@functools.lru_cache(None)
def byte_offset(page_offset: int, is_aux: bool) -> int:
"""Returns 0..3 packed byte offset for a given page_offset and is_aux"""
is_odd = page_offset % 2 == 1
if is_aux:
if is_odd:
return 2
return 0
else: # main memory
if is_odd:
return 3
else:
return 1
@staticmethod
@functools.lru_cache(None)
def _byte_offsets(is_aux: bool) -> Tuple[int, int]:
"""Return byte offsets within packed data for AUX/MAIN memory."""
if is_aux:
offsets = (0, 2)
else:
offsets = (1, 3)
return offsets
@classmethod
def to_dots(cls, masked_val: int, byte_offset: int) -> int:
"""Convert masked representation to bit sequence of display dots.
For DHGR the 13-bit masked value is already a 13-bit dot sequence
so no need to transform it.
"""
return masked_val
@staticmethod
def masked_update(
byte_offset: int,
old_value: IntOrArray,
new_value: np.uint8) -> IntOrArray:
"""Update int/array to store new value at byte_offset in every entry.
Does not patch up headers/footers of neighbouring columns.
"""
# Mask out 7-bit value where update will go
masked_value = old_value & (
~np.uint64(0x7f << (7 * byte_offset + 3)))
update = (new_value & np.uint64(0x7f)) << np.uint64(
7 * byte_offset + 3)
return masked_value ^ update

1133
transcoder/screen_test.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,56 +1,38 @@
"""Encode a sequence of images as an optimized stream of screen changes."""
import enum
import heapq
import os
import queue
import random
import subprocess
import threading
from typing import List, Iterator, Tuple
# import hitherdither
import numpy as np
import skvideo.io
from PIL import Image
import edit_distance
import opcodes
import screen
class Mode(enum.Enum):
HGR = 0
DHGR = 1
from frame_grabber import FrameGrabber
from palette import Palette
from video_mode import VideoMode
class Video:
"""Apple II screen memory map encoding a bitmapped frame."""
"""Encodes sequence of images into prioritized screen byte changes."""
CLOCK_SPEED = 1024 * 1024 # type: int
def __init__(
self,
filename: str,
frame_grabber: FrameGrabber,
ticks_per_second: float,
mode: Mode = Mode.HGR,
mode: VideoMode = VideoMode.HGR,
palette: Palette = Palette.NTSC
):
self.filename = filename # type: str
self.mode = mode # type: Mode
self.mode = mode # type: VideoMode
self.frame_grabber = frame_grabber # type: FrameGrabber
self.ticks_per_second = ticks_per_second # type: float
self._reader = skvideo.io.FFmpegReader(filename)
# Compute frame rate from input video
# TODO: possible to compute time offset for each frame instead?
data = skvideo.io.ffprobe(self.filename)['video']
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
self.input_frame_rate = float(
rate_data[0]) / float(rate_data[1]) # type: float
self.ticks_per_frame = (
self.ticks_per_second / self.input_frame_rate) # type: float
self.ticks_per_second / frame_grabber.input_frame_rate
) # type: float
self.frame_number = 0 # type: int
self.palette = palette # type: Palette
# Initialize empty screen
self.memory_map = screen.MemoryMap(
@ -59,181 +41,88 @@ class Video:
self.aux_memory_map = screen.MemoryMap(
screen_page=1) # type: screen.MemoryMap
self.pixelmap = screen.DHGRBitmap(
palette=palette,
main_memory=self.memory_map,
aux_memory=self.aux_memory_map
)
else:
self.pixelmap = screen.HGRBitmap(
palette=palette,
main_memory=self.memory_map,
)
# Accumulates pending edit weights across frames
self.update_priority = np.zeros((32, 256), dtype=np.int64)
self.update_priority = np.zeros((32, 256), dtype=np.int)
if self.mode == mode.DHGR:
self.aux_update_priority = np.zeros((32, 256), dtype=np.int64)
self.aux_update_priority = np.zeros((32, 256), dtype=np.int)
def tick(self, ticks: int) -> bool:
"""Keep track of when it is time for a new image frame."""
if ticks >= (self.ticks_per_frame * self.frame_number):
self.frame_number += 1
return True
return False
def _frame_grabber(self) -> Iterator[Image.Image]:
for frame_array in self._reader.nextFrame():
yield Image.fromarray(frame_array)
@staticmethod
def _rgb(r, g, b):
return (r << 16) + (g << 8) + b
# def dither_framesframes(self) -> Iterator[screen.MemoryMap]:
# palette = hitherdither.palette.Palette(
# [
# self._rgb(0,0,0), # black */
# self._rgb(148,12,125), # red - hgr 0*/
# self._rgb(32,54,212), # dk blue - hgr 0 */
# self._rgb(188,55,255), # purple - default HGR overlay color */
# self._rgb(51,111,0), # dk green - hgr 0 */
# self._rgb(126,126,126), # gray - hgr 0 */
# self._rgb(7,168,225), # med blue - alternate HGR overlay
# # color */
# self._rgb(158,172,255), # lt blue - hgr 0 */
# self._rgb(99,77,0), # brown - hgr 0 */
# self._rgb(249,86,29), # orange */
# self._rgb(126,126,126), # grey - hgr 0 */
# self._rgb(255,129,236), # pink - hgr 0 */
# self._rgb(67,200,0), # lt green */
# self._rgb(221,206,23), # yellow - hgr 0 */
# self._rgb(93,248,133), # aqua - hgr 0 */
# self._rgb(255,255,255) # white
# ]
# )
# for _idx, _frame in enumerate(self._frame_grabber()):
# if _idx % 60 == 0:
# img_dithered = hitherdither.ordered.yliluoma.yliluomas_1_ordered_dithering(
# _frame.resize((280,192), resample=Image.NEAREST),
# palette, order=8)
#
# yield img_dithered
def frames(self) -> Iterator[screen.MemoryMap]:
"""Encode frame to HGR using bmp2dhr.
We do the encoding in a background thread to parallelize.
"""
frame_dir = self.filename.split(".")[0]
try:
os.mkdir(frame_dir)
except FileExistsError:
pass
q = queue.Queue(maxsize=10)
def _hgr_decode(_idx, _frame):
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(outfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
"P0", # Kegs32 RGB Color palette(for //gs playback)
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(outfile, dtype=np.uint8)
return _main, None
def _dhgr_decode(_idx, _frame):
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(mainfile)
os.stat(auxfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr",
"P0", # Kegs32 RGB Color palette (for //gs playback)
"A", # Output separate .BIN and .AUX files
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(mainfile, dtype=np.uint8)
_aux = np.fromfile(auxfile, dtype=np.uint8)
return _main, _aux
def worker():
"""Invoke bmp2dhr to encode input image frames and push to queue."""
for _idx, _frame in enumerate(self._frame_grabber()):
if self.mode == Mode.DHGR:
res = _dhgr_decode(_idx, _frame)
else:
res = _hgr_decode(_idx, _frame)
q.put(res)
q.put((None, None))
t = threading.Thread(target=worker, daemon=True)
t.start()
while True:
main, aux = q.get()
if main is None:
break
main_map = screen.FlatMemoryMap(
screen_page=1, data=main).to_memory_map()
if aux is None:
aux_map = None
else:
aux_map = screen.FlatMemoryMap(
screen_page=1, data=aux).to_memory_map()
yield (main_map, aux_map)
q.task_done()
t.join()
def encode_frame(
self, target: screen.MemoryMap,
memory_map: screen.MemoryMap,
update_priority: np.array,
self,
target: screen.MemoryMap,
is_aux: bool,
) -> Iterator[opcodes.Opcode]:
"""Update to match content of frame within provided budget."""
"""Converge towards target frame in priority order of edit distance."""
if is_aux:
memory_map = self.aux_memory_map
update_priority = self.aux_update_priority
else:
memory_map = self.memory_map
update_priority = self.update_priority
# Make sure nothing is leaking into screen holes
assert np.count_nonzero(
memory_map.page_offset[screen.SCREEN_HOLES]) == 0
print("Similarity %f" % (update_priority.mean()))
yield from self._index_changes(memory_map, target, update_priority)
yield from self._index_changes(
memory_map, target, update_priority, is_aux)
def _index_changes(
self,
source: screen.MemoryMap,
target: screen.MemoryMap,
update_priority: np.array
update_priority: np.array,
is_aux: True
) -> Iterator[Tuple[int, int, List[int]]]:
"""Transform encoded screen to sequence of change tuples."""
diff_weights = self._diff_weights(source, target)
if self.mode == VideoMode.DHGR:
if is_aux:
target_pixelmap = screen.DHGRBitmap(
main_memory=self.memory_map,
aux_memory=target,
palette=self.palette
)
else:
target_pixelmap = screen.DHGRBitmap(
main_memory=target,
aux_memory=self.aux_memory_map,
palette=self.palette
)
else:
target_pixelmap = screen.HGRBitmap(
main_memory=target,
palette=self.palette
)
diff_weights = target_pixelmap.diff_weights(self.pixelmap, is_aux)
# Don't bother storing into screen holes
diff_weights[screen.SCREEN_HOLES] = 0
# Clear any update priority entries that have resolved themselves
# with new frame
update_priority[diff_weights == 0] = 0
# Halve existing weights to increase bias to new diffs.
# In particular this means that existing updates with diff 1 will
# become diff 0, i.e. will only be prioritized if they are still
# diffs in the new frame.
# self.update_priority >>= 1
update_priority += diff_weights
priorities = self._heapify_priorities(update_priority)
@ -241,7 +130,12 @@ class Video:
content_deltas = {}
while priorities:
_, _, page, offset = heapq.heappop(priorities)
pri, _, page, offset = heapq.heappop(priorities)
assert not screen.SCREEN_HOLES[page, offset], (
"Attempted to store into screen hole at (%d, %d)" % (
page, offset))
# Check whether we've already cleared this diff while processing
# an earlier opcode
if update_priority[page, offset] == 0:
@ -249,100 +143,139 @@ class Video:
offsets = [offset]
content = target.page_offset[page, offset]
if self.mode == VideoMode.DHGR:
# DHGR palette bit not expected to be set
assert content < 0x80
# Clear priority for the offset we're emitting
update_priority[page, offset] = 0
source.page_offset[page, offset] = content
diff_weights[page, offset] = 0
# Update memory maps
source.page_offset[page, offset] = content
self.pixelmap.apply(page, offset, is_aux, content)
# Make sure we don't emit this offset as a side-effect of some
# other offset later.
for cd in content_deltas.values():
cd[page, offset] = 0
# TODO: what if we add another content_deltas entry later?
# We might clobber it again
# Need to find 3 more offsets to fill this opcode
for o in self._compute_error(
for err, o in self._compute_error(
page,
content,
target,
target_pixelmap,
diff_weights,
content_deltas
content_deltas,
is_aux
):
offsets.append(o)
assert o != offset
assert not screen.SCREEN_HOLES[page, o], (
"Attempted to store into screen hole at (%d, %d)" % (
page, o))
# Compute new edit distance between new content and target
# byte, so we can reinsert with this value
p = edit_distance.edit_weight(
content, target.page_offset[page, o], o % 2 == 1,
error=False)
if update_priority[page, o] == 0:
# Someone already resolved this diff.
continue
# Make sure we don't end up considering this (page, offset)
# again until the next image frame. Even if a better match
# comes along, it's probably better to fix up some other byte.
# TODO: or should we recompute it with new error?
for cd in content_deltas.values():
cd[page, o] = 0
byte_offset = target_pixelmap.byte_offset(o, is_aux)
old_packed = target_pixelmap.packed[page, o // 2]
p = target_pixelmap.byte_pair_difference(
byte_offset, old_packed, content)
# Update priority for the offset we're emitting
update_priority[page, o] = p # 0
update_priority[page, o] = p
source.page_offset[page, o] = content
self.pixelmap.apply(page, o, is_aux, content)
if p:
# This content byte introduced an error, so put back on the
# heap in case we can get back to fixing it exactly
# during this frame. Otherwise we'll get to it later.
heapq.heappush(
priorities, (-p, random.random(), page, offset))
priorities, (-p, random.getrandbits(8), page, o))
offsets.append(o)
if len(offsets) == 3:
break
# Pad to 4 if we didn't find enough
for _ in range(len(offsets), 4):
offsets.append(offsets[0])
yield (page + 32, content, offsets)
# # TODO: there is still a bug causing residual diffs when we have
# # apparently run out of work to do
if not np.array_equal(source.page_offset, target.page_offset):
diffs = np.nonzero(source.page_offset != target.page_offset)
for i in range(len(diffs[0])):
diff_p = diffs[0][i]
diff_o = diffs[1][i]
# For HGR, 0x00 or 0x7f may be visually equivalent to the same
# bytes with high bit set (depending on neighbours), so skip
# them
if (source.page_offset[diff_p, diff_o] & 0x7f) == 0 and \
(target.page_offset[diff_p, diff_o] & 0x7f) == 0:
continue
if (source.page_offset[diff_p, diff_o] & 0x7f) == 0x7f and \
(target.page_offset[diff_p, diff_o] & 0x7f) == 0x7f:
continue
print("Diff at (%d, %d): %d != %d" % (
diff_p, diff_o, source.page_offset[diff_p, diff_o],
target.page_offset[diff_p, diff_o]
))
# assert False
# If we run out of things to do, pad forever
content = target.page_offset[(0, 0)]
content = target.page_offset[0, 0]
while True:
yield (32, content, [0, 0, 0, 0])
@staticmethod
def _diff_weights(
source: screen.MemoryMap,
target: screen.MemoryMap
):
return edit_distance.screen_edit_distance(
source.page_offset, target.page_offset)
def _heapify_priorities(self, update_priority: np.array) -> List:
priorities = []
it = np.nditer(update_priority, flags=['multi_index'])
while not it.finished:
priority = it[0]
if not priority:
it.iternext()
continue
page, offset = it.multi_index
def _heapify_priorities(update_priority: np.array) -> List:
"""Build priority queue of (page, offset) ordered by update priority."""
# Use numpy vectorization to efficiently compute the list of
# (priority, random nonce, page, offset) tuples to be heapified.
pages, offsets = update_priority.nonzero()
priorities = [tuple(data) for data in np.stack((
-update_priority[pages, offsets],
# Don't use deterministic order for page, offset
nonce = random.random()
priorities.append((-priority, nonce, page, offset))
it.iternext()
np.random.randint(0, 2 ** 8, size=pages.shape[0]),
pages,
offsets)
).T.tolist()]
heapq.heapify(priorities)
return priorities
@staticmethod
def _compute_delta(content, target, old):
"""
This function is the critical path for the video encoding.
"""
return edit_distance.byte_screen_error_distance(content, target) - old
_OFFSETS = np.arange(256)
def _compute_error(self, page, content, target, old_error, content_deltas):
offsets = []
def _compute_error(self, page, content, target_pixelmap, diff_weights,
content_deltas, is_aux):
"""Build priority queue of other offsets at which to store content.
Ordered by offsets which are closest to the target content value.
"""
# TODO: move this up into parent
delta_screen = content_deltas.get(content)
if delta_screen is None:
delta_screen = self._compute_delta(
content, target.page_offset, old_error)
delta_screen = target_pixelmap.compute_delta(
content, diff_weights, is_aux)
content_deltas[content] = delta_screen
delta_page = delta_screen[page]
@ -350,23 +283,15 @@ class Video:
candidate_offsets = self._OFFSETS[cond]
priorities = delta_page[cond]
l = [
(priorities[i], random.random(), candidate_offsets[i])
deltas = [
(priorities[i], random.getrandbits(8), candidate_offsets[i])
for i in range(len(candidate_offsets))
]
heapq.heapify(l)
heapq.heapify(deltas)
while l:
_, _, o = heapq.heappop(l)
offsets.append(o)
while deltas:
pri, _, o = heapq.heappop(deltas)
assert pri < 0
assert o <= 255
# Make sure we don't end up considering this (page, offset) again
# until the next image frame. Even if a better match comes along,
# it's probably better to fix up some other byte.
for cd in content_deltas.values():
cd[page, o] = 0
if len(offsets) == 3:
break
return offsets
yield -pri, o

8
transcoder/video_mode.py Normal file
View File

@ -0,0 +1,8 @@
"""Enum representing video encoding mode."""
import enum
class VideoMode(enum.Enum):
HGR = 0 # Hi-Res
DHGR = 1 # Double Hi-Res

83
transcoder/video_test.py Normal file
View File

@ -0,0 +1,83 @@
"""Tests for the video module."""
import unittest
import frame_grabber
import palette
import screen
import video
import video_mode
class TestVideo(unittest.TestCase):
def test_diff_weights(self):
fs = frame_grabber.FrameGrabber(mode=video_mode.VideoMode.DHGR)
v = video.Video(
fs, ticks_per_second=10000.,
mode=video_mode.VideoMode.DHGR)
frame = screen.MemoryMap(screen_page=1)
frame.page_offset[0, 0] = 0b1111111
frame.page_offset[0, 1] = 0b1010101
target_pixelmap = screen.DHGRBitmap(
palette=palette.Palette.NTSC,
main_memory=v.memory_map,
aux_memory=frame
)
self.assertEqual(
0b0000000000101010100000001111111000,
target_pixelmap.packed[0, 0])
pal = palette.NTSCPalette
diff = target_pixelmap.diff_weights(v.pixelmap, is_aux=True)
# Expect byte 0 to map to 0b0001111111000
expect0 = target_pixelmap.edit_distances(pal.ID)[0][0b0001111111000]
# Expect byte 2 to map to 0b0001010101000
expect2 = target_pixelmap.edit_distances(pal.ID)[2][0b0001010101000]
self.assertEqual(expect0, diff[0, 0])
self.assertEqual(expect2, diff[0, 1])
# Update aux frame
v.aux_memory_map.page_offset = frame.page_offset
v.pixelmap._pack()
self.assertEqual(
0b0000000000101010100000001111111000,
v.pixelmap.packed[0, 0]
)
# Encode new aux frame
frame = screen.MemoryMap(screen_page=1)
frame.page_offset[0, 0] = 0b1101101
frame.page_offset[0, 1] = 0b0110110
target_pixelmap = screen.DHGRBitmap(
main_memory=v.memory_map,
aux_memory=frame,
palette=pal.ID
)
self.assertEqual(
0b0000000000011011000000001101101000,
target_pixelmap.packed[0, 0]
)
diff = target_pixelmap.diff_weights(v.pixelmap, is_aux=True)
# Masked offset 0 changes from 0001111111000 to 0001101101000
expect0 = target_pixelmap.edit_distances(pal.ID)[0][
0b00011111110000001101101000]
# Masked offset 2 changes from 0001010101000 to 0000110110000
expect2 = target_pixelmap.edit_distances(pal.ID)[2][
0b00010101010000000110110000]
self.assertEqual(expect0, diff[0, 0])
self.assertEqual(expect2, diff[0, 1])
if __name__ == '__main__':
unittest.main()