diff --git a/.gitignore b/.gitignore index fe926107..35214ff1 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,5 @@ build/ ### VS Code ### .vscode/ + +python-producer/data/credit_card_transactions.csv \ No newline at end of file diff --git a/ACTIVITY_LOG.md b/ACTIVITY_LOG.md new file mode 100644 index 00000000..c0e5aaaf --- /dev/null +++ b/ACTIVITY_LOG.md @@ -0,0 +1,68 @@ +# Activity Log: Project Architecture Restructuring + +## Goal + +Restructure the project to have: + +1. **Python producer**: Keep as-is, reads CSV and produces to Kafka +2. **Java Spring Boot**: Revert to original Transaction format (simple senderId, recipientId, amount) +3. **Go microservice**: New service to consume from Kafka and process AnalyticsTransaction format + +## Changes Made + +### 1. ✅ Switched to Flow Branch and Restored Original Spring Boot State + +- **Git Operation**: `git checkout flow` + + - Switched to the flow branch which contains the original Spring Boot implementation + - Restored TransactionListener.java to original state (listens to simple Transaction format) + - Restored TransactionService.java to original state (processes simple Transaction format) + +- **Current State**: + - Java Spring Boot now accepts simple Transaction format (senderId, recipientId, amount) + - No AnalyticsTransaction processing in Java service + - Python producer directory is available and ready to use + +### 2. ✅ Python Producer Status + +- **File**: `python-producer/data/kafka_prod.py` + - Already implemented and working + - Reads from CSV file: `credit_card_transactions.csv` + - Produces to Kafka topic: `transactions` + - Sends data in chunks of 100 with 10-second delays + +### 3. 🔄 Next Steps: Create Go Microservice + +- **File**: `go-analytics/main.go` + + - Create main Go application with Kafka consumer + - Consume AnalyticsTransaction format from Kafka + - Process credit card transaction data + +- **File**: `go-analytics/go.mod` + + - Go module configuration + - Dependencies for Kafka, JSON handling, logging + +- **File**: `go-analytics/kafka/consumer.go` + + - Kafka consumer implementation + - Handle AnalyticsTransaction message deserialization + +- **File**: `go-analytics/database/models.go` + - Database models for storing transaction analytics + - GORM models for credit card transaction data + +## Architecture Flow + +1. **Python Producer** → Reads CSV → Produces AnalyticsTransaction to Kafka +2. **Go Microservice** → Consumes from Kafka → Processes AnalyticsTransaction data +3. **Java Spring Boot** → Accepts simple Transaction format via REST API → Processes basic transactions + +## Benefits + +- Clear separation of concerns +- Python handles data ingestion +- Go handles analytics processing +- Java handles core transaction processing +- Microservices architecture with event-driven communication diff --git a/README.md b/README.md index 637f474b..990254b5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,97 @@ -# Midas -Project repo for the JPMC Advanced Software Engineering Forage program +# JPMorgan Chase Midas Analytics Service + +## Overview + +This repository contains the codebase for the Midas Analytics Service, a project designed to demonstrate modern microservice architecture using Java Spring Boot, with planned extensions in Go, Cassandra, and Terraform. The project is structured to provide a robust backend and, in the future, a Go-based analytics microservice and infrastructure as code. + +--- + +## Project Structure + +``` +jpmc-Midas/ + forage-midas/ + go-analytics/ + main.go # (Go microservice - planned, not yet functional) + # (other Go files: e.g., consumer.go, database/, kafka/) + src/ + main/ + java/ + com/jpmc/midascore/ + # Java Spring Boot backend (implemented) + terraform/ # (Infrastructure as code - planned) + README.md + # ...other files + go.mod +``` + +--- + +## Currently Implemented + +### 1. **Java Spring Boot Backend (`forage-midas/src/main/java/com/jpmc/midascore/`)** + +- **Entities:** `TransactionRecord`, `UserRecord`, etc. +- **Controllers:** `BalanceController` +- **Repositories:** `TransactionRepository`, `UserRepository` +- **Services:** `TransactionService` +- **Kafka Integration:** `TransactionListener` +- **Database Integration:** `DatabaseConduit` +- **Testing:** Multiple test classes for core functionality. + +> **Note:** The Go microservice and Terraform infrastructure are not yet functional. Current Go code is experimental and not production-ready. + +--- + +## Planned Additions + +### 1. **Go Analytics Microservice (Planned)** + +- **Kafka Producer & Consumer:** + - Implement robust Kafka producer and consumer in Go to process and analyze messages. +- **Cassandra Integration:** + - Use a Go Cassandra driver (e.g., `gocql`) to persist and query analytics data. +- **RESTful API:** + - Expose analytics endpoints via HTTP using `gorilla/mux`. + +### 2. **Infrastructure as Code (Terraform) (Planned)** + +- **Automated Provisioning:** + - Use Terraform to provision Kafka, Cassandra, and networking resources. +- **Environment Management:** + - Separate configurations for development, staging, and production. + +### 3. **CI/CD Integration (Planned)** + +- Automated testing and deployment for both Java and Go services. +- Terraform plan/apply automation. + +### 4. **Documentation and Developer Experience (Planned)** + +- Comprehensive API documentation (Swagger/OpenAPI). +- Sample data and usage examples. +- Learning guides for running and extending the system. + +--- + +## Getting Started + + +### Quick Start (Java Spring Boot Backend) + +1. **Navigate to the project directory:** + ```sh + cd forage-midas + ``` +2. **Build and run the Spring Boot application:** + ```sh + ./mvnw spring-boot:run + ``` +3. **Run tests:** + ```sh + ./mvnw test + ``` + +> **Note:** Go microservice and Terraform scripts are not yet functional. Instructions will be updated as these components are implemented. + +--- diff --git a/application.yml b/application.yml index e69de29b..05a51ed7 100644 --- a/application.yml +++ b/application.yml @@ -0,0 +1,31 @@ +general: + kafka-topic: test-topic + +spring: + kafka: + bootstrap:servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + consumer: + group-id: midas-core-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: com.jpmc.midascore.foundation + + datasource: + url: jdbc:h2:mem:testdb + driver-class-name: org.h2.Driver + username: sa + password: password + h2: + console: + enabled: true + path: /h2-console + jpa: + database-platform: org.hibernate.dialect.H2Dialect + hibernate: + ddl-auto: update + show-sql: true diff --git a/go-analytics/analytics/behvior.go b/go-analytics/analytics/behvior.go new file mode 100644 index 00000000..f1d0c9a9 --- /dev/null +++ b/go-analytics/analytics/behvior.go @@ -0,0 +1 @@ +package analytics \ No newline at end of file diff --git a/go-analytics/analytics/metrics.go b/go-analytics/analytics/metrics.go new file mode 100644 index 00000000..f1d0c9a9 --- /dev/null +++ b/go-analytics/analytics/metrics.go @@ -0,0 +1 @@ +package analytics \ No newline at end of file diff --git a/go-analytics/analytics/processor.go b/go-analytics/analytics/processor.go new file mode 100644 index 00000000..9a78e982 --- /dev/null +++ b/go-analytics/analytics/processor.go @@ -0,0 +1,253 @@ +package analytics + +import ( + "fmt" + "time" + + "github.com/jpmc/forge/go-analytics/database" + "github.com/jpmc/forge/go-analytics/kafka" + "github.com/sirupsen/logrus" + "gopkg.in/inf.v0" +) + +type Processor struct { + db *database.CassandraDB + logger *logrus.Logger + repo *database.Repository +} + +func NewProcessor(db *database.CassandraDB, logger *logrus.Logger) *Processor { + + return &Processor{ + + db: db, + logger: logger, + repo: database.NewRepository(db.Session()), + } +} + +func (p *Processor) ProcessTransaction(transaction *kafka.AnalyticsTransaction) error { + + p.logger.Infof("processing analytics for transaction: %s", transaction.TransNum) + + riskScore := p.calculateRiskScore(transaction) + + // Convert float64 amount to inf.Dec for Cassandra + amount := inf.NewDec(int64(transaction.Amt*100), 2) // Convert to cents + + metric := &database.TransactionMetric{ + TransactionID: transaction.TransNum, + ProcessedAt: time.Now(), + Amount: amount, + Category: transaction.Category, + Merchant: transaction.Merchant, + IsFraud: transaction.IsFraud, + RiskScore: riskScore, + Location: fmt.Sprintf("%s, %s", transaction.City, transaction.State), + } + + // Save to database + if err := p.repo.SaveTransactionMetric(metric); err != nil { + return fmt.Errorf("failed to save transaction metric: %w", err) + } + + // Update user behavior + date := time.Now().Format("2006-01-02") + if err := p.repo.UpdateUserBehavior(transaction.CcNum, date, transaction.Amt, transaction.IsFraud); err != nil { + return fmt.Errorf("failed to update user behavior: %w", err) + } + + // Save time-series metrics + if err := p.saveTimeSeriesMetrics(transaction); err != nil { + return fmt.Errorf("failed to save time-series metrics: %w", err) + } + + // Check for fraud alerts + if err := p.checkForAlerts(transaction, riskScore); err != nil { + return fmt.Errorf("failed to check for alerts: %w", err) + } + + p.logger.Infof("Successfully processed and saved transaction: %s", transaction.TransNum) + return nil +} + +func (p *Processor) saveTimeSeriesMetrics(transaction *kafka.AnalyticsTransaction) error { + now := time.Now() + hourBucket := now.Format("2006-01-02-15") + dayBucket := now.Format("2006-01-02") + + metrics := []*database.TimeSeriesMetric{ + { + MetricName: "transaction_volume", + TimeBucket: hourBucket, + Timestamp: now, + Value: 1.0, // 1 transaction + Metadata: map[string]string{"category": transaction.Category}, + }, + { + MetricName: "transaction_amount", + TimeBucket: hourBucket, + Timestamp: now, + Value: transaction.Amt, // Use float64 directly + Metadata: map[string]string{"category": transaction.Category}, + }, + { + MetricName: "fraud_incidents", + TimeBucket: dayBucket, + Timestamp: now, + Value: boolToFloat(transaction.IsFraud), + Metadata: map[string]string{"category": transaction.Category}, + }, + { + MetricName: "average_transaction_amount", + TimeBucket: hourBucket, + Timestamp: now, + Value: transaction.Amt, // Use float64 directly + Metadata: map[string]string{"category": transaction.Category}, + }, + } + + for _, metric := range metrics { + if err := p.repo.SaveTimeSeriesMetric(metric); err != nil { + return err + } + } + + return nil +} + +func (p *Processor) calculateRiskScore(transaction *kafka.AnalyticsTransaction) *inf.Dec { + + riskScore := 0.0 + + if transaction.IsFraud { + riskScore += 50.0 + } + + if transaction.Amt > 1000 { + riskScore += 20.0 + } else if transaction.Amt > 500 { + riskScore += 10.0 + } + + // mapping high risk categories to their associated risk score + highRiskCategories := map[string]float64{ + "electronics": 15.0, + "jewelry": 20.0, + "travel": 25.0, + "gaming": 20.0, + "cryptocurrency": 30.0, + } + + if risk, exists := highRiskCategories[transaction.Category]; exists { + riskScore += risk + } + + // higher risk score in big cities + if transaction.CityPop > 1000000 { + riskScore += 5.0 + } + + // Late night transactions have higher risk score + // Use the transaction's timestamp instead of the current time + var hour int + if transaction.UnixTime != 0 { + hour = time.Unix(transaction.UnixTime, 0).Hour() + } else if transaction.TransDateTransTime != "" { + // Attempt to parse the TransDateTransTime if UnixTime is not available + parsedTime, err := time.Parse("2006-01-02 15:04:05", transaction.TransDateTransTime) + if err == nil { + hour = parsedTime.Hour() + } + } + if hour >= 22 || hour <= 6 { + riskScore += 10 + } + + // Distance-based risk (if user location differs significantly from merchant) + if transaction.Lat != 0 && transaction.Lon != 0 && transaction.MerchLat != 0 && transaction.MerchLon != 0 { + distance := p.calculateDistance(transaction.Lat, transaction.Lon, transaction.MerchLat, transaction.MerchLon) + if distance > 100 { // More than 100 km + riskScore += 15.0 + } + } + + // Convert float64 to inf.Dec (multiply by 100 to preserve 2 decimal places) + riskScoreInt := int64(riskScore * 100) + return inf.NewDec(riskScoreInt, 2) +} + +func (p *Processor) checkForAlerts(transaction *kafka.AnalyticsTransaction, riskScore *inf.Dec) error { + // Convert inf.Dec to float64 for comparison and storage + riskScoreFloat := infDecToFloat(riskScore) + + // High risk score alert + if riskScoreFloat > 70.0 { + alert := &database.Alert{ + AlertID: fmt.Sprintf("alert_%s_%d", transaction.TransNum, time.Now().Unix()), + CreatedAt: time.Now(), + AlertType: "high_risk_score", + Severity: "high", + UserID: transaction.CcNum, + TransactionID: transaction.TransNum, + Description: fmt.Sprintf("High risk transaction detected. Risk score: %.2f", riskScoreFloat), + RiskScore: riskScoreFloat, // Use float64 + Status: "open", + } + + if err := p.repo.SaveAlert(alert); err != nil { + return err + } + + p.logger.Warnf("High risk alert created for transaction %s with risk score %.2f", transaction.TransNum, riskScoreFloat) + } + + // Large amount alert + if transaction.Amt > 2000 { + alert := &database.Alert{ + AlertID: fmt.Sprintf("alert_%s_%d", transaction.TransNum, time.Now().Unix()), + CreatedAt: time.Now(), + AlertType: "large_amount", + Severity: "medium", + UserID: transaction.CcNum, + TransactionID: transaction.TransNum, + Description: fmt.Sprintf("Large transaction amount detected: $%.2f", transaction.Amt), + RiskScore: riskScoreFloat, // Use float64 + Status: "open", + } + + if err := p.repo.SaveAlert(alert); err != nil { + return err + } + } + + return nil +} + +func (p *Processor) calculateDistance(lat1, lon1, lat2, lon2 float64) float64 { + + // This is a simplified version for demonstration + deltaLat := lat2 - lat1 + deltaLon := lon2 - lon1 + + // Convert to kilometers (rough approximation) + distance := (deltaLat*deltaLat + deltaLon*deltaLon) * 111.0 + + return distance +} + +func boolToFloat(b bool) float64 { + if b { + return 1.0 + } + return 0.0 +} + +func infDecToFloat(d *inf.Dec) float64 { + if d == nil { + return 0.0 + } + // Convert inf.Dec to float64 (simplified conversion) + // In a real implementation, you might want more precise conversion + return float64(d.UnscaledBig().Int64()) / float64(inf.NewDec(1, d.Scale()).UnscaledBig().Int64()) +} diff --git a/go-analytics/config/config.go b/go-analytics/config/config.go new file mode 100644 index 00000000..b2b6875d --- /dev/null +++ b/go-analytics/config/config.go @@ -0,0 +1,75 @@ +package config + +import ( + "github.com/spf13/viper" +) + +type Config struct { + Kafka KafkaConfig `mapstructure:"kafka"` + Cassandra CassandraConfig `mapstructure:"cassandra"` +} + +type KafkaConfig struct { + Brokers []string `mapstructure:"brokers"` + Topic string `mapstructure:"topic"` + GroupID string `mapstructure:"group_id"` +} + +type CassandraConfig struct { + Hosts []string `mapstructure:"hosts"` + Keyspace string `mapstructure:"keyspace"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + + +// loads from config with defaults +func Load() (*Config, error) { + viper.SetConfigName("config") + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./config") + + // Set defaults + viper.SetDefault("kafka.brokers", []string{"localhost:9092"}) + viper.SetDefault("kafka.topic", "transactions") + viper.SetDefault("kafka.group_id", "go-analytics-group") + viper.SetDefault("cassandra.hosts", []string{"127.0.0.1:9042"}) + viper.SetDefault("cassandra.keyspace", "midas_analytics") + viper.SetDefault("cassandra.username", "") // none required for local dev + viper.SetDefault("cassandra.password", "") // none required for local dev + + if err := viper.ReadInConfig(); err != nil { + // Use defaults if config file not found + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + return nil, err + } + } + + var config Config + + if err := viper.Unmarshal(&config); err != nil { + return nil, err + } + + return &config, nil +} + +// NewKafkaConfig returns Kafka configuration with defaults (can changed if needed) +func NewKafkaConfig() *KafkaConfig { + return &KafkaConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "transactions", + GroupID: "go-analytics-group", + } +} + +// NewCassandraConfig returns Cassandra configuration with defaults (can change if needed) +func NewCassandraConfig() *CassandraConfig { + return &CassandraConfig{ + Hosts: []string{"127.0.0.1:9042"}, + Keyspace: "midas_analytics", + Username: "", + Password: "", + } +} diff --git a/go-analytics/database/cassandra.go b/go-analytics/database/cassandra.go new file mode 100644 index 00000000..96b4e388 --- /dev/null +++ b/go-analytics/database/cassandra.go @@ -0,0 +1,161 @@ +package database + +// cassandra connection. + +import ( + "fmt" + "time" + + "github.com/jpmc/forge/go-analytics/config" + + gocql "github.com/apache/cassandra-gocql-driver/v2" + "github.com/sirupsen/logrus" +) + +type CassandraDB struct { + session *gocql.Session + logger *logrus.Logger +} + +func NewCassandraConnection(cfg config.CassandraConfig) (*CassandraDB, error) { + + cluster := gocql.NewCluster(cfg.Hosts...) + cluster.Keyspace = cfg.Keyspace + cluster.Consistency = gocql.Quorum + cluster.Timeout = 10 * time.Second + cluster.ConnectTimeout = 10 * time.Second + + // check user and pass provided in the config file + if cfg.Username != "" && cfg.Password != "" { + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: cfg.Username, + Password: cfg.Password, + } + } + + session, err := cluster.CreateSession() + + if err != nil { + return nil, fmt.Errorf("failed to create cassandra connection: %w", err) + } + + db := &CassandraDB{ + session: session, + logger: logrus.New(), + } + + // Automatically check if schema needs initialization + if err := db.checkAndInitializeSchema(); err != nil { + session.Close() + return nil, fmt.Errorf("failed to check/init schema: %w", err) + } + + return db, nil +} + +func (db *CassandraDB) Close() { + + if db.session != nil { + db.session.Close() + } +} + +func (db *CassandraDB) Session() *gocql.Session { + return db.session +} + +func (db *CassandraDB) checkAndInitializeSchema() error { + // Check if tables already exist + tablesExist, err := db.checkTablesExist() + if err != nil { + return fmt.Errorf("failed to check if tables exist: %w", err) + } + + if !tablesExist { + db.logger.Info("Tables don't exist, initializing schema...") + return db.initializeSchema() + } + + db.logger.Info("Tables already exist, skipping schema initialization") + return nil +} + +func (db *CassandraDB) checkTablesExist() (bool, error) { + // Check if at least one of our tables exists + var count int + query := `SELECT COUNT(*) FROM system_schema.tables WHERE keyspace_name = 'midas_analytics' AND table_name = 'transaction_metrics';` + + if err := db.session.Query(query).Scan(&count); err != nil { + return false, fmt.Errorf("failed to query system_schema.tables: %w", err) + } + + return count > 0, nil +} + +func (db *CassandraDB) initializeSchema() error { + + createKeyspaceQuery := ` + CREATE KEYSPACE IF NOT EXISTS midas_analytics + WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor' : 1 + } + ` + + if err := db.session.Query(createKeyspaceQuery).Exec(); err != nil { + return fmt.Errorf("failed to create keyspace: %w", err) + } + + tables := []string{ + createTransactionMetricsTable, + createUserBehaviorTable, + createTimeSeriesMetricsTable, + } + + for _, table := range tables { + + if err := db.session.Query(table).Exec(); err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + } + + db.logger.Info("Cassandra scheme initilized successfully") + + return nil +} + +const createTransactionMetricsTable = ` + CREATE TABLE IF NOT EXISTS midas_analytics.transaction_metrics ( + transaction_id text, + processed_at timestamp, + amount decimal, + category text, + merchant text, + is_fraud boolean, + risk_score decimal, + location text, + PRIMARY KEY (transaction_id, processed_at) + ) WITH CLUSTERING ORDER BY (processed_at DESC) +` + +const createUserBehaviorTable = ` + CREATE TABLE IF NOT EXISTS midas_analytics.user_behavior ( + user_id text, + date date, + total_transactions counter, + total_amount counter, + fraud_count counter, + PRIMARY KEY (user_id, date) + ) +` + +const createTimeSeriesMetricsTable = ` + CREATE TABLE IF NOT EXISTS midas_analytics.time_series_metrics ( + metric_name text, + time_bucket text, + timestamp timestamp, + value decimal, + metadata map, + PRIMARY KEY ((metric_name, time_bucket), timestamp) + ) WITH CLUSTERING ORDER BY (timestamp DESC) +` diff --git a/go-analytics/database/models.go b/go-analytics/database/models.go new file mode 100644 index 00000000..e15604a7 --- /dev/null +++ b/go-analytics/database/models.go @@ -0,0 +1,139 @@ +package database + +// cassandra data models + +import ( + "time" + + "gopkg.in/inf.v0" +) + +// TransactionMetric represents a transaction metric stored in Cassandra +type TransactionMetric struct { + TransactionID string `cql:"transaction_id"` + ProcessedAt time.Time `cql:"processed_at"` + Amount *inf.Dec `cql:"amount"` + Category string `cql:"category"` + Merchant string `cql:"merchant"` + IsFraud bool `cql:"is_fraud"` + RiskScore *inf.Dec `cql:"risk_score"` + Location string `cql:"location"` +} + +// UserBehavior represents user behavior metrics +type UserBehavior struct { + UserID string `cql:"user_id"` + Date string `cql:"date"` + TotalTransactions int64 `cql:"total_transactions"` + TotalAmount int64 `cql:"total_amount"` + FraudCount int64 `cql:"fraud_count"` +} + +// TimeSeriesMetric represents time-series analytics data +type TimeSeriesMetric struct { + MetricName string `cql:"metric_name"` + TimeBucket string `cql:"time_bucket"` + Timestamp time.Time `cql:"timestamp"` + Value float64 `cql:"value"` + Metadata map[string]string `cql:"metadata"` +} + +// TransactionSummary represents aggregated transaction data +type TransactionSummary struct { + Date string `cql:"date"` + Category string `cql:"category"` + TotalTransactions int64 `cql:"total_transactions"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudPercentage float64 `cql:"fraud_percentage"` +} + +// RiskProfile represents user risk assessment +type RiskProfile struct { + UserID string `cql:"user_id"` + LastUpdated time.Time `cql:"last_updated"` + RiskScore float64 `cql:"risk_score"` + RiskLevel string `cql:"risk_level"` + TransactionCount int64 `cql:"transaction_count"` + TotalAmount float64 `cql:"total_amount"` + FraudCount int64 `cql:"fraud_count"` + Categories []string `cql:"categories"` +} + +// MerchantAnalytics represents merchant-specific analytics +type MerchantAnalytics struct { + Merchant string `cql:"merchant"` + Date string `cql:"date"` + TransactionCount int64 `cql:"transaction_count"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudRate float64 `cql:"fraud_rate"` + Categories []string `cql:"categories"` +} + +// LocationAnalytics represents location-based analytics +type LocationAnalytics struct { + Location string `cql:"location"` + Date string `cql:"date"` + TransactionCount int64 `cql:"transaction_count"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudRate float64 `cql:"fraud_rate"` + Population int `cql:"population"` +} + +// DailyMetrics represents daily aggregated metrics +type DailyMetrics struct { + Date string `cql:"date"` + TotalTransactions int64 `cql:"total_transactions"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudRate float64 `cql:"fraud_rate"` + UniqueUsers int64 `cql:"unique_users"` + UniqueMerchants int64 `cql:"unique_merchants"` + TopCategories []string `cql:"top_categories"` +} + +// HourlyMetrics represents hourly aggregated metrics +type HourlyMetrics struct { + Date string `cql:"date"` + Hour int `cql:"hour"` + TotalTransactions int64 `cql:"total_transactions"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudRate float64 `cql:"fraud_rate"` + PeakHour bool `cql:"peak_hour"` +} + +// CategoryAnalytics represents category-specific analytics +type CategoryAnalytics struct { + Category string `cql:"category"` + Date string `cql:"date"` + TransactionCount int64 `cql:"transaction_count"` + TotalAmount float64 `cql:"total_amount"` + AverageAmount float64 `cql:"average_amount"` + FraudCount int64 `cql:"fraud_count"` + FraudRate float64 `cql:"fraud_rate"` + RiskLevel string `cql:"risk_level"` + TopMerchants []string `cql:"top_merchants"` +} + +// Alert represents fraud or risk alerts +type Alert struct { + AlertID string `cql:"alert_id"` + CreatedAt time.Time `cql:"created_at"` + AlertType string `cql:"alert_type"` + Severity string `cql:"severity"` + UserID string `cql:"user_id"` + TransactionID string `cql:"transaction_id"` + Description string `cql:"description"` + RiskScore float64 `cql:"risk_score"` + Status string `cql:"status"` + ResolvedAt time.Time `cql:"resolved_at"` + ResolvedBy string `cql:"resolved_by"` +} diff --git a/go-analytics/database/repositories.go b/go-analytics/database/repositories.go new file mode 100644 index 00000000..cdf799e8 --- /dev/null +++ b/go-analytics/database/repositories.go @@ -0,0 +1,215 @@ +package database + +// data access layer + +import ( + "fmt" + "time" + + gocql "github.com/apache/cassandra-gocql-driver/v2" +) + +type Repository struct { + session *gocql.Session +} + +func NewRepository(session *gocql.Session) *Repository { + + return &Repository{session: session} +} + +// SaveTransactionMetric saves a transaction metric to Cassandra +func (r *Repository) SaveTransactionMetric(metric *TransactionMetric) error { + + query := ` + INSERT INTO midas_analytics.transaction_metrics + (transaction_id, processed_at, amount, category, merchant, is_fraud, risk_score, location) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ` + + return r.session.Query(query, + metric.TransactionID, + metric.ProcessedAt, + metric.Amount, + metric.Category, + metric.Merchant, + metric.IsFraud, + metric.RiskScore, + metric.Location, + ).Exec() +} + +func (r *Repository) GetTransactionMetrics(startTime, endTime time.Time) ([]*TransactionMetric, error){ + + query := ` + SELECT transaction_id, processed_at, amount, category, merchant, is_fraud, risk_score, location + from midas_analytics.transaction_metrics + where processed_at >= ? and processed_at <= ? + ALLOW FILTERING + ` + + iter := r.session.Query(query, startTime, endTime).Iter() + var metrics []*TransactionMetric + + for { + + metric := &TransactionMetric{} + + if !iter.Scan( + &metric.TransactionID, + &metric.ProcessedAt, + &metric.Amount, + &metric.Category, + &metric.Merchant, + &metric.IsFraud, + &metric.RiskScore, + &metric.Location, + ) { + break + } + + metrics = append(metrics, metric) + } + + if err := iter.Close(); err != nil { + return nil, fmt.Errorf("error iterating over results: %w", err) + } + + return metrics, nil +} + + +func (r *Repository) SaveTimeSeriesMetric(metric *TimeSeriesMetric) error { + + query := ` + INSERT INTO midas_analytics.time_series_metrics + (metric_name, time_bucket, timestamp, value, metadata) + values(?, ? , ?, ?, ?) + ` + + return r.session.Query(query, + metric.MetricName, + metric.TimeBucket, + metric.Timestamp, + metric.Value, + metric.Metadata, + ).Exec() +} + +func (r *Repository) GetTimeSeriesMetrics(metricName, timeBucket string, startTime, endTime time.Time) ([]*TimeSeriesMetric, error){ + + query :=` + SELECT metric_name, time_bucket, timestamp, value, metadata + FROM midas_analytics.time_series_metrics + where metric_name = ? and time_bucket = ? and timestamp >= ? and timestamp <= ? + ` + + iter := r.session.Query(query, metricName, timeBucket, startTime, endTime).Iter() + var metrics []*TimeSeriesMetric + + for { + + metric := &TimeSeriesMetric{} + + if !iter.Scan( + &metric.MetricName, + &metric.TimeBucket, + &metric.Timestamp, + &metric.Value, + &metric.Metadata, + ) { + break + } + + metrics = append(metrics, metric) + } + + return metrics, nil; +} + +func (r *Repository) UpdateUserBehavior(userID, date string, amount float64, isFraud bool) error { + + query := ` + UPDATE midas_analytics.user_behavior + set total_transactions = total_transactions + 1, + total_amount = total_amount + ?, + fraud_count = fraud_count + ? + where user_id = ? and date = ? + ` + + fraudIncrement := 0 + + if isFraud { + fraudIncrement++ + } + + return r.session.Query(query, int64(amount*100), fraudIncrement, userID, date).Exec() +} + +func (r *Repository) GetUserBehavior(userID string, startDate, endDate string) ([]*UserBehavior, error){ + + query := ` + SELECT user_id, date, total_transactions, total_amount, fraud_count + FROM midas_analytics.user_behavior + where user_id = ? and date >= ? and date <= ? + ` + + iter := r.session.Query(query, userID, startDate, endDate).Iter() + + var behaviors []*UserBehavior + + for { + + behavior := &UserBehavior{} + + if !iter.Scan( + &behavior.UserID, + &behavior.Date, + &behavior.TotalTransactions, + &behavior.TotalAmount, + &behavior.FraudCount, + ) { + break + } + + behaviors = append(behaviors, behavior) + } + + if err := iter.Close(); err != nil { + return nil, fmt.Errorf("error iterating results: %w", err) + } + + return behaviors, nil +} + +// SaveAlert saves a new alert +func (r *Repository) SaveAlert(alert *Alert) error { + query := ` + INSERT INTO analytics.alerts + (alert_id, created_at, alert_type, severity, user_id, transaction_id, description, risk_score, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + + return r.session.Query(query, + alert.AlertID, + alert.CreatedAt, + alert.AlertType, + alert.Severity, + alert.UserID, + alert.TransactionID, + alert.Description, + alert.RiskScore, + alert.Status, + ).Exec() +} + +// UpdateAlertStatus updates the status of an alert +func (r *Repository) UpdateAlertStatus(alertID, status, resolvedBy string) error { + query := ` + UPDATE analytics.alerts + SET status = ?, resolved_at = ?, resolved_by = ? + WHERE alert_id = ? + ` + + return r.session.Query(query, status, time.Now(), resolvedBy, alertID).Exec() +} \ No newline at end of file diff --git a/go-analytics/go.mod b/go-analytics/go.mod new file mode 100644 index 00000000..e2104aec --- /dev/null +++ b/go-analytics/go.mod @@ -0,0 +1,47 @@ +module github.com/jpmc/forge/go-analytics + +go 1.23.0 + +toolchain go1.23.2 + +require ( + github.com/IBM/sarama v1.45.2 + github.com/apache/cassandra-gocql-driver/v2 v2.0.0-rc1-tentative + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/viper v1.20.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/sagikazarmark/locafero v0.7.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.12.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go-analytics/go.sum b/go-analytics/go.sum new file mode 100644 index 00000000..3dfad9e3 --- /dev/null +++ b/go-analytics/go.sum @@ -0,0 +1,140 @@ +github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw= +github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y= +github.com/apache/cassandra-gocql-driver/v2 v2.0.0-rc1-tentative h1:Jn4BCVpqLlJlYG/Bv9pmIV/6iPoU/dhT4HiCQG6qbDs= +github.com/apache/cassandra-gocql-driver/v2 v2.0.0-rc1-tentative/go.mod h1:QH/asJjB3mHvY6Dot6ZKMMpTcOrWJ8i9GhsvG1g0PK4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= +github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= +github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= +github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go-analytics/kafka/consumer.go b/go-analytics/kafka/consumer.go new file mode 100644 index 00000000..8e407b01 --- /dev/null +++ b/go-analytics/kafka/consumer.go @@ -0,0 +1,115 @@ +package kafka + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/IBM/sarama" + "github.com/jpmc/forge/go-analytics/config" + "github.com/jpmc/forge/go-analytics/database" + "github.com/sirupsen/logrus" +) + +type Consumer struct { + consumer sarama.ConsumerGroup + topic string + db *database.CassandraDB + logger *logrus.Logger + // analytics *analytics.Processor +} + +func NewConsumer(kafkaConfig config.KafkaConfig, db *database.CassandraDB, logger *logrus.Logger) (*Consumer, error) { + + saramaConfig := sarama.NewConfig() + saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin() + saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + saramaConfig.Consumer.Return.Errors = true + + consumer, err := sarama.NewConsumerGroup(kafkaConfig.Brokers, kafkaConfig.GroupID, saramaConfig) + + if err != nil { + return nil, fmt.Errorf("failed to create consumer group: %w", err) + } + + // will be implemented + //analyticsProcessor := analytics.NewProcessor(db, logger) + + return &Consumer{ + consumer: consumer, + topic: kafkaConfig.Topic, + db: db, + logger: logger, + //analytics: analyticsProcessor, + }, nil +} + +func (c *Consumer) Start(ctx context.Context) error { + + for { + + err := c.consumer.Consume(ctx, []string{c.topic}, c) + + if err != nil { + c.logger.Errorf("Error from the consumer: %v", err) + time.Sleep(time.Second) + } + + // constantly check if there is a problem with the context window and shut down gracefully + if ctx.Err() != nil { + return ctx.Err() + } + } +} + +// ConsumeClaim is called by consumer.Consumer everytime it runs. +func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + + for { + // lets you handle the first message that comes (read amessage or cxt cancellation) + select { + // <- is used to recieve a value from a channel, blocks until val available + case message := <-claim.Messages(): + c.logger.Infof("Recieved message from partition %d at offset %d", message.Partition, message.Offset) + + if err := c.processMessage(message.Value); err != nil { + c.logger.Errorf("Error processing the message: %v", err) + } + + session.MarkMessage(message, "") + + case <-session.Context().Done(): + return nil + } + } +} + +// methods required by the sarama interface, no resource allocation needed +func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { + return nil +} + +func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + + +func (c *Consumer) processMessage(data []byte) error { + + var transaction AnalyticsTransaction + + if err := json.Unmarshal(data, &transaction); err != nil { + return fmt.Errorf("failed to unmarshal transaction: %w", err) + } + + c.logger.Infof("Processing transaction: %s, Amount: %.2f, Category: %s", + transaction.TransNum, transaction.Amt, transaction.Category) + + // TODO: Process analytics and store in Cassandra + // if err := c.analytics.ProcessTransaction(&transaction); err != nil { + // return fmt.Errorf("failed to process analytics: %w", err) + // } + + return nil +} diff --git a/go-analytics/kafka/models.go b/go-analytics/kafka/models.go new file mode 100644 index 00000000..d224c475 --- /dev/null +++ b/go-analytics/kafka/models.go @@ -0,0 +1,40 @@ +package kafka + +import "time" + +// analytics transaction represents the transaction data recieved from kafka + +type AnalyticsTransaction struct { + TransDateTransTime string `json:"transDateTransTime"` + UnixTime int64 `json:"unixTime"` + CcNum string `json:"ccNum"` + Amt float64 `json:"amt"` + Category string `json:"category"` + Merchant string `json:"merchant"` + IsFraud bool `json:"isFraud"` + City string `json:"city"` + State string `json:"state"` + Zip string `json:"zip"` + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + MerchLat float64 `json:"merchLat"` + MerchLon float64 `json:"merchLon"` + MerchZipcode string `json:"merchZipcode"` + TransNum string `json:"transNum"` + Gender string `json:"gender"` + Dob string `json:"dob"` + Job string `json:"job"` + CityPop int `json:"cityPop"` +} + +// processed transaction complete with analytics +type ProcessedTransaction struct { + TransactionID string `json:"transaction_id"` + ProcessedAt time.Time `json:"processed_at"` + RiskScore float64 `json:"risk_score"` + Category string `json:"category"` + Amount float64 `json:"amount"` + IsFraud bool `json:"is_fraud"` + Location string `json:"location"` + MerchantCategory string `json:"merchant_category"` +} diff --git a/go-analytics/main.go b/go-analytics/main.go new file mode 100644 index 00000000..00089820 --- /dev/null +++ b/go-analytics/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/jpmc/forge/go-analytics/config" + "github.com/jpmc/forge/go-analytics/database" + "gopkg.in/inf.v0" +) + +func main() { + + fmt.Println("Testing cassandra connection setup...") + + // Load configuration + // could load only cassandra, but want to test kafka running as well. + cfg, err := config.Load() + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + if err := testCassandraConnection(&cfg.Cassandra); err != nil { + + log.Fatalf("Connection test failed: %v", err) + } + + fmt.Println("Cassandra connection successful!!") + + // Test SaveTransactionMetric function + if err := testSaveTransactionMetric(&cfg.Cassandra); err != nil { + log.Fatalf("SaveTransactionMetric test failed: %v", err) + } + + fmt.Println("SaveTransactionMetric test successful!!") +} + +func testCassandraConnection(cfg *config.CassandraConfig) error { + + db, err := database.NewCassandraConnection(*cfg) + + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + defer db.Close() + + // testing some query execution + var version string + + if err := db.Session().Query("Select release_version from system.local;").Scan(&version); err != nil { + return fmt.Errorf("failed to query system.local: %w", err) + } + + fmt.Printf("connected to cassandra version: %s\n", version) + + return nil +} + +func testSaveTransactionMetric(cfg *config.CassandraConfig) error { + // Create database connection + db, err := database.NewCassandraConnection(*cfg) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer db.Close() + + // Create repository + repo := database.NewRepository(db.Session()) + + // Create test transaction metric + testMetric := &database.TransactionMetric{ + TransactionID: "test-txn-001", + ProcessedAt: time.Now(), + Amount: inf.NewDec(15075, 2), // 150.75 + Category: "electronics", + Merchant: "Best Buy", + IsFraud: false, + RiskScore: inf.NewDec(15, 2), // 0.15 + Location: "New York, NY", + } + + fmt.Printf("Saving test transaction: %s, Amount: $%.2f, Category: %s\n", + testMetric.TransactionID, testMetric.Amount.String(), testMetric.Category) + + // Save the transaction metric + if err := repo.SaveTransactionMetric(testMetric); err != nil { + return fmt.Errorf("failed to save transaction metric: %w", err) + } + + fmt.Println("Transaction metric saved successfully!") + + // Verify the data was saved by retrieving it + startTime := time.Now().Add(-1 * time.Hour) // 1 hour ago + endTime := time.Now().Add(1 * time.Hour) // 1 hour from now + + metrics, err := repo.GetTransactionMetrics(startTime, endTime) + if err != nil { + return fmt.Errorf("failed to retrieve transaction metrics: %w", err) + } + + fmt.Printf("Retrieved %d transaction metrics\n", len(metrics)) + for _, metric := range metrics { + fmt.Printf("Transaction: %s, Amount: $%s, Category: %s\n", + metric.TransactionID, metric.Amount.String(), metric.Category) + } + + return nil +} diff --git a/pom.xml b/pom.xml index d1dedfec..6be85db1 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,59 @@ 17 + + + + org.springframework.boot + spring-boot-starter-data-jpa + 3.2.5 + + + + + org.springframework.boot + spring-boot-starter-web + 3.2.5 + + + + + org.springframework.kafka + spring-kafka + 3.2.5 + + + + + com.h2database + h2 + 2.2.224 + + + + + org.springframework.boot + spring-boot-starter-test + 3.2.5 + test + + + + + org.springframework.kafka + spring-kafka-test + 3.1.4 + test + + + + + org.testcontainers + testcontainers + 1.19.1 + test + + diff --git a/python-producer/data/kafka_prod.py b/python-producer/data/kafka_prod.py new file mode 100644 index 00000000..204c3b23 --- /dev/null +++ b/python-producer/data/kafka_prod.py @@ -0,0 +1,31 @@ +import pandas as pd +from kafka import KafkaProducer +import json +import time + +csv_path = 'credit_card_transactions.csv' + +producer = KafkaProducer( + bootstrap_servers = 'localhost:9092', + value_serializer = lambda v: json.dumps(v).encode('utf-8') +) + +topic = 'transactions' + +chunk_size = 100 + +for chunk in pd.read_csv(csv_path, chunksize=chunk_size): + + for _, row in chunk.iterrows(): + producer.send(topic, row.to_dict()) + + producer.flush() + + print(f"Sent {chunk_size} transactions, sleep for 10 seconds...") + + time.sleep(10) + +producer.close() + +print("All transactions sent.") + diff --git a/src/main/java/com/jpmc/midascore/component/TransactionListener.java b/src/main/java/com/jpmc/midascore/component/TransactionListener.java new file mode 100644 index 00000000..29acb329 --- /dev/null +++ b/src/main/java/com/jpmc/midascore/component/TransactionListener.java @@ -0,0 +1,30 @@ +package com.jpmc.midascore.component; + +import com.jpmc.midascore.foundation.Transaction; +import com.jpmc.midascore.service.TransactionService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class TransactionListener { + private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class); + + private final TransactionService transactionService; + + public TransactionListener(TransactionService transactionService) { + this.transactionService = transactionService; + } + + @Value("${general.kafka-topic}") + private String topic; + + @KafkaListener(topics = "${general.kafka-topic}", groupId = "midas-core-group") + public void listen(Transaction transaction) { + logger.info("Received transaction: {}", transaction); + transactionService.processTransaction(transaction); + transactionService.printAllBalances(); + } +} \ No newline at end of file diff --git a/src/main/java/com/jpmc/midascore/controller/BalanceController.java b/src/main/java/com/jpmc/midascore/controller/BalanceController.java new file mode 100644 index 00000000..e386239a --- /dev/null +++ b/src/main/java/com/jpmc/midascore/controller/BalanceController.java @@ -0,0 +1,24 @@ +package com.jpmc.midascore.controller; + +import com.jpmc.midascore.foundation.Balance; +import com.jpmc.midascore.repository.UserRepository; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + + +@RestController +public class BalanceController { + private final UserRepository userRepository; + + public BalanceController(UserRepository userRepository) { + this.userRepository = userRepository; + } + + @GetMapping("/balance") + public Balance getBalance(@RequestParam int userId) { + return userRepository.findById(userId) + .map(user -> new Balance(user.getBalance())) + .orElse(new Balance(0.0f)); + } +} \ No newline at end of file diff --git a/src/main/java/com/jpmc/midascore/entity/TransactionRecord.java b/src/main/java/com/jpmc/midascore/entity/TransactionRecord.java new file mode 100644 index 00000000..d86aaad6 --- /dev/null +++ b/src/main/java/com/jpmc/midascore/entity/TransactionRecord.java @@ -0,0 +1,70 @@ +package com.jpmc.midascore.entity; + +import jakarta.persistence.*; + +@Entity +public class TransactionRecord { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @ManyToOne + @JoinColumn(name = "sender_id", nullable = false) + private UserRecord sender; + + @ManyToOne + @JoinColumn(name = "recipient_id", nullable = false) + private UserRecord recipient; + + @Column(nullable = false) + private float amount; + + @Column(nullable = true) + private float incentive; + + /* + @Column(nullable = false) + private boolean valid; + */ + + protected TransactionRecord() { + } + + // for now, we are only storing valid transactions so there is no need for a valid field + public TransactionRecord(UserRecord sender, UserRecord recipient, float amount, float incentive) { + this.sender = sender; + this.recipient = recipient; + this.amount = amount; + this.incentive = incentive; + //this.valid = valid; + } + + // Getters and setters + public Long getId() { + return id; + } + + public UserRecord getSender() { + return sender; + } + + public UserRecord getRecipient() { + return recipient; + } + + public float getAmount() { + return amount; + } + + public float getIncentive() { + return incentive; + } + + /* + public boolean isValid() { + return valid; + } + */ +} + diff --git a/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java b/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java new file mode 100644 index 00000000..310fc65b --- /dev/null +++ b/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java @@ -0,0 +1,7 @@ +package com.jpmc.midascore.repository; + +import com.jpmc.midascore.entity.TransactionRecord; +import org.springframework.data.repository.CrudRepository; + +public interface TransactionRepository extends CrudRepository { +} \ No newline at end of file diff --git a/src/main/java/com/jpmc/midascore/repository/UserRepository.java b/src/main/java/com/jpmc/midascore/repository/UserRepository.java index 937275b6..d346a7e5 100644 --- a/src/main/java/com/jpmc/midascore/repository/UserRepository.java +++ b/src/main/java/com/jpmc/midascore/repository/UserRepository.java @@ -2,7 +2,8 @@ import com.jpmc.midascore.entity.UserRecord; import org.springframework.data.repository.CrudRepository; +import java.util.Optional; public interface UserRepository extends CrudRepository { - UserRecord findById(long id); + Optional findById(long id); } diff --git a/src/main/java/com/jpmc/midascore/service/TransactionService.java b/src/main/java/com/jpmc/midascore/service/TransactionService.java new file mode 100644 index 00000000..cf3bbcf7 --- /dev/null +++ b/src/main/java/com/jpmc/midascore/service/TransactionService.java @@ -0,0 +1,102 @@ +package com.jpmc.midascore.service; + +import com.jpmc.midascore.entity.TransactionRecord; +import com.jpmc.midascore.entity.UserRecord; +import com.jpmc.midascore.foundation.Transaction; +import com.jpmc.midascore.repository.TransactionRepository; +import com.jpmc.midascore.repository.UserRepository; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import java.util.Optional; +import java.util.List; +import org.springframework.web.client.RestTemplate; + +import com.jpmc.midascore.foundation.Balance; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Service +public class TransactionService { + private static final Logger logger = LoggerFactory.getLogger(TransactionService.class); + + private final UserRepository userRepository; + private final TransactionRepository transactionRepository; + private final RestTemplate restTemplate; + + + private static final String INCENTIVE_API = "http://localhost:8080/incentive"; + + public TransactionService(UserRepository userRepository, TransactionRepository transactionRepository) { + this.userRepository = userRepository; + this.transactionRepository = transactionRepository; + this.restTemplate = new RestTemplate(); + } + + public float getIncentive(Transaction transaction) { + + try { + Balance response = restTemplate.postForObject(INCENTIVE_API, transaction, Balance.class); + return response != null ? response.getAmount(): 0.0f; + } catch (Exception e) { + logger.error("Failed to get incentive for transaction: {}", transaction, e); + return 0.0f; + } + } + + @Transactional + public void processTransaction(Transaction transaction) { + Optional senderOpt = userRepository.findById(transaction.getSenderId()); + Optional recipientOpt = userRepository.findById(transaction.getRecipientId()); + + if (senderOpt.isEmpty() || recipientOpt.isEmpty()) { + logger.error("Invalid transaction, either sender or recipient not found"); + return; + } + + UserRecord sender = senderOpt.get(); + UserRecord recipient = recipientOpt.get(); + + float incentive = getIncentive(transaction); + + // Validate transaction + boolean isValid = validateTransaction(sender, recipient, transaction.getAmount()); + + // Create transaction record + TransactionRecord record = new TransactionRecord(sender, recipient, transaction.getAmount(), incentive); + + // If valid, update balances + if (isValid) { + transactionRepository.save(record); // only make transaction record to the DB if valid transaction + sender.setBalance(sender.getBalance() - transaction.getAmount()); + recipient.setBalance(recipient.getBalance() + transaction.getAmount() + incentive); + userRepository.save(sender); + userRepository.save(recipient); + + logger.info("Processed transaction with incentive: {}", incentive); + } + } + + private boolean validateTransaction(UserRecord sender, UserRecord recipient, float amount) { + + if(sender.getBalance() < amount){ + + logger.error("Invalid transaction: Sender has insufficient funds for this transaction"); + return false; + } + + return true; + } + + public void printAllBalances() { + + Iterable users = userRepository.findAll(); + System.out.println("\nCurrent Balances:"); + System.out.println("----------------"); + for (UserRecord user : users) { + System.out.printf("%s: %.2f%n", user.getName(), user.getBalance()); + } + System.out.println("----------------\n"); + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 00000000..833e565b --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,9 @@ +server: + port: 33400 + +spring: + kafka: + bootstrap-servers: localhost:9092 + +general: + kafka-topic: transactions