Skip to content

latest: more code ported in the run() method #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 72 additions & 97 deletions bin/latest.pl
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
"queue" => \$queue,
);

# run

log_info {'Dry run: updates will not be written to ES'} if $dry_run;

my $minion;
$minion = minion() if $queue;

run();

sub run {
sub run () {
log_info {'Reading 02packages.details'};


my $node = config->{config}{es_test_node};
my $es = MetaCPAN::ES->new(
type => "file",
node => $node
);

my $packages = read_02packages();

# If a distribution name is passed get all the package names
Expand All @@ -47,12 +52,9 @@ sub run {
if ( my $d = $distribution ) {
for my $p ( $packages->packages ) {
my $dist = $packages->package($p)->distribution->dist;
push @filter, $p
if $dist && $dist eq $d;
push @filter, $p if $dist && $dist eq $d;
}
log_info {
"$distribution consists of " . scalar(@filter) . ' modules'
};
log_info { "$distribution consists of " . scalar(@filter) . ' modules' };
}

# if we are just queueing a single distribution
Expand All @@ -73,7 +75,6 @@ sub run {
my $module_filters = _add_module_filters( \@filter );

for my $filter (@$module_filters) {

log_debug {
sprintf( "Searching for %d of %d modules",
scalar(@$filter), $total )
Expand All @@ -86,8 +87,6 @@ sub run {
# indexed (the 'leading' module)
my $q_body = _body_filtered_query($filter);

my $node = config->{config}{es_test_node};
my $es = MetaCPAN::ES->new( type => "file", node => $node );
my $scroll = $es->scroll(
body => $q_body,
size => 100,
Expand All @@ -100,19 +99,8 @@ sub run {
log_debug { sprintf( "Found %s modules", $scroll->total ) };
log_debug { sprintf( "Found %s total modules", $found_total ) }
if @$filter != $total and $filter == $module_filters->[-1];
exit;

my $i = 0;
while ( my $file = $scroll->next ) {
use DDP;
&p( [$file] );
}
exit;

}
}

=head2

# For each file...
while ( my $file = $scroll->next ) {
Expand All @@ -121,20 +109,20 @@ sub run {
my $file_data = $file->{_source};

# Convert module name into Parse::CPAN::Packages::Fast::Package object.
my @modules = grep {defined}
map {
eval { $p->package( $_->{name} ) }
} @{ $file_data->{module} };
my @modules =
grep { defined }
map { eval { $packages->package( $_->{name} ) } }
@{ $file_data->{module} };

# For each of the packages in this file...
foreach my $module (@modules) {
foreach my $module ( @modules ) {

# Get P:C:P:F:Distribution (CPAN::DistnameInfo) object for package.
my $dist = $module->distribution;

if ( $self->queue ) {
if ( $queue ) {
my $d = $dist->dist;
$self->_queue_latest($d)
_queue_latest($d)
unless exists $queued_distributions{$d};
$queued_distributions{$d} = 1;
next;
Expand All @@ -153,13 +141,8 @@ sub run {

# If multiple versions of a dist appear in 02packages
# only mark the most recent upload as latest.
next
if (
$upgrade
&& $self->compare_dates(
$upgrade->{date}, $file_data->{date}
)
);
next if ( $upgrade
&& compare_dates( $upgrade->{date}, $file_data->{date} ) );
$upgrade{ $file_data->{distribution} } = $file_data;
}
elsif ( $file_data->{status} eq 'latest' ) {
Expand All @@ -169,13 +152,50 @@ sub run {
}
}

=cut
my $bulk = $es->bulk_helper( type => 'file' );

my %to_purge;

while ( my ( $dist, $file_data ) = each %upgrade ) {

# Don't reindex if already marked as latest.
# This just means that it hasn't changed (query includes 'latest').
next if ( !$force and $file_data->{status} eq 'latest' );

#$bulk->flush;
$to_purge{ $file_data->{download_url} } = 1;

#$es->index_refresh();
reindex( $bulk, $file_data, 'latest' );
}

# subs
while ( my ( $release, $file_data ) = each %downgrade ) {

# Don't downgrade if this release version is also marked as latest.
# This could happen if a module is moved to a new dist
# but the old dist remains (with other packages).
# This could also include bug fixes in our indexer, PAUSE, etc.
next
if ( !$force
&& $upgrade{ $file_data->{distribution} }
&& $upgrade{ $file_data->{distribution} }->{release} eq
$file_data->{release} );

$to_purge{ $file_data->{download_url} } = 1;

reindex( $bulk, $file_data, 'cpan' );
}

$bulk->flush;
$es->index_refresh();

# Call Fastly to purge
# purge_cpan_distnameinfos( [ map CPAN::DistnameInfo->new($_), keys %to_purge ] );
}

=head2

TODO: FIX: removing this comment causes syntax error ???!!!

=end

sub _add_module_filters ($filter) {
my @module_filters;
Expand Down Expand Up @@ -236,62 +256,10 @@ ( $dist = $distribution )
);
}

1;

__END__

sub run {
###





my $bulk = $self->es->bulk_helper(
index => $self->index->name,
type => 'file'
);

my %to_purge;

while ( my ( $dist, $file_data ) = each %upgrade ) {

# Don't reindex if already marked as latest.
# This just means that it hasn't changed (query includes 'latest').
next if ( !$self->force and $file_data->{status} eq 'latest' );

$to_purge{ $file_data->{download_url} } = 1;

$self->reindex( $bulk, $file_data, 'latest' );
}

while ( my ( $release, $file_data ) = each %downgrade ) {

# Don't downgrade if this release version is also marked as latest.
# This could happen if a module is moved to a new dist
# but the old dist remains (with other packages).
# This could also include bug fixes in our indexer, PAUSE, etc.
next
if ( !$self->force
&& $upgrade{ $file_data->{distribution} }
&& $upgrade{ $file_data->{distribution} }->{release} eq
$file_data->{release} );

$to_purge{ $file_data->{download_url} } = 1;

$self->reindex( $bulk, $file_data, 'cpan' );
}
$bulk->flush;
$self->index->refresh;

# Call Fastly to purge
$self->purge_cpan_distnameinfos( [
map CPAN::DistnameInfo->new($_), keys %to_purge ] );
}

# Update the status for the release and all the files.
sub reindex {
my ( $self, $bulk, $source, $status ) = @_;
sub reindex ( $bulk, $source, $status ) {

=head2

# Update the status on the release.
my $release = $self->index->type('release')->get( {
Expand Down Expand Up @@ -327,6 +295,9 @@ sub reindex {
$bulk->update( { id => $row->{_id}, doc => { status => $status } } )
unless $dry_run;
}

=end

}

sub compare_dates {
Expand All @@ -339,11 +310,15 @@ sub compare_dates {
return $d1 > $d2;
}

1;

__END__

=head1 SYNOPSIS

# bin/metacpan latest
# bin/latest

# bin/metacpan latest --dry_run
# bin/latest --dry_run

=head1 DESCRIPTION

Expand Down
Loading