7
7
8
8
use CPAN::DistnameInfo;
9
9
use Parse::CPAN::Packages::Fast;
10
+ use Ref::Util qw< is_arrayref is_hashref > ;
10
11
use Regexp::Common qw< time > ;
11
12
use Time::Local qw< timelocal > ;
12
13
31
32
my $minion ;
32
33
$minion = minion() if $queue ;
33
34
35
+ my $node = config-> {config }{es_test_node };
36
+
34
37
run();
35
38
36
39
sub run () {
37
40
log_info {' Reading 02packages.details' };
38
41
39
-
40
- my $node = config-> {config }{es_test_node };
41
- my $es = MetaCPAN::ES-> new(
42
+ my $es = MetaCPAN::ES-> new(
42
43
type => " file" ,
43
44
node => $node
44
45
);
54
55
my $dist = $packages -> package($p )-> distribution-> dist;
55
56
push @filter , $p if $dist && $dist eq $d ;
56
57
}
57
- log_info { " $distribution consists of " . scalar (@filter ) . ' modules' };
58
+ log_info {
59
+ " $distribution consists of " . scalar (@filter ) . ' modules'
60
+ };
58
61
}
59
62
60
63
# if we are just queueing a single distribution
98
101
99
102
log_debug { sprintf ( " Found %s modules" , $scroll -> total ) };
100
103
log_debug { sprintf ( " Found %s total modules" , $found_total ) }
101
- if @$filter != $total and $filter == $module_filters -> [-1];
104
+ if @$filter != $total and $filter == $module_filters -> [-1];
102
105
103
106
my $i = 0;
104
107
109
112
my $file_data = $file -> {_source };
110
113
111
114
# Convert module name into Parse::CPAN::Packages::Fast::Package object.
112
- my @modules =
113
- grep { defined }
114
- map { eval { $packages -> package( $_ -> {name } ) } }
115
- @{ $file_data -> {module } };
115
+ my @modules = grep { defined }
116
+ map {
117
+ eval { $packages -> package( $_ -> {name } ) }
118
+ } @{ $file_data -> {module } };
116
119
117
120
# For each of the packages in this file...
118
- foreach my $module ( @modules ) {
121
+ foreach my $module (@modules ) {
119
122
120
123
# Get P:C:P:F:Distribution (CPAN::DistnameInfo) object for package.
121
124
my $dist = $module -> distribution;
122
125
123
- if ( $queue ) {
126
+ if ($queue ) {
124
127
my $d = $dist -> dist;
125
128
_queue_latest($d )
126
129
unless exists $queued_distributions {$d };
141
144
142
145
# If multiple versions of a dist appear in 02packages
143
146
# only mark the most recent upload as latest.
144
- next if ( $upgrade
145
- && compare_dates( $upgrade -> {date }, $file_data -> {date } ) );
147
+ next
148
+ if ( $upgrade
149
+ && compare_dates( $upgrade -> {date },
150
+ $file_data -> {date } ) );
146
151
$upgrade { $file_data -> {distribution } } = $file_data ;
147
152
}
148
153
elsif ( $file_data -> {status } eq ' latest' ) {
152
157
}
153
158
}
154
159
155
- my $bulk = $es -> bulk_helper ( type => ' file' );
160
+ my $bulk = $es -> bulk ( type => ' file' );
156
161
157
162
my %to_purge ;
158
163
164
169
165
170
$to_purge { $file_data -> {download_url } } = 1;
166
171
167
- reindex ( $bulk , $file_data , ' latest' );
172
+ _reindex ( $bulk , $file_data , ' latest' );
168
173
}
169
174
170
175
while ( my ( $release , $file_data ) = each %downgrade ) {
181
186
182
187
$to_purge { $file_data -> {download_url } } = 1;
183
188
184
- reindex ( $bulk , $file_data , ' cpan' );
189
+ _reindex ( $bulk , $file_data , ' cpan' );
185
190
}
186
191
187
192
$bulk -> flush;
188
- $es -> index_refresh() ;
193
+ $es -> index_refresh;
189
194
190
- # Call Fastly to purge
191
- # purge_cpan_distnameinfos( [ map CPAN::DistnameInfo->new($_), keys %to_purge ] );
195
+ # Call Fastly to purge
196
+ # purge_cpan_distnameinfos( [ map CPAN::DistnameInfo->new($_), keys %to_purge ] );
192
197
}
193
198
194
- =head2
195
-
196
- TODO: FIX: removing this comment causes syntax error ???!!!
197
-
198
- =end
199
-
200
199
sub _add_module_filters ($filter ) {
201
200
my @module_filters ;
202
201
if (@$filter ) {
@@ -224,13 +223,13 @@ ($filter)
224
223
return +{
225
224
query => {
226
225
filtered => {
227
- query => { match_all => {} },
226
+ query => { match_all => {} },
228
227
filter => {
229
228
bool => {
230
229
must => [
231
230
{
232
231
nested => {
233
- path => 'module',
232
+ path => ' module' ,
234
233
query => { bool => { must => $filter } }
235
234
}
236
235
},
@@ -256,52 +255,110 @@ ( $dist = $distribution )
256
255
);
257
256
}
258
257
259
- # Update the status for the release and all the files.
260
- sub reindex ( $bulk, $source, $status ) {
258
+ sub _get_release ( $es , $author , $name ) {
259
+ my $release = $es -> search(
260
+ body => {
261
+ query => {
262
+ bool => {
263
+ must => [
264
+ { term => { author => $author } },
265
+ { term => { name => $name } },
266
+ ]
267
+ }
268
+ }
269
+ },
270
+ fields => [qw< id name > ],
271
+ );
261
272
262
- =head2
273
+ return {}
274
+ unless is_arrayref( $release -> {hits }{hits } )
275
+ && is_hashref( $release -> {hits }{hits }[0] );
276
+
277
+ my $fields = $release -> {hits }{hits }[0]{fields };
278
+
279
+ return +{
280
+ id => $fields -> {id },
281
+ name => $fields -> {name }[0],
282
+ };
283
+ }
284
+
285
+ sub _set_release_status ( $es , $release_id , $status ) {
286
+ my $bulk = $es -> bulk();
287
+ $bulk -> update( { id => $release_id , doc => { status => $status } } );
288
+ $bulk -> flush;
289
+ }
290
+
291
+ # Update the status for the release and all the files.
292
+ sub _reindex ( $bulk , $source , $status ) {
263
293
264
294
# Update the status on the release.
265
- my $release = $self->index->type('release')->get( {
266
- author => $source->{author},
267
- name => $source->{release},
268
- } );
295
+ my $es_release = MetaCPAN::ES-> new(
296
+ type => " release" ,
297
+ node => $node
298
+ );
299
+
300
+ my $release
301
+ = _get_release( $es_release , $source -> {author }, $source -> {release } );
302
+
303
+ unless ( keys %$release ) {
304
+ log_info {
305
+ sprintf ( ' failed to fetch release: %s - %s' ,
306
+ $source -> {author }, $source -> {release } )
307
+ };
308
+ return ;
309
+ }
310
+
311
+ _set_release_status( $es_release , $release -> {id }, $status )
312
+ unless $dry_run ;
269
313
270
- $release->_set_status($status);
271
314
log_info {
272
315
$status eq ' latest' ? ' Upgrading ' : ' Downgrading ' ,
273
- 'release ', $release->name || q[];
316
+ ' release ' , $release -> { name }
274
317
};
275
- $release->put unless ( $dry_run );
276
318
277
319
# Get all the files for the release.
278
- my $scroll = $self->index->type("file")->search_type('scan')->filter( {
279
- bool => {
280
- must => [
281
- { term => { 'release' => $source->{release} } },
282
- { term => { 'author' => $source->{author} } }
283
- ]
284
- }
285
- } )->size(100)->source( [ 'status', 'file' ] )->raw->scroll;
320
+
321
+ my $es_file = MetaCPAN::ES-> new(
322
+ type => " file" ,
323
+ node => $node
324
+ );
325
+
326
+ my $scroll = $es_file -> scroll(
327
+ body => {
328
+ query => {
329
+ filtered => {
330
+ query => { match_all => {} },
331
+ filter => {
332
+ bool => {
333
+ must => [
334
+ {
335
+ term =>
336
+ { ' release' => $source -> {release } }
337
+ },
338
+ { term => { ' author' => $source -> {author } } },
339
+ ],
340
+ },
341
+ },
342
+ },
343
+ },
344
+ fields => [qw< name > ],
345
+ },
346
+ );
286
347
287
348
while ( my $row = $scroll -> next ) {
288
- my $source = $row->{_source};
289
349
log_trace {
290
- $status eq 'latest' ? 'Upgrading ' : 'Downgrading ',
291
- 'file ', $source->{name} || q[];
350
+ sprintf ( ' %s file %s' ,
351
+ ( $status eq ' latest' ? ' Upgrading' : ' Downgrading' ),
352
+ $row -> {fields }{name }[0] )
292
353
};
293
354
294
355
# Use bulk update to overwrite the status for X files at a time.
295
356
$bulk -> update( { id => $row -> {_id }, doc => { status => $status } } )
296
357
unless $dry_run ;
297
358
}
298
-
299
- =end
300
-
301
359
}
302
360
303
- sub compare_dates {
304
- my ( $self, $d1, $d2 ) = @_;
361
+ sub compare_dates ( $d1 , $d2 ) {
305
362
for ( $d1 , $d2 ) {
306
363
if ( $_ =~ / $RE {time}{iso}{-keep}/ ) {
307
364
$_ = timelocal( $7 , $6 , $5 , $4 , $3 - 1, $2 );
0 commit comments