Refactor filter_resources in __main__

With the new implementation, each filter is converted to a function,
then all resources are checked if they match any of the filter
functions. This is simpler than the old implementation, where the
resource lookup code was slightly different for some filter forms.
This commit is contained in:
dgelessus 2019-12-21 03:21:15 +01:00
parent c009e8f80f
commit 2b0bbb19ed

View File

@ -83,84 +83,60 @@ def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
return "".join(out)
def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.List[api.Resource]:
if not filters:
resources = []
for reses in rf.values():
resources.extend(reses.values())
return resources
matching: typing.MutableMapping[typing.Tuple[bytes, int], api.Resource] = collections.OrderedDict()
for filter in filters:
if len(filter) == 4:
try:
resources = rf[filter.encode("ascii")]
except KeyError:
continue
for res in resources.values():
matching[res.type, res.id] = res
elif filter[0] == filter[-1] == "'":
try:
resources = rf[bytes_unescape(filter[1:-1])]
except KeyError:
continue
for res in resources.values():
matching[res.type, res.id] = res
def filter_to_predicate(filter: str) -> typing.Callable[[api.Resource], bool]:
if len(filter) == 4:
restype = filter.encode("ascii")
return lambda res: res.type == restype
elif filter[0] == filter[-1] == "'":
restype = bytes_unescape(filter[1:-1])
return lambda res: res.type == restype
else:
pos = filter.find("'", 1)
if pos == -1:
raise ValueError(f"Invalid filter {filter!r}: Resource type must be single-quoted")
elif filter[pos + 1] != " ":
raise ValueError(f"Invalid filter {filter!r}: Resource type and ID must be separated by a space")
restype_str, resid_str = filter[:pos + 1], filter[pos + 2:]
if not restype_str[0] == restype_str[-1] == "'":
raise ValueError(
f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype_str!r}")
restype = bytes_unescape(restype_str[1:-1])
if len(restype) != 4:
raise ValueError(
f"Invalid filter {filter!r}: Type identifier must be 4 bytes after replacing escapes, got {len(restype)} bytes: {restype!r}")
if resid_str[0] != "(" or resid_str[-1] != ")":
raise ValueError(f"Invalid filter {filter!r}: Resource ID must be parenthesized")
resid_str = resid_str[1:-1]
if resid_str[0] == resid_str[-1] == '"':
name = bytes_unescape(resid_str[1:-1])
return lambda res: res.type == restype and res.name == name
elif ":" in resid_str:
if resid_str.count(":") > 1:
raise ValueError(f"Invalid filter {filter!r}: Too many colons in ID range expression: {resid_str!r}")
start_str, end_str = resid_str.split(":")
start, end = int(start_str), int(end_str)
return lambda res: res.type == restype and start <= res.id <= end
else:
pos = filter.find("'", 1)
if pos == -1:
raise ValueError(f"Invalid filter {filter!r}: Resource type must be single-quoted")
elif filter[pos + 1] != " ":
raise ValueError(f"Invalid filter {filter!r}: Resource type and ID must be separated by a space")
restype_str, resid_str = filter[:pos + 1], filter[pos + 2:]
if not restype_str[0] == restype_str[-1] == "'":
raise ValueError(
f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype_str!r}")
restype = bytes_unescape(restype_str[1:-1])
if len(restype) != 4:
raise ValueError(
f"Invalid filter {filter!r}: Type identifier must be 4 bytes after replacing escapes, got {len(restype)} bytes: {restype!r}")
if resid_str[0] != "(" or resid_str[-1] != ")":
raise ValueError(f"Invalid filter {filter!r}: Resource ID must be parenthesized")
resid_str = resid_str[1:-1]
try:
resources = rf[restype]
except KeyError:
continue
if resid_str[0] == resid_str[-1] == '"':
name = bytes_unescape(resid_str[1:-1])
for res in resources.values():
if res.name == name:
matching[res.type, res.id] = res
break
elif ":" in resid_str:
if resid_str.count(":") > 1:
raise ValueError(f"Invalid filter {filter!r}: Too many colons in ID range expression: {resid_str!r}")
start_str, end_str = resid_str.split(":")
start, end = int(start_str), int(end_str)
for res in resources.values():
if start <= res.id <= end:
matching[res.type, res.id] = res
else:
resid = int(resid_str)
try:
res = resources[resid]
except KeyError:
continue
matching[res.type, res.id] = res
return list(matching.values())
resid = int(resid_str)
return lambda res: res.type == restype and res.id == resid
def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.Iterable[api.Resource]:
if not filters:
# Special case: an empty list of filters matches all resources rather than none
for reses in rf.values():
yield from reses.values()
else:
preds = [filter_to_predicate(filter) for filter in filters]
for reses in rf.values():
for res in reses.values():
if any(pred(res) for pred in preds):
yield res
def hexdump(data: bytes) -> None:
last_line = None
@ -556,7 +532,7 @@ or rewritten by the shell.
ns = ap.parse_args(args)
with open_resource_file(ns.file, fork=ns.fork) as rf:
resources = filter_resources(rf, ns.filter)
resources = list(filter_resources(rf, ns.filter))
if ns.sort:
resources.sort(key=lambda res: (res.type, res.id))