8
8
9
9
import requests_mock
10
10
11
+ # MockUp Testdata
12
+
13
+ CAPABILITIES = ['/capabilities' , '/capabilities/services' , '/capabilities/output_formats' , '/data' ,
14
+ '/data/{product_id}' , '/processes' ]
15
+ COLLECTIONS = [{'product_id' : 'ASTER/AST_L1T_003' ,
16
+ 'description' : 'ASTER L1T Radiance' ,
17
+ 'source' : 'NASA LP DAAC at the USGS EROS Center, https://lpdaac.usgs.gov/dataset_discovery/aster/aster_products_table/ast_l1t' },
18
+ {'product_id' : 'AU/GA/AUSTRALIA_5M_DEM' ,
19
+ 'description' : 'Australian 5M DEM' ,
20
+ 'source' : 'Geoscience Australia, https://ecat.ga.gov.au/geonetwork/srv/eng/search#!22be4b55-2465-4320-e053-10a3070a5236' }]
21
+
22
+ PROCESSES = [{'process_id' : 'zonal_statistics' ,
23
+ 'description' : 'Calculates statistics for each zone specified in a file.' },
24
+ {'process_id' : 'NDVI' ,
25
+ 'description' : 'Finds the minimum value of time series for all bands of the input dataset.' },
26
+ {'process_id' : 'filter_bands' ,
27
+ 'description' : 'Selects certain bands from a collection.' }]
28
+
11
29
12
30
@requests_mock .mock ()
13
31
class TestUserFiles (TestCase ):
@@ -30,7 +48,7 @@ def match_uploaded_file(self, request):
30
48
return True
31
49
32
50
def test_user_upload_file (self , m ):
33
- upload_url = "http://localhost:8000/api/ users/{}/files/{}" .format (self .user_id ,
51
+ upload_url = "{}/ users/{}/files/{}" .format (self . endpoint , self .user_id ,
34
52
self .upload_remote_fname )
35
53
m .register_uri ('PUT' , upload_url ,
36
54
additional_matcher = self .match_uploaded_file )
@@ -41,7 +59,7 @@ def test_user_upload_file(self, m):
41
59
assert status
42
60
43
61
def test_user_download_file (self , m ):
44
- download_url = "http://localhost:8000/api/ users/{}/files/{}" .format (self .user_id ,
62
+ download_url = "{}/ users/{}/files/{}" .format (self . endpoint , self .user_id ,
45
63
self .upload_remote_fname )
46
64
with open (self .upload_local_fname , 'rb' ) as response_file :
47
65
content = response_file .read ()
@@ -57,10 +75,75 @@ def test_user_download_file(self, m):
57
75
assert content == downloaded_content
58
76
59
77
def test_user_delete_file (self , m ):
60
- delete_url = "http://localhost:8000/api/ users/{}/files/{}" .format (self .user_id ,
78
+ delete_url = "{}/ users/{}/files/{}" .format (self . endpoint , self .user_id ,
61
79
self .upload_remote_fname )
62
80
m .register_uri ('DELETE' , delete_url )
63
81
session = openeo .session (self .user_id , endpoint = self .endpoint )
64
82
session .auth (self .auth_id , self .auth_pwd )
65
83
status = session .user_delete_file (self .upload_remote_fname )
66
84
assert status
85
+
86
+ def test_list_capabilities (self , m ):
87
+ capabilties_url = "{}/capabilities" .format (self .endpoint )
88
+ m .register_uri ('GET' , capabilties_url , json = CAPABILITIES )
89
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
90
+
91
+ capabilities = session .list_capabilities ()
92
+ assert capabilities == CAPABILITIES
93
+
94
+ def test_list_collections (self , m ):
95
+ collection_url = "{}/data" .format (self .endpoint )
96
+ m .register_uri ('GET' , collection_url , json = COLLECTIONS )
97
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
98
+
99
+ collections = session .list_collections ()
100
+ assert collections == COLLECTIONS
101
+
102
+ def test_get_collection (self , m ):
103
+ collection_org = COLLECTIONS [0 ]
104
+ collection_id = collection_org ["product_id" ]
105
+ collection_url = "{}/data/{}" .format (self .endpoint , collection_id )
106
+ m .register_uri ('GET' , collection_url , json = collection_org )
107
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
108
+
109
+ collection = session .get_collection (collection_id )
110
+ assert collection == collection_org
111
+
112
+ def test_get_all_processes (self , m ):
113
+ processes_url = "{}/processes" .format (self .endpoint )
114
+ m .register_uri ('GET' , processes_url , json = PROCESSES )
115
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
116
+
117
+ processes = session .get_all_processes ()
118
+ assert processes == PROCESSES
119
+
120
+ def test_get_process (self , m ):
121
+ process_org = PROCESSES [0 ]
122
+ process_id = process_org ['process_id' ]
123
+ process_url = "{}/processes/{}" .format (self .endpoint , process_id )
124
+ m .register_uri ('GET' , process_url , json = process_org )
125
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
126
+
127
+ process = session .get_process (process_id )
128
+ assert process == process_org
129
+
130
+ def test_create_job (self , m ):
131
+
132
+ # TODO: Add Test to check if post_data is sent properly
133
+ post_data = {}
134
+ job_id = "MyId"
135
+ result = {"job_id" : job_id }
136
+
137
+ m .register_uri ('POST' , "{}/jobs?evaluate={}" .format (self .endpoint , "lazy" ), status_code = 200 , json = result )
138
+ m .register_uri ('POST' , "{}/jobs?evaluate={}" .format (self .endpoint , "wrong" ), status_code = 400 )
139
+
140
+ session = openeo .session (self .user_id , endpoint = self .endpoint )
141
+
142
+ resp = session .create_job (post_data )
143
+
144
+ assert resp == job_id
145
+
146
+ resp = session .create_job (post_data , evaluation = "wrong" )
147
+
148
+ assert resp is None
149
+
0 commit comments