33//! [symlink_format_manifest](https://docs.delta.io/delta-utility/#generate-a-manifest-file) file
44//! when needed for an external engine such as Presto or BigQuery.
55//!
6+ //! The "symlink_format_manifest" is not something that has been well documented, but for
7+ //! enon-partitioned tables this will generate a `_symlink_format_manifest/manifest` file next to
8+ //! the `_delta_log`, for example:
9+ //!
10+ //! ```
11+ //! COVID-19_NYT
12+ //! ├── _delta_log
13+ //! │ ├── 00000000000000000000.crc
14+ //! │ └── 00000000000000000000.json
15+ //! ├── part-00000-a496f40c-e091-413a-85f9-b1b69d4b3b4e-c000.snappy.parquet
16+ //! ├── part-00001-9d9d980b-c500-4f0b-bb96-771a515fbccc-c000.snappy.parquet
17+ //! ├── part-00002-8826af84-73bd-49a6-a4b9-e39ffed9c15a-c000.snappy.parquet
18+ //! ├── part-00003-539aff30-2349-4b0d-9726-c18630c6ad90-c000.snappy.parquet
19+ //! ├── part-00004-1bb9c3e3-c5b0-4d60-8420-23261f58a5eb-c000.snappy.parquet
20+ //! ├── part-00005-4d47f8ff-94db-4d32-806c-781a1cf123d2-c000.snappy.parquet
21+ //! ├── part-00006-d0ec7722-b30c-4e1c-92cd-b4fe8d3bb954-c000.snappy.parquet
22+ //! ├── part-00007-4582392f-9fc2-41b0-ba97-a74b3afc8239-c000.snappy.parquet
23+ //! └── _symlink_format_manifest
24+ //! └── manifest
25+ //! ```
26+ //!
27+ //! For partitioned tables, a `manifest` file will be generated inside a hive-style partitioned
28+ //! tree structure, e.g.:
29+ //!
30+ //! ```
31+ //! delta-0.8.0-partitioned
32+ //! ├── _delta_log
33+ //! │ └── 00000000000000000000.json
34+ //! ├── _symlink_format_manifest
35+ //! │ ├── year=2020
36+ //! │ │ ├── month=1
37+ //! │ │ │ └── day=1
38+ //! │ │ │ └── manifest
39+ //! │ │ └── month=2
40+ //! │ │ ├── day=3
41+ //! │ │ │ └── manifest
42+ //! │ │ └── day=5
43+ //! │ │ └── manifest
44+ //! │ └── year=2021
45+ //! │ ├── month=12
46+ //! │ │ ├── day=20
47+ //! │ │ │ └── manifest
48+ //! │ │ └── day=4
49+ //! │ │ └── manifest
50+ //! │ └── month=4
51+ //! │ └── day=5
52+ //! │ └── manifest
53+ //! ├── year=2020
54+ //! │ ├── month=1
55+ //! │ │ └── day=1
56+ //! │ │ └── part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet
57+ //! │ └── month=2
58+ //! │ ├── day=3
59+ //! │ │ └── part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet
60+ //! │ └── day=5
61+ //! │ └── part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet
62+ //! └── year=2021
63+ //! ├── month=12
64+ //! │ ├── day=20
65+ //! │ │ └── part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet
66+ //! │ └── day=4
67+ //! │ └── part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet
68+ //! └── month=4
69+ //! └── day=5
70+ //! └── part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet
71+ //! ```
672use bytes:: { BufMut , BytesMut } ;
773use futures:: future:: BoxFuture ;
74+ use std:: collections:: HashMap ;
875use std:: sync:: Arc ;
976
10- use object_store:: path:: Path ;
77+ use itertools:: Itertools ;
78+ use object_store:: path:: { Path , PathPart } ;
1179use tracing:: log:: * ;
1280
1381use super :: { CustomExecuteHandler , Operation } ;
82+ use crate :: kernel:: scalars:: ScalarExt ;
1483use crate :: kernel:: { resolve_snapshot, EagerSnapshot } ;
1584use crate :: logstore:: object_store:: PutPayload ;
1685use crate :: logstore:: LogStoreRef ;
1786use crate :: table:: state:: DeltaTableState ;
18- use crate :: { DeltaResult , DeltaTable } ;
87+ use crate :: { DeltaResult , DeltaTable , DeltaTableError } ;
1988
2089/// Simple builder to generate the manifest
2190#[ derive( Clone ) ]
@@ -56,7 +125,8 @@ impl std::future::IntoFuture for GenerateBuilder {
56125 let this = self ;
57126 Box :: pin ( async move {
58127 let snapshot = resolve_snapshot ( this. log_store ( ) , this. snapshot . clone ( ) , true ) . await ?;
59- let mut payload = BytesMut :: new ( ) ;
128+ let mut payloads = HashMap :: new ( ) ;
129+ let manifest_part = PathPart :: parse ( "manifest" ) . expect ( "This is not possible" ) ;
60130
61131 for add in this
62132 . snapshot
@@ -65,17 +135,44 @@ impl std::future::IntoFuture for GenerateBuilder {
65135 . log_data ( )
66136 . into_iter ( )
67137 {
68- let uri = this. log_store ( ) . to_uri ( & add. object_store_path ( ) ) ;
69- trace ! ( "Prepare {uri} for the symlink_format_manifest" ) ;
70- payload. put ( uri. as_bytes ( ) ) ;
71- payload. put_u8 ( b'\n' ) ;
138+ let path = add. object_store_path ( ) ;
139+ // The output_path is more or less the tree structure as the original file, just
140+ // inside the _symlink_format_manifest directory. This makes it easier to avoid
141+ // messing with partition values on the action
142+ let output_path = Path :: from_iter (
143+ std:: iter:: once ( PathPart :: parse ( "_symlink_format_manifest" ) . map_err ( |e| {
144+ DeltaTableError :: GenericError {
145+ source : Box :: new ( e) ,
146+ }
147+ } ) ?)
148+ . chain ( path. parts ( ) . filter ( |p| path. filename ( ) != Some ( p. as_ref ( ) ) ) )
149+ . chain ( std:: iter:: once ( manifest_part. clone ( ) ) ) ,
150+ ) ;
151+ trace ! ( "Computed output path for add action: {output_path:?}" ) ;
152+ if !payloads. contains_key ( & output_path) {
153+ payloads. insert ( output_path. clone ( ) , BytesMut :: new ( ) ) ;
154+ }
155+
156+ if let Some ( payload) = payloads. get_mut ( & output_path) {
157+ let uri = this. log_store ( ) . to_uri ( & path) ;
158+ trace ! ( "Prepare {uri} for the symlink_format_manifest" ) ;
159+ payload. put ( uri. as_bytes ( ) ) ;
160+ payload. put_u8 ( b'\n' ) ;
161+ }
162+ }
163+ debug ! ( "Total of {} manifest files prepared" , payloads. len( ) ) ;
164+ for ( path, payload) in payloads. drain ( ) {
165+ debug ! (
166+ "Generated manifest for {:?} is {} bytes" ,
167+ path,
168+ payload. len( )
169+ ) ;
170+ let payload = PutPayload :: from ( payload. freeze ( ) ) ;
171+ this. log_store ( )
172+ . object_store ( None )
173+ . put ( & path, payload)
174+ . await ?;
72175 }
73- debug ! ( "Generate manifest {} bytes prepared" , payload. len( ) ) ;
74- let payload = PutPayload :: from ( payload. freeze ( ) ) ;
75- this. log_store ( )
76- . object_store ( None )
77- . put ( & Path :: from ( "_symlink_format_manifest/manifest" ) , payload)
78- . await ?;
79176 Ok ( DeltaTable :: new_with_state (
80177 this. log_store ( ) . clone ( ) ,
81178 DeltaTableState :: new ( snapshot) ,
@@ -88,15 +185,14 @@ impl std::future::IntoFuture for GenerateBuilder {
88185mod tests {
89186 use super :: * ;
90187
188+ use futures:: StreamExt ;
189+
91190 use crate :: kernel:: schema:: { DataType , PrimitiveType } ;
92- use crate :: kernel:: Action ;
191+ use crate :: kernel:: { Action , Add } ;
93192 use crate :: DeltaOps ;
94193
95- use futures:: StreamExt ;
96-
97194 #[ tokio:: test]
98195 async fn test_generate ( ) -> DeltaResult < ( ) > {
99- use crate :: kernel:: Add ;
100196 let actions = vec ! [ Action :: Add ( Add {
101197 path: "some-files.parquet" . into( ) ,
102198 ..Default :: default ( )
@@ -125,4 +221,49 @@ mod tests {
125221 assert ! ( found, "The _symlink_format_manifest/manifest was not found in the Delta table's object store prefix" ) ;
126222 Ok ( ( ) )
127223 }
224+
225+ #[ tokio:: test]
226+ async fn test_generate_with_partitions ( ) -> DeltaResult < ( ) > {
227+ use crate :: kernel:: Add ;
228+ let actions = vec ! [ Action :: Add ( Add {
229+ path: "locale=us/some-files.parquet" . into( ) ,
230+ partition_values: HashMap :: from( [ ( "locale" . to_string( ) , Some ( "us" . to_string( ) ) ) ] ) ,
231+ ..Default :: default ( )
232+ } ) ] ;
233+ let table = DeltaOps :: new_in_memory ( )
234+ . create ( )
235+ . with_column ( "id" , DataType :: Primitive ( PrimitiveType :: Long ) , true , None )
236+ . with_column (
237+ "locale" ,
238+ DataType :: Primitive ( PrimitiveType :: String ) ,
239+ true ,
240+ None ,
241+ )
242+ . with_partition_columns ( vec ! [ "locale" ] )
243+ . with_actions ( actions)
244+ . await ?;
245+
246+ let generate = GenerateBuilder :: new ( table. log_store ( ) , table. state . map ( |s| s. snapshot ) ) ;
247+ let table = generate. await ?;
248+
249+ let store = table. log_store ( ) . object_store ( None ) ;
250+ let mut stream = store. list ( None ) ;
251+ let mut found = false ;
252+ while let Some ( meta) = stream. next ( ) . await . transpose ( ) . unwrap ( ) {
253+ // Printing out the files so the failed assertion below will include the actual
254+ // contents of the table's prefix in the log
255+ println ! ( "Name: {}, size: {}" , meta. location, meta. size) ;
256+ if meta. location == Path :: from ( "_symlink_format_manifest/locale=us/manifest" ) {
257+ found = true ;
258+ break ;
259+ }
260+ assert_ne ! (
261+ meta. location,
262+ Path :: from( "_symlink_format_manifest/manifest" ) ,
263+ "The 'root' manifest file is not expected in a partitioned table!"
264+ ) ;
265+ }
266+ assert ! ( found, "The _symlink_format_manifest/manifest was not found in the Delta table's object store prefix" ) ;
267+ Ok ( ( ) )
268+ }
128269}
0 commit comments