1154 lines
27 KiB
JavaScript
1154 lines
27 KiB
JavaScript
var dgram = require('node:dgram');
|
|
var net = require('node:net');
|
|
var fs = require('node:fs');
|
|
var { promises: { pipeline }, Transform } = require('node:stream');
|
|
var http = require('node:http');
|
|
|
|
var child_process = require('node:child_process');
|
|
var configDir = __dirname;
|
|
var configPath = __dirname+"/config.json't";
|
|
|
|
//## EXTRAS ##//
|
|
|
|
/**
 * Wrap a shell command in a reusable async launcher.
 *
 * @param {string} cmd - Command line handed to `bash -c`.
 * @returns {function(): Promise<void>} Launcher that logs the command, runs it
 *   with inherited stdio, and resolves on exit code 0. It rejects when the
 *   process cannot be spawned or exits with anything other than 0.
 */
function spawner(cmd) {
  const launch = async () => {
    console.log('$ '+cmd);
    const child = child_process.spawn('bash', ['-c', cmd], { stdio: 'inherit' });
    await new Promise((resolve, reject) => {
      child.on('error', reject);
      child.on('exit', (code, signal) => {
        if (code == 0) {
          resolve();
          return;
        }
        reject(new Error('exited with error '+code+', '+signal));
      });
    });
  };
  return launch;
}
|
|
|
|
// Minetest // Luanti
|
|
|
|
/**
 * Preread hook for Minetest/Luanti UDP ports.
 *
 * Reads the first datagram. If it equals the fixed 8-byte probe below
 * (presumably the serverlist's liveness ping — confirm against the serverlist
 * implementation), a 14-byte zero reply is sent, one more datagram is consumed
 * and the connection is rejected. Any other first packet is treated as a real
 * client: the connection is accepted, and a 7 second watchdog is armed that
 * tells the client to reconnect if `conn.cancel()` has not been called by then.
 *
 * @param {object} conn - Preread connection (`recv`, `send`, `service`).
 * @returns {Promise<boolean>} true to proxy the connection, false to drop it.
 */
async function minetest_preread(conn) {
  const probe = Buffer.from([0x4f,0x45,0x74,0x03,0x00,0x00,0x00,0x01]);
  const first = await conn.recv();

  // compare() yields 0 only on an exact match.
  if (probe.compare(first) == 0) {
    conn.send(Buffer.alloc(14));
    await conn.recv();
    return false;
  }

  let waiting = true;
  const watchdog = async () => {
    await new Promise((done) => setTimeout(done, 7000));
    if (!waiting) return;

    const text = 'Server is taking a bit long to start. Please press "reconnect" immediately.';
    const reason = [...new TextEncoder().encode(text)];

    // Hold an extra power reference for half a minute so the service stays up
    // while the client reconnects.
    Promise.resolve(conn.service.powerOn())
      .then(() => new Promise((done) => setTimeout(done, 60000*0.5))) // +0.5 min
      .then(() => conn.service.powerOff());

    conn.send(Buffer.from([
      0x4f,0x45,0x74,0x03, // proto
      0,1, // peer
      0, // chan
      1, // type: original
      0x00, 0x0A, // cmd: Access denied
      11, // reason: shutdown
      (reason.length >> 8) & 0xFF,
      (reason.length >> 0) & 0xFF,
      ...reason,
      1 // reconnect
    ]));
  };
  watchdog();

  conn.cancel = function() { waiting = false; };
  return true;
}
|
|
|
|
/**
 * Pass-through packet filter for Minetest/Luanti traffic.
 *
 * Every packet is forwarded unchanged. When bytes 7 and 8 are 0 and 3 (the
 * packet this code logs as "disco"), the connection's idle timeout is
 * shortened to 2 seconds and the timer is bumped if it would otherwise fire
 * later than that.
 *
 * @param {AsyncIterable<Buffer>} inp - Incoming packets.
 * @param {string} dir - Direction label (unused here).
 * @param {object} conn - Connection whose `bumpout` timer is adjusted.
 */
async function* minetest_filter(inp, dir, conn) {
  for await (const packet of inp) {
    const isDisco = packet[7]==0 && packet[8]==3;
    if (isDisco) {
      console.log('minetest disco packet');
      const timer = conn.bumpout;
      timer.timeout = 2000;
      const deadline = Date.now()+timer.timeout;
      if (timer.when > deadline) {
        timer.bump();
      }
    }
    yield packet;
  }
}
|
|
|
|
/**
 * Start an HTTP proxy for serverlist announces.
 *
 * Game servers POST their announce to `/announce` on `opt.bindAddr`; it is
 * relayed to `opt.targetURL`. The last successful announce of each registered
 * server is cached (in memory and under `opt.cacheDir`) in an "idle" form so
 * it can be re-sent by `reannounce()` while the real server is powered off.
 *
 * @param {object} opt
 * @param {Array} opt.bindAddr - Arguments for `http.Server#listen`.
 * @param {string} [opt.targetURL] - Upstream announce URL.
 * @param {string} [opt.cacheDir] - Cache directory (default `<configDir>/.msv_cache`).
 * @returns {Promise<{add: Function, reannounce: Function}>} Registry handle,
 *   resolved once the HTTP server is listening.
 */
async function minetestServerlist(opt) {
  let me;
  const targetURL = new URL(opt.targetURL ?? 'https://servers.luanti.org/announce');
  console.log('Starting serverlist proxy to '+targetURL+' from '+JSON.stringify(opt.bindAddr));
  opt = {
    cacheDir: configDir+'/.msv_cache',
    ...opt
  };
  const cacheDir = opt.cacheDir;
  fs.mkdirSync(cacheDir, {recursive: true});
  const servers = new Map();

  const server = http.createServer(async (req, res) => {
    let rescode;
    let resbody;
    try {
      if (req.method != 'POST' || (new URL(req.url,'http://localhost')).pathname != '/announce') {
        rescode = 400;
        throw new Error('bad request');
      }
      console.log('Serverlist incoming');
      const chunks = [];
      for await (const chunk of req) {
        chunks.push(chunk);
      }
      // Decode once after concatenation so multi-byte characters split across
      // chunks survive.
      let body = Buffer.concat(chunks).toString();

      // This is evil and wrong, but so is sending JSON wrapped in a multipart/form-data.
      body = body.split('\n').filter(e=>e[0]=='{')[0];
      console.log('Data:',body);

      body = JSON.parse(body);
      if (body.action == 'delete') {
        // Deletes are deliberately not forwarded, but the request still has
        // to be answered or the announcing server waits for a reply forever.
        rescode = 200;
        resbody = '';
      } else {
        const addr = [parseInt(body.port ?? 30000, 10), ...(body.address ? [body.address] : [])];
        let key = JSON.stringify(addr);
        let entry = servers.get(key);
        if (!entry) {
          // Fall back to a registration by port only.
          key = JSON.stringify([addr[0]]);
          entry = servers.get(key);
        }
        if (!entry) {
          rescode = 403;
          throw new Error('bad server');
        }
        const formData = new FormData();
        if (entry.lastAnnounce) {
          // The proxy already announced this server, so upstream sees an update.
          body.action = 'update';
        }
        formData.append('json', JSON.stringify(body));
        const sigma = await fetch(targetURL, {
          method: 'POST',
          body: formData,
        });
        resbody = await sigma.text();
        if (sigma.status < 200 || sigma.status > 299) {
          rescode = sigma.status;
          console.error('Bad response '+rescode+': '+resbody);
          throw new Error('Sigmature failure');
        }
        if (body.action in {'update': 1, 'start': 1}) {
          entry.lastAnnounce = {
            ...(entry.lastAnnounce ?? {}),
            ...body,
            action: 'update',
            uptime: 0,
            name: body.name + ' (Idle)',
            ...(body.clients_list ? {clients_list: []} : {}),
            clients: 0
          };
          try {
            fs.writeFileSync(cacheDir+'/'+btoa(btoa(key)), JSON.stringify(entry.lastAnnounce));
          } catch (e) {
            // Best effort: the in-memory copy is still usable.
            console.error(e);
          }
        }
        entry.bumpout.bump();
        rescode = sigma.status;
      }
    } catch(e) {
      console.error(e);
      const msg = e?.message ?? 'error';
      // For upstream failures resbody already holds the upstream reply text.
      if (msg != 'Sigmature failure') {
        resbody = msg;
      }
    }
    const buf = Buffer.from(String(resbody ?? ''));
    res
      .writeHead(rescode ?? 503, {
        'Content-Type': 'text/plain',
        'Content-Length': buf.length
      })
      .end(buf);
  });
  server.listen(...opt.bindAddr);
  await sockWaitFor(server, 'listening');

  return me = {
    /**
     * Register a server under `opt.announceAddr`. A cached announce, if one
     * exists on disk, is loaded and re-sent shortly afterwards as a 'start'.
     */
    add(opt) {
      opt = {...opt};
      const key = JSON.stringify(opt.announceAddr);
      const entry = {
        bumpout: new Bumpout(function() {
          me.reannounce(opt.announceAddr).catch(e => console.error(e));
        }, opt.reannounceTime),
      };
      try {
        console.log('adding entry',key);
        entry.lastAnnounce = JSON.parse(fs.readFileSync(cacheDir+'/'+btoa(btoa(key))));
        setTimeout(function() {
          me.reannounce(opt.announceAddr, { action: 'start' }).catch(e => console.error(e));
        },100);
      } catch(e) {
        // No usable cache yet; wait for the first real announce.
        console.error(e);
      }
      servers.set(key, entry);
    },
    /**
     * Re-send the cached announce for `addr`, optionally overriding fields.
     * Retries up to 5 times, 2 seconds apart, then throws.
     */
    async reannounce(addr, over) {
      const key = JSON.stringify(addr);
      const entry = servers.get(key);
      if (!entry) return;
      if (!entry.lastAnnounce) return;
      console.log('announcing '+key);
      for (let attempt = 1; ; attempt++) {
        entry.bumpout.cancel();
        try {
          const formData = new FormData();
          formData.append('json', JSON.stringify({
            ...entry.lastAnnounce,
            ...(over ?? {})
          }));
          const res = await fetch(targetURL, {
            method: 'POST',
            body: formData,
          });
          if (res.status < 200 || res.status > 299) {
            const body = await res.text();
            throw new Error('bad response '+res.status+': '+body);
          }
          entry.bumpout.restart();
          console.log('announced '+key);
          return;
        } catch(e) {
          console.error(e);
          await new Promise((ok) => setTimeout(ok, 2000));
        }
        if (attempt >= 5) {
          throw new Error('Giving up on serverlist');
        }
        entry.bumpout.restart();
      }
    }
  };
}
|
|
|
|
/**
 * Build a UDP port description for a Minetest/Luanti server.
 *
 * @param {object} opt - Caller options; any key given here overrides the
 *   defaults (`type`, `timeout`, `preread`, `filter`).
 * @returns {object} Port description.
 */
function minetestPort(opt) {
  const defaults = {
    type: 'udp',
    timeout: 30000,
    preread: minetest_preread,
    filter: minetest_filter,
  };
  return Object.assign(defaults, opt);
}
|
|
|
|
/**
 * Describe a Minetest/Luanti server that lives in a docker container.
 *
 * powerOn starts the container unless it is already listed by `ps`, then
 * follows its log until the "listening on" line shows up. powerOff stops the
 * container and then asks the serverlist proxy to re-announce it.
 *
 * @param {object} opt - Port options plus `container`, optional
 *   `dockerCommand`, `serverlist`, `announceAddr`, `reannounceTime`, and
 *   optional `powerOn`/`powerOff` wrappers that receive the built-in function.
 * @returns {{ports: object[], powerOn: Function, powerOff: Function}}
 */
function minetestDockerService(opt) {
  opt = {
    dockerCommand: 'docker', // if u use a different kind of docker like a podman maybe
    announceAddr: opt.bindAddr,
    reannounceTime: 305000,
    ...opt
  };
  const container = opt.container;
  const docker = opt.dockerCommand;

  // Register while `container` is still part of the options.
  if (opt.serverlist) {
    opt.serverlist.add(opt);
  }
  delete opt.container;

  const startCommand = [
    `if ! ${docker} ps --format '{{.Names}}' | grep '^${container}$'; then `,
    `ct=$(date +%s); `,
    `path=/tmp/pid-$(head -c 8 /dev/urandom | xxd -p); `,
    `${docker} start ${container} && `,
    `(sh -c 'echo $PPID' > $path && `,
    `${docker} logs --follow --since $ct ${container} 2>&1 </dev/null | `,
    `grep --line-buffered 'Server for gameid="[^"]*" listening on') | `,
    `(read line; echo $line; kill -TERM $(cat $path); rm $path); `,
    `fi; :;`,
  ].join('');

  const stopContainer = spawner(`${docker} stop ${container}`);

  const obj = {
    ports: [
      minetestPort(opt)
    ],
    powerOn: spawner(startCommand),
    async powerOff() {
      await stopContainer();
      try {
        await opt.serverlist.reannounce(opt.announceAddr);
      } catch {}
    },
  };

  // Caller-supplied hooks wrap the built-in implementation.
  for (const hook of ['powerOn','powerOff']) {
    if (!(hook in opt)) continue;
    const builtin = obj[hook];
    const custom = opt[hook];
    obj[hook] = function() {
      return custom(builtin);
    };
  }
  return obj;
}
|
|
|
|
// Minecraft
|
|
|
|
/**
 * Reader/writer for Minecraft wire primitives.
 *
 * Instance methods read from the dechunker given to the constructor, or from
 * the byte array `b` when one is passed; a per-array cursor is stored on
 * `b._dechunk`. Static `make*` helpers return plain arrays of byte values.
 *
 * @param {object} dechunk - Object exposing `read(len)` that resolves to an
 *   array of bytes, or undefined at end of stream.
 */
function MCProto(dechunk) {
  this._dechunk = dechunk;
}

Object.assign(MCProto.prototype, {
  /** Read `len` bytes (default 1) from `b` if given, else from the stream. */
  async read_b(len, b) {
    len = len ?? 1;
    if (b) {
      b._dechunk = b._dechunk ?? new AIDechunkinator((async function*(){
        yield b;
      })());
      return b._dechunk.read(len);
    }
    return this._dechunk.read(len);
  },
  /**
   * Read a VarInt.
   * @returns {Promise<number|undefined>} The value as a signed 32-bit
   *   integer, or undefined when the source ended before the first byte.
   * @throws {Error} When the encoding runs past five bytes.
   */
  async readVarInt(b) {
    let value = 0;
    let shift = 0;
    let first = true;
    for (;;) {
      const chunk = await this.read_b(null, b);
      // A missing chunk means end of input; an empty one counts as a zero byte.
      const byte = chunk == null ? NaN : Number(chunk[0] ?? 0);
      if (first && Number.isNaN(byte)) {
        return undefined;
      }
      first = false;
      value += (byte & 0x7f) << shift;
      if (!(byte >> 7)) {
        break;
      }
      shift += 7;
      if (shift > 32) {
        throw new Error("varInt too big!");
      }
    }
    return value | 0;
  },
  /**
   * Read one length-prefixed packet.
   * @returns {Promise<number[]|undefined>} Packet bytes (id included) with
   *   the decoded id in `.type`, or undefined at end of stream.
   * @throws {Error} When the declared length exceeds 2097151.
   */
  async readPacket(b) {
    const len = await this.readVarInt(b);
    if (len == null) {
      return;
    }
    if (len > 2097151) {
      throw new Error("packet too big!");
    }
    const raw = await this.read_b(len, b);
    if (raw == null) {
      // The stream ended in the middle of the packet.
      return;
    }
    const pack = [...raw];
    try {
      pack.type = await this.readVarInt(pack);
    } catch(e) {
      console.error("bad packet! "+JSON.stringify(pack));
      throw e;
    }
    return pack;
  },
  /** Read a VarInt-prefixed UTF-8 string. */
  async readString(b) {
    const len = await this.readVarInt(b);
    const bytes = await this.read_b(len, b);
    return (new TextDecoder()).decode(new Uint8Array(bytes));
  },
  /** Read a big-endian unsigned 16-bit integer. */
  async readShort(b) {
    const p = await this.read_b(2, b);
    return (p[0]<<8)+p[1];
  },
});

(function(){
  // Append every element of an array-like onto `to`.
  function memcpy(from,to) {
    for (let i=0; i<from.length; i++) {
      to.push(from[i]);
    }
  }
  Object.assign(MCProto, {
    /** Encode a 32-bit integer as a VarInt (negatives take five bytes). */
    makeVarInt(Int) {
      // Work on the unsigned bit pattern so the shift terminates correctly
      // for negative values instead of stopping after one byte.
      let rest = Int >>> 0;
      const out = [];
      for (;;) {
        const low = rest & (0x80-1);
        rest >>>= 7;
        if (rest > 0) {
          out.push(low|0x80);
        } else {
          out.push(low);
          break;
        }
      }
      return out;
    },
    /** Frame `pack` as a packet: VarInt length, VarInt id, payload. */
    makePacket(type,pack) {
      const out = [];
      const id = this.makeVarInt(type);
      memcpy(this.makeVarInt(id.length+pack.length),out);
      memcpy(id,out);
      memcpy(pack,out);
      return out;
    },
    /** Encode a string as VarInt byte length followed by UTF-8 bytes. */
    makeString(desc) {
      const out = [];
      const utf8 = (new TextEncoder()).encode(desc);
      memcpy(this.makeVarInt(utf8.length),out);
      memcpy(utf8,out);
      return out;
    },
    /** Encode a string as tag byte 8, 16-bit big-endian length, UTF-8 bytes. */
    makeNBTString(desc) {
      const out = [];
      const utf8 = (new TextEncoder()).encode(desc);
      memcpy([8,(utf8.length>>8)&0xff,utf8.length&0xff],out);
      // The payload used to be dropped, leaving only the header.
      memcpy(utf8,out);
      return out;
    },
  });
})();
|
|
|
|
/**
 * Build a TCP port description for a Minecraft server.
 *
 * While the real server is off, status pings are answered from the last
 * status JSON seen from the server (cached on disk). A non-status handshake
 * triggers power-on. Once proxied, the first status response from the server
 * is captured to refresh that cache.
 *
 * @param {object} opt - Port options; `serverId`, `container`, `bindAddress`
 *   or `bindAddr` (first one set) names the cache file, `cacheDir` overrides
 *   its directory.
 * @returns {object} `opt` extended with `type: 'tcp'`, `filter`, `preread`.
 */
function minecraftPort(opt) {
  opt = {
    cacheDir: configDir+'/.mcs_cache',
    ...opt
  };
  const cacheDir = opt.cacheDir;
  // `bindAddr` is the key the rest of the file uses; without this fallback
  // every port lacking an explicit id shared one cache file ("undefined").
  const cacheId = opt.serverId ?? opt.container ?? opt.bindAddress ?? opt.bindAddr;
  const cpath = cacheDir+'/'+btoa(btoa(
    (typeof cacheId) == "string" ? cacheId : JSON.stringify(cacheId)
  ));
  let sping;
  try {
    sping = JSON.parse(fs.readFileSync(cpath));
  } catch {
    // No cached status yet; the placeholder below is served instead.
  }
  fs.mkdirSync(opt.cacheDir, {recursive: true});

  async function preread(conn) {
    // Service already running: hand the connection straight to the proxy.
    if (conn.service._count > 0)
      return true;
    console.log('Minecraft Preread');
    const proto = new MCProto(new AIDechunkinator((async function* () {
      while (1) {
        const msg = await conn.recv();
        yield msg;
      }
    })()));
    let pack = await proto.readPacket();
    await proto.readVarInt(pack); // version
    await proto.readString(pack); // hostname
    await proto.readShort(pack); // port
    const nstate = await proto.readVarInt(pack); // next state
    if (nstate != 1) { // != status?
      let run = true;
      // Hold an extra power reference for five minutes.
      Promise.resolve(conn.service.powerOn())
        .then(() => new Promise((ok) => {
          setTimeout(ok,60000*5);
        })) // +5 min
        .then(() => conn.service.powerOff());
      conn.cancel = () => run = false;
      (async function() {
        await new Promise((ok) => {
          setTimeout(ok, 20000);
        });
        if (!run) {
          // cancel() was called in the meantime: replay the handshake
          // against our own listener and discard whatever comes back.
          console.log('Minecraft Self-Connect');
          await pipeline(
            async function*(){
              yield Buffer.from([
                ...MCProto.makeVarInt(pack.length),
                ...pack
              ]);
              yield Buffer.from(MCProto.makePacket(0,[]));
              yield Buffer.from(MCProto.makePacket(1,[0,1,0,1,0,1,0,1]));
            }, await connectTcp(...[
              opt.bindAddr[0],
              ...(
                (opt.bindAddr[1]??'::') in {'::':1,'0.0.0.0':1}
                  ? ['127.0.0.1']
                  : opt.bindAddr.slice(1)
              )]
            ),
            async function *(source) {
              for await (const chunk of source) {}
            }
          );
          return;
        }
        conn.send(Buffer.from(MCProto.makePacket(0,MCProto.makeString(JSON.stringify({
          text:`Server is taking a bit to wake up. Keep retrying to connect.`
        })))));
      })();
      return true;
    }
    console.log('Minecraft Status');
    while (1) {
      pack = await proto.readPacket();
      if (!pack) {
        return false;
      }
      if (pack.type == 0) {
        const status = {...(sping ?? {
          "version":{"name":"confiltermon","protocol": 65535},
          "description":{"text": "confiltermon: Unfamiliar Server, join to know"},
          "players":{"max":1000000},
        })};
        if (status.players) {
          // Copy before editing so the cached object is not modified.
          status.players = {...status.players, online: 0};
          delete status.players.sample;
        }
        await conn.send(Buffer.from(MCProto.makePacket(0, [
          ...MCProto.makeString(JSON.stringify(status))
        ])));
      } else if (pack.type == 1) {
        const data = await proto.read_b(8,pack);
        // The pong reply carries packet id 1 and echoes the ping payload.
        await conn.send(Buffer.from(MCProto.makePacket(1, [
          ...data
        ])));
        return false;
      } else {
        console.log('bad packet');
        return false;
      }
    }
  }

  async function *filter(inp, dir, conn) {
    inp = new AIDechunkinator(inp);
    const proto = new MCProto(inp);
    conn.mc_state = 'handshake';
    if (dir == 'client->server') {
      while (1) {
        const pack = await proto.readPacket();
        if (conn.mc_state == 'handshake') {
          await proto.readVarInt(pack); // version
          await proto.readString(pack); // hostname
          await proto.readShort(pack); // port
          const nstate = await proto.readVarInt(pack); // next state
          if (nstate == 1)
            conn.mc_state = 'status';
          else
            conn.mc_state = 'done';
        }
        yield Buffer.from([...MCProto.makeVarInt(pack.length),...pack]);
        if (conn.mc_state == 'done' || conn.mc_state == 'status')
          break;
      }
    }
    if (dir == 'server->client') {
      while (1) {
        const pack = await proto.readPacket();
        if (conn.mc_state == 'status' && pack.type == 0) {
          const data = await proto.readString(pack);
          try {
            sping = JSON.parse(data);
            fs.writeFileSync(cpath, JSON.stringify(sping));
          } catch {
            // Cache refresh is best effort.
          }
          conn.mc_state = 'done';
        }
        yield Buffer.from([...MCProto.makeVarInt(pack.length),...pack]);
        if (conn.mc_state == 'done')
          break;
      }
    }
    // Nothing left to inspect: forward the rest untouched.
    for await (const chunk of inp.chunks()) {
      yield chunk;
    }
  }

  return {
    ...opt,
    type: 'tcp',
    filter,
    preread,
  };
}
|
|
|
|
/**
 * Describe a Minecraft server in a podman container that is suspended with
 * `podman container checkpoint` and resumed with `podman container restore`.
 *
 * powerOn restores from the checkpoint archive (deleting it afterwards) or,
 * failing that, starts the container. powerOff checkpoints the running
 * container to the archive and removes it.
 *
 * @param {object} opt - Port options plus `container`; `cacheDir` defaults to
 *   `<configDir>/.mcs_cache`.
 * @returns {{ports: object[], powerOn: Function, powerOff: Function}}
 */
function minecraftPodmanCheckpointService(opt) {
  opt = {
    cacheDir: configDir+'/.mcs_cache',
    ...opt
  };
  const cacheDir = opt.cacheDir+'/checkpoints';
  fs.mkdirSync(cacheDir, {recursive: true});
  // Backslash-escape everything outside [A-Za-z_.-] for use in the shell.
  const cdsh = cacheDir.replace(/[^A-Za-z_.-]/g,c=>'\\'+c);
  const container = opt.container;
  delete opt.container;

  const ports = [
    minecraftPort(opt)
  ];

  const resume = [
    `if ! podman ps --format '{{.Names}}' | grep '^${container}$'; then `,
    `(podman container restore -i ${cdsh}/${container}.tar.zst --file-locks && `,
    `rm ${cdsh}/${container}.tar.zst) || `,
    `podman container start ${container}; `,
    `fi; :;`,
  ].join('');

  const suspend = [
    `if podman ps --format '{{.Names}}' | grep '^${container}$'; then `,
    `podman container checkpoint --compress=zstd -e ${cdsh}/${container}.tar.zst --file-locks ${container} && `,
    `podman container rm ${container}; `,
    `fi; :;`,
  ].join('');

  return {
    ports,
    powerOn: spawner(resume),
    powerOff: spawner(suspend),
  };
}
|
|
|
|
//## /EXTRAS ##//
|
|
|
|
/**
 * Load the configuration file at `configPath`.
 *
 * The file is the body of an async JavaScript function: it is evaluated in
 * this module's scope and whatever it returns is the configuration. A leading
 * `#!` line is ignored.
 *
 * NOTE(review): this runs the file through eval() by design — only point it
 * at trusted files.
 *
 * @returns {Promise<*>} The value returned by the config body.
 */
async function loadConfig() {
  const text = String(fs.readFileSync(configPath));
  // Blank out the shebang content but keep its newline. The earlier regex
  // stopped at the first CR or run of three newlines and silently dropped
  // the remainder of the file.
  const body = text.replace(/^#![^\n]*/, '');
  const fun = eval('(async function() { ' + body + '; })');
  return await fun();
}
|
|
|
|
/**
 * Wait until `socket` emits `ev`.
 *
 * @param {EventEmitter} socket - Emitter to watch.
 * @param {string} ev - Event name to wait for.
 * @returns {Promise<void>} Resolves on `ev`; rejects with the emitted error
 *   if 'error' fires first. Both temporary listeners are removed either way.
 */
var sockWaitFor = async function(socket, ev) {
  let ok, err;
  try {
    await new Promise((resolve, reject) => {
      [ok, err] = [resolve, reject];
      socket.on('error', err);
      socket.on(ev, ok);
    });
  } finally {
    // Runs on rejection too, so a failed wait no longer leaks listeners.
    socket.off('error', err);
    socket.off(ev, ok);
  }
};
|
|
|
|
/**
 * A timeout that can be bumped so it fires later.
 *
 * The timer is armed immediately on construction.
 *
 * @param {Function} fun - Callback run when the timer fires.
 * @param {number} time - Default delay in milliseconds (public as `timeout`).
 *
 * `when` holds the epoch-millisecond time at which the current timer was
 * scheduled to fire.
 */
var Bumpout = function(fun, time) {
  this._callback = (...args) => {
    // Mark as idle before calling out, so the callback may restart us.
    this._handle = null;
    return fun(...args);
  };
  this.timeout = time;
  this.when = Infinity;
  this.restart();
};

/**
 * Push the deadline back by the default delay, but only while armed.
 * @returns {boolean} false when no timer was pending.
 */
Bumpout.prototype.bump = function() {
  if (!this._handle) return false;
  return this.restart();
};

/**
 * Disarm the timer.
 * @returns {boolean} false when no timer was pending.
 */
Bumpout.prototype.cancel = function() {
  if (!this._handle) return false;
  clearTimeout(this._handle);
  this._handle = null;
  return true;
};

/**
 * (Re)arm the timer.
 * @param {number} [timeout] - One-off delay; defaults to `this.timeout`.
 * @returns {boolean} Always true.
 */
Bumpout.prototype.restart = function(timeout) {
  if (this._handle) this.cancel();
  const delay = timeout ?? this.timeout;
  // Track the delay actually used, not just the default one.
  this.when = Date.now() + delay;
  this._handle = setTimeout(this._callback, delay);
  return true;
};
|
|
|
|
/**
 * Push-driven async iterable: values handed to `push()` come out of the
 * async iterator one at a time, in order.
 *
 * `_cond` is the resolver the iterator is currently waiting on (null while it
 * is busy). `_gcs` queues the wake-up callbacks of producers that arrived
 * while no resolver was available.
 */
function DataGenerator() {
  this._gcs = [];
  this._cond = null;
}

DataGenerator.prototype[Symbol.asyncIterator] = async function* () {
  for (;;) {
    // Release the oldest waiting producer, if there is one.
    const wakeProducer = this._gcs.shift();
    if (wakeProducer) {
      wakeProducer();
    }
    const nextValue = new Promise((deliver) => {
      this._cond = deliver;
    });
    yield await nextValue;
  }
};

/**
 * Hand `msg` to the iterator, waiting first until it is ready to take it.
 */
DataGenerator.prototype.push = async function (msg) {
  if (!this._cond) {
    await new Promise((ready) => {
      this._gcs.push(ready);
    });
  }
  this._cond(msg);
  this._cond = null;
};
|
|
|
|
/**
 * Byte-level reader over an async iterable of chunks (Buffers or arrays).
 *
 * The source iterator is wrapped so that only `next()` is exposed: leaving a
 * loop early never closes the underlying source, and reading can switch
 * between `read()`, `bytes()` and `chunks()` without losing data.
 *
 * @param {AsyncIterable} ai - Source of chunks.
 */
function AIDechunkinator(ai) {
  ai = ai[Symbol.asyncIterator]();
  this._ai = {
    [Symbol.asyncIterator]() { return this; },
    next() { return ai.next(); }
  };
  this._chunk = null; // chunk currently being consumed, if any
  this._idx = null;   // index of the next unread byte in _chunk
}

Object.assign(AIDechunkinator.prototype, {
  /** Yield bytes one at a time, starting with any partially read chunk. */
  async *bytes() {
    for (;;) {
      while (this._chunk) {
        const idx = this._idx; this._idx++;
        const v = this._chunk[idx];
        // Update state before yielding: the consumer may stop right here.
        if (this._idx >= this._chunk.length)
          this._chunk = null;
        yield v;
      }
      const step = await this._ai.next();
      if (step.done) return;
      this._chunk = step.value;
      this._idx = 0;
    }
  },
  /** Yield the unread tail of the current chunk, then whole chunks. */
  async *chunks() {
    if (this._chunk) {
      const chunk = this._chunk;
      this._chunk = null;
      yield chunk.slice(this._idx);
    }
    for await (const v of this._ai) {
      yield v;
    }
  },
  /**
   * Read exactly `len` bytes.
   * @param {number} len - Number of bytes wanted; <= 0 yields [].
   * @param {boolean} [lax] - Accept a short result when the source ends.
   * @returns {Promise<number[]|undefined>} The bytes, or undefined when a
   *   null/undefined byte is encountered (e.g. an empty chunk).
   * @throws {Error} "Truncated Read" when the source ends early and `lax`
   *   is not set.
   */
  async read(len, lax) {
    const chunk = [];
    if (len <= 0) return chunk;
    for await (const v of this.bytes()) {
      if (v == null)
        return;
      chunk.push(v);
      if (chunk.length >= len)
        break;
    }
    // This used to test `chunk.len`, which is always undefined, so short
    // reads were never reported.
    if ((!lax) && chunk.length < len)
      throw new Error("Truncated Read");
    return chunk;
  },
});
|
|
|
|
/**
 * Run `fn` on every item of an async iterable, one after another, waiting
 * for each call to finish before pulling the next item.
 *
 * @param {AsyncIterable} ai - Items to consume.
 * @param {Function} fn - Callback, may be async.
 * @returns {Promise<void>} Settles when the iterable ends or `fn` throws.
 */
async function mapAI(ai, fn) {
  for await (const item of ai) {
    await fn(item);
  }
}
|
|
|
|
/**
 * Feed the async iterator of `ai` into `gen`, forwarding extra arguments.
 *
 * @param {Function} gen - Called as `gen(iterator, ...arg)`.
 * @param {AsyncIterable} ai - Source whose iterator is obtained here.
 * @returns {*} Whatever `gen` returns.
 */
function composeAI(gen, ai, ...arg) {
  const source = ai[Symbol.asyncIterator]();
  return gen(source, ...arg);
}
|
|
|
|
/**
 * Create a bound UDP socket, trying udp6 first and falling back to udp4.
 *
 * @param {...*} arg - Arguments forwarded to `socket.bind()`.
 * @returns {Promise<dgram.Socket>} Socket that has emitted 'listening'.
 */
async function bindUdp(...arg) {
  let server;
  const listenAs = async (family) => {
    server = dgram.createSocket(family);
    server.bind(...arg);
    await sockWaitFor(server, 'listening');
  };
  // this is stupid but im stupid too so its ok
  try {
    await listenAs('udp6');
  } catch {
    try {
      server.close();
    } catch {}
    await listenAs('udp4');
  }
  return server;
}
|
|
|
|
/**
 * Create a UDP socket bound to an ephemeral port and connected to a remote
 * address, trying udp6 first and falling back to udp4.
 *
 * @param {...*} arg - Arguments forwarded to `socket.connect()`.
 * @returns {Promise<dgram.Socket>} Socket that has emitted 'connect'.
 */
async function connectUdp(...arg) {
  let client;
  const dialAs = async (family) => {
    client = dgram.createSocket(family);
    client.bind(0);
    await sockWaitFor(client, 'listening');
    client.connect(...arg);
    await sockWaitFor(client, 'connect');
  };
  // Same try-udp6-then-udp4 dance as bindUdp.
  try {
    await dialAs('udp6');
  } catch {
    try {
      client.close();
    } catch {}
    await dialAs('udp4');
  }
  return client;
}
|
|
|
|
/**
 * Create a TCP server and wait until it is listening.
 *
 * @param {...*} arg - Arguments forwarded to `server.listen()`.
 * @returns {Promise<net.Server>} The listening server.
 */
async function bindTcp(...arg) {
  const listener = net.createServer();
  listener.listen(...arg);
  await sockWaitFor(listener, 'listening');
  return listener;
}
|
|
|
|
/**
 * Open a TCP connection and wait until it is established.
 *
 * @param {...*} arg - Arguments forwarded to `socket.connect()`.
 * @returns {Promise<net.Socket>} The connected socket.
 */
async function connectTcp(...arg) {
  const socket = new net.Socket();
  socket.connect(...arg);
  await sockWaitFor(socket, 'connect');
  return socket;
}
|
|
|
|
/**
 * Build the state object for one client connection in its preread phase.
 *
 * Incoming client data goes through `send()`: it is queued for `recv()` (used
 * by the preread hook) and pushed into the client->server filter, whose
 * output lands in `permaque` for replay once an upstream exists. Output of
 * the server->client filter is passed to `sendCl`.
 *
 * @param {Map|null} conns - Connection table; when given, the connection gets
 *   an idle timer that removes it from the table.
 * @param {*} key - This connection's key in `conns`.
 * @param {Function} sendCl - Sends a message to the client.
 * @param {number} timeout - Idle timeout in milliseconds.
 * @param {Function} filter - Called as `filter(iterator, direction, conn)`.
 * @returns {object} The connection object.
 */
function connTemplate(conns, key, sendCl, timeout, filter) {
  const conn = {};
  const dgenUp = new DataGenerator();
  const dgenCl = new DataGenerator();

  // Start both filter pumps right away; they stay parked until data is pushed.
  mapAI(composeAI(filter, dgenUp, 'client->server', conn),
    (msg) => conn.sendUp(msg));
  mapAI(composeAI(filter, dgenCl, 'server->client', conn),
    (msg) => conn.sendCl(msg));

  // Idle state of the recv() wait: nothing to wake, and a decond() parks the
  // error in `prom` so that the next recv() rejects with it.
  const promReset = () => ({
    cond() {},
    decond(err) {
      this.prom = Promise.reject(err);
      this.prom.catch(() => {});
    },
    prom: null,
  });

  let idleTracking = {};
  if (conns) {
    idleTracking = {
      onkill() {
        this.decond(new Error("timeout"));
        this.killed = true;
        conns.delete(key);
      },
      bumpout: new Bumpout(() => {
        conn.onkill();
      }, timeout),
    };
  }

  return Object.assign(conn, {
    dgenUp,
    dgenCl,
    queue: [],
    permaque: [],
    promReset,
    ...promReset(),
    ...idleTracking,
    /** Resolve with the next queued client message. */
    async recv() {
      while (!this.queue.length) {
        if (this.prom) {
          await this.prom;
          continue;
        }
        this.prom = new Promise((wake, fail) => {
          this.cond = () => {
            Object.assign(this, this.promReset());
            wake();
          };
          this.decond = fail;
        });
        this.prom.catch(() => {});
      }
      return this.queue.shift();
    },
    sendCl,
    /** Buffer filtered client data until an upstream is attached. */
    async sendUp(msg) {
      this.permaque.push(msg);
    },
    /** Accept one message from the client. */
    async send(msg) {
      this.queue.push(msg);
      conn.dgenUp.push(msg);
      if (this.bumpout)
        this.bumpout.bump();
      this.cond();
    },
  });
}
|
|
|
|
/**
 * Run a UDP proxy in front of an on-demand service.
 *
 * Each remote address/port pair is one connection. Its first datagrams go
 * through `preread`; when that accepts, the service is powered on, an
 * upstream socket is connected, buffered datagrams are replayed and traffic
 * is relayed through `filter` until the connection's idle timer fires.
 *
 * @param {Array} address - Arguments for binding the listening socket.
 * @param {Array} upstreamAddr - Arguments for connecting the upstream socket.
 * @param {object} service - Provides async `powerOn()` / `powerOff()`.
 * @param {number} timeout - Idle timeout per connection, in milliseconds.
 * @param {Function} preread - Resolves truthy to accept a connection.
 * @param {Function} filter - Packet filter, see connTemplate.
 */
async function proxyUdp(address, upstreamAddr, service, timeout, preread, filter) {
  const server = await bindUdp(...address);

  const conns = new Map();

  server.on('message', async (msg, rinfo) => {
    const key = JSON.stringify([rinfo.address, rinfo.port]);
    if (!conns.has(key)) {
      conns.set(key, connTemplate(conns, key, function(msg) {
        server.send(msg, rinfo.port, rinfo.address);
      }, timeout, filter));
      const conn = conns.get(key);
      (async () => {
        let filc;
        const pass = await preread(filc = {
          service,
          async recv(len) { return conn.recv(); },
          async send(msg) { return conn.sendCl(msg); }
        });
        if (!pass) {
          conns.delete(key);
          conn.bumpout.cancel();
          return;
        }
        try {
          await service.powerOn();
        } finally {
          // Tell the preread hook that waiting is over, success or not.
          if (filc.cancel) {
            try {
              filc.cancel();
            } catch(e) {
              console.error(e);
            }
          }
        }
        let upstream;
        try {
          upstream = await connectUdp(...upstreamAddr);
        } catch(e) {
          console.error(e);
          conn.killed = true;
        }
        if (conn.killed) {
          // `upstream` is unset when the connect itself failed.
          if (upstream) {
            upstream.close();
          }
          await service.powerOff();
          return;
        }
        upstream.on('message', (msg) => {
          conn.bumpout.bump();
          conn.dgenCl.push(msg);
        });
        // Replay what the client sent while the service was starting.
        conn.permaque.forEach(msg => upstream.send(msg));
        for (const k of ['queue','permaque','cond','decond','recv']) {
          delete conn[k];
        }
        // Switch the connection from preread mode to relay mode.
        Object.assign(conn, {
          upstream,
          onkill() {
            this.upstream.close();
            conns.delete(key);
            service.powerOff().catch(e => console.error(e));
          },
          sendUp(msg) {
            this.upstream.send(msg);
          },
          send(msg) {
            this.bumpout.bump();
            conn.dgenUp.push(msg);
          },
        });
      })().catch(e => console.error(e));
    }
    const conn = conns.get(key);
    await conn.send(msg);
  });
}
|
|
|
|
/**
 * Run a TCP proxy in front of an on-demand service.
 *
 * Each client first goes through `preread`; when that accepts, the service is
 * powered on, an upstream connection is opened, the bytes buffered so far are
 * replayed and both directions are piped through `filter`. The service is
 * powered off again when the pipes end.
 *
 * @param {Array} address - Arguments for `server.listen()`.
 * @param {Array} upstreamAddr - Arguments for connecting upstream.
 * @param {object} service - Provides async `powerOn()` / `powerOff()`.
 * @param {number} timeout - Socket idle timeout in milliseconds.
 * @param {Function} preread - Resolves truthy to accept a connection.
 * @param {Function} filter - Stream filter, see connTemplate.
 * @param {boolean} nodelay - Passed to `setNoDelay` on both sockets.
 */
async function proxyTcp(address, upstreamAddr, service, timeout, preread, filter, nodelay) {
  const server = await bindTcp(...address);

  server.on('connection', async (client) => {
    client.setTimeout(timeout);
    client.setNoDelay(nodelay);
    let upstream = null;
    client.on('timeout', () => {
      client.destroy();
      if (upstream)
        upstream.destroy();
    });
    const conn = connTemplate(null, null, (msg) => new Promise((ok) => client.write(msg, ok)), null, filter);
    let msgh;
    client.on('data', msgh = (msg) => {
      conn.send(msg);
    });
    client.on('close', () => {
      // Wake a pending recv() so the preread hook does not wait forever.
      conn.decond(new Error("closed"));
    });
    let filc;
    let pass;
    try {
      pass = await preread(filc = {
        service,
        async recv(len) { return conn.recv(); },
        async send(msg) { return conn.sendCl(msg); }
      });
    } catch(e) {
      // A failed preread (bad packet, early close) must not leave the socket
      // open until its idle timeout.
      console.error(e);
      client.destroy();
      return;
    }
    if (!pass) {
      client.destroy();
      return;
    }
    try {
      await service.powerOn();
    } finally {
      // Tell the preread hook that waiting is over, success or not.
      if (filc.cancel) {
        try {
          filc.cancel();
        } catch(e) {
          console.error(e);
        }
      }
    }
    try {
      upstream = await connectTcp(...upstreamAddr);
      upstream.setNoDelay(nodelay);
      // From here on client data flows through the pipeline, not conn.send().
      client.off('data',msgh);
      client.pause();
      conn.permaque.forEach(msg => upstream.write(msg));
      upstream.setTimeout(timeout);
      upstream.on('timeout', () => {
        upstream.destroy();
        client.destroy();
      });
    } catch(e) {
      console.error(e);
      client.destroy();
      await service.powerOff();
      return;
    }
    client.resume();

    // Build a Transform that feeds one direction through the filter; the
    // filter's output is emitted by redirecting conn.sendUp / conn.sendCl.
    function trans(gender) {
      const dgen = ({
        'client->server': conn.dgenUp,
        'server->client': conn.dgenCl,
      })[gender];
      const stream = new Transform({
        async transform(msg, enc, next) {
          await dgen.push(msg);
          next();
        }
      });
      ({
        'client->server': fun => conn.sendUp = fun,
        'server->client': fun => conn.sendCl = fun,
      })[gender](msg => stream.push(msg));
      return stream;
    }

    try {
      await Promise.all([
        pipeline(client, trans('client->server'), upstream),
        pipeline(upstream, trans('server->client'), client)
      ]);
    } finally {
      await service.powerOff();
    }
  });
}
|
|
|
|
/**
 * Reference-counting wrapper around a service.
 *
 * The wrapped `powerOn()` runs when the count goes from 0 to 1 and the
 * wrapped `powerOff()` when it returns to 0. Transitions are serialized
 * through `_prom`, the promise of the transition currently in flight.
 *
 * @param {object} service - Provides `powerOn()` and `powerOff()`, sync or async.
 */
function MultiplexedService(service) {
  this._service = service;
  this._count = 0;
  this._prom = null;
}

Object.assign(MultiplexedService.prototype, {
  /**
   * Take a reference, starting the service if it is the first one.
   * @throws Whatever the wrapped `powerOn()` throws; the count is then unchanged.
   */
  async powerOn() {
    console.log('... powerOn', this.name);
    while (this._prom) {
      // Another caller's failure is reported to that caller, not to us.
      try { await this._prom; } catch {}
    }

    if (this._count == 0) {
      // The async wrapper accepts sync implementations and turns a
      // synchronous throw into a rejection.
      this._prom = (async () => this._service.powerOn())();
      try {
        await this._prom;
      } finally {
        // Always clear, or one failed start would block every later call.
        this._prom = null;
      }
    }
    this._count++;
    console.log('[X] powerOn', this.name, this._count);
  },
  /**
   * Drop a reference, stopping the service when it was the last one.
   * @throws {Error} "Too many powerOffs" when no reference is held.
   */
  async powerOff() {
    console.log('... powerOff',this.name);
    while (this._prom) {
      try { await this._prom; } catch {}
    }

    if (this._count == 0) {
      throw new Error("Too many powerOffs");
    }
    if ((--this._count) == 0) {
      this._prom = (async () => this._service.powerOff())();
      try {
        await this._prom;
      } finally {
        this._prom = null;
      }
    }
    console.log('[X] powerOff',this.name, this._count);
  },
});
|
|
|
|
/**
 * Service wrapper that powers down in stages.
 *
 * `service.stages` is an array of `{timeout, powerOff, powerOn}`. After
 * `powerOff()` each stage is shut down in turn, `timeout` milliseconds after
 * the previous one; `powerOn()` cancels any pending stage and undoes the
 * completed ones in reverse order.
 *
 * @param {object} service - Must provide `stages`.
 */
function StagedService(service) {
  this._service = service;
  this._stack = [];      // undo functions of the stages already powered off
  this._prom = null;     // transition currently in flight
  this._timeout = null;  // timer handle for the next stage
}

Object.assign(StagedService.prototype, {
  /** Cancel pending stage-downs and bring completed stages back up. */
  async powerOn() {
    if (this._prom)
      await this._prom;

    this._prom = (async () => {
      if (this._timeout) {
        clearTimeout(this._timeout);
        // Forget the handle, otherwise setOffPower() would refuse to arm a
        // new timer ever again.
        this._timeout = null;
      }
      while (this._stack.length > 0) {
        await this._stack.pop()();
      }
    })();
    try {
      await this._prom;
    } finally {
      this._prom = null;
    }
  },
  /** Arm the timer for the next stage, unless one is armed or none is left. */
  setOffPower() {
    const level = this._service.stages[this._stack.length];

    if (this._timeout || !level)
      return;

    this._timeout = setTimeout(async () => {
      try {
        this._prom = (async () => level.powerOff())();
        await this._prom;
        this._stack.push(() => level.powerOn());
      } catch(e) {
        // Stay at the current stage; a later powerOff() can try again.
        console.error(e);
        return;
      } finally {
        this._prom = null;
        this._timeout = null;
      }
      this.setOffPower();
    }, level.timeout);
  },
  /** Begin (or continue) the staged shutdown. */
  async powerOff() {
    if (this._prom)
      await this._prom;

    this.setOffPower();
  },
});
|
|
|
|
/**
 * Last-resort error sink: log the error and keep the process running.
 *
 * @param {*} err - The uncaught exception or unhandled rejection reason.
 */
function errHandler(err) {
  console.error('ERROR!', err);
}

// Route both kinds of stray failures to the same handler.
for (const eventName of ['uncaughtException', 'unhandledRejection']) {
  process.on(eventName, errHandler);
}
|
|
|
|
// Registry of all configured services by name.
var myServices = Object.create(null);

// Entry point: load the configuration (path optionally given as the first
// command line argument) and start one proxy per configured port.
(async () => {
  if (process.argv[2] != null) {
    configPath = fs.realpathSync(process.argv[2]);
    configDir = configPath.match(/(.*)\//)[1];
  }
  const config = await loadConfig();

  const proxies = {
    tcp: proxyTcp,
    udp: proxyUdp
  };

  for (const [name, service] of Object.entries(config.services)) {
    const xservice = new MultiplexedService({
      // Async no-ops, so a service may omit either hook and callers can
      // still treat the result as a promise.
      async powerOn() {},
      async powerOff() {},
      ...service
    });
    myServices[name] = xservice;
    xservice.name = name;
    for (const port of service.ports) {
      // Fallback filter built from the port's optional per-direction hooks.
      // It has to close over `port`: filters are invoked as plain functions,
      // so `this` is never the port object.
      const defaultFilter = (inp, dir, conn) => {
        if (dir == "client->server" && port.filterUpstream) {
          return port.filterUpstream(inp, conn);
        }
        if (dir == "server->client" && port.filterClient) {
          return port.filterClient(inp, conn);
        }
        return inp;
      };
      await proxies[port.type](
        port.bindAddr,
        port.upstreamAddr,
        xservice,
        port.timeout ?? 30000,
        port.preread ?? (() => true),
        port.filter ?? defaultFilter,
        port.nodelay ?? true,
      );
    }
  }
})();
|