confiltermon/index.js
2025-06-23 19:50:28 +03:00

656 lines
15 KiB
JavaScript

var dgram = require('node:dgram');
var net = require('node:net');
var fs = require('node:fs');
var { pipeline, Transform } = require('node:stream');
var http = require('node:http');
var child_process = require('node:child_process');
function spawner(cmd) {
return async function() {
console.log('$ '+cmd);
let proc = child_process.spawn('bash', ['-c', cmd], { stdio: 'inherit' });
await new Promise((ok, err) => {
proc.on('error', err);
proc.on('exit', (code, signal) => {
if (code != 0)
err(new Error('exited with error '+code+', '+signal));
else
ok();
});
});
}
}
async function minetest_preread(conn) {
let msg = await conn.recv();
if (Buffer.from([0x4f,0x45,0x74,0x03,0x00,0x00,0x00,0x01]).compare(msg)) {
let run = true;
(async function() {
await new Promise((ok, err)=>setTimeout(ok, 7000));
if (!run) return;
let reason = 'Server is taking a bit long to start. Please press "reconnect" immediately.';
reason = [...(new TextEncoder()).encode(reason)];
Promise.resolve(conn.service.powerOn())
.then(() => new Promise((ok) => setTimeout(ok,60000*0.5))) // +0.5 min
.then(() => conn.service.powerOff());
conn.send(Buffer.from([
0x4f,0x45,0x74,0x03, // proto
0,1, // peer
0, // chan
1, // type: original
0x00, 0x0A, // cmd: Access denied
11, // reason: shutdown
(reason.length >> 8) & 0xFF,
(reason.length >> 0) & 0xFF,
...reason,
1 // reconnect
]));
})();
conn.cancel = function() { run = false; }
return true;
}
conn.send(Buffer.from([...Array(14)].map(e=>0)));
await conn.recv();
return false;
}
async function minetestServerlist(opt) {
let me;
let targetURL = new URL(opt.targetURL ?? 'https://servers.luanti.org/announce');
console.log('Starting serverlist proxy to '+targetURL+' from '+JSON.stringify(opt.bindAddr));
opt = {
cacheDir: __dirname+'/.msv_cache',
...opt
};
let cacheDir = opt.cacheDir;
fs.mkdirSync(opt.cacheDir, {recursive: true});
let servers = new Map();
var server = http.createServer(async (req, res) => {
let rescode;
let resbody;
try {
if (req.method != 'POST' || (new URL(req.url,'http://localhost')).pathname != '/announce') {
rescode = 400;
throw new Error('bad request');
}
console.log('Serverlist incoming');
let body = [];
for await (let chunk of req) {
body.push(chunk);
}
body = body.join('');
// This is evil and wrong, but so is sending JSON wrapped in a multipart/form-data.
body = body.split('\n').filter(e=>e[0]=='{')[0];
console.log('Data:',body);
body = JSON.parse(body);
let addr = [parseInt(body.port ?? 30000), ...(body.address ? [body.address] : [])];
let key;
let entry = servers.get(key = JSON.stringify(addr));
if (!entry) {
entry = servers.get(key = JSON.stringify([addr[0]]));
}
if (!entry) {
rescode = 403;
throw new Error('bad server')
}
let formData = new FormData();
formData.append('json', JSON.stringify(body));
let sigma = await fetch(targetURL, {
method: 'POST',
body: formData,
});
resbody = await sigma.text();
if (sigma.status < 200 || sigma.status > 299) {
rescode = sigma.status;
console.error('Bad response '+rescode.status+': '+resbody);
throw new Error('Sigmature failure');
}
if (body.action in {'update': 1, 'start': 1}) {
entry.lastAnnounce = {
...body,
action: 'update',
uptime: 0,
name: body.name + ' (Idle)',
...(body.clients_list ? {clients_list: []} : {}),
clients: 0
};
try {
fs.writeFileSync(opt.cacheDir+'/'+btoa(btoa(key)), JSON.stringify(entry.lastAnnounce));
} catch {}
}
entry.bumpout.bump();
} catch(e) {
console.error(e);
let msg = 'error';
try {
msg = e.message;
} catch {}
if (msg != 'Sigmature failure') {
resbody = msg;
}
}
let buf = Buffer.from(resbody);
res
.writeHead(rescode ?? 503, {
'Content-Type': 'text/plain',
'Content-Length': buf.length
})
.end(buf);
});
server.listen(...opt.bindAddr);
await sockWaitFor(server, 'listening');
return me = {
add(opt) {
let entry, key, bumpout;
opt = {...opt};
servers.set(key = JSON.stringify(opt.announceAddr), entry = {
bumpout: bumpout = new Bumpout(function() {
me.reannounce(opt.announceAddr);
}, opt.reannounceTime),
...(() => { try {
console.log('adding entry',key);
let obj = {
lastAnnounce: JSON.parse(fs.readFileSync(cacheDir+'/'+btoa(btoa(key))))
};
setTimeout(function() {
me.reannounce(opt.announceAddr, {action: 'start'});
},100);
return obj;
} catch(e) { console.error(e); return {}; } })()
})
},
async reannounce(addr, over) {
let key;
let entry = servers.get(key = JSON.stringify(addr));
if (!entry) return;
if (!entry.lastAnnounce) return;
console.log('announcing '+key);
let i = 0;
while (1) {
entry.bumpout.cancel();
try {
let formData = new FormData();
formData.append('json', JSON.stringify({
...entry.lastAnnounce,
...(over ?? {})
}));
let res = await fetch(targetURL, {
method: 'POST',
body: formData,
});
if(res.status < 200 || res.status > 299) {
let body = await res.text();
throw new Error('bad response '+res.status+': '+body);
}
entry.bumpout.restart();
console.log('announced '+key);
break;
} catch(e) {
console.error(e);
await new Promise((ok) => setTimeout(ok, 2000));
}
i++; if (i >= 5) {
throw new Error('Giving up on serverlist');
}
entry.bumpout.restart();
}
}
};
}
function minetestPort(opt) {
return {
type: 'udp',
timeout: 5000,
preread: minetest_preread,
...opt,
}
}
function minetestDockerService(opt) {
opt = {
dockerCommand: 'docker', // if u use a different kind of docker like a podman maybe
announceAddr: opt.bindAddr,
reannounceTime: 305000,
...opt
};
let container = opt.container;
let docker = opt.dockerCommand;
let srv;
if (opt.serverlist) {
opt.serverlist.add(opt);
}
delete opt.container;
return {
ports: [
minetestPort(opt)
],
powerOn: spawner(
`if ! ${docker} ps --format '{{.Names}}' | grep '^${container}$'; then `+
`ct=$(date +%s); `+
`path=/tmp/pid-$(head -c 8 /dev/urandom | xxd -p); `+
`${docker} start ${container} && `+
`(sh -c 'echo $PPID' > $path && `+
`${docker} logs --follow --since $ct ${container} 2>&1 </dev/null | `+
`grep --line-buffered 'Server for gameid="[^"]*" listening on') | `+
`(read line; echo $line; kill -TERM $(cat $path); rm $path); `+
`fi; :;`),
powerOff: (fun => async function () {
await fun();
try {
await opt.serverlist.reannounce(opt.announceAddr, { action: 'start' });
} catch {}
})(spawner(`${docker} stop ${container}`)),
}
}
async function loadConfig() {
let path = __dirname + "/config.json't";
fun = eval('(async function(path) { ' +
fs.readFileSync(path) +
'; })');
return await fun(path);
}
var sockWaitFor = async function(socket, ev) {
var ok, err;
await new Promise((a, b) => {
[ok, err] = [a, b];
socket.on('error', err);
socket.on(ev, ok);
});
socket.off('error', err);
socket.off(ev, ok);
}
// timeout but u can bump it so it fires later
var Bumpout = function(fun, time) {
this._callback = (...args) => {
this._handle = null;
return fun(...args);
}
this._timeout = time;
this.restart();
}
Bumpout.prototype.bump = function() {
if (!this._handle) return false;
return this.restart();
}
Bumpout.prototype.cancel = function() {
if (!this._handle) return false;
clearTimeout(this._handle);
this._handle = null;
return true;
}
Bumpout.prototype.restart = function(timeout) {
if (this._handle) this.cancel();
this._handle = setTimeout(this._callback, timeout ?? this._timeout);
return true;
}
async function bindUdp(...arg) {
var server;
await (async fun => {
// this is stupid but im stupid too so its ok
try {
server = dgram.createSocket('udp6');
await fun();
} catch {
server = dgram.createSocket('udp4');
await fun();
}
})(async () => {
server.bind(...arg);
await sockWaitFor(server, 'listening');
});
return server;
}
async function connectUdp(...arg) {
var client;
await (async fun => {
// see ${_FILE}:${_LINE - 18}
try {
client = dgram.createSocket('udp6');
await fun();
} catch {
client = dgram.createSocket('udp4');
await fun();
}
})(async () => {
client.bind(0);
await sockWaitFor(client, 'listening');
client.connect(...arg);
await sockWaitFor(client, 'connect');
});
return client
}
async function bindTcp(...arg) {
var server = net.createServer();
server.listen(...arg);
await sockWaitFor(server, 'listening');
return server;
}
async function connectTcp(...arg) {
var client = new net.Socket();
client.connect(...arg);
await sockWaitFor(client, 'connect');
return client;
}
function connTemplate(conns, key, sendCl, timeout, filter) {
let conn = {};
return Object.assign(conn, {
queue: [],
permaque: [],
...(fun => ({ promReset: fun, ...fun() }))(() => ({
cond() {},
decond(err) {
this.prom = Promise.reject(err);
this.prom.catch(() => {});
},
prom: null,
})),
...(conns ? {
onkill() {
this.decond(new Error("timeout"));
this.killed = true;
conns.delete(key);
},
bumpout: new Bumpout(() => {
conn.onkill();
}, timeout),
} : {}),
async recv() {
while (!this.queue.length) {
if (this.prom)
await this.prom;
else {
this.prom = new Promise((ok, err) => {
this.cond = () => {
Object.assign(this, this.promReset());
ok();
}
this.decond = err;
});
this.prom.catch(() => {});
}
}
return this.queue.shift();
},
sendCl,
async send(msg) {
this.queue.push(msg);
Promise.resolve(filter(msg, 'client->server'))
.then(msg => this.permaque.push(msg));
if (this.bumpout)
this.bumpout.bump();
this.cond();
}
});
}
async function proxyUdp(address, upstreamAddr, service, timeout, preread, filter) {
var server = await bindUdp(...address);
var conns = new Map();
server.on('message', async (msg, rinfo) => {
let key = JSON.stringify([rinfo.address, rinfo.port]);
if (!conns.has(key)) {
conns.set(key, connTemplate(conns, key, function(msg) {
server.send(msg, rinfo.port, rinfo.address);
}, timeout, filter));
let conn = conns.get(key);
(async () => {
let filc;
let pass = await preread(filc = {
service,
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
if (!pass) {
conns.delete(key);
return;
}
try {
await service.powerOn();
} finally {
if(filc.cancel) {
try {
filc.cancel()
} catch(e) {
console.error(e);
}
}
}
let upstream;
try {
upstream = await connectUdp(...upstreamAddr);
} catch(e) {
console.error(e);
conn.killed = true;
}
if (conn.killed) {
upstream.close();
await service.powerOff();
return;
}
upstream.on('message', (msg) => {
conn.bumpout.bump();
Promise.resolve(filter(msg, 'server->client'))
.then(msg => conn.sendCl(msg));
});
conn.permaque.forEach(msg => upstream.send(msg));
for (let k of ['queue','permaque','cond','decond','recv']) {
delete conn[k];
}
Object.assign(conn, {
upstream,
onkill() {
this.upstream.close();
conns.delete(key);
service.powerOff().catch(e => console.error(e));
},
send(msg) {
this.bumpout.bump();
Promise.resolve(filter(msg, 'client->server'))
.then(msg => this.upstream.send(msg));
}
});
})().catch(e => console.error(e));
}
let conn = conns.get(key);
await conn.send(msg);
});
}
async function proxyTcp(address, upstreamAddr, service, timeout, preread, filter) {
var server = await bindTcp(...address);
server.on('connection', async (client) => {
client.setTimeout(timeout);
client.on('timeout', () => {
client.destroy();
});
let conn = connTemplate(null, null, (msg) => client.write(msg), filter);
let msgh, clsh;
client.on('data', msgh = (msg) => {
filter(msg, 'client->server')
.then(msg => conn.send(msg));
});
client.on('close', clsh = () => {
conn.decond(new Error("closed"));
})
let pass = await filter({
service,
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
if (!pass) {
client.close();
return;
}
try {
await service.powerOn();
} finally {
if(filc.cancel) {
try {
filc.cancel()
} catch(e) {
console.error(e);
}
}
}
let upstream;
try {
upstream = await connectTcp(...upstreamAddr);
conn.permaque.forEach(msg => upstream.write(msg));
upstream.setTimeout(timeout);
upstream.on('timeout', () => {
upstream.destroy();
});
} catch(e) {
await service.powerOff();
return;
}
function trans(gender) {
return new Transform({
transform(msg, enc, next) {
filter(msg, gender)
.then(msg => next(null, msg))
.catch(err => next(err));
}
})
}
try {
await Promise.all(
pipeline(client, trans('client->server'), upstream),
pipeline(upstream, trans('server->client'), client)
);
} finally {
await service.powerOff();
}
});
}
function MultiplexedService(service) {
this._service = service;
this._count = 0;
this._prom = null;
}
Object.assign(MultiplexedService.prototype, {
async powerOn() {
if (this._prom)
await this._prom;
if ((this._count++) == 0) {
try {
await (this._prom = this._service.powerOn());
} finally {
this._prom = null;
}
}
},
async powerOff() {
if (this._prom)
await this._prom;
if (this._count == 0) {
throw new Error("Too many powerOffs");
}
if ((--this._count) == 0) {
try {
await (this._prom = this._service.powerOff());
} finally {
this._prom = null;
}
}
},
});
function StagedService(service) {
this._service = service;
this._stack = [];
this._prom = null;
this._timeout = null;
}
Object.assign(StagedService.prototype, {
async powerOn() {
if (this._prom)
await this._prom;
await (this._prom = (async () => {
if (this._timeout) {
clearTimeout(this._timeout);
}
while (this._stack.length > 0) {
await this._stack.pop()();
}
})());
this._prom = null;
},
setOffPower() {
let level = this._service.stages[this._stack.length]
if(this._timeout || !level)
return;
this._timeout = setTimeout(async () => {
await (this._prom = level.powerOff());
this._prom = null;
this._stack.push(() => level.powerOn());
this._timeout = null;
setOffPower();
}, level.timeout);
},
async powerOff() {
if (this._prom)
await this._prom;
setOffPower();
},
});
function errHandler(err) {
console.error('ERROR!', err);
}
process.on('uncaughtException', errHandler);
process.on('unhandledRejection', errHandler);
var myServices = Object.create(null);
(async () => {
var config = await loadConfig();
for (let [name, service] of Object.entries(config.services)) {
let xservice = new MultiplexedService(service);
myServices[name] = xservice;
xservice.name = name;
for (let port of service.ports) {
await ({
tcp: proxyTcp,
udp: proxyUdp
})[port.type](
port.bindAddr,
port.upstreamAddr,
xservice,
port.timeout ?? 30000,
port.preread ?? (() => true),
port.filter ?? (msg => msg),
);
}
}
})();