// Kernel: a WebSocket server that validates, persists and replays form entries.
import fs from "fs"
import path from "path"
import { fileURLToPath } from "url"

import { WebSocketServer } from "ws"
import * as z from "zod"

// ES modules have no built-in __dirname, so derive it from this module's URL.
const __dirname = path.dirname(fileURLToPath(import.meta.url))

// The append-only log lives next to this module.
const DATA_DIR = path.resolve(__dirname)
const DATA_FILE = path.join(DATA_DIR, "master.forms")

// With `recursive: true`, mkdirSync does not throw when the directory already
// exists, so no separate existence check is needed.
fs.mkdirSync(DATA_DIR, { recursive: true })

// Opening in append mode creates the file when it is missing and never
// truncates an existing log, unlike a check-then-write with an empty string.
fs.closeSync(fs.openSync(DATA_FILE, "a"))

/**
 * WebSocket server backed by an append-only log file (DATA_FILE).
 *
 * Each log line is one JSON object `{ seq, time, name, data }`. Clients send
 * JSON messages whose `op` field selects an operation:
 *   - "register": compile a JSON Schema and store it under a form name
 *   - "add":      validate data against the registered schema, append it to
 *                 the log and broadcast it to every open client
 *   - "replay":   stream logged entries of one form newer than `fromSeq`
 *                 back to the requesting client
 */
class Kernel {
  /** Port the WebSocket server listens on. */
  port
  /** Underlying WebSocketServer, created in the constructor. */
  wss
  /** Sequence number of the last persisted entry (0 for an empty log). */
  seq = 0
  /** Sockets of the currently connected clients. */
  clients = new Set()
  /** Compiled schemas keyed by form name. */
  schemas = new Map()

  /**
   * Restores the sequence counter from the log and starts listening.
   *
   * @param {number} [port=10000] Port for the WebSocket server.
   */
  constructor(port = 10000) {
    this.port = port

    this.loadData()

    this.wss = new WebSocketServer({ port: this.port })
    this.wss.on("connection", ws => this.onConnection(ws))

    console.log(`[kernel] running on ws://localhost:${port}`)
  }

  /**
   * Sets `this.seq` from the last line of the log so that new entries
   * continue the existing numbering. An empty log leaves `seq` at 0.
   *
   * @throws {Error} When the last log line is not valid JSON or carries no
   *   integer `seq`; continuing would produce duplicate or NaN sequence numbers.
   */
  loadData() {
    const data = fs.readFileSync(DATA_FILE, "utf8")
    if (!data) return

    const lines = data.trim().split("\n")
    const last = lines[lines.length - 1]
    if (!last) return

    let entry
    try {
      entry = JSON.parse(last)
    } catch (e) {
      throw new Error(
        `[kernel] cannot resume: last line of ${DATA_FILE} is not valid JSON`,
        { cause: e }
      )
    }

    if (!Number.isSafeInteger(entry?.seq)) {
      throw new Error(
        `[kernel] cannot resume: last entry of ${DATA_FILE} has no integer "seq"`
      )
    }

    this.seq = entry.seq
  }

  /**
   * Tracks a new client, wires its event handlers and greets it with the
   * current sequence number.
   *
   * @param {object} ws Client socket handed over by the "connection" event.
   */
  onConnection(ws) {
    this.clients.add(ws)

    ws.on("message", msg => this.onMessage(ws, msg))
    ws.on("close", () => this.clients.delete(ws))
    // An "error" event without a listener is thrown by EventEmitter and would
    // take the whole process down.
    ws.on("error", e => console.error("[kernel] socket error: ", e))

    ws.send(JSON.stringify({
      type: "hello",
      seq: this.seq
    }))
  }

  /**
   * Parses one incoming message and dispatches it on its `op` field.
   * Messages that are not valid JSON are logged and ignored.
   *
   * @param {object} ws Socket the message arrived on (used as replay target).
   * @param {*} raw Raw message payload; converted with `toString()`.
   */
  onMessage(ws, raw) {
    let msg
    try {
      msg = JSON.parse(raw.toString())
    } catch (e) {
      console.error("error: message is not valid JSON: ", e)
      return
    }

    // `?.` because the JSON payload `null` parses successfully; it then falls
    // through to the unknown-operation branch instead of throwing.
    switch (msg?.op) {
      case "add":
        this.add(msg.type, msg.data)
        break
      case "register":
        this.register(msg.type, msg.schema)
        break
      case "replay":
        this.replay(ws, msg.type, msg.fromSeq)
        break

      default:
        console.error("error: unknown message operation: ", msg?.op)
    }
  }

  /**
   * Compiles a JSON Schema and stores the result under `name`, replacing any
   * schema previously registered for that name. A schema that fails to
   * compile is logged and not stored.
   *
   * @param {string} name Form name.
   * @param {object} schema JSON Schema describing the form's data.
   */
  register(name, schema) {
    let compiled
    try {
      compiled = z.fromJSONSchema(schema)
    } catch (e) {
      console.error("register() error: bad schema ", schema, e)
      return
    }

    this.schemas.set(name, compiled)

    console.log(`[kernel] registered form "${name}"`)
  }

  /**
   * Validates `data` against the schema registered for `name`, appends the
   * entry to the log and broadcasts it to every open client. Nothing is
   * written or sent when the form is unknown or validation fails.
   *
   * @param {string} name Form name; must have been registered first.
   * @param {*} data Payload to validate and store.
   */
  add(name, data) {
    const schema = this.schemas.get(name)
    if (!schema) {
      console.error("No schema for this form: ", name)
      return
    }

    try {
      schema.parse(data)
    } catch (e) {
      console.error("error parsing data : ", data, " for schema ", name, e)
      // Rejected data must never reach the log or the other clients.
      return
    }

    const entry = {
      seq: this.seq + 1,
      time: Date.now(),
      name,
      data
    }

    fs.appendFileSync(DATA_FILE, JSON.stringify(entry) + "\n")
    // Advance the counter only once the entry is on disk, so a failed write
    // does not leave a gap in the numbering.
    this.seq = entry.seq

    const payload = JSON.stringify({
      type: "add",
      ...entry
    })

    for (const ws of this.clients) {
      if (ws.readyState === ws.OPEN) {
        ws.send(payload)
      }
    }
  }

  /**
   * Streams the log and sends `ws` every entry of form `form` whose sequence
   * number is greater than `fromSeq`, in log order. Lines that are not valid
   * JSON are logged and skipped. Reading stops early if the socket is no
   * longer open.
   *
   * @param {object} ws Socket that receives the replayed entries.
   * @param {string} form Form name to replay.
   * @param {number} [fromSeq=0] Only entries with a larger `seq` are sent.
   */
  replay(ws, form, fromSeq = 0) {
    const stream = fs.createReadStream(DATA_FILE, { encoding: "utf8" })
    let buffer = ""

    // Without a listener a read error would be thrown as an uncaught exception.
    stream.on("error", e => console.error("replay() error: cannot read log ", e))

    stream.on("data", chunk => {
      buffer += chunk

      let idx
      while ((idx = buffer.indexOf("\n")) >= 0) {
        const line = buffer.slice(0, idx)
        buffer = buffer.slice(idx + 1)

        if (!line) continue

        if (ws.readyState !== ws.OPEN) {
          // The client went away; there is no point in reading further.
          stream.destroy()
          return
        }

        let entry
        try {
          entry = JSON.parse(line)
        } catch (e) {
          console.error("replay() error: skipping unreadable log line ", e)
          continue
        }

        // add() stores the form name under the key `name`.
        if (entry?.seq > fromSeq && entry.name === form) {
          ws.send(JSON.stringify({
            type: "form:update",
            ...entry
          }))
        }
      }
    })
  }
}

// Start a kernel on the default port as soon as this module is loaded.
new Kernel()