From a246b22b0e790450b334d7865f6ed6176ba5fcc7 Mon Sep 17 00:00:00 2001 From: Paul Noel Date: Tue, 23 Sep 2025 21:43:11 -0500 Subject: [PATCH] node: change aptos watcher to use a graphql indexer --- node/cmd/guardiand/node.go | 46 +- node/pkg/watchers/aptos/config.go | 15 +- node/pkg/watchers/aptos/watcher.go | 418 ++++++++++++++-- node/pkg/watchers/aptos/watcher_test.go | 636 ++++++++++++++++++++++++ 4 files changed, 1068 insertions(+), 47 deletions(-) create mode 100644 node/pkg/watchers/aptos/watcher_test.go diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 2e91633fbd..d3f8abc6c9 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -138,9 +138,11 @@ var ( accountantNttKeyPath *string accountantNttKeyPassPhrase *string - aptosRPC *string - aptosAccount *string - aptosHandle *string + aptosRPC *string + aptosAccount *string + aptosHandle *string + aptosIndexerRPC *string + aptosIndexerToken *string movementRPC *string movementAccount *string @@ -389,6 +391,8 @@ func init() { aptosRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "aptosRPC", "Aptos RPC URL", "http://aptos:8080", []string{"http", "https"}) aptosAccount = NodeCmd.Flags().String("aptosAccount", "", "aptos account") aptosHandle = NodeCmd.Flags().String("aptosHandle", "", "aptos handle") + aptosIndexerRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "aptosIndexerRPC", "Aptos Indexer RPC URL", "", []string{"http", "https"}) + aptosIndexerToken = NodeCmd.Flags().String("aptosIndexerToken", "", "Aptos Indexer access token") movementRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "movementRPC", "Movement RPC URL", "", []string{"http", "https"}) movementAccount = NodeCmd.Flags().String("movementAccount", "", "movement account") @@ -925,8 +929,24 @@ func runNode(cmd *cobra.Command, args []string) { logger.Fatal("Both --nearContract and --nearRPC must be set or both unset") } - if !argsConsistent([]string{*aptosAccount, *aptosRPC, *aptosHandle}) { - logger.Fatal("Either --aptosAccount, --aptosRPC and --aptosHandle must all be set or all unset") + // Validate Aptos configuration - support both legacy and indexer modes + aptosBasic := []string{*aptosRPC, *aptosAccount, *aptosHandle} + + if !argsConsistent(aptosBasic) { + logger.Fatal("Either --aptosRPC, --aptosAccount, and --aptosHandle must all be set together or all unset") + } + + // If Aptos is enabled, validate indexer configuration + if *aptosRPC != "" { + // Only aptosIndexerRPC is required for indexer mode, token is optional + useIndexerMode := *aptosIndexerRPC != "" + + // Log which mode will be used + if useIndexerMode { + logger.Info("Aptos watcher will run in indexer mode (GraphQL + REST)") + } else { + logger.Info("Aptos watcher will run in legacy mode (REST API only)") + } } if !argsConsistent([]string{*movementAccount, *movementRPC, *movementHandle}) { @@ -1658,12 +1678,18 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(aptosRPC) { + // Determine if we should use indexer mode (only aptosIndexerRPC required) + useIndexer := *aptosIndexerRPC != "" + wc := &aptos.WatcherConfig{ - NetworkID: "aptos", - ChainID: vaa.ChainIDAptos, - Rpc: *aptosRPC, - Account: *aptosAccount, - Handle: *aptosHandle, + NetworkID: "aptos", + ChainID: vaa.ChainIDAptos, + Rpc: *aptosRPC, + Account: *aptosAccount, + Handle: *aptosHandle, + IndexerRpc: *aptosIndexerRPC, + IndexerToken: *aptosIndexerToken, + UseIndexer: useIndexer, } watcherConfigs = append(watcherConfigs, wc) } diff --git a/node/pkg/watchers/aptos/config.go b/node/pkg/watchers/aptos/config.go index 4dbdbb5ce6..8d9bfeda86 100644 --- a/node/pkg/watchers/aptos/config.go +++ b/node/pkg/watchers/aptos/config.go @@ -11,11 +11,14 @@ import ( ) type WatcherConfig struct { - NetworkID watchers.NetworkID // human readable name - ChainID vaa.ChainID // ChainID - Rpc string - Account string - Handle string + NetworkID watchers.NetworkID // human readable name + ChainID vaa.ChainID // ChainID + Rpc string + Account string + Handle string + IndexerRpc string + IndexerToken string + UseIndexer bool // true for indexer mode, false for legacy mode } func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID { @@ -35,5 +38,5 @@ func (wc *WatcherConfig) Create( _ chan<- *common.GuardianSet, _ common.Environment, ) (supervisor.Runnable, interfaces.Reobserver, error) { - return NewWatcher(wc.ChainID, wc.NetworkID, wc.Rpc, wc.Account, wc.Handle, msgC, obsvReqC).Run, nil, nil + return NewWatcher(wc.ChainID, wc.NetworkID, wc.Rpc, wc.Account, wc.Handle, wc.IndexerRpc, wc.IndexerToken, wc.UseIndexer, msgC, obsvReqC).Run, nil, nil } diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index 4fa1c8b578..f95170ea91 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -1,9 +1,11 @@ package aptos import ( + "bytes" "context" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "math" "net/http" @@ -29,9 +31,12 @@ type ( chainID vaa.ChainID networkID string - aptosRPC string - aptosAccount string - aptosHandle string + aptosRPC string + aptosAccount string + aptosHandle string + aptosIndexerRPC string + aptosIndexerToken string + useIndexer bool msgC chan<- *common.MessagePublication obsvReqC <-chan *gossipv1.ObservationRequest @@ -60,18 +65,24 @@ func NewWatcher( aptosRPC string, aptosAccount string, aptosHandle string, + aptosIndexerRPC string, + aptosIndexerToken string, + useIndexer bool, msgC chan<- *common.MessagePublication, obsvReqC <-chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ - chainID: chainID, - networkID: string(networkID), - aptosRPC: aptosRPC, - aptosAccount: aptosAccount, - aptosHandle: aptosHandle, - msgC: msgC, - obsvReqC: obsvReqC, - readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), + chainID: chainID, + networkID: string(networkID), + aptosRPC: aptosRPC, + aptosAccount: aptosAccount, + aptosHandle: aptosHandle, + aptosIndexerRPC: aptosIndexerRPC, + aptosIndexerToken: aptosIndexerToken, + useIndexer: useIndexer, + msgC: msgC, + obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), } } @@ -82,34 +93,50 @@ func (e *Watcher) Run(ctx context.Context) error { logger := supervisor.Logger(ctx) - logger.Info("Starting watcher", - zap.String("watcher_name", e.networkID), - zap.String("rpc", e.aptosRPC), - zap.String("account", e.aptosAccount), - zap.String("handle", e.aptosHandle), - ) + if e.useIndexer { + logger.Info("Starting Aptos watcher in indexer mode", + zap.String("watcher_name", e.networkID), + zap.String("rpc", e.aptosRPC), + zap.String("account", e.aptosAccount), + zap.String("handle", e.aptosHandle), + zap.String("indexerRpc", e.aptosIndexerRPC), + ) + } else { + logger.Info("Starting Aptos watcher in legacy mode", + zap.String("watcher_name", e.networkID), + zap.String("rpc", e.aptosRPC), + zap.String("account", e.aptosAccount), + zap.String("handle", e.aptosHandle), + ) + } // SECURITY: the API guarantees that we only get the events from the right // contract - var eventsEndpoint = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle) - var aptosHealth = fmt.Sprintf(`%s/v1`, e.aptosRPC) - logger.Info("watcher connecting to RPC node ", - zap.String("url", e.aptosRPC), - zap.String("eventsQuery", eventsEndpoint), - zap.String("healthQuery", aptosHealth), - ) + // Important: There are 3 different sequence/version numbers in play: + // 1. Aptos Version: Transaction version number (like block number in EVM) + // 2. Aptos Sequence Number: Event-specific counter for WormholeMessage events + // 3. Wormhole Sequence: Protocol sequence inside the event data (goes into VAA) + // - // the events have sequence numbers associated with them in the aptos API - // (NOTE: this is not the same as the wormhole sequence id). The event - // endpoint is paginated, so we use this variable to keep track of which - // sequence number to look up next. - var nextSequence uint64 = 0 + supervisor.Signal(ctx, supervisor.SignalHealthy) + if e.useIndexer { + return e.runIndexerMode(ctx, logger) + } else { + return e.runLegacyMode(ctx, logger) + } +} + +func (e *Watcher) runLegacyMode(ctx context.Context, logger *zap.Logger) error { + // Legacy REST API implementation from original code timer := time.NewTicker(time.Second * 1) defer timer.Stop() - supervisor.Signal(ctx, supervisor.SignalHealthy) + // nextSequence tracks the Aptos Sequence Number (not version, not wormhole seq) + var nextSequence uint64 = 0 + var eventsEndpoint = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle) + var aptosHealth = fmt.Sprintf(`%s/v1`, e.aptosRPC) for { select { @@ -203,6 +230,12 @@ func (e *Watcher) Run(ctx context.Context) error { // the endpoint returns an array of events, ordered by sequence // id (ASC) + // Check if the array is empty + + if !events.Exists() || !events.IsArray() || len(events.Array()) == 0 { + logger.Warn("No new events found") + continue + } for _, event := range events.Array() { eventSequence := event.Get("sequence_number") if !eventSequence.Exists() { @@ -267,6 +300,216 @@ func (e *Watcher) Run(ctx context.Context) error { } } +func (e *Watcher) runIndexerMode(ctx context.Context, logger *zap.Logger) error { + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() + + // nextSequence tracks the Aptos Sequence Number (not version, not wormhole seq) + var nextSequence uint64 = 0 + + // Add authorization header for indexer API calls, if token is present + headers := make(map[string]string) + if e.aptosIndexerToken != "" { + headers["Authorization"] = "Bearer " + e.aptosIndexerToken + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case r := <-e.obsvReqC: + // node/pkg/node/reobserve.go already enforces the chain id is a valid uint16 + // and only writes to the channel for this chain id. + // If either of the below cases are true, something has gone wrong + if r.ChainId > math.MaxUint16 || vaa.ChainID(r.ChainId) != e.chainID { + panic("invalid chain ID") + } + + // uint64 will read the *first* 8 bytes, but the sequence is stored in the *last* 8. + nativeSeq := binary.BigEndian.Uint64(r.TxHash[24:]) + + logger.Info("Received obsv request", zap.Uint64("tx_hash", nativeSeq)) + + query := fmt.Sprintf(`query GetVersionBySequence { msg(where: {sequence_num: {_eq: %d}}) { version } }`, nativeSeq) + + body, err := e.queryIndexer(query, headers) + if err != nil { + logger.Error("retrievePayload", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + if !gjson.Valid(string(body)) { + logger.Error("InvalidJson: " + string(body)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + // Parse the GraphQL response + response := gjson.ParseBytes(body) + + // Extract the msg array from data.msg + messages := response.Get("data.msg") + // NOTE: Without this check, the code below panics if there are no results + if !messages.Exists() || len(messages.Array()) == 0 { + logger.Error("No data.msg field in indexer response or empty array") + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + // Get the version from the first (and should be only) result + // Check if there is more than one result? + if len(messages.Array()) > 1 { + logger.Warn("More than one result found for sequence in indexer response", zap.Uint64("sequence", nativeSeq), zap.Int("count", len(messages.Array()))) + } + // Get the version from the first result + version := messages.Array()[0].Get("version") + if !version.Exists() { + logger.Error("No version field in indexer response") + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + versionNum := version.Uint() + + // Process the transaction and extract WormholeMessage events + // isReobservation=true + if err := e.processTransactionVersion(logger, versionNum, true); err != nil { + logger.Error("Failed to process transaction for reobservation", + zap.Uint64("version", versionNum), + zap.Uint64("sequence", nativeSeq), + zap.Error(err)) + continue + } + + case <-timer.C: + query := "" + + if nextSequence == 0 { + // if nextSequence is 0, we look up the most recent event + // Get both version (for fetching tx) and sequence_num (for observeData) + query = "query GetLastEvent { msg(order_by: {sequence_num: desc}, limit: 1) { version sequence_num } }" + } else { + // otherwise just look up events starting at nextSequence. + // this will potentially return multiple events (whatever + // the default limit is per page), so we'll handle all of them. + query = fmt.Sprintf(`query GetNextEvents { msg(where: {sequence_num: {_gt: %d}}, order_by: {sequence_num: asc}) { version sequence_num } }`, nextSequence) + } + + eventsJson, err := e.queryIndexer(query, headers) + if err != nil { + logger.Error("queryIndexer", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + // data doesn't exist yet. skip, and try again later + // this happens when the sequence id we're looking up hasn't + // been used yet. + if string(eventsJson) == "" { + continue + } + + if !gjson.Valid(string(eventsJson)) { + logger.Error("InvalidJson: " + string(eventsJson)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + // Parse the GraphQL response + response := gjson.ParseBytes(eventsJson) + + // Extract the msg array from data.msg + messages := response.Get("data.msg") + if !messages.Exists() { + logger.Error("No data.msg field in indexer response") + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + // Check if the array is empty + if len(messages.Array()) == 0 { + logger.Warn("No new events found in indexer response") + continue + } + + // Walk through the array of events + for _, msg := range messages.Array() { + version := msg.Get("version") + sequenceNum := msg.Get("sequence_num") + if !version.Exists() || !sequenceNum.Exists() { + continue + } + + versionNum := version.Uint() + aptosSeqNum := sequenceNum.Uint() + logger.Debug("Found event from indexer", + zap.Uint64("version", versionNum), + zap.Uint64("sequence_num", aptosSeqNum)) + + if nextSequence == 0 && aptosSeqNum != 0 { + // Avoid publishing an old observation on startup. This does not block the first message on a new chain (when eventSeq would be zero). + nextSequence = aptosSeqNum + 1 + continue + } + + // Process the transaction and extract WormholeMessage events + // isReobservation=false + if err := e.processTransactionVersion(logger, versionNum, false); err != nil { + logger.Error("Failed to process transaction", + zap.Uint64("version", versionNum), + zap.Uint64("sequence", aptosSeqNum), + zap.Error(err)) + continue + } + + // Update nextSequence to track progress using the Aptos sequence_num + if aptosSeqNum > nextSequence { + nextSequence = aptosSeqNum + } + } + + // Health check endpoint + aptosHealth := fmt.Sprintf(`%s/v1`, e.aptosRPC) + health, err := e.retrievePayload(aptosHealth) + if err != nil { + logger.Error("health", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + } + + if !gjson.Valid(string(health)) { + logger.Error("Invalid JSON in health response: " + string(health)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + continue + + } + + // TODO: Make this log more useful for humans + logger.Debug(string(health) + string(eventsJson)) + + pHealth := gjson.ParseBytes(health) + + blockHeight := pHealth.Get("block_height") + + if blockHeight.Uint() > math.MaxInt64 { + logger.Error("Block height not a valid uint64: ", zap.Uint64("blockHeight", blockHeight.Uint())) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + continue + } + + if blockHeight.Exists() { + currentAptosHeight.WithLabelValues(e.networkID).Set(float64(blockHeight.Uint())) + p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ + Height: int64(blockHeight.Uint()), // #nosec G115 -- This is validated above + ContractAddress: e.aptosAccount, + }) + + readiness.SetReady(e.readinessSync) + } + } + } +} + //nolint:noctx // TODO: this function should use a context. (Also this line flags nolintlint unless placed here.) func (e *Watcher) retrievePayload(s string) ([]byte, error) { //nolint:gosec // the URL is hard-coded to the Aptos RPC endpoint. @@ -279,7 +522,120 @@ func (e *Watcher) retrievePayload(s string) ([]byte, error) { if err != nil { return nil, err } - return body, err + return body, nil +} + +//nolint:noctx // TODO: this function should use a context. +func (e *Watcher) queryIndexer(query string, headers map[string]string) ([]byte, error) { + // Create GraphQL request body + requestBody := map[string]interface{}{ + "query": query, + } + jsonBody, err := json.Marshal(requestBody) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", e.aptosIndexerRPC, bytes.NewBuffer(jsonBody)) + if err != nil { + return nil, err + } + + // Set content type for GraphQL + req.Header.Set("Content-Type", "application/json") + + // Add additional headers if provided + for key, value := range headers { + req.Header.Set(key, value) + } + + // The indexer doesn't return errors in the HTTP status code. + // If there is no "data" field in the response, treat it as an error. + // The "data" field may, also, be empty, which may or may not be an error, + // depending on the query. + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + body, err := common.SafeRead(res.Body) + if err != nil { + return nil, err + } + + // Check for GraphQL errors in the response + // The indexer may return errors with a 200 status code + if gjson.Valid(string(body)) { + response := gjson.ParseBytes(body) + errors := response.Get("errors") + if errors.Exists() && errors.IsArray() && len(errors.Array()) > 0 { + // Get the first error message + firstError := errors.Array()[0] + message := firstError.Get("message") + if message.Exists() { + return nil, fmt.Errorf("indexer error: %s", message.String()) + } + return nil, fmt.Errorf("indexer error: %s", errors.String()) + } + } + + return body, nil +} + +// processTransactionVersion fetches a transaction by version and extracts WormholeMessage events +func (e *Watcher) processTransactionVersion( + logger *zap.Logger, + versionNum uint64, + isReobservation bool, +) error { + // Fetch transaction details for this version + var txEndpoint = fmt.Sprintf(`%s/v1/transactions/by_version/%d`, e.aptosRPC, versionNum) + txData, err := e.retrievePayload(txEndpoint) + if err != nil { + logger.Error("retrievePayload", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + return err + } + + if !gjson.Valid(string(txData)) { + logger.Error("Invalid JSON in transaction response", zap.String("data", string(txData))) + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + return fmt.Errorf("invalid JSON in transaction response") + } + + // Parse the transaction data + txResult := gjson.ParseBytes(txData) + + // Look for WormholeMessage in the events array + whEventType := fmt.Sprintf("%s::state::WormholeMessage", e.aptosAccount) + events := txResult.Get("events") + if !events.Exists() || !events.IsArray() || len(events.Array()) == 0 { + logger.Warn("No events found in transaction", zap.Uint64("version", versionNum)) + } else { + for _, event := range events.Array() { + eventType := event.Get("type") + if eventType.String() == whEventType { + // Get the Aptos sequence number from this specific event + eventSeq := event.Get("sequence_number") + if !eventSeq.Exists() { + logger.Error("WormholeMessage event missing sequence_number field", + zap.Uint64("version", versionNum)) + continue + } + + // Extract the event data + data := event.Get("data") + if data.Exists() { + // The event data has the fields we need for observeData + e.observeData(logger, data, eventSeq.Uint(), isReobservation) + } else { + logger.Error("WormholeMessage event missing data field", zap.Uint64("version", versionNum)) + } + } + } + } + return nil } func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64, isReobservation bool) { diff --git a/node/pkg/watchers/aptos/watcher_test.go b/node/pkg/watchers/aptos/watcher_test.go new file mode 100644 index 0000000000..eb9a9203e1 --- /dev/null +++ b/node/pkg/watchers/aptos/watcher_test.go @@ -0,0 +1,636 @@ +package aptos + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/certusone/wormhole/node/pkg/common" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/node/pkg/watchers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" +) + +// Test against real indexer to understand data format +func TestQueryIndexerRealData(t *testing.T) { + // Skip this test in CI - it's for local development/exploration + t.Skip("Skipping real indexer test - for local development only") + + indexerRPC := "https://your-indexer-endpoint.com/v1/graphql" + indexerToken := "your-indexer-token" //nolint:gosec // This is a test token // Add your indexer token here + + // Create a minimal watcher with just the fields we need + w := &Watcher{ + chainID: vaa.ChainIDAptos, + networkID: string(watchers.NetworkID("aptos")), + aptosIndexerRPC: indexerRPC, + aptosIndexerToken: indexerToken, + } + + tests := []struct { + name string + query string + }{ + { + name: "GetLastEvent", + query: "query GetLastEvent { msg(order_by: {sequence_num: desc}, limit: 1) { version sequence_num } }", + }, + { + name: "GetNextEvents", + query: "query GetNextEvents { msg(where: {sequence_num: {_gt: 173800}}, order_by: {sequence_num: asc}, limit: 5) { version sequence_num } }", + }, + { + name: "GetVersionBySequence", + query: "query GetVersionBySequence { msg(where: {sequence_num: {_eq: 173829}}) { version } }", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + headers := make(map[string]string) + if w.aptosIndexerToken != "" { + headers["Authorization"] = "Bearer " + w.aptosIndexerToken + } + + result, err := w.queryIndexer(tc.query, headers) + require.NoError(t, err, "Failed to query indexer") + + // Parse and print the result + fmt.Printf("\n=== %s ===\n", tc.name) + fmt.Printf("Query: %s\n", tc.query) + fmt.Printf("Response: %s\n", string(result)) + + // Validate it's valid JSON + assert.True(t, gjson.Valid(string(result))) + + // Parse and check structure + parsed := gjson.ParseBytes(result) + data := parsed.Get("data.msg") + if data.Exists() { + fmt.Printf("Found %d results\n", len(data.Array())) + for i, msg := range data.Array() { + version := msg.Get("version") + seqNum := msg.Get("sequence_num") + fmt.Printf(" [%d] version: %s, sequence_num: %s\n", i, version.String(), seqNum.String()) + } + } + + // Check for errors + errors := parsed.Get("errors") + if errors.Exists() { + fmt.Printf("GraphQL Errors: %s\n", errors.String()) + } + }) + } +} + +// Unit tests with mocked responses +func TestQueryIndexer(t *testing.T) { + tests := []struct { + name string + query string + token string + mockResponse string + mockStatusCode int + expectedError bool + expectedHeaders map[string]string + validateResponse func(t *testing.T, result []byte) + }{ + { + name: "successful query with data", + query: "query GetLastEvent { msg(order_by: {sequence_num: desc}, limit: 1) { version sequence_num } }", + token: "test-token", + mockResponse: `{"data":{"msg":[{"version":"3452526584","sequence_num":"173838"}]}}`, + mockStatusCode: 200, + expectedError: false, + expectedHeaders: map[string]string{ + "Authorization": "Bearer test-token", + "Content-Type": "application/json", + }, + validateResponse: func(t *testing.T, result []byte) { + parsed := gjson.ParseBytes(result) + assert.True(t, parsed.Get("data.msg").Exists()) + assert.Equal(t, "3452526584", parsed.Get("data.msg.0.version").String()) + assert.Equal(t, "173838", parsed.Get("data.msg.0.sequence_num").String()) + }, + }, + { + name: "successful query with empty result", + query: "query GetVersionBySequence { msg(where: {sequence_num: {_eq: 999999}}) { version } }", + token: "", + mockResponse: `{"data":{"msg":[]}}`, + mockStatusCode: 200, + expectedError: false, + expectedHeaders: map[string]string{ + "Content-Type": "application/json", + // No Authorization header when token is empty + }, + validateResponse: func(t *testing.T, result []byte) { + parsed := gjson.ParseBytes(result) + assert.True(t, parsed.Get("data.msg").Exists()) + assert.Equal(t, 0, len(parsed.Get("data.msg").Array())) + }, + }, + { + name: "graphql error response", + query: "query Invalid { invalid }", + token: "test-token", + mockResponse: `{"errors":[{"message":"Cannot query field 'invalid' on type 'query_root'"}]}`, + mockStatusCode: 200, // GraphQL returns 200 even for errors + expectedError: true, // Now returns an error instead of the error body + validateResponse: func(t *testing.T, result []byte) { + // This won't be called since we expect an error + }, + }, + { + name: "http error response", + query: "query Test { msg { version } }", + token: "invalid-token", + mockResponse: `{"error": "Unauthorized"}`, + mockStatusCode: 401, + expectedError: false, // queryIndexer doesn't check status code + validateResponse: func(t *testing.T, result []byte) { + assert.Contains(t, string(result), "Unauthorized") + }, + }, + { + name: "invalid json response", + query: "query Test { msg { version } }", + token: "", + mockResponse: `not json`, + mockStatusCode: 200, + expectedError: false, + validateResponse: func(t *testing.T, result []byte) { + assert.Equal(t, "not json", string(result)) + assert.False(t, gjson.Valid(string(result))) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create a test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify method + assert.Equal(t, "POST", r.Method) + + // Verify headers + for key, expected := range tc.expectedHeaders { + actual := r.Header.Get(key) + assert.Equal(t, expected, actual, "Header %s mismatch", key) + } + + // Verify the request body contains our query + var requestBody map[string]interface{} + err := json.NewDecoder(r.Body).Decode(&requestBody) + assert.NoError(t, err) + assert.Equal(t, tc.query, requestBody["query"]) + + // Send mock response + w.WriteHeader(tc.mockStatusCode) + _, _ = w.Write([]byte(tc.mockResponse)) //nolint:errcheck // Test code + })) + defer server.Close() + + // Create watcher with test server URL + w := &Watcher{ + aptosIndexerRPC: server.URL, + aptosIndexerToken: tc.token, + } + + // Prepare headers + headers := make(map[string]string) + if w.aptosIndexerToken != "" { + headers["Authorization"] = "Bearer " + w.aptosIndexerToken + } + + // Call queryIndexer + result, err := w.queryIndexer(tc.query, headers) + + // Check error + if tc.expectedError { + assert.Error(t, err) + // For GraphQL error test, verify error message + if tc.name == "graphql error response" { + assert.Contains(t, err.Error(), "invalid") + } + } else { + assert.NoError(t, err) + } + + // Validate response + if tc.validateResponse != nil && !tc.expectedError { + tc.validateResponse(t, result) + } + }) + } +} + +// Test processTransactionVersion against real Aptos RPC to understand data format +func TestProcessTransactionVersionRealData(t *testing.T) { + // Skip this test in CI - it's for local development/exploration + t.Skip("Skipping real Aptos RPC test - for local development only") + + aptosRPC := "https://fullnode.mainnet.aptoslabs.com" + indexerRPC := "https://your-indexer-endpoint.com/v1/graphql" + indexerToken := "your-indexer-token" //nolint:gosec // This is a test token + aptosAccount := "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625" + + // Create a watcher with real endpoints + w := &Watcher{ + chainID: vaa.ChainIDAptos, + networkID: "aptos-mainnet", + aptosRPC: aptosRPC, + aptosAccount: aptosAccount, + aptosIndexerRPC: indexerRPC, + aptosIndexerToken: indexerToken, + msgC: make(chan<- *common.MessagePublication, 10), + } + + // First, get some version numbers from the indexer + query := "query GetRecentEvents { msg(order_by: {sequence_num: desc}, limit: 3) { version sequence_num } }" + headers := make(map[string]string) + headers["Authorization"] = "Bearer " + indexerToken + + eventsJson, err := w.queryIndexer(query, headers) + require.NoError(t, err, "Failed to query indexer for test versions") + + parsed := gjson.ParseBytes(eventsJson) + messages := parsed.Get("data.msg") + require.True(t, messages.Exists(), "No data.msg in indexer response") + + logger := zap.NewNop() + + // Test a few recent transactions + for i, msg := range messages.Array() { + if i >= 3 { // Limit to 3 tests + break + } + + version := msg.Get("version") + sequenceNum := msg.Get("sequence_num") + + if !version.Exists() || !sequenceNum.Exists() { + continue + } + + versionNum := version.Uint() + seqNum := sequenceNum.Uint() + + fmt.Printf("\n=== Testing Version %d (sequence %d) ===\n", versionNum, seqNum) + + // Test processTransactionVersion + err := w.processTransactionVersion(logger, versionNum, false) + + fmt.Printf("Result: ") + if err != nil { + fmt.Printf("Error - %v\n", err) + } else { + fmt.Printf("Success\n") + } + + // Also fetch the raw transaction to see what it looks like + txEndpoint := fmt.Sprintf("%s/v1/transactions/by_version/%d", aptosRPC, versionNum) + txData, err := w.retrievePayload(txEndpoint) + if err != nil { + fmt.Printf("Failed to fetch transaction: %v\n", err) + continue + } + + // Parse and show event types + if gjson.Valid(string(txData)) { + txResult := gjson.ParseBytes(txData) + events := txResult.Get("events") + + fmt.Printf("Transaction events:\n") + if events.Exists() { + for j, event := range events.Array() { + eventType := event.Get("type") + fmt.Printf(" [%d] %s\n", j, eventType.String()) + } + } else { + fmt.Printf(" No events found\n") + } + } + } +} + +// Test processTransactionVersion with mocked responses +func TestProcessTransactionVersion(t *testing.T) { + // Mock transaction response based on txResult.json + mockTxWithEvent := `{ + "version": "3432334170", + "hash": "0xb56e99068a50e98d9eb3a447854c265d0ab472b5ecf6d583c71152e96fbf0e84", + "success": true, + "events": [ + { + "guid": { + "creation_number": "0", + "account_address": "0x0" + }, + "sequence_number": "0", + "type": "0x1::fungible_asset::Withdraw", + "data": { + "amount": "1000000", + "store": "0xbb3e678df60319ddb9e7c570e5a01bc2d57b38d4f0fc6b571d140c6a40528918" + } + }, + { + "guid": { + "creation_number": "2", + "account_address": "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625" + }, + "sequence_number": "173829", + "type": "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625::state::WormholeMessage", + "data": { + "consistency_level": 0, + "nonce": "0", + "payload": "0x0100000000000000000000000000000000000000000000000000000000000f4240a867703f5395cb2965feb7ebff5cdf39b771fc6156085da3ae4147a00be91b380016b730bbb0b27faef6cb524bd733649ddf1a91f3639eee1d300eff231fda49d64000150000000000000000000000000000000000000000000000000000000000000000", + "sender": "1", + "sequence": "170690", + "timestamp": "1758377559" + } + } + ] + }` + + mockTxWithoutEvent := `{ + "version": "3432334171", + "hash": "0xtest123", + "success": true, + "events": [ + { + "type": "0x1::coin::WithdrawEvent", + "data": { + "amount": "100" + } + } + ] + }` + + mockInvalidJson := `{invalid json}` + + tests := []struct { + name string + versionNum uint64 + sequenceForObserve uint64 + isReobservation bool + mockResponse string + mockStatusCode int + expectedError bool + expectObserveCall bool + aptosAccount string + }{ + { + name: "successful transaction with WormholeMessage event", + versionNum: 3432334170, + sequenceForObserve: 173829, + isReobservation: false, + mockResponse: mockTxWithEvent, + mockStatusCode: 200, + expectedError: false, + expectObserveCall: true, + aptosAccount: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + }, + { + name: "successful transaction without WormholeMessage event", + versionNum: 3432334171, + sequenceForObserve: 173830, + isReobservation: false, + mockResponse: mockTxWithoutEvent, + mockStatusCode: 200, + expectedError: false, + expectObserveCall: false, + aptosAccount: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + }, + { + name: "reobservation request", + versionNum: 3432334170, + sequenceForObserve: 173829, + isReobservation: true, + mockResponse: mockTxWithEvent, + mockStatusCode: 200, + expectedError: false, + expectObserveCall: true, + aptosAccount: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + }, + { + name: "server error with empty response", + versionNum: 3432334172, + sequenceForObserve: 173831, + isReobservation: false, + mockResponse: "", + mockStatusCode: 500, + expectedError: true, // empty response is invalid JSON + expectObserveCall: false, + aptosAccount: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + }, + { + name: "invalid JSON response", + versionNum: 3432334173, + sequenceForObserve: 173832, + isReobservation: false, + mockResponse: mockInvalidJson, + mockStatusCode: 200, + expectedError: true, + expectObserveCall: false, + aptosAccount: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + }, + { + name: "different account address - no event match", + versionNum: 3432334170, + sequenceForObserve: 173829, + isReobservation: false, + mockResponse: mockTxWithEvent, + mockStatusCode: 200, + expectedError: false, + expectObserveCall: false, + aptosAccount: "0xdifferentaccount", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Track if observeData was called + observeCalled := false + var observedSequence uint64 + var observedIsReobservation bool + + // Create a test server for the Aptos RPC + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the URL matches expected pattern + expectedPath := fmt.Sprintf("/v1/transactions/by_version/%d", tc.versionNum) + assert.Equal(t, expectedPath, r.URL.Path) + + // Send mock response + w.WriteHeader(tc.mockStatusCode) + _, _ = w.Write([]byte(tc.mockResponse)) //nolint:errcheck // Test code + })) + defer server.Close() + + // Create watcher with test server URL + w := &Watcher{ + chainID: vaa.ChainIDAptos, + networkID: "aptos-test", + aptosRPC: server.URL, + aptosAccount: tc.aptosAccount, + // Create a mock message channel to capture observations + msgC: make(chan<- *common.MessagePublication, 1), + } + + // Override observeData for testing (we'd need to refactor for proper mocking) + // For now, we can test the function returns the expected error + + logger := zap.NewNop() + err := w.processTransactionVersion(logger, tc.versionNum, tc.isReobservation) + + if tc.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + // Note: In a real test, we'd want to mock observeData to verify it's called + // This would require refactoring to use an interface or function parameter + _ = observeCalled + _ = observedSequence + _ = observedIsReobservation + }) + } +} + +// Test NewWatcher with different mode configurations +func TestNewWatcherModes(t *testing.T) { + tests := []struct { + name string + chainID vaa.ChainID + networkID watchers.NetworkID + rpc string + account string + handle string + indexerRpc string + indexerToken string + useIndexer bool + expectedUseIndexer bool + }{ + { + name: "legacy mode - all basic params", + chainID: vaa.ChainIDAptos, + networkID: "aptos", + rpc: "https://fullnode.mainnet.aptoslabs.com", + account: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + handle: "0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002::state::WormholeMessage", + indexerRpc: "", + indexerToken: "", + useIndexer: false, + expectedUseIndexer: false, + }, + { + name: "indexer mode - with indexer params", + chainID: vaa.ChainIDAptos, + networkID: "aptos", + rpc: "https://fullnode.mainnet.aptoslabs.com", + account: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + handle: "0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002::state::WormholeMessage", + indexerRpc: "https://example.com/v1/graphql", + indexerToken: "test-token", + useIndexer: true, + expectedUseIndexer: true, + }, + { + name: "indexer mode without token", + chainID: vaa.ChainIDAptos, + networkID: "aptos", + rpc: "https://fullnode.mainnet.aptoslabs.com", + account: "0x5bc11445584a763c1fa7ed39081f1b920954da14e04b32440cba863d03e19625", + handle: "0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002::state::WormholeMessage", + indexerRpc: "https://example.com/v1/graphql", + indexerToken: "", + useIndexer: true, + expectedUseIndexer: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create mock channels + msgC := make(chan *common.MessagePublication, 10) + obsvReqC := make(chan *gossipv1.ObservationRequest, 10) // Use interface{} to match signature + + // Create watcher + watcher := NewWatcher( + tc.chainID, + tc.networkID, + tc.rpc, + tc.account, + tc.handle, + tc.indexerRpc, + tc.indexerToken, + tc.useIndexer, + msgC, + obsvReqC, + ) + + // Verify watcher configuration + assert.Equal(t, tc.chainID, watcher.chainID) + assert.Equal(t, string(tc.networkID), watcher.networkID) + assert.Equal(t, tc.rpc, watcher.aptosRPC) + assert.Equal(t, tc.account, watcher.aptosAccount) + assert.Equal(t, tc.handle, watcher.aptosHandle) + assert.Equal(t, tc.indexerRpc, watcher.aptosIndexerRPC) + assert.Equal(t, tc.indexerToken, watcher.aptosIndexerToken) + assert.Equal(t, tc.expectedUseIndexer, watcher.useIndexer) + }) + } +} + +// Test that mode detection works correctly in NewWatcher +func TestModeDetection(t *testing.T) { + tests := []struct { + name string + useIndexer bool + expectedUseIndexer bool + }{ + { + name: "legacy mode", + useIndexer: false, + expectedUseIndexer: false, + }, + { + name: "indexer mode", + useIndexer: true, + expectedUseIndexer: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create mock channels + msgC := make(chan *common.MessagePublication, 10) + obsvReqC := make(chan *gossipv1.ObservationRequest, 10) + + // Create watcher with minimal config + watcher := NewWatcher( + vaa.ChainIDAptos, + "aptos", + "https://example.com", + "0x1", + "handle", + "https://indexer.com", + "token", + tc.useIndexer, + msgC, + obsvReqC, + ) + + // Verify mode is set correctly + assert.Equal(t, tc.expectedUseIndexer, watcher.useIndexer) + }) + } +}