Skip to content

Commit cb0c60a

Browse files
committed
feat #OBS-I501 : data analyse PII API added
1 parent edff55d commit cb0c60a

File tree

9 files changed

+65
-22
lines changed

9 files changed

+65
-22
lines changed

api-service/src/configs/Config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export const config = {
9898
"command_service_config": {
9999
"host": process.env.command_service_host || "http://localhost",
100100
"port": parseInt(process.env.command_service_port || "8000"),
101-
"path": process.env.command_service_path || "/system/v1/dataset/command"
101+
"paths": JSON.parse(process.env.command_service_paths || '{"dataset":"/system/v1/dataset/command","connector":"/connector/v1/register","analyzePII":"/system/data/v1/analyze/pii"}')
102102
},
103103
"flink_job_configs": {
104104
"pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081",

api-service/src/connections/commandServiceConnection.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,29 @@ import { v4 } from "uuid";
55

66
const commandHost = _.get(config, ["command_service_config", "host"])
77
const commandPort = _.get(config, ["command_service_config", "port"])
8-
const commandPath = _.get(config, ["command_service_config", "path"])
8+
const commandPaths = _.get(config, ["command_service_config", "paths"])
9+
const datasetPath = _.get(commandPaths, ["dataset"])
10+
const connectorRegisterPath = _.get(commandPaths, ["connector"])
11+
const analyzePIIPath = _.get(commandPaths, ["analyzePII"])
912

1013
export const commandHttpService = axios.create({ baseURL: `${commandHost}:${commandPort}`, headers: { "Content-Type": "application/json" } });
1114

12-
export const executeCommand = async (id: string, command: string) => {
15+
export const executeCommand = async (id: string, command: string, userToken: string) => {
1316
const payload = {
1417
"id": v4(),
1518
"data": {
1619
"dataset_id": id,
1720
"command": command
1821
}
1922
}
20-
return commandHttpService.post(commandPath, payload)
23+
return commandHttpService.post(datasetPath, payload, { headers: { Authorization: userToken }})
2124
}
2225

23-
export const registerConnector = async (requestBody: any) => {
24-
return commandHttpService.post("/connector/v1/register", requestBody)
26+
export const registerConnector = async (requestBody: any, userToken: string) => {
27+
return commandHttpService.post(connectorRegisterPath, requestBody, { headers: { Authorization: userToken }})
28+
}
29+
30+
export const detectPII = async (requestBody: any, userToken: string) => {
31+
console.log(`analyzePIIPath : ${analyzePIIPath}`)
32+
return commandHttpService.post(analyzePIIPath, requestBody, { headers: { Authorization: userToken }})
2533
}

api-service/src/controllers/ConnectorRegister/ConnectorRegisterController.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ const connectorRegisterController = async (req: Request, res: Response) => {
2222
relative_path: uploadStreamResponse[0]
2323
}
2424
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
25-
const registryResponse = await registerConnector(payload);
25+
const userToken = req.get('authorization') as string;
26+
const registryResponse = await registerConnector(payload, userToken);
2627
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
2728
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
2829
} catch (error: any) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Request, Response } from "express";
2+
import { ResponseHandler } from "../../helpers/ResponseHandler";
3+
import httpStatus from "http-status";
4+
import * as _ from "lodash";
5+
import logger from "../../logger";
6+
import { detectPII } from "../../connections/commandServiceConnection";
7+
8+
const code = "FAILED_TO_DETECT_PII";
9+
export const dataAnalyzePII = async (req: Request, res: Response) => {
10+
const apiId = _.get(req, 'id')
11+
try {
12+
const userToken = req.get('authorization') as string;
13+
const piiSuggestionsResponse = await detectPII(_.get(req, ['body', 'data']), userToken);
14+
logger.info({apiId , message: `Detected PII successfully` })
15+
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: piiSuggestionsResponse?.data})
16+
} catch (error: any) {
17+
const errMessage = _.get(error, "response.data.detail")
18+
logger.error(error, apiId, code);
19+
let errorMessage = error;
20+
const statusCode = _.get(error, "status")
21+
if (!statusCode || statusCode == 500) {
22+
errorMessage = { code, message: errMessage || "Failed to detect pii" }
23+
}
24+
ResponseHandler.errorResponse(errorMessage, req, res);
25+
}
26+
}

api-service/src/controllers/DatasetReset/DatasetReset.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ const datasetReset = async (req: Request, res: Response) => {
2929

3030
const category = _.get(req, ["body", "request", "category"]);
3131
const datasetId = _.get(req, ["params"," datasetId"]);
32+
const userToken = req.get('authorization') as string;
3233

3334
await validateRequest(req);
3435
if (category == "processing") {
3536
const pipeLineStatus = await getFlinkHealthStatus()
3637
if (pipeLineStatus == HealthStatus.UnHealthy) {
37-
await restartPipeline({ "dataset": { "dataset_id": datasetId } })
38+
await restartPipeline({ "dataset": { "dataset_id": datasetId } }, userToken)
3839
}
3940
} else if (category == "query") {
4041
const datasources = await datasetService.findDatasources({"dataset_id": datasetId})

api-service/src/controllers/DatasetStatusTransition/DatasetStatusTransition.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const validateDataset = (dataset: any, datasetId: any, action: string) => {
5050
const datasetStatusTransition = async (req: Request, res: Response) => {
5151

5252
const { dataset_id, status } = _.get(req.body, "request");
53+
const userToken = req.get('authorization') as string;
5354
validateRequest(req, dataset_id);
5455

5556
const dataset: Record<string, any> = (_.includes(liveDatasetActions, status)) ? await datasetService.getDataset(dataset_id, ["id", "status", "type", "api_version", "name"], true) : await datasetService.getDraftDataset(dataset_id, ["id", "dataset_id", "status", "type", "api_version"])
@@ -64,10 +65,10 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
6465
await readyForPublish(dataset, userID);
6566
break;
6667
case "Live":
67-
await publishDataset(dataset, userID);
68+
await publishDataset(dataset, userID, userToken);
6869
break;
6970
case "Retire":
70-
await retireDataset(dataset, userID);
71+
await retireDataset(dataset, userID, userToken);
7172
break;
7273
default:
7374
throw obsrvError(dataset.id, "UNKNOWN_STATUS_TRANSITION", "Unknown status transition requested", "BAD_REQUEST", 400)
@@ -134,15 +135,15 @@ const readyForPublish = async (dataset: Record<string, any>, updated_by: any) =>
134135
*
135136
* @param dataset
136137
*/
137-
const publishDataset = async (dataset: Record<string, any>, userID: any) => {
138+
const publishDataset = async (dataset: Record<string, any>, userID: any, userToken: string) => {
138139

139140
const draftDataset: Record<string, any> = await datasetService.getDraftDataset(dataset.dataset_id) as unknown as Record<string, any>
140141
validateStorageSupport(draftDataset);
141142
_.set(draftDataset, ["created_by"], userID);
142143
_.set(draftDataset, ["updated_by"], userID);
143144
await validateAndUpdateDenormConfig(draftDataset);
144145
await updateMasterDataConfig(draftDataset)
145-
await datasetService.publishDataset(draftDataset)
146+
await datasetService.publishDataset(draftDataset, userToken);
146147
}
147148

148149
const validateAndUpdateDenormConfig = async (draftDataset: Record<string, any>) => {
@@ -227,11 +228,11 @@ const updateMasterDataConfig = async (draftDataset: Record<string, any>) => {
227228
}
228229
}
229230

230-
const retireDataset = async (dataset: Record<string, any>, updated_by: any) => {
231+
const retireDataset = async (dataset: Record<string, any>, updated_by: any, userToken: string) => {
231232

232233
await canRetireIfMasterDataset(dataset);
233234
await datasetService.retireDataset(dataset, updated_by);
234-
await restartPipeline(dataset);
235+
await restartPipeline(dataset, userToken);
235236
}
236237

237238

@@ -254,8 +255,8 @@ const canRetireIfMasterDataset = async (dataset: Record<string, any>) => {
254255
}
255256
}
256257

257-
export const restartPipeline = async (dataset: Record<string, any>) => {
258-
return executeCommand(dataset.id, "RESTART_PIPELINE")
258+
export const restartPipeline = async (dataset: Record<string, any>, userToken: string) => {
259+
return executeCommand(dataset.id, "RESTART_PIPELINE", userToken)
259260
}
260261

261262
export default datasetStatusTransition;

api-service/src/middlewares/userPermissions.json

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
"api.datasets.dataschema"
5151
],
5252
"data": [
53-
"api.data.in"
53+
"api.data.in",
54+
"api.data.analyze.pii"
5455
],
5556
"queryTemplate": [
5657
"api.query.template.create",
@@ -92,7 +93,8 @@
9293
"schema",
9394
"file",
9495
"connector",
95-
"sqlQuery"
96+
"sqlQuery",
97+
"data"
9698
],
9799
"dataset_manager": [
98100
"general_access",
@@ -104,7 +106,8 @@
104106
"file",
105107
"connector",
106108
"sqlQuery",
107-
"restricted_dataset_api"
109+
"restricted_dataset_api",
110+
"data"
108111
],
109112
"admin": [
110113
"general_access",
@@ -120,7 +123,8 @@
120123
"alert",
121124
"metric",
122125
"silence",
123-
"notificationChannel"
126+
"notificationChannel",
127+
"data"
124128
],
125129
"operations_admin": [
126130
"alert",

api-service/src/routes/Router.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import telemetryActions from "../telemetry/telemetryActions";
3232
import datasetMetrics from "../controllers/DatasetMetrics/DatasetMetricsController";
3333
import checkRBAC from "../middlewares/RBAC_middleware";
3434
import connectorRegisterController from "../controllers/ConnectorRegister/ConnectorRegisterController";
35+
import { dataAnalyzePII } from "../controllers/DataAnalyzePII/DataAnalyzePIIController";
3536

3637
export const router = express.Router();
3738

@@ -60,6 +61,7 @@ router.post("/connectors/list", setDataToRequestObject("api.connectors.list"), o
6061
router.get("/connectors/read/:id", setDataToRequestObject("api.connectors.read"), onRequest({entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.readConnectors, operationType: OperationType.GET}), checkRBAC.handler(), ConnectorsRead);
6162
router.post("/datasets/import", setDataToRequestObject("api.datasets.import"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), DatasetImport);
6263
router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController);
64+
router.post("/data/analyze/pii", setDataToRequestObject("api.data.analyze.pii"), onRequest({ entity: Entity.Management }),checkRBAC.handler(), dataAnalyzePII);
6365
//Wrapper Service
6466
router.post("/obsrv/data/sql-query", setDataToRequestObject("api.obsrv.data.sql-query"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), sqlQuery);
6567
router.post("/data/metrics", setDataToRequestObject("api.data.metrics"), onRequest({ entity: Entity.Data_out }), datasetMetrics)

api-service/src/services/DatasetService.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ class DatasetService {
303303
}
304304
}
305305

306-
publishDataset = async (draftDataset: Record<string, any>) => {
306+
publishDataset = async (draftDataset: Record<string, any>, userToken: string) => {
307307

308308
const indexingConfig = draftDataset.dataset_config.indexing_config;
309309
const transaction = await sequelize.transaction()
@@ -326,7 +326,7 @@ class DatasetService {
326326
await transaction.rollback()
327327
throw obsrvError(draftDataset.id, "FAILED_TO_PUBLISH_DATASET", err.message, "SERVER_ERROR", 500, err);
328328
}
329-
await executeCommand(draftDataset.dataset_id, "PUBLISH_DATASET");
329+
await executeCommand(draftDataset.dataset_id, "PUBLISH_DATASET", userToken);
330330

331331
}
332332

0 commit comments

Comments
 (0)