diff --git a/bin/backpan.pl b/bin/backpan.pl index 2188314..3eb8668 100644 --- a/bin/backpan.pl +++ b/bin/backpan.pl @@ -38,14 +38,17 @@ () log_info {"find_releases"}; my $scroll = $es_release->scroll( - fields => [qw< author archive name >], - body => get_release_query(), + body => { + %{ get_release_query() }, + size => 500, + _source => [qw< author archive name >], + }, ); while ( my $release = $scroll->next ) { - my $author = $release->{fields}{author}[0]; - my $archive = $release->{fields}{archive}[0]; - my $name = $release->{fields}{name}[0]; + my $author = $release->{_source}{author}; + my $archive = $release->{_source}{archive}; + my $name = $release->{_source}{name}; next unless $name; # bypass some broken releases $release_status{$author}{$name} = [ @@ -64,8 +67,10 @@ () unless ($undo) { return +{ query => { - not => { term => { status => 'backpan' } } - } + bool => { + must_not => [ { term => { status => 'backpan' } }, ], + }, + }, }; } @@ -118,23 +123,24 @@ ( $author, $author_releases ) my $scroll_file = $es_file->scroll( scroll => '5m', - fields => [qw< release >], body => { query => { bool => { must => [ { term => { author => $author } }, - { terms => { release => $author_releases } } - ] - } - } + { terms => { release => $author_releases } }, + ], + }, + }, + size => 500, + _source => [qw< release >], }, ); $bulk{file} ||= $es_file->bulk( timeout => '5m' ); while ( my $file = $scroll_file->next ) { - my $release = $file->{fields}{release}[0]; + my $release = $file->{_source}{release}; $bulk{file}->update( { id => $file->{_id}, doc => { diff --git a/bin/backup.pl b/bin/backup.pl index a4018b7..7fefd28 100644 --- a/bin/backup.pl +++ b/bin/backup.pl @@ -12,7 +12,7 @@ use Try::Tiny qw< catch try >; use MetaCPAN::ES; -use MetaCPAN::Ingest qw< home >; +use MetaCPAN::Ingest qw< home true >; # config @@ -82,7 +82,7 @@ () $bulk_store{$key} ||= $es->bulk( max_count => $batch_size ); my $bulk = $bulk_store{$key}; - my $parent = $raw->{fields}{_parent}; + my $parent = $raw->{_parent}; if ( $raw->{_type} eq 'author' ) { @@ -169,9 +169,12 @@ sub run_backup { ( $type ? ( type => $type ) : () ) ); my $scroll = $es->scroll( - size => $size, - fields => [qw< _parent _source >], scroll => '1m', + body => { + _source => true, + size => $size, + sort => '_doc', + }, ); log_info { 'Backing up ', $scroll->total, ' documents' }; diff --git a/bin/check.pl b/bin/check.pl index e805fcd..d0636fb 100644 --- a/bin/check.pl +++ b/bin/check.pl @@ -38,11 +38,7 @@ # look up this module in ElasticSearch and see what we have on it my $results = $es_file->search( - size => 100, # shouldn't get more than this - fields => [ - qw< name release author distribution version authorized indexed maturity date > - ], - query => { + query => { bool => { must => [ { term => { 'module.name' => $pkg } }, @@ -51,22 +47,38 @@ ], }, }, + size => 100, # shouldn't get more than this + _source => [ qw< + name + release + author + distribution + version + authorized + indexed + maturity + date + > ], + ); my @files = @{ $results->{hits}{hits} }; # now find the first latest releases for these files foreach my $file (@files) { my $release_results = $es_release->search( - size => 1, - fields => [qw< name status authorized version id date >], - query => { + query => { bool => { must => [ - { term => { name => $file->{fields}{release} } }, + { + term => + { name => $file->{_source}{release} } + }, { term => { status => 'latest' } }, ], }, }, + size => 1, + _source => [qw< name status authorized version id date >], ); push @releases, $release_results->{hits}{hits}[0] @@ -78,16 +90,20 @@ if ( !@releases ) { foreach my $file (@files) { my $release_results = $es_release->search( - size => 1, - fields => - [qw< name status authorized version id date >], - query => { + query => { bool => { must => [ - { term => { name => $file->{fields}{release} } }, + { + term => { + name => $file->{_source}{release} + } + }, ], }, }, + size => 1, + _source => + [qw< name status authorized version id date >], ); push @releases, @{ $release_results->{hits}{hits} }; @@ -97,22 +113,22 @@ # if we found the releases tell them about it if (@releases) { if ( @releases == 1 - and $releases[0]->{fields}{status} eq 'latest' ) + and $releases[0]->{_source}{status} eq 'latest' ) { log_info { - "Found latest release $releases[0]->{fields}{name} for $pkg" + "Found latest release $releases[0]->{_source}{name} for $pkg" } unless $errors_only; } else { log_error {"Could not find latest release for $pkg"}; foreach my $rel (@releases) { - log_warn {" Found release $rel->{fields}{name}"}; - log_warn {" STATUS : $rel->{fields}{status}"}; + log_warn {" Found release $rel->{_source}{name}"}; + log_warn {" STATUS : $rel->{_source}{status}"}; log_warn { - " AUTORIZED : $rel->{fields}{authorized}" + " AUTORIZED : $rel->{_source}{authorized}" }; - log_warn {" DATE : $rel->{fields}{date}"}; + log_warn {" DATE : $rel->{_source}{date}"}; } $error_count++; @@ -123,13 +139,13 @@ "Module $pkg doesn't have any releases in ElasticSearch!" }; foreach my $file (@files) { - log_warn {" Found file $file->{fields}{name}"}; - log_warn {" RELEASE : $file->{fields}{release}"}; - log_warn {" AUTHOR : $file->{fields}{author}"}; + log_warn {" Found file $file->{_source}{name}"}; + log_warn {" RELEASE : $file->{_source}{release}"}; + log_warn {" AUTHOR : $file->{_source}{author}"}; log_warn { - " AUTHORIZED : $file->{fields}{authorized}" + " AUTHORIZED : $file->{_source}{authorized}" }; - log_warn {" DATE : $file->{fields}{date}"}; + log_warn {" DATE : $file->{_source}{date}"}; } $error_count++; } diff --git a/bin/checksum.pl b/bin/checksum.pl index b661de6..efadaca 100644 --- a/bin/checksum.pl +++ b/bin/checksum.pl @@ -32,11 +32,11 @@ not => { exists => { field => "checksum_md5" - } - } - } + }, + }, + }, + _source => [qw< id name download_url >], }, - fields => [qw< id name download_url >], ); log_warn { "Found " . $scroll->total . " releases" }; @@ -50,11 +50,11 @@ last; } - log_info { "Adding checksums for " . $p->{fields}{name}[0] }; + log_info { "Adding checksums for " . $p->{_source}{name} }; - if ( my $download_url = $p->{fields}{download_url} ) { + if ( my $download_url = $p->{_source}{download_url} ) { my $file - = cpan_dir . "/authors" . $p->{fields}{download_url}[0] + = cpan_dir . "/authors" . $p->{_source}{download_url} =~ s/^.*authors//r; my $checksum_md5 = digest_file_hex( $file, 'MD5' ); my $checksum_sha256 = digest_file_hex( $file, 'SHA-256' ); @@ -75,7 +75,7 @@ } } else { - log_info { $p->{fields}{name}[0] . " is missing a download_url" }; + log_info { $p->{_source}{name} . " is missing a download_url" }; } } diff --git a/bin/cve.pl b/bin/cve.pl index cb4f251..62a72f8 100644 --- a/bin/cve.pl +++ b/bin/cve.pl @@ -134,32 +134,29 @@ if (@filters) { my $query = { - query => { - bool => { - must => [ - { term => { distribution => $dist } }, @filters, - ] - } - }, + bool => { + must => + [ { term => { distribution => $dist } }, @filters, ] + } }; my $releases = $es->search( - index => 'cpan', - type => 'release', - body => $query, - fields => [ "version", "name", "author", ], - size => 2000, + index => 'cpan', + type => 'release', + body => { + query => $query, + _source => [qw< version name author >], + size => 2000, + }, ); if ( $releases->{hits}{total} ) { ## no critic (ControlStructures::ProhibitMutatingListFunctions) @matches = map { $_->[0] } sort { $a->[1] <=> $b->[1] } - map { - my %fields = %{ $_->{fields} }; - ref $_ and $_ = $_->[0] for values %fields; - [ \%fields, numify_version( $fields{version} ) ]; - } @{ $releases->{hits}{hits} }; + map { [ $_->{_source}, + numify_version( $_->{_source}{version} ) ] } + @{ $releases->{hits}{hits} }; } else { log_debug { diff --git a/bin/favorite.pl b/bin/favorite.pl index 81132cb..792c8ef 100644 --- a/bin/favorite.pl +++ b/bin/favorite.pl @@ -6,7 +6,7 @@ use MetaCPAN::Logger qw< :log :dlog >; use MetaCPAN::ES; -use MetaCPAN::Ingest qw< minion >; +use MetaCPAN::Ingest qw< false minion >; # args my ( $age, $check_missing, $count, $distribution, $limit, $queue ); @@ -39,7 +39,7 @@ ### sub index_favorites () { - my $body; + my $query = { match_all => {} }; my $age_filter; if ($age) { @@ -48,37 +48,30 @@ () } if ($distribution) { - $body = { - query => { - term => { distribution => $distribution } - } - }; + $query = { term => { distribution => $distribution } }; } elsif ($age) { my $es = MetaCPAN::ES->new( type => "favorite" ); my $favs = $es->scroll( scroll => '5m', - fields => [qw< distribution >], body => { - query => $age_filter, - ( $limit ? ( size => $limit ) : () ) + query => $age_filter, + _source => [qw< distribution >], + size => $limit || 500, + sort => '_doc', } ); my %recent_dists; while ( my $fav = $favs->next ) { - my $dist = $fav->{fields}{distribution}[0]; + my $dist = $fav->{_source}{distribution}; $recent_dists{$dist}++ if $dist; } my @keys = keys %recent_dists; if (@keys) { - $body = { - query => { - terms => { distribution => \@keys } - } - }; + $query = { terms => { distribution => \@keys } }; } $es->index_refresh; } @@ -94,12 +87,16 @@ () my $es = MetaCPAN::ES->new( type => "favorite" ); my $favs = $es->scroll( scroll => '30s', - fields => [qw< distribution >], - ( $body ? ( body => $body ) : () ), + body => { + query => $query, + _source => [qw< distribution >], + size => 500, + sort => '_doc', + }, ); while ( my $fav = $favs->next ) { - my $dist = $fav->{fields}{distribution}[0]; + my $dist = $fav->{_source}{distribution}; $dist_fav_count{$dist}++ if $dist; } @@ -119,8 +116,6 @@ () my $es = MetaCPAN::ES->new( type => "file" ); my $files = $es->scroll( scroll => '15m', - fields => [qw< id distribution >], - size => 500, body => { query => { bool => { @@ -128,13 +123,17 @@ () { range => { dist_fav_count => { gte => 1 } } } ], @age_filter, - } - } + }, + }, + _source => [qw< id distribution >], + size => 500, + sort => '_doc', + }, ); while ( my $file = $files->next ) { - my $dist = $file->{fields}{distribution}[0]; + my $dist = $file->{_source}{distribution}; next unless $dist; next if exists $missing{$dist} or exists $dist_fav_count{$dist}; @@ -189,20 +188,22 @@ () my $bulk = $es->bulk( timeout => '120m' ); my $files = $es->scroll( scroll => '15s', - fields => [qw< id >], body => { - query => { term => { distribution => $dist } } + query => { term => { distribution => $dist } } _source => + false, + size => 500, + sort => '_doc', }, ); while ( my $file = $files->next ) { - my $id = $file->{fields}{id}[0]; + my $id = $file->{_id}; my $cnt = $dist_fav_count{$dist}; log_debug {"Updating file id $id with fav_count $cnt"}; $bulk->update( { - id => $file->{fields}{id}[0], + id => $file->{_id}; doc => { dist_fav_count => $cnt }, } ); } diff --git a/bin/latest.pl b/bin/latest.pl index d161a93..41ebc3d 100644 --- a/bin/latest.pl +++ b/bin/latest.pl @@ -238,7 +238,8 @@ ($filter) ] } }, - }, + }, + ; } sub _queue_latest ( $dist = $distribution ) { @@ -259,21 +260,22 @@ ( $es, $author, $name ) { term => { author => $author } }, { term => { name => $name } }, ] - } - } + }, + size => 500, + _source => [qw< id name >], + }, }, - fields => [qw< id name >], ); return {} unless is_arrayref( $release->{hits}{hits} ) && is_hashref( $release->{hits}{hits}[0] ); - my $fields = $release->{hits}{hits}[0]{fields}; + my $source = $release->{hits}{hits}[0]{_source}; return +{ - id => $fields->{id}, - name => $fields->{name}[0], + id => $source->{_id}, + name => $source->{name}, }; } @@ -324,14 +326,14 @@ ( $bulk, $source, $status ) bool => { must => [ { - term => - { 'release' => $source->{release} } + term => { 'release' => $source->{release} } }, { term => { 'author' => $source->{author} } }, ], }, + size => 500, + _source => [qw< name >], }, - fields => [qw< name >], }, ); @@ -339,7 +341,7 @@ ( $bulk, $source, $status ) log_trace { sprintf( '%s file %s', ( $status eq 'latest' ? 'Upgrading' : 'Downgrading' ), - $row->{fields}{name}[0] ) + $row->{_source}{name} ) }; # Use bulk update to overwrite the status for X files at a time. diff --git a/bin/release.pl b/bin/release.pl index 4be1776..88e56f5 100644 --- a/bin/release.pl +++ b/bin/release.pl @@ -426,14 +426,17 @@ ($document) my $count = $es->search( search_type => 'count', body => { - query => { + query => { bool => { must => [ - { term => { distribution => $document->{distribution} } }, + { + term => + { distribution => $document->{distribution} } + }, { range => { version_numified => - { 'lt' => $document->{version_numified} } + { 'lt' => $document->{version_numified} } }, }, ], diff --git a/bin/suggest.pl b/bin/suggest.pl index 14a37e6..9cccedb 100644 --- a/bin/suggest.pl +++ b/bin/suggest.pl @@ -64,7 +64,6 @@ ($range) my $files = $es->scroll( scroll => '5m', - fields => [qw< id documentation >], body => { query => { bool => { @@ -72,25 +71,27 @@ ($range) { exists => { field => "documentation" } }, $range ], } - } + }, + _source => [qw< id documentation >], + size => 500, + sort => '_doc', }, ); my $bulk = $es->bulk( timeout => '5m' ); while ( my $file = $files->next ) { - my $documentation = $file->{fields}{documentation}[0]; + my $documentation = $file->{_source}{documentation}; my $weight = 1000 - length($documentation); $weight = 0 if $weight < 0; $bulk->update( { - id => $file->{fields}{id}[0], + id => $file->{_id}, doc => { suggest => { - input => [$documentation], - payload => { doc_name => $documentation }, - weight => $weight, - } + input => [$documentation], + weight => $weight, + }, }, } ); } diff --git a/bin/tickets.pl b/bin/tickets.pl index 8542ba1..5da794b 100644 --- a/bin/tickets.pl +++ b/bin/tickets.pl @@ -45,18 +45,21 @@ sub check_all_distributions () { my $es_release = MetaCPAN::ES->new( type => "release" ); my $scroll_release = $es_release->scroll( - fields => ['distribution'], - body => { + body => { query => { - not => { term => { status => 'backpan' } }, - } + bool => { + must_not => [ { term => { status => 'backpan' } }, ], + }, + }, + size => 500, + _source => [qw< distribution >], }, ); my %dists; while ( my $release = $scroll_release->next ) { - my $d = $release->{'fields'}{'distribution'}[0]; + my $d = $release->{_source}{distribution}; $d or next; log_debug { sprintf( "Adding missing distribution record: %s", $d ) }; @@ -133,13 +136,13 @@ () { prefix => { "resources.bugtracker.web" => - 'http://github.com/' + 'http://github.com/' }, }, { prefix => { "resources.bugtracker.web" => - 'https://github.com/' + 'https://github.com/' }, }, ], diff --git a/bin/watcher.pl b/bin/watcher.pl index 14c023a..f678bfb 100644 --- a/bin/watcher.pl +++ b/bin/watcher.pl @@ -14,6 +14,7 @@ use MetaCPAN::Ingest qw< cpan_dir read_recent_segment + true >; # args @@ -93,23 +94,21 @@ () sub backpan_changes () { my $scroll_release = $es_release->scroll( - size => 1000, scroll => '1m', - fields => [qw< author archive >], body => { query => { bool => { - must_not => [ - { term => { status => 'backpan' } } - ], + must_not => [ { term => { status => 'backpan' } } ], }, }, + size => 1000, + _source => [qw< author archive >], }, ); my @changes; while ( my $release = $scroll_release->next ) { - my $data = $release->{fields}; + my $data = $release->{_source}; my $path = $cpan->child( 'authors', MetaCPAN::Util::author_dir( $data->{author} ), @@ -165,7 +164,7 @@ ($info) my $scroll_release = $es_release->scroll( scroll => '1m', body => { - query => { + query => { bool => { must => [ { term => { author => $info->cpanid } }, @@ -187,8 +186,6 @@ ($release) my $scroll_file = $es_file->scroll( { scroll => '1m', - size => 1000, - fields => [qw< _parent _source >], body => { query => { bool => { @@ -206,6 +203,9 @@ ($release) ], }, }, + size => 1000, + _source => true, + sort => '_doc', }, } ); return if $dry_run; @@ -218,11 +218,7 @@ ($release) $bulk_file->index( { id => $row->{_id}, source => { - $row->{fields}{_parent} - ? ( parent => $row->{fields}{_parent} ) - : (), - %$source, - status => 'backpan', + %$source, status => 'backpan', } } ); } diff --git a/lib/MetaCPAN/ES.pm b/lib/MetaCPAN/ES.pm index 6777a7a..67b51f1 100644 --- a/lib/MetaCPAN/ES.pm +++ b/lib/MetaCPAN/ES.pm @@ -66,16 +66,14 @@ sub get ( $self, %args ) { sub search ( $self, %args ) { my $body = $args{body} or die "Missing body\n"; - my $index = $args{index} // $self->{index}; - my $type = $args{type} // $self->{type}; - my @fields = ( $args{fields} ? ( fields => $args{fields} ) : () ); - my @size = ( $args{size} ? ( size => $args{size} ) : () ); + my $index = $args{index} // $self->{index}; + my $type = $args{type} // $self->{type}; + my @size = ( $args{size} ? ( size => $args{size} ) : () ); return $self->{es}->search( index => $index, type => $type, body => $body, - @fields, @size, ); } @@ -95,11 +93,9 @@ sub scroll ( $self, %args ) { return $self->{es}->scroll_helper( index => $self->{index}, type => $self->{type}, - size => ( $args{size} // 500 ), body => ( $args{body} // { query => { match_all => {} } } ), search_type => 'scan', scroll => ( $args{scroll} // '30m' ), - ( $args{fields} ? ( fields => $args{fields} ) : () ), ); }