Skip to content

Commit a547112

Browse files
feat: bitbucket connector (#5294)
1 parent da5a948 commit a547112

File tree

14 files changed

+887
-0
lines changed

14 files changed

+887
-0
lines changed

backend/onyx/configs/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class DocumentSource(str, Enum):
199199
HIGHSPOT = "highspot"
200200

201201
IMAP = "imap"
202+
BITBUCKET = "bitbucket"
202203

203204
# Special case just for integration tests
204205
MOCK_CONNECTOR = "mock_connector"
@@ -541,6 +542,7 @@ class OnyxCallTypes(str, Enum):
541542
DocumentSource.GITHUB: "github data (issues, PRs)",
542543
DocumentSource.GITBOOK: "gitbook data",
543544
DocumentSource.GITLAB: "gitlab data",
545+
DocumentSource.BITBUCKET: "bitbucket data",
544546
DocumentSource.GURU: "guru data",
545547
DocumentSource.BOOKSTACK: "bookstack data",
546548
DocumentSource.OUTLINE: "outline data",

backend/onyx/connectors/bitbucket/__init__.py

Whitespace-only changes.
Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
from __future__ import annotations
2+
3+
import copy
4+
from collections.abc import Callable
5+
from collections.abc import Iterator
6+
from datetime import datetime
7+
from datetime import timezone
8+
from typing import Any
9+
from typing import TYPE_CHECKING
10+
11+
from typing_extensions import override
12+
13+
from onyx.configs.app_configs import INDEX_BATCH_SIZE
14+
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
15+
from onyx.configs.constants import DocumentSource
16+
from onyx.connectors.bitbucket.utils import build_auth_client
17+
from onyx.connectors.bitbucket.utils import list_repositories
18+
from onyx.connectors.bitbucket.utils import map_pr_to_document
19+
from onyx.connectors.bitbucket.utils import paginate
20+
from onyx.connectors.bitbucket.utils import PR_LIST_RESPONSE_FIELDS
21+
from onyx.connectors.bitbucket.utils import SLIM_PR_LIST_RESPONSE_FIELDS
22+
from onyx.connectors.exceptions import CredentialExpiredError
23+
from onyx.connectors.exceptions import InsufficientPermissionsError
24+
from onyx.connectors.exceptions import UnexpectedValidationError
25+
from onyx.connectors.interfaces import CheckpointedConnector
26+
from onyx.connectors.interfaces import CheckpointOutput
27+
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
28+
from onyx.connectors.interfaces import SlimConnector
29+
from onyx.connectors.models import ConnectorCheckpoint
30+
from onyx.connectors.models import ConnectorFailure
31+
from onyx.connectors.models import ConnectorMissingCredentialError
32+
from onyx.connectors.models import DocumentFailure
33+
from onyx.connectors.models import SlimDocument
34+
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
35+
from onyx.utils.logger import setup_logger
36+
37+
if TYPE_CHECKING:
38+
import httpx
39+
40+
logger = setup_logger()
41+
42+
43+
class BitbucketConnectorCheckpoint(ConnectorCheckpoint):
    """Resumable position of a Bitbucket pull-request indexing run.

    The connector walks ``repos_queue`` one repository at a time and stores
    enough state here to pick up where a previous run stopped.
    """

    # Materialized list of repository slugs to process.
    repos_queue: list[str] = []
    # Index into ``repos_queue`` of the repository currently being processed.
    current_repo_index: int = 0
    # Bitbucket "next" URL for continuing pagination within the current
    # repository; ``None`` means start from the repository's first page.
    next_url: str | None = None
55+
56+
57+
class BitbucketConnector(
    CheckpointedConnector[BitbucketConnectorCheckpoint],
    SlimConnector,
):
    """Connector for indexing Bitbucket Cloud pull requests.

    Target repositories are chosen by the first configured option, in
    priority order: the explicit ``repositories`` list, then ``projects``,
    then every repository in the workspace.

    Args:
        workspace: Bitbucket workspace ID.
        repositories: Comma-separated list of repository slugs to index.
        projects: Comma-separated list of project keys to index all
            repositories within.
        batch_size: Max number of slim documents to yield per batch.
    """

    def __init__(
        self,
        workspace: str,
        repositories: str | None = None,
        projects: str | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.workspace = workspace
        self._repositories: list[str] | None = self._split_csv(repositories)
        self._projects: list[str] | None = self._split_csv(projects)
        self.batch_size = batch_size
        # Populated by load_credentials().
        self.email: str | None = None
        self.api_token: str | None = None

    @staticmethod
    def _split_csv(value: str | None) -> list[str] | None:
        """Split a comma-separated setting into stripped, non-empty items.

        Returns ``None`` when ``value`` is ``None`` or empty. A non-empty
        value made only of separators/whitespace yields an empty list.
        """
        if not value:
            return None
        return [item.strip() for item in value.split(",") if item.strip()]

    def _pr_document_id(self, repo_slug: str, pr_id: Any) -> str:
        """Build the document ID used for a pull request of ``repo_slug``."""
        return (
            f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{repo_slug}:pr:{pr_id}"
        )

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load API token-based credentials.

        Expects a dict with keys: `bitbucket_email`, `bitbucket_api_token`.

        Raises:
            ConnectorMissingCredentialError: if either value is missing or empty.
        """
        self.email = credentials.get("bitbucket_email")
        self.api_token = credentials.get("bitbucket_api_token")
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return None

    def _client(self) -> httpx.Client:
        """Build an authenticated HTTP client or raise if credentials missing."""
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return build_auth_client(self.email, self.api_token)

    def _iter_pull_requests_for_repo(
        self,
        client: httpx.Client,
        repo_slug: str,
        params: dict[str, Any] | None = None,
        start_url: str | None = None,
        on_page: Callable[[str | None], None] | None = None,
    ) -> Iterator[dict[str, Any]]:
        """Yield raw pull-request dicts for one repository of the workspace.

        ``params``, ``start_url`` and ``on_page`` are forwarded unchanged to
        ``paginate`` together with the repository's pull-request list URL.
        """
        base = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}/{repo_slug}/pullrequests"
        yield from paginate(
            client,
            base,
            params,
            start_url=start_url,
            on_page=on_page,
        )

    def _build_params(
        self,
        fields: str = PR_LIST_RESPONSE_FIELDS,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
    ) -> dict[str, Any]:
        """Build Bitbucket fetch params.

        Always include OPEN, MERGED, and DECLINED PRs. If both ``start`` and
        ``end`` are provided, apply a single updated_on time window; if either
        one is missing, no time filter is applied.

        Returns:
            A dict with the ``fields``, ``pagelen`` and ``q`` query parameters.
        """
        params: dict[str, Any] = {"fields": fields, "pagelen": 50}
        query = '(state = "OPEN" OR state = "MERGED" OR state = "DECLINED")'
        if start is not None and end is not None:
            # Epoch seconds are rendered as timezone-aware UTC ISO-8601 strings.
            lower_iso = datetime.fromtimestamp(start, tz=timezone.utc).isoformat()
            upper_iso = datetime.fromtimestamp(end, tz=timezone.utc).isoformat()
            time_clause = (
                f'(updated_on >= "{lower_iso}" AND updated_on <= "{upper_iso}")'
            )
            query = f"{query} AND {time_clause}"
        params["q"] = query
        return params

    def _iter_target_repositories(self, client: httpx.Client) -> Iterator[str]:
        """Yield repository slugs based on configuration.

        Priority:
        - repositories list
        - projects list (list repos by project key)
        - workspace (all repos)
        """
        if self._repositories:
            yield from self._repositories
            return

        # A single ``None`` project key lists repositories workspace-wide.
        project_keys: list[str | None] = (
            list(self._projects) if self._projects else [None]
        )
        for project_key in project_keys:
            for repo in list_repositories(client, self.workspace, project_key):
                slug_val = repo.get("slug")
                # Skip entries without a usable string slug.
                if isinstance(slug_val, str) and slug_val:
                    yield slug_val

    @override
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: BitbucketConnectorCheckpoint,
    ) -> CheckpointOutput[BitbucketConnectorCheckpoint]:
        """Resumable PR ingestion across repos and pages within a time window.

        Each call drains the pull-request iterator of the repository at
        ``current_repo_index`` and then advances the index by one.

        Yields Documents (or ConnectorFailure for per-PR mapping failures) and returns
        an updated checkpoint that records repo position and next page URL.
        The incoming ``checkpoint`` is deep-copied and never mutated.
        """
        new_checkpoint = copy.deepcopy(checkpoint)

        with self._client() as client:
            # Materialize target repositories once
            if not new_checkpoint.repos_queue:
                # De-duplicate and sort so the queue order is deterministic
                # across runs (this does not keep the configured order).
                repos_list = list(self._iter_target_repositories(client))
                new_checkpoint.repos_queue = sorted(set(repos_list))
                new_checkpoint.current_repo_index = 0
                new_checkpoint.next_url = None

            repos = new_checkpoint.repos_queue
            if not repos or new_checkpoint.current_repo_index >= len(repos):
                new_checkpoint.has_more = False
                return new_checkpoint

            repo_slug = repos[new_checkpoint.current_repo_index]

            first_page_params = self._build_params(
                fields=PR_LIST_RESPONSE_FIELDS,
                start=start,
                end=end,
            )

            def _on_page(next_url: str | None) -> None:
                # Record the pagination position on the checkpoint copy.
                new_checkpoint.next_url = next_url

            for pr in self._iter_pull_requests_for_repo(
                client,
                repo_slug,
                params=first_page_params,
                start_url=new_checkpoint.next_url,
                on_page=_on_page,
            ):
                # Only the mapping is guarded, so an error raised at the yield
                # point is not misreported as a failure of this PR.
                try:
                    document = map_pr_to_document(pr, self.workspace, repo_slug)
                except Exception as e:
                    pr_id = pr.get("id")
                    pr_link = (
                        f"https://bitbucket.org/{self.workspace}/{repo_slug}/pull-requests/{pr_id}"
                        if pr_id is not None
                        else None
                    )
                    yield ConnectorFailure(
                        failed_document=DocumentFailure(
                            document_id=self._pr_document_id(
                                repo_slug,
                                pr_id if pr_id is not None else "unknown",
                            ),
                            document_link=pr_link,
                        ),
                        failure_message=f"Failed to process Bitbucket PR: {e}",
                        exception=e,
                    )
                else:
                    yield document

            # Advance to next repository (if any) and set has_more accordingly
            new_checkpoint.current_repo_index += 1
            new_checkpoint.next_url = None
            new_checkpoint.has_more = new_checkpoint.current_repo_index < len(repos)

        return new_checkpoint

    @override
    def build_dummy_checkpoint(self) -> BitbucketConnectorCheckpoint:
        """Create an initial checkpoint with work remaining."""
        return BitbucketConnectorCheckpoint(has_more=True)

    @override
    def validate_checkpoint_json(
        self, checkpoint_json: str
    ) -> BitbucketConnectorCheckpoint:
        """Validate and deserialize a checkpoint instance from JSON."""
        return BitbucketConnectorCheckpoint.model_validate_json(checkpoint_json)

    def retrieve_all_slim_documents(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> Iterator[list[SlimDocument]]:
        """Return only document IDs for all existing pull requests.

        IDs are yielded in batches of at most ``batch_size``. After each full
        batch the optional ``callback`` is checked for a stop signal and is
        told how many IDs that batch contained.

        Raises:
            RuntimeError: if ``callback.should_stop()`` returns true.
        """
        batch: list[SlimDocument] = []
        params = self._build_params(
            fields=SLIM_PR_LIST_RESPONSE_FIELDS,
            start=start,
            end=end,
        )
        with self._client() as client:
            for slug in self._iter_target_repositories(client):
                for pr in self._iter_pull_requests_for_repo(
                    client, slug, params=params
                ):
                    batch.append(
                        SlimDocument(id=self._pr_document_id(slug, pr["id"]))
                    )
                    if len(batch) < self.batch_size:
                        continue

                    # Take the count before the batch is handed off and reset;
                    # measuring it after the reset would always report 0.
                    flushed_count = len(batch)
                    yield batch
                    batch = []
                    if callback:
                        if callback.should_stop():
                            # Note: this is not actually used for permission sync yet, just pruning
                            raise RuntimeError(
                                "bitbucket_pr_sync: Stop signal detected"
                            )
                        callback.progress("bitbucket_pr_sync", flushed_count)
        if batch:
            yield batch

    def validate_connector_settings(self) -> None:
        """Validate Bitbucket credentials and workspace access by probing a lightweight endpoint.

        Raises:
            ConnectorMissingCredentialError: if credentials were never loaded
            CredentialExpiredError: on HTTP 401
            InsufficientPermissionsError: on HTTP 403
            UnexpectedValidationError: on any other failure
        """
        try:
            with self._client() as client:
                url = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}"
                resp = client.get(
                    url,
                    params={"pagelen": 1, "fields": "pagelen"},
                    timeout=REQUEST_TIMEOUT_SECONDS,
                )
                if resp.status_code == 401:
                    raise CredentialExpiredError(
                        "Invalid or expired Bitbucket credentials (HTTP 401)."
                    )
                if resp.status_code == 403:
                    raise InsufficientPermissionsError(
                        "Insufficient permissions to access Bitbucket workspace (HTTP 403)."
                    )
                if not 200 <= resp.status_code < 300:
                    raise UnexpectedValidationError(
                        f"Unexpected Bitbucket error (status={resp.status_code})."
                    )
        except (
            CredentialExpiredError,
            InsufficientPermissionsError,
            UnexpectedValidationError,
            ConnectorMissingCredentialError,
        ):
            # Already a meaningful connector error; let it through untouched.
            raise
        except Exception as e:
            # Network or other unexpected errors; keep the original as the cause.
            raise UnexpectedValidationError(
                f"Unexpected error while validating Bitbucket settings: {e}"
            ) from e

0 commit comments

Comments
 (0)