diff --git a/apps/node-vless/src/main.ts b/apps/node-vless/src/main.ts index 4e9b18e..46cc3a5 100644 --- a/apps/node-vless/src/main.ts +++ b/apps/node-vless/src/main.ts @@ -6,6 +6,8 @@ import * as uuid from 'uuid'; import * as lodash from 'lodash'; import { createReadStream } from 'node:fs'; import { setDefaultResultOrder } from 'node:dns'; +import { createSocket, Socket as UDPSocket } from 'node:dgram'; + import { makeReadableWebSocketStream, processVlessHeader, @@ -14,7 +16,11 @@ import { } from 'vless-js'; import { connect, Socket } from 'node:net'; import { Duplex, Readable } from 'stream'; - +import { + TransformStream, + ReadableStream, + WritableStream, +} from 'node:stream/web'; const port = process.env.PORT; const userID = process.env.UUID || ''; //'ipv4first' or 'verbatim' @@ -79,7 +85,8 @@ vlessWServer.on('connection', async function connection(ws) { const log = (info: string, event?: any) => { console.log(`[${address}:${portWithRandomLog}] ${info}`, event || ''); }; - let remoteConnection: Socket = null; + let remoteConnection: Duplex = null; + let udpClientStream: TransformStream = null; let remoteConnectionReadyResolve: Function; const readableWebSocketStream = makeReadableWebSocketStream(ws, log); @@ -90,6 +97,13 @@ vlessWServer.on('connection', async function connection(ws) { .pipeTo( new WritableStream({ async write(chunk: Buffer, controller) { + if (udpClientStream) { + const writer = udpClientStream.writable.getWriter(); + // nodejs buffer will fill some byte, need offset is + writer.write(chunk.buffer.slice(chunk.byteOffset)); + writer.releaseLock(); + return; + } if (remoteConnection) { await socketAsyncWrite(remoteConnection, chunk); // remoteConnection.write(chunk); @@ -103,23 +117,36 @@ vlessWServer.on('connection', async function connection(ws) { addressRemote, rawDataIndex, vlessVersion, + isUDP, } = processVlessHeader(vlessBuffer, userID, uuid, lodash); address = addressRemote || ''; - portWithRandomLog = `${portRemote}--${Math.random()}`; + portWithRandomLog = `${portRemote}--${Math.random()} ${ + isUDP ? 'udp ' : 'tcp ' + } `; if (hasError) { controller.error(`[${address}:${portWithRandomLog}] ${message} `); } // const addressType = requestAddr >> 42 // const addressLength = requestAddr & 0x0f; console.log(`[${address}:${portWithRandomLog}] connecting`); - remoteConnection = await connect2Remote(portRemote, address, log); vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]); - const rawClientData = vlessBuffer.slice(rawDataIndex!); - remoteConnection.write(new Uint8Array(rawClientData)); - remoteConnectionReadyResolve(remoteConnection); + if (isUDP) { + udpClientStream = makeUDPSocketStream(portRemote, address); + const writer = udpClientStream.writable.getWriter(); + writer.write(rawClientData); + writer.releaseLock(); + remoteConnectionReadyResolve(udpClientStream); + } else { + remoteConnection = await connect2Remote(portRemote, address, log); + remoteConnection.write(new Uint8Array(rawClientData)); + remoteConnectionReadyResolve(remoteConnection); + } }, close() { + // if (udpClientStream) { + // udpClientStream.writable.close(); + // } console.log( `[${address}:${portWithRandomLog}] readableWebSocketStream is close` ); @@ -145,9 +172,12 @@ vlessWServer.on('connection', async function connection(ws) { await new Promise((resolve) => (remoteConnectionReadyResolve = resolve)); // remote --> ws - let remoteChunkCount = 0; - let totoal = 0; - await Readable.toWeb(remoteConnection).pipeTo( + let responseStream = udpClientStream?.readable; + if (remoteConnection) { + responseStream = Readable.toWeb(remoteConnection); + } + + await responseStream.pipeTo( new WritableStream({ start() { if (ws.readyState === ws.OPEN) { @@ -155,7 +185,10 @@ vlessWServer.on('connection', async function connection(ws) { } }, async write(chunk: Uint8Array, controller) { - await wsAsyncWrite(ws, chunk); + // console.log('ws write', chunk); + if (ws.readyState === ws.OPEN) { + await wsAsyncWrite(ws, chunk); + } }, close() { console.log( @@ -188,9 +221,15 @@ server.on('upgrade', function upgrade(request, socket, head) { }); }); -server.listen(port, () => { - console.log(`server listen in http://127.0.0.1:${port}`); -}); +server.listen( + { + port: port, + host: '0.0.0.0', + }, + () => { + console.log(`server listen in http://127.0.0.1:${port}`); + } +); async function connect2Remote(port, host, log: Function): Promise { return new Promise((resole, reject) => { @@ -211,7 +250,7 @@ async function connect2Remote(port, host, log: Function): Promise { }); } -async function socketAsyncWrite(ws: Socket, chunk: Buffer) { +async function socketAsyncWrite(ws: Duplex, chunk: Buffer) { return new Promise((resolve, reject) => { ws.write(chunk, (error) => { if (error) { @@ -234,3 +273,52 @@ async function wsAsyncWrite(ws: WebSocket, chunk: Uint8Array) { }); }); } + +function makeUDPSocketStream(portRemote, address) { + const udpClient = createSocket('udp4'); + const transformStream = new TransformStream({ + start(controller) { + /* … */ + udpClient.on('message', (message, info) => { + controller.enqueue( + Buffer.concat([new Uint8Array([0, info.size]), message]) + ); + }); + udpClient.on('error', (error) => { + controller.error(error); + }); + }, + + transform(chunk: ArrayBuffer, controller) { + // seems v2ray will use same web socket for dns query.. + // And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length + for (let index = 0; index < chunk.byteLength; ) { + const lengthBuffer = chunk.slice(index, index + 2); + const udpPakcetLength = new DataView(lengthBuffer).getInt16(0); + const udpData = new Uint8Array( + chunk.slice(index + 2, index + 2 + udpPakcetLength) + ); + index = index + 2 + udpPakcetLength; + + udpClient.send(udpData, portRemote, address, (err) => { + if (err) { + console.log(err); + controller.error('Failed to send UDP packet !!'); + udpClient.close(); + } + }); + index = index; + } + + // console.log('dns chunk', chunk); + // console.log(portRemote, address); + // port is big-Endian in raw data etc 80 == 0x005d + }, + + flush(controller) { + udpClient.close(); + controller.terminate(); + }, + }); + return transformStream; +} diff --git a/libs/vless-js/src/lib/vless-js.ts b/libs/vless-js/src/lib/vless-js.ts index 205ad01..21e1c3a 100644 --- a/libs/vless-js/src/lib/vless-js.ts +++ b/libs/vless-js/src/lib/vless-js.ts @@ -53,9 +53,15 @@ export async function processWebSocket({ addressRemote, rawDataIndex, vlessVersion, + isUDP, } = processVlessHeader(vlessBuffer, userID, uuid, lodash); address = addressRemote || ''; portWithRandomLog = `${portRemote}--${Math.random()}`; + if (isUDP) { + controller.error( + `[${address}:${portWithRandomLog}] command udp is not support ` + ); + } if (hasError) { controller.error(`[${address}:${portWithRandomLog}] ${message} `); } @@ -238,6 +244,7 @@ export function processVlessHeader( } const version = new Uint8Array(vlessBuffer.slice(0, 1)); let isValidUser = false; + let isUDP = false; if (uuidLib.stringify(new Uint8Array(vlessBuffer.slice(1, 17))) === userID) { isValidUser = true; } @@ -256,14 +263,14 @@ export function processVlessHeader( const command = new Uint8Array( vlessBuffer.slice(18 + optLength, 18 + optLength + 1) )[0]; + // 0x01 TCP // 0x02 UDP // 0x03 MUX if (command === 1) { + } else if (command === 2) { + isUDP = true; } else { - // controller.error( - // `command ${command} is not support, command 01-tcp,02-udp,03-mux` - // ); return { hasError: true, message: `command ${command} is not support, command 01-tcp,02-udp,03-mux`, @@ -343,5 +350,6 @@ export function processVlessHeader( portRemote, rawDataIndex: addressValueIndex + addressLength, vlessVersion: version, + isUDP, }; }