Skip to content

Commit dd19ce9

Browse files
committed
apply pr feedback
consolidate ensdb modules; replace pino logger with native console logger; simplify ensdb worker logic
1 parent 623260f commit dd19ce9

File tree

13 files changed

+273
-316
lines changed

13 files changed

+273
-316
lines changed

apps/ensapi/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"date-fns": "catalog:",
4444
"drizzle-orm": "catalog:",
4545
"hono": "catalog:",
46-
"p-memoize": "^8.0.0",
46+
"p-memoize": "catalog:",
4747
"p-retry": "catalog:",
4848
"pg-connection-string": "catalog:",
4949
"pino": "catalog:",

apps/ensindexer/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@
2929
"@ensnode/ensnode-sdk": "workspace:*",
3030
"@ensnode/ensrainbow-sdk": "workspace:*",
3131
"@ensnode/ponder-metadata": "workspace:*",
32+
"@ponder/client": "catalog:",
3233
"caip": "catalog:",
3334
"date-fns": "catalog:",
3435
"deepmerge-ts": "^7.1.5",
3536
"dns-packet": "^5.6.1",
3637
"drizzle-orm": "catalog:",
3738
"p-retry": "catalog:",
38-
"pino": "catalog:",
3939
"pg-connection-string": "catalog:",
40-
"pg": "8.16.3",
4140
"hono": "catalog:",
4241
"ponder": "catalog:",
4342
"viem": "catalog:",

apps/ensindexer/ponder/src/ensdb-writer-worker.ts

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
* - Indexing Status
55
* into the ENSDb.
66
*/
7-
import config from "@/config";
8-
97
import { secondsToMilliseconds } from "date-fns";
108
import pRetry from "p-retry";
119

@@ -18,14 +16,11 @@ import {
1816
} from "@ensnode/ensnode-sdk";
1917

2018
import { validateENSIndexerPublicConfigCompatibility } from "@/config/compatibility";
21-
import { EnsDbConnection, EnsDbMutation, EnsDbQuery } from "@/lib/ensdb";
19+
import { EnsDbClient } from "@/lib/ensdb";
2220
import { ensIndexerClient, waitForEnsIndexerToBecomeHealthy } from "@/lib/ensindexer";
23-
import { makeLogger } from "@/lib/logger";
2421

2522
const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 1;
2623

27-
const logger = makeLogger("ensdb-writer-worker");
28-
2924
/**
3025
* ENSDb Writer Worker
3126
*
@@ -38,57 +33,45 @@ const logger = makeLogger("ensdb-writer-worker");
3833
* into ENSDb.
3934
*/
4035
async function ensDbWriterWorker() {
36+
console.log("ENSDb Writer Worker: waiting for ENSIndexer to become healthy.");
37+
4138
// 0. Wait for ENSIndexer to become healthy before running the worker's logic
4239
await waitForEnsIndexerToBecomeHealthy;
4340

44-
// 1. Create ENSDb Client
45-
const ensDbConnection = new EnsDbConnection();
46-
const ensDbClient = ensDbConnection.connect({
47-
schemaName: config.databaseSchemaName,
48-
poolConfig: {
49-
connectionString: config.databaseUrl,
50-
},
51-
});
52-
53-
logger.info("ENSDb Client connected");
41+
console.log("ENSDb Writer Worker: ENSIndexer is healthy, starting tasks.");
5442

55-
// 2. Create ENSDb Query object for read operations
56-
const ensDbQuery = new EnsDbQuery(ensDbClient);
57-
// 3. Create ENSDb Mutation object for write operations
58-
const ensDbMutation = new EnsDbMutation(ensDbClient);
43+
// 1. Create ENSDb Client
44+
const ensDbClient = new EnsDbClient();
5945

6046
/**
6147
* Handle ENSIndexerPublicConfig Record
6248
*/
6349
const handleEnsIndexerPublicConfigRecord = async () => {
6450
// Read stored config and in-memory config.
65-
// Note: we wrap read operations in pRetry to ensure all of them are
51+
// Note: we wrap each operation in pRetry to ensure all of them can be
6652
// completed successfully.
67-
const [storedConfig, inMemoryConfig] = await pRetry(() =>
68-
Promise.all([ensDbQuery.getEnsIndexerPublicConfig(), ensIndexerClient.config()]),
69-
);
53+
const [storedConfig, inMemoryConfig] = await Promise.all([
54+
pRetry(() => ensDbClient.getEnsIndexerPublicConfig()),
55+
pRetry(() => ensIndexerClient.config()),
56+
]);
7057

7158
// Validate in-memory config object compatibility with the stored one,
7259
// if the stored one is available
7360
if (storedConfig) {
7461
try {
7562
validateENSIndexerPublicConfigCompatibility(storedConfig, inMemoryConfig);
7663
} catch (error) {
77-
const errorMessage =
78-
"In-memory ENSIndexerPublicConfig object is not compatible with its counterpart stored in ENSDb.";
79-
80-
logger.error(error, errorMessage);
64+
const errorMessage = `In-memory ENSIndexerPublicConfig object is not compatible with its counterpart stored in ENSDb.`;
8165

8266
// Throw the error to terminate the ENSIndexer process due to
8367
// found config incompatibility
84-
throw new Error(errorMessage);
68+
throw new Error(errorMessage, {
69+
cause: error,
70+
});
8571
}
8672
} else {
8773
// Upsert ENSIndexerPublicConfig into ENSDb.
88-
// Note: we wrap write operation in pRetry to ensure it can complete
89-
// successfully, as there will be no other attempt.
90-
await pRetry(() => ensDbMutation.upsertEnsIndexerPublicConfig(inMemoryConfig));
91-
logger.info("ENSIndexer Public Config successfully stored in ENSDb.");
74+
await ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig);
9275
}
9376
};
9477

@@ -110,16 +93,14 @@ async function ensDbWriterWorker() {
11093

11194
// Check if Indexing Status is in expected status.
11295
if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) {
113-
throw new Error("Omnichain Status must be different that 'Unstarted'.");
96+
throw new Error("Omnichain Status must be different than 'Unstarted'.");
11497
}
11598

11699
// Upsert ENSIndexerPublicConfig into ENSDb.
117-
await ensDbMutation.upsertIndexingStatus(snapshot);
118-
119-
logger.info("Indexing Status successfully stored in ENSDb.");
100+
await ensDbClient.upsertIndexingStatus(snapshot);
120101
} catch (error) {
121102
// Do nothing about this error, but having it logged.
122-
logger.error(error, "Could not upsert Indexing Status record");
103+
console.error(error, "Could not upsert Indexing Status record");
123104
} finally {
124105
// Regardless of current iteration result,
125106
// schedule the next callback to handle Indexing Status Record.
@@ -131,12 +112,20 @@ async function ensDbWriterWorker() {
131112
};
132113

133114
// 4. Handle ENSIndexer Public Config just once.
134-
await handleEnsIndexerPublicConfigRecord();
115+
console.log("Task: store ENSIndexer Public Config in ENSDb.");
116+
await handleEnsIndexerPublicConfigRecord().then(() =>
117+
console.log("ENSIndexer Public Config successfully stored in ENSDb."),
118+
);
135119

136120
// 5. Handle Indexing Status on recurring basis.
137-
await handleIndexingStatusRecordRecursively();
121+
console.log("Task: store Indexing Status in ENSDb.");
122+
await handleIndexingStatusRecordRecursively().then(() =>
123+
console.log("Indexing Status successfully stored in ENSDb."),
124+
);
138125
}
139126

140127
// Run ENSDb Writer Worker in a non-blocking way to
141128
// allow database migrations to proceed in the background.
142-
setTimeout(ensDbWriterWorker, 0);
129+
ensDbWriterWorker().catch((error) =>
130+
console.error("ENSDb Writer Worker failed to perform its tasks", error),
131+
);

apps/ensindexer/src/config/compatibility.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ export type ENSIndexerPublicConfigCompatibilityCheck = Pick<
99
* Validate if `configB` is compatible with `configA`, such that `configA` is
1010
* a subset of `configB`.
1111
*
12-
* @throws error if 'indexedChainIds' were incompatible.
13-
* @throws error if 'isSubgraphCompatible' flag was incompatible.
14-
* @throws error if 'namespace' was incompatible.
15-
* @throws error if 'plugins' were incompatible.
12+
* @throws error if configs are incompatible.
1613
*/
1714
export function validateENSIndexerPublicConfigCompatibility(
1815
configA: ENSIndexerPublicConfigCompatibilityCheck,
Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,25 @@
11
// This file was copied 1-to-1 from ENSApi.
2+
// TODO: deduplicate with apps/ensapi/src/lib/handlers/drizzle.ts when ensnode nodejs internal package is created
23

3-
import { isTable, Table } from "drizzle-orm";
4-
import { drizzle, type NodePgDatabase } from "drizzle-orm/node-postgres";
5-
import { isPgEnum } from "drizzle-orm/pg-core";
6-
import type { Pool } from "pg";
4+
import { setDatabaseSchema } from "@ponder/client";
5+
import { drizzle } from "drizzle-orm/node-postgres";
76

87
type Schema = { [name: string]: unknown };
98

10-
// https://github.com/ponder-sh/ponder/blob/f7f6444ab8d1a870fe6492023941091df7b7cddf/packages/client/src/index.ts#L226C1-L239C3
11-
const setDatabaseSchema = <T extends Schema>(schema: T, schemaName: string) => {
12-
for (const table of Object.values(schema)) {
13-
if (isTable(table)) {
14-
// @ts-expect-error
15-
table[Table.Symbol.Schema] = schemaName;
16-
} else if (isPgEnum(table)) {
17-
// @ts-expect-error
18-
table.schema = schemaName;
19-
}
20-
}
21-
};
22-
239
/**
2410
* Makes a Drizzle DB object.
2511
*/
2612
export const makeDrizzle = <SCHEMA extends Schema>({
2713
schema,
28-
connectionPool,
14+
databaseUrl,
2915
databaseSchema,
3016
}: {
3117
schema: SCHEMA;
32-
connectionPool: Pool;
18+
databaseUrl: string;
3319
databaseSchema: string;
34-
}): NodePgDatabase<SCHEMA> & {
35-
$client: Pool;
36-
} => {
20+
}) => {
3721
// monkeypatch schema onto tables
3822
setDatabaseSchema(schema, databaseSchema);
3923

40-
return drizzle(connectionPool, { schema, casing: "snake_case" });
24+
return drizzle(databaseUrl, { schema, casing: "snake_case" });
4125
};
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import config from "@/config";
2+
3+
import { eq } from "drizzle-orm/sql";
4+
5+
import * as schema from "@ensnode/ensnode-schema";
6+
import type {
7+
SerializedCrossChainIndexingStatusSnapshot,
8+
SerializedENSIndexerPublicConfig,
9+
} from "@ensnode/ensnode-sdk";
10+
11+
import { makeDrizzle } from "./drizzle";
12+
import {
13+
type EnsNodeMetadata,
14+
type EnsNodeMetadataEnsIndexerPublicConfig,
15+
type EnsNodeMetadataIndexingStatus,
16+
EnsNodeMetadataKeys,
17+
} from "./ensnode-metadata";
18+
19+
/**
20+
* ENSDb Client Query
21+
*
22+
Includes methods for reading from ENSDb.
23+
*/
24+
export interface EnsDbClientQuery {
25+
getEnsIndexerPublicConfig(): Promise<SerializedENSIndexerPublicConfig | undefined>;
26+
27+
getIndexingStatus(): Promise<SerializedCrossChainIndexingStatusSnapshot | undefined>;
28+
}
29+
30+
/**
31+
* ENSDb Client Mutation
32+
*
33+
* Includes methods for writing into ENSDb.
34+
*/
35+
export interface EnsDbClientMutation {
36+
upsertEnsIndexerPublicConfig(
37+
ensIndexerPublicConfig: SerializedENSIndexerPublicConfig,
38+
): Promise<SerializedENSIndexerPublicConfig>;
39+
40+
upsertIndexingStatus(
41+
indexingStatus: SerializedCrossChainIndexingStatusSnapshot,
42+
): Promise<SerializedCrossChainIndexingStatusSnapshot>;
43+
}
44+
45+
/**
46+
* ENSDb Client
47+
*/
48+
export class EnsDbClient implements EnsDbClientQuery, EnsDbClientMutation {
49+
#db = makeDrizzle({
50+
databaseSchema: config.databaseSchemaName,
51+
databaseUrl: config.databaseUrl,
52+
schema,
53+
});
54+
55+
/**
56+
* Upsert ENSIndexer Public Config
57+
*
58+
* @returns updated record in ENSDb.
59+
* @throws when upsert operation failed.
60+
*/
61+
async getEnsIndexerPublicConfig(): Promise<SerializedENSIndexerPublicConfig | undefined> {
62+
return this.getEnsNodeMetadata<EnsNodeMetadataEnsIndexerPublicConfig>({
63+
key: EnsNodeMetadataKeys.EnsIndexerPublicConfig,
64+
});
65+
}
66+
67+
/**
68+
* Upsert Indexing Status
69+
*
70+
* @returns updated record in ENSDb.
71+
* @throws when upsert operation failed.
72+
*/
73+
async getIndexingStatus(): Promise<SerializedCrossChainIndexingStatusSnapshot | undefined> {
74+
return this.getEnsNodeMetadata<EnsNodeMetadataIndexingStatus>({
75+
key: EnsNodeMetadataKeys.IndexingStatus,
76+
});
77+
}
78+
79+
/**
80+
* Upsert ENSIndexer Public Config
81+
*
82+
* @returns updated record in ENSDb.
83+
* @throws when upsert operation failed.
84+
*/
85+
async upsertEnsIndexerPublicConfig(
86+
ensIndexerPublicConfig: SerializedENSIndexerPublicConfig,
87+
): Promise<SerializedENSIndexerPublicConfig> {
88+
return this.upsertEnsNodeMetadata({
89+
key: EnsNodeMetadataKeys.EnsIndexerPublicConfig,
90+
value: ensIndexerPublicConfig,
91+
});
92+
}
93+
94+
/**
95+
* Upsert Indexing Status
96+
*
97+
* @returns updated record in ENSDb.
98+
* @throws when upsert operation failed.
99+
*/
100+
async upsertIndexingStatus(
101+
indexingStatus: SerializedCrossChainIndexingStatusSnapshot,
102+
): Promise<SerializedCrossChainIndexingStatusSnapshot> {
103+
return this.upsertEnsNodeMetadata({
104+
key: EnsNodeMetadataKeys.IndexingStatus,
105+
value: indexingStatus,
106+
});
107+
}
108+
109+
/**
110+
* Get ENSNode metadata record
111+
*
112+
* @returns selected record in ENSDb.
113+
* @throws when exactly one matching metadata record was not found
114+
*/
115+
private async getEnsNodeMetadata<EnsNodeMetadataType extends EnsNodeMetadata = EnsNodeMetadata>(
116+
metadata: Pick<EnsNodeMetadataType, "key">,
117+
): Promise<EnsNodeMetadataType["value"] | undefined> {
118+
const result = await this.#db
119+
.select()
120+
.from(schema.ensNodeMetadata)
121+
.where(eq(schema.ensNodeMetadata.key, metadata.key));
122+
123+
if (result.length === 0) {
124+
return undefined;
125+
}
126+
127+
if (result.length === 1 && result[0]) {
128+
return result[0].value as EnsNodeMetadataType["value"];
129+
}
130+
131+
throw new Error(`There must be exactly one ENSNodeMetadata record for '${metadata.key}' key`);
132+
}
133+
134+
/**
135+
* Upsert ENSNode metadata
136+
*
137+
* @returns updated record in ENSDb.
138+
* @throws when upsert operation failed.
139+
*/
140+
private async upsertEnsNodeMetadata<
141+
EnsNodeMetadataType extends EnsNodeMetadata = EnsNodeMetadata,
142+
>(metadata: EnsNodeMetadataType): Promise<EnsNodeMetadataType["value"]> {
143+
const [result] = await this.#db
144+
.insert(schema.ensNodeMetadata)
145+
.values({
146+
key: metadata.key,
147+
value: metadata.value,
148+
})
149+
.onConflictDoUpdate({
150+
target: schema.ensNodeMetadata.key,
151+
set: { value: metadata.value },
152+
})
153+
.returning({ value: schema.ensNodeMetadata.value });
154+
155+
if (!result) {
156+
throw new Error(`Failed to upsert metadata for key: ${metadata.key}`);
157+
}
158+
159+
return result.value as EnsNodeMetadataType["value"];
160+
}
161+
}

0 commit comments

Comments
 (0)