Update LocalTalkMonitor.py
This commit is contained in:
parent
72bcd522ea
commit
c2b626c56f
|
@ -7,6 +7,41 @@ import socket
|
|||
import os
|
||||
from os import path
|
||||
import struct
|
||||
import subprocess
|
||||
import shutil
|
||||
|
||||
|
||||
|
||||
def get_ddp_type_name(n):
|
||||
if n == 1: s = 'RTMP'
|
||||
elif n == 2: s = 'NBP'
|
||||
elif n == 3: s = 'ATP'
|
||||
elif n == 4: s = 'AEP'
|
||||
elif n == 5: s = 'RTMP'
|
||||
elif n == 6: s = 'ZIP'
|
||||
elif n == 7: s = 'ADSP'
|
||||
elif n == 10: s = 'ABP'
|
||||
else: s = '?'
|
||||
return '%d(%s)' % (n, s)
|
||||
|
||||
def hexdump(d):
|
||||
HEXCHUNK = 2
|
||||
wid, _ = shutil.get_terminal_size()
|
||||
lw = 2
|
||||
while 4 + lw//HEXCHUNK*(2*HEXCHUNK+1) + lw <= wid:
|
||||
lw *= 2
|
||||
lw //= 2
|
||||
ret = ''
|
||||
for i in range(0, len(d), lw):
|
||||
dd = d[i:i+lw]
|
||||
ret += '%03x|' % i
|
||||
hex = dd.hex().ljust(lw*2)
|
||||
for j in range(0, len(hex), 2*HEXCHUNK):
|
||||
ret += hex[j:j+2*HEXCHUNK] + ' '
|
||||
ret = ret[:-1] + '|'
|
||||
ret += ''.join(chr(c) if 32 <= c < 127 else '.' for c in dd).ljust(lw)
|
||||
ret += '\n'
|
||||
return ret
|
||||
|
||||
|
||||
|
||||
|
@ -56,54 +91,9 @@ while 1:
|
|||
ddp_hop_count = (ddp_len >> 10) & 0xF
|
||||
ddp_len &= 0x3FF
|
||||
data = data[13:ddp_len]
|
||||
print(ddp_src_net, ddp_dest_net)
|
||||
else:
|
||||
# llap control packet -- can probably ignore!
|
||||
continue
|
||||
|
||||
print(f'datagram {llap_proto_type} {llap_src_node}:{ddp_src_socket}->{llap_dest_node}:{ddp_dest_socket}')
|
||||
|
||||
if ddp_proto_type == 2: # NBP
|
||||
if len(data) < 2: continue
|
||||
nbp_func = data[0] >> 4
|
||||
nbp_tuple_cnt = data[0] & 0xF
|
||||
nbp_id = data[1]
|
||||
data = data[2:]
|
||||
|
||||
nbp_tuples = []
|
||||
while data and len(nbp_tuples) < nbp_tuple_cnt:
|
||||
if len(data) < 5: break
|
||||
this_tuple = list(struct.unpack_from('>HBBB', data))
|
||||
data = data[5:]
|
||||
for i in range(3):
|
||||
# This should be coded more defensively, perhaps using exceptions
|
||||
this_tuple.append(data[1:1+data[0]].decode('mac_roman'))
|
||||
data = data[1+data[0]:]
|
||||
nbp_tuples.append(tuple(this_tuple))
|
||||
|
||||
print(' NBP func', nbp_func, 'id', nbp_id)
|
||||
for t in nbp_tuples:
|
||||
print(f' net:node:sock={t[0]}:{t[1]}:{t[2]} enum={t[3]} object:type@zone={t[4]}:{t[5]}@{t[6]}')
|
||||
|
||||
elif ddp_proto_type == 10: # ATBOOT
|
||||
if len(data) < 2:
|
||||
print('malformed short packet %r' % data.hex())
|
||||
continue
|
||||
boot_type, boot_vers = struct.unpack_from('>BB', data)
|
||||
|
||||
if boot_type == 1:
|
||||
print(' ATBOOT "syn"', data[2:].hex())
|
||||
elif boot_type == 2:
|
||||
print(' ATBOOT "ack"', data[2:].hex())
|
||||
elif boot_type == 3:
|
||||
print(' ATBOOT image request', data[2:].hex())
|
||||
elif boot_type == 4:
|
||||
print(' ATBOOT image reply', data[2:].hex())
|
||||
elif boot_type >= 0x80:
|
||||
print(' ATBOOT Elliot', data.hex())
|
||||
else:
|
||||
print(' ATBOOT ???', boot_type, boot_vers, data[2:].hex())
|
||||
|
||||
else:
|
||||
print('Totally unknown DDP type', ddp_proto_type)
|
||||
print(' ' + data.hex())
|
||||
print(f'{get_ddp_type_name(ddp_proto_type)} {llap_src_node}:{ddp_src_socket}->{llap_dest_node}:{ddp_dest_socket}')
|
||||
print(hexdump(data))
|
||||
|
|
Loading…
Reference in New Issue