confiltermon/index.js
2025-06-21 05:24:11 +05:00

345 lines
7.3 KiB
JavaScript

var dgram = require('node:dgram');
var net = require('node:net');
var fs = require('node:fs');
var { pipeline } = require('node:stream');
async function loadConfig() {
let path = __dirname + "/config.json't";
fun = eval('(async function(path) { ' +
fs.readFileSync(path) +
'; })');
return await fun(path);
}
var sockWaitFor = async function(socket, ev) {
var ok, err;
await new Promise((a, b) => {
[ok, err] = [a, b];
socket.on('error', err);
socket.on(ev, ok);
});
socket.off('error', err);
socket.off(ev, ok);
}
// timeout but u can bump it so it fires later
var Bumpout = function(fun, time) {
this._callback = (...args) => {
this._handle = null;
return fun(...args);
}
this._timeout = time;
this._handle = setTimeout(this._callback, this._timeout)
}
Bumpout.prototype.bump = function() {
if (!this._handle) throw new Error('Gone');
clearTimeout(this._handle);
this._handle = setTimeout(this._callback, this._timeout);
}
Bumpout.prototype.cancel = function() {
if (!this._handle) throw new Error('Gone');
clearTimeout(this._handle);
this._handle = null;
}
Bumpout.prototype.restart = function() {
if (this._handle) throw new Error('Non-Gone');
this._handle = setTimeout(this._callback, this._timeout);
}
async function bindUdp(...arg) {
var server;
await (async fun => {
// this is stupid but im stupid too so its ok
try {
server = dgram.createSocket('udp6');
await fun();
} catch {
server = dgram.createSocket('udp4');
await fun();
}
})(async () => {
server.bind(...arg);
await sockWaitFor(server, 'listening');
});
return server;
}
async function connectUdp(...arg) {
var client;
await (async fun => {
// see ${_FILE}:${_LINE - 18}
try {
client = dgram.createSocket('udp6');
await fun();
} catch {
client = dgram.createSocket('udp4');
await fun();
}
})(async () => {
client.bind(0);
await sockWaitFor(client, 'listening');
client.connect(...arg);
await sockWaitFor(client, 'connect');
});
return client
}
async function bindTcp(...arg) {
var server = net.createServer();
server.listen(...arg);
await sockWaitFor(server, 'listening');
return server;
}
async function connectTcp(...arg) {
var client = new net.Socket();
client.connect(...arg);
await sockWaitFor(client, 'connect');
return client;
}
function connTemplate(conns, key, sendCl) {
let conn = {};
return Object.assign(conn, {
queue: [],
permaque: [],
...(fun => ({ promReset: fun, ...fun() }))(() => ({
cond() {},
decond(err) {
this.prom = Promise.reject(err);
this.prom.catch(() => {});
},
prom: null,
})),
...(conns ? {
onkill() {
this.decond(new Error("timeout"));
conns.delete(key);
},
bumpout: new Bumpout(() => {
conn.onkill();
}, timeout),
} : {}),
async recv() {
while (!this.queue.length) {
if (this.prom)
await this.prom;
else {
this.prom = new Promise((ok, err) => {
this.cond = () => {
Object.assign(this, this.promReset());
ok();
}
this.decond = err;
});
this.prom.catch(() => {});
}
}
return this.queue.shift();
},
sendCl,
async send(msg) {
this.queue.push(msg);
this.permaque.push(msg);
if (this.bumpout)
this.bumpout.bump();
this.cond();
}
});
}
async function proxyUdp(address, upstreamAddr, service, timeout, filter) {
var server = await bindUdp(...address);
var conns = new Map();
server.on('message', async (msg, rinfo) => {
let key = JSON.stringify([rinfo.address, rinfo.port]);
if (!conns.has(key)) {
conns.set(key, connTemplate(conns, key, function(msg) {
server.send(msg, rinfo.port, rinfo.address);
}));
let conn = conns.get(key);
(async () => {
let pass = await filter({
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
if (!pass) {
conns.delete(key);
return;
}
await service.powerOn();
let upstream = await connectUdp(...upstreamAddr);
upstream.on('message', (msg) => {
conn.bumpout.bump();
conn.sendCl(msg);
});
conn.permaque.forEach(msg => upstream.send(msg));
for (let k of ['queue','permaque','cond','decond','recv']) {
delete conn[k];
}
Object.assign(conn, {
upstream,
onkill() {
this.upstream.close();
conns.delete(key);
service.powerOff().catch(e => console.error(e));
},
send(msg) {
this.bumpout.bump();
this.upstream.send(msg);
}
});
})().catch(e => console.error(e));
}
let conn = conns.get(key);
await conn.send(msg);
});
}
async function proxyTcp(address, upstreamAddr, service, timeout, filter) {
var server = await bindTcp(...address);
server.on('connection', async (client) => {
client.setTimeout(timeout);
client.on('timeout', () => {
client.destroy();
});
let conn = connTemplate(null, null, (msg) => client.write(msg));
let msgh, clsh;
client.on('data', msgh = (msg) => {
conn.send(msg);
});
client.on('close', clsh = () => {
conn.decond(new Error("closed"));
})
let pass = await filter({
async recv(len) { return conn.recv(); },
async send(msg) { return conn.sendCl(msg); }
});
if (!pass) {
client.close();
return;
}
await service.powerOn();
let upstream = await connectTcp(...upstreamAddr);
upstream.setTimeout(timeout);
upstream.on('timeout', () => {
upstream.destroy();
});
try {
await Promise.all(
pipeline(client, upstream),
pipeline(upstream, client)
);
} finally {
await service.powerOff();
}
});
}
function MultiplexedService(service) {
this._service = service;
this._count = 0;
this._prom = null;
}
Object.assign(MultiplexedService.prototype, {
async powerOn() {
if (this._prom)
await this._prom;
if ((this._count++) == 0) {
try {
await (this._prom = this._service.powerOn());
} finally {
this._prom = null;
}
}
},
async powerOff() {
if (this._prom)
await this._prom;
if (this._count == 0) {
throw new Error("Too many powerOffs");
}
if ((--this._count) == 0) {
try {
await (this._prom = this._service.powerOff());
} finally {
this._prom = null;
}
}
},
});
function StagedService(service) {
this._service = service;
this._stack = [];
this._prom = null;
this._timeout = null;
}
Object.assign(StagedService.prototype, {
async powerOn() {
if (this._prom)
await this._prom;
await (this._prom = (async () => {
if (this._timeout) {
clearTimeout(this._timeout);
}
while (this._stack.length > 0) {
await this._stack.pop()();
}
})());
},
setOffPower() {
let level = this._service.stages[this._stack.length]
if(this._timeout || !level)
return;
this._timeout = setTimeout(async () => {
await (this._prom = level.powerOff());
this._stack.push(() => level.powerOn());
this._timeout = null;
setOffPower();
}, level.timeout);
},
async powerOff() {
if (this._prom)
await this._prom;
setOffPower();
},
});
let myServices = Object.create(null);
(async () => {
var config = await loadConfig();
for (let [name, service] of Object.entries(config.services)) {
let xservice = new MultiplexedService(service);
myservices[name] = xservice;
for (let port of service.ports) {
await ({
tcp: proxyTcp,
udp: proxyUdp
})[port.type](
port.bindAddr,
port.upstreamAddr,
xservice,
port.timeout ?? 30000,
port.filter ?? (() => true)
);
}
}
})();