From f91cdea1fb31666dd3e6eb285bda403b59eff1cf Mon Sep 17 00:00:00 2001 From: root Date: Sun, 22 Jun 2025 17:50:13 +0300 Subject: [PATCH] Fixes & minetest utils --- config.json't.example | 9 +++ index.js | 180 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 166 insertions(+), 23 deletions(-) create mode 100644 config.json't.example diff --git a/config.json't.example b/config.json't.example new file mode 100644 index 0000000..6be580e --- /dev/null +++ b/config.json't.example @@ -0,0 +1,9 @@ +return { + services: { + mintest: minetestDockerService({ + container: 'minetest', + bindAddr: [30000], + upstreamAddr: [31000, '127.0.0.1'], + }), + } +} diff --git a/index.js b/index.js index 81347ea..78572b4 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,75 @@ var dgram = require('node:dgram'); var net = require('node:net'); var fs = require('node:fs'); -var { pipeline } = require('node:stream'); +var { pipeline, Transform } = require('node:stream'); + +var child_process = require('node:child_process'); + +function spawner(cmd) { + return async function() { + console.log('$ '+cmd); + let proc = child_process.spawn('bash', ['-c', cmd], { stdio: 'inherit' }); + await new Promise((ok, err) => { + proc.on('error', err); + proc.on('exit', (code, signal) => {if (code != 0) err(new Error('exited with error '+code+', '+signal)); else ok();}); + }); + } +} + +async function minetest_preread(conn) { + let msg = await conn.recv(); + if (Buffer.from([0x4f,0x45,0x74,0x03,0x00,0x00,0x00,0x01]).compare(msg)) { + let run = true; + (async function() { + await new Promise((ok, err)=>setTimeout(ok, 7000)); + if (!run) return; + let reason = 'Server is taking a bit long to start. Please press "reconnect" immediately.'; + reason = [...(new TextEncoder()).encode(reason)]; + Promise.resolve(conn.service.powerOn()) + .then(() => new Promise((ok) => setTimeout(ok,60000*0.5))) // +0.5 min + .then(() => conn.service.powerOff()); + conn.send(Buffer.from([ + 0x4f,0x45,0x74,0x03, // proto + 0,1, // peer + 0, // chan + 1, // type: original + 0x00, 0x0A, // cmd: Access denied + 11, // reason: shutdown + (reason.length >> 8) & 0xFF, + (reason.length >> 0) & 0xFF, + ...reason, + 1 // reconnect + ])); + })(); + conn.cancel = function() { run = false; } + return true; + } + conn.send(Buffer.from([...Array(14)].map(e=>0))); + await conn.recv(); + return false; +} + +function minetestPort(opt) { + return { + type: 'udp', + timeout: 5000, + preread: minetest_preread, + ...opt, + } +} + +function minetestDockerService(opt) { + opt = {...opt}; + let container = opt.container; + delete opt.container; + return { + ports: [ + minetestPort(opt) + ], + powerOn: spawner(`if ! docker ps --format '{{.Names}}' | grep '^${container}$'; then ct=$(date +%s); path=/tmp/pid-$(head -c 8 /dev/urandom | xxd -p); docker start ${container} && (sh -c 'echo $PPID' > $path && docker logs --follow --since $ct ${container} 2>&1 { @@ -143,7 +212,8 @@ function connTemplate(conns, key, sendCl) { sendCl, async send(msg) { this.queue.push(msg); - this.permaque.push(msg); + Promise.resolve(filter(msg, 'client->server')) + .then(msg => this.permaque.push(msg)); if (this.bumpout) this.bumpout.bump(); this.cond(); @@ -151,7 +221,7 @@ function connTemplate(conns, key, sendCl) { }); } -async function proxyUdp(address, upstreamAddr, service, timeout, filter) { +async function proxyUdp(address, upstreamAddr, service, timeout, preread, filter) { var server = await bindUdp(...address); var conns = new Map(); @@ -161,10 +231,12 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) { if (!conns.has(key)) { conns.set(key, connTemplate(conns, key, function(msg) { server.send(msg, rinfo.port, rinfo.address); - })); + }, timeout, filter)); let conn = conns.get(key); (async () => { - let pass = await filter({ + let filc; + let pass = await preread(filc = { + service, async recv(len) { return conn.recv(); }, async send(msg) { return conn.sendCl(msg); } }); @@ -172,11 +244,33 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) { conns.delete(key); return; } - await service.powerOn(); - let upstream = await connectUdp(...upstreamAddr); + try { + await service.powerOn(); + } finally { + if(filc.cancel) { + try { + filc.cancel() + } catch(e) { + console.error(e); + } + } + } + let upstream; + try { + upstream = await connectUdp(...upstreamAddr); + } catch(e) { + console.error(e); + conn.killed = true; + } + if (conn.killed) { + upstream.close(); + await service.powerOff(); + return; + } upstream.on('message', (msg) => { conn.bumpout.bump(); - conn.sendCl(msg); + Promise.resolve(filter(msg, 'server->client')) + .then(msg => conn.sendCl(msg)); }); conn.permaque.forEach(msg => upstream.send(msg)); for (let k of ['queue','permaque','cond','decond','recv']) { @@ -191,7 +285,8 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) { }, send(msg) { this.bumpout.bump(); - this.upstream.send(msg); + Promise.resolve(filter(msg, 'client->server')) + .then(msg => this.upstream.send(msg)); } }); })().catch(e => console.error(e)); @@ -201,7 +296,7 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) { }); } -async function proxyTcp(address, upstreamAddr, service, timeout, filter) { +async function proxyTcp(address, upstreamAddr, service, timeout, preread, filter) { var server = await bindTcp(...address); server.on('connection', async (client) => { @@ -209,15 +304,17 @@ async function proxyTcp(address, upstreamAddr, service, timeout, filter) { client.on('timeout', () => { client.destroy(); }); - let conn = connTemplate(null, null, (msg) => client.write(msg)); + let conn = connTemplate(null, null, (msg) => client.write(msg), filter); let msgh, clsh; client.on('data', msgh = (msg) => { - conn.send(msg); + filter(msg, 'client->server') + .then(msg => conn.send(msg)); }); client.on('close', clsh = () => { conn.decond(new Error("closed")); }) let pass = await filter({ + service, async recv(len) { return conn.recv(); }, async send(msg) { return conn.sendCl(msg); } }); @@ -225,16 +322,42 @@ async function proxyTcp(address, upstreamAddr, service, timeout, filter) { client.close(); return; } - await service.powerOn(); - let upstream = await connectTcp(...upstreamAddr); - upstream.setTimeout(timeout); - upstream.on('timeout', () => { - upstream.destroy(); - }); + try { + await service.powerOn(); + } finally { + if(filc.cancel) { + try { + filc.cancel() + } catch(e) { + console.error(e); + } + } + } + let upstream; + try { + upstream = await connectTcp(...upstreamAddr); + conn.permaque.forEach(msg => upstream.write(msg)); + upstream.setTimeout(timeout); + upstream.on('timeout', () => { + upstream.destroy(); + }); + } catch(e) { + await service.powerOff(); + return; + } + function trans(gender) { + return new Transform({ + transform(msg, enc, next) { + filter(msg, gender) + .then(msg => next(null, msg)) + .catch(err => next(err)); + } + }) + } try { await Promise.all( - pipeline(client, upstream), - pipeline(upstream, client) + pipeline(client, trans('client->server'), upstream), + pipeline(upstream, trans('server->client'), client) ); } finally { await service.powerOff(); @@ -298,6 +421,7 @@ Object.assign(StagedService.prototype, { await this._stack.pop()(); } })()); + this._prom = null; }, setOffPower() { let level = this._service.stages[this._stack.length] @@ -307,6 +431,7 @@ Object.assign(StagedService.prototype, { this._timeout = setTimeout(async () => { await (this._prom = level.powerOff()); + this._prom = null; this._stack.push(() => level.powerOn()); this._timeout = null; setOffPower(); @@ -320,6 +445,13 @@ Object.assign(StagedService.prototype, { }, }); +function errHandler(err) { + console.error('ERROR!', err); +} + +process.on('uncaughtException', errHandler); +process.on('unhandledRejection', errHandler); + let myServices = Object.create(null); (async () => { @@ -327,7 +459,8 @@ let myServices = Object.create(null); for (let [name, service] of Object.entries(config.services)) { let xservice = new MultiplexedService(service); - myservices[name] = xservice; + myServices[name] = xservice; + xservice.name = name; for (let port of service.ports) { await ({ tcp: proxyTcp, @@ -337,7 +470,8 @@ let myServices = Object.create(null); port.upstreamAddr, xservice, port.timeout ?? 30000, - port.filter ?? (() => true) + port.preread ?? (() => true), + port.filter ?? (msg => msg), ); } }