|
1 | 1 | # -*- coding: utf-8 -*-
|
2 | 2 |
|
| 3 | +import pytest |
| 4 | + |
3 | 5 | import numpy as np
|
4 | 6 |
|
5 | 7 | from sse.music_v2 import MusicSoundSourceLocator
|
| 8 | +from sse.simulators.environments import ( |
| 9 | + Observer, |
| 10 | + Air, |
| 11 | + Microphone, |
| 12 | + Source, |
| 13 | + Position3D, |
| 14 | + SineSignalGenerator, |
| 15 | +) |
| 16 | + |
| 17 | +SAMPLING_FREQUENCY = 16000 # [Hz] |
| 18 | +SOUND_SPEED = Air().sound_speed # [m/s] |
| 19 | +GAP_WIDTH_BETWEEN_MICS = 5.0 # [m] |
| 20 | +SIGNAL_TIME_LENGTH = 5.0 # [sec.] |
6 | 21 |
|
7 | 22 |
|
8 | 23 | def make_dummy_signals(
|
9 |
| - theta=15.0, |
10 |
| - fs=16000, |
11 |
| - c=340, |
12 |
| - d=1.0, |
| 24 | + theta: float, |
| 25 | + fs: float, |
| 26 | + d: float, |
| 27 | + time_length: float, |
| 28 | + medium=Air(), |
13 | 29 | ) -> np.ndarray:
|
14 |
| - """_summary_ |
| 30 | + """Return 2ch dummy signals. |
15 | 31 |
|
16 | 32 | Args:
|
17 |
| - theta (float, optional): _description_. Defaults to 15.0. |
18 |
| - fs (int, optional): _description_. Defaults to 16000. |
19 |
| - N_fft (int, optional): _description_. Defaults to 128. |
20 |
| - c: Sound speed [m/sec]. Defaults to 340. |
21 |
| - d: Distance between mics [m]. Defaults to 1.0. |
| 33 | + theta: Which direction the signal comes from [rad]. |
| 34 | + fs: Sampling frequency [Hz]. |
| 35 | + d: Distance between mics [m]. |
| 36 | + medium: Medium which sounds pass through. |
22 | 37 |
|
23 | 38 | Returns:
|
24 |
| - np.ndarray: _description_ |
| 39 | + Sound waves shaped as: [num_samples, num_channels]. |
25 | 40 | """
|
26 |
| - tdoa = d * np.sin(np.deg2rad(theta)) / c |
27 |
| - # tdoa = d * np.cos(np.deg2rad(theta)) / c |
28 |
| - print("tdoa", tdoa) |
29 |
| - |
30 |
| - T = 0.2 # [sec] |
31 |
| - # width = N_fft // 2 |
32 |
| - num_points_of_tdoa_width = int(tdoa * fs) # point of TDOA. |
33 |
| - # t = np.linspace(0, N_fft + 2 * width - 1, N_fft + 2 * width) / fs |
34 |
| - t = np.linspace(0, int(fs * T - 1), int(fs * T)) / fs # base time. |
35 |
| - # t1 = t[width + num_points_of_tdoa_width : width + N_fft + num_points_of_tdoa_width] |
36 |
| - # t2 = t[width : width + N_fft] |
37 |
| - t1 = t[num_points_of_tdoa_width:] |
38 |
| - t2 = t[:-num_points_of_tdoa_width] |
39 |
| - print("t1.shape", t1.shape) |
40 |
| - print("t2.shape", t2.shape) |
41 |
| - x1 = np.sin(2 * np.pi * 5000 * t1)[:, None] |
42 |
| - x2 = np.sin(2 * np.pi * 5000 * t2)[:, None] |
43 |
| - x = np.c_[x1, x2] |
44 |
| - # x = np.c_[x1 + np.random.randn(*x1.shape) * 0.05, x2 + np.random.randn(*x2.shape) * 0.05] |
45 |
| - |
46 |
| - # xs = np.random.randn(len(t))[:, None] |
47 |
| - # x1 = xs[num_points_of_tdoa_width:] |
48 |
| - # t2 = xs[:-num_points_of_tdoa_width] |
49 |
| - # x = np.c_[x1, x2] |
50 |
| - return x |
| 41 | + |
| 42 | + obs = Observer( |
| 43 | + sources=[ |
| 44 | + Source( |
| 45 | + position=Position3D(r=100, theta=theta, phi=0), |
| 46 | + signal=SineSignalGenerator(frequency=3000.2).generate( |
| 47 | + sampling_frequency=fs, |
| 48 | + time_length=time_length, |
| 49 | + ), |
| 50 | + ) |
| 51 | + ], |
| 52 | + microphones=[ |
| 53 | + Microphone( |
| 54 | + position=Position3D(r=d / 2, theta=0, phi=0), |
| 55 | + sampling_frequency=fs, |
| 56 | + ), |
| 57 | + Microphone( |
| 58 | + position=Position3D(r=d / 2, theta=np.pi, phi=0), |
| 59 | + sampling_frequency=fs, |
| 60 | + ), |
| 61 | + ], |
| 62 | + medium=medium, |
| 63 | + ) |
| 64 | + outs = obs.ring_sources() |
| 65 | + return np.c_[outs[0].values, outs[1].values] |
51 | 66 |
|
52 | 67 |
|
53 | 68 | class TestMusicSoundSourceLocator:
|
54 |
| - def test_fit_transform(self): |
| 69 | + @pytest.mark.parametrize("theta", [60]) |
| 70 | + def test_fit_transform(self, theta: float): |
| 71 | + x = make_dummy_signals( |
| 72 | + theta=theta / 180 * np.pi, |
| 73 | + fs=SAMPLING_FREQUENCY, |
| 74 | + d=GAP_WIDTH_BETWEEN_MICS, |
| 75 | + time_length=10.0, |
| 76 | + ) |
55 | 77 | self.locator = MusicSoundSourceLocator(
|
56 |
| - fs=16000, |
57 |
| - d=0.1, |
| 78 | + fs=SAMPLING_FREQUENCY, |
| 79 | + d=GAP_WIDTH_BETWEEN_MICS, |
58 | 80 | N_theta=180 + 1,
|
59 | 81 | )
|
60 |
| - X = make_dummy_signals( |
61 |
| - theta=40.0, |
62 |
| - fs=16000, |
63 |
| - d=0.1, |
| 82 | + predicted_theta = self.locator.fit_transform(X=x) |
| 83 | + print("predicted_theta (MUSIC): ", predicted_theta) |
| 84 | + |
| 85 | + |
| 86 | +class TestCSPSoundSourceLocator: |
| 87 | + @pytest.mark.parametrize("theta", [30, 60, 90, 120, 150]) |
| 88 | + def test_accuracy( |
| 89 | + self, |
| 90 | + theta: float, |
| 91 | + acceptable_error_in_deg: float = 5.0, |
| 92 | + ): |
| 93 | + x = make_dummy_signals( |
| 94 | + theta=theta / 180 * np.pi, |
| 95 | + fs=SAMPLING_FREQUENCY, |
| 96 | + d=GAP_WIDTH_BETWEEN_MICS, |
| 97 | + time_length=SIGNAL_TIME_LENGTH, |
| 98 | + ) |
| 99 | + predicted_theta = estimate_theta_by_csp( |
| 100 | + x1=x[:, 0], |
| 101 | + x2=x[:, 1], |
| 102 | + fs=SAMPLING_FREQUENCY, |
| 103 | + c=SOUND_SPEED, |
| 104 | + d=GAP_WIDTH_BETWEEN_MICS, |
64 | 105 | )
|
65 |
| - predicted_theta = self.locator.fit_transform(X=X) |
66 |
| - print("predicted_theta", predicted_theta) |
67 |
| - # np.testing.assert_allclose(predicted_theta, 40.726257) |
| 106 | + print("predicted_theta (CSP): ", predicted_theta) |
| 107 | + assert (predicted_theta - theta) ** 2 < acceptable_error_in_deg |
| 108 | + |
| 109 | + |
| 110 | +def estimate_theta_by_csp( |
| 111 | + x1: np.ndarray, |
| 112 | + x2: np.ndarray, |
| 113 | + fs: float = 16000, |
| 114 | + c: float = 343.3, |
| 115 | + d: float = 0.1, |
| 116 | +) -> float: |
| 117 | + return tdoa2deg(calc_tdoa(x1, x2) / fs, c=c, d=d) |
| 118 | + |
| 119 | + |
| 120 | +def calc_tdoa(x1: np.ndarray, x2: np.ndarray) -> float: |
| 121 | + assert len(x1) == len(x2) |
| 122 | + estimated_delay = calc_csp_coefs(x1=x1, x2=x2).argmax() - len(x1) |
| 123 | + return estimated_delay |
| 124 | + |
| 125 | + |
| 126 | +def calc_csp_coefs(x1: np.ndarray, x2: np.ndarray) -> np.ndarray: |
| 127 | + phi = np.correlate(x2, x1, mode="full") |
| 128 | + PHI = np.fft.fft(phi) |
| 129 | + csp = np.fft.fft(PHI / np.abs(PHI)).real |
| 130 | + return csp |
| 131 | + |
| 132 | + |
| 133 | +def tdoa2deg( |
| 134 | + tdoa: float, |
| 135 | + c: float = 343.3, |
| 136 | + d: float = 0.1, |
| 137 | +) -> float: |
| 138 | + return np.rad2deg(np.arccos(np.clip(tdoa * c / d, -1, 1))) |
| 139 | + |
| 140 | + |
| 141 | +def deg2tdoa( |
| 142 | + deg: float, |
| 143 | + c: float = 343.3, |
| 144 | + d: float = 0.1, |
| 145 | +) -> float: |
| 146 | + return d * np.cos(np.deg2rad(deg)) / c |
0 commit comments