Fixes & minetest utils

This commit is contained in:
root 2025-06-22 17:50:13 +03:00
parent 16353ec68f
commit f91cdea1fb
2 changed files with 166 additions and 23 deletions

9
config.json't.example Normal file
View file

@ -0,0 +1,9 @@
return {
services: {
mintest: minetestDockerService({
container: 'minetest',
bindAddr: [30000],
upstreamAddr: [31000, '127.0.0.1'],
}),
}
}

180
index.js
View file

@ -1,7 +1,75 @@
var dgram = require('node:dgram');
var net = require('node:net');
var fs = require('node:fs');
var { pipeline } = require('node:stream');
var { pipeline, Transform } = require('node:stream');
var child_process = require('node:child_process');
function spawner(cmd) {
return async function() {
console.log('$ '+cmd);
let proc = child_process.spawn('bash', ['-c', cmd], { stdio: 'inherit' });
await new Promise((ok, err) => {
proc.on('error', err);
proc.on('exit', (code, signal) => {if (code != 0) err(new Error('exited with error '+code+', '+signal)); else ok();});
});
}
}
async function minetest_preread(conn) {
let msg = await conn.recv();
if (Buffer.from([0x4f,0x45,0x74,0x03,0x00,0x00,0x00,0x01]).compare(msg)) {
let run = true;
(async function() {
await new Promise((ok, err)=>setTimeout(ok, 7000));
if (!run) return;
let reason = 'Server is taking a bit long to start. Please press "reconnect" immediately.';
reason = [...(new TextEncoder()).encode(reason)];
Promise.resolve(conn.service.powerOn())
.then(() => new Promise((ok) => setTimeout(ok,60000*0.5))) // +0.5 min
.then(() => conn.service.powerOff());
conn.send(Buffer.from([
0x4f,0x45,0x74,0x03, // proto
0,1, // peer
0, // chan
1, // type: original
0x00, 0x0A, // cmd: Access denied
11, // reason: shutdown
(reason.length >> 8) & 0xFF,
(reason.length >> 0) & 0xFF,
...reason,
1 // reconnect
]));
})();
conn.cancel = function() { run = false; }
return true;
}
conn.send(Buffer.from([...Array(14)].map(e=>0)));
await conn.recv();
return false;
}
function minetestPort(opt) {
return {
type: 'udp',
timeout: 5000,
preread: minetest_preread,
...opt,
}
}
function minetestDockerService(opt) {
opt = {...opt};
let container = opt.container;
delete opt.container;
return {
ports: [
minetestPort(opt)
],
powerOn: spawner(`if ! docker ps --format '{{.Names}}' | grep '^${container}$'; then ct=$(date +%s); path=/tmp/pid-$(head -c 8 /dev/urandom | xxd -p); docker start ${container} && (sh -c 'echo $PPID' > $path && docker logs --follow --since $ct ${container} 2>&1 </dev/null | grep --line-buffered 'Server for gameid="[^"]*" listening on') | (read line; echo $line; kill -TERM $(cat $path); rm $path); fi; :;`),
powerOff: spawner(`docker stop ${container}`),
}
}
async function loadConfig() {
let path = __dirname + "/config.json't";
@ -101,7 +169,7 @@ async function connectTcp(...arg) {
return client;
}
function connTemplate(conns, key, sendCl) {
function connTemplate(conns, key, sendCl, timeout, filter) {
let conn = {};
return Object.assign(conn, {
queue: [],
@ -117,6 +185,7 @@ function connTemplate(conns, key, sendCl) {
...(conns ? {
onkill() {
this.decond(new Error("timeout"));
this.killed = true;
conns.delete(key);
},
bumpout: new Bumpout(() => {
@ -143,7 +212,8 @@ function connTemplate(conns, key, sendCl) {
sendCl,
async send(msg) {
this.queue.push(msg);
this.permaque.push(msg);
Promise.resolve(filter(msg, 'client->server'))
.then(msg => this.permaque.push(msg));
if (this.bumpout)
this.bumpout.bump();
this.cond();
@ -151,7 +221,7 @@ function connTemplate(conns, key, sendCl) {
});
}
async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
async function proxyUdp(address, upstreamAddr, service, timeout, preread, filter) {
var server = await bindUdp(...address);
var conns = new Map();
@ -161,10 +231,12 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
if (!conns.has(key)) {
conns.set(key, connTemplate(conns, key, function(msg) {
server.send(msg, rinfo.port, rinfo.address);
}));
}, timeout, filter));
let conn = conns.get(key);
(async () => {
let pass = await filter({
let filc;
let pass = await preread(filc = {
service,
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
@ -172,11 +244,33 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
conns.delete(key);
return;
}
await service.powerOn();
let upstream = await connectUdp(...upstreamAddr);
try {
await service.powerOn();
} finally {
if(filc.cancel) {
try {
filc.cancel()
} catch(e) {
console.error(e);
}
}
}
let upstream;
try {
upstream = await connectUdp(...upstreamAddr);
} catch(e) {
console.error(e);
conn.killed = true;
}
if (conn.killed) {
upstream.close();
await service.powerOff();
return;
}
upstream.on('message', (msg) => {
conn.bumpout.bump();
conn.sendCl(msg);
Promise.resolve(filter(msg, 'server->client'))
.then(msg => conn.sendCl(msg));
});
conn.permaque.forEach(msg => upstream.send(msg));
for (let k of ['queue','permaque','cond','decond','recv']) {
@ -191,7 +285,8 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
},
send(msg) {
this.bumpout.bump();
this.upstream.send(msg);
Promise.resolve(filter(msg, 'client->server'))
.then(msg => this.upstream.send(msg));
}
});
})().catch(e => console.error(e));
@ -201,7 +296,7 @@ async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
});
}
async function proxyTcp(address, upstreamAddr, service, timeout, filter) {
async function proxyTcp(address, upstreamAddr, service, timeout, preread, filter) {
var server = await bindTcp(...address);
server.on('connection', async (client) => {
@ -209,15 +304,17 @@ async function proxyTcp(address, upstreamAddr, service, timeout, filter) {
client.on('timeout', () => {
client.destroy();
});
let conn = connTemplate(null, null, (msg) => client.write(msg));
let conn = connTemplate(null, null, (msg) => client.write(msg), filter);
let msgh, clsh;
client.on('data', msgh = (msg) => {
conn.send(msg);
filter(msg, 'client->server')
.then(msg => conn.send(msg));
});
client.on('close', clsh = () => {
conn.decond(new Error("closed"));
})
let pass = await filter({
service,
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
@ -225,16 +322,42 @@ async function proxyTcp(address, upstreamAddr, service, timeout, filter) {
client.close();
return;
}
await service.powerOn();
let upstream = await connectTcp(...upstreamAddr);
upstream.setTimeout(timeout);
upstream.on('timeout', () => {
upstream.destroy();
});
try {
await service.powerOn();
} finally {
if(filc.cancel) {
try {
filc.cancel()
} catch(e) {
console.error(e);
}
}
}
let upstream;
try {
upstream = await connectTcp(...upstreamAddr);
conn.permaque.forEach(msg => upstream.write(msg));
upstream.setTimeout(timeout);
upstream.on('timeout', () => {
upstream.destroy();
});
} catch(e) {
await service.powerOff();
return;
}
function trans(gender) {
return new Transform({
transform(msg, enc, next) {
filter(msg, gender)
.then(msg => next(null, msg))
.catch(err => next(err));
}
})
}
try {
await Promise.all(
pipeline(client, upstream),
pipeline(upstream, client)
pipeline(client, trans('client->server'), upstream),
pipeline(upstream, trans('server->client'), client)
);
} finally {
await service.powerOff();
@ -298,6 +421,7 @@ Object.assign(StagedService.prototype, {
await this._stack.pop()();
}
})());
this._prom = null;
},
setOffPower() {
let level = this._service.stages[this._stack.length]
@ -307,6 +431,7 @@ Object.assign(StagedService.prototype, {
this._timeout = setTimeout(async () => {
await (this._prom = level.powerOff());
this._prom = null;
this._stack.push(() => level.powerOn());
this._timeout = null;
setOffPower();
@ -320,6 +445,13 @@ Object.assign(StagedService.prototype, {
},
});
function errHandler(err) {
console.error('ERROR!', err);
}
process.on('uncaughtException', errHandler);
process.on('unhandledRejection', errHandler);
let myServices = Object.create(null);
(async () => {
@ -327,7 +459,8 @@ let myServices = Object.create(null);
for (let [name, service] of Object.entries(config.services)) {
let xservice = new MultiplexedService(service);
myservices[name] = xservice;
myServices[name] = xservice;
xservice.name = name;
for (let port of service.ports) {
await ({
tcp: proxyTcp,
@ -337,7 +470,8 @@ let myServices = Object.create(null);
port.upstreamAddr,
xservice,
port.timeout ?? 30000,
port.filter ?? (() => true)
port.preread ?? (() => true),
port.filter ?? (msg => msg),
);
}
}