add forms
This commit is contained in:
120
forms/kernel/kernel.js
Normal file
120
forms/kernel/kernel.js
Normal file
@@ -0,0 +1,120 @@
|
||||
import fs from "fs"
|
||||
import path from "path"
|
||||
import { WebSocketServer } from "ws"
|
||||
|
||||
const DATA_DIR = path.resolve("./")
|
||||
const DATA_FILE = path.join(DATA_DIR, "master.forms")
|
||||
|
||||
if (!fs.existsSync(DATA_DIR)) {
|
||||
fs.mkdirSync(DATA_DIR, { recursive: true })
|
||||
}
|
||||
|
||||
if (!fs.existsSync(DATA_FILE)) {
|
||||
fs.writeFileSync(DATA_FILE, "")
|
||||
}
|
||||
|
||||
class Kernel {
|
||||
port
|
||||
seq = 0
|
||||
clients = new Set()
|
||||
|
||||
constructor(port = 10000) {
|
||||
this.port = port
|
||||
|
||||
this._loadSeq()
|
||||
|
||||
this.wss = new WebSocketServer({ port: this.port })
|
||||
this.wss.on("connection", ws => this._onConnection(ws))
|
||||
|
||||
console.log(`[kernel] running on ws://localhost:${port}`)
|
||||
}
|
||||
|
||||
_loadSeq() {
|
||||
const data = fs.readFileSync(DATA_FILE, "utf8")
|
||||
if (!data) return
|
||||
|
||||
const lines = data.trim().split("\n")
|
||||
const last = lines[lines.length - 1]
|
||||
if (last) {
|
||||
const entry = JSON.parse(last)
|
||||
this.seq = entry.seq
|
||||
}
|
||||
}
|
||||
|
||||
_onConnection(ws) {
|
||||
this.clients.add(ws)
|
||||
|
||||
ws.on("message", msg => this._onMessage(ws, msg))
|
||||
ws.on("close", () => this.clients.delete(ws))
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: "hello",
|
||||
seq: this.seq
|
||||
}))
|
||||
}
|
||||
|
||||
_onMessage(ws, raw) {
|
||||
let msg
|
||||
try {
|
||||
msg = JSON.parse(raw.toString())
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
|
||||
if (msg.op === "append") {
|
||||
this._append(msg.form, msg.data)
|
||||
}
|
||||
|
||||
if (msg.op === "replay") {
|
||||
this._replay(ws, msg.form, msg.fromSeq)
|
||||
}
|
||||
}
|
||||
|
||||
_append(form, data) {
|
||||
const entry = {
|
||||
seq: ++this.seq,
|
||||
time: Date.now(),
|
||||
form,
|
||||
data
|
||||
}
|
||||
|
||||
fs.appendFileSync(DATA_FILE, JSON.stringify(entry) + "\n")
|
||||
|
||||
const payload = JSON.stringify({
|
||||
type: "form:update",
|
||||
...entry
|
||||
})
|
||||
|
||||
for (const ws of this.clients) {
|
||||
if (ws.readyState === ws.OPEN) {
|
||||
ws.send(payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_replay(ws, form, fromSeq = 0) {
|
||||
const stream = fs.createReadStream(DATA_FILE, { encoding: "utf8" })
|
||||
let buffer = ""
|
||||
|
||||
stream.on("data", chunk => {
|
||||
buffer += chunk
|
||||
let idx
|
||||
while ((idx = buffer.indexOf("\n")) >= 0) {
|
||||
const line = buffer.slice(0, idx)
|
||||
buffer = buffer.slice(idx + 1)
|
||||
|
||||
if (!line) continue
|
||||
|
||||
const entry = JSON.parse(line)
|
||||
if (entry.seq > fromSeq && entry.form === form) {
|
||||
ws.send(JSON.stringify({
|
||||
type: "form:update",
|
||||
...entry
|
||||
}))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
new Kernel()
|
||||
Reference in New Issue
Block a user