diff --git a/db/db.json b/db/db.json deleted file mode 100644 index 544b7b4..0000000 --- a/db/db.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - -} \ No newline at end of file diff --git a/forms/bin/forms.js b/forms/bin/forms.js new file mode 100755 index 0000000..f28341a --- /dev/null +++ b/forms/bin/forms.js @@ -0,0 +1,43 @@ +#!/usr/bin/env node +import { spawn } from "child_process" +import fs from "fs" +import path from "path" +import { fileURLToPath } from "url" + +const __dirname = path.dirname(fileURLToPath(import.meta.url)) +const KERNEL = path.join(__dirname, "../kernel/kernel.js") +const PID_FILE = "/tmp/forms.pid" + +const cmd = process.argv[2] + +if (cmd === "start") { + if (fs.existsSync(PID_FILE)) { + console.log("forms kernel already running") + process.exit(0) + } + + const proc = spawn("node", [KERNEL], { + detached: true, + stdio: "ignore" + }) + + proc.unref() + fs.writeFileSync(PID_FILE, String(proc.pid)) + console.log("forms kernel started") +} + +if (cmd === "stop") { + if (!fs.existsSync(PID_FILE)) { + console.log("forms kernel not running") + process.exit(0) + } + + const pid = Number(fs.readFileSync(PID_FILE)) + process.kill(pid) + fs.unlinkSync(PID_FILE) + console.log("forms kernel stopped") +} + +if (!cmd) { + console.log("usage: forms start | stop") +} diff --git a/forms/client/index.js b/forms/client/index.js new file mode 100644 index 0000000..9d9ad97 --- /dev/null +++ b/forms/client/index.js @@ -0,0 +1,60 @@ +import WebSocket from "ws" + +export default class FormsClient { + constructor(url = "ws://localhost:4001") { + this.url = url + this.ws = null + this.seq = 0 + this.handlers = new Map() + } + + connect() { + this.ws = new WebSocket(this.url) + + this.ws.on("message", msg => this._onMessage(msg)) + this.ws.on("open", () => console.log("[forms] connected")) + this.ws.on("close", () => console.log("[forms] disconnected")) + } + + append(form, data) { + this.ws.send(JSON.stringify({ + op: "append", + form, + data + })) + } + + watch(form, handler) { + if (!this.handlers.has(form)) { + this.handlers.set(form, new Set()) + } + + this.handlers.get(form).add(handler) + + this.ws.send(JSON.stringify({ + op: "replay", + form, + fromSeq: this.seq + })) + } + + _onMessage(raw) { + const msg = JSON.parse(raw.toString()) + + if (msg.type === "hello") { + this.seq = msg.seq + return + } + + if (msg.type === "form:update") { + this.seq = msg.seq + + const set = this.handlers.get(msg.form) + if (set) { + for (const fn of set) { + fn(msg.data, msg) + } + } + } + } +} diff --git a/forms/kernel/kernel.js b/forms/kernel/kernel.js new file mode 100644 index 0000000..a5710d2 --- /dev/null +++ b/forms/kernel/kernel.js @@ -0,0 +1,120 @@ +import fs from "fs" +import path from "path" +import { WebSocketServer } from "ws" + +const DATA_DIR = path.resolve("./") +const DATA_FILE = path.join(DATA_DIR, "master.forms") + +if (!fs.existsSync(DATA_DIR)) { + fs.mkdirSync(DATA_DIR, { recursive: true }) +} + +if (!fs.existsSync(DATA_FILE)) { + fs.writeFileSync(DATA_FILE, "") +} + +class Kernel { + port + seq = 0 + clients = new Set() + + constructor(port = 10000) { + this.port = port + + this._loadSeq() + + this.wss = new WebSocketServer({ port: this.port }) + this.wss.on("connection", ws => this._onConnection(ws)) + + console.log(`[kernel] running on ws://localhost:${port}`) + } + + _loadSeq() { + const data = fs.readFileSync(DATA_FILE, "utf8") + if (!data) return + + const lines = data.trim().split("\n") + const last = lines[lines.length - 1] + if (last) { + const entry = JSON.parse(last) + this.seq = entry.seq + } + } + + _onConnection(ws) { + this.clients.add(ws) + + ws.on("message", msg => this._onMessage(ws, msg)) + ws.on("close", () => this.clients.delete(ws)) + + ws.send(JSON.stringify({ + type: "hello", + seq: this.seq + })) + } + + _onMessage(ws, raw) { + let msg + try { + msg = JSON.parse(raw.toString()) + } catch { + return + } + + if (msg.op === "append") { + this._append(msg.form, msg.data) + } + + if (msg.op === "replay") { + this._replay(ws, msg.form, msg.fromSeq) + } + } + + _append(form, data) { + const entry = { + seq: ++this.seq, + time: Date.now(), + form, + data + } + + fs.appendFileSync(DATA_FILE, JSON.stringify(entry) + "\n") + + const payload = JSON.stringify({ + type: "form:update", + ...entry + }) + + for (const ws of this.clients) { + if (ws.readyState === ws.OPEN) { + ws.send(payload) + } + } + } + + _replay(ws, form, fromSeq = 0) { + const stream = fs.createReadStream(DATA_FILE, { encoding: "utf8" }) + let buffer = "" + + stream.on("data", chunk => { + buffer += chunk + let idx + while ((idx = buffer.indexOf("\n")) >= 0) { + const line = buffer.slice(0, idx) + buffer = buffer.slice(idx + 1) + + if (!line) continue + + const entry = JSON.parse(line) + if (entry.seq > fromSeq && entry.form === form) { + ws.send(JSON.stringify({ + type: "form:update", + ...entry + })) + } + } + }) + } +} + +new Kernel() diff --git a/db/forms.js b/forms/master.forms similarity index 100% rename from db/forms.js rename to forms/master.forms diff --git a/forms/package.json b/forms/package.json new file mode 100644 index 0000000..b6fdf32 --- /dev/null +++ b/forms/package.json @@ -0,0 +1,14 @@ +{ + "name": "forms", + "version": "0.1.0", + "type": "module", + "bin": { + "forms": "./bin/forms.js" + }, + "exports": { + ".": "./client/index.js" + }, + "dependencies": { + "ws": "^8.16.0" + } +} \ No newline at end of file diff --git a/forms/readme.md b/forms/readme.md new file mode 100644 index 0000000..bd28cf3 --- /dev/null +++ b/forms/readme.md @@ -0,0 +1,8 @@ +cd forms +npm link + +In app: +npm link forms + +Start: +forms start \ No newline at end of file diff --git a/master.forms b/master.forms new file mode 100644 index 0000000..e69de29