From c2b626c56f2bfdaab650d147160fd5e3c30323ec Mon Sep 17 00:00:00 2001 From: Elliot Nunn Date: Mon, 26 Apr 2021 11:51:30 +0800 Subject: [PATCH] Update LocalTalkMonitor.py --- LocalTalkMonitor.py | 84 ++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 47 deletions(-) diff --git a/LocalTalkMonitor.py b/LocalTalkMonitor.py index dbdd83e..248e98e 100755 --- a/LocalTalkMonitor.py +++ b/LocalTalkMonitor.py @@ -7,6 +7,41 @@ import socket import os from os import path import struct +import subprocess +import shutil + + + +def get_ddp_type_name(n): + if n == 1: s = 'RTMP' + elif n == 2: s = 'NBP' + elif n == 3: s = 'ATP' + elif n == 4: s = 'AEP' + elif n == 5: s = 'RTMP' + elif n == 6: s = 'ZIP' + elif n == 7: s = 'ADSP' + elif n == 10: s = 'ABP' + else: s = '?' + return '%d(%s)' % (n, s) + +def hexdump(d): + HEXCHUNK = 2 + wid, _ = shutil.get_terminal_size() + lw = 2 + while 4 + lw//HEXCHUNK*(2*HEXCHUNK+1) + lw <= wid: + lw *= 2 + lw //= 2 + ret = '' + for i in range(0, len(d), lw): + dd = d[i:i+lw] + ret += '%03x|' % i + hex = dd.hex().ljust(lw*2) + for j in range(0, len(hex), 2*HEXCHUNK): + ret += hex[j:j+2*HEXCHUNK] + ' ' + ret = ret[:-1] + '|' + ret += ''.join(chr(c) if 32 <= c < 127 else '.' for c in dd).ljust(lw) + ret += '\n' + return ret @@ -56,54 +91,9 @@ while 1: ddp_hop_count = (ddp_len >> 10) & 0xF ddp_len &= 0x3FF data = data[13:ddp_len] - print(ddp_src_net, ddp_dest_net) else: # llap control packet -- can probably ignore! continue - print(f'datagram {llap_proto_type} {llap_src_node}:{ddp_src_socket}->{llap_dest_node}:{ddp_dest_socket}') - - if ddp_proto_type == 2: # NBP - if len(data) < 2: continue - nbp_func = data[0] >> 4 - nbp_tuple_cnt = data[0] & 0xF - nbp_id = data[1] - data = data[2:] - - nbp_tuples = [] - while data and len(nbp_tuples) < nbp_tuple_cnt: - if len(data) < 5: break - this_tuple = list(struct.unpack_from('>HBBB', data)) - data = data[5:] - for i in range(3): - # This should be coded more defensively, perhaps using exceptions - this_tuple.append(data[1:1+data[0]].decode('mac_roman')) - data = data[1+data[0]:] - nbp_tuples.append(tuple(this_tuple)) - - print(' NBP func', nbp_func, 'id', nbp_id) - for t in nbp_tuples: - print(f' net:node:sock={t[0]}:{t[1]}:{t[2]} enum={t[3]} object:type@zone={t[4]}:{t[5]}@{t[6]}') - - elif ddp_proto_type == 10: # ATBOOT - if len(data) < 2: - print('malformed short packet %r' % data.hex()) - continue - boot_type, boot_vers = struct.unpack_from('>BB', data) - - if boot_type == 1: - print(' ATBOOT "syn"', data[2:].hex()) - elif boot_type == 2: - print(' ATBOOT "ack"', data[2:].hex()) - elif boot_type == 3: - print(' ATBOOT image request', data[2:].hex()) - elif boot_type == 4: - print(' ATBOOT image reply', data[2:].hex()) - elif boot_type >= 0x80: - print(' ATBOOT Elliot', data.hex()) - else: - print(' ATBOOT ???', boot_type, boot_vers, data[2:].hex()) - - else: - print('Totally unknown DDP type', ddp_proto_type) - print(' ' + data.hex()) + print(f'{get_ddp_type_name(ddp_proto_type)} {llap_src_node}:{ddp_src_socket}->{llap_dest_node}:{ddp_dest_socket}') + print(hexdump(data))