1+ <?php
2+
3+ namespace EcomDev \MySQL2JSONL \Command ;
4+
5+ use Amp \Pipeline \Queue ;
6+ use EcomDev \MySQL2JSONL \Configuration ;
7+ use EcomDev \MySQL2JSONL \ConfigurationException ;
8+ use EcomDev \MySQL2JSONL \ExportTableFactory ;
9+ use EcomDev \MySQL2JSONL \Progress \ExportProgressNotifier ;
10+ use Revolt \EventLoop ;
11+ use Symfony \Component \Console \Attribute \AsCommand ;
12+ use Symfony \Component \Console \Command \Command ;
13+ use Symfony \Component \Console \Helper \FormatterHelper ;
14+ use Symfony \Component \Console \Input \InputArgument ;
15+ use Symfony \Component \Console \Input \InputInterface ;
16+ use Symfony \Component \Console \Output \OutputInterface ;
17+ use Symfony \Component \Console \Input \InputOption ;
18+ use function Amp \async ;
19+ use function Amp \delay ;
20+ use function Amp \Future \awaitAll ;
21+
22+ #[AsCommand(name: 'export ' , description: 'Export data from MySQL to a directory with JSONL files ' )]
23+ class ExportCommand extends Command
24+ {
25+ public function configure ()
26+ {
27+ $ this ->addOption (
28+ 'config ' ,
29+ 'c ' , InputOption::VALUE_REQUIRED ,
30+ 'Configuration file ' ,
31+ 'config.json '
32+ );
33+
34+ $ this ->addArgument (
35+ 'directory ' ,
36+ InputArgument::OPTIONAL ,
37+ 'Directory to export data to ' ,
38+ './data-dump '
39+ );
40+ }
41+
42+ public function execute (InputInterface $ input , OutputInterface $ output ): int
43+ {
44+ $ file = $ input ->getOption ('config ' );
45+ if (!file_exists ($ file )) {
46+ /* @var FormatterHelper $formatter*/
47+ $ formatter = $ this ->getHelper ('formatter ' );
48+ $ output ->write ($ formatter ->formatSection ('Error ' , sprintf (
49+ 'Configuration file %s does not exist ' ,
50+ $ file
51+ ), 'error ' ));
52+ return 1 ;
53+ }
54+ try {
55+ $ config = Configuration::fromJSON (file_get_contents ($ file ));
56+ } catch (ConfigurationException $ error ) {
57+ $ error ->output ($ output );
58+ return 1 ;
59+ }
60+
61+ $ notifiers = new ExportProgressNotifier ($ output );
62+ $ futures = [];
63+ $ connectionPool = $ config ->createConnectionPool ();
64+ $ factory = new ExportTableFactory ($ connectionPool , $ notifiers );
65+ foreach ($ factory ->tablesToExport ($ config ) as $ table ) {
66+ $ queue = new Queue (100 );
67+
68+ $ futures [] = async (function () use ($ factory , $ table , $ queue ) {
69+ $ factory ->createExport ($ table )->run ($ queue );
70+ });
71+
72+ $ futures [] = async (function () use ($ queue , $ output ) {
73+ foreach ($ queue ->iterate () as $ item ) {
74+ }
75+ });
76+
77+ if (count ($ futures ) >= 100 ) {
78+ awaitAll ($ futures );
79+ $ futures = [];
80+ }
81+ }
82+
83+ EventLoop::run ();
84+ return 0 ;
85+ }
86+ }
0 commit comments