diff --git a/workflow-cli/README.md b/workflow-cli/README.md index c0ee78c..30f699a 100644 --- a/workflow-cli/README.md +++ b/workflow-cli/README.md @@ -553,4 +553,31 @@ DESCRIPTION EXAMPLES $ workflow-cli snapshot ``` + +## `workflow-cli opensearch-sync-rollup` + +OpenSearch Rollup Job Creates/Updates + +``` +USAGE + $ workflow-cli opensearch-sync-rollup -u -d --region --accessId --accessKey + --accountNumber [--arn ] [-h] [--dryRun] + +FLAGS + -d, --domainName= (required) OpenSearch Domain + -h, --help Show CLI help. + -u, --hostname= (required) OpenSearch url + --accessId= (required) AWS access key id + --accessKey= (required) AWS secret access key + --accountNumber= (required) AWS account number + --arn= AWS ARN + --dryRun Enables dry run + --region= (required) AWS region + +DESCRIPTION + OpenSearch Rollup Job Creates/Updates + +EXAMPLES + $ workflow-cli opensearch-sync-rollup +``` diff --git a/workflow-cli/configuration-opensearch/index_template/accessrollup.json b/workflow-cli/configuration-opensearch/index_template/accessrollup.json new file mode 100644 index 0000000..c70fe87 --- /dev/null +++ b/workflow-cli/configuration-opensearch/index_template/accessrollup.json @@ -0,0 +1,45 @@ +{ + "index_patterns": ["nrm-accessrollup-*"], + "template": { + "aliases": { + "nrm-accessrollup": {} + }, + "settings": { + "number_of_shards": 1, + "number_of_replicas": 1 + }, + "mappings": { + "dynamic": "strict", + "properties": { + "url.path": { + "type": "keyword" + }, + "url.domain": { + "type": "keyword" + }, + "@timestamp": { + "type": "date" + }, + "event.outcome": { + "properties": { + "value_count": { + "type": "long" + } + } + }, + "event.duration": { + "properties": { + "avg": { + "type": "double" + } + } + } + } + } + }, + "version": 1, + "priority": 300, + "_meta": { + "description": "NRM Access Url Rollup indices" + } + } \ No newline at end of file diff --git a/workflow-cli/configuration-opensearch/state_management_policy/nrm-type-access-external-policy.json b/workflow-cli/configuration-opensearch/state_management_policy/nrm-type-access-external-policy.json index 8d9b2a3..2772460 100644 --- a/workflow-cli/configuration-opensearch/state_management_policy/nrm-type-access-external-policy.json +++ b/workflow-cli/configuration-opensearch/state_management_policy/nrm-type-access-external-policy.json @@ -35,13 +35,62 @@ ], "transitions": [ { - "state_name": "force_merge_standby", + "state_name": "rp", "conditions": { - "min_index_age": "36d" + "min_index_age": "14d" } } ] }, + { + "name": "rp", + "actions": [ + { + "rollup": { + "ism_rollup": { + "target_index": "nrm-accessrollup", + "description": "nrm access external rollup job", + "page_size": 200, + "dimensions": [ + { + "date_histogram": { + "source_field": "@timestamp", + "fixed_interval": "60m", + "timezone": "America/Vancouver" + } + }, + { + "terms": { + "source_field": "url.path" + } + }, + { + "terms": { + "source_field": "url.domain" + } + } + ], + "metrics": [ + { + "source_field": "event.outcome", + "metrics": [{"value_count":{}}] + }, + { + "source_field": "event.duration", + "metrics": [{"avg": {}}] + } + ] + } + } + } + ], + "transitions": [{ + "state_name": "force_merge_standby", + "conditions": { + "min_index_age": "36d" + } + }] + }, { "name": "force_merge_standby", "actions": [], diff --git a/workflow-cli/src/commands/opensearch-sync-rollup.ts b/workflow-cli/src/commands/opensearch-sync-rollup.ts new file mode 100644 index 0000000..bf1d6f2 --- /dev/null +++ b/workflow-cli/src/commands/opensearch-sync-rollup.ts @@ -0,0 +1,45 @@ +import 'reflect-metadata'; +import { Command } from '@oclif/core'; +import AwsService from '../services/aws.service'; +import { + accessId, + accessKey, + accountNumber, + arn, + domainName, + help, + hostname, + region, + dryRun, +} from '../flags'; +import OpenSearchRollupService from '../services/opensearch-rollup.service'; +import OpenSearchDomainService from '../services/opensearch-domain.service'; + +export default class OpenSearchRollup extends Command { + static description = 'Create/Update OpenSearch Rollup Index'; + + static examples = ['<%= config.bin %> <%= command.id %>']; + + static flags = { + ...hostname, + ...domainName, + ...region, + ...accessId, + ...accessKey, + ...accountNumber, + ...arn, + ...help, + ...dryRun, + }; + + public async run(): Promise { + const { flags } = await this.parse(OpenSearchRollup); + await AwsService.assumeIdentity(flags); + + const service = new OpenSearchRollupService(); + const domainService = new OpenSearchDomainService(); + + await domainService.getDomain(flags); + await service.createOrUpdateRollupJob(flags); + } +} diff --git a/workflow-cli/src/services/opensearch-rollup.service.ts b/workflow-cli/src/services/opensearch-rollup.service.ts new file mode 100644 index 0000000..879b6d4 --- /dev/null +++ b/workflow-cli/src/services/opensearch-rollup.service.ts @@ -0,0 +1,114 @@ +import * as fs from 'fs'; +import * as path from 'path'; +import AwsService from './aws.service'; +import { WorkflowSettings } from './opensearch-domain.service'; + +export default class OpenSearchRollupService extends AwsService { + public async createOrUpdateRollupJob( + settings: WorkflowSettings, + ): Promise { + const templateDir = path.resolve( + __dirname, + '../../configuration-opensearch/rollup_index_jobs', + ); + + for (const filePath of fs.readdirSync(templateDir)) { + if (!filePath.endsWith('.json')) { + continue; + } + const rollupJobName = path.basename(filePath, '.json'); + let rollupJobPath = `/_plugins/_rollup/jobs/${rollupJobName}`; + const exists = await this.checkRollupJobExists(settings, rollupJobName); + const rollupConfig = JSON.parse( + fs.readFileSync(path.join(templateDir, filePath), 'utf8'), + ); + + if (exists) { + rollupJobPath += '?if_seq_no=1&if_primary_term=1'; + console.log(`Updating existing rollup job: ${rollupJobName}`); + return this.executeRollupRequest( + 'PUT', + settings, + rollupJobPath, + rollupConfig, + ).then((res) => { + console.log( + `PUT ${rollupJobPath}\n[${res.statusCode}] Rollup job updated`, + ); + }); + } else { + console.log(`Creating new rollup job: ${rollupJobName}`); + return this.executeRollupRequest( + 'PUT', + settings, + rollupJobPath, + rollupConfig, + ).then((res) => { + console.log( + `PUT ${rollupJobPath}\n[${res.statusCode}] Rollup job created`, + ); + }); + } + } + } + + private async executeRollupRequest( + method: string, + settings: WorkflowSettings, + path: string, + body?: any, + ): Promise { + const [pathWithoutQuery, queryString] = path.split('?'); + const queryParams = queryString + ? Object.fromEntries(new URLSearchParams(queryString).entries()) + : undefined; + const requestOptions = { + method, + headers: { + 'Content-Type': 'application/json', + host: settings.hostname, + }, + hostname: settings.hostname, + path: pathWithoutQuery, + query: queryParams, + body: body ? JSON.stringify(body) : undefined, + }; + + return this.executeSignedHttpRequest(requestOptions) + .then((res) => this.waitAndReturnResponseBody(res)) + .catch((err) => { + console.error( + `Error during ${method} request to ${path}: ${err.message}`, + ); + throw err; + }); + } + + // check if the rollup job exists + private async checkRollupJobExists( + settings: WorkflowSettings, + jobName: string, + ): Promise { + const rollupJobPath = `/_plugins/_rollup/jobs/${jobName}`; + return this.executeRollupRequest('GET', settings, rollupJobPath) + .then((res) => { + if (res.statusCode === 200) { + console.log(`[${res.statusCode}] Rollup job exists`); + return true; + } else if (res.statusCode === 404) { + console.log(`[${res.statusCode}] Rollup job does not exist`); + return false; + } else { + throw new Error(`Unexpected status code: ${res.statusCode}`); + } + }) + .catch((err) => { + if (err.response?.statusCode === 404) { + console.log(`[404] Rollup job does not exist`); + return false; + } + console.error(`Error checking rollup job: ${err.message}`); + throw err; + }); + } +}