Skip to content

Commit 795d113

Browse files
authored
Merge pull request #31 from metacpan/mickey/watcher
Added watcher script
2 parents 5b31818 + 8da06c9 commit 795d113

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed

bin/watcher.pl

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
use strict;
2+
use warnings;
3+
use v5.36;
4+
5+
use Cpanel::JSON::XS qw< decode_json >;
6+
use DateTime ();
7+
use CPAN::DistnameInfo ();
8+
use FindBin ();
9+
use Getopt::Long;
10+
11+
use MetaCPAN::Logger qw< :log :dlog >;
12+
13+
use MetaCPAN::ES;
14+
use MetaCPAN::Ingest qw<
15+
cpan_dir
16+
read_recent_segment
17+
>;
18+
19+
# Command-line switches:
#   --backpan  run a single BackPAN reconciliation pass, then exit
#   --dry_run  log what would be done without running the indexer or
#              writing to the index
my ( $backpan, $dry_run );
GetOptions(
    "backpan" => \$backpan,
    "dry_run" => \$dry_run,
);

# File-level state shared with the subs defined below.
my $cpan       = cpan_dir();
my $es_release = MetaCPAN::ES->new( type => "release" );
my $es_file    = MetaCPAN::ES->new( type => "file" );

my $latest   = 0;
my @segments = qw(1h 6h 1d 1W 1M 1Q 1Y Z);

# Polling loop: refresh the latest release, gather pending changes and
# process them; sleep 15 seconds between rounds. In --backpan mode the
# loop is left after the first round.
POLL: while (1) {
    $latest = eval { latest_release() };
    if ($@) {
        log_error {"getting latest release failed: $@"};
        sleep(15);
        next POLL;
    }

    my @changes = $backpan ? backpan_changes() : changes();

    # Changes are consumed from the end of the list.
    while ( my $release = pop @changes ) {
        if ( $release->{type} eq 'delete' ) {
            reindex_release($release);
        }
        else {
            index_release($release);
        }
    }

    last POLL if $backpan;
    sleep(15);
}

1;
52+
53+
###
54+
55+
# Collect the entries of the RECENT-*.json files that have not been
# processed yet, i.e. those listed before the latest indexed release.
#
# Reads the file-level $latest, @segments and $backpan. Segments are
# read in the order given by @segments. Within a segment, entries are
# kept until the archive of the latest release is reached; once a
# segment's minmax.min is older than the latest release, no further
# segment is read.
#
# Returns a list of RECENT entries (hashrefs carrying `path` and `type`).
# Dies when no latest release is available.
sub changes () {
    my ( $archive, $latest_epoch ) = _latest_archive_and_epoch($latest);

    my %seen;
    my @changes;

SEGMENT:
    for my $segment (@segments) {
        log_debug {"Loading RECENT-$segment.json"};
        my $json = decode_json( read_recent_segment($segment) );

        # Only distribution archives below authors/id are of interest;
        # in backpan mode only deletions are.
        my @candidates = grep {
            $_->{path}
                =~ /^authors\/id\/.*\.(tgz|tbz|tar[\._-]gz|tar\.bz2|tar\.Z|zip|7z)$/
            }
            grep { $backpan ? $_->{type} eq "delete" : 1 }
            @{ $json->{recent} };

    ENTRY:
        for my $entry (@candidates) {
            my $info     = CPAN::DistnameInfo->new( $entry->{path} );
            my $path     = $info->cpanid . "/" . $info->filename;
            my $previous = $seen{$path};

            # Skip a path that was already recorded with the same event
            # type, and any delete for a path that was already recorded.
            next ENTRY
                if $previous
                && ( $entry->{type} eq $previous->{type}
                || $entry->{type} eq 'delete' );
            $seen{$path} = $entry;

            # Everything from the latest indexed archive onwards in this
            # segment is not collected.
            last ENTRY
                if defined $archive && $entry->{path} =~ /\/\Q$archive\E$/;

            push @changes, $entry;
        }

        # When the release date could not be determined the early exit
        # is skipped and the remaining segments are read as well.
        if (   !$backpan
            && defined $latest_epoch
            && $json->{meta}->{minmax}->{min} < $latest_epoch )
        {
            log_debug {"Includes latest release"};
            last SEGMENT;
        }
    }

    return @changes;
}

# Extract ( archive file name, release date as epoch seconds ) from the
# latest release.
#
# reindex_release() in this file reads scroll results as plain hashrefs
# with a `_source` key, so that shape is handled here; an object that
# offers ->archive and ->date->epoch is still accepted.
# NOTE(review): confirm which shape MetaCPAN::ES scroll results have.
sub _latest_archive_and_epoch ($release) {
    die "changes() needs the latest release, but none was found\n"
        unless ref $release;

    if ( ref $release eq 'HASH' ) {
        my $source = $release->{_source} // {};
        return ( $source->{archive}, _iso8601_epoch( $source->{date} ) );
    }

    return ( $release->archive, $release->date->epoch );
}

# Convert a timestamp to epoch seconds; returns undef when it cannot be
# parsed.
# NOTE(review): assumes the index stores dates as
# "YYYY-MM-DDTHH:MM:SS" in UTC - confirm against the release mapping.
sub _iso8601_epoch ($stamp) {
    return undef unless defined $stamp;
    return undef
        unless $stamp
        =~ /\A(?<year>\d{4})-(?<month>\d\d)-(?<day>\d\d)[T ](?<hour>\d\d):(?<minute>\d\d):(?<second>\d\d)/;

    my %part = %+;
    my $date = eval { DateTime->new( %part, time_zone => 'UTC' ) };
    return $date ? $date->epoch : undef;
}
93+
94+
# Find releases that the index does not yet flag as 'backpan' but whose
# archive is no longer present in the local CPAN mirror.
#
# Uses the file-level $es_release and $cpan.
#
# Returns a list of { path => ..., type => 'delete' } hashrefs, one per
# missing archive.
sub backpan_changes () {

    # Every release whose status is not 'backpan'.
    my %not_on_backpan = (
        filtered => {
            query  => { match_all => {} },
            filter => {
                not => {
                    filter => { term => { status => 'backpan' } }
                }
            }
        }
    );

    my $scroller = $es_release->scroll(
        size   => 1000,
        scroll => '1m',
        fields => [qw< author archive >],
        body   => { query => \%not_on_backpan },
    );

    my @missing;
    while ( my $hit = $scroller->next ) {
        my $fields = $hit->{fields};
        my $path   = $cpan->child(
            'authors',
            MetaCPAN::Util::author_dir( $fields->{author} ),
            $fields->{archive}
        );

        # Archives still in the mirror need no action.
        unless ( -e $path ) {
            log_debug {"$path not in the CPAN"};
            push @missing, { path => $path, type => 'delete' };
        }
    }

    return @missing;
}
128+
129+
# Fetch the most recently dated release from the release index.
#
# Uses the file-level $es_release and $backpan.
#
# Returns the first result of a scroll over all releases sorted by
# `date` in descending order, or undef in --backpan mode.
sub latest_release () {
    return undef if $backpan;

    my $scroll_release = $es_release->scroll(
        scroll => '1m',
        body   => {
            query => { match_all => {} },

            # A list of { field => options } clauses. The previous
            # `{ [ ... ] }` form built a hash keyed by a stringified
            # array reference, so no valid sort was sent.
            sort => [ { date => { order => 'desc' } } ],
        }
    );

    return $scroll_release->next;
}
142+
143+
# Hand a newly uploaded archive to the release indexer.
#
# $release is a RECENT entry; its `path` is resolved below the local
# CPAN mirror ($cpan). The archive may not have been mirrored yet, so its
# appearance is awaited for up to 15 seconds.
#
# Returns the status reported by system() when the command was run, and
# nothing when the archive never appeared or --dry_run is set.
sub index_release ($release) {
    my $archive = $cpan->child( $release->{path} )->stringify;

    for my $attempt ( 1 .. 15 ) {
        last if -e $archive;
        log_debug {"Archive $archive does not yet exist"};
        sleep(1);
    }

    unless ( -e $archive ) {
        log_error {
            "Aborting, archive $archive not available after 15 seconds";
        };
        return;
    }

    # NOTE(review): $FindBin::RealBin is the directory holding this
    # script, so the command resolves to "<that dir>/bin". The POD
    # synopsis mentions "bin/metacpan" - confirm the intended executable.
    my @run = (
        $FindBin::RealBin . "/bin",
        'release', $archive, '--latest', '--queue'
    );

    log_debug {"Running @run"};
    return if $dry_run;

    # List-form system() bypasses the shell. Failures used to be ignored
    # silently; they are now logged.
    my $status = system(@run);
    if ( $status == -1 ) {
        my $reason = "$!";
        log_error {"Failed to run @run: $reason"};
    }
    elsif ( $status != 0 ) {
        my $exit   = $status >> 8;
        my $signal = $status & 127;
        log_error {"Command @run failed (exit $exit, signal $signal)"};
    }

    return $status;
}
166+
167+
# Look up the indexed release that belongs to an archive.
#
# $info is a CPAN::DistnameInfo object; its cpanid and filename are
# matched against the `author` and `archive` fields of the release index.
#
# Returns the first result of the scroll (false when there is none).
sub reindex_release_first ($info) {
    my @terms = (
        { term => { author  => $info->cpanid } },
        { term => { archive => $info->filename } },
    );

    my $scroller = $es_release->scroll(
        scroll => '1m',
        body   => {
            query  => { match_all => {} },
            filter => { and       => \@terms },
        }
    );

    return $scroller->next;
}
183+
184+
# Mark a deleted release, and every file indexed for it, with status
# 'backpan'.
#
# $release is a change entry whose `path` names the removed archive. The
# matching release document is looked up through reindex_release_first();
# when none is found nothing happens. With --dry_run the lookups are
# performed but nothing is written.
sub reindex_release ($release) {
    my $info = CPAN::DistnameInfo->new( $release->{path} );
    $release = reindex_release_first($info);
    return unless ($release);
    log_info {"Moving $release->{_source}->{name} to BackPAN"};

    # Arguments are passed as a flat key/value list, matching every other
    # scroll() call in this file (this call used to pass a single hash
    # reference).
    my $scroll_file = $es_file->scroll(
        scroll => '1m',
        size   => 1000,
        fields => [qw< _parent _source >],
        body   => {
            query => {
                filtered => {
                    query  => { match_all => {} },
                    filter => {
                        and => [
                            {
                                term => {
                                    release => $release->{_source}->{name}
                                }
                            },
                            {
                                term => {
                                    author => $release->{_source}->{author}
                                }
                            },
                        ]
                    }
                }
            }
        }
    );
    return if $dry_run;

    my $bulk_release = $es_release->bulk();
    my $bulk_file    = $es_file->bulk();

    # Re-index each file document with its status overridden, carrying
    # the parent reference along when the hit has one.
    while ( my $row = $scroll_file->next ) {
        my $source = $row->{_source};
        $bulk_file->index( {
            id     => $row->{_id},
            source => {
                $row->{fields}->{_parent}
                ? ( parent => $row->{fields}->{_parent} )
                : (),
                %$source,
                status => 'backpan',
            }
        } );
    }

    $bulk_release->index( {
        id     => $release->{_id},
        source => {
            %{ $release->{_source} }, status => 'backpan',
        }
    } );

    $bulk_release->flush;
    $bulk_file->flush;

    # TODO - Call Fastly to purge
    # $self->purge_cpan_distnameinfos( [$info] );
}
248+
249+
__END__
250+
251+
=pod
252+
253+
=head1 SYNOPSIS
254+
255+
# bin/metacpan watcher
256+
257+
=head1 DESCRIPTION
258+
259+
This script requires a local CPAN mirror. It watches the RECENT-*.json
260+
files for changes to the CPAN directory every 15 seconds. New uploads
261+
as well as deletions are processed sequentially.
262+
263+
=head1 OPTIONS
264+
265+
=head2 --backpan
266+
267+
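This will scan every release in the index that is not yet marked with
status C<backpan> and check whether its archive still exists in the
local CPAN mirror. Releases whose archive is missing, together with
their files, are marked with status C<backpan>. The script exits after
a single pass; the RECENT files are not consulted in this mode.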
270+
271+
L<http://friendfeed.com/cpan>
272+
273+
=cut

lib/MetaCPAN/Ingest.pm

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use Sub::Exporter -setup => {
3636
read_02packages_fh
3737
read_06perms_fh
3838
read_06perms_iter
39+
read_recent_segment
3940
read_url
4041
strip_pod
4142
tmp_dir
@@ -320,4 +321,9 @@ sub read_06perms_iter () {
320321
return $pp->module_iterator;
321322
}
322323

324+
# Return the contents of RECENT-<segment>.json from the CPAN directory
# reported by cpan_dir().
sub read_recent_segment ( $segment ) {
    my $recent_file = cpan_dir()->child("RECENT-$segment.json");
    return $recent_file->slurp;
}
328+
323329
1;

0 commit comments

Comments
 (0)