Skip to content

Commit cea8ec7

Browse files
mkhludnevreta
authored andcommitted
Support more than 1024 IP/masks with indexed field (opensearch-project#16391)
MultiRangeQuery for searching IP masks more 1025 masks in indexed field. --------- Signed-off-by: Mikhail Khludnev <mkhl@apache.org> Signed-off-by: Mikhail Khludnev <mkhludnev@users.noreply.github.com> Co-authored-by: Andriy Redko <drreta@gmail.com>
1 parent bc26123 commit cea8ec7

File tree

4 files changed

+375
-40
lines changed

4 files changed

+375
-40
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Bump `org.apache.logging.log4j:log4j-core` from 2.24.1 to 2.24.2 ([#16718](https://github.com/opensearch-project/OpenSearch/pull/16718))
4141

4242
### Changed
43+
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
4344

4445
### Deprecated
4546

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.fields;
10+
11+
import org.apache.lucene.search.IndexSearcher;
12+
import org.opensearch.action.bulk.BulkRequestBuilder;
13+
import org.opensearch.action.search.SearchPhaseExecutionException;
14+
import org.opensearch.action.search.SearchResponse;
15+
import org.opensearch.common.network.InetAddresses;
16+
import org.opensearch.common.xcontent.XContentFactory;
17+
import org.opensearch.core.xcontent.XContentBuilder;
18+
import org.opensearch.index.query.BoolQueryBuilder;
19+
import org.opensearch.index.query.QueryBuilders;
20+
import org.opensearch.index.query.TermsQueryBuilder;
21+
import org.opensearch.test.OpenSearchSingleNodeTestCase;
22+
import org.hamcrest.MatcherAssert;
23+
24+
import java.io.IOException;
25+
import java.net.InetAddress;
26+
import java.util.ArrayList;
27+
import java.util.Collection;
28+
import java.util.HashSet;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import java.util.Set;
33+
import java.util.function.Consumer;
34+
35+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
36+
import static org.hamcrest.Matchers.equalTo;
37+
38+
public class SearchIpFieldTermsIT extends OpenSearchSingleNodeTestCase {
39+
40+
/**
41+
* @return number of expected matches
42+
* */
43+
private int createIndex(String indexName, int numberOfMasks, List<String> queryTermsSink) throws IOException {
44+
XContentBuilder xcb = createMapping();
45+
client().admin().indices().prepareCreate(indexName).setMapping(xcb).get();
46+
ensureGreen();
47+
48+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
49+
50+
Set<String> dedupeCidrs = new HashSet<>();
51+
int cidrs = 0;
52+
int ips = 0;
53+
54+
for (int i = 0; ips <= 10240 && cidrs < numberOfMasks && i < 1000000; i++) {
55+
String ip;
56+
int prefix;
57+
boolean mask;
58+
do {
59+
mask = ips > 0 && random().nextBoolean();
60+
ip = generateRandomIPv4();
61+
prefix = 24 + random().nextInt(8); // CIDR prefix for IPv4
62+
} while (mask && !dedupeCidrs.add(getFirstThreeOctets(ip)));
63+
64+
bulkRequestBuilder.add(
65+
client().prepareIndex(indexName).setSource(Map.of("addr", ip, "dummy_filter", randomSubsetOf(1, "1", "2", "3")))
66+
);
67+
68+
final String termToQuery;
69+
if (mask) {
70+
termToQuery = ip + "/" + prefix;
71+
cidrs++;
72+
} else {
73+
termToQuery = ip;
74+
ips++;
75+
}
76+
queryTermsSink.add(termToQuery);
77+
}
78+
int addMatches = 0;
79+
for (int i = 0; i < atLeast(100); i++) {
80+
final String ip;
81+
ip = generateRandomIPv4();
82+
bulkRequestBuilder.add(
83+
client().prepareIndex(indexName).setSource(Map.of("addr", ip, "dummy_filter", randomSubsetOf(1, "1", "2", "3")))
84+
);
85+
boolean match = false;
86+
for (String termQ : queryTermsSink) {
87+
boolean isCidr = termQ.contains("/");
88+
if ((isCidr && isIPInCIDR(ip, termQ)) || (!isCidr && termQ.equals(ip))) {
89+
match = true;
90+
break;
91+
}
92+
}
93+
if (match) {
94+
addMatches++;
95+
} else {
96+
break; // single mismatch is enough.
97+
}
98+
}
99+
100+
bulkRequestBuilder.setRefreshPolicy(IMMEDIATE).get();
101+
return ips + cidrs + addMatches;
102+
}
103+
104+
public void testLessThanMaxClauses() throws IOException {
105+
ArrayList<String> toQuery = new ArrayList<>();
106+
String indexName = "small";
107+
int expectMatches = createIndex(indexName, IndexSearcher.getMaxClauseCount() - 1, toQuery);
108+
109+
assertTermsHitCount(indexName, "addr", toQuery, expectMatches);
110+
assertTermsHitCount(indexName, "addr.idx", toQuery, expectMatches);
111+
assertTermsHitCount(indexName, "addr.dv", toQuery, expectMatches);
112+
// passing dummy filter crushes on rewriting
113+
SearchPhaseExecutionException ose = assertThrows(SearchPhaseExecutionException.class, () -> {
114+
assertTermsHitCount(
115+
indexName,
116+
"addr.dv",
117+
toQuery,
118+
expectMatches,
119+
(boolBuilder) -> boolBuilder.filter(QueryBuilders.termsQuery("dummy_filter", "1", "2", "3"))
120+
.filter(QueryBuilders.termsQuery("dummy_filter", "1", "2", "3", "4"))
121+
.filter(QueryBuilders.termsQuery("dummy_filter", "1", "2", "3", "4", "5"))
122+
);
123+
});
124+
assertTrue("exceeding on query rewrite", ose.shardFailures()[0].getCause() instanceof IndexSearcher.TooManyNestedClauses);
125+
}
126+
127+
public void testExceedMaxClauses() throws IOException {
128+
ArrayList<String> toQuery = new ArrayList<>();
129+
String indexName = "larger";
130+
int expectMatches = createIndex(indexName, IndexSearcher.getMaxClauseCount() + (rarely() ? 0 : atLeast(10)), toQuery);
131+
assertTermsHitCount(indexName, "addr", toQuery, expectMatches);
132+
assertTermsHitCount(indexName, "addr.idx", toQuery, expectMatches);
133+
// error from mapper/parser
134+
final SearchPhaseExecutionException ose = assertThrows(
135+
SearchPhaseExecutionException.class,
136+
() -> assertTermsHitCount(indexName, "addr.dv", toQuery, expectMatches)
137+
);
138+
assertTrue("exceeding on query building", ose.shardFailures()[0].getCause().getCause() instanceof IndexSearcher.TooManyClauses);
139+
}
140+
141+
private static String getFirstThreeOctets(String ipAddress) {
142+
// Split the IP address by the dot delimiter
143+
String[] octets = ipAddress.split("\\.");
144+
145+
// Take the first three octets
146+
String[] firstThreeOctets = new String[3];
147+
System.arraycopy(octets, 0, firstThreeOctets, 0, 3);
148+
149+
// Join the first three octets back together with dots
150+
return String.join(".", firstThreeOctets);
151+
}
152+
153+
private void assertTermsHitCount(String indexName, String field, Collection<String> toQuery, long expectedMatches) {
154+
assertTermsHitCount(indexName, field, toQuery, expectedMatches, (bqb) -> {});
155+
}
156+
157+
private void assertTermsHitCount(
158+
String indexName,
159+
String field,
160+
Collection<String> toQuery,
161+
long expectedMatches,
162+
Consumer<BoolQueryBuilder> addFilter
163+
) {
164+
TermsQueryBuilder ipTerms = QueryBuilders.termsQuery(field, new ArrayList<>(toQuery));
165+
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
166+
addFilter.accept(boolQueryBuilder);
167+
SearchResponse result = client().prepareSearch(indexName).setQuery(boolQueryBuilder.must(ipTerms)
168+
// .filter(QueryBuilders.termsQuery("dummy_filter", "a", "b"))
169+
).get();
170+
long hitsFound = Objects.requireNonNull(result.getHits().getTotalHits()).value;
171+
MatcherAssert.assertThat(field, hitsFound, equalTo(expectedMatches));
172+
}
173+
174+
// Converts an IP string (either IPv4 or IPv6) to a byte array
175+
private static byte[] ipToBytes(String ip) {
176+
InetAddress inetAddress = InetAddresses.forString(ip);
177+
return inetAddress.getAddress();
178+
}
179+
180+
// Checks if an IP is within a given CIDR (works for both IPv4 and IPv6)
181+
private static boolean isIPInCIDR(String ip, String cidr) {
182+
String[] cidrParts = cidr.split("/");
183+
String cidrIp = cidrParts[0];
184+
int prefixLength = Integer.parseInt(cidrParts[1]);
185+
186+
byte[] ipBytes = ipToBytes(ip);
187+
byte[] cidrIpBytes = ipToBytes(cidrIp);
188+
189+
// Calculate how many full bytes and how many bits are in the mask
190+
int fullBytes = prefixLength / 8;
191+
int extraBits = prefixLength % 8;
192+
193+
// Compare full bytes
194+
for (int i = 0; i < fullBytes; i++) {
195+
if (ipBytes[i] != cidrIpBytes[i]) {
196+
return false;
197+
}
198+
}
199+
200+
// Compare extra bits (if any)
201+
if (extraBits > 0) {
202+
int mask = 0xFF << (8 - extraBits);
203+
return (ipBytes[fullBytes] & mask) == (cidrIpBytes[fullBytes] & mask);
204+
}
205+
206+
return true;
207+
}
208+
209+
// Generate a random IPv4 address
210+
private String generateRandomIPv4() {
211+
return String.join(
212+
".",
213+
String.valueOf(random().nextInt(256)),
214+
String.valueOf(random().nextInt(256)),
215+
String.valueOf(random().nextInt(256)),
216+
String.valueOf(random().nextInt(256))
217+
);
218+
}
219+
220+
private XContentBuilder createMapping() throws IOException {
221+
return XContentFactory.jsonBuilder()
222+
.startObject()
223+
.startObject("properties")
224+
.startObject("addr")
225+
.field("type", "ip")
226+
.startObject("fields")
227+
.startObject("idx")
228+
.field("type", "ip")
229+
.field("doc_values", false)
230+
.endObject()
231+
.startObject("dv")
232+
.field("type", "ip")
233+
.field("index", false)
234+
.endObject()
235+
.endObject()
236+
.endObject()
237+
.startObject("dummy_filter")
238+
.field("type", "keyword")
239+
.endObject()
240+
.endObject()
241+
.endObject();
242+
}
243+
}

0 commit comments

Comments
 (0)