add udp support for node-vless (#101)

add udp support for node-vless
This commit is contained in:
zizifn
2023-02-27 00:25:27 +08:00
committed by GitHub
parent 6880d473e5
commit a2d5716358
2 changed files with 114 additions and 18 deletions

View File

@@ -6,6 +6,8 @@ import * as uuid from 'uuid';
import * as lodash from 'lodash';
import { createReadStream } from 'node:fs';
import { setDefaultResultOrder } from 'node:dns';
import { createSocket, Socket as UDPSocket } from 'node:dgram';
import {
makeReadableWebSocketStream,
processVlessHeader,
@@ -14,7 +16,11 @@ import {
} from 'vless-js';
import { connect, Socket } from 'node:net';
import { Duplex, Readable } from 'stream';
import {
TransformStream,
ReadableStream,
WritableStream,
} from 'node:stream/web';
const port = process.env.PORT;
const userID = process.env.UUID || '';
//'ipv4first' or 'verbatim'
@@ -79,7 +85,8 @@ vlessWServer.on('connection', async function connection(ws) {
const log = (info: string, event?: any) => {
console.log(`[${address}:${portWithRandomLog}] ${info}`, event || '');
};
let remoteConnection: Socket = null;
let remoteConnection: Duplex = null;
let udpClientStream: TransformStream = null;
let remoteConnectionReadyResolve: Function;
const readableWebSocketStream = makeReadableWebSocketStream(ws, log);
@@ -90,6 +97,13 @@ vlessWServer.on('connection', async function connection(ws) {
.pipeTo(
new WritableStream({
async write(chunk: Buffer, controller) {
if (udpClientStream) {
const writer = udpClientStream.writable.getWriter();
// nodejs buffer will fill some byte, need offset is
writer.write(chunk.buffer.slice(chunk.byteOffset));
writer.releaseLock();
return;
}
if (remoteConnection) {
await socketAsyncWrite(remoteConnection, chunk);
// remoteConnection.write(chunk);
@@ -103,23 +117,36 @@ vlessWServer.on('connection', async function connection(ws) {
addressRemote,
rawDataIndex,
vlessVersion,
isUDP,
} = processVlessHeader(vlessBuffer, userID, uuid, lodash);
address = addressRemote || '';
portWithRandomLog = `${portRemote}--${Math.random()}`;
portWithRandomLog = `${portRemote}--${Math.random()} ${
isUDP ? 'udp ' : 'tcp '
} `;
if (hasError) {
controller.error(`[${address}:${portWithRandomLog}] ${message} `);
}
// const addressType = requestAddr >> 42
// const addressLength = requestAddr & 0x0f;
console.log(`[${address}:${portWithRandomLog}] connecting`);
remoteConnection = await connect2Remote(portRemote, address, log);
vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]);
const rawClientData = vlessBuffer.slice(rawDataIndex!);
if (isUDP) {
udpClientStream = makeUDPSocketStream(portRemote, address);
const writer = udpClientStream.writable.getWriter();
writer.write(rawClientData);
writer.releaseLock();
remoteConnectionReadyResolve(udpClientStream);
} else {
remoteConnection = await connect2Remote(portRemote, address, log);
remoteConnection.write(new Uint8Array(rawClientData));
remoteConnectionReadyResolve(remoteConnection);
}
},
close() {
// if (udpClientStream) {
// udpClientStream.writable.close();
// }
console.log(
`[${address}:${portWithRandomLog}] readableWebSocketStream is close`
);
@@ -145,9 +172,12 @@ vlessWServer.on('connection', async function connection(ws) {
await new Promise((resolve) => (remoteConnectionReadyResolve = resolve));
// remote --> ws
let remoteChunkCount = 0;
let totoal = 0;
await Readable.toWeb(remoteConnection).pipeTo(
let responseStream = udpClientStream?.readable;
if (remoteConnection) {
responseStream = Readable.toWeb(remoteConnection);
}
await responseStream.pipeTo(
new WritableStream({
start() {
if (ws.readyState === ws.OPEN) {
@@ -155,7 +185,10 @@ vlessWServer.on('connection', async function connection(ws) {
}
},
async write(chunk: Uint8Array, controller) {
// console.log('ws write', chunk);
if (ws.readyState === ws.OPEN) {
await wsAsyncWrite(ws, chunk);
}
},
close() {
console.log(
@@ -188,9 +221,15 @@ server.on('upgrade', function upgrade(request, socket, head) {
});
});
server.listen(port, () => {
server.listen(
{
port: port,
host: '0.0.0.0',
},
() => {
console.log(`server listen in http://127.0.0.1:${port}`);
});
}
);
async function connect2Remote(port, host, log: Function): Promise<Socket> {
return new Promise((resole, reject) => {
@@ -211,7 +250,7 @@ async function connect2Remote(port, host, log: Function): Promise<Socket> {
});
}
async function socketAsyncWrite(ws: Socket, chunk: Buffer) {
async function socketAsyncWrite(ws: Duplex, chunk: Buffer) {
return new Promise((resolve, reject) => {
ws.write(chunk, (error) => {
if (error) {
@@ -234,3 +273,52 @@ async function wsAsyncWrite(ws: WebSocket, chunk: Uint8Array) {
});
});
}
function makeUDPSocketStream(portRemote, address) {
const udpClient = createSocket('udp4');
const transformStream = new TransformStream({
start(controller) {
/* … */
udpClient.on('message', (message, info) => {
controller.enqueue(
Buffer.concat([new Uint8Array([0, info.size]), message])
);
});
udpClient.on('error', (error) => {
controller.error(error);
});
},
transform(chunk: ArrayBuffer, controller) {
// seems v2ray will use same web socket for dns query..
// And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length
for (let index = 0; index < chunk.byteLength; ) {
const lengthBuffer = chunk.slice(index, index + 2);
const udpPakcetLength = new DataView(lengthBuffer).getInt16(0);
const udpData = new Uint8Array(
chunk.slice(index + 2, index + 2 + udpPakcetLength)
);
index = index + 2 + udpPakcetLength;
udpClient.send(udpData, portRemote, address, (err) => {
if (err) {
console.log(err);
controller.error('Failed to send UDP packet !!');
udpClient.close();
}
});
index = index;
}
// console.log('dns chunk', chunk);
// console.log(portRemote, address);
// port is big-Endian in raw data etc 80 == 0x005d
},
flush(controller) {
udpClient.close();
controller.terminate();
},
});
return transformStream;
}

View File

@@ -53,9 +53,15 @@ export async function processWebSocket({
addressRemote,
rawDataIndex,
vlessVersion,
isUDP,
} = processVlessHeader(vlessBuffer, userID, uuid, lodash);
address = addressRemote || '';
portWithRandomLog = `${portRemote}--${Math.random()}`;
if (isUDP) {
controller.error(
`[${address}:${portWithRandomLog}] command udp is not support `
);
}
if (hasError) {
controller.error(`[${address}:${portWithRandomLog}] ${message} `);
}
@@ -238,6 +244,7 @@ export function processVlessHeader(
}
const version = new Uint8Array(vlessBuffer.slice(0, 1));
let isValidUser = false;
let isUDP = false;
if (uuidLib.stringify(new Uint8Array(vlessBuffer.slice(1, 17))) === userID) {
isValidUser = true;
}
@@ -256,14 +263,14 @@ export function processVlessHeader(
const command = new Uint8Array(
vlessBuffer.slice(18 + optLength, 18 + optLength + 1)
)[0];
// 0x01 TCP
// 0x02 UDP
// 0x03 MUX
if (command === 1) {
} else if (command === 2) {
isUDP = true;
} else {
// controller.error(
// `command ${command} is not support, command 01-tcp,02-udp,03-mux`
// );
return {
hasError: true,
message: `command ${command} is not support, command 01-tcp,02-udp,03-mux`,
@@ -343,5 +350,6 @@ export function processVlessHeader(
portRemote,
rawDataIndex: addressValueIndex + addressLength,
vlessVersion: version,
isUDP,
};
}