Skip to content

Commit 92c0858

Browse files
authored
Merge pull request #59 from metacpan/mickey/contributor
contributor - recreate following changes in metacpan-api script
2 parents ab9f341 + e6bb1b2 commit 92c0858

File tree

3 files changed

+277
-51
lines changed

3 files changed

+277
-51
lines changed

bin/contributor.pl

Lines changed: 263 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,73 +2,285 @@
22
use warnings;
33
use v5.36;
44

5+
use MetaCPAN::Logger qw< :log :dlog >;
6+
use Ref::Util qw< is_arrayref >;
7+
58
use Getopt::Long;
6-
use Ref::Util qw< is_arrayref >;
79

8-
use MetaCPAN::Logger qw< :log :dlog >;
910
use MetaCPAN::ES;
10-
use MetaCPAN::Contributor qw<
11-
get_cpan_author_contributors
12-
update_release_contirbutors
13-
>;
11+
use MetaCPAN::Ingest qw< false >;
1412

1513
# args
16-
my $all = 0;
17-
my ( $distribution, $release, $age );
14+
my ( $age, $all, $distribution, $release );
1815
GetOptions(
16+
"age=i" => \$age,
1917
"all" => \$all,
2018
"distribution=s" => \$distribution,
2119
"release=s" => \$release,
22-
"age=i" => \$age,
23-
);
2420

25-
# Setup
26-
my $query
27-
= $all ? { match_all => {} }
28-
: $distribution ? { term => { distribution => $distribution } }
29-
: $release ? {
30-
bool => {
31-
must => [
32-
{ term => { author => get_author($release) } },
33-
{ term => { name => $release } },
34-
]
35-
}
36-
}
37-
: $age ? { range => { date => { gte => sprintf( 'now-%dd', $age ) } } }
38-
: die "Error: must provide 'all' or 'distribution' or 'release' or 'age'";
39-
40-
my $body = { query => $query };
41-
my $timeout = $all ? '720m' : '5m';
42-
my $fields = [qw< author distribution name >];
43-
44-
my $es_release = MetaCPAN::ES->new( type => "release" );
45-
my $scroll = $es_release->scroll(
46-
body => $body,
47-
scroll => $timeout,
48-
fields => $fields,
4921
);
5022

51-
while ( my $r = $scroll->next ) {
52-
my $contrib_data = get_cpan_author_contributors(
53-
$r->{fields}{author}[0],
54-
$r->{fields}{name}[0],
55-
$r->{fields}{distribution}[0],
56-
);
57-
next unless is_arrayref($contrib_data);
58-
log_debug { 'adding release ' . $r->{fields}{name}[0] };
23+
# setup
24+
my $author_mapping = {};
25+
my $email_mapping = {};
5926

60-
update_release_contirbutors( $_, $timeout ) for @$contrib_data;
61-
}
27+
my $es_author = MetaCPAN::ES->new( type => 'author' );
28+
my $es_release = MetaCPAN::ES->new( type => "release" );
29+
my $es_contributor = MetaCPAN::ES->new( type => "contributor" );
30+
31+
run();
32+
33+
log_info {"done"};
6234

6335
###
6436

65-
sub get_author ($release) {
37+
sub author_release () {
6638
return unless $release;
67-
my $author = $release =~ s{/.*$}{}r;
68-
$author
39+
my ( $author, $release ) = split m{/}, $release;
40+
$author && $release
6941
or die
7042
"Error: invalid 'release' argument (format: PAUSEID/DISTRIBUTION-VERSION)";
71-
return $author;
43+
return +{
44+
author => $author,
45+
release => $release,
46+
};
47+
}
48+
49+
sub run () {
50+
my $query
51+
= $all ? query_all()
52+
: $distribution ? query_distribution()
53+
: $release ? query_release()
54+
: $age ? query_age()
55+
: return;
56+
57+
update_contributors($query);
58+
}
59+
60+
sub query_all () {
61+
return { match_all => {} };
62+
}
63+
64+
sub query_age () {
65+
return { range => { date => { gte => sprintf( 'now-%dd', $age ) } } };
66+
}
67+
68+
sub query_distribution () {
69+
return { term => { distribution => $distribution } };
70+
}
71+
72+
sub query_release () {
73+
my $author_release = author_release();
74+
return {
75+
bool => {
76+
must => [
77+
{ term => { author => $author_release->{author} } },
78+
{ term => { name => $author_release->{release} } },
79+
]
80+
}
81+
};
82+
}
83+
84+
sub update_contributors ($query) {
85+
my $scroll_release = $es_release->scroll(
86+
body => {
87+
query => $query,
88+
sort => ['_doc'],
89+
_source => [ qw<
90+
name
91+
author
92+
distribution
93+
metadata.author
94+
metadata.x_contributors
95+
> ],
96+
},
97+
);
98+
99+
my $report = sub {
100+
my ( $action, $result, $i ) = @_;
101+
if ( $i == 0 ) {
102+
log_info {'flushing contributor updates'};
103+
}
104+
};
105+
106+
my $bulk_contributor = $es_contributor->bulk(
107+
on_success => $report,
108+
on_error => $report,
109+
);
110+
111+
my $total = $scroll_release->total;
112+
log_info {"updating contributors for $total releases"};
113+
114+
my $i = 0;
115+
while ( my $release = $scroll_release->next ) {
116+
$i++;
117+
my $source = $release->{_source};
118+
my $name = $source->{name};
119+
if ( !( $name && $source->{author} && $source->{distribution} ) ) {
120+
Dlog_warn {"found broken release: $_"} $release;
121+
next;
122+
}
123+
log_debug {"updating contributors for $name ($i/$total)"};
124+
my $actions = release_contributor_update_actions( $release->{_source},
125+
$es_contributor );
126+
for my $action (@$actions) {
127+
$bulk_contributor->add_action(%$action);
128+
}
129+
}
130+
131+
$bulk_contributor->flush;
132+
}
133+
134+
sub release_contributor_update_actions ( $release, $es_contributor ) {
135+
my @actions;
136+
137+
my $res = $es_contributor->search(
138+
body => {
139+
query => {
140+
bool => {
141+
must => [
142+
{ term => { release_name => $release->{name} } },
143+
{ term => { release_author => $release->{author} } },
144+
],
145+
}
146+
},
147+
sort => ['_doc'],
148+
size => 500,
149+
_source => false,
150+
},
151+
);
152+
my @ids = map $_->{_id}, @{ $res->{hits}{hits} };
153+
push @actions, map +{ delete => { id => $_ } }, @ids;
154+
155+
my $contribs = get_contributors($release);
156+
my @docs = map {
157+
;
158+
my $contrib = $_;
159+
{
160+
release_name => $release->{name},
161+
release_author => $release->{author},
162+
distribution => $release->{distribution},
163+
map +( defined $contrib->{$_} ? ( $_ => $contrib->{$_} ) : () ),
164+
qw(pauseid name email)
165+
};
166+
} @$contribs;
167+
push @actions, map +{ create => { _source => $_ } }, @docs;
168+
return \@actions;
169+
}
170+
171+
sub get_contributors ($release) {
172+
my $author_name = $release->{author};
173+
my $contribs = $release->{metadata}{x_contributors} || [];
174+
my $authors = $release->{metadata}{author} || [];
175+
176+
for ( \( $contribs, $authors ) ) {
177+
178+
# If a sole contributor is a string upgrade it to an array...
179+
$$_ = [$$_]
180+
if !ref $$_;
181+
182+
# but if it's any other kind of value don't die trying to parse it.
183+
$$_ = []
184+
unless Ref::Util::is_arrayref($$_);
185+
}
186+
$authors = [ grep { $_ ne 'unknown' } @$authors ];
187+
188+
my $author_email = $author_mapping->{$author_name}
189+
//= eval { $es_author->get_source( id => $author_name )->{email}; }
190+
or return [];
191+
192+
my $author_info = {
193+
email => [
194+
lc "$author_name\@cpan.org",
195+
(
196+
Ref::Util::is_arrayref($author_email)
197+
? @{$author_email}
198+
: $author_email
199+
),
200+
],
201+
name => $author_name,
202+
};
203+
my %seen = map { $_ => $author_info }
204+
( @{ $author_info->{email} }, $author_info->{name}, );
205+
206+
my @contribs = map {
207+
my $name = $_;
208+
my $email;
209+
if ( $name =~ s/\s*<([^<>]+@[^<>]+)>// ) {
210+
$email = $1;
211+
}
212+
my $info;
213+
my $dupe;
214+
if ( $email and $info = $seen{$email} ) {
215+
$dupe = 1;
216+
}
217+
elsif ( $info = $seen{$name} ) {
218+
$dupe = 1;
219+
}
220+
else {
221+
$info = {
222+
name => $name,
223+
email => [],
224+
};
225+
}
226+
$seen{$name} ||= $info;
227+
if ($email) {
228+
push @{ $info->{email} }, $email
229+
unless grep { $_ eq $email } @{ $info->{email} };
230+
$seen{$email} ||= $info;
231+
}
232+
$dupe ? () : $info;
233+
} ( @$authors, @$contribs );
234+
235+
my %want_email;
236+
for my $contrib (@contribs) {
237+
238+
# heuristic to autofill pause accounts
239+
if ( !$contrib->{pauseid} ) {
240+
my ($pauseid)
241+
= map { /^(.*)\@cpan\.org$/ ? $1 : () }
242+
@{ $contrib->{email} };
243+
$contrib->{pauseid} = uc $pauseid
244+
if $pauseid;
245+
246+
}
247+
248+
push @{ $want_email{$_} }, $contrib for @{ $contrib->{email} };
249+
}
250+
251+
if (%want_email) {
252+
my @fetch_email = grep !exists $email_mapping->{$_},
253+
sort keys %want_email;
254+
255+
if (@fetch_email) {
256+
my $check_author = $es_author->search(
257+
body => {
258+
query => { terms => { email => \@fetch_email } },
259+
_source => [ 'email', 'pauseid' ],
260+
size => 100,
261+
},
262+
);
263+
264+
for my $author ( @{ $check_author->{hits}{hits} } ) {
265+
my $pauseid = uc $author->{_source}{pauseid};
266+
my $emails = $author->{_source}{email};
267+
$email_mapping->{$_} //= $pauseid
268+
for ref $emails ? @$emails : $emails;
269+
}
270+
271+
$email_mapping->{$_} //= undef for @fetch_email;
272+
}
273+
274+
for my $email ( keys %want_email ) {
275+
my $pauseid = $email_mapping->{$email}
276+
or next;
277+
for my $contrib ( @{ $want_email{$email} } ) {
278+
$contrib->{pauseid} = $pauseid;
279+
}
280+
}
281+
}
282+
283+
return \@contribs;
72284
}
73285

74286
1;
@@ -77,9 +289,9 @@ ($release)
77289
78290
=head1 SYNOPSIS
79291
80-
# bin/contributor.pl --all
81-
# bin/contributor.pl --distribution Moose
82-
# bin/contributor.pl --release ETHER/Moose-2.1806
292+
# bin/contributor --all
293+
# bin/contributor --distribution Moose
294+
# bin/contributor --release ETHER/Moose-2.1806
83295
84296
=head1 DESCRIPTION
85297

lib/MetaCPAN/ES.pm

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ sub bulk ( $self, %args ) {
8686
type => $self->{type},
8787
max_count => ( $args{max_count} // 250 ),
8888
timeout => ( $args{timeout} // '25m' ),
89+
( $args{on_success} ? ( on_success => $args{on_success} ) : () ),
90+
( $args{on_error} ? ( on_error => $args{on_error} ) : () ),
8991
);
9092
}
9193

@@ -126,6 +128,10 @@ sub get_ids ( $self, %args ) {
126128
return \@ids;
127129
}
128130

131+
sub get_source ( $self, $id ) {
132+
return $self->{es}->get_source($id);
133+
}
134+
129135
sub delete_ids ( $self, $ids ) {
130136
my $bulk = $self->bulk;
131137

0 commit comments

Comments
 (0)