Skip to content

Commit c99d698

Browse files
authored
Merge pull request #149 from GIScience/feat/import-multipass
feat: import using multiple passes
2 parents a6baad3 + adaab74 commit c99d698

File tree

9 files changed

+319
-45
lines changed

9 files changed

+319
-45
lines changed

.github/workflows/run_tests.yml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ jobs:
1010
run_tests:
1111
runs-on: ubuntu-22.04
1212
steps:
13+
- name: Set up Python
14+
uses: actions/setup-python@v4
15+
with:
16+
python-version: 3.11
1317
- name: Check out source
14-
uses: actions/checkout@v2
18+
uses: actions/checkout@v4
1519
with:
1620
fetch-depth: 0
1721
- name: Install postgis
@@ -21,9 +25,9 @@ jobs:
2125
postgresql user: 'gis_admin'
2226
postgresql password: 'admin'
2327
- name: Install packages
24-
run: sudo apt-get update && sudo apt-get install -y build-essential protobuf-compiler libprotobuf-dev python3 #proj-bin libproj-dev
28+
run: sudo apt-get update && sudo apt-get install -y build-essential protobuf-compiler libprotobuf-dev
2529
- name: Activate cache for python packages
26-
uses: actions/cache@v2
30+
uses: actions/cache@v4
2731
with:
2832
path: ~/.cache/pip
2933
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:3.9-slim
1+
FROM python:3.11-slim
22
LABEL org.opencontainers.image.authors="Timothy Ellersiek <timothy@openrouteservice.org>"
33

44
# protobuf is required to parse osm files.

docker-compose-standalone.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
version: '2.2'
21
services:
32
api:
43
container_name: ops-api

docker-compose.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: "2.2"
2-
31
volumes:
42
postgis-data:
53

@@ -67,8 +65,6 @@ services:
6765
mem_limit: 28g
6866
networks:
6967
- poi_network
70-
profiles:
71-
- init
7268

7369
update:
7470
container_name: ops-update
@@ -86,8 +82,6 @@ services:
8682
mem_limit: 28g
8783
networks:
8884
- poi_network
89-
profiles:
90-
- update
9185

9286
test:
9387
container_name: ops-test

manage.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
logging.basicConfig(
1414
level=logging.DEBUG if os.environ.get('OPS_DEBUG', False) else logging.INFO,
15-
format='%(levelname)-8s %(message)s',
15+
format='%(asctime)s %(levelname)-8s %(message)s',
1616
)
1717
logger = logging.getLogger(__name__)
1818

@@ -68,7 +68,7 @@ def import_data():
6868
continue
6969
logger.info(f"Found directory: {dir_name}")
7070
for filename in file_list:
71-
if filename.endswith(".osm.pbf") or filename.endswith(".osm"):
71+
if filename.endswith(".pbf") or filename.endswith(".osm"):
7272
osm_files.append(os.path.join(dir_name, filename))
7373
osm_files.sort()
7474

@@ -82,18 +82,22 @@ def import_data():
8282
finally:
8383
f.close()
8484

85-
# we have found previous data in the database, check if file list has changed which would require a full rebuild
86-
if len(import_log) and set(import_log.keys()) != set(osm_files):
87-
logger.error(f"File set has changed since last import, full rebuild required. Exiting.")
88-
return
85+
# check if file list has changed which would require a full rebuild by deleting the import log
86+
if len(import_log):
87+
if set(import_log.keys()) != set(osm_files):
88+
logger.error(f"File set has changed since last import, full rebuild required. Exiting.")
89+
return
90+
else:
91+
import_log = {key: 0 for key in osm_files}
92+
with open(logfile, "w") as f:
93+
json.dump(import_log, f, indent=4, sort_keys=True)
94+
f.close()
8995

9096
logger.info(f"Starting to import OSM data ({len(osm_files)} files in batch)")
9197
logger.debug(f"Files in import batch: {osm_files}")
92-
parser.run_import(osm_files, import_log, db)
98+
# parser.run_import(osm_files, import_log, db)
99+
parser.run_import_new(osm_files, import_log, logfile, db)
93100

94-
with open(logfile, "w") as f:
95-
json.dump(import_log, f, indent=4, sort_keys=True)
96-
f.close()
97101

98102

99103
if __name__ == '__main__':
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import numpy as np
2+
from cykhash import Int64toInt64Map
3+
4+
class NodeStore:
    """Compact store mapping int64 OSM node ids to (lat, lng) pairs.

    Keys live in a cykhash Int64toInt64Map whose values index rows of a
    growable NumPy float64 array — far more memory-efficient than a plain
    dict of tuples when holding millions of nodes.
    """

    def __init__(self, initial_capacity=1024):
        """
        Args:
            initial_capacity (int): rows pre-allocated in the backing
                array; capacity doubles on demand.
        """
        self.size = 0
        self.capacity = initial_capacity
        self.index_map = Int64toInt64Map()
        # each row holds (lat, lng); NumPy coerces None to NaN on assignment
        self.data = np.empty((self.capacity, 2), dtype=np.float64)

    def _resize(self):
        """Double the capacity of the backing array, preserving stored rows."""
        self.capacity *= 2
        new_data = np.empty((self.capacity, 2), dtype=np.float64)
        new_data[:self.size] = self.data  # copy old data
        self.data = new_data

    def append(self, key, value_tuple):
        """
        Insert a new key-value pair into the store.

        If the key already exists, its value is overwritten instead.

        Args:
            key (int): The key to insert.
            value_tuple (tuple of floats): The associated value tuple.
        """
        if key in self.index_map:
            return self.set(key, value_tuple)

        self.index_map[key] = self.size
        if self.size >= self.capacity:
            self._resize()
        self.data[self.size] = value_tuple
        self.size += 1

    def set(self, key, value_tuple):
        """
        Set the value of an existing key in the store.

        Args:
            key (int): The key to modify.
            value_tuple (tuple of floats): The new value tuple.

        Raises:
            KeyError: if the key was never appended.
        """
        if key not in self.index_map:
            raise KeyError(f"Key {key} does not exist.")
        idx = self.index_map.get(key)
        self.data[idx] = value_tuple

    def compact(self):
        """
        Shrink the backing NumPy array to the used size for memory efficiency.
        """
        self.data = self.data[:self.size]
        self.capacity = self.size

    def get(self, key):
        """Retrieve the value tuple for a given key, or None if absent."""
        # assumes Int64toInt64Map.get returns None for missing keys — TODO confirm
        idx = self.index_map.get(key)
        if idx is not None:
            return self.data[idx]
        return None

    def __contains__(self, key):
        """Check if a key exists in the store."""
        return key in self.index_map

    def __len__(self):
        """Return the number of stored keys."""
        return len(self.index_map)

openpoiservice/server/db_import/parse_osm.py

Lines changed: 116 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
# openpoiservice/server/parse_osm.py
2-
from openpoiservice.server import db
3-
from openpoiservice.server import categories_tools, ops_settings
4-
from openpoiservice.server.db_import.models import POIs, Tags, Categories
5-
from openpoiservice.server.db_import.objects import PoiObject, TagsObject
2+
63
import logging
74
from bisect import bisect_left
85
from collections import deque
96

7+
import numpy as np
8+
from cykhash.khashsets import Int64Set
9+
10+
from openpoiservice.server import categories_tools, ops_settings
11+
from openpoiservice.server import db
12+
from openpoiservice.server.db_import.models import POIs, Tags, Categories
13+
from openpoiservice.server.db_import.node_store import NodeStore
14+
from openpoiservice.server.db_import.objects import PoiObject, TagsObject
15+
1016
logger = logging.getLogger(__name__)
1117

1218

19+
# deprecated, remove later
1320
class WayObject(object):
1421
""" Class that creates a way object. """
1522

@@ -60,20 +67,22 @@ def __init__(self, osm_file_index, update_mode=False):
6067
self.tags_cnt = 0
6168
self.categories_cnt = 0
6269
self.relation_ways = {}
63-
self.nodes = {}
64-
self.process_ways = []
70+
self.nodes_store = NodeStore()
71+
self.process_ways_set = Int64Set()
6572
self.poi_objects = []
6673
self.tags_objects = []
6774
self.categories_objects = []
68-
self.ways_temp = []
69-
self.ways_obj = None
70-
self.tags_object = None
71-
self.poi_object = None
72-
self.process_ways_length = None
7375
self.update_mode = update_mode
7476
self.osm_file_index = osm_file_index
7577
self.failed = False
7678

79+
# deprecated, remove later
80+
self.nodes = {}
81+
self.process_ways = []
82+
self.ways_obj = None
83+
self.ways_temp = []
84+
85+
7786
def parse_nodes(self, osm_nodes):
7887
"""
7988
Callback function called by imposm while nodes are parsed.
@@ -89,6 +98,7 @@ def parse_nodes(self, osm_nodes):
8998
self.failed = True
9099
return
91100

101+
92102
def parse_relations(self, relations):
93103
"""
94104
Callback function called by imposm while relations are parsed. The idea is to extract polygons which may
@@ -116,6 +126,8 @@ def parse_relations(self, relations):
116126
self.relation_ways[osmid_rel_member].update({"relation_id": osmid})
117127
self.relations_cnt += 1
118128

129+
130+
# deprecated, remove later
119131
def parse_ways(self, ways):
120132
"""
121133
Callback function called by imposm while ways are parsed. If a category can't be found it may likely
@@ -149,6 +161,8 @@ def parse_ways(self, ways):
149161
self.ways_obj = WayObject(osmid, osm_type, tags, refs, categories, len(refs))
150162
self.process_ways.append(self.ways_obj)
151163

164+
165+
# deprecated, remove later
152166
def parse_coords_for_ways(self, coords):
153167
"""
154168
Callback function called by imposm while coordinates are parsed. Due to ordering we can use coords
@@ -225,6 +239,93 @@ def parse_coords_for_ways(self, coords):
225239

226240
self.ways_temp = []
227241

242+
243+
def parse_ways_first(self, ways):
    """
    First-pass way callback called by imposm while ways are parsed.

    If a category can't be found directly, the osmid of the way may be
    present in self.relation_ways, which carries additional tags and
    therefore eventually a category. For every way that resolves to a
    category, all of its node refs are registered in the node store
    (coordinates are still unknown at this point — they are filled in by
    parse_coords_and_store) and the way id is remembered in
    process_ways_set for the second pass.

    :param ways: osm way objects
    :type ways: list of osm ways
    """
    for osmid, tags, refs in ways:
        # skip very large ways
        if len(refs) >= 1000:
            continue

        categories = categories_tools.get_category(tags)

        if len(categories) == 0 and osmid in self.relation_ways:
            tags = self.relation_ways[osmid]
            categories = categories_tools.get_category(tags)

        if len(categories) > 0:
            self.ways_cnt += 1
            # reserve a slot per referenced node; (None, None) acts as a
            # "coordinate pending" placeholder
            for ref in refs:
                self.nodes_store.append(ref, (None, None))
            self.process_ways_set.add(osmid)
268+
269+
270+
def parse_coords_and_store(self, coords):
    """
    Coordinate-pass callback called by imposm.

    Fills in the (lat, lng) of every node that was registered during
    parse_ways_first; coordinates of nodes no way references are ignored.

    :param coords: (osmid, lat, lng) triples
    :type coords: iterable of tuples
    """
    for osmid, lat, lng in coords:
        if osmid in self.nodes_store:
            self.nodes_store.set(osmid, (lat, lng))
274+
275+
276+
def parse_ways_second(self, ways):
    """
    Second-pass way callback called by imposm.

    For every way remembered in process_ways_set, the centroid of its
    (now coordinate-complete) nodes is computed and a POI is created from
    it via create_poi. Ways with missing or invalid node coordinates are
    skipped and stay in process_ways_set.

    :param ways: osm way objects
    :type ways: list of osm ways
    """
    for osmid, tags, refs in ways:
        if osmid not in self.process_ways_set:
            continue
        categories = categories_tools.get_category(tags)
        # from way
        osm_type = 2

        if len(categories) == 0 and osmid in self.relation_ways:
            # current way is the outer ring of a relation which was marked as having a category
            tags = self.relation_ways[osmid]
            categories = categories_tools.get_category(tags)
            # from relation
            osm_type = 3

        # Calculate centroid of way (deduplicate refs — presumably so closed
        # rings don't count the shared first/last node twice; verify)
        refs = set(refs)
        sum_lat = 0
        sum_lng = 0
        way_valid = True
        for ref in refs:
            if ref not in self.nodes_store:  # should never happen
                way_valid = False
                break
            lat, lng = self.nodes_store.get(ref)
            # NaN marks a node whose coordinate pass never supplied a value
            if lat is None or lng is None or np.isnan(lat) or np.isnan(lng):
                way_valid = False
                break
            sum_lat += lat
            sum_lng += lng
        if not way_valid:
            continue

        self.process_ways_set.remove(osmid)
        centroid_lat = sum_lat / len(refs)
        centroid_lng = sum_lng / len(refs)
        try:
            self.create_poi(osm_type, osmid, [centroid_lat, centroid_lng], tags, categories)
        except Exception as e:
            logger.debug(e)
            self.failed = True
            return
327+
328+
228329
def create_poi(self, osm_type, osm_id, lat_lng, tags, categories=None):
229330
"""
230331
Creates a poi entity if a category is found. Stored afterwards.
@@ -257,6 +358,7 @@ def create_poi(self, osm_type, osm_id, lat_lng, tags, categories=None):
257358

258359
self.store_poi(PoiObject(osm_type, osm_id, lat_lng, categories))
259360

361+
260362
def store_poi(self, poi_object):
261363
"""
262364
Appends poi storage objects to buffer for bulk storage to database.
@@ -273,6 +375,7 @@ def store_poi(self, poi_object):
273375
logger.debug(f"Pois: {self.pois_count}, tags: {self.tags_cnt}, categories: {self.categories_cnt}")
274376
self.save_buffer()
275377

378+
276379
def store_tags(self, tags_object):
277380
"""
278381
Appends tags storage objects to buffer for bulk storage to database.
@@ -285,6 +388,7 @@ def store_tags(self, tags_object):
285388
value=tags_object.value
286389
))
287390

391+
288392
def store_categories(self, osmtype, osmid, category):
289393
"""
290394
Appends category storage objects to buffer for bulk storage to database.
@@ -296,6 +400,7 @@ def store_categories(self, osmtype, osmid, category):
296400
category=category
297401
))
298402

403+
299404
def save_buffer(self):
300405
"""
301406
Save POIs, tags and categories to database and clear buffer.

0 commit comments

Comments
 (0)