Skip to content
27 changes: 27 additions & 0 deletions workflow-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,31 @@ DESCRIPTION
EXAMPLES
$ workflow-cli snapshot
```

## `workflow-cli opensearch-sync-rollup`

OpenSearch Rollup Job Creates/Updates

```
USAGE
$ workflow-cli opensearch-sync-rollup -u <value> -d <value> --region <value> --accessId <value> --accessKey <value>
--accountNumber <value> [--arn <value>] [-h] [--dryRun]

FLAGS
-d, --domainName=<value> (required) OpenSearch Domain
-h, --help Show CLI help.
-u, --hostname=<value> (required) OpenSearch url
--accessId=<value> (required) AWS access key id
--accessKey=<value> (required) AWS secret access key
--accountNumber=<value> (required) AWS account number
--arn=<value> AWS ARN
--dryRun Enables dry run
--region=<value> (required) AWS region

DESCRIPTION
OpenSearch Rollup Job Creates/Updates

EXAMPLES
$ workflow-cli opensearch-sync-rollup
```
<!-- commandsstop -->
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"index_patterns": ["nrm-accessrollup-*"],
"template": {
"aliases": {
"nrm-accessrollup": {}
},
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"dynamic": "strict",
"properties": {
"url.path": {
"type": "keyword"
},
"url.domain": {
"type": "keyword"
},
"@timestamp": {
"type": "date"
},
"event.outcome": {
"properties": {
"value_count": {
"type": "long"
}
}
},
"event.duration": {
"properties": {
"avg": {
"type": "double"
}
}
}
}
}
},
"version": 1,
"priority": 300,
"_meta": {
"description": "NRM Access Url Rollup indices"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,62 @@
"actions": [],
"transitions": [
{
"state_name": "scaled",
"state_name": "rp",
"conditions": {
"min_index_age": "4d"
"min_index_age": "14d"
}
}
]
},
{
"name": "rp",
"actions": [
{
"rollup": {
"ism_rollup": {
"target_index": "nrm-accessrollup",
"description": "nrm access external rollup job",
"page_size": 200,
"dimensions": [
{
"date_histogram": {
"source_field": "@timestamp",
"fixed_interval": "60m",
"timezone": "America/Vancouver"
}
},
{
"terms": {
"source_field": "url.path"
}
},
{
"terms": {
"source_field": "url.domain"
}
}
],
"metrics": [
{
"source_field": "event.outcome",
"metrics": [{"value_count":{}}]
},
{
"source_field": "event.duration",
"metrics": [{"avg": {}}]
}
]
}
}
}
],
"transitions": [{
"state_name": "scaled",
"conditions": {
"min_index_age": "16d"
}
}]
},
{
"name": "scaled",
"actions": [
Expand Down
45 changes: 45 additions & 0 deletions workflow-cli/src/commands/opensearch-sync-rollup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import 'reflect-metadata';
import { Command } from '@oclif/core';
import AwsService from '../services/aws.service';
import {
accessId,
accessKey,
accountNumber,
arn,
domainName,
help,
hostname,
region,
dryRun,
} from '../flags';
import OpenSearchRollupService from '../services/opensearch-rollup.service';
import OpenSearchDomainService from '../services/opensearch-domain.service';

export default class OpenSearchRollup extends Command {
static description = 'Create/Update OpenSearch Rollup Index';

static examples = ['<%= config.bin %> <%= command.id %>'];

static flags = {
...hostname,
...domainName,
...region,
...accessId,
...accessKey,
...accountNumber,
...arn,
...help,
...dryRun,
};

public async run(): Promise<void> {
const { flags } = await this.parse(OpenSearchRollup);
await AwsService.assumeIdentity(flags);

const service = new OpenSearchRollupService();
const domainService = new OpenSearchDomainService();

await domainService.getDomain(flags);
await service.createOrUpdateRollupJob(flags);
}
}
114 changes: 114 additions & 0 deletions workflow-cli/src/services/opensearch-rollup.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import * as fs from 'fs';
import * as path from 'path';
import AwsService from './aws.service';
import { WorkflowSettings } from './opensearch-domain.service';

export default class OpenSearchRollupService extends AwsService {
public async createOrUpdateRollupJob(
settings: WorkflowSettings,
): Promise<any> {
const templateDir = path.resolve(
__dirname,
'../../configuration-opensearch/rollup_index_jobs',
);

for (const filePath of fs.readdirSync(templateDir)) {
if (!filePath.endsWith('.json')) {
continue;
}
const rollupJobName = path.basename(filePath, '.json');
let rollupJobPath = `/_plugins/_rollup/jobs/${rollupJobName}`;
const exists = await this.checkRollupJobExists(settings, rollupJobName);
const rollupConfig = JSON.parse(
fs.readFileSync(path.join(templateDir, filePath), 'utf8'),
);

if (exists) {
rollupJobPath += '?if_seq_no=1&if_primary_term=1';
console.log(`Updating existing rollup job: ${rollupJobName}`);
return this.executeRollupRequest(
'PUT',
settings,
rollupJobPath,
rollupConfig,
).then((res) => {
console.log(
`PUT ${rollupJobPath}\n[${res.statusCode}] Rollup job updated`,
);
});
} else {
console.log(`Creating new rollup job: ${rollupJobName}`);
return this.executeRollupRequest(
'PUT',
settings,
rollupJobPath,
rollupConfig,
).then((res) => {
console.log(
`PUT ${rollupJobPath}\n[${res.statusCode}] Rollup job created`,
);
});
}
}
}

private async executeRollupRequest(
method: string,
settings: WorkflowSettings,
path: string,
body?: any,
): Promise<any> {
const [pathWithoutQuery, queryString] = path.split('?');
const queryParams = queryString
? Object.fromEntries(new URLSearchParams(queryString).entries())
: undefined;
const requestOptions = {
method,
headers: {
'Content-Type': 'application/json',
host: settings.hostname,
},
hostname: settings.hostname,
path: pathWithoutQuery,
query: queryParams,
body: body ? JSON.stringify(body) : undefined,
};

return this.executeSignedHttpRequest(requestOptions)
.then((res) => this.waitAndReturnResponseBody(res))
.catch((err) => {
console.error(
`Error during ${method} request to ${path}: ${err.message}`,
);
throw err;
});
}

// check if the rollup job exists
private async checkRollupJobExists(
settings: WorkflowSettings,
jobName: string,
): Promise<boolean> {
const rollupJobPath = `/_plugins/_rollup/jobs/${jobName}`;
return this.executeRollupRequest('GET', settings, rollupJobPath)
.then((res) => {
if (res.statusCode === 200) {
console.log(`[${res.statusCode}] Rollup job exists`);
return true;
} else if (res.statusCode === 404) {
console.log(`[${res.statusCode}] Rollup job does not exist`);
return false;
} else {
throw new Error(`Unexpected status code: ${res.statusCode}`);
}
})
.catch((err) => {
if (err.response?.statusCode === 404) {
console.log(`[404] Rollup job does not exist`);
return false;
}
console.error(`Error checking rollup job: ${err.message}`);
throw err;
});
}
}