# bigapple/boot1_levenshtein.py
#
# 135 lines
# 3.6 KiB
# Python

import bitstring
import datetime
import heapq
import multiprocessing
import pylev
import sqlite3
# Path of the SQLite database that main() reads the disks/boot1 tables from
# and writes the Boot1Distances table to.
DB_PATH = '/tank/apple2/data/apple2.db'
# Number of worker processes main() starts to compute Levenshtein distances.
WORKERS = 2
def main():
    """Compute and store Levenshtein distances between pairs of boot1 sectors.

    Reads every boot1 sector referenced by the ``disks`` table from the SQLite
    database at ``DB_PATH``, forms all unordered pairs of distinct sectors and
    hands them to ``WORKERS`` processes running ``levenshtein_worker``.  Pairs
    are queued in descending order of the product of the two sectors' disk
    counts.  A pair is skipped when ``Boot1Distances`` already holds a row for
    it in both directions.

    Results are written back to ``Boot1Distances`` in both directions, in
    batches of 100 (plus a final partial batch), and progress is printed to
    stdout after each non-final batch.

    The worker processes are terminated and the database connection is closed
    on the way out, whether the run succeeds or raises.
    """
    # ``intern`` is a builtin on Python 2 and lives in ``sys`` on Python 3.
    try:
        intern_str = intern
    except NameError:
        from sys import intern as intern_str

    queue = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    workers = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        # One row per distinct boot1 sector: its hash, its raw data and the
        # number of disks that carry it.
        rows = cursor.execute(
            """
            select boot1_sha1, boot1.data, count(*) as c from disks
            join
            (select sha1, data from boot1) as boot1
            on disks.boot1_sha1 = boot1.sha1 group by 1;
            """
        )
        hashes = []
        sectors = {}
        for (sha1, sector, count) in rows:
            sha1 = intern_str(str(sha1))
            # Distances are computed over the sector's bit string.
            sectors[sha1] = bitstring.BitString(bytes=sector).bin
            hashes.append((count, sha1))
        hashes.sort()

        # Every unordered pair, keyed by the negated count product so that
        # the min-heap yields the most widely shared pairs first.
        workitems = [
            (-(cnt1 * cnt2), hash1, hash2)
            for idx, (cnt1, hash1) in enumerate(hashes)
            for (cnt2, hash2) in hashes[idx + 1:]
        ]
        heapq.heapify(workitems)

        for _ in range(WORKERS):
            worker = multiprocessing.Process(
                target=levenshtein_worker, args=(queue, results))
            worker.start()
            workers.append(worker)
        print("Workers started")

        rows = cursor.execute(
            """
            select source, target from boot1distances
            """
        )
        existing = set(
            (intern_str(str(s)), intern_str(str(t))) for (s, t) in rows)
        print("Found %d existing entries" % len(existing))

        items_put = 0
        while workitems:
            (score, hash1, hash2) = heapq.heappop(workitems)
            # Only skip pairs that are already stored in both directions.
            if (hash1, hash2) in existing and (hash2, hash1) in existing:
                continue
            items_put += 1
            queue.put_nowait(
                (hash1, hash2, sectors[hash1], sectors[hash2], score))
        del existing  # can be large; no longer needed
        print("%d items put" % items_put)
        queue.close()

        insert_sql = """INSERT OR REPLACE INTO Boot1Distances (source, target, distance) VALUES (?, ?, ?)"""
        start_time = datetime.datetime.now()
        num_results = 0
        batch = []
        # Bounded by items_put: when nothing was queued there will never be
        # a result, and an unconditional results.get() would block forever.
        while num_results < items_put:
            result = results.get()
            num_results += 1
            batch.append(result)

            finished = num_results == items_put
            if num_results % 100 != 0 and not finished:
                continue

            # Insert results into DB
            cursor.executemany(
                insert_sql,
                [(h1, h2, dist) for (h1, h2, dist, _) in batch]
            )
            # Inverse pairing
            cursor.executemany(
                insert_sql,
                [(h2, h1, dist) for (h1, h2, dist, _) in batch]
            )
            conn.commit()
            batch = []
            if finished:
                break

            # Score of the most recent result (the negated count product,
            # exactly as it was queued).
            score = result[3]
            now = datetime.datetime.now()
            eta = datetime.timedelta(
                seconds=(now - start_time).total_seconds() * items_put / num_results) + start_time
            print("%d/%d results = %f%% (Score: %d, ETA: %s)" % (
                num_results, items_put, float(100*num_results)/items_put, score, eta))

        print("Done")
    finally:
        # The workers are still blocked waiting for more work at this point
        # and, being non-daemonic, would keep the interpreter from exiting.
        for worker in workers:
            worker.terminate()
            worker.join()
        # If we are unwinding from an error there may be unflushed work
        # items; do not let the queue's feeder thread block process exit.
        queue.cancel_join_thread()
        conn.close()
def levenshtein_worker(queue, results):
    """Consume sector pairs from ``queue`` and emit their Levenshtein distance.

    Each work item is a tuple ``(hash1, hash2, sector1, sector2, score)``.
    For every item the tuple ``(hash1, hash2, distance, score)`` is put on
    ``results``, where ``distance`` is ``pylev.levenshtein(sector1, sector2)``,
    and the item is then acknowledged with ``queue.task_done()``.

    A ``None`` item is treated as a shutdown sentinel: it is acknowledged and
    the function returns.  If no sentinel is ever sent the loop runs forever,
    so the caller has to terminate the hosting process itself.

    Args:
        queue: joinable queue of work items (must support ``get`` and
            ``task_done``).
        results: queue that receives the result tuples (must support
            ``put_nowait``).
    """
    while True:
        work = queue.get()
        if work is None:
            # Acknowledge the sentinel too, so queue.join() stays balanced.
            queue.task_done()
            return
        (hash1, hash2, sector1, sector2, score) = work
        distance = pylev.levenshtein(sector1, sector2)
        results.put_nowait((hash1, hash2, distance, score))
        queue.task_done()
# Run the distance computation only when executed as a script. The guard
# also keeps main() from running when the module is merely imported.
if __name__ == "__main__":
    main()