import docker
from loguru import logger
from pathlib import Path
- from typing import Dict, List, Optional, Any, Literal
+ from typing import Dict, List, Optional, Any, Literal, Set, Tuple
import time
from docker.errors import APIError, ImageNotFound
import asyncio
from app.utils.generate_id import generate_id
import aiodocker
import json
+ import os

from ..shared.config import get_settings
from .database import db_manager
@@ -30,6 +31,16 @@ class ContainerMetrics:
    cpu_usage: float = 0.0


+ @dataclass
+ class FileState:
+     """Tracks the state of a file for change detection."""
+     path: Path
+     size: int
+     mtime: float
+     md5_hash: str
+     exists: bool = True
+
+
class DockerExecutor:
    """Executes code in Docker containers with file management."""

@@ -96,6 +107,94 @@ def _file_lock(self, path: Path):
            lock_file.close()
            lock_path.unlink(missing_ok=True)

+     def _scan_directory(self, directory: Path) -> Dict[str, FileState]:
+         """
+         Recursively scan a directory and collect file states.
+         Returns a dictionary mapping relative file paths to their FileState objects.
+         """
+         file_states = {}
+
+         if not directory.exists():
+             logger.warning(f"Directory {directory} does not exist")
+             return file_states
+
+         # Walk through the directory recursively
+         for root, _, files in os.walk(directory):
+             root_path = Path(root)
+
+             # Compute relative path from the base directory
+             rel_root = root_path.relative_to(directory)
+
+             for filename in files:
+                 # Skip lock files
+                 if filename.endswith('.lock'):
+                     continue
+
+                 file_path = root_path / filename
+
+                 # Compute relative path for dictionary key
+                 if rel_root == Path('.'):
+                     rel_path = filename
+                 else:
+                     rel_path = str(rel_root / filename)
+
+                 try:
+                     # Get file stats
+                     stat = file_path.stat()
+                     size = stat.st_size
+                     mtime = stat.st_mtime
+
+                     # Calculate MD5 hash for content comparison
+                     md5_hash = hashlib.md5(file_path.read_bytes()).hexdigest()
+
+                     # Store file state
+                     file_states[rel_path] = FileState(
+                         path=file_path,
+                         size=size,
+                         mtime=mtime,
+                         md5_hash=md5_hash
+                     )
+                     logger.debug(f"Scanned file: {rel_path}, size: {size}, hash: {md5_hash}")
+                 except (PermissionError, FileNotFoundError) as e:
+                     logger.warning(f"Error scanning file {file_path}: {str(e)}")
+                     continue
+
+         return file_states
+
+     def _find_changed_files(self,
+                             before_states: Dict[str, FileState],
+                             after_states: Dict[str, FileState]) -> Set[str]:
+         """
+         Compare before and after file states to identify new or modified files.
+         Returns a set of relative paths of changed files.
+         """
+         changed_files = set()
+
+         # Find new or modified files
+         for rel_path, after_state in after_states.items():
+             if rel_path not in before_states:
+                 # New file
+                 logger.info(f"New file detected: {rel_path}")
+                 changed_files.add(rel_path)
+             else:
+                 before_state = before_states[rel_path]
+                 # Check if the file was modified: compare size and content hash
+                 # (mtime is stored but not compared, since it can change without a content change)
+                 if (before_state.size != after_state.size or
+                         before_state.md5_hash != after_state.md5_hash):
+                     logger.info(f"Modified file detected: {rel_path}, before={before_state.size}:{before_state.md5_hash}, after={after_state.size}:{after_state.md5_hash}")
+                     changed_files.add(rel_path)
+                 else:
+                     logger.debug(f"Unchanged file: {rel_path}, size={after_state.size}, hash={after_state.md5_hash}")
+
+         # Report files that were deleted during execution
+         for rel_path in before_states:
+             if rel_path not in after_states:
+                 logger.info(f"File deleted: {rel_path}")
+
+         logger.info(f"Before scan: {len(before_states)} files, After scan: {len(after_states)} files, Changed: {len(changed_files)} files")
+
+         return changed_files
+
    async def _update_container_metrics(self, container) -> None:
        """Update metrics for a running container."""
        try:
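For reference, the before/after pattern these two helpers implement can be sketched standalone. The snippet below is illustrative only and not part of the commit: scan_dir and diff_states are hypothetical names, and states are keyed on content hashes alone rather than full FileState records.

import hashlib
from pathlib import Path
from typing import Dict, Set

def scan_dir(directory: Path) -> Dict[str, str]:
    """Map each file's path (relative to directory) to an MD5 of its contents."""
    return {
        str(p.relative_to(directory)): hashlib.md5(p.read_bytes()).hexdigest()
        for p in directory.rglob("*")
        if p.is_file() and not p.name.endswith(".lock")  # mirror the .lock skip above
    }

def diff_states(before: Dict[str, str], after: Dict[str, str]) -> Set[str]:
    """Return paths that are new or whose content hash changed."""
    return {path for path, digest in after.items() if before.get(path) != digest}

# Usage: snapshot, run something that writes files, snapshot again, diff.
workdir = Path("/tmp/session")  # hypothetical working directory
workdir.mkdir(parents=True, exist_ok=True)
before = scan_dir(workdir)
(workdir / "result.txt").write_text("output")  # stands in for executed user code
print(diff_states(before, scan_dir(workdir)))  # -> {'result.txt'}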
@@ -200,6 +299,11 @@ async def execute(
        logger.info(f"Session directory contents: {list(session_path.glob('*'))}")
        logger.info(f"Code to execute: {code}")

+         # Scan directory before execution to track file state
+         logger.info(f"Scanning directory {session_path} before code execution")
+         before_file_states = self._scan_directory(session_path)
+         logger.info(f"Found {len(before_file_states)} files before execution")
+
        async with self._container_semaphore:
            try:
                # Ensure the image is available
@@ -333,32 +437,46 @@ async def execute(
                if exec_inspect["ExitCode"] != 0:
                    return {"stdout": "", "stderr": output_text, "status": "error", "files": []}

-                 # List files in the session directory
+                 # Scan directory after execution to detect changes
+                 logger.info(f"Scanning directory {session_path} after code execution")
+                 after_file_states = self._scan_directory(session_path)
+                 logger.info(f"Found {len(after_file_states)} files after execution")
+
+                 # Identify changed files
+                 changed_file_paths = self._find_changed_files(before_file_states, after_file_states)
+                 logger.info(f"Detected {len(changed_file_paths)} changed files: {changed_file_paths}")
+
+                 # Process only new or modified files
                output_files = []
                existing_filenames = {file["name"] for file in (files or [])}
                logger.info(f"Existing filenames: {existing_filenames}")
-                 logger.info(f"Scanning directory {session_path} for created files")
-                 for file_path in session_path.glob("*"):
-                     if file_path.is_file() and file_path.name not in existing_filenames:
+
+                 for rel_path in changed_file_paths:
+                     file_path = session_path / rel_path
+                     if file_path.is_file():
                        file_id = generate_id()
                        file_size = file_path.stat().st_size
-                         logger.info(f"Found new file: {file_path}, size: {file_size}")
+                         logger.info(f"Processing changed file: {file_path}, size: {file_size}")

                        # Calculate file metadata
                        content_type, _ = mimetypes.guess_type(file_path.name) or ("application/octet-stream", None)
                        etag = hashlib.md5(str(file_path.stat().st_mtime).encode()).hexdigest()

                        # Prepare file data for database
+                         # Use directory structure in filepath if present
+                         filepath = f"{session_id}/{rel_path}"
+                         filename = Path(rel_path).name
+
                        file_data = {
                            "id": file_id,
                            "session_id": session_id,
-                             "filename": file_path.name,
-                             "filepath": session_id + "/" + file_path.name,
+                             "filename": filename,  # This is used by the API to convert to FileRef.name
+                             "filepath": filepath,
                            "size": file_size,
                            "content_type": content_type,
-                             "original_filename": file_path.name,
+                             "original_filename": filename,
                            "etag": etag,
-                             "name": f"{session_id}/{file_id}/{file_path.name}",
+                             "name": f"{session_id}/{file_id}/{filename}",  # Full path for storage/reference
                        }
                        logger.info(f"Saving file metadata to database: {file_data}")
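Worked example of the new path derivation (illustrative values only; the session id, file id, and file name below are made up): a file created in a subdirectory now keeps its directory structure in filepath, while filename stays a bare name.

from pathlib import Path

session_id = "sess_abc"       # hypothetical
file_id = "file_123"          # hypothetical
rel_path = "plots/chart.png"  # relative path as reported by _find_changed_files

filepath = f"{session_id}/{rel_path}"        # 'sess_abc/plots/chart.png'
filename = Path(rel_path).name               # 'chart.png'
name = f"{session_id}/{file_id}/{filename}"  # 'sess_abc/file_123/chart.png'

Under the old session_path.glob("*") scan, such a nested file would not have been picked up at all (glob("*") is not recursive), and filepath was always flattened to session_id + "/" + file_path.name; the os.walk-based scan plus relative paths fixes both.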