import fs from "fs" import path from "path" import * as z from "zod" import { WebSocketServer } from "ws" import { fileURLToPath } from "url" const __dirname = path.dirname(fileURLToPath(import.meta.url)) const DATA_DIR = path.resolve(__dirname) const DATA_FILE = path.join(DATA_DIR, "master.forms") if (!fs.existsSync(DATA_DIR)) { fs.mkdirSync(DATA_DIR, { recursive: true }) } if (!fs.existsSync(DATA_FILE)) { fs.writeFileSync(DATA_FILE, "") } class Kernel { port seq = 0 clients = new Set() schemas = new Map() constructor(port = 10000) { this.port = port this.loadData() this.wss = new WebSocketServer({ port: this.port }) this.wss.on("connection", ws => this.onConnection(ws)) console.log(`[kernel] running on ws://localhost:${port}`) } loadData() { const data = fs.readFileSync(DATA_FILE, "utf8") if (!data) return const lines = data.trim().split("\n") const last = lines[lines.length - 1] if (last) { const entry = JSON.parse(last) this.seq = entry.seq } } onConnection(ws) { this.clients.add(ws) ws.on("message", msg => this.onMessage(ws, msg)) ws.on("close", () => this.clients.delete(ws)) ws.send(JSON.stringify({ type: "hello", seq: this.seq })) } onMessage(ws, raw) { let msg try { msg = JSON.parse(raw.toString()) } catch { return } switch(msg.op) { case "add": this.add(msg.type, msg.data) break; case "register": this.register(msg.type, msg.schema) break; case "replay": this.replay(ws, msg.type, msg.fromSeq) break; default: console.error("error: unknown message operation: ", msg.op) } } register(name, schema) { try { schema = z.fromJSONSchema(schema) } catch(e) { console.error("register() error: bad schema ", schema, e) return } this.schemas.set(name, schema) console.log(`[kernel] registered form "${name}"`) } add(name, data) { let schema = this.schemas.get(name) if(!schema) { console.error("No schema for this form: ", name) return } try { schema.parse(data) } catch(e) { console.error("error parsing data : ", data, " for schema ", name, e) } const entry = { seq: ++this.seq, time: Date.now(), name, data } fs.appendFileSync(DATA_FILE, JSON.stringify(entry) + "\n") const payload = JSON.stringify({ type: "add", ...entry }) for (const ws of this.clients) { if (ws.readyState === ws.OPEN) { ws.send(payload) } } } replay(ws, form, fromSeq = 0) { const stream = fs.createReadStream(DATA_FILE, { encoding: "utf8" }) let buffer = "" stream.on("data", chunk => { buffer += chunk let idx while ((idx = buffer.indexOf("\n")) >= 0) { const line = buffer.slice(0, idx) buffer = buffer.slice(idx + 1) if (!line) continue const entry = JSON.parse(line) if (entry.seq > fromSeq && entry.form === form) { ws.send(JSON.stringify({ type: "form:update", ...entry })) } } }) } } new Kernel()