Skip to content

Commit 42a9c15

Browse files
authored
Merge pull request #8431 from sagemathinc/conat-router-peer-discovery
hub/conat: discover via k8s api
2 parents 8599c22 + 3ee6329 commit 42a9c15

File tree

2 files changed

+150
-8
lines changed

2 files changed

+150
-8
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import * as fs from "fs";
2+
import * as https from "https";
3+
4+
// Define the options interface for type safety
5+
interface ListPodsOptions {
6+
labelSelector?: string; // e.g. "app=foo,env=prod"
7+
}
8+
9+
const NAMESPACE: string = fs
10+
.readFileSync(
11+
"/var/run/secrets/kubernetes.io/serviceaccount/namespace",
12+
"utf8",
13+
)
14+
.trim();
15+
const CA: Buffer = fs.readFileSync(
16+
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
17+
);
18+
19+
async function listPods(options: ListPodsOptions = {}): Promise<any> {
20+
try {
21+
// Read service account details, token could be rotated
22+
const token = fs
23+
.readFileSync(
24+
"/var/run/secrets/kubernetes.io/serviceaccount/token",
25+
"utf8",
26+
)
27+
.trim();
28+
29+
// Base API path
30+
let path = `/api/v1/namespaces/${NAMESPACE}/pods`;
31+
32+
const queryParams: string[] = [];
33+
if (options.labelSelector) {
34+
queryParams.push(
35+
`labelSelector=${encodeURIComponent(options.labelSelector)}`,
36+
);
37+
}
38+
39+
if (queryParams.length > 0) {
40+
path += `?${queryParams.join("&")}`;
41+
}
42+
43+
const query: https.RequestOptions = {
44+
hostname: "kubernetes.default.svc",
45+
path,
46+
method: "GET",
47+
headers: {
48+
Authorization: `Bearer ${token}`,
49+
Accept: "application/json",
50+
},
51+
ca: [CA],
52+
};
53+
54+
return new Promise((resolve, reject) => {
55+
const req = https.request(query, (res) => {
56+
let data = "";
57+
res.on("data", (chunk) => {
58+
data += chunk;
59+
});
60+
res.on("end", () => {
61+
if (res.statusCode !== 200) {
62+
reject(
63+
new Error(
64+
`K8S API request failed. status=${res.statusCode}: ${data}`,
65+
),
66+
);
67+
} else {
68+
try {
69+
resolve(JSON.parse(data));
70+
} catch (parseError) {
71+
reject(parseError);
72+
}
73+
}
74+
});
75+
});
76+
77+
req.on("error", (error) => reject(error));
78+
req.end();
79+
});
80+
} catch (error) {
81+
throw new Error(
82+
`Failed to read service account files: ${(error as Error).message}`,
83+
);
84+
}
85+
}
86+
87+
export async function getAddressesFromK8sApi(): Promise<
88+
{ name: string; podIP: string }[]
89+
> {
90+
const res = await listPods({ labelSelector: "run=hub-conat-router" });
91+
const ret: { name: string; podIP: string }[] = [];
92+
for (const pod of res.items) {
93+
const name = pod.metadata?.name;
94+
const podIP = pod.status?.podIP;
95+
if (name && podIP) {
96+
ret.push({ name, podIP });
97+
}
98+
}
99+
return ret;
100+
}

src/packages/server/conat/socketio/dns-scan.ts

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,32 @@ COCALC_SERVICE
55
*/
66

77
import { delay } from "awaiting";
8-
import type { ConatServer } from "@cocalc/conat/core/server";
98
import { lookup } from "dns/promises";
10-
import port from "@cocalc/backend/port";
119
import { hostname } from "node:os";
12-
import { getLogger } from "@cocalc/backend/logger";
10+
1311
import { executeCode } from "@cocalc/backend/execute-code";
14-
import { split } from "@cocalc/util/misc";
12+
import { getLogger } from "@cocalc/backend/logger";
13+
import port from "@cocalc/backend/port";
14+
import type { ConatServer } from "@cocalc/conat/core/server";
15+
import { split, unreachable } from "@cocalc/util/misc";
16+
import { getAddressesFromK8sApi } from "./dns-scan-k8s-api";
1517

1618
export const SCAN_INTERVAL = 15_000;
1719

20+
type PeerDiscovery = "KUBECTL" | "API";
21+
22+
function isPeerDiscovery(x: string): x is PeerDiscovery {
23+
return x === "KUBECTL" || x === "API";
24+
}
25+
26+
const PEER_DISCOVERY: PeerDiscovery = (function () {
27+
const val = process.env.COCALC_CONAT_PEER_DISCOVERY ?? "KUBECTL";
28+
if (!isPeerDiscovery(val)) {
29+
throw Error(`Invalid COCALC_CONAT_PEER_DISCOVERY: ${val}`);
30+
}
31+
return val;
32+
})();
33+
1834
const logger = getLogger("conat:socketio:dns-scan");
1935

2036
export async function dnsScan(server: ConatServer) {
@@ -83,6 +99,32 @@ export async function getAddresses(): Promise<string[]> {
8399
const h = hostname();
84100
const i = h.lastIndexOf("-");
85101
const prefix = h.slice(0, i);
102+
103+
const podInfos = await getPodInfos();
104+
for (const { name, podIP } of podInfos) {
105+
if (name != h && name.startsWith(prefix)) {
106+
v.push(`http://${podIP}:${port}`);
107+
}
108+
}
109+
return v;
110+
}
111+
112+
async function getPodInfos(): Promise<{ name: string; podIP: string }[]> {
113+
switch (PEER_DISCOVERY) {
114+
case "KUBECTL":
115+
return await getAddressesFromKubectl();
116+
case "API":
117+
return await getAddressesFromK8sApi();
118+
default:
119+
unreachable(PEER_DISCOVERY);
120+
throw Error(`Unknown PEER_DISCOVERY: ${PEER_DISCOVERY}`);
121+
}
122+
}
123+
124+
async function getAddressesFromKubectl(): Promise<
125+
{ name: string; podIP: string }[]
126+
> {
127+
const ret: { name: string; podIP: string }[] = [];
86128
const { stdout } = await executeCode({
87129
command: "kubectl",
88130
args: [
@@ -97,10 +139,10 @@ export async function getAddresses(): Promise<string[]> {
97139
for (const x of stdout.split("\n")) {
98140
const row = split(x);
99141
if (row.length == 2) {
100-
if (row[0] != h && row[0].startsWith(prefix)) {
101-
v.push(`http://${row[1]}:${port}`);
102-
}
142+
ret.push({ name: row[0], podIP: row[1] });
143+
} else {
144+
logger.warn(`Unexpected row from kubectl: ${x}`);
103145
}
104146
}
105-
return v;
147+
return ret;
106148
}

0 commit comments

Comments
 (0)