diff --git a/lib/fluent/plugin/out_grepcounter.rb b/lib/fluent/plugin/out_grepcounter.rb index f74ab91..4caef8e 100644 --- a/lib/fluent/plugin/out_grepcounter.rb +++ b/lib/fluent/plugin/out_grepcounter.rb @@ -1,6 +1,10 @@ # encoding: UTF-8 -class Fluent::GrepCounterOutput < Fluent::Output +require 'pathname' +require 'fluent/plugin/output' + +class Fluent::Plugin::GrepCounterOutput < Fluent::Plugin::Output Fluent::Plugin.register_output('grepcounter', self) + helpers :event_emitter # To support log_level option implemented by Fluentd v0.10.43 unless method_defined?(:log) @@ -14,11 +18,6 @@ class Fluent::GrepCounterOutput < Fluent::Output REGEXP_MAX_NUM = 20 - def initialize - super - require 'pathname' - end - config_param :input_key, :string, :default => nil, :desc => <<-DESC The target field key to grep out. @@ -177,7 +176,7 @@ def shutdown end # Called when new line comes. This method actually does not emit - def emit(tag, es, chain) + def process(tag, es) count = 0; matches = [] # filter out and insert es.each do |time,record| @@ -208,8 +207,6 @@ def emit(tag, es, chain) @counts[aggregate_key] += count @matches[aggregate_key] += matches end - - chain.next rescue => e log.warn "grepcounter: #{e.class} #{e.message} #{e.backtrace.first}" end diff --git a/spec/out_grepcounter_spec.rb b/spec/out_grepcounter_spec.rb index b54c1f7..8a43681 100644 --- a/spec/out_grepcounter_spec.rb +++ b/spec/out_grepcounter_spec.rb @@ -1,10 +1,11 @@ # encoding: UTF-8 require_relative 'spec_helper' +require 'fluent/test/driver/output' -class Fluent::Test::OutputTestDriver - def emit_with_tag(record, time=Time.now, tag = nil) - @tag = tag if tag - emit(record, time) +module Fluent::Test::Driver::EventFeeder + def emit_with_tag(record, time=Fluent::Engine.now, tag = nil) + @default_tag = tag if tag + feed(@default_tag, time, record) end end @@ -14,13 +15,13 @@ def delete!(key) end end -describe Fluent::GrepCounterOutput do +describe Fluent::Plugin::GrepCounterOutput do before { Fluent::Test.setup } CONFIG = %[ input_key message ] let(:tag) { 'syslog.host1' } - let(:driver) { Fluent::Test::OutputTestDriver.new(Fluent::GrepCounterOutput, tag).configure(config) } + let(:driver) { Fluent::Test::Driver::Output.new(Fluent::Plugin::GrepCounterOutput).configure(config) } describe 'test configure' do describe 'bad configuration' do @@ -63,7 +64,7 @@ def delete!(key) end describe 'test emit' do - let(:time) { Time.now.to_i } + let(:time) { Fluent::Engine.now } let(:messages) do [ "2013/01/13T07:02:11.124202 INFO GET /ping", @@ -73,8 +74,10 @@ def delete!(key) ] end let(:emit) do - driver.run { messages.each {|message| driver.emit({'message' => message}, time) } } - driver.instance.flush_emit(0) + driver.run(default_tag: tag) { + messages.each {|message| driver.feed(time, {'message' => message}) } + driver.instance.flush_emit(0) + } end let(:expected) do { @@ -430,9 +433,11 @@ def delete!(key) context 'aggregate all' do let(:messages) { ['foobar', 'foobar'] } let(:emit) do - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } } - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } } - driver.instance.flush_emit(0) + driver.run { + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } + driver.instance.flush_emit(0) + } end let(:expected) do { @@ -452,9 +457,11 @@ def delete!(key) context 'aggregate in_tag' do let(:messages) { ['foobar', 'foobar'] } let(:emit) do - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } } - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } } - driver.instance.flush_emit(0) + driver.run { + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } + driver.instance.flush_emit(0) + } end let(:config) { CONFIG + %[aggregate tag \n remove_tag_slice 0..-2] } @@ -473,9 +480,11 @@ def delete!(key) context 'aggregate out_tag' do let(:messages) { ['foobar', 'foobar'] } let(:emit) do - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } } - driver.run { messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } } - driver.instance.flush_emit(0) + driver.run { + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar') } + messages.each {|message| driver.emit_with_tag({'message' => message}, time, 'foo.bar2') } + driver.instance.flush_emit(0) + } end let(:config) { CONFIG + %[aggregate out_tag \n remove_tag_slice 0..-2] } @@ -509,7 +518,7 @@ def delete!(key) let(:config) { CONFIG + %[store_file #{store_file}] } it 'stored_data and loaded_data should equal' do - driver.run { messages.each {|message| driver.emit({'message' => message}, time) } } + driver.run(default_tag: tag) { messages.each {|message| driver.feed(time, {'message' => message}) } } driver.instance.shutdown stored_counts = driver.instance.counts stored_matches = driver.instance.matches