7
7
8
8
import asyncio
9
9
import os
10
+ from abc import ABC , abstractmethod
10
11
from copy import copy
11
12
from datetime import date
12
13
from functools import cached_property , partial
14
+ from typing import List
13
15
14
16
import aiofiles
15
17
import aiohttp
@@ -348,13 +350,13 @@ async def get_user_accounts(self):
348
350
yield user_account
349
351
350
352
351
- class Office365Users :
352
- """Fetch users from Office365 Active Directory """
353
+ class BaseOffice365User ( ABC ) :
354
+ """Abstract base class for Office 365 user management """
353
355
354
356
def __init__ (self , client_id , client_secret , tenant_id ):
355
- self .tenant_id = tenant_id
356
357
self .client_id = client_id
357
358
self .client_secret = client_secret
359
+ self .tenant_id = tenant_id
358
360
359
361
@cached_property
360
362
def _get_session (self ):
@@ -403,6 +405,21 @@ async def _fetch_token(self):
403
405
except Exception as exception :
404
406
self ._check_errors (response = exception )
405
407
408
+ @abstractmethod
409
+ async def get_users (self ):
410
+ pass
411
+
412
+ @abstractmethod
413
+ async def get_user_accounts (self ):
414
+ pass
415
+
416
+
417
+ class Office365Users (BaseOffice365User ):
418
+ """Fetch users from Office365 Active Directory"""
419
+
420
+ def __init__ (self , client_id , client_secret , tenant_id ):
421
+ super ().__init__ (client_id , client_secret , tenant_id )
422
+
406
423
@retryable (
407
424
retries = RETRIES ,
408
425
interval = RETRY_INTERVAL ,
@@ -456,6 +473,57 @@ async def get_user_accounts(self):
456
473
yield user_account
457
474
458
475
476
class MultiOffice365Users(BaseOffice365User):
    """Fetch multiple Office365 users based on a list of email addresses.

    Each address in ``client_emails`` is looked up individually through the
    Microsoft Graph ``/users/{email}`` endpoint, and an impersonated
    Exchange account is built for every user that has a ``mail`` value.
    """

    def __init__(self, client_id, client_secret, tenant_id, client_emails: List[str]):
        """Store the OAuth2 application credentials and the target addresses.

        Args:
            client_id: Application (client) id passed to the base class.
            client_secret: Application secret passed to the base class.
            tenant_id: Tenant id passed to the base class.
            client_emails: Email addresses of the users to fetch.
        """
        super().__init__(client_id, client_secret, tenant_id)
        self.client_emails = client_emails

    async def get_users(self):
        """Yield the decoded JSON body of one Graph lookup per configured email.

        The token is fetched once and reused for every request. Any exception
        raised by the token fetch, the HTTP request or the JSON decoding
        propagates to the caller unchanged.
        """
        access_token = await self._fetch_token()
        # Headers do not depend on the address, so build them once.
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        for email in self.client_emails:
            url = f"https://graph.microsoft.com/v1.0/users/{email}"
            async with self._get_session.get(url=url, headers=headers) as response:
                json_response = await response.json()
            # Yield after leaving the response context so the consumer's work
            # does not keep the response open.
            yield json_response

    async def get_user_accounts(self):
        """Yield an impersonated ``Account`` for each fetched user with a mailbox.

        Users whose Graph payload has no ``mail`` value are skipped.
        """
        async for user in self.get_users():
            mail = user.get("mail")
            if mail is None:
                continue

            credentials = OAuth2Credentials(
                client_id=self.client_id,
                tenant_id=self.tenant_id,
                client_secret=self.client_secret,
                identity=Identity(primary_smtp_address=mail),
            )
            configuration = Configuration(
                credentials=credentials,
                auth_type=OAUTH2,
                service_endpoint=EWS_ENDPOINT,
                retry_policy=FaultTolerance(max_wait=120),
            )
            user_account = Account(
                primary_smtp_address=mail,
                config=configuration,
                autodiscover=False,
                access_type=IMPERSONATION,
            )
            yield user_account
525
+
526
+
459
527
class OutlookDocFormatter :
460
528
"""Format Outlook object documents to Elasticsearch document"""
461
529
@@ -583,6 +651,27 @@ def attachment_doc_formatter(self, attachment, attachment_type, timezone):
583
651
}
584
652
585
653
654
class UserFactory:
    """Factory class for creating Office365 user instances"""

    @staticmethod
    def create_user(configuration: dict) -> BaseOffice365User:
        """Build the user provider that matches the connector configuration.

        Args:
            configuration: Mapping that must contain ``client_id``,
                ``client_secret`` and ``tenant_id``. It may also contain
                ``client_emails``, a comma-separated string of addresses.

        Returns:
            A ``MultiOffice365Users`` restricted to the listed addresses when
            at least one non-blank address is configured, otherwise an
            ``Office365Users``.

        Raises:
            KeyError: If one of the three required credential keys is missing.
        """
        credentials = {
            "client_id": configuration["client_id"],
            "client_secret": configuration["client_secret"],
            "tenant_id": configuration["tenant_id"],
        }

        raw_emails = configuration.get("client_emails") or ""
        # Drop blank entries left by trailing or doubled commas so that no
        # empty address is handed to the per-user lookup.
        client_emails = [
            email.strip() for email in raw_emails.split(",") if email.strip()
        ]

        if client_emails:
            return MultiOffice365Users(client_emails=client_emails, **credentials)
        return Office365Users(**credentials)
673
+
674
+
586
675
class OutlookClient :
587
676
"""Outlook client to handle API calls made to Outlook"""
588
677
@@ -605,11 +694,7 @@ def set_logger(self, logger_):
605
694
@cached_property
606
695
def _get_user_instance (self ):
607
696
if self .is_cloud :
608
- return Office365Users (
609
- client_id = self .configuration ["client_id" ],
610
- client_secret = self .configuration ["client_secret" ],
611
- tenant_id = self .configuration ["tenant_id" ],
612
- )
697
+ return UserFactory .create_user (self .configuration )
613
698
614
699
return ExchangeUsers (
615
700
ad_server = self .configuration ["active_directory_server" ],
@@ -666,9 +751,12 @@ async def get_tasks(self, account):
666
751
yield task
667
752
668
753
async def get_contacts(self, account):
    """Yield the contacts found in the account's ``Contacts`` folder.

    Args:
        account: Account object whose ``root`` is navigated to
            ``Top of Information Store/Contacts``.

    Yields:
        Each contact returned by the folder query, restricted to
        ``CONTACT_FIELDS``.

    Any exception raised while building or iterating the query propagates
    to the caller unchanged; the former ``try/except Exception: raise``
    wrapper added nothing and has been removed.
    """
    folder = account.root / "Top of Information Store" / "Contacts"
    # The `.only(...)` call is dispatched to a worker thread.
    # NOTE(review): if `.only()` returns a lazy query object, the actual
    # fetch happens while iterating here on the event loop - confirm.
    for contact in await asyncio.to_thread(folder.all().only, *CONTACT_FIELDS):
        yield contact
672
760
673
761
674
762
class OutlookDataSource (BaseDataSource ):
@@ -735,6 +823,13 @@ def get_default_configuration(cls):
735
823
"sensitive" : True ,
736
824
"type" : "str" ,
737
825
},
826
+ "client_emails" : {
827
+ "depends_on" : [{"field" : "data_source" , "value" : OUTLOOK_CLOUD }],
828
+ "label" : "Client Email Addresses (comma-separated)" ,
829
+ "order" : 5 ,
830
+ "required" : False ,
831
+ "type" : "str" ,
832
+ },
738
833
"exchange_server" : {
739
834
"depends_on" : [{"field" : "data_source" , "value" : OUTLOOK_SERVER }],
740
835
"label" : "Exchange Server" ,
@@ -1072,9 +1167,11 @@ async def get_docs(self, filtering=None):
1072
1167
dictionary: dictionary containing meta-data of the files.
1073
1168
"""
1074
1169
async for account in self .client ._get_user_instance .get_user_accounts ():
1170
+ self ._logger .debug (f"Processing account: { account } " )
1075
1171
timezone = account .default_timezone or DEFAULT_TIMEZONE
1076
1172
1077
1173
async for mail in self ._fetch_mails (account = account , timezone = timezone ):
1174
+ self ._logger .debug (f"Fetched mail: { mail } " )
1078
1175
yield mail
1079
1176
1080
1177
async for contact in self ._fetch_contacts (
0 commit comments