Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tonic = { version = "0.8.3", features = ["tls", "tls-roots"] }
prost = "0.11.8"
prost-types = "0.11.8"
anyhow = "1"
reqwest = { version = "0.11.14", optional = true, features = ["stream"] }
reqwest = { version = "0.11.14", optional = true, features = ["stream", "multipart"] }
futures-util = { version = "0.3.27", optional = true }

[build-dependencies]
Expand Down
82 changes: 72 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,26 @@ use crate::qdrant::snapshots_client::SnapshotsClient;
use crate::qdrant::value::Kind;
use crate::qdrant::vectors::VectorsOptions;
use crate::qdrant::with_payload_selector::SelectorOptions;
use crate::qdrant::{qdrant_client, with_vectors_selector, AliasOperations, ChangeAliases, ClearPayloadPoints, CollectionOperationResponse, Condition, CountPoints, CountResponse, CreateAlias, CreateCollection, CreateFieldIndexCollection, CreateFullSnapshotRequest, CreateSnapshotRequest, CreateSnapshotResponse, DeleteAlias, DeleteCollection, DeleteFieldIndexCollection, DeleteFullSnapshotRequest, DeletePayloadPoints, DeletePoints, DeleteSnapshotRequest, DeleteSnapshotResponse, FieldCondition, FieldType, Filter, GetCollectionInfoRequest, GetCollectionInfoResponse, GetPoints, GetResponse, HasIdCondition, HealthCheckReply, HealthCheckRequest, IsEmptyCondition, ListAliasesRequest, ListAliasesResponse, ListCollectionAliasesRequest, ListCollectionsRequest, ListCollectionsResponse, ListFullSnapshotsRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListValue, NamedVectors, OptimizersConfigDiff, PayloadIncludeSelector, PayloadIndexParams, PointId, PointStruct, PointsIdsList, PointsOperationResponse, PointsSelector, RecommendBatchPoints, RecommendBatchResponse, RecommendPoints, RecommendResponse, RenameAlias, ScrollPoints, ScrollResponse, SearchBatchPoints, SearchBatchResponse, SearchPoints, SearchResponse, SetPayloadPoints, Struct, UpdateCollection, UpsertPoints, Value, Vector, Vectors, VectorsSelector, WithPayloadSelector, WithVectorsSelector, WriteOrdering, ReadConsistency};
use crate::qdrant::{
qdrant_client, with_vectors_selector, AliasOperations, ChangeAliases, ClearPayloadPoints,
CollectionOperationResponse, Condition, CountPoints, CountResponse, CreateAlias,
CreateCollection, CreateFieldIndexCollection, CreateFullSnapshotRequest, CreateSnapshotRequest,
CreateSnapshotResponse, DeleteAlias, DeleteCollection, DeleteFieldIndexCollection,
DeleteFullSnapshotRequest, DeletePayloadPoints, DeletePoints, DeleteSnapshotRequest,
DeleteSnapshotResponse, FieldCondition, FieldType, Filter, GetCollectionInfoRequest,
GetCollectionInfoResponse, GetPoints, GetResponse, HasIdCondition, HealthCheckReply,
HealthCheckRequest, IsEmptyCondition, ListAliasesRequest, ListAliasesResponse,
ListCollectionAliasesRequest, ListCollectionsRequest, ListCollectionsResponse,
ListFullSnapshotsRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListValue, NamedVectors,
OptimizersConfigDiff, PayloadIncludeSelector, PayloadIndexParams, PointId, PointStruct,
PointsIdsList, PointsOperationResponse, PointsSelector, ReadConsistency, RecommendBatchPoints,
RecommendBatchResponse, RecommendPoints, RecommendResponse, RenameAlias, ScrollPoints,
ScrollResponse, SearchBatchPoints, SearchBatchResponse, SearchPoints, SearchResponse,
SetPayloadPoints, Struct, UpdateCollection, UpsertPoints, Value, Vector, Vectors,
VectorsSelector, WithPayloadSelector, WithVectorsSelector, WriteOrdering,
};
use anyhow::{bail, Result};
use reqwest::multipart::{Form, Part};
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
Expand All @@ -30,7 +48,10 @@ pub struct QdrantClientConfig {

impl QdrantClientConfig {
pub fn from_url(url: &str) -> Self {
QdrantClientConfig { uri: url.to_string(), ..Self::default() }
QdrantClientConfig {
uri: url.to_string(),
..Self::default()
}
}

pub fn set_api_key(&mut self, api_key: &str) {
Expand Down Expand Up @@ -499,7 +520,8 @@ impl QdrantClient {
points: Vec<PointStruct>,
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._upsert_points(collection_name, &points, false, ordering).await
self._upsert_points(collection_name, &points, false, ordering)
.await
}

pub async fn upsert_points_blocking(
Expand All @@ -508,7 +530,8 @@ impl QdrantClient {
points: Vec<PointStruct>,
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._upsert_points(collection_name, &points, true, ordering).await
self._upsert_points(collection_name, &points, true, ordering)
.await
}

#[inline]
Expand All @@ -517,7 +540,7 @@ impl QdrantClient {
collection_name: impl ToString,
points: &Vec<PointStruct>,
block: bool,
ordering: Option<WriteOrdering>
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
let collection_name = collection_name.to_string();
let collection_name_ref = collection_name.as_str();
Expand Down Expand Up @@ -554,7 +577,7 @@ impl QdrantClient {
collection_name: impl ToString,
points: &PointsSelector,
payload: Payload,
ordering: Option<WriteOrdering>
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._set_payload(collection_name, points, &payload, true, ordering)
.await
Expand Down Expand Up @@ -594,7 +617,7 @@ impl QdrantClient {
collection_name: impl ToString,
points: &PointsSelector,
payload: Payload,
ordering: Option<WriteOrdering>
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._overwrite_payload(collection_name, points, &payload, false, ordering)
.await
Expand All @@ -618,7 +641,7 @@ impl QdrantClient {
points: &PointsSelector,
payload: &Payload,
block: bool,
ordering: Option<WriteOrdering>
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
let collection_name = collection_name.to_string();
let collection_name_ref = collection_name.as_str();
Expand Down Expand Up @@ -800,7 +823,8 @@ impl QdrantClient {
points: &PointsSelector,
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._delete_points(collection_name, false, points, ordering).await
self._delete_points(collection_name, false, points, ordering)
.await
}

pub async fn delete_points_blocking(
Expand All @@ -809,7 +833,8 @@ impl QdrantClient {
points: &PointsSelector,
ordering: Option<WriteOrdering>,
) -> Result<PointsOperationResponse> {
self._delete_points(collection_name, true, points, ordering).await
self._delete_points(collection_name, true, points, ordering)
.await
}

async fn _delete_points(
Expand Down Expand Up @@ -1146,6 +1171,43 @@ impl QdrantClient {

Ok(())
}

pub async fn upload_snapshot<T>(
&self,
in_path: impl Into<PathBuf>,
collection_name: T,
rest_api_uri: Option<T>,
) -> Result<()>
where
T: ToString + Clone,
{
let snapshot_path: PathBuf = in_path.into();
let filename = snapshot_path
.file_name()
.and_then(|name| name.to_str().map(|name| name.to_string()));

let snapshot_file = std::fs::read(snapshot_path)?;

let mut part = Part::bytes(snapshot_file);
if let Some(filename) = filename {
part = part.file_name(filename);
}
let form = Form::new().part("snapshot", part);
let client = reqwest::Client::new();
client
.post(format!(
"{}/collections/{}/snapshots/upload",
rest_api_uri
.map(|uri| uri.to_string())
.unwrap_or_else(|| String::from("http://localhost:6333")),
collection_name.to_string()
))
.multipart(form)
.send()
.await?;

Ok(())
}
}

impl PointStruct {
Expand Down