commit 16353ec68f6eb3f2f9bbdea1be026668812b44c7 Author: Kimapr Date: Sat Jun 21 05:24:11 2025 +0500 break inexistence of this diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d02e481 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/config.json't diff --git a/index.js b/index.js new file mode 100644 index 0000000..81347ea --- /dev/null +++ b/index.js @@ -0,0 +1,344 @@ +var dgram = require('node:dgram'); +var net = require('node:net'); +var fs = require('node:fs'); +var { pipeline } = require('node:stream'); + +async function loadConfig() { + let path = __dirname + "/config.json't"; + fun = eval('(async function(path) { ' + + fs.readFileSync(path) + + '; })'); + return await fun(path); +} + +var sockWaitFor = async function(socket, ev) { + var ok, err; + await new Promise((a, b) => { + [ok, err] = [a, b]; + socket.on('error', err); + socket.on(ev, ok); + }); + socket.off('error', err); + socket.off(ev, ok); +} + +// timeout but u can bump it so it fires later +var Bumpout = function(fun, time) { + this._callback = (...args) => { + this._handle = null; + return fun(...args); + } + this._timeout = time; + this._handle = setTimeout(this._callback, this._timeout) +} + +Bumpout.prototype.bump = function() { + if (!this._handle) throw new Error('Gone'); + clearTimeout(this._handle); + this._handle = setTimeout(this._callback, this._timeout); +} + +Bumpout.prototype.cancel = function() { + if (!this._handle) throw new Error('Gone'); + clearTimeout(this._handle); + this._handle = null; +} + +Bumpout.prototype.restart = function() { + if (this._handle) throw new Error('Non-Gone'); + this._handle = setTimeout(this._callback, this._timeout); +} + +async function bindUdp(...arg) { + var server; + await (async fun => { + // this is stupid but im stupid too so its ok + try { + server = dgram.createSocket('udp6'); + await fun(); + } catch { + server = dgram.createSocket('udp4'); + await fun(); + } + })(async () => { + server.bind(...arg); + await sockWaitFor(server, 'listening'); + }); + return server; +} + +async function connectUdp(...arg) { + var client; + await (async fun => { + // see ${_FILE}:${_LINE - 18} + try { + client = dgram.createSocket('udp6'); + await fun(); + } catch { + client = dgram.createSocket('udp4'); + await fun(); + } + })(async () => { + client.bind(0); + await sockWaitFor(client, 'listening'); + client.connect(...arg); + await sockWaitFor(client, 'connect'); + }); + return client +} + +async function bindTcp(...arg) { + var server = net.createServer(); + server.listen(...arg); + await sockWaitFor(server, 'listening'); + return server; +} + +async function connectTcp(...arg) { + var client = new net.Socket(); + client.connect(...arg); + await sockWaitFor(client, 'connect'); + return client; +} + +function connTemplate(conns, key, sendCl) { + let conn = {}; + return Object.assign(conn, { + queue: [], + permaque: [], + ...(fun => ({ promReset: fun, ...fun() }))(() => ({ + cond() {}, + decond(err) { + this.prom = Promise.reject(err); + this.prom.catch(() => {}); + }, + prom: null, + })), + ...(conns ? { + onkill() { + this.decond(new Error("timeout")); + conns.delete(key); + }, + bumpout: new Bumpout(() => { + conn.onkill(); + }, timeout), + } : {}), + async recv() { + while (!this.queue.length) { + if (this.prom) + await this.prom; + else { + this.prom = new Promise((ok, err) => { + this.cond = () => { + Object.assign(this, this.promReset()); + ok(); + } + this.decond = err; + }); + this.prom.catch(() => {}); + } + } + return this.queue.shift(); + }, + sendCl, + async send(msg) { + this.queue.push(msg); + this.permaque.push(msg); + if (this.bumpout) + this.bumpout.bump(); + this.cond(); + } + }); +} + +async function proxyUdp(address, upstreamAddr, service, timeout, filter) { + var server = await bindUdp(...address); + + var conns = new Map(); + + server.on('message', async (msg, rinfo) => { + let key = JSON.stringify([rinfo.address, rinfo.port]); + if (!conns.has(key)) { + conns.set(key, connTemplate(conns, key, function(msg) { + server.send(msg, rinfo.port, rinfo.address); + })); + let conn = conns.get(key); + (async () => { + let pass = await filter({ + async recv(len) { return conn.recv(); }, + async send(msg) { return conn.sendCl(msg); } + }); + if (!pass) { + conns.delete(key); + return; + } + await service.powerOn(); + let upstream = await connectUdp(...upstreamAddr); + upstream.on('message', (msg) => { + conn.bumpout.bump(); + conn.sendCl(msg); + }); + conn.permaque.forEach(msg => upstream.send(msg)); + for (let k of ['queue','permaque','cond','decond','recv']) { + delete conn[k]; + } + Object.assign(conn, { + upstream, + onkill() { + this.upstream.close(); + conns.delete(key); + service.powerOff().catch(e => console.error(e)); + }, + send(msg) { + this.bumpout.bump(); + this.upstream.send(msg); + } + }); + })().catch(e => console.error(e)); + } + let conn = conns.get(key); + await conn.send(msg); + }); +} + +async function proxyTcp(address, upstreamAddr, service, timeout, filter) { + var server = await bindTcp(...address); + + server.on('connection', async (client) => { + client.setTimeout(timeout); + client.on('timeout', () => { + client.destroy(); + }); + let conn = connTemplate(null, null, (msg) => client.write(msg)); + let msgh, clsh; + client.on('data', msgh = (msg) => { + conn.send(msg); + }); + client.on('close', clsh = () => { + conn.decond(new Error("closed")); + }) + let pass = await filter({ + async recv(len) { return conn.recv(); }, + async send(msg) { return conn.sendCl(msg); } + }); + if (!pass) { + client.close(); + return; + } + await service.powerOn(); + let upstream = await connectTcp(...upstreamAddr); + upstream.setTimeout(timeout); + upstream.on('timeout', () => { + upstream.destroy(); + }); + try { + await Promise.all( + pipeline(client, upstream), + pipeline(upstream, client) + ); + } finally { + await service.powerOff(); + } + }); +} + +function MultiplexedService(service) { + this._service = service; + this._count = 0; + this._prom = null; +} + +Object.assign(MultiplexedService.prototype, { + async powerOn() { + if (this._prom) + await this._prom; + + if ((this._count++) == 0) { + try { + await (this._prom = this._service.powerOn()); + } finally { + this._prom = null; + } + } + }, + async powerOff() { + if (this._prom) + await this._prom; + + if (this._count == 0) { + throw new Error("Too many powerOffs"); + } + if ((--this._count) == 0) { + try { + await (this._prom = this._service.powerOff()); + } finally { + this._prom = null; + } + } + }, +}); + +function StagedService(service) { + this._service = service; + this._stack = []; + this._prom = null; + this._timeout = null; +} + +Object.assign(StagedService.prototype, { + async powerOn() { + if (this._prom) + await this._prom; + + await (this._prom = (async () => { + if (this._timeout) { + clearTimeout(this._timeout); + } + while (this._stack.length > 0) { + await this._stack.pop()(); + } + })()); + }, + setOffPower() { + let level = this._service.stages[this._stack.length] + + if(this._timeout || !level) + return; + + this._timeout = setTimeout(async () => { + await (this._prom = level.powerOff()); + this._stack.push(() => level.powerOn()); + this._timeout = null; + setOffPower(); + }, level.timeout); + }, + async powerOff() { + if (this._prom) + await this._prom; + + setOffPower(); + }, +}); + +let myServices = Object.create(null); + +(async () => { + var config = await loadConfig(); + + for (let [name, service] of Object.entries(config.services)) { + let xservice = new MultiplexedService(service); + myservices[name] = xservice; + for (let port of service.ports) { + await ({ + tcp: proxyTcp, + udp: proxyUdp + })[port.type]( + port.bindAddr, + port.upstreamAddr, + xservice, + port.timeout ?? 30000, + port.filter ?? (() => true) + ); + } + } +})(); diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..81baade --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "xinetd", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..d497771 --- /dev/null +++ b/package.json @@ -0,0 +1,10 @@ +{ + "name": "xinetd", + "version": "0.1.0", + "description": "like inetd but fancier like with more bugs and stuff", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "kimapr" +}