Skip to content

[WIP] Migrate to v0.14 API #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions lib/fluent/plugin/out_grepcounter.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# encoding: UTF-8
class Fluent::GrepCounterOutput < Fluent::Output
require 'pathname'
require 'fluent/plugin/output'

class Fluent::Plugin::GrepCounterOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('grepcounter', self)
helpers :event_emitter

# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
Expand All @@ -14,11 +18,6 @@ class Fluent::GrepCounterOutput < Fluent::Output

REGEXP_MAX_NUM = 20

def initialize
super
require 'pathname'
end

config_param :input_key, :string, :default => nil,
:desc => <<-DESC
The target field key to grep out.
Expand Down Expand Up @@ -177,7 +176,7 @@ def shutdown
end

# Called when new line comes. This method actually does not emit
def emit(tag, es, chain)
def process(tag, es)
count = 0; matches = []
# filter out and insert
es.each do |time,record|
Expand Down Expand Up @@ -208,8 +207,6 @@ def emit(tag, es, chain)
@counts[aggregate_key] += count
@matches[aggregate_key] += matches
end

chain.next
rescue => e
log.warn "grepcounter: #{e.class} #{e.message} #{e.backtrace.first}"
end
Expand Down
47 changes: 28 additions & 19 deletions spec/out_grepcounter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# encoding: UTF-8
require_relative 'spec_helper'
require 'fluent/test/driver/output'

class Fluent::Test::OutputTestDriver
def emit_with_tag(record, time=Time.now, tag = nil)
@tag = tag if tag
emit(record, time)
module Fluent::Test::Driver::EventFeeder
def emit_with_tag(record, time=Fluent::Engine.now, tag = nil)
@default_tag = tag if tag
feed(@default_tag, time, record)
end
end

Expand All @@ -14,13 +15,13 @@ def delete!(key)
end
end

describe Fluent::GrepCounterOutput do
describe Fluent::Plugin::GrepCounterOutput do
before { Fluent::Test.setup }
CONFIG = %[
input_key message
]
let(:tag) { 'syslog.host1' }
let(:driver) { Fluent::Test::OutputTestDriver.new(Fluent::GrepCounterOutput, tag).configure(config) }
let(:driver) { Fluent::Test::Driver::Output.new(Fluent::Plugin::GrepCounterOutput).configure(config) }

describe 'test configure' do
describe 'bad configuration' do
Expand Down Expand Up @@ -63,7 +64,7 @@ def delete!(key)
end

describe 'test emit' do
let(:time) { Time.now.to_i }
let(:time) { Fluent::Engine.now }
let(:messages) do
[
"2013/01/13T07:02:11.124202 INFO GET /ping",
Expand All @@ -73,8 +74,10 @@ def delete!(key)
]
end
let(:emit) do
driver.run { messages.each {|message| driver.emit({'message' => message}, time) } }
driver.instance.flush_emit(0)
driver.run(default_tag: tag) {
messages.each {|message| driver.feed(time, {'message' => message}) }
driver.instance.flush_emit(0)
}
end
let(:expected) do
{
Expand Down Expand Up @@ -430,9 +433,11 @@ def delete!(key)
context 'aggregate all' do
let(:messages) { ['foobar', 'foobar'] }
let(:emit) do
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } }
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } }
driver.instance.flush_emit(0)
driver.run {
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') }
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') }
driver.instance.flush_emit(0)
}
end
let(:expected) do
{
Expand All @@ -452,9 +457,11 @@ def delete!(key)
context 'aggregate in_tag' do
let(:messages) { ['foobar', 'foobar'] }
let(:emit) do
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } }
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } }
driver.instance.flush_emit(0)
driver.run {
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') }
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') }
driver.instance.flush_emit(0)
}
end

let(:config) { CONFIG + %[aggregate tag \n remove_tag_slice 0..-2] }
Expand All @@ -473,9 +480,11 @@ def delete!(key)
context 'aggregate out_tag' do
let(:messages) { ['foobar', 'foobar'] }
let(:emit) do
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } }
driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } }
driver.instance.flush_emit(0)
driver.run {
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') }
messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') }
driver.instance.flush_emit(0)
}
end

let(:config) { CONFIG + %[aggregate out_tag \n remove_tag_slice 0..-2] }
Expand Down Expand Up @@ -509,7 +518,7 @@ def delete!(key)
let(:config) { CONFIG + %[store_file #{store_file}] }

it 'stored_data and loaded_data should equal' do
driver.run { messages.each {|message| driver.emit({'message' => message}, time) } }
driver.run(default_tag: tag) { messages.each {|message| driver.feed(time, {'message' => message}) } }
driver.instance.shutdown
stored_counts = driver.instance.counts
stored_matches = driver.instance.matches
Expand Down