@@ -19,6 +19,8 @@ limitations under the License.
1919#include < string>
2020#include < vector>
2121
22+ #include < grpc++/grpc++.h>
23+
2224#include " snap/config.h"
2325#include " snap/metric.h"
2426#include " snap/flags.h"
@@ -29,14 +31,15 @@ namespace Plugin {
2931 class CollectorInterface ;
3032 class ProcessorInterface ;
3133 class PublisherInterface ;
32-
34+ class StreamCollectorInterface ;
3335 /* *
3436 * Type is the plugin type
3537 */
3638 enum Type {
3739 Collector,
3840 Processor,
39- Publisher
41+ Publisher,
42+ StreamCollector
4043 };
4144
4245 /* *
@@ -78,8 +81,7 @@ namespace Plugin {
7881 */
7982 class Meta final {
8083 public:
81- Meta (Type type, std::string name, int version);
82- Meta (Type type, std::string name, int version, Flags *flags);
84+ Meta (Type type, std::string name, int version, RpcType rpc_type = GRPC);
8385
8486 Type type;
8587 std::string name;
@@ -176,20 +178,6 @@ namespace Plugin {
176178 */
177179 int stand_alone_port;
178180
179- /* *
180- * sets the maximum duration (always greater than 0s) between collections
181- * before metrics are sent. Defaults to 10s what means that after 10 seconds
182- * no new metrics are received, the plugin should send whatever data it has
183- * in the buffer instead of waiting longer. (e.g. 5s)
184- */
185- std::chrono::seconds max_collect_duration;
186-
187- /* *
188- * maximum number of metrics the plugin is buffering before sending metrics.
189- * Defaults to zero what means send metrics immediately
190- */
191- int64_t max_metrics_buffer;
192-
193181 /* *
194182 * use_cli_args updates plugin meta using arguments from cli
195183 */
@@ -218,6 +206,7 @@ namespace Plugin {
218206 virtual CollectorInterface* IsCollector ();
219207 virtual ProcessorInterface* IsProcessor ();
220208 virtual PublisherInterface* IsPublisher ();
209+ virtual StreamCollectorInterface* IsStreamCollector ();
221210
222211 virtual const ConfigPolicy get_config_policy () = 0;
223212 protected:
@@ -366,6 +355,80 @@ namespace Plugin {
366355
367356
368357
358+ /* *
359+ * The interface for a stream collector plugin.
360+ * A Stream Collector is the source.
361+ * It is responsible for streaming metrics in the Snap pipeline.
362+ */
363+ class StreamCollectorInterface : public PluginInterface {
364+ public:
365+ Type GetType () const final ;
366+ StreamCollectorInterface* IsStreamCollector () final ;
367+
368+ void SetMaxCollectDuration (std::chrono::seconds maxCollectDuration) {
369+ _max_collect_duration = maxCollectDuration;
370+ }
371+ void SetMaxCollectDuration (int64_t maxCollectDuration) {
372+ _max_collect_duration = std::chrono::seconds (maxCollectDuration);
373+ }
374+ std::chrono::seconds GetMaxCollectDuration () {
375+ return _max_collect_duration;
376+ }
377+
378+ void SetMaxMetricsBuffer (int64_t maxMetricsBuffer) {
379+ _max_metrics_buffer = maxMetricsBuffer;
380+ }
381+ int64_t GetMaxMetricsBuffer () {
382+ return _max_metrics_buffer;
383+ }
384+
385+ /*
386+ * (inherited from PluginInterface)
387+ */
388+ virtual const ConfigPolicy get_config_policy () = 0;
389+
390+ virtual std::vector<Metric> get_metric_types (Config cfg) = 0;
391+
392+ /* StreamMetrics allows the plugin to send/receive metrics on a channel
393+ * Arguments are (in order):
394+ *
395+ * A channel for metrics into the plugin from Snap -- which
396+ * are the metric types snap is requesting the plugin to collect.
397+ *
398+ * A channel for metrics from the plugin to Snap -- the actual
399+ * collected metrics from the plugin.
400+ *
401+ * A channel for error strings that the library will report to snap
402+ * as task errors.
403+ */
404+ virtual void stream_metrics () = 0;
405+
406+ virtual std::vector<Plugin::Metric> put_metrics_out () = 0;
407+ virtual std::string put_err_msg () = 0;
408+ virtual void get_metrics_in (std::vector<Plugin::Metric> &metsIn) = 0;
409+ virtual bool put_mets () = 0;
410+ virtual bool put_err () = 0;
411+ virtual void set_put_mets (const bool &putMets) = 0;
412+ virtual void set_put_err (const bool &putErr) = 0;
413+ virtual void set_context_cancelled (const bool &contextCancelled) = 0;
414+ virtual bool context_cancelled () = 0;
415+
416+ private:
417+ /* *
418+ * sets the maximum duration (always greater than 0s) between collections
419+ * before metrics are sent. Defaults to 10s what means that after 10 seconds
420+ * no new metrics are received, the plugin should send whatever data it has
421+ * in the buffer instead of waiting longer. (e.g. 5s)
422+ */
423+ std::chrono::seconds _max_collect_duration;
424+
425+ /* *
426+ * maximum number of metrics the plugin is buffering before sending metrics.
427+ * Defaults to zero what means send metrics immediately
428+ */
429+ int64_t _max_metrics_buffer;
430+ };
431+
369432 /* *
370433 * These functions are called to start a plugin.
371434 * They export plugin using default PluginExporter based on GRPC:
@@ -378,4 +441,5 @@ namespace Plugin {
378441 void start_collector (int argc, char **argv, CollectorInterface* plg, Meta& meta);
379442 void start_processor (int argc, char **argv, ProcessorInterface* plg, Meta& meta);
380443 void start_publisher (int argc, char **argv, PublisherInterface* plg, Meta& meta);
444+ void start_stream_collector (int argc, char **argv, StreamCollectorInterface* plg, Meta& meta);
381445}; // namespace Plugin
0 commit comments