@@ -2,7 +2,9 @@ package alpakka.file
2
2
3
3
import alpakka .file .uploader .DirectoryWatcher
4
4
import org .apache .commons .io .FileUtils
5
+ import org .scalatest .concurrent .Eventually
5
6
import org .scalatest .matchers .should .Matchers
7
+ import org .scalatest .time .{Seconds , Span }
6
8
import org .scalatest .wordspec .AsyncWordSpec
7
9
import org .scalatest .{BeforeAndAfterAll , BeforeAndAfterEachTestData , TestData }
8
10
import org .slf4j .{Logger , LoggerFactory }
@@ -18,83 +20,134 @@ import scala.util.Random
18
20
* Hence we:
19
21
* - create the dir structure and copy files before each test
20
22
* - clean up dir structure after each test
21
- * - use a shared watcher instance for all tests
22
23
*/
23
- final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEachTestData {
24
+ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEachTestData with Eventually {
24
25
val logger : Logger = LoggerFactory .getLogger(this .getClass)
25
-
26
- var watcher : DirectoryWatcher = _
26
+ val defaultTimeout : FiniteDuration = 5 .seconds
27
+ implicit val patience : PatienceConfig = PatienceConfig (timeout = Span ( 20 , Seconds ))
27
28
var tmpRootDir : Path = _
28
29
var uploadDir : Path = _
29
30
var processedDir : Path = _
30
31
32
+ case class WatcherFixture (watcher : DirectoryWatcher ) {
33
+ def withWatcher [T ](testCode : DirectoryWatcher => T ): T = {
34
+ try {
35
+ testCode(watcher)
36
+ } finally {
37
+ Await .result(watcher.stop(), defaultTimeout)
38
+ }
39
+ }
40
+ }
41
+
31
42
" DirectoryWatcher" should {
32
43
" detect_files_on_startup_in_parent_dir" in {
33
- watcher = DirectoryWatcher (uploadDir, processedDir)
34
- waitForCondition(3 .seconds)(watcher.countFilesProcessed() == 2 ) shouldBe true
44
+ WatcherFixture (DirectoryWatcher (uploadDir, processedDir))
45
+ .withWatcher { watcher =>
46
+ eventually {
47
+ watcher.countFilesProcessed() shouldBe 2
48
+ }
49
+ }
35
50
}
36
51
52
+
37
53
" detect_added_file_at_runtime_in_parent_dir" in {
38
- watcher = DirectoryWatcher (uploadDir, processedDir)
39
54
copyTestFileToDir(uploadDir)
40
- waitForCondition(3 .seconds)(watcher.countFilesProcessed() == 2 + 1 ) shouldBe true
55
+ WatcherFixture (DirectoryWatcher (uploadDir, processedDir))
56
+ .withWatcher { watcher =>
57
+ eventually {
58
+ watcher.countFilesProcessed() shouldBe 2 + 1
59
+ }
60
+ }
41
61
}
42
62
43
63
" detect_added_files_at_runtime_in_sub_dir" in {
44
- watcher = DirectoryWatcher (uploadDir, processedDir)
45
64
copyTestFileToDir(uploadDir.resolve(" subdir" ))
46
- waitForCondition(3 .seconds)(watcher.countFilesProcessed() == 2 + 1 ) shouldBe true
65
+ WatcherFixture (DirectoryWatcher (uploadDir, processedDir))
66
+ .withWatcher { watcher =>
67
+ eventually {
68
+ watcher.countFilesProcessed() shouldBe 2 + 1
69
+ }
70
+ }
47
71
}
48
72
49
73
" detect_added_nested_subdir_at_runtime_with_files_in_subdir" in {
50
- watcher = DirectoryWatcher (uploadDir, processedDir)
51
- val tmpDir = Files .createTempDirectory(" tmp" )
52
- val sourcePath = Paths .get(" src/main/resources/testfile.jpg" )
53
- val targetPath = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
54
- val targetPath2 = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
55
- Files .copy(sourcePath, targetPath)
56
- Files .copy(sourcePath, targetPath2)
57
- val targetDir = Files .createDirectories(uploadDir.resolve(" subdir" ).resolve(" nestedDirWithFiles" ))
58
- FileUtils .copyDirectory(tmpDir.toFile, targetDir.toFile)
59
- waitForCondition(3 .seconds)(watcher.countFilesProcessed() == 2 + 2 ) shouldBe true
74
+ val tmpDir = Files .createTempDirectory(" tmp" )
75
+ val sourcePath = Paths .get(" src/main/resources/testfile.jpg" )
76
+ val targetPath = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
77
+ val targetPath2 = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
78
+ Files .copy(sourcePath, targetPath)
79
+ Files .copy(sourcePath, targetPath2)
80
+ val targetDir = Files .createDirectories(uploadDir.resolve(" subdir" ).resolve(" nestedDirWithFiles" ))
81
+ FileUtils .copyDirectory(tmpDir.toFile, targetDir.toFile)
82
+ WatcherFixture (DirectoryWatcher (uploadDir, processedDir))
83
+ .withWatcher { watcher =>
84
+ eventually {
85
+ watcher.countFilesProcessed() shouldBe 2 + 2
86
+ }
87
+ }
60
88
}
61
89
62
90
" handle_large_number_of_files_in_parent_dir" in {
63
91
(1 to 1000 ).foreach(_ => copyTestFileToDir(uploadDir))
64
- watcher = DirectoryWatcher (uploadDir, processedDir)
65
- waitForCondition(20 .seconds)(watcher.countFilesProcessed() == 2 + 1000 ) shouldBe true
92
+ WatcherFixture (DirectoryWatcher (uploadDir, processedDir))
93
+ .withWatcher { watcher =>
94
+ eventually {
95
+ watcher.countFilesProcessed() shouldBe 2 + 1000
96
+ }
97
+ }
66
98
}
67
99
68
100
" handle_invalid_parent_directory_path" in {
69
101
val invalidParentDir = Paths .get(" /path/to/non-existent/directory" )
70
102
val processedDir = Files .createTempDirectory(" processed" )
71
103
72
104
the[IllegalArgumentException ] thrownBy {
73
- watcher = DirectoryWatcher (invalidParentDir, processedDir)
105
+ DirectoryWatcher (invalidParentDir, processedDir)
74
106
} should have message s " Invalid upload directory path: $invalidParentDir"
75
107
}
76
108
}
77
109
78
110
override protected def beforeEach (testData : TestData ): Unit = {
79
111
logger.info(s " Starting test: ${testData.name}" )
80
112
81
- tmpRootDir = Files .createTempDirectory(testData.text)
82
- logger.info(s " Created tmp root dir: $tmpRootDir" )
83
-
84
- uploadDir = tmpRootDir.resolve(" upload" )
85
- processedDir = tmpRootDir.resolve(" processed" )
86
- Files .createDirectories(uploadDir)
87
- Files .createDirectories(uploadDir.resolve(" subdir" ))
88
- Files .createDirectories(processedDir)
113
+ def withDirectoryCreation [T ](action : => T ): T = {
114
+ try {
115
+ tmpRootDir = Files .createTempDirectory(testData.text)
116
+ logger.info(s " Created tmp root dir: $tmpRootDir" )
117
+
118
+ uploadDir = tmpRootDir.resolve(" upload" )
119
+ processedDir = tmpRootDir.resolve(" processed" )
120
+
121
+ Files .createDirectories(uploadDir)
122
+ Files .createDirectories(uploadDir.resolve(" subdir" ))
123
+ Files .createDirectories(processedDir)
124
+
125
+ action
126
+ } catch {
127
+ case ex : Exception =>
128
+ logger.error(s " Failed to set up test directories: ${ex.getMessage}" )
129
+ if (tmpRootDir != null ) {
130
+ FileUtils .deleteDirectory(tmpRootDir.toFile)
131
+ }
132
+ throw ex
133
+ }
134
+ }
89
135
90
- // Populate dirs BEFORE startup
91
- copyTestFileToDir(tmpRootDir.resolve(" upload" ))
92
- copyTestFileToDir(tmpRootDir.resolve(" upload/subdir" ))
136
+ withDirectoryCreation {
137
+ // Populate dirs BEFORE startup
138
+ try {
139
+ copyTestFileToDir(uploadDir)
140
+ copyTestFileToDir(uploadDir.resolve(" subdir" ))
141
+ } catch {
142
+ case ex : Exception =>
143
+ logger.error(s " Failed to copy test files: ${ex.getMessage}" )
144
+ throw ex
145
+ }
146
+ }
93
147
}
94
148
95
149
override protected def afterEach (testData : TestData ): Unit = {
96
150
logger.info(s " Cleaning up after test: ${testData.name}" )
97
- if (watcher != null ) Await .result(watcher.stop(), 5 .seconds)
98
151
FileUtils .deleteDirectory(tmpRootDir.toFile)
99
152
logger.info(s " Finished test: ${testData.name}" )
100
153
}
@@ -104,21 +157,8 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before
104
157
val targetPath = target.resolve(createUniqueFileName(createUniqueFileName(sourcePath.getFileName)))
105
158
Files .copy(sourcePath, targetPath)
106
159
}
107
-
108
160
private def createUniqueFileName (fileName : Path ) = {
109
161
val parts = fileName.toString.split('.' ).map(_.trim)
110
162
Paths .get(s " ${parts.head}${Random .nextInt()}. ${parts.reverse.head}" )
111
163
}
112
-
113
- private def waitForCondition (maxDuration : FiniteDuration )(condition : => Boolean ): Boolean = {
114
- val startTime = System .currentTimeMillis()
115
- var elapsed = 0 .millis
116
-
117
- while (! condition && elapsed < maxDuration) {
118
- Thread .sleep(100 )
119
- elapsed = (System .currentTimeMillis() - startTime).millis
120
- }
121
- logger.info(" Condition reached after: {} ms" , elapsed.toMillis)
122
- condition
123
- }
124
- }
164
+ }
0 commit comments