From 5e10e1e424d841bcd20e847b9cc28746d16fb14a Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 10 Dec 2024 08:56:06 -0500 Subject: [PATCH 01/22] onelake.py file added to /connector/sources --- connectors/sources/onelake.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 connectors/sources/onelake.py diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py new file mode 100644 index 000000000..e69de29bb From 117400be8e79c87f05d6cbde836402427a71e13a Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 10 Dec 2024 11:32:43 -0500 Subject: [PATCH 02/22] Draft: BaseDataSource class and methods implementation for Onelake connector code --- connectors/sources/onelake.py | 315 ++++++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index e69de29bb..b9e2a2e88 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -0,0 +1,315 @@ +"""OneLake connector to retrieve data from datalakes""" + +from functools import partial + +from azure.identity import ClientSecretCredential +from azure.storage.filedatalake import DataLakeServiceClient + +from connectors.source import BaseDataSource + +ACCOUNT_NAME = "onelake" + + +class OneLakeDataSource(BaseDataSource): + """OneLake""" + + name = "OneLake" + service_type = "onelake" + incremental_sync_enabled = True + + def __init__(self, configuration): + """Set up the connection to the azure base client + + Args: + configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. + """ + super().__init__(configuration=configuration) + self.tenant_id = self.configuration["tenant_id"] + self.client_id = self.configuration["client_id"] + self.client_secret = self.configuration["client_secret"] + self.workspace_name = self.configuration["workspace_name"] + self.data_path = self.configuration["data_path"] + + @classmethod + def get_default_configuration(cls): + """Get the default configuration for OneLake + + Returns: + dictionary: Default configuration + """ + return { + "tenant_id": { + "label": "OneLake tenant id", + "order": 1, + "type": "str", + }, + "client_id": { + "label": "OneLake client id", + "order": 2, + "type": "str", + }, + "client_secret": { + "label": "OneLake client secret", + "order": 3, + "type": "str", + "sensitive": True, + }, + "workspace_name": { + "label": "OneLake workspace name", + "order": 4, + "type": "str", + }, + "data_path": { + "label": "OneLake data path", + "tooltip": "Path in format .Lakehouse/files/", + "order": 5, + "type": "str", + }, + "account_name": { + "tooltip": "In the most cases is 'onelake'", + "default_value": ACCOUNT_NAME, + "label": "Account name", + "order": 6, + "type": "str", + }, + } + + async def ping(self): + """Verify the connection with OneLake""" + + self._logger.info("Generating file system client...") + + try: + await self._get_directory_paths(self.configuration["data_path"]) + self._logger.info("Connection to OneLake successful") + + except Exception: + self._logger.exception("Error while connecting to OneLake.") + raise + + def _get_account_url(self): + """Get the account URL for OneLake + + Returns: + str: Account URL + """ + + return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" + + def _get_token_credentials(self): + """Get the token credentials for OneLake + + Returns: + obj: Token credentials + """ + + tenant_id = self.configuration["tenant_id"] + client_id = self.configuration["client_id"] + client_secret = self.configuration["client_secret"] + + try: + return ClientSecretCredential(tenant_id, client_id, client_secret) + except Exception as e: + self._logger.error(f"Error while getting token credentials: {e}") + raise + + def _get_service_client(self): + """Get the service client for OneLake + + Returns: + obj: Service client + """ + + try: + return DataLakeServiceClient( + account_url=self._get_account_url(), + credential=self._get_token_credentials(), + ) + except Exception as e: + self._logger.error(f"Error while getting service client: {e}") + raise + + def _get_file_system_client(self): + """Get the file system client for OneLake + + Returns: + obj: File system client + """ + try: + return self._get_service_client().get_file_system_client( + self.configuration["workspace_name"] + ) + except Exception as e: + self._logger.error(f"Error while getting file system client: {e}") + raise + + def _get_directory_client(self): + """Get the directory client for OneLake + + Returns: + obj: Directory client + """ + + try: + return self._get_file_system_client().get_directory_client( + self.configuration["data_path"] + ) + except Exception as e: + self._logger.error(f"Error while getting directory client: {e}") + raise + + async def _get_file_client(self, file_name): + """Get file client from OneLake + + Args: + file_name (str): name of the file + + Returns: + obj: File client + """ + + try: + return self._get_directory_client().get_file_client(file_name) + except Exception as e: + self._logger.error(f"Error while getting file client: {e}") + raise + + async def _get_directory_paths(self, directory_path): + """List directory paths from data lake + + Args: + directory_path (str): Directory path + + Returns: + list: List of paths + """ + + try: + paths = self._get_file_system_client().get_paths(path=directory_path) + + return paths + except Exception as e: + self._logger.error(f"Error while getting directory paths: {e}") + raise + + async def format_file(self, file_client): + """Format file_client to be processed + + Args: + file_client (obj): File object + + Returns: + dict: Formatted file + """ + + try: + file_properties = file_client.get_file_properties() + + return { + "_id": f"{file_client.file_system_name}_{file_properties.name.split('/')[-1]}", + "name": file_properties.name.split("/")[-1], + "created_at": file_properties.creation_time.isoformat(), + "_timestamp": file_properties.last_modified.isoformat(), + "size": file_properties.size, + } + except Exception as e: + self._logger.error( + f"Error while formatting file or getting file properties: {e}" + ) + raise + + async def download_file(self, file_client): + """Download file from OneLake + + Args: + file_client (obj): File client + + Returns: + generator: File stream + """ + + try: + download = file_client.download_file() + stream = download.chunks() + + for chunk in stream: + yield chunk + except Exception as e: + self._logger.error(f"Error while downloading file: {e}") + raise + + async def get_content(self, file_name, doit=None, timestamp=None): + """Obtains the file content for the specified file in `file_name`. + + Args: + file_name (obj): The file name to process to obtain the content. + timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None. + doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None. + + Returns: + str: Content of the file or None if not applicable. + """ + + if not doit: + return + + file_client = await self._get_file_client(file_name) + file_properties = file_client.get_file_properties() + file_extension = self.get_file_extension(file_name) + + doc = { + "_id": f"{file_client.file_system_name}_{file_properties.name}", # id in format _ + "name": file_properties.name.split("/")[-1], + "_timestamp": file_properties.last_modified, + "created_at": file_properties.creation_time, + } + + can_be_downloaded = self.can_file_be_downloaded( + file_extension=file_extension, + filename=file_properties.name, + file_size=file_properties.size, + ) + + if not can_be_downloaded: + return doc + + extracted_doc = await self.download_and_extract_file( + doc=doc, + source_filename=file_properties.name.split("/")[-1], + file_extension=file_extension, + download_func=partial(self.download_file, file_client), + ) + + return extracted_doc if extracted_doc is not None else doc + + async def prepare_files(self, doc_paths): + """Prepare files for processing + + Args: + doc_paths (list): List of paths extracted from OneLake + + Yields: + tuple: File document and partial function to get content + """ + + for path in doc_paths: + file_name = path.name.split("/")[-1] + field_client = await self._get_file_client(file_name) + + yield self.format_file(field_client) + + async def get_docs(self, filtering=None): + """Get documents from OneLake and index them + + Yields: + tuple: dictionary with meta-data of each file and a partial function to get the file content. + """ + + directory_paths = await self._get_directory_paths( + self.configuration["data_path"] + ) + + async for file in self.prepare_files(directory_paths): + file_dict = await file + + yield file_dict, partial(self.get_content, file_dict["name"]) From a10e0dd9cd19fab4cbe3ccd049705089d2c75a0f Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 10 Dec 2024 11:39:42 -0500 Subject: [PATCH 03/22] onelake connector declared in connectors/config.py --- connectors/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/connectors/config.py b/connectors/config.py index 0fd988695..f16fb01fc 100644 --- a/connectors/config.py +++ b/connectors/config.py @@ -121,6 +121,7 @@ def _default_config(): "network_drive": "connectors.sources.network_drive:NASDataSource", "notion": "connectors.sources.notion:NotionDataSource", "onedrive": "connectors.sources.onedrive:OneDriveDataSource", + "onelake": "connectors.sources.onelake:OneLakeDataSource", "oracle": "connectors.sources.oracle:OracleDataSource", "outlook": "connectors.sources.outlook:OutlookDataSource", "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource", From ae7fbb0ea14fb37bc24345fecb104df771b903f2 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 10 Dec 2024 12:30:35 -0500 Subject: [PATCH 04/22] onelake connector dependencies added to requirements/framework.txt --- requirements/framework.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements/framework.txt b/requirements/framework.txt index 775ad7ac4..a84f644b4 100644 --- a/requirements/framework.txt +++ b/requirements/framework.txt @@ -44,3 +44,5 @@ notion-client==2.2.1 certifi==2024.7.4 aioboto3==12.4.0 pyasn1<0.6.1 +azure-identity==1.19.0 +azure-storage-file-datalake==12.17.0 \ No newline at end of file From 1c8918188162c337046c57774fa9dbb2451b98c5 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 11 Dec 2024 12:34:27 -0500 Subject: [PATCH 05/22] refactor sync functions to async functions in onelake connector code --- connectors/sources/onelake.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index b9e2a2e88..70a3feb60 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -113,7 +113,7 @@ def _get_token_credentials(self): self._logger.error(f"Error while getting token credentials: {e}") raise - def _get_service_client(self): + async def _get_service_client(self): """Get the service client for OneLake Returns: @@ -129,21 +129,23 @@ def _get_service_client(self): self._logger.error(f"Error while getting service client: {e}") raise - def _get_file_system_client(self): + async def _get_file_system_client(self): """Get the file system client for OneLake Returns: obj: File system client """ try: - return self._get_service_client().get_file_system_client( + service_client = await self._get_service_client() + + return service_client.get_file_system_client( self.configuration["workspace_name"] ) except Exception as e: self._logger.error(f"Error while getting file system client: {e}") raise - def _get_directory_client(self): + async def _get_directory_client(self): """Get the directory client for OneLake Returns: @@ -151,7 +153,9 @@ def _get_directory_client(self): """ try: - return self._get_file_system_client().get_directory_client( + file_system_client = await self._get_file_system_client() + + return file_system_client.get_directory_client( self.configuration["data_path"] ) except Exception as e: @@ -169,7 +173,9 @@ async def _get_file_client(self, file_name): """ try: - return self._get_directory_client().get_file_client(file_name) + directory_client = await self._get_directory_client() + + return directory_client.get_file_client(file_name) except Exception as e: self._logger.error(f"Error while getting file client: {e}") raise @@ -185,14 +191,14 @@ async def _get_directory_paths(self, directory_path): """ try: - paths = self._get_file_system_client().get_paths(path=directory_path) + file_system_client = await self._get_file_system_client() - return paths + return file_system_client.get_paths(path=directory_path) except Exception as e: self._logger.error(f"Error while getting directory paths: {e}") raise - async def format_file(self, file_client): + def format_file(self, file_client): """Format file_client to be processed Args: @@ -310,6 +316,6 @@ async def get_docs(self, filtering=None): ) async for file in self.prepare_files(directory_paths): - file_dict = await file + file_dict = file yield file_dict, partial(self.get_content, file_dict["name"]) From 314bf38ca2c494b949fb492b753654abed9f9f8b Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 11 Dec 2024 12:36:54 -0500 Subject: [PATCH 06/22] name and created_at removed from dictionary in get_content function (onelake connector) --- connectors/sources/onelake.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 70a3feb60..85a3c604c 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -265,9 +265,7 @@ async def get_content(self, file_name, doit=None, timestamp=None): doc = { "_id": f"{file_client.file_system_name}_{file_properties.name}", # id in format _ - "name": file_properties.name.split("/")[-1], "_timestamp": file_properties.last_modified, - "created_at": file_properties.creation_time, } can_be_downloaded = self.can_file_be_downloaded( From 4cf0c10af9990e459d599c4302b71dfe10ba70f2 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 11 Dec 2024 16:53:09 -0500 Subject: [PATCH 07/22] (onelake connector) added logs --- connectors/sources/onelake.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 85a3c604c..6fca8db83 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -81,7 +81,9 @@ async def ping(self): try: await self._get_directory_paths(self.configuration["data_path"]) - self._logger.info("Connection to OneLake successful") + self._logger.info( + f"Connection to OneLake successful to {self.configuration['data_path']}" + ) except Exception: self._logger.exception("Error while connecting to OneLake.") @@ -275,6 +277,9 @@ async def get_content(self, file_name, doit=None, timestamp=None): ) if not can_be_downloaded: + self._logger.warning( + f"File {file_properties.name} cannot be downloaded. Skipping." + ) return doc extracted_doc = await self.download_and_extract_file( @@ -309,6 +314,7 @@ async def get_docs(self, filtering=None): tuple: dictionary with meta-data of each file and a partial function to get the file content. """ + self._logger.info(f"Fetching files from OneLake datalake {self.data_path}") directory_paths = await self._get_directory_paths( self.configuration["data_path"] ) From ee07c13e14c170a3c585c3c98f110c0997dd33a9 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 11 Dec 2024 16:59:04 -0500 Subject: [PATCH 08/22] (onelake connector) added debug statements --- connectors/sources/onelake.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 6fca8db83..da9573fdb 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -282,6 +282,7 @@ async def get_content(self, file_name, doit=None, timestamp=None): ) return doc + self._logger.debug(f"Downloading file {file_properties.name}...") extracted_doc = await self.download_and_extract_file( doc=doc, source_filename=file_properties.name.split("/")[-1], @@ -319,6 +320,7 @@ async def get_docs(self, filtering=None): self.configuration["data_path"] ) + self._logger.debug(f"Found {len(directory_paths)} files in {self.data_path}") async for file in self.prepare_files(directory_paths): file_dict = file From e7716ae3a67e392653e7f87717143f5ae35d1ddd Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Mon, 16 Dec 2024 07:33:50 -0500 Subject: [PATCH 09/22] downgrade azure-storage-file-datalek to 12.14.0 version --- requirements/framework.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/framework.txt b/requirements/framework.txt index a84f644b4..94da2fc48 100644 --- a/requirements/framework.txt +++ b/requirements/framework.txt @@ -45,4 +45,4 @@ certifi==2024.7.4 aioboto3==12.4.0 pyasn1<0.6.1 azure-identity==1.19.0 -azure-storage-file-datalake==12.17.0 \ No newline at end of file +azure-storage-file-datalake==12.14.0 \ No newline at end of file From 3f8871034701b83b841c4cd818ae5f196f58b9a8 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 17 Dec 2024 07:10:39 -0500 Subject: [PATCH 10/22] (onelake connector) tests: 'ping', 'format_file' and '_get_file_client' methods --- tests/sources/test_onelake.py | 116 ++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 tests/sources/test_onelake.py diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py new file mode 100644 index 000000000..4fbdebedf --- /dev/null +++ b/tests/sources/test_onelake.py @@ -0,0 +1,116 @@ +from contextlib import asynccontextmanager +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest + +from connectors.sources.onelake import OneLakeDataSource +from tests.sources.support import create_source + + +@asynccontextmanager +async def create_abs_source( + use_text_extraction_service=False, +): + async with create_source( + OneLakeDataSource, + tenant_id="", + client_id="", + client_secret="", + workspace_name="", + data_path="", + use_text_extraction_service=use_text_extraction_service, + ) as source: + yield source + + +@pytest.mark.asyncio +async def test_ping_for_successful_connection(): + """Test ping method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + with patch.object( + source, "_get_directory_paths", new_callable=AsyncMock + ) as mock_get_paths: + mock_get_paths.return_value = [] + + # Run + await source.ping() + + # Check + mock_get_paths.assert_called_once_with(source.configuration["data_path"]) + + +@pytest.mark.asyncio +async def test_ping_for_failed_connection(): + """Test ping method of OneLakeDataSource class with negative case""" + + # Setup + async with create_abs_source() as source: + with patch.object( + source, "_get_directory_paths", new_callable=AsyncMock + ) as mock_get_paths: + mock_get_paths.side_effect = Exception("Something went wrong") + + # Run & Check + with pytest.raises(Exception, match="Something went wrong"): + await source.ping() + + mock_get_paths.assert_called_once_with(source.configuration["data_path"]) + + # Cleanup + mock_get_paths.reset_mock + + +@pytest.mark.asyncio +async def test_get_file_client(): + """Test _get_file_client method of OneLakeDataSource class""" + + # Setup + mock_client = Mock() + async with create_abs_source() as source: + with patch.object( + source, "_get_file_client", new_callable=AsyncMock + ) as mock_method: + mock_method.return_value = mock_client + + # Run + file_client = await source._get_file_client() + + # Check + assert file_client is mock_client + + +@pytest.mark.asyncio +async def test_format_file(): + """Test format_file method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + mock_file_client = MagicMock() + mock_file_properties = MagicMock( + creation_time=datetime(2022, 4, 21, 12, 12, 30), + last_modified=datetime(2022, 4, 22, 15, 45, 10), + size=2048, + name="path/to/file.txt", + ) + + mock_file_properties.name.split.return_value = ["path", "to", "file.txt"] + mock_file_client.get_file_properties.return_value = mock_file_properties + mock_file_client.file_system_name = "my_file_system" + + expected_output = { + "_id": "my_file_system_file.txt", + "name": "file.txt", + "created_at": "2022-04-21T12:12:30", + "_timestamp": "2022-04-22T15:45:10", + "size": 2048, + } + + # Execute + actual_output = source.format_file(mock_file_client) + + # Assert + assert actual_output == expected_output + mock_file_client.get_file_properties.assert_called_once() From 297db3e98eb5c9960f4a6a7c95023d7f204f1176 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 18 Dec 2024 08:50:32 -0500 Subject: [PATCH 11/22] (onelake connector) removed _timestamp from dictionary --- connectors/sources/onelake.py | 1 - 1 file changed, 1 deletion(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index da9573fdb..580b7119e 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -267,7 +267,6 @@ async def get_content(self, file_name, doit=None, timestamp=None): doc = { "_id": f"{file_client.file_system_name}_{file_properties.name}", # id in format _ - "_timestamp": file_properties.last_modified, } can_be_downloaded = self.can_file_be_downloaded( From 8afd67681c919ea16b8e6962d99b3e8ed3b02d5c Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Tue, 24 Dec 2024 11:50:23 -0500 Subject: [PATCH 12/22] (onelake connector) 96% test coverage reached --- tests/sources/test_onelake.py | 508 +++++++++++++++++++++++++++++++++- 1 file changed, 499 insertions(+), 9 deletions(-) diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py index 4fbdebedf..a947c3729 100644 --- a/tests/sources/test_onelake.py +++ b/tests/sources/test_onelake.py @@ -1,6 +1,7 @@ from contextlib import asynccontextmanager from datetime import datetime -from unittest.mock import AsyncMock, MagicMock, Mock, patch +from functools import partial +from unittest.mock import AsyncMock, MagicMock, Mock, call, patch import pytest @@ -64,22 +65,274 @@ async def test_ping_for_failed_connection(): @pytest.mark.asyncio -async def test_get_file_client(): - """Test _get_file_client method of OneLakeDataSource class""" +async def test_get_account_url(): + """Test _get_account_url method of OneLakeDataSource class""" # Setup - mock_client = Mock() async with create_abs_source() as source: + account_name = source.configuration["account_name"] + expected_url = f"https://{account_name}.dfs.fabric.microsoft.com" + + # Run + actual_url = source._get_account_url() + + # Check + assert actual_url == expected_url + + +@pytest.mark.asyncio +async def test_get_token_credentials(): + """Test _get_token_credentials method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + tenant_id = source.configuration["tenant_id"] + client_id = source.configuration["client_id"] + client_secret = source.configuration["client_secret"] + + with patch( + "connectors.sources.onelake.ClientSecretCredential", autospec=True + ) as mock_credential: + mock_instance = mock_credential.return_value + + # Run + credentials = source._get_token_credentials() + + # Check + mock_credential.assert_called_once_with(tenant_id, client_id, client_secret) + assert credentials is mock_instance + + +@pytest.mark.asyncio +async def test_get_token_credentials_error(): + """Test _get_token_credentials method when credential creation fails""" + + async with create_abs_source() as source: + with patch( + "connectors.sources.onelake.ClientSecretCredential", autospec=True + ) as mock_credential: + mock_credential.side_effect = Exception("Credential error") + + with pytest.raises(Exception, match="Credential error"): + source._get_token_credentials() + + +@pytest.mark.asyncio +async def test_get_service_client(): + """Test _get_service_client method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + mock_service_client = Mock() + mock_account_url = "https://mockaccount.dfs.fabric.microsoft.com" + mock_credentials = Mock() + + with patch( + "connectors.sources.onelake.DataLakeServiceClient", + autospec=True, + ) as mock_client, patch.object( + source, + "_get_account_url", + return_value=mock_account_url, + ), patch.object( + source, "_get_token_credentials", return_value=mock_credentials + ): + mock_client.return_value = mock_service_client + + # Run + service_client = await source._get_service_client() + + # Check + mock_client.assert_called_once_with( + account_url=mock_account_url, + credential=mock_credentials, + ) + assert service_client is mock_service_client + + +@pytest.mark.asyncio +async def test_get_service_client_error(): + """Test _get_service_client method when client creation fails""" + + async with create_abs_source() as source: + with patch( + "connectors.sources.onelake.DataLakeServiceClient", + side_effect=Exception("Service client error"), + ): + with pytest.raises(Exception, match="Service client error"): + await source._get_service_client() + + +@pytest.mark.asyncio +async def test_get_file_system_client(): + """Test _get_file_system_client method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + mock_file_system_client = Mock() + workspace_name = source.configuration["workspace_name"] + with patch.object( - source, "_get_file_client", new_callable=AsyncMock - ) as mock_method: - mock_method.return_value = mock_client + source, "_get_service_client", new_callable=AsyncMock + ) as mock_get_service_client: + mock_service_client = Mock() + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) + mock_get_service_client.return_value = mock_service_client + + # Run + file_system_client = await source._get_file_system_client() + + # Check + mock_service_client.get_file_system_client.assert_called_once_with( + workspace_name + ) + assert file_system_client == mock_file_system_client + + +@pytest.mark.asyncio +async def test_get_file_system_client_error(): + """Test _get_file_system_client method when client creation fails""" + + async with create_abs_source() as source: + mock_service_client = Mock() + mock_service_client.get_file_system_client.side_effect = Exception( + "File system error" + ) + + with patch.object( + source, "_get_service_client", new_callable=AsyncMock + ) as mock_get_service_client: + mock_get_service_client.return_value = mock_service_client + + with pytest.raises(Exception, match="File system error"): + await source._get_file_system_client() + + +@pytest.mark.asyncio +async def test_get_directory_client(): + """Test _get_directory_client method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + mock_directory_client = Mock() + data_path = source.configuration["data_path"] + + with patch.object( + source, "_get_file_system_client", new_callable=AsyncMock + ) as mock_get_file_system_client: + mock_file_system_client = Mock() + mock_file_system_client.get_directory_client.return_value = ( + mock_directory_client + ) + mock_get_file_system_client.return_value = mock_file_system_client + + # Run + directory_client = await source._get_directory_client() + + # Check + mock_file_system_client.get_directory_client.assert_called_once_with( + data_path + ) + assert directory_client == mock_directory_client + + +@pytest.mark.asyncio +async def test_get_directory_client_error(): + """Test _get_directory_client method when client creation fails""" + + async with create_abs_source() as source: + mock_file_system_client = Mock() + mock_file_system_client.get_directory_client.side_effect = Exception( + "Directory error" + ) + + with patch.object( + source, "_get_file_system_client", new_callable=AsyncMock + ) as mock_get_file_system_client: + mock_get_file_system_client.return_value = mock_file_system_client + + with pytest.raises(Exception, match="Directory error"): + await source._get_directory_client() + + +@pytest.mark.asyncio +async def test_get_file_client_success(): + """Test successful file client retrieval""" + + mock_file_client = Mock() + mock_directory_client = Mock() + mock_directory_client.get_file_client.return_value = mock_file_client + + async with create_abs_source() as source: + with patch.object( + source, "_get_directory_client", new_callable=AsyncMock + ) as mock_get_directory: + mock_get_directory.return_value = mock_directory_client + + result = await source._get_file_client("test.txt") + + assert result == mock_file_client + mock_directory_client.get_file_client.assert_called_once_with("test.txt") + + +@pytest.mark.asyncio +async def test_get_file_client_error(): + """Test file client retrieval with error""" + + async with create_abs_source() as source: + with patch.object( + source, "_get_directory_client", new_callable=AsyncMock + ) as mock_get_directory: + mock_get_directory.side_effect = Exception("Test error") + + with pytest.raises(Exception, match="Test error"): + await source._get_file_client("test.txt") + + +@pytest.mark.asyncio +async def test_get_directory_paths(): + """Test _get_directory_paths method of OneLakeDataSource class""" + + # Setup + async with create_abs_source() as source: + mock_paths = ["path1", "path2"] + directory_path = "mock_directory_path" + + with patch.object( + source, "_get_file_system_client", new_callable=AsyncMock + ) as mock_get_file_system_client: + mock_get_paths = Mock(return_value=mock_paths) + mock_file_system_client = mock_get_file_system_client.return_value + mock_file_system_client.get_paths = mock_get_paths # Run - file_client = await source._get_file_client() + paths = await source._get_directory_paths(directory_path) # Check - assert file_client is mock_client + mock_file_system_client.get_paths.assert_called_once_with( + path=directory_path + ) + assert paths == mock_paths + + +@pytest.mark.asyncio +async def test_get_directory_paths_error(): + """Test _get_directory_paths method when getting paths fails""" + + async with create_abs_source() as source: + directory_path = "mock_directory_path" + with patch.object( + source, "_get_file_system_client", new_callable=AsyncMock + ) as mock_get_file_system_client: + mock_file_system_client = mock_get_file_system_client.return_value + mock_file_system_client.get_paths = AsyncMock( + side_effect=Exception("Path error") + ) + + with pytest.raises(Exception, match="Path error"): + await source._get_directory_paths(directory_path) @pytest.mark.asyncio @@ -114,3 +367,240 @@ async def test_format_file(): # Assert assert actual_output == expected_output mock_file_client.get_file_properties.assert_called_once() + + +@pytest.mark.asyncio +async def test_format_file_error(): + """Test format_file method when getting properties fails""" + + async with create_abs_source() as source: + mock_file_client = MagicMock() + mock_file_client.get_file_properties.side_effect = Exception("Properties error") + mock_file_client.file_system_name = "my_file_system" + + with pytest.raises(Exception, match="Properties error"): + source.format_file(mock_file_client) + + +@pytest.mark.asyncio +async def test_format_file_empty_name(): + """Test format_file method with empty file name""" + + async with create_abs_source() as source: + mock_file_client = MagicMock() + mock_file_properties = MagicMock( + creation_time=datetime(2022, 4, 21, 12, 12, 30), + last_modified=datetime(2022, 4, 22, 15, 45, 10), + size=2048, + name="", + ) + mock_file_properties.name.split.return_value = [""] + mock_file_client.get_file_properties.return_value = mock_file_properties + mock_file_client.file_system_name = "my_file_system" + + result = source.format_file(mock_file_client) + assert result["name"] == "" + assert result["_id"] == "my_file_system_" + + +@pytest.mark.asyncio +async def test_download_file(): + """Test download_file method of OneLakeDataSource class""" + + # Setup + mock_file_client = Mock() + mock_download = Mock() + mock_file_client.download_file.return_value = mock_download + + mock_chunks = ["chunk1", "chunk2", "chunk3"] + + mock_download.chunks.return_value = iter(mock_chunks) + + async with create_abs_source() as source: + # Run + chunks = [] + async for chunk in source.download_file(mock_file_client): + chunks.append(chunk) + + # Check + assert chunks == mock_chunks + mock_file_client.download_file.assert_called_once() + mock_download.chunks.assert_called_once() + + +@pytest.mark.asyncio +async def test_download_file_with_error(): + """Test download_file method of OneLakeDataSource class with exception handling""" + + # Setup + mock_file_client = Mock() + mock_download = Mock() + mock_file_client.download_file.return_value = mock_download + mock_download.chunks.side_effect = Exception("Download error") + + async with create_abs_source() as source: + # Run & Check + with pytest.raises(Exception, match="Download error"): + async for _ in source.download_file(mock_file_client): + pass + + mock_file_client.download_file.assert_called_once() + mock_download.chunks.assert_called_once() + + +@pytest.mark.asyncio +async def test_get_content_with_download(): + """Test get_content method when doit=True""" + + mock_configuration = { + "account_name": "mockaccount", + "tenant_id": "mocktenant", + "client_id": "mockclient", + "client_secret": "mocksecret", + "workspace_name": "mockworkspace", + "data_path": "mockpath", + } + + async with create_abs_source() as source: + source.configuration = mock_configuration + + class FileClientMock: + file_system_name = "mockfilesystem" + + class FileProperties: + def __init__(self, name, size): + self.name = name + self.size = size + + def get_file_properties(self): + return self.FileProperties(name="file1.txt", size=2000) + + with patch.object( + source, "_get_file_client", return_value=FileClientMock() + ), patch.object( + source, "can_file_be_downloaded", return_value=True + ), patch.object( + source, + "download_and_extract_file", + return_value={ + "_id": "mockfilesystem_file1.txt", + "_attachment": "TW9jayBjb250ZW50", + }, + ): + actual_response = await source.get_content("file1.txt", doit=True) + assert actual_response == { + "_id": "mockfilesystem_file1.txt", + "_attachment": "TW9jayBjb250ZW50", + } + + +@pytest.mark.asyncio +async def test_get_content_without_download(): + """Test get_content method when doit=False""" + + async with create_abs_source() as source: + source.configuration = { + "account_name": "mockaccount", + "tenant_id": "mocktenant", + "client_id": "mockclient", + "client_secret": "mocksecret", + "workspace_name": "mockworkspace", + "data_path": "mockpath", + } + + class FileClientMock: + file_system_name = "mockfilesystem" + + class FileProperties: + def __init__(self, name, size): + self.name = name + self.size = size + + def get_file_properties(self): + return self.FileProperties(name="file1.txt", size=2000) + + with patch.object(source, "_get_file_client", return_value=FileClientMock()): + actual_response = await source.get_content("file1.txt", doit=False) + assert actual_response is None + + +@pytest.mark.asyncio +async def test_prepare_files(): + """Test prepare_files method of OneLakeDataSource class""" + + # Setup + doc_paths = [ + Mock( + name="doc1", + **{"name.split.return_value": ["folder", "doc1"], "path": "folder/doc1"}, + ), + Mock( + name="doc2", + **{"name.split.return_value": ["folder", "doc2"], "path": "folder/doc2"}, + ), + ] + mock_field_client = Mock() + + async def mock_format_file(*args, **kwargs): + """Mock for the format_file method""" + + return "file_document", "partial_function" + + async with create_abs_source() as source: + with patch.object( + source, "_get_file_client", new_callable=AsyncMock + ) as mock_get_file_client: + mock_get_file_client.return_value = mock_field_client + + with patch.object(source, "format_file", side_effect=mock_format_file): + result = [] + # Run + async for item in source.prepare_files(doc_paths): + result.append(await item) + + # Check results + assert result == [ + ("file_document", "partial_function"), + ("file_document", "partial_function"), + ] + + mock_get_file_client.assert_has_calls([call("doc1"), call("doc2")]) + + +@pytest.mark.asyncio +async def test_get_docs(): + """Test get_docs method of OneLakeDataSource class""" + + mock_paths = [ + Mock(name="doc1", path="folder/doc1"), + Mock(name="doc2", path="folder/doc2"), + ] + + mock_file_docs = [{"name": "doc1", "id": "1"}, {"name": "doc2", "id": "2"}] + + async def mock_prepare_files_impl(paths): + for doc in mock_file_docs: + yield doc + + async with create_abs_source() as source: + with patch.object( + source, "_get_directory_paths", new_callable=AsyncMock + ) as mock_get_paths: + mock_get_paths.return_value = mock_paths + + with patch.object( + source, "prepare_files", side_effect=mock_prepare_files_impl + ): + result = [] + async for doc, get_content in source.get_docs(): + result.append((doc, get_content)) + + mock_get_paths.assert_called_once_with( + source.configuration["data_path"] + ) + assert len(result) == 2 + for (doc, get_content), expected_doc in zip(result, mock_file_docs): + assert doc == expected_doc + assert isinstance(get_content, partial) + assert get_content.func == source.get_content + assert get_content.args == (doc["name"],) From 38c260cf5d7a99b12a8ad50a8bf0f44636f90003 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Fri, 3 Jan 2025 14:26:53 -0500 Subject: [PATCH 13/22] (onelake) fixture folder for onelake connector, added fixture.py, docker-compose and connector.json --- tests/sources/fixtures/onelake/connector.json | 44 +++++++++ .../fixtures/onelake/docker-compose.yml | 44 +++++++++ tests/sources/fixtures/onelake/fixture.py | 92 +++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 tests/sources/fixtures/onelake/connector.json create mode 100644 tests/sources/fixtures/onelake/docker-compose.yml create mode 100644 tests/sources/fixtures/onelake/fixture.py diff --git a/tests/sources/fixtures/onelake/connector.json b/tests/sources/fixtures/onelake/connector.json new file mode 100644 index 000000000..1b74c208b --- /dev/null +++ b/tests/sources/fixtures/onelake/connector.json @@ -0,0 +1,44 @@ +{ + "configuration": { + "tenant_id": { + "label": "OneLake tenant id", + "order": 1, + "type": "str", + "value": "tenant-id" + }, + "client_id": { + "label": "OneLake client id", + "order": 2, + "type": "str", + "value": "client-id" + }, + "client_secret": { + "label": "OneLake client secret", + "order": 3, + "type": "str", + "sensitive": true, + "value": "client-secret" + }, + "workspace_name": { + "label": "OneLake workspace name", + "order": 4, + "type": "str", + "value": "testWorkspace" + }, + "data_path": { + "label": "OneLake data path", + "tooltip": "Path in format .Lakehouse/files/", + "order": 5, + "type": "str", + "value": "test_data_path" + }, + "account_name": { + "tooltip": "In the most cases is 'onelake'", + "default_value": "onelake", + "label": "Account name", + "order": 6, + "type": "str", + "value": "onelake" + } + } +} diff --git a/tests/sources/fixtures/onelake/docker-compose.yml b/tests/sources/fixtures/onelake/docker-compose.yml new file mode 100644 index 000000000..941f93f0a --- /dev/null +++ b/tests/sources/fixtures/onelake/docker-compose.yml @@ -0,0 +1,44 @@ +version: "3.9" + +services: + elasticsearch: + image: ${ELASTICSEARCH_DRA_DOCKER_IMAGE} + container_name: elasticsearch + environment: + - cluster.name=docker-cluster + - bootstrap.memory_lock=true + - ES_JAVA_OPTS=-Xms2g -Xmx2g + - ELASTIC_PASSWORD=changeme + - xpack.security.enabled=true + - xpack.security.authc.api_key.enabled=true + - discovery.type=single-node + - action.destructive_requires_name=false + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - esdata:/usr/share/elasticsearch/data + ports: + - 9200:9200 + networks: + - esnet + + onelake: + build: + context: ../../../../ + dockerfile: ${DOCKERFILE_FTEST_PATH} + command: .venv/bin/python tests/sources/fixtures/onelake/fixture.py + ports: + - "8000:8000" + volumes: + - .:/python-flask + restart: always + +volumes: + esdata: + driver: local + +networks: + esnet: + driver: bridge diff --git a/tests/sources/fixtures/onelake/fixture.py b/tests/sources/fixtures/onelake/fixture.py new file mode 100644 index 000000000..48ac90fc2 --- /dev/null +++ b/tests/sources/fixtures/onelake/fixture.py @@ -0,0 +1,92 @@ +from typing import Dict, List + +""" +This is a fixture for generating test data and listing paths in a file system. +""" + + +class PathProperties: + def __init__(self, name: str, is_directory: bool = False): + self.name = name + self.is_directory = is_directory + + def to_dict(self): + return {"name": self.name, "is_directory": self.is_directory} + + +class ItemPaged: + def __init__(self, items: List[PathProperties]): + self.items = items + + def __iter__(self): + for item in self.items: + yield item + + +class FileSystemClient: + def __init__(self, file_system_name: str): + self.file_system_name = file_system_name + self.files = {} + + def add_file(self, file_path: str, is_directory: bool = False): + self.files[file_path] = is_directory + + def get_paths(self, path: str = None): + paths = [ + PathProperties(name=file_path, is_directory=is_directory) + for file_path, is_directory in self.files.items() + if path is None or file_path.startswith(path) + ] + return ItemPaged(paths) + + +FILE_SYSTEMS: Dict[str, FileSystemClient] = {} + + +def create_file_system(name: str): + if name not in FILE_SYSTEMS: + FILE_SYSTEMS[name] = FileSystemClient(name) + + +def load(config: Dict): + """ + Loads initial data into the backend based on OneLake configuration. + + Args: + config: Dictionary containing OneLake configuration with format: + { + "configuration": { + "workspace_name": {"value": str}, + "data_path": {"value": str}, + ... + } + } + """ + if not config.get("configuration"): + raise ValueError("Invalid configuration format") + + conf = config["configuration"] + workspace_name = conf["workspace_name"]["value"] + data_path = conf["data_path"]["value"] + + create_file_system(workspace_name) + generate_test_data(workspace_name, data_path, file_count=10000) + + +def generate_test_data(file_system_name: str, folder_path: str, file_count: int): + create_file_system(file_system_name) + file_system = FILE_SYSTEMS[file_system_name] + + file_system.add_file(folder_path, is_directory=True) + + for i in range(file_count): + file_name = f"{folder_path}/file_{i}.txt" + file_system.add_file(file_name) + + +def list_paths(file_system_name: str, folder_path: str): + if file_system_name not in FILE_SYSTEMS: + return [] + + file_system = FILE_SYSTEMS[file_system_name] + return file_system.get_paths(path=folder_path) From 54c5bf0bc1f78b1fd474a47c60b25a1b9a8bac91 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Fri, 3 Jan 2025 15:20:06 -0500 Subject: [PATCH 14/22] (onelake) tests: added fake credential --- tests/sources/test_onelake.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py index a947c3729..aa37bf790 100644 --- a/tests/sources/test_onelake.py +++ b/tests/sources/test_onelake.py @@ -15,11 +15,11 @@ async def create_abs_source( ): async with create_source( OneLakeDataSource, - tenant_id="", - client_id="", - client_secret="", - workspace_name="", - data_path="", + tenant_id="fake-tenant", + client_id="-fake-client", + client_secret="fake-client", + workspace_name="FakeWorkspace", + data_path="FakeDatalake.Lakehouse/Files/Data", use_text_extraction_service=use_text_extraction_service, ) as source: yield source From abfc07e4c7c4cb6444276a233fa125a274bc0bf0 Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Wed, 8 Jan 2025 15:23:13 -0500 Subject: [PATCH 15/22] (onelake) _get_account_url method replaced by one field of the class --- connectors/sources/onelake.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 580b7119e..3a525bc5b 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -29,6 +29,9 @@ def __init__(self, configuration): self.client_secret = self.configuration["client_secret"] self.workspace_name = self.configuration["workspace_name"] self.data_path = self.configuration["data_path"] + self.account_url = ( + f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" + ) @classmethod def get_default_configuration(cls): @@ -89,15 +92,6 @@ async def ping(self): self._logger.exception("Error while connecting to OneLake.") raise - def _get_account_url(self): - """Get the account URL for OneLake - - Returns: - str: Account URL - """ - - return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" - def _get_token_credentials(self): """Get the token credentials for OneLake @@ -124,7 +118,7 @@ async def _get_service_client(self): try: return DataLakeServiceClient( - account_url=self._get_account_url(), + account_url=self.account_url, credential=self._get_token_credentials(), ) except Exception as e: @@ -315,11 +309,13 @@ async def get_docs(self, filtering=None): """ self._logger.info(f"Fetching files from OneLake datalake {self.data_path}") + directory_paths = await self._get_directory_paths( self.configuration["data_path"] ) self._logger.debug(f"Found {len(directory_paths)} files in {self.data_path}") + async for file in self.prepare_files(directory_paths): file_dict = file From 1e1b27a065e9ca4e1f7c857a48ecb1829769f39c Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Sun, 25 May 2025 18:59:12 -0500 Subject: [PATCH 16/22] Clients refactorized --- connectors/sources/onelake.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 3a525bc5b..a5e487f28 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -16,6 +16,9 @@ class OneLakeDataSource(BaseDataSource): name = "OneLake" service_type = "onelake" incremental_sync_enabled = True + service_client = None + file_system_client = None + directory_client = None def __init__(self, configuration): """Set up the connection to the azure base client @@ -32,6 +35,9 @@ def __init__(self, configuration): self.account_url = ( f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" ) + self.service_client = self._get_service_client() + self.file_system_client = self._get_file_system_client() + self.directory_client = self._get_directory_client() @classmethod def get_default_configuration(cls): @@ -110,7 +116,7 @@ def _get_token_credentials(self): raise async def _get_service_client(self): - """Get the service client for OneLake + """Get the service client for OneLake. The service client is the client that allows to interact with the OneLake service. Returns: obj: Service client @@ -126,15 +132,14 @@ async def _get_service_client(self): raise async def _get_file_system_client(self): - """Get the file system client for OneLake + """Get the file system client for OneLake. This client is used to interact with the file system of the OneLake service. Returns: obj: File system client """ - try: - service_client = await self._get_service_client() - return service_client.get_file_system_client( + try: + return self.service_client.get_file_system_client( self.configuration["workspace_name"] ) except Exception as e: @@ -149,9 +154,7 @@ async def _get_directory_client(self): """ try: - file_system_client = await self._get_file_system_client() - - return file_system_client.get_directory_client( + return self.file_system_client.get_directory_client( self.configuration["data_path"] ) except Exception as e: @@ -169,9 +172,7 @@ async def _get_file_client(self, file_name): """ try: - directory_client = await self._get_directory_client() - - return directory_client.get_file_client(file_name) + return self.directory_client.get_file_client(file_name) except Exception as e: self._logger.error(f"Error while getting file client: {e}") raise @@ -187,9 +188,7 @@ async def _get_directory_paths(self, directory_path): """ try: - file_system_client = await self._get_file_system_client() - - return file_system_client.get_paths(path=directory_path) + return self.file_system_client.get_paths(path=directory_path) except Exception as e: self._logger.error(f"Error while getting directory paths: {e}") raise From 006a66fbe853ec430f62675ac142652ea62a0026 Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Sun, 25 May 2025 19:00:56 -0500 Subject: [PATCH 17/22] adding account_url to global variables --- connectors/sources/onelake.py | 1 + 1 file changed, 1 insertion(+) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index a5e487f28..53f4318e8 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -19,6 +19,7 @@ class OneLakeDataSource(BaseDataSource): service_client = None file_system_client = None directory_client = None + account_url = None def __init__(self, configuration): """Set up the connection to the azure base client From 930d9859e131f95d05fc1b48b1348fd214501b87 Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Tue, 17 Jun 2025 15:52:32 -0500 Subject: [PATCH 18/22] making async sync code --- connectors/sources/onelake.py | 72 ++++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index 53f4318e8..ba99c63a0 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -1,5 +1,6 @@ """OneLake connector to retrieve data from datalakes""" +import asyncio from functools import partial from azure.identity import ClientSecretCredential @@ -99,6 +100,31 @@ async def ping(self): self._logger.exception("Error while connecting to OneLake.") raise + async def _process_items_concurrently( + self, items, process_item_func, max_concurrency=10 + ): + """Process a list of items concurrently using a semaphore for concurrency control. + + This function applies the `process_item_func` to each item in the `items` list + using a semaphore to control the level of concurrency. + + Args: + items (list): List of items to process. + process_item_func (function): The function to be called for each item. + max_concurrency (int): Maximum number of concurrent items to process. + + Returns: + list: A list containing the results of processing each item. + """ + + async def process_item(item, semaphore): + async with semaphore: + return await process_item_func(item) + + semaphore = asyncio.Semaphore(max_concurrency) + tasks = [process_item(item, semaphore) for item in items] + return await asyncio.gather(*tasks) + def _get_token_credentials(self): """Get the token credentials for OneLake @@ -178,6 +204,23 @@ async def _get_file_client(self, file_name): self._logger.error(f"Error while getting file client: {e}") raise + async def get_files_properties(self, file_clients): + """Get the properties of a list of file clients + + Args: + file_clients (list): List of file clients + + Returns: + list: List of file properties + """ + + async def get_properties(file_client): + return file_client.get_file_properties() + + return await self._process_items_concurrently( + file_clients, get_properties, max_concurrency=10 + ) + async def _get_directory_paths(self, directory_path): """List directory paths from data lake @@ -189,12 +232,15 @@ async def _get_directory_paths(self, directory_path): """ try: - return self.file_system_client.get_paths(path=directory_path) + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, lambda: self.file_system_client.get_paths(path=directory_path) + ) except Exception as e: self._logger.error(f"Error while getting directory paths: {e}") raise - def format_file(self, file_client): + async def format_file(self, file_client): """Format file_client to be processed Args: @@ -205,7 +251,10 @@ def format_file(self, file_client): """ try: - file_properties = file_client.get_file_properties() + loop = asyncio.get_running_loop() + file_properties = await loop.run_in_executor( + None, file_client.get_file_properties + ) return { "_id": f"{file_client.file_system_name}_{file_properties.name.split('/')[-1]}", @@ -231,9 +280,10 @@ async def download_file(self, file_client): """ try: - download = file_client.download_file() - stream = download.chunks() + loop = asyncio.get_running_loop() + download = await loop.run_in_executor(None, file_client.download_file) + stream = download.chunks() for chunk in stream: yield chunk except Exception as e: @@ -291,15 +341,19 @@ async def prepare_files(self, doc_paths): Args: doc_paths (list): List of paths extracted from OneLake - Yields: - tuple: File document and partial function to get content + Returns: + list: List of files """ - for path in doc_paths: + async def prepare_single_file(path): file_name = path.name.split("/")[-1] field_client = await self._get_file_client(file_name) + return self.format_file(field_client) + + files = await self._process_items_concurrently(doc_paths, prepare_single_file) - yield self.format_file(field_client) + for file in files: + yield file async def get_docs(self, filtering=None): """Get documents from OneLake and index them From 78627c81e17ed666bdf1d30a9fd8cc24740893df Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Wed, 18 Jun 2025 15:32:00 -0500 Subject: [PATCH 19/22] fixing issues and initialize method --- connectors/sources/onelake.py | 46 ++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/connectors/sources/onelake.py b/connectors/sources/onelake.py index ba99c63a0..a81807e49 100644 --- a/connectors/sources/onelake.py +++ b/connectors/sources/onelake.py @@ -17,10 +17,6 @@ class OneLakeDataSource(BaseDataSource): name = "OneLake" service_type = "onelake" incremental_sync_enabled = True - service_client = None - file_system_client = None - directory_client = None - account_url = None def __init__(self, configuration): """Set up the connection to the azure base client @@ -37,9 +33,9 @@ def __init__(self, configuration): self.account_url = ( f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" ) - self.service_client = self._get_service_client() - self.file_system_client = self._get_file_system_client() - self.directory_client = self._get_directory_client() + self.service_client = None + self.file_system_client = None + self.directory_client = None @classmethod def get_default_configuration(cls): @@ -72,7 +68,7 @@ def get_default_configuration(cls): }, "data_path": { "label": "OneLake data path", - "tooltip": "Path in format .Lakehouse/files/", + "tooltip": "Path in format .Lakehouse/files/ this is uppercase sensitive, make sure to insert the correct path", "order": 5, "type": "str", }, @@ -80,28 +76,40 @@ def get_default_configuration(cls): "tooltip": "In the most cases is 'onelake'", "default_value": ACCOUNT_NAME, "label": "Account name", + "required": False, "order": 6, "type": "str", }, } + async def initialize(self): + """Initialize the Azure clients asynchronously""" + + if not self.service_client: + self.service_client = await self._get_service_client() + self.file_system_client = await self._get_file_system_client() + self.directory_client = await self._get_directory_client() + async def ping(self): """Verify the connection with OneLake""" self._logger.info("Generating file system client...") try: - await self._get_directory_paths(self.configuration["data_path"]) + await self.initialize() # Initialize the clients + + await self._get_directory_paths( + self.configuration["data_path"] + ) # Condition to check if the connection is successful self._logger.info( f"Connection to OneLake successful to {self.configuration['data_path']}" ) - except Exception: self._logger.exception("Error while connecting to OneLake.") raise async def _process_items_concurrently( - self, items, process_item_func, max_concurrency=10 + self, items, process_item_func, max_concurrency=50 ): """Process a list of items concurrently using a semaphore for concurrency control. @@ -217,9 +225,7 @@ async def get_files_properties(self, file_clients): async def get_properties(file_client): return file_client.get_file_properties() - return await self._process_items_concurrently( - file_clients, get_properties, max_concurrency=10 - ) + return await self._process_items_concurrently(file_clients, get_properties) async def _get_directory_paths(self, directory_path): """List directory paths from data lake @@ -232,10 +238,15 @@ async def _get_directory_paths(self, directory_path): """ try: + if not self.file_system_client: + await self.initialize() + loop = asyncio.get_running_loop() - return await loop.run_in_executor( - None, lambda: self.file_system_client.get_paths(path=directory_path) + paths = await loop.run_in_executor( + None, self.file_system_client.get_paths, directory_path ) + + return paths except Exception as e: self._logger.error(f"Error while getting directory paths: {e}") raise @@ -348,7 +359,7 @@ async def prepare_files(self, doc_paths): async def prepare_single_file(path): file_name = path.name.split("/")[-1] field_client = await self._get_file_client(file_name) - return self.format_file(field_client) + return await self.format_file(field_client) files = await self._process_items_concurrently(doc_paths, prepare_single_file) @@ -367,6 +378,7 @@ async def get_docs(self, filtering=None): directory_paths = await self._get_directory_paths( self.configuration["data_path"] ) + directory_paths = list(directory_paths) self._logger.debug(f"Found {len(directory_paths)} files in {self.data_path}") From 45016b1d135007d7fecd1dacf62a8672fda73d47 Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Fri, 20 Jun 2025 13:42:22 -0500 Subject: [PATCH 20/22] renaming create source method in onelake tests --- tests/sources/test_onelake.py | 50 +++++++++++++++++------------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py index aa37bf790..f25639430 100644 --- a/tests/sources/test_onelake.py +++ b/tests/sources/test_onelake.py @@ -10,7 +10,7 @@ @asynccontextmanager -async def create_abs_source( +async def create_onelake_source( use_text_extraction_service=False, ): async with create_source( @@ -30,7 +30,7 @@ async def test_ping_for_successful_connection(): """Test ping method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -48,7 +48,7 @@ async def test_ping_for_failed_connection(): """Test ping method of OneLakeDataSource class with negative case""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -69,7 +69,7 @@ async def test_get_account_url(): """Test _get_account_url method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: account_name = source.configuration["account_name"] expected_url = f"https://{account_name}.dfs.fabric.microsoft.com" @@ -85,7 +85,7 @@ async def test_get_token_credentials(): """Test _get_token_credentials method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: tenant_id = source.configuration["tenant_id"] client_id = source.configuration["client_id"] client_secret = source.configuration["client_secret"] @@ -107,7 +107,7 @@ async def test_get_token_credentials(): async def test_get_token_credentials_error(): """Test _get_token_credentials method when credential creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch( "connectors.sources.onelake.ClientSecretCredential", autospec=True ) as mock_credential: @@ -122,7 +122,7 @@ async def test_get_service_client(): """Test _get_service_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_service_client = Mock() mock_account_url = "https://mockaccount.dfs.fabric.microsoft.com" mock_credentials = Mock() @@ -154,7 +154,7 @@ async def test_get_service_client(): async def test_get_service_client_error(): """Test _get_service_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch( "connectors.sources.onelake.DataLakeServiceClient", side_effect=Exception("Service client error"), @@ -168,7 +168,7 @@ async def test_get_file_system_client(): """Test _get_file_system_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_system_client = Mock() workspace_name = source.configuration["workspace_name"] @@ -195,7 +195,7 @@ async def test_get_file_system_client(): async def test_get_file_system_client_error(): """Test _get_file_system_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_service_client = Mock() mock_service_client.get_file_system_client.side_effect = Exception( "File system error" @@ -215,7 +215,7 @@ async def test_get_directory_client(): """Test _get_directory_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_directory_client = Mock() data_path = source.configuration["data_path"] @@ -242,7 +242,7 @@ async def test_get_directory_client(): async def test_get_directory_client_error(): """Test _get_directory_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_system_client = Mock() mock_file_system_client.get_directory_client.side_effect = Exception( "Directory error" @@ -265,7 +265,7 @@ async def test_get_file_client_success(): mock_directory_client = Mock() mock_directory_client.get_file_client.return_value = mock_file_client - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_client", new_callable=AsyncMock ) as mock_get_directory: @@ -281,7 +281,7 @@ async def test_get_file_client_success(): async def test_get_file_client_error(): """Test file client retrieval with error""" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_client", new_callable=AsyncMock ) as mock_get_directory: @@ -296,7 +296,7 @@ async def test_get_directory_paths(): """Test _get_directory_paths method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_paths = ["path1", "path2"] directory_path = "mock_directory_path" @@ -321,7 +321,7 @@ async def test_get_directory_paths(): async def test_get_directory_paths_error(): """Test _get_directory_paths method when getting paths fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: directory_path = "mock_directory_path" with patch.object( source, "_get_file_system_client", new_callable=AsyncMock @@ -340,7 +340,7 @@ async def test_format_file(): """Test format_file method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -373,7 +373,7 @@ async def test_format_file(): async def test_format_file_error(): """Test format_file method when getting properties fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_client.get_file_properties.side_effect = Exception("Properties error") mock_file_client.file_system_name = "my_file_system" @@ -386,7 +386,7 @@ async def test_format_file_error(): async def test_format_file_empty_name(): """Test format_file method with empty file name""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -416,7 +416,7 @@ async def test_download_file(): mock_download.chunks.return_value = iter(mock_chunks) - async with create_abs_source() as source: + async with create_onelake_source() as source: # Run chunks = [] async for chunk in source.download_file(mock_file_client): @@ -438,7 +438,7 @@ async def test_download_file_with_error(): mock_file_client.download_file.return_value = mock_download mock_download.chunks.side_effect = Exception("Download error") - async with create_abs_source() as source: + async with create_onelake_source() as source: # Run & Check with pytest.raises(Exception, match="Download error"): async for _ in source.download_file(mock_file_client): @@ -461,7 +461,7 @@ async def test_get_content_with_download(): "data_path": "mockpath", } - async with create_abs_source() as source: + async with create_onelake_source() as source: source.configuration = mock_configuration class FileClientMock: @@ -498,7 +498,7 @@ def get_file_properties(self): async def test_get_content_without_download(): """Test get_content method when doit=False""" - async with create_abs_source() as source: + async with create_onelake_source() as source: source.configuration = { "account_name": "mockaccount", "tenant_id": "mocktenant", @@ -546,7 +546,7 @@ async def mock_format_file(*args, **kwargs): return "file_document", "partial_function" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_file_client", new_callable=AsyncMock ) as mock_get_file_client: @@ -582,7 +582,7 @@ async def mock_prepare_files_impl(paths): for doc in mock_file_docs: yield doc - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: From 9064512c3c5189ae3f4c815ab588bb9c5b52e9ba Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Sun, 6 Jul 2025 12:41:35 -0500 Subject: [PATCH 21/22] Test updated, onelake test with 97% of coverage and global coverage of 92% --- tests/sources/fixtures/onelake/connector.json | 44 -- .../fixtures/onelake/docker-compose.yml | 44 -- tests/sources/fixtures/onelake/fixture.py | 92 --- tests/sources/test_onelake.py | 659 ++++++++++++------ 4 files changed, 433 insertions(+), 406 deletions(-) delete mode 100644 tests/sources/fixtures/onelake/connector.json delete mode 100644 tests/sources/fixtures/onelake/docker-compose.yml delete mode 100644 tests/sources/fixtures/onelake/fixture.py diff --git a/tests/sources/fixtures/onelake/connector.json b/tests/sources/fixtures/onelake/connector.json deleted file mode 100644 index 1b74c208b..000000000 --- a/tests/sources/fixtures/onelake/connector.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "configuration": { - "tenant_id": { - "label": "OneLake tenant id", - "order": 1, - "type": "str", - "value": "tenant-id" - }, - "client_id": { - "label": "OneLake client id", - "order": 2, - "type": "str", - "value": "client-id" - }, - "client_secret": { - "label": "OneLake client secret", - "order": 3, - "type": "str", - "sensitive": true, - "value": "client-secret" - }, - "workspace_name": { - "label": "OneLake workspace name", - "order": 4, - "type": "str", - "value": "testWorkspace" - }, - "data_path": { - "label": "OneLake data path", - "tooltip": "Path in format .Lakehouse/files/", - "order": 5, - "type": "str", - "value": "test_data_path" - }, - "account_name": { - "tooltip": "In the most cases is 'onelake'", - "default_value": "onelake", - "label": "Account name", - "order": 6, - "type": "str", - "value": "onelake" - } - } -} diff --git a/tests/sources/fixtures/onelake/docker-compose.yml b/tests/sources/fixtures/onelake/docker-compose.yml deleted file mode 100644 index 941f93f0a..000000000 --- a/tests/sources/fixtures/onelake/docker-compose.yml +++ /dev/null @@ -1,44 +0,0 @@ -version: "3.9" - -services: - elasticsearch: - image: ${ELASTICSEARCH_DRA_DOCKER_IMAGE} - container_name: elasticsearch - environment: - - cluster.name=docker-cluster - - bootstrap.memory_lock=true - - ES_JAVA_OPTS=-Xms2g -Xmx2g - - ELASTIC_PASSWORD=changeme - - xpack.security.enabled=true - - xpack.security.authc.api_key.enabled=true - - discovery.type=single-node - - action.destructive_requires_name=false - ulimits: - memlock: - soft: -1 - hard: -1 - volumes: - - esdata:/usr/share/elasticsearch/data - ports: - - 9200:9200 - networks: - - esnet - - onelake: - build: - context: ../../../../ - dockerfile: ${DOCKERFILE_FTEST_PATH} - command: .venv/bin/python tests/sources/fixtures/onelake/fixture.py - ports: - - "8000:8000" - volumes: - - .:/python-flask - restart: always - -volumes: - esdata: - driver: local - -networks: - esnet: - driver: bridge diff --git a/tests/sources/fixtures/onelake/fixture.py b/tests/sources/fixtures/onelake/fixture.py deleted file mode 100644 index 48ac90fc2..000000000 --- a/tests/sources/fixtures/onelake/fixture.py +++ /dev/null @@ -1,92 +0,0 @@ -from typing import Dict, List - -""" -This is a fixture for generating test data and listing paths in a file system. -""" - - -class PathProperties: - def __init__(self, name: str, is_directory: bool = False): - self.name = name - self.is_directory = is_directory - - def to_dict(self): - return {"name": self.name, "is_directory": self.is_directory} - - -class ItemPaged: - def __init__(self, items: List[PathProperties]): - self.items = items - - def __iter__(self): - for item in self.items: - yield item - - -class FileSystemClient: - def __init__(self, file_system_name: str): - self.file_system_name = file_system_name - self.files = {} - - def add_file(self, file_path: str, is_directory: bool = False): - self.files[file_path] = is_directory - - def get_paths(self, path: str = None): - paths = [ - PathProperties(name=file_path, is_directory=is_directory) - for file_path, is_directory in self.files.items() - if path is None or file_path.startswith(path) - ] - return ItemPaged(paths) - - -FILE_SYSTEMS: Dict[str, FileSystemClient] = {} - - -def create_file_system(name: str): - if name not in FILE_SYSTEMS: - FILE_SYSTEMS[name] = FileSystemClient(name) - - -def load(config: Dict): - """ - Loads initial data into the backend based on OneLake configuration. - - Args: - config: Dictionary containing OneLake configuration with format: - { - "configuration": { - "workspace_name": {"value": str}, - "data_path": {"value": str}, - ... - } - } - """ - if not config.get("configuration"): - raise ValueError("Invalid configuration format") - - conf = config["configuration"] - workspace_name = conf["workspace_name"]["value"] - data_path = conf["data_path"]["value"] - - create_file_system(workspace_name) - generate_test_data(workspace_name, data_path, file_count=10000) - - -def generate_test_data(file_system_name: str, folder_path: str, file_count: int): - create_file_system(file_system_name) - file_system = FILE_SYSTEMS[file_system_name] - - file_system.add_file(folder_path, is_directory=True) - - for i in range(file_count): - file_name = f"{folder_path}/file_{i}.txt" - file_system.add_file(file_name) - - -def list_paths(file_system_name: str, folder_path: str): - if file_system_name not in FILE_SYSTEMS: - return [] - - file_system = FILE_SYSTEMS[file_system_name] - return file_system.get_paths(path=folder_path) diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py index f25639430..34e4cb6df 100644 --- a/tests/sources/test_onelake.py +++ b/tests/sources/test_onelake.py @@ -1,7 +1,7 @@ from contextlib import asynccontextmanager from datetime import datetime from functools import partial -from unittest.mock import AsyncMock, MagicMock, Mock, call, patch +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest @@ -10,36 +10,130 @@ @asynccontextmanager -async def create_onelake_source( +async def create_abs_source( use_text_extraction_service=False, ): async with create_source( OneLakeDataSource, tenant_id="fake-tenant", - client_id="-fake-client", - client_secret="fake-client", + client_id="fake-client", + client_secret="fake-client-secret", workspace_name="FakeWorkspace", data_path="FakeDatalake.Lakehouse/Files/Data", + account_name="onelake", use_text_extraction_service=use_text_extraction_service, ) as source: yield source +@pytest.mark.asyncio +async def test_init(): + """Test OneLakeDataSource initialization""" + + async with create_abs_source() as source: + # Check that all configuration values are set correctly + assert source.tenant_id == source.configuration["tenant_id"] + assert source.client_id == source.configuration["client_id"] + assert source.client_secret == source.configuration["client_secret"] + assert source.workspace_name == source.configuration["workspace_name"] + assert source.data_path == source.configuration["data_path"] + assert ( + source.account_url + == f"https://{source.configuration['account_name']}.dfs.fabric.microsoft.com" + ) + + # Check that clients are initially None + assert source.service_client is None + assert source.file_system_client is None + assert source.directory_client is None + + +def test_get_default_configuration(): + """Test get_default_configuration class method""" + + config = OneLakeDataSource.get_default_configuration() + + # Check that all required configuration fields are present + required_fields = [ + "tenant_id", + "client_id", + "client_secret", + "workspace_name", + "data_path", + "account_name", + ] + + for field in required_fields: + assert field in config + assert "label" in config[field] + assert "order" in config[field] + assert "type" in config[field] + + # Check specific configurations + assert config["account_name"]["default_value"] == "onelake" + assert config["client_secret"]["sensitive"] is True + assert config["account_name"]["required"] is False + + +@pytest.mark.asyncio +async def test_initialize(): + """Test initialize method""" + + async with create_abs_source() as source: + mock_service_client = Mock() + mock_file_system_client = Mock() + mock_directory_client = Mock() + + with patch.object( + source, "_get_service_client", new_callable=AsyncMock + ) as mock_get_service: + with patch.object( + source, "_get_file_system_client", new_callable=AsyncMock + ) as mock_get_fs: + with patch.object( + source, "_get_directory_client", new_callable=AsyncMock + ) as mock_get_dir: + mock_get_service.return_value = mock_service_client + mock_get_fs.return_value = mock_file_system_client + mock_get_dir.return_value = mock_directory_client + + # Test first initialization + await source.initialize() + + assert source.service_client == mock_service_client + assert source.file_system_client == mock_file_system_client + assert source.directory_client == mock_directory_client + + mock_get_service.assert_called_once() + mock_get_fs.assert_called_once() + mock_get_dir.assert_called_once() + + # Test that it doesn't re-initialize if service_client already exists + mock_get_service.reset_mock() + mock_get_fs.reset_mock() + mock_get_dir.reset_mock() + + await source.initialize() + + # Should not be called again + mock_get_service.assert_not_called() + mock_get_fs.assert_not_called() + mock_get_dir.assert_not_called() + + @pytest.mark.asyncio async def test_ping_for_successful_connection(): """Test ping method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: mock_get_paths.return_value = [] - # Run await source.ping() - # Check mock_get_paths.assert_called_once_with(source.configuration["data_path"]) @@ -48,7 +142,7 @@ async def test_ping_for_failed_connection(): """Test ping method of OneLakeDataSource class with negative case""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -60,32 +154,13 @@ async def test_ping_for_failed_connection(): mock_get_paths.assert_called_once_with(source.configuration["data_path"]) - # Cleanup - mock_get_paths.reset_mock - - -@pytest.mark.asyncio -async def test_get_account_url(): - """Test _get_account_url method of OneLakeDataSource class""" - - # Setup - async with create_onelake_source() as source: - account_name = source.configuration["account_name"] - expected_url = f"https://{account_name}.dfs.fabric.microsoft.com" - - # Run - actual_url = source._get_account_url() - - # Check - assert actual_url == expected_url - @pytest.mark.asyncio async def test_get_token_credentials(): """Test _get_token_credentials method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: tenant_id = source.configuration["tenant_id"] client_id = source.configuration["client_id"] client_secret = source.configuration["client_secret"] @@ -107,7 +182,7 @@ async def test_get_token_credentials(): async def test_get_token_credentials_error(): """Test _get_token_credentials method when credential creation fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: with patch( "connectors.sources.onelake.ClientSecretCredential", autospec=True ) as mock_credential: @@ -122,19 +197,14 @@ async def test_get_service_client(): """Test _get_service_client method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_service_client = Mock() - mock_account_url = "https://mockaccount.dfs.fabric.microsoft.com" mock_credentials = Mock() with patch( "connectors.sources.onelake.DataLakeServiceClient", autospec=True, ) as mock_client, patch.object( - source, - "_get_account_url", - return_value=mock_account_url, - ), patch.object( source, "_get_token_credentials", return_value=mock_credentials ): mock_client.return_value = mock_service_client @@ -144,7 +214,7 @@ async def test_get_service_client(): # Check mock_client.assert_called_once_with( - account_url=mock_account_url, + account_url=source.account_url, credential=mock_credentials, ) assert service_client is mock_service_client @@ -154,7 +224,7 @@ async def test_get_service_client(): async def test_get_service_client_error(): """Test _get_service_client method when client creation fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: with patch( "connectors.sources.onelake.DataLakeServiceClient", side_effect=Exception("Service client error"), @@ -168,46 +238,38 @@ async def test_get_file_system_client(): """Test _get_file_system_client method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_file_system_client = Mock() workspace_name = source.configuration["workspace_name"] - with patch.object( - source, "_get_service_client", new_callable=AsyncMock - ) as mock_get_service_client: - mock_service_client = Mock() - mock_service_client.get_file_system_client.return_value = ( - mock_file_system_client - ) - mock_get_service_client.return_value = mock_service_client + # Set up the service_client that _get_file_system_client depends on + mock_service_client = Mock() + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) + source.service_client = mock_service_client - # Run - file_system_client = await source._get_file_system_client() + # Run + file_system_client = await source._get_file_system_client() - # Check - mock_service_client.get_file_system_client.assert_called_once_with( - workspace_name - ) - assert file_system_client == mock_file_system_client + # Check + mock_service_client.get_file_system_client.assert_called_once_with( + workspace_name + ) + assert file_system_client == mock_file_system_client @pytest.mark.asyncio async def test_get_file_system_client_error(): """Test _get_file_system_client method when client creation fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_service_client = Mock() - mock_service_client.get_file_system_client.side_effect = Exception( - "File system error" - ) + mock_service_client.get_file_system_client.side_effect = Exception("Test error") + source.service_client = mock_service_client - with patch.object( - source, "_get_service_client", new_callable=AsyncMock - ) as mock_get_service_client: - mock_get_service_client.return_value = mock_service_client - - with pytest.raises(Exception, match="File system error"): - await source._get_file_system_client() + with pytest.raises(Exception, match="Test error"): + await source._get_file_system_client() @pytest.mark.asyncio @@ -215,46 +277,38 @@ async def test_get_directory_client(): """Test _get_directory_client method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_directory_client = Mock() data_path = source.configuration["data_path"] - with patch.object( - source, "_get_file_system_client", new_callable=AsyncMock - ) as mock_get_file_system_client: - mock_file_system_client = Mock() - mock_file_system_client.get_directory_client.return_value = ( - mock_directory_client - ) - mock_get_file_system_client.return_value = mock_file_system_client + # Set up the file_system_client that _get_directory_client depends on + mock_file_system_client = Mock() + mock_file_system_client.get_directory_client.return_value = ( + mock_directory_client + ) + source.file_system_client = mock_file_system_client - # Run - directory_client = await source._get_directory_client() + # Run + directory_client = await source._get_directory_client() - # Check - mock_file_system_client.get_directory_client.assert_called_once_with( - data_path - ) - assert directory_client == mock_directory_client + # Check + mock_file_system_client.get_directory_client.assert_called_once_with(data_path) + assert directory_client == mock_directory_client @pytest.mark.asyncio async def test_get_directory_client_error(): """Test _get_directory_client method when client creation fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_file_system_client = Mock() mock_file_system_client.get_directory_client.side_effect = Exception( - "Directory error" + "Test error" ) + source.file_system_client = mock_file_system_client - with patch.object( - source, "_get_file_system_client", new_callable=AsyncMock - ) as mock_get_file_system_client: - mock_get_file_system_client.return_value = mock_file_system_client - - with pytest.raises(Exception, match="Directory error"): - await source._get_directory_client() + with pytest.raises(Exception, match="Test error"): + await source._get_directory_client() @pytest.mark.asyncio @@ -265,30 +319,29 @@ async def test_get_file_client_success(): mock_directory_client = Mock() mock_directory_client.get_file_client.return_value = mock_file_client - async with create_onelake_source() as source: - with patch.object( - source, "_get_directory_client", new_callable=AsyncMock - ) as mock_get_directory: - mock_get_directory.return_value = mock_directory_client + async with create_abs_source() as source: + # Mock the directory_client directly since that's what _get_file_client uses + source.directory_client = mock_directory_client - result = await source._get_file_client("test.txt") + result = await source._get_file_client("test.txt") - assert result == mock_file_client - mock_directory_client.get_file_client.assert_called_once_with("test.txt") + assert result == mock_file_client + mock_directory_client.get_file_client.assert_called_once_with("test.txt") @pytest.mark.asyncio async def test_get_file_client_error(): """Test file client retrieval with error""" - async with create_onelake_source() as source: - with patch.object( - source, "_get_directory_client", new_callable=AsyncMock - ) as mock_get_directory: - mock_get_directory.side_effect = Exception("Test error") + async with create_abs_source() as source: + mock_directory_client = Mock() + mock_directory_client.get_file_client.side_effect = Exception( + "Error while getting file client" + ) + source.directory_client = mock_directory_client - with pytest.raises(Exception, match="Test error"): - await source._get_file_client("test.txt") + with pytest.raises(Exception, match="Error while getting file client"): + await source._get_file_client("test.txt") @pytest.mark.asyncio @@ -296,42 +349,77 @@ async def test_get_directory_paths(): """Test _get_directory_paths method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_paths = ["path1", "path2"] directory_path = "mock_directory_path" - with patch.object( - source, "_get_file_system_client", new_callable=AsyncMock - ) as mock_get_file_system_client: - mock_get_paths = Mock(return_value=mock_paths) - mock_file_system_client = mock_get_file_system_client.return_value - mock_file_system_client.get_paths = mock_get_paths + # Set up the file_system_client so initialize() is not called + mock_file_system_client = Mock() + source.file_system_client = mock_file_system_client + + # Mock the run_in_executor call + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock(return_value=mock_paths) # Run paths = await source._get_directory_paths(directory_path) # Check - mock_file_system_client.get_paths.assert_called_once_with( - path=directory_path - ) assert paths == mock_paths + mock_loop.return_value.run_in_executor.assert_called_once_with( + None, mock_file_system_client.get_paths, directory_path + ) + + +@pytest.mark.asyncio +async def test_get_directory_paths_with_initialize(): + """Test _get_directory_paths method when file_system_client is None""" + + async with create_abs_source() as source: + mock_paths = ["path1", "path2"] + directory_path = "mock_directory_path" + + # Ensure file_system_client is None to trigger initialize() + source.file_system_client = None + + # Mock initialize to set up file_system_client + async def mock_initialize(): + mock_file_system_client = Mock() + source.file_system_client = mock_file_system_client + + with patch.object( + source, "initialize", side_effect=mock_initialize + ) as mock_init: + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + return_value=mock_paths + ) + + # Run + paths = await source._get_directory_paths(directory_path) + + # Check + mock_init.assert_called_once() + assert paths == mock_paths @pytest.mark.asyncio async def test_get_directory_paths_error(): """Test _get_directory_paths method when getting paths fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: directory_path = "mock_directory_path" - with patch.object( - source, "_get_file_system_client", new_callable=AsyncMock - ) as mock_get_file_system_client: - mock_file_system_client = mock_get_file_system_client.return_value - mock_file_system_client.get_paths = AsyncMock( - side_effect=Exception("Path error") + + # Set up the file_system_client so initialize() is not called + mock_file_system_client = Mock() + source.file_system_client = mock_file_system_client + + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + side_effect=Exception("Error while getting directory paths") ) - with pytest.raises(Exception, match="Path error"): + with pytest.raises(Exception, match="Error while getting directory paths"): await source._get_directory_paths(directory_path) @@ -340,7 +428,7 @@ async def test_format_file(): """Test format_file method of OneLakeDataSource class""" # Setup - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -350,7 +438,6 @@ async def test_format_file(): ) mock_file_properties.name.split.return_value = ["path", "to", "file.txt"] - mock_file_client.get_file_properties.return_value = mock_file_properties mock_file_client.file_system_name = "my_file_system" expected_output = { @@ -361,32 +448,41 @@ async def test_format_file(): "size": 2048, } - # Execute - actual_output = source.format_file(mock_file_client) + # Mock the run_in_executor call since format_file is now async + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + return_value=mock_file_properties + ) + + # Execute + actual_output = await source.format_file(mock_file_client) - # Assert - assert actual_output == expected_output - mock_file_client.get_file_properties.assert_called_once() + # Assert + assert actual_output == expected_output @pytest.mark.asyncio async def test_format_file_error(): """Test format_file method when getting properties fails""" - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_file_client = MagicMock() - mock_file_client.get_file_properties.side_effect = Exception("Properties error") mock_file_client.file_system_name = "my_file_system" - with pytest.raises(Exception, match="Properties error"): - source.format_file(mock_file_client) + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + side_effect=Exception("Test error") + ) + + with pytest.raises(Exception, match="Test error"): + await source.format_file(mock_file_client) @pytest.mark.asyncio async def test_format_file_empty_name(): """Test format_file method with empty file name""" - async with create_onelake_source() as source: + async with create_abs_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -395,12 +491,16 @@ async def test_format_file_empty_name(): name="", ) mock_file_properties.name.split.return_value = [""] - mock_file_client.get_file_properties.return_value = mock_file_properties mock_file_client.file_system_name = "my_file_system" - result = source.format_file(mock_file_client) - assert result["name"] == "" - assert result["_id"] == "my_file_system_" + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + return_value=mock_file_properties + ) + + result = await source.format_file(mock_file_client) + assert result["name"] == "" + assert result["_id"] == "my_file_system_" @pytest.mark.asyncio @@ -410,22 +510,23 @@ async def test_download_file(): # Setup mock_file_client = Mock() mock_download = Mock() - mock_file_client.download_file.return_value = mock_download - mock_chunks = ["chunk1", "chunk2", "chunk3"] - mock_download.chunks.return_value = iter(mock_chunks) + async with create_abs_source() as source: + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + return_value=mock_download + ) + mock_download.chunks.return_value = iter(mock_chunks) - async with create_onelake_source() as source: - # Run - chunks = [] - async for chunk in source.download_file(mock_file_client): - chunks.append(chunk) + # Run + chunks = [] + async for chunk in source.download_file(mock_file_client): + chunks.append(chunk) - # Check - assert chunks == mock_chunks - mock_file_client.download_file.assert_called_once() - mock_download.chunks.assert_called_once() + # Check + assert chunks == mock_chunks + mock_loop.return_value.run_in_executor.assert_called_once() @pytest.mark.asyncio @@ -434,35 +535,24 @@ async def test_download_file_with_error(): # Setup mock_file_client = Mock() - mock_download = Mock() - mock_file_client.download_file.return_value = mock_download - mock_download.chunks.side_effect = Exception("Download error") - async with create_onelake_source() as source: - # Run & Check - with pytest.raises(Exception, match="Download error"): - async for _ in source.download_file(mock_file_client): - pass + async with create_abs_source() as source: + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock( + side_effect=Exception("Test error") + ) - mock_file_client.download_file.assert_called_once() - mock_download.chunks.assert_called_once() + # Run & Check + with pytest.raises(Exception, match="Test error"): + async for _ in source.download_file(mock_file_client): + pass @pytest.mark.asyncio async def test_get_content_with_download(): """Test get_content method when doit=True""" - mock_configuration = { - "account_name": "mockaccount", - "tenant_id": "mocktenant", - "client_id": "mockclient", - "client_secret": "mocksecret", - "workspace_name": "mockworkspace", - "data_path": "mockpath", - } - - async with create_onelake_source() as source: - source.configuration = mock_configuration + async with create_abs_source() as source: class FileClientMock: file_system_name = "mockfilesystem" @@ -476,12 +566,18 @@ def get_file_properties(self): return self.FileProperties(name="file1.txt", size=2000) with patch.object( - source, "_get_file_client", return_value=FileClientMock() + source, + "_get_file_client", + new_callable=AsyncMock, + return_value=FileClientMock(), ), patch.object( source, "can_file_be_downloaded", return_value=True + ), patch.object( + source, "get_file_extension", return_value="txt" ), patch.object( source, "download_and_extract_file", + new_callable=AsyncMock, return_value={ "_id": "mockfilesystem_file1.txt", "_attachment": "TW9jayBjb250ZW50", @@ -498,30 +594,9 @@ def get_file_properties(self): async def test_get_content_without_download(): """Test get_content method when doit=False""" - async with create_onelake_source() as source: - source.configuration = { - "account_name": "mockaccount", - "tenant_id": "mocktenant", - "client_id": "mockclient", - "client_secret": "mocksecret", - "workspace_name": "mockworkspace", - "data_path": "mockpath", - } - - class FileClientMock: - file_system_name = "mockfilesystem" - - class FileProperties: - def __init__(self, name, size): - self.name = name - self.size = size - - def get_file_properties(self): - return self.FileProperties(name="file1.txt", size=2000) - - with patch.object(source, "_get_file_client", return_value=FileClientMock()): - actual_response = await source.get_content("file1.txt", doit=False) - assert actual_response is None + async with create_abs_source() as source: + actual_response = await source.get_content("file1.txt", doit=False) + assert actual_response is None @pytest.mark.asyncio @@ -530,41 +605,31 @@ async def test_prepare_files(): # Setup doc_paths = [ - Mock( - name="doc1", - **{"name.split.return_value": ["folder", "doc1"], "path": "folder/doc1"}, - ), - Mock( - name="doc2", - **{"name.split.return_value": ["folder", "doc2"], "path": "folder/doc2"}, - ), + Mock(name="doc1.txt"), + Mock(name="doc2.txt"), ] - mock_field_client = Mock() - - async def mock_format_file(*args, **kwargs): - """Mock for the format_file method""" - return "file_document", "partial_function" + async with create_abs_source() as source: + mock_file_results = [ + {"name": "doc1.txt", "id": "1"}, + {"name": "doc2.txt", "id": "2"}, + ] - async with create_onelake_source() as source: with patch.object( - source, "_get_file_client", new_callable=AsyncMock - ) as mock_get_file_client: - mock_get_file_client.return_value = mock_field_client + source, "_process_items_concurrently", new_callable=AsyncMock + ) as mock_process: + mock_process.return_value = mock_file_results - with patch.object(source, "format_file", side_effect=mock_format_file): - result = [] - # Run - async for item in source.prepare_files(doc_paths): - result.append(await item) + result = [] + async for item in source.prepare_files(doc_paths): + result.append(item) - # Check results - assert result == [ - ("file_document", "partial_function"), - ("file_document", "partial_function"), - ] - - mock_get_file_client.assert_has_calls([call("doc1"), call("doc2")]) + # Check results + assert result == mock_file_results + # Check that _process_items_concurrently was called with the paths + mock_process.assert_called_once() + call_args = mock_process.call_args[0] + assert call_args[0] == doc_paths # First argument should be the paths @pytest.mark.asyncio @@ -582,7 +647,7 @@ async def mock_prepare_files_impl(paths): for doc in mock_file_docs: yield doc - async with create_onelake_source() as source: + async with create_abs_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -604,3 +669,145 @@ async def mock_prepare_files_impl(paths): assert isinstance(get_content, partial) assert get_content.func == source.get_content assert get_content.args == (doc["name"],) + + +@pytest.mark.asyncio +async def test_process_items_concurrently(): + """Test _process_items_concurrently method""" + + async with create_abs_source() as source: + items = ["item1", "item2", "item3"] + + async def mock_process_item(item): + return f"processed_{item}" + + result = await source._process_items_concurrently(items, mock_process_item) + + expected = ["processed_item1", "processed_item2", "processed_item3"] + assert result == expected + + +@pytest.mark.asyncio +async def test_process_items_concurrently_with_custom_concurrency(): + """Test _process_items_concurrently method with custom max_concurrency""" + + async with create_abs_source() as source: + items = ["item1", "item2"] + + async def mock_process_item(item): + return f"processed_{item}" + + result = await source._process_items_concurrently( + items, mock_process_item, max_concurrency=1 + ) + + expected = ["processed_item1", "processed_item2"] + assert result == expected + + +@pytest.mark.asyncio +async def test_get_files_properties(): + """Test get_files_properties method""" + + async with create_abs_source() as source: + mock_file_client1 = Mock() + mock_file_client2 = Mock() + mock_properties1 = Mock() + mock_properties2 = Mock() + + mock_file_client1.get_file_properties.return_value = mock_properties1 + mock_file_client2.get_file_properties.return_value = mock_properties2 + + file_clients = [mock_file_client1, mock_file_client2] + + with patch.object( + source, "_process_items_concurrently", new_callable=AsyncMock + ) as mock_process: + mock_process.return_value = [mock_properties1, mock_properties2] + + result = await source.get_files_properties(file_clients) + + assert result == [mock_properties1, mock_properties2] + mock_process.assert_called_once() + # Check that the first argument is the file_clients list + call_args = mock_process.call_args[0] + assert call_args[0] == file_clients + + +@pytest.mark.asyncio +async def test_get_content_file_cannot_be_downloaded(): + """Test get_content method when file cannot be downloaded""" + + async with create_abs_source() as source: + + class FileClientMock: + file_system_name = "mockfilesystem" + + class FileProperties: + def __init__(self, name, size): + self.name = name + self.size = size + + def get_file_properties(self): + return self.FileProperties( + name="large_file.exe", size=200000000 + ) # Very large file + + with patch.object( + source, + "_get_file_client", + new_callable=AsyncMock, + return_value=FileClientMock(), + ), patch.object( + source, "can_file_be_downloaded", return_value=False # Cannot download + ), patch.object( + source, "get_file_extension", return_value="exe" + ): + result = await source.get_content("large_file.exe", doit=True) + + # Should return only the basic doc without content + expected = { + "_id": "mockfilesystem_large_file.exe", + } + assert result == expected + + +@pytest.mark.asyncio +async def test_get_content_extracted_doc_is_none(): + """Test get_content method when download_and_extract_file returns None""" + + async with create_abs_source() as source: + + class FileClientMock: + file_system_name = "mockfilesystem" + + class FileProperties: + def __init__(self, name, size): + self.name = name + self.size = size + + def get_file_properties(self): + return self.FileProperties(name="file.txt", size=1000) + + with patch.object( + source, + "_get_file_client", + new_callable=AsyncMock, + return_value=FileClientMock(), + ), patch.object( + source, "can_file_be_downloaded", return_value=True + ), patch.object( + source, "get_file_extension", return_value="txt" + ), patch.object( + source, + "download_and_extract_file", + new_callable=AsyncMock, + return_value=None, + ): + result = await source.get_content("file.txt", doit=True) + + # Should return the basic doc when extracted_doc is None + expected = { + "_id": "mockfilesystem_file.txt", + } + assert result == expected From 38970b31136baecde71b08d55fa350eb4bf9a399 Mon Sep 17 00:00:00 2001 From: Delacrobix Date: Sun, 6 Jul 2025 13:53:23 -0500 Subject: [PATCH 22/22] name of create_source method changed in onelake test --- tests/sources/test_onelake.py | 64 +++++++++++++++++------------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/tests/sources/test_onelake.py b/tests/sources/test_onelake.py index 34e4cb6df..6215b40e5 100644 --- a/tests/sources/test_onelake.py +++ b/tests/sources/test_onelake.py @@ -10,7 +10,7 @@ @asynccontextmanager -async def create_abs_source( +async def create_onelake_source( use_text_extraction_service=False, ): async with create_source( @@ -30,7 +30,7 @@ async def create_abs_source( async def test_init(): """Test OneLakeDataSource initialization""" - async with create_abs_source() as source: + async with create_onelake_source() as source: # Check that all configuration values are set correctly assert source.tenant_id == source.configuration["tenant_id"] assert source.client_id == source.configuration["client_id"] @@ -79,7 +79,7 @@ def test_get_default_configuration(): async def test_initialize(): """Test initialize method""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_service_client = Mock() mock_file_system_client = Mock() mock_directory_client = Mock() @@ -126,7 +126,7 @@ async def test_ping_for_successful_connection(): """Test ping method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -142,7 +142,7 @@ async def test_ping_for_failed_connection(): """Test ping method of OneLakeDataSource class with negative case""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -160,7 +160,7 @@ async def test_get_token_credentials(): """Test _get_token_credentials method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: tenant_id = source.configuration["tenant_id"] client_id = source.configuration["client_id"] client_secret = source.configuration["client_secret"] @@ -182,7 +182,7 @@ async def test_get_token_credentials(): async def test_get_token_credentials_error(): """Test _get_token_credentials method when credential creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch( "connectors.sources.onelake.ClientSecretCredential", autospec=True ) as mock_credential: @@ -197,7 +197,7 @@ async def test_get_service_client(): """Test _get_service_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_service_client = Mock() mock_credentials = Mock() @@ -224,7 +224,7 @@ async def test_get_service_client(): async def test_get_service_client_error(): """Test _get_service_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch( "connectors.sources.onelake.DataLakeServiceClient", side_effect=Exception("Service client error"), @@ -238,7 +238,7 @@ async def test_get_file_system_client(): """Test _get_file_system_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_system_client = Mock() workspace_name = source.configuration["workspace_name"] @@ -263,7 +263,7 @@ async def test_get_file_system_client(): async def test_get_file_system_client_error(): """Test _get_file_system_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_service_client = Mock() mock_service_client.get_file_system_client.side_effect = Exception("Test error") source.service_client = mock_service_client @@ -277,7 +277,7 @@ async def test_get_directory_client(): """Test _get_directory_client method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_directory_client = Mock() data_path = source.configuration["data_path"] @@ -300,7 +300,7 @@ async def test_get_directory_client(): async def test_get_directory_client_error(): """Test _get_directory_client method when client creation fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_system_client = Mock() mock_file_system_client.get_directory_client.side_effect = Exception( "Test error" @@ -319,7 +319,7 @@ async def test_get_file_client_success(): mock_directory_client = Mock() mock_directory_client.get_file_client.return_value = mock_file_client - async with create_abs_source() as source: + async with create_onelake_source() as source: # Mock the directory_client directly since that's what _get_file_client uses source.directory_client = mock_directory_client @@ -333,7 +333,7 @@ async def test_get_file_client_success(): async def test_get_file_client_error(): """Test file client retrieval with error""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_directory_client = Mock() mock_directory_client.get_file_client.side_effect = Exception( "Error while getting file client" @@ -349,7 +349,7 @@ async def test_get_directory_paths(): """Test _get_directory_paths method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_paths = ["path1", "path2"] directory_path = "mock_directory_path" @@ -375,7 +375,7 @@ async def test_get_directory_paths(): async def test_get_directory_paths_with_initialize(): """Test _get_directory_paths method when file_system_client is None""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_paths = ["path1", "path2"] directory_path = "mock_directory_path" @@ -407,7 +407,7 @@ async def mock_initialize(): async def test_get_directory_paths_error(): """Test _get_directory_paths method when getting paths fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: directory_path = "mock_directory_path" # Set up the file_system_client so initialize() is not called @@ -428,7 +428,7 @@ async def test_format_file(): """Test format_file method of OneLakeDataSource class""" # Setup - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -465,7 +465,7 @@ async def test_format_file(): async def test_format_file_error(): """Test format_file method when getting properties fails""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_client.file_system_name = "my_file_system" @@ -482,7 +482,7 @@ async def test_format_file_error(): async def test_format_file_empty_name(): """Test format_file method with empty file name""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client = MagicMock() mock_file_properties = MagicMock( creation_time=datetime(2022, 4, 21, 12, 12, 30), @@ -512,7 +512,7 @@ async def test_download_file(): mock_download = Mock() mock_chunks = ["chunk1", "chunk2", "chunk3"] - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_download @@ -536,7 +536,7 @@ async def test_download_file_with_error(): # Setup mock_file_client = Mock() - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( side_effect=Exception("Test error") @@ -552,7 +552,7 @@ async def test_download_file_with_error(): async def test_get_content_with_download(): """Test get_content method when doit=True""" - async with create_abs_source() as source: + async with create_onelake_source() as source: class FileClientMock: file_system_name = "mockfilesystem" @@ -594,7 +594,7 @@ def get_file_properties(self): async def test_get_content_without_download(): """Test get_content method when doit=False""" - async with create_abs_source() as source: + async with create_onelake_source() as source: actual_response = await source.get_content("file1.txt", doit=False) assert actual_response is None @@ -609,7 +609,7 @@ async def test_prepare_files(): Mock(name="doc2.txt"), ] - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_results = [ {"name": "doc1.txt", "id": "1"}, {"name": "doc2.txt", "id": "2"}, @@ -647,7 +647,7 @@ async def mock_prepare_files_impl(paths): for doc in mock_file_docs: yield doc - async with create_abs_source() as source: + async with create_onelake_source() as source: with patch.object( source, "_get_directory_paths", new_callable=AsyncMock ) as mock_get_paths: @@ -675,7 +675,7 @@ async def mock_prepare_files_impl(paths): async def test_process_items_concurrently(): """Test _process_items_concurrently method""" - async with create_abs_source() as source: + async with create_onelake_source() as source: items = ["item1", "item2", "item3"] async def mock_process_item(item): @@ -691,7 +691,7 @@ async def mock_process_item(item): async def test_process_items_concurrently_with_custom_concurrency(): """Test _process_items_concurrently method with custom max_concurrency""" - async with create_abs_source() as source: + async with create_onelake_source() as source: items = ["item1", "item2"] async def mock_process_item(item): @@ -709,7 +709,7 @@ async def mock_process_item(item): async def test_get_files_properties(): """Test get_files_properties method""" - async with create_abs_source() as source: + async with create_onelake_source() as source: mock_file_client1 = Mock() mock_file_client2 = Mock() mock_properties1 = Mock() @@ -738,7 +738,7 @@ async def test_get_files_properties(): async def test_get_content_file_cannot_be_downloaded(): """Test get_content method when file cannot be downloaded""" - async with create_abs_source() as source: + async with create_onelake_source() as source: class FileClientMock: file_system_name = "mockfilesystem" @@ -776,7 +776,7 @@ def get_file_properties(self): async def test_get_content_extracted_doc_is_none(): """Test get_content method when download_and_extract_file returns None""" - async with create_abs_source() as source: + async with create_onelake_source() as source: class FileClientMock: file_system_name = "mockfilesystem"