import _pickle, numpy as np, itertools as it
from time import perf_counter
# from cppimport import import_hook
#
# # import cppimport
#
# # cppimport.set_quiet(False)
#
import rpxdock as rp
from rpxdock.bvh import bvh_test
from rpxdock.bvh import BVH, bvh
import rpxdock.homog as hm
[docs]def test_bvh_isect_cpp():
assert bvh_test.TEST_bvh_test_isect()
[docs]def test_bvh_isect_fixed():
# print()
mindist = 0.01
totbvh, totnaive = 0, 0
for i in range(10):
xyz1 = np.random.rand(1000, 3) + [0.9, 0.9, 0]
xyz2 = np.random.rand(1000, 3)
tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tcre = perf_counter() - tcre
assert len(bvh1) == 1000
pos1 = hm.htrans([0.9, 0.9, 0.9])
pos2 = np.eye(4)
tbvh = perf_counter()
clash1 = bvh.bvh_isect_fixed(bvh1, bvh2, mindist)
tbvh = perf_counter() - tbvh
tn = perf_counter()
clash2 = bvh.naive_isect_fixed(bvh1, bvh2, mindist)
tn = perf_counter() - tn
assert clash1 == clash2
# print(f"{i:3} clash {clash1:1} {tn / tbvh:8.2f}, {tn:1.6f}, {tbvh:1.6f}")
totbvh += tbvh
totnaive += tn
print("total times", totbvh, totnaive / totbvh, totnaive)
[docs]def test_bvh_isect():
t = rp.Timer().start()
N1, N2 = 10, 10
N = N1 * N2
mindist = 0.04
nclash = 0
for outer in range(N1):
xyz1 = np.random.rand(1250, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(1250, 3) - [0.5, 0.5, 0.5]
pos1 = hm.rand_xform(N2, cart_sd=0.8)
pos2 = hm.rand_xform(N2, cart_sd=0.8)
t.checkpoint('init')
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
t.checkpoint('BVH')
clash = list()
for inner in range(N2):
clash1 = bvh.bvh_isect(bvh1=bvh1, bvh2=bvh2, pos1=pos1[inner], pos2=pos2[inner],
mindist=mindist)
t.checkpoint('bvh_isect')
clash2 = bvh.naive_isect(bvh1, bvh2, pos1[inner], pos2[inner], mindist)
t.checkpoint('naive_isect')
assert clash1 == clash2
clash.append(clash1)
clashvec = bvh.bvh_isect_vec(bvh1, bvh2, pos1, pos2, mindist)
t.checkpoint('bvh_isect_vec')
assert np.all(clashvec == clash)
nclash += sum(clash)
assert clashvec[1] == bvh.bvh_isect_vec(bvh1, bvh2, pos1[1], pos2[1], mindist)
bvh.bvh_isect_vec(bvh1, bvh2, pos1, pos2[1], mindist) # ?? make sure api works?
bvh.bvh_isect_vec(bvh1, bvh2, pos1[1], pos2, mindist)
print(
f"Ngeom {N1:,} Npos {N2:,} isect {nclash/N:4.2f} bvh: {int(N/t.sum.bvh_isect):,}/s",
f"bvh_vec {int(N/t.sum.bvh_isect_vec):,} fastnaive {int(N/t.sum.naive_isect):,}/s",
f"ratio {int(t.sum.naive_isect/t.sum.bvh_isect_vec):,}x",
)
[docs]def test_bvh_isect_fixed_range():
N1, N2 = 10, 10
N = N1 * N2
mindist = 0.04
nclash = 0
for outer in range(N1):
xyz1 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
bvh1_half = BVH(xyz1[250:750])
bvh2_half = BVH(xyz2[250:750])
pos1 = hm.rand_xform(N2, cart_sd=0.5)
pos2 = hm.rand_xform(N2, cart_sd=0.5)
isect1 = bvh.bvh_isect_vec(bvh1, bvh2, pos1, pos2, mindist)
isect2, clash = bvh.bvh_isect_fixed_range_vec(bvh1, bvh2, pos1, pos2, mindist)
assert np.all(isect1 == isect2)
bounds = [250], [749], [250], [749]
isect1 = bvh.bvh_isect_vec(bvh1_half, bvh2_half, pos1, pos2, mindist)
isect2, clash = bvh.bvh_isect_fixed_range_vec(bvh1, bvh2, pos1, pos2, mindist, *bounds)
assert np.all(isect1 == isect2)
[docs]def test_bvh_min_cpp():
assert bvh_test.TEST_bvh_test_min()
[docs]def test_bvh_min_dist_fixed():
xyz1 = np.random.rand(5000, 3) + [0.9, 0.9, 0.0]
xyz2 = np.random.rand(5000, 3)
tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tcre = perf_counter() - tcre
tbvh = perf_counter()
d, i1, i2 = bvh.bvh_min_dist_fixed(bvh1, bvh2)
tbvh = perf_counter() - tbvh
dtest = np.linalg.norm(xyz1[i1] - xyz2[i2])
assert np.allclose(d, dtest, atol=1e-6)
# tnp = perf_counter()
# dnp = np.min(np.linalg.norm(xyz1[:, None] - xyz2[None], axis=2))
# tnp = perf_counter() - tnp
tn = perf_counter()
dn = bvh.naive_min_dist_fixed(bvh1, bvh2)
tn = perf_counter() - tn
print()
print("from bvh: ", d)
print("from naive:", dn)
assert np.allclose(dn, d, atol=1e-6)
print(f"tnaivecpp {tn:5f} tbvh {tbvh:5f} tbvhcreate {tcre:5f}")
print("bvh acceleration vs naive", tn / tbvh)
# assert tn / tbvh > 100
[docs]def test_bvh_min_dist():
xyz1 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tcre = perf_counter() - tcre
# print()
totbvh, totnaive = 0, 0
N = 10
pos1 = hm.rand_xform(N, cart_sd=1)
pos2 = hm.rand_xform(N, cart_sd=1)
dis = list()
for i in range(N):
tbvh = perf_counter()
d, i1, i2 = bvh.bvh_min_dist(bvh1, bvh2, pos1[i], pos2[i])
tbvh = perf_counter() - tbvh
dtest = np.linalg.norm(pos1[i] @ hm.hpoint(xyz1[i1]) - pos2[i] @ hm.hpoint(xyz2[i2]))
assert np.allclose(d, dtest, atol=1e-6)
tn = perf_counter()
dn = bvh.naive_min_dist(bvh1, bvh2, pos1[i], pos2[i])
tn = perf_counter() - tn
assert np.allclose(dn, d, atol=1e-6)
dis.append((d, i1, i2))
# print(
# f"tnaivecpp {tn:1.6f} tbvh {tbvh:1.6f} tcpp/tbvh {tn/tbvh:8.1f}",
# np.linalg.norm(pos1[:3, 3]),
# dtest - d,
# )
totnaive += tn
totbvh += tbvh
d, i1, i2 = bvh.bvh_min_dist_vec(bvh1, bvh2, pos1, pos2)
for a, b, c, x in zip(d, i1, i2, dis):
assert a == x[0]
assert b == x[1]
assert c == x[2]
print(
"total times",
totbvh / N * 1000,
"ms",
totnaive / totbvh,
totnaive,
f"tcre {tcre:2.4f}",
)
[docs]def test_bvh_min_dist_floormin():
xyz1 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tcre = perf_counter() - tcre
# print()
totbvh, totnaive = 0, 0
N = 10
for i in range(N):
pos1 = hm.rand_xform(cart_sd=1)
pos2 = hm.rand_xform(cart_sd=1)
tbvh = perf_counter()
d, i1, i2 = bvh.bvh_min_dist(bvh1, bvh2, pos1, pos2)
tbvh = perf_counter() - tbvh
dtest = np.linalg.norm(pos1 @ hm.hpoint(xyz1[i1]) - pos2 @ hm.hpoint(xyz2[i2]))
assert np.allclose(d, dtest, atol=1e-6)
tn = perf_counter()
dn = bvh.naive_min_dist(bvh1, bvh2, pos1, pos2)
tn = perf_counter() - tn
assert np.allclose(dn, d, atol=1e-6)
# print(
# f"tnaivecpp {tn:1.6f} tbvh {tbvh:1.6f} tcpp/tbvh {tn/tbvh:8.1f}",
# np.linalg.norm(pos1[:3, 3]),
# dtest - d,
# )
totnaive += tn
totbvh += tbvh
print(
"total times",
totbvh / N * 1000,
"ms",
totnaive / totbvh,
totnaive,
f"tcre {tcre:2.4f}",
)
[docs]def test_bvh_slide_single_inline():
bvh1 = BVH([[-10, 0, 0]])
bvh2 = BVH([[0, 0, 0]])
d = bvh.bvh_slide(bvh1, bvh2, np.eye(4), np.eye(4), rad=1.0, dirn=[1, 0, 0])
assert d == 8
# moves xyz1 to -2,0,0
# should always come in from "infinity" from -direction
bvh1 = BVH([[10, 0, 0]])
bvh2 = BVH([[0, 0, 0]])
d = bvh.bvh_slide(bvh1, bvh2, np.eye(4), np.eye(4), rad=1.0, dirn=[1, 0, 0])
assert d == -12
# also moves xyz1 to -2,0,0
for i in range(100):
np.random.seed(i)
dirn = np.array([np.random.randn(), 0, 0])
dirn /= np.linalg.norm(dirn)
rad = np.abs(np.random.randn() / 10)
xyz1 = np.array([[np.random.randn(), 0, 0]])
xyz2 = np.array([[np.random.randn(), 0, 0]])
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
d = bvh.bvh_slide(bvh1, bvh2, np.eye(4), np.eye(4), rad=rad, dirn=dirn)
xyz1 += d * dirn
assert np.allclose(np.linalg.norm(xyz1 - xyz2), 2 * rad, atol=1e-4)
[docs]def test_bvh_slide_single():
nmiss = 0
for i in range(100):
# np.random.seed(i)
dirn = np.random.randn(3)
dirn /= np.linalg.norm(dirn)
rad = np.abs(np.random.randn())
xyz1 = np.random.randn(1, 3)
xyz2 = np.random.randn(1, 3)
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
d = bvh.bvh_slide(bvh1, bvh2, np.eye(4), np.eye(4), rad=rad, dirn=dirn)
if d < 9e8:
xyz1 += d * dirn
assert np.allclose(np.linalg.norm(xyz1 - xyz2), 2 * rad, atol=1e-4)
else:
nmiss += 1
delta = xyz2 - xyz1
d0 = delta.dot(dirn)
dperp2 = np.sum(delta * delta) - d0 * d0
target_d2 = 4 * rad**2
assert target_d2 < dperp2
print("nmiss", nmiss, nmiss / 1000)
[docs]def test_bvh_slide_whole():
# timings wtih -Ofast
# slide test 10,000 iter bvhslide float: 16,934/s double: 16,491/s bvhmin 17,968/s fracmiss: 0.0834
# np.random.seed(0)
N1, N2 = 2, 10
totbvh, totbvhf, totmin = 0, 0, 0
nmiss = 0
for j in range(N1):
xyz1 = np.random.rand(5000, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(5000, 3) - [0.5, 0.5, 0.5]
# tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
# bvh1f = BVH_32bit(xyz1)
# bvh2f = BVH_32bit(xyz2)
# tcre = perf_counter() - tcre
pos1 = hm.rand_xform(N2, cart_sd=0.5)
pos2 = hm.rand_xform(N2, cart_sd=0.5)
dirn = np.random.randn(3)
dirn /= np.linalg.norm(dirn)
radius = 0.001 + np.random.rand() / 10
slides = list()
for i in range(N2):
tbvh = perf_counter()
dslide = bvh.bvh_slide(bvh1, bvh2, pos1[i], pos2[i], radius, dirn)
tbvh = perf_counter() - tbvh
tbvhf = perf_counter()
# dslide = bvh.bvh_slide_32bit(bvh1f, bvh2f, pos1[i], pos2[i], radius, dirn)
tbvhf = perf_counter() - tbvhf
slides.append(dslide)
if dslide > 9e8:
tn = perf_counter()
dn, i, j = bvh.bvh_min_dist(bvh1, bvh2, pos1[i], pos2[i])
tn = perf_counter() - tn
assert dn > 2 * radius
nmiss += 1
else:
tmp = hm.htrans(dirn * dslide) @ pos1[i]
tn = perf_counter()
dn, i, j = bvh.bvh_min_dist(bvh1, bvh2, tmp, pos2[i])
tn = perf_counter() - tn
if not np.allclose(dn, 2 * radius, atol=1e-6):
print(dn, 2 * radius)
assert np.allclose(dn, 2 * radius, atol=1e-6)
# print(
# i,
# f"tnaivecpp {tn:1.6f} tbvh {tbvh:1.6f} tcpp/tbvh {tn/tbvh:8.1f}",
# np.linalg.norm(pos1[:3, 3]),
# dslide,
# )
totmin += tn
totbvh += tbvh
totbvhf += tbvhf
slides2 = bvh.bvh_slide_vec(bvh1, bvh2, pos1, pos2, radius, dirn)
assert np.allclose(slides, slides2)
N = N1 * N2
print(
f"slide test {N:,} iter bvhslide double: {int(N/totbvh):,}/s bvhmin {int(N/totmin):,}/s",
# f"slide test {N:,} iter bvhslide float: {int(N/totbvhf):,}/s double: {int(N/totbvh):,}/s bvhmin {int(N/totmin):,}/s",
f"fracmiss: {nmiss/N}",
)
[docs]def test_collect_pairs_simple():
print("test_collect_pairs_simple")
bufbvh = -np.ones((100, 2), dtype="i4")
bufnai = -np.ones((100, 2), dtype="i4")
bvh1 = BVH([[0, 0, 0], [0, 2, 0]])
bvh2 = BVH([[0.9, 0, 0], [0.9, 2, 0]])
assert len(bvh1) == 2
mindist = 1.0
pos1 = np.eye(4)
pos2 = np.eye(4)
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert not o
print(pbvh.shape)
assert len(pbvh) == 2 and nnai == 2
assert np.all(pbvh == [[0, 0], [1, 1]])
assert np.all(bufnai[:nnai] == [[0, 0], [1, 1]])
pos1 = hm.htrans([0, 2, 0])
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert not o
assert len(pbvh) == 1 and nnai == 1
assert np.all(pbvh == [[0, 1]])
assert np.all(bufnai[:nnai] == [[0, 1]])
pos1 = hm.htrans([0, -2, 0])
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert not o
assert len(pbvh) == 1 and nnai == 1
assert np.all(pbvh == [[1, 0]])
assert np.all(bufnai[:nnai] == [[1, 0]])
[docs]def test_collect_pairs_simple_selection():
print("test_collect_pairs_simple_selection")
bufbvh = -np.ones((100, 2), dtype="i4")
bufnai = -np.ones((100, 2), dtype="i4")
crd1 = [[0, 0, 0], [0, 0, 0], [0, 2, 0], [0, 0, 0]]
crd2 = [[0, 0, 0], [0.9, 0, 0], [0, 0, 0], [0.9, 2, 0]]
mask1 = [1, 0, 1, 0]
mask2 = [0, 1, 0, 1]
bvh1 = BVH(crd1, mask1)
bvh2 = BVH(crd2, mask2)
assert len(bvh1) == 2
assert np.allclose(bvh1.radius(), 1.0, atol=1e-6)
assert np.allclose(bvh1.center(), [0, 1, 0], atol=1e-6)
mindist = 1.0
pos1 = np.eye(4)
pos2 = np.eye(4)
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
assert not o
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert len(pbvh) == 2 and nnai == 2
assert np.all(pbvh == [[0, 1], [2, 3]])
assert np.all(bufnai[:nnai] == [[0, 1], [2, 3]])
pos1 = hm.htrans([0, 2, 0])
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
assert not o
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert len(pbvh) == 1 and nnai == 1
assert np.all(pbvh == [[0, 3]])
assert np.all(bufnai[:nnai] == [[0, 3]])
pos1 = hm.htrans([0, -2, 0])
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufbvh)
assert not o
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1, pos2, mindist, bufnai)
assert len(pbvh) == 1 and nnai == 1
assert np.all(pbvh == [[2, 1]])
assert np.all(bufnai[:nnai] == [[2, 1]])
[docs]def test_collect_pairs():
N1, N2 = 1, 50
N = N1 * N2
Npts = 500
totbvh, totbvhf, totmin = 0, 0, 0
totbvh, totnai, totct, ntot = 0, 0, 0, 0
bufbvh = -np.ones((Npts * Npts, 2), dtype="i4")
bufnai = -np.ones((Npts * Npts, 2), dtype="i4")
for j in range(N1):
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
pos1, pos2 = list(), list()
while 1:
x1 = hm.rand_xform(cart_sd=0.5)
x2 = hm.rand_xform(cart_sd=0.5)
d = np.linalg.norm(x1[:, 3] - x2[:, 3])
if 0.8 < d < 1.3:
pos1.append(x1)
pos2.append(x2)
if len(pos1) == N2:
break
pos1 = np.stack(pos1)
pos2 = np.stack(pos2)
pairs = list()
mindist = 0.002 + np.random.rand() / 10
for i in range(N2):
tbvh = perf_counter()
pbvh, o = bvh.bvh_collect_pairs(bvh1, bvh2, pos1[i], pos2[i], mindist, bufbvh)
tbvh = perf_counter() - tbvh
assert not o
tnai = perf_counter()
nnai = bvh.naive_collect_pairs(bvh1, bvh2, pos1[i], pos2[i], mindist, bufnai)
tnai = perf_counter() - tnai
tct = perf_counter()
nct = bvh.bvh_count_pairs(bvh1, bvh2, pos1[i], pos2[i], mindist)
tct = perf_counter() - tct
ntot += nct
assert nct == len(pbvh)
totnai += 1
pairs.append(pbvh.copy())
totbvh += tbvh
totnai += tnai
totct += tct
assert len(pbvh) == nnai
if len(pbvh) == 0:
continue
o = np.lexsort((pbvh[:, 1], pbvh[:, 0]))
pbvh[:] = pbvh[:][o]
o = np.lexsort((bufnai[:nnai, 1], bufnai[:nnai, 0]))
bufnai[:nnai] = bufnai[:nnai][o]
assert np.all(pbvh == bufnai[:nnai])
pair1 = pos1[i] @ hm.hpoint(xyz1[pbvh[:, 0]])[..., None]
pair2 = pos2[i] @ hm.hpoint(xyz2[pbvh[:, 1]])[..., None]
dpair = np.linalg.norm(pair2 - pair1, axis=1)
assert np.max(dpair) <= mindist
pcount = bvh.bvh_count_pairs_vec(bvh1, bvh2, pos1, pos2, mindist)
assert np.all(pcount == [len(x) for x in pairs])
pairs2, lbub = bvh.bvh_collect_pairs_vec(bvh1, bvh2, pos1, pos2, mindist)
for i, p in enumerate(pairs):
lb, ub = lbub[i]
assert np.all(pairs2[lb:ub] == pairs[i])
x, y = bvh.bvh_collect_pairs_vec(bvh1, bvh2, pos1[:3], pos2[0], mindist)
assert len(y) == 3
x, y = bvh.bvh_collect_pairs_vec(bvh1, bvh2, pos1[0], pos2[:5], mindist)
assert len(y) == 5
print(
f"collect test {N:,} iter bvh {int(N/totbvh):,}/s naive {int(N/totnai):,}/s ratio {totnai/totbvh:7.2f} count-only {int(N/totct):,}/s avg cnt {ntot/N}"
)
[docs]def test_collect_pairs_range():
N1, N2 = 1, 500
N = N1 * N2
Npts = 1000
for j in range(N1):
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
pos1, pos2 = list(), list()
while 1:
x1 = hm.rand_xform(cart_sd=0.5)
x2 = hm.rand_xform(cart_sd=0.5)
d = np.linalg.norm(x1[:, 3] - x2[:, 3])
if 0.8 < d < 1.3:
pos1.append(x1)
pos2.append(x2)
if len(pos1) == N2:
break
pos1 = np.stack(pos1)
pos2 = np.stack(pos2)
pairs = list()
mindist = 0.002 + np.random.rand() / 10
pairs, lbub = bvh.bvh_collect_pairs_vec(bvh1, bvh2, pos1, pos2, mindist)
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist)
assert np.all(lbub == rlbub)
assert np.all(pairs == rpairs)
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist, [250],
[750])
assert len(rlbub) == len(pos1)
assert np.all(rpairs[:, 0] >= 250)
assert np.all(rpairs[:, 0] <= 750)
filt_pairs = pairs[np.logical_and(pairs[:, 0] >= 250, pairs[:, 0] <= 750)]
# assert np.all(filt_pairs == rpairs) # sketchy???
assert np.allclose(np.unique(filt_pairs, axis=1), np.unique(rpairs, axis=1))
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist, [600],
[1000], -1, [100], [400], -1)
assert len(rlbub) == len(pos1)
assert np.all(rpairs[:, 0] >= 600)
assert np.all(rpairs[:, 0] <= 1000)
assert np.all(rpairs[:, 1] >= 100)
assert np.all(rpairs[:, 1] <= 400)
filt_pairs = pairs[(pairs[:, 0] >= 600) * (pairs[:, 0] <= 1000) * (pairs[:, 1] >= 100) *
(pairs[:, 1] <= 400)]
assert np.all(filt_pairs == rpairs) # sketchy???
assert np.allclose(np.unique(filt_pairs, axis=1), np.unique(rpairs, axis=1))
[docs]def test_collect_pairs_range_sym():
# np.random.seed(132)
N1, N2 = 5, 100
N = N1 * N2
Npts = 1000
for j in range(N1):
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
pos1, pos2 = list(), list()
while 1:
x1 = hm.rand_xform(cart_sd=0.5)
x2 = hm.rand_xform(cart_sd=0.5)
d = np.linalg.norm(x1[:, 3] - x2[:, 3])
if 0.8 < d < 1.3:
pos1.append(x1)
pos2.append(x2)
if len(pos1) == N2:
break
pos1 = np.stack(pos1)
pos2 = np.stack(pos2)
pairs = list()
mindist = 0.002 + np.random.rand() / 10
pairs, lbub = bvh.bvh_collect_pairs_vec(bvh1, bvh2, pos1, pos2, mindist)
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist)
assert np.all(lbub == rlbub)
assert np.all(pairs == rpairs)
bounds = [100], [400], len(xyz1) // 2
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist, *bounds)
assert len(rlbub) == len(pos1)
assert np.all(
np.logical_or(np.logical_and(100 <= rpairs[:, 0], rpairs[:, 0] <= 400),
np.logical_and(600 <= rpairs[:, 0], rpairs[:, 0] <= 900)))
filt_pairs = pairs[np.logical_or(np.logical_and(100 <= pairs[:, 0], pairs[:, 0] <= 400),
np.logical_and(600 <= pairs[:, 0], pairs[:, 0] <= 900))]
assert np.allclose(np.unique(filt_pairs, axis=1), np.unique(rpairs, axis=1))
bounds = [100], [400], len(xyz1) // 2, [20], [180], len(xyz1) // 5
rpairs, rlbub = bvh.bvh_collect_pairs_range_vec(bvh1, bvh2, pos1, pos2, mindist, *bounds)
def awful(p):
return np.logical_and(
np.logical_or(np.logical_and(100 <= p[:, 0], p[:, 0] <= 400),
np.logical_and(600 <= p[:, 0], p[:, 0] <= 900)),
np.logical_or(
np.logical_and(+20 <= p[:, 1], p[:, 1] <= 180),
np.logical_or(
np.logical_and(220 <= p[:, 1], p[:, 1] <= 380),
np.logical_or(
np.logical_and(420 <= p[:, 1], p[:, 1] <= 580),
np.logical_or(np.logical_and(620 <= p[:, 1], p[:, 1] <= 780),
np.logical_and(820 <= p[:, 1], p[:, 1] <= 980))))))
assert len(rlbub) == len(pos1)
assert np.all(awful(rpairs))
filt_pairs = pairs[awful(pairs)]
assert np.all(filt_pairs == rpairs) # sketchy???
assert np.allclose(np.unique(filt_pairs, axis=1), np.unique(rpairs, axis=1))
[docs]def test_slide_collect_pairs():
# timings wtih -Ofast
# slide test 10,000 iter bvhslide float: 16,934/s double: 16,491/s bvhmin 17,968/s fracmiss: 0.0834
# np.random.seed(0)
N1, N2 = 2, 50
Npts = 5000
totbvh, totbvhf, totcol, totmin = 0, 0, 0, 0
nhit = 0
buf = -np.ones((Npts * Npts, 2), dtype="i4")
for j in range(N1):
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyzcol1 = xyz1[:int(Npts / 5)]
xyzcol2 = xyz2[:int(Npts / 5)]
# tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
bvhcol1 = BVH(xyzcol1)
bvhcol2 = BVH(xyzcol2)
# tcre = perf_counter() - tcre
for i in range(N2):
dirn = np.random.randn(3)
dirn /= np.linalg.norm(dirn)
radius = 0.001 + np.random.rand() / 10
pairdis = 3 * radius
pos1 = hm.rand_xform(cart_sd=0.5)
pos2 = hm.rand_xform(cart_sd=0.5)
tbvh = perf_counter()
dslide = bvh.bvh_slide(bvh1, bvh2, pos1, pos2, radius, dirn)
tbvh = perf_counter() - tbvh
if dslide > 9e8:
tn = perf_counter()
dn, i, j = bvh.bvh_min_dist(bvh1, bvh2, pos1, pos2)
tn = perf_counter() - tn
assert dn > 2 * radius
else:
nhit += 1
pos1 = hm.htrans(dirn * dslide) @ pos1
tn = perf_counter()
dn, i, j = bvh.bvh_min_dist(bvh1, bvh2, pos1, pos2)
tn = perf_counter() - tn
if not np.allclose(dn, 2 * radius, atol=1e-6):
print(dn, 2 * radius)
assert np.allclose(dn, 2 * radius, atol=1e-6)
tcol = perf_counter()
pair, o = bvh.bvh_collect_pairs(bvhcol1, bvhcol2, pos1, pos2, pairdis, buf)
assert not o
if len(pair) > 0:
tcol = perf_counter() - tcol
totcol += tcol
pair1 = pos1 @ hm.hpoint(xyzcol1[pair[:, 0]])[..., None]
pair2 = pos2 @ hm.hpoint(xyzcol2[pair[:, 1]])[..., None]
dpair = np.linalg.norm(pair2 - pair1, axis=1)
assert np.max(dpair) <= pairdis
totmin += tn
totbvh += tbvh
N = N1 * N2
print(
f"slide test {N:,} iter bvhslide double: {int(N/totbvh):,}/s bvhmin {int(N/totmin):,}/s",
# f"slide test {N:,} iter bvhslide float: {int(N/totbvhf):,}/s double: {int(N/totbvh):,}/s bvhmin {int(N/totmin):,}/s",
f"fracmiss: {nhit/N} collect {int(nhit/totcol):,}/s",
)
[docs]def test_bvh_accessors():
xyz = np.random.rand(10, 3) - [0.5, 0.5, 0.5]
b = BVH(xyz)
assert np.allclose(b.com()[:3], np.mean(xyz, axis=0))
p = b.centers()
dmat = np.linalg.norm(p[:, :3] - xyz[:, None], axis=2)
assert np.allclose(np.min(dmat, axis=1), 0)
[docs]def random_walk(N):
x = np.random.randn(N, 3).astype("f").cumsum(axis=0)
x -= x.mean(axis=0)
return 0.5 * x / x.std()
[docs]def test_bvh_isect_range(body=None, cart_sd=0.3, N2=10, mindist=0.02):
N1 = 1 if body else 2
N = N1 * N2
totbvh, totnaive, totbvh0, nhit = 0, 0, 0, 0
for ibvh in range(N1):
if body:
bvh1, bvh2 = body.bvh_bb, body.bvh_bb
else:
# xyz1 = np.random.rand(2000, 3) - [0.5, 0.5, 0.5]
# xyz2 = np.random.rand(2000, 3) - [0.5, 0.5, 0.5]
xyz1 = random_walk(1000)
xyz2 = random_walk(1000)
tcre = perf_counter()
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tcre = perf_counter() - tcre
pos1 = hm.rand_xform(N2, cart_sd=cart_sd)
pos2 = hm.rand_xform(N2, cart_sd=cart_sd)
ranges = list()
for i in range(N2):
tbvh0 = perf_counter()
c = bvh.bvh_isect(bvh1=bvh1, bvh2=bvh2, pos1=pos1[i], pos2=pos2[i], mindist=mindist)
tbvh0 = perf_counter() - tbvh0
# if not c:
# continue
if c:
nhit += 1
tbvh = perf_counter()
range1 = bvh.isect_range_single(bvh1=bvh1, bvh2=bvh2, pos1=pos1[i], pos2=pos2[i],
mindist=mindist)
tbvh = perf_counter() - tbvh
tn = perf_counter()
range2 = bvh.naive_isect_range(bvh1, bvh2, pos1[i], pos2[i], mindist)
assert range1 == range2
tn = perf_counter() - tn
ranges.append(range1)
# print(f"{str(range1):=^80}")
# body.move_to(pos1).dump_pdb("test1.pdb")
# body.move_to(pos2).dump_pdb("test2.pdb")
# return
# print(f"{i:3} range {range1} {tn / tbvh:8.2f}, {tn:1.6f}, {tbvh:1.6f}")
totbvh += tbvh
totnaive += tn
totbvh0 += tbvh0
lb, ub = bvh.isect_range(bvh1, bvh2, pos1, pos2, mindist)
ranges = np.array(ranges)
assert np.all(lb == ranges[:, 0])
assert np.all(ub == ranges[:, 1])
ok = np.logical_and(lb >= 0, ub >= 0)
isect, clash = bvh.bvh_isect_fixed_range_vec(bvh1, bvh2, pos1, pos2, mindist, lb, ub)
assert not np.any(isect[ok])
print(
f"iscet {nhit:,} hit of {N:,} iter bvh: {int(nhit/totbvh):,}/s fastnaive {int(nhit/totnaive):,}/s",
f"ratio {int(totnaive/totbvh):,}x isect-only: {totbvh/totbvh0:3.3f}x",
)
[docs]def test_bvh_isect_range_ids():
N1 = 50
N2 = 100
N = N1 * N2
# Nids = 100
cart_sd = 0.3
mindist = 0.03
Npts = 1000
factors = [1000, 500, 250, 200, 125, 100, 50, 40, 25, 20, 10, 8, 5, 4, 2, 1]
# Npts = 6
# factors = [3]
# mindist = 0.3
# N1 = 1
assert all(Npts % f == 0 for f in factors)
for ibvh in range(N1):
# for ibvh in [5]:
# np.random.seed(ibvh)
# print(ibvh)
Nids = factors[ibvh % len(factors)]
# xyz1 = np.random.rand(2000, 3) - [0.5, 0.5, 0.5]
# xyz2 = np.random.rand(2000, 3) - [0.5, 0.5, 0.5]
xyz1 = random_walk(Npts)
xyz2 = random_walk(Npts)
tcre = perf_counter()
bvh1 = BVH(xyz1, [], np.repeat(np.arange(Nids), Npts / Nids))
bvh2 = BVH(xyz2, [], np.repeat(np.arange(Nids), Npts / Nids))
tcre = perf_counter() - tcre
pos1 = hm.rand_xform(N2, cart_sd=cart_sd)
pos2 = hm.rand_xform(N2, cart_sd=cart_sd)
# pos1 = pos1[99:]
# pos2 = pos2[99:]
# print(bvh1.vol_lb())
# print(bvh1.vol_ub())
# print(bvh1.obj_id())
# assert 0
# assert bvh1.max_id() == Nids - 1
# assert bvh1.min_lb() == 0
# assert bvh1.max_ub() == Nids - 1
lb, ub = bvh.isect_range(bvh1, bvh2, pos1, pos2, mindist)
pos1 = pos1[lb != -1]
pos2 = pos2[lb != -1]
ub = ub[lb != -1]
lb = lb[lb != -1]
# print(lb, ub)
assert np.all(0 <= lb) and np.all(lb - 1 <= ub) and np.all(ub < Nids)
isectall = bvh.bvh_isect_vec(bvh1, bvh2, pos1, pos2, mindist)
assert np.all(isectall == np.logical_or(lb > 0, ub < Nids - 1))
isect, clash = bvh.bvh_isect_fixed_range_vec(bvh1, bvh2, pos1, pos2, mindist, lb, ub)
if np.any(isect):
print(np.where(isect)[0])
print('lb', lb[isect])
print('ub', ub[isect])
print('cA', clash[isect, 0])
print('cB', clash[isect, 1])
# print('is', isect.astype('i') * 100)
# print('isectlbub', np.sum(isect), np.sum(isect) / len(isect))
assert not np.any(isect[lb <= ub])
[docs]def test_bvh_isect_range_lb_ub(body=None, cart_sd=0.3, N1=3, N2=20, mindist=0.02):
N1 = 1 if body else N1
N = N1 * N2
Npts = 1000
nhit, nrangefail = 0, 0
args = [
rp.Bunch(maxtrim=a, maxtrim_lb=b, maxtrim_ub=c) for a in (-1, 400) for b in (-1, 300)
for c in (-1, 300)
]
for ibvh, arg in it.product(range(N1), args):
if body:
bvh1, bvh2 = body.bvh_bb, body.bvh_bb
else:
# xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
# xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz1 = random_walk(Npts)
xyz2 = random_walk(Npts)
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
pos1 = hm.rand_xform(N2, cart_sd=cart_sd)
pos2 = hm.rand_xform(N2, cart_sd=cart_sd)
ranges = list()
for i in range(N2):
c = bvh.bvh_isect(bvh1=bvh1, bvh2=bvh2, pos1=pos1[i], pos2=pos2[i], mindist=mindist)
if c: nhit += 1
range1 = bvh.isect_range_single(bvh1=bvh1, bvh2=bvh2, pos1=pos1[i], pos2=pos2[i],
mindist=mindist, **arg)
ranges.append(range1)
if range1[0] < 0:
nrangefail += 1
assert c
continue
assert (arg.maxtrim < 0) or (np.diff(range1) + 1 >= Npts - arg.maxtrim)
assert (arg.maxtrim_lb < 0) or (range1[0] <= arg.maxtrim_lb)
assert (arg.maxtrim_ub < 0) or (range1[1] + 1 >= Npts - arg.maxtrim_ub)
# mostly covered elsewhere, and quite slow
# range2 = bvh.naive_isect_range(bvh1, bvh2, pos1[i], pos2[i], mindist)
# assert range1 == range2
lb, ub = bvh.isect_range(bvh1, bvh2, pos1, pos2, mindist, **arg)
ranges = np.array(ranges)
assert np.all(lb == ranges[:, 0])
assert np.all(ub == ranges[:, 1])
print(f"iscet {nhit:,} hit of {N:,} iter, frangefail {nrangefail/nhit}", )
[docs]def test_bvh_pickle(tmpdir):
xyz1 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(1000, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
pos1 = hm.rand_xform(cart_sd=1)
pos2 = hm.rand_xform(cart_sd=1)
tbvh = perf_counter()
d, i1, i2 = bvh.bvh_min_dist(bvh1, bvh2, pos1, pos2)
rng = bvh.isect_range_single(bvh1, bvh2, pos1, pos2, mindist=d + 0.01)
with open(tmpdir + "/1", "wb") as out:
_pickle.dump(bvh1, out)
with open(tmpdir + "/2", "wb") as out:
_pickle.dump(bvh2, out)
with open(tmpdir + "/1", "rb") as out:
bvh1b = _pickle.load(out)
with open(tmpdir + "/2", "rb") as out:
bvh2b = _pickle.load(out)
assert len(bvh1) == len(bvh1b)
assert len(bvh2) == len(bvh2b)
assert np.allclose(bvh1.com(), bvh1b.com())
assert np.allclose(bvh1.centers(), bvh1b.centers())
assert np.allclose(bvh2.com(), bvh2b.com())
assert np.allclose(bvh2.centers(), bvh2b.centers())
db, i1b, i2b = bvh.bvh_min_dist(bvh1b, bvh2b, pos1, pos2)
assert np.allclose(d, db)
assert i1 == i1b
assert i2 == i2b
rngb = bvh.isect_range_single(bvh1b, bvh2b, pos1, pos2, mindist=d + 0.01)
assert rngb == rng
[docs]def test_bvh_threading_isect_may_fail():
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
reps = 1
npos = 1000
Npts = 1000
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
mindist = 0.1
tottmain, tottthread = 0, 0
nt = 2
exe = ThreadPoolExecutor(nt)
for i in range(reps):
pos1 = hm.rand_xform(npos, cart_sd=0.5)
pos2 = hm.rand_xform(npos, cart_sd=0.5)
buf = np.empty((Npts, 2), dtype="i4")
t = perf_counter()
_ = [bvh.bvh_isect(bvh1, bvh2, p1, p2, mindist) for p1, p2 in zip(pos1, pos2)]
isect = np.array(_)
tmain = perf_counter() - t
tottmain += tmain
t = perf_counter()
futures = exe.map(
bvh.bvh_isect_vec,
repeat(bvh1),
repeat(bvh2),
np.split(pos1, nt),
np.split(pos2, nt),
repeat(mindist),
)
isect2 = np.concatenate([f for f in futures])
tthread = perf_counter() - t
tottthread += tthread
print("fisect", np.sum(isect2) / len(isect2))
assert np.allclose(isect, isect2)
# print("bvh_isect", i, tmain / tthread, ">= 1.1")
# assert tmain / tthread > 1.1
print("bvh_isect", tottmain / tottthread)
[docs]def test_bvh_threading_mindist_may_fail():
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
reps = 1
npos = 100
Npts = 1000
xyz1 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
xyz2 = np.random.rand(Npts, 3) - [0.5, 0.5, 0.5]
bvh1 = BVH(xyz1)
bvh2 = BVH(xyz2)
tottmain, tottthread = 0, 0
nt = 2
exe = ThreadPoolExecutor(nt)
for i in range(reps):
pos1 = hm.rand_xform(npos, cart_sd=0.7)
pos2 = hm.rand_xform(npos, cart_sd=0.7)
buf = np.empty((Npts, 2), dtype="i4")
t = perf_counter()
_ = [bvh.bvh_min_dist(bvh1, bvh2, p1, p2) for p1, p2 in zip(pos1, pos2)]
mindist = np.array(_)
tmain = perf_counter() - t
tottmain += tmain
t = perf_counter()
futures = exe.map(
bvh.bvh_min_dist_vec,
repeat(bvh1),
repeat(bvh2),
np.split(pos1, nt),
np.split(pos2, nt),
)
mindist2 = np.concatenate([f for f in futures], axis=1).T
tthread = perf_counter() - t
tottthread += tthread
assert np.allclose(mindist, mindist2)
# print("bvh_min_dist", i, tmain / tthread, ">= 1.1")
# assert tmain / tthread > 1.1
print("bvh_min_dist", tottmain / tottthread)
if __name__ == "__main__":
# from rpxdock.body import Body
# b = Body("rpxdock/data/pdb/DHR14.pdb")
# test_bvh_isect_range(b, cart_sd=15, N2=500, mindist=3.5)
# test_bvh_isect_cpp()
# test_bvh_isect_fixed()
test_bvh_isect()
# test_bvh_isect_fixed_range()
# test_bvh_min_cpp()
# test_bvh_min_dist_fixed()
# test_bvh_min_dist()
# test_bvh_min_dist_floormin()
# test_bvh_slide_single_inline()
# test_bvh_slide_single()
# test_bvh_slide_single_xform()
# test_bvh_slide_whole()
# test_collect_pairs_simple()
# test_collect_pairs_simple_selection()
# test_collect_pairs()
# test_collect_pairs_range()
# test_collect_pairs_range_sym()
# test_slide_collect_pairs()
# test_bvh_accessors()
# test_bvh_isect_range()
# test_bvh_isect_range_ids()
# test_bvh_isect_range_lb_ub(N1=10, N2=20)
# import tempfile
# test_bvh_pickle(tempfile.mkdtemp())
# test_bvh_threading_mindist_may_fail()
# test_bvh_threading_isect_may_fail()