
Commit cc0987e

feat: bitbucket connector
1 parent 2372dd4

File tree: 15 files changed (+908 −1 lines changed)


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -29,6 +29,7 @@ settings.json
 *.sw?
 /backend/tests/regression/answer_quality/search_test_config.yaml
 
+
 # Local .terraform directories
 **/.terraform/*

backend/onyx/configs/constants.py

Lines changed: 2 additions & 0 deletions

@@ -199,6 +199,7 @@ class DocumentSource(str, Enum):
     HIGHSPOT = "highspot"

     IMAP = "imap"
+    BITBUCKET = "bitbucket"

     # Special case just for integration tests
     MOCK_CONNECTOR = "mock_connector"
@@ -541,6 +542,7 @@ class OnyxCallTypes(str, Enum):
     DocumentSource.GITHUB: "github data (issues, PRs)",
     DocumentSource.GITBOOK: "gitbook data",
     DocumentSource.GITLAB: "gitlab data",
+    DocumentSource.BITBUCKET: "bitbucket data",
     DocumentSource.GURU: "guru data",
     DocumentSource.BOOKSTACK: "bookstack data",
     DocumentSource.OUTLINE: "outline data",
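With these two additions the new source is registered like any other connector: an enum member plus a human-readable label. A trivial check, shown only to make the registration concrete (not part of the commit):

    from onyx.configs.constants import DocumentSource

    # The enum member added above maps to the string "bitbucket".
    assert DocumentSource.BITBUCKET.value == "bitbucket"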

backend/onyx/connectors/bitbucket/__init__.py

Whitespace-only changes.

backend/onyx/connectors/bitbucket/connector.py

Lines changed: 350 additions & 0 deletions

@@ -0,0 +1,350 @@
from __future__ import annotations

from collections.abc import Iterator
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import TYPE_CHECKING

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
from onyx.configs.constants import DocumentSource
from onyx.connectors.bitbucket.utils import build_auth_client
from onyx.connectors.bitbucket.utils import list_repositories
from onyx.connectors.bitbucket.utils import map_pr_to_document
from onyx.connectors.bitbucket.utils import paginate
from onyx.connectors.bitbucket.utils import PR_LIST_RESPONSE_FIELDS
from onyx.connectors.bitbucket.utils import SLIM_PR_LIST_RESPONSE_FIELDS
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import SlimDocument
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger

if TYPE_CHECKING:
    import httpx

logger = setup_logger()


class BitbucketConnector(LoadConnector, PollConnector, SlimConnector):
    """Connector for indexing Bitbucket Cloud pull requests.

    Args:
        workspace: Bitbucket workspace ID.
        repositories: Comma-separated list of repository slugs to index.
        projects: Comma-separated list of project keys to index all repositories within.
        batch_size: Max number of documents to yield per batch.
        prune_closed_prs_after_days: Number of days after which to prune closed PRs.
            Use -1 to disable pruning, 0 to prune immediately, or a positive number
            to prune closed PRs that have been inactive for more than N days.
    """

    def __init__(
        self,
        workspace: str,
        repositories: str | None = None,
        projects: str | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
        prune_closed_prs_after_days: int | None = None,
    ) -> None:
        self.workspace = workspace
        self._repositories = (
            [s.strip() for s in repositories.split(",") if s.strip()]
            if repositories
            else None
        )
        self._projects = (
            [s.strip() for s in projects.split(",") if s.strip()] if projects else None
        )
        self.batch_size = batch_size
        # Pruning config: -1 => never; 0 => immediate; >0 => older than N days
        self._prune_days = (
            prune_closed_prs_after_days
            if prune_closed_prs_after_days is not None
            else -1
        )
        self.email: str | None = None
        self.api_token: str | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load API token-based credentials.

        Expects a dict with keys: `bitbucket_email`, `bitbucket_api_token`.
        """
        self.email = credentials.get("bitbucket_email")
        self.api_token = credentials.get("bitbucket_api_token")
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return None

    def _client(self) -> httpx.Client:
        """Build an authenticated HTTP client or raise if credentials are missing."""
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return build_auth_client(self.email, self.api_token)

    def _iter_pull_requests_for_repo(
        self, client: httpx.Client, repo_slug: str, params: dict[str, Any] | None = None
    ) -> Iterator[dict[str, Any]]:
        base = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}/{repo_slug}/pullrequests"
        yield from paginate(client, base, params)

    def _build_params(
        self,
        fields: str = PR_LIST_RESPONSE_FIELDS,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
    ) -> dict[str, Any]:
        """Build Bitbucket fetch params honoring pruning and an optional time window.

        Applies start/end only when BOTH are provided.
        - prune_days == -1: include OPEN/MERGED/DECLINED; apply start/end to all states if provided.
        - prune_days == 0: include only OPEN; apply start/end to OPEN if provided.
        - prune_days > 0: include OPEN (with optional start/end);
          for MERGED/DECLINED require updated_on >= max(threshold, start) and <= end (if provided).
        """

        def _iso(ts: SecondsSinceUnixEpoch) -> str:
            return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

        def _tc_epoch(
            lower_epoch: SecondsSinceUnixEpoch | None,
            upper_epoch: SecondsSinceUnixEpoch | None,
        ) -> str | None:
            if lower_epoch is not None and upper_epoch is not None:
                lower_iso = _iso(lower_epoch)
                upper_iso = _iso(upper_epoch)
                return f'(updated_on >= "{lower_iso}" AND updated_on <= "{upper_iso}")'
            return None

        params: dict[str, Any] = {"fields": fields, "pagelen": self.batch_size}
        if self._prune_days == -1:
            # All states, optional global window
            time_clause = _tc_epoch(start, end)
            q = '(state = "OPEN" OR state = "MERGED" OR state = "DECLINED")'
            if time_clause:
                q = f"{q} AND {time_clause}"
            params["q"] = q

        elif self._prune_days == 0:
            # Only OPEN, optional window
            params["state"] = ["OPEN"]
            open_tc = _tc_epoch(start, end)
            q = '(state = "OPEN")'
            if open_tc:
                q = f"{q} AND {open_tc}"
            params["q"] = q

        else:
            # prune_days > 0
            # load all OPEN and MERGED/DECLINED within the adjusted interval
            threshold_dt = datetime.now(tz=timezone.utc) - timedelta(
                days=self._prune_days
            )
            threshold_epoch: SecondsSinceUnixEpoch = int(threshold_dt.timestamp())

            # Use max(start, threshold) as the lower bound for closed PRs (epoch seconds)
            closed_lower_epoch: SecondsSinceUnixEpoch = threshold_epoch
            if start is not None and start > threshold_epoch:
                closed_lower_epoch = start

            closed_tc = _tc_epoch(closed_lower_epoch, end)
            open_tc = _tc_epoch(start, end)

            parts: list[str] = []
            if open_tc:
                parts.append(f'(state = "OPEN" AND {open_tc})')
            else:
                parts.append('(state = "OPEN")')
            if closed_tc:
                parts.append(
                    f'((state = "MERGED" OR state = "DECLINED") AND {closed_tc})'
                )
            else:
                parts.append(
                    f'((state = "MERGED" OR state = "DECLINED") AND updated_on >= "{_iso(threshold_epoch)}")'
                )

            q = " OR ".join(parts)
            params["q"] = q

        return params

    def _iter_target_repositories(self, client: httpx.Client) -> Iterator[str]:
        """Yield repository slugs based on configuration.

        Priority:
        - repositories list
        - projects list (list repos by project key)
        - workspace (all repos)
        """
        if self._repositories:
            for slug in self._repositories:
                yield slug
            return
        if self._projects:
            for project_key in self._projects:
                try:
                    for repo in list_repositories(client, self.workspace, project_key):
                        slug_val = repo.get("slug")
                        if isinstance(slug_val, str) and slug_val:
                            yield slug_val
                except Exception as e:
                    logger.error(
                        f"Failed to list repositories for project '{project_key}' in workspace '{self.workspace}'",
                        exc_info=e,
                    )
            return
        try:
            for repo in list_repositories(client, self.workspace, None):
                slug_val = repo.get("slug")
                if isinstance(slug_val, str) and slug_val:
                    yield slug_val
        except Exception as e:
            logger.error(
                f"Failed to list repositories for workspace '{self.workspace}'",
                exc_info=e,
            )

    def _load_by_params(
        self, client: httpx.Client, params: dict[str, Any] | None = None
    ) -> GenerateDocumentsOutput:
        batch: list[Document] = []
        for slug in self._iter_target_repositories(client):
            try:
                for pr in self._iter_pull_requests_for_repo(
                    client, slug, params=params
                ):
                    try:
                        batch.append(map_pr_to_document(pr, self.workspace, slug))
                        if len(batch) >= self.batch_size:
                            yield batch
                            batch = []
                    except Exception as e:
                        logger.error(
                            f"Failed to map PR in repo '{slug}' to document. Skipping PR.",
                            exc_info=e,
                        )
            except Exception as e:
                logger.error(
                    f"Failed to fetch PRs for repository '{slug}'. Continuing with next repo.",
                    exc_info=e,
                )
        if batch:
            yield batch

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Fetch and yield all pull requests as Documents in batches."""
        params = self._build_params()
        with self._client() as client:
            yield from self._load_by_params(client, params)

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Fetch pull requests updated within [start, end] and yield in batches."""
        params = self._build_params(
            start=start,
            end=end,
        )
        with self._client() as client:
            yield from self._load_by_params(client, params)

    def retrieve_all_slim_documents(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> Iterator[list[SlimDocument]]:
        """Return only document IDs for all pull requests for pruning checks."""
        batch: list[SlimDocument] = []
        params = self._build_params(
            fields=SLIM_PR_LIST_RESPONSE_FIELDS,
            start=start,
            end=end,
        )
        with self._client() as client:
            for slug in self._iter_target_repositories(client):
                try:
                    for pr in self._iter_pull_requests_for_repo(
                        client, slug, params=params
                    ):
                        try:
                            pr_id = pr["id"]
                            doc_id = f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{slug}:pr:{pr_id}"
                            batch.append(SlimDocument(id=doc_id))
                            if len(batch) >= self.batch_size:
                                yield batch
                                batch = []
                            if callback:
                                if callback.should_stop():
                                    raise RuntimeError(
                                        "bitbucket_pr_sync: Stop signal detected"
                                    )
                                callback.progress("bitbucket_pr_sync", len(batch))
                        except Exception as e:
                            logger.error(
                                f"Failed to build slim document for a PR in repo '{slug}'. Skipping PR.",
                                exc_info=e,
                            )
                except Exception as e:
                    logger.error(
                        f"Failed to fetch PRs for repository '{slug}' for slim retrieval. Continuing with next repo.",
                        exc_info=e,
                    )
        if batch:
            yield batch

    def validate_connector_settings(self) -> None:
        """Validate Bitbucket credentials and workspace access by probing a lightweight endpoint.

        Raises:
            CredentialExpiredError: on HTTP 401
            InsufficientPermissionsError: on HTTP 403
            UnexpectedValidationError: on any other failure
        """
        try:
            with self._client() as client:
                url = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}"
                resp = client.get(
                    url,
                    params={"pagelen": 1, "fields": "pagelen"},
                    timeout=REQUEST_TIMEOUT_SECONDS,
                )
                if resp.status_code == 401:
                    raise CredentialExpiredError(
                        "Invalid or expired Bitbucket credentials (HTTP 401)."
                    )
                if resp.status_code == 403:
                    raise InsufficientPermissionsError(
                        "Insufficient permissions to access Bitbucket workspace (HTTP 403)."
                    )
                if resp.status_code < 200 or resp.status_code >= 300:
                    raise UnexpectedValidationError(
                        f"Unexpected Bitbucket error (status={resp.status_code})."
                    )
        except Exception as e:
            # Network or other unexpected errors
            if isinstance(
                e,
                (
                    CredentialExpiredError,
                    InsufficientPermissionsError,
                    UnexpectedValidationError,
                    ConnectorMissingCredentialError,
                ),
            ):
                raise
            raise UnexpectedValidationError(
                f"Unexpected error while validating Bitbucket settings: {e}"
            )
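
For orientation, a minimal usage sketch of the new connector. The workspace, repository slugs, and credential values are placeholders, and the import path assumes the `connector.py` module name reconstructed above; this snippet is not part of the commit:

    from onyx.connectors.bitbucket.connector import BitbucketConnector

    # Hypothetical configuration: index two repos in one workspace and
    # prune closed PRs that have been inactive for more than 30 days.
    connector = BitbucketConnector(
        workspace="my-workspace",
        repositories="repo-a,repo-b",  # or projects="PROJ", or neither for all repos
        prune_closed_prs_after_days=30,
    )
    connector.load_credentials(
        {
            "bitbucket_email": "user@example.com",
            "bitbucket_api_token": "<api-token>",
        }
    )

    # load_from_state() yields lists of Document objects, batch_size per batch.
    for batch in connector.load_from_state():
        for doc in batch:
            print(doc.id)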

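The pruning modes are easiest to see from the Bitbucket query strings `_build_params` generates. A sketch that calls the private method directly, purely to illustrate; the epoch values are arbitrary, and the expected outputs follow from the code above:

    # Default prune_closed_prs_after_days=None maps to -1 (never prune):
    c = BitbucketConnector(workspace="my-workspace")
    print(c._build_params()["q"])
    # (state = "OPEN" OR state = "MERGED" OR state = "DECLINED")

    # Prune immediately (0): only OPEN PRs, with an optional [start, end] window.
    c = BitbucketConnector(workspace="my-workspace", prune_closed_prs_after_days=0)
    print(c._build_params(start=1704067200, end=1706745600)["q"])  # 2024-01-01 .. 2024-02-01 UTC
    # (state = "OPEN") AND (updated_on >= "2024-01-01T00:00:00+00:00" AND updated_on <= "2024-02-01T00:00:00+00:00")

    # Positive N: all OPEN PRs, plus closed PRs updated within the last N days.
    c = BitbucketConnector(workspace="my-workspace", prune_closed_prs_after_days=30)
    print(c._build_params()["q"])
    # (state = "OPEN") OR ((state = "MERGED" OR state = "DECLINED") AND updated_on >= "<now minus 30 days, ISO-8601>")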
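`build_auth_client`, `paginate`, `list_repositories`, and the PR-to-Document mapping live in `backend/onyx/connectors/bitbucket/utils.py`, one of the 15 changed files not rendered on this page. A rough sketch of what the two HTTP helpers plausibly look like, based only on Bitbucket Cloud's documented basic auth and `values`/`next` pagination; this is an assumption, not the committed implementation:

    from collections.abc import Iterator
    from typing import Any

    import httpx


    def build_auth_client(email: str, api_token: str) -> httpx.Client:
        # Bitbucket Cloud API tokens are used with HTTP basic auth (email + token).
        return httpx.Client(auth=(email, api_token))


    def paginate(
        client: httpx.Client, url: str, params: dict[str, Any] | None = None
    ) -> Iterator[dict[str, Any]]:
        # Bitbucket 2.0 list endpoints return {"values": [...], "next": "<url>"};
        # follow "next" links until exhausted. Query params are sent only on the
        # first request, since each "next" URL already encodes them.
        next_url: str | None = url
        while next_url:
            resp = client.get(next_url, params=params if next_url == url else None)
            resp.raise_for_status()
            data = resp.json()
            yield from data.get("values", [])
            next_url = data.get("next")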