refactor VlESS (#91)

refactor vless code
This commit is contained in:
zizifn3
2023-01-13 21:30:41 +08:00
committed by GitHub
parent 1af04e6015
commit c8be100cfb
4 changed files with 249 additions and 204 deletions

4
.vscode/launch.json vendored
View File

@@ -8,13 +8,13 @@
"request": "launch",
"name": "Launch Program",
"type": "pwa-node",
"program": "C:\\github\\edgetunnel\\apps\\deno-vless\\src\\main.ts",
"program": "C:\\github\\test-deno\\apps\\deno-vless\\src\\main.ts",
"cwd": "${workspaceFolder}",
"runtimeExecutable": "C:\\Users\\zizifn\\scoop\\shims\\deno.EXE",
"runtimeArgs": ["run", "--inspect", "--allow-all", "--unstable"],
"attachSimplePort": 9229,
"env": {
"UUID": ""
"UUID": "e0a102c8-51c8-436c-860b-ef279d8a6ada"
}
}
]

View File

@@ -2,7 +2,7 @@ import { serve } from 'https://deno.land/std@0.170.0/http/server.ts';
import * as uuid from 'https://jspm.dev/uuid';
import * as lodash from 'https://jspm.dev/lodash-es';
import { serveClient } from './deno/client.ts';
import { processSocket } from '../../../libs/vless-js/src/lib/vless-js.ts';
import { processWebSocket } from '../../../libs/vless-js/src/lib/vless-js.ts';
const userID = Deno.env.get('UUID') || '';
let isVaildUser = uuid.validate(userID);
@@ -29,9 +29,12 @@ const handler = async (req: Request): Promise<Response> => {
const { socket, response } = Deno.upgradeWebSocket(req);
socket.addEventListener('open', () => {});
processSocket({
let test: Deno.TcpConn | null = null;
// test!.writable.abort();
//
processWebSocket({
userID,
socket,
webSocket: socket,
rawTCPFactory: (port: number, hostname: string) => {
return Deno.connect({
port,

View File

@@ -1 +1 @@
export { vlessJs, processSocket } from './lib/vless-js';
export { vlessJs, processWebSocket as processSocket } from './lib/vless-js';

View File

@@ -2,234 +2,276 @@ export function vlessJs(): string {
return 'vless-js';
}
export async function processSocket({
function delay(ms: number) {
return new Promise((resolve, rej) => {
setTimeout(resolve, ms);
});
}
export async function processWebSocket({
userID,
socket,
webSocket,
rawTCPFactory,
libs: { uuid, lodash },
}: {
userID: string;
socket: WebSocket;
webSocket: WebSocket;
rawTCPFactory: (port: number, hostname: string) => Promise<any>;
libs: { uuid: any; lodash: any };
}) {
let address = '';
let port = 0;
let remoteConnection: {
readable: any;
writable: any;
write: (arg0: Uint8Array) => any;
close: () => void;
} | null = null;
let remoteConnectionReadyResolve: Function;
try {
const websocketStream = new ReadableStream({
start(controller) {
socket.addEventListener('message', async (e) => {
const vlessBuffer: ArrayBuffer = e.data;
// console.log('request message ', vlessBuffer.byteLength);
controller.enqueue(vlessBuffer);
});
socket.addEventListener('error', (e) => {
console.log(`[${address}:${port}] socket has error`, e);
controller.error(e);
});
socket.addEventListener('close', () => {
try {
console.log(`[${address}:${port}] socket is close`);
controller.close();
} catch (error) {
console.log(`[${address}:${port}] websocketStream can't close`);
}
});
},
pull(controller) {},
cancel(reason) {
console.log(`[${address}:${port}] websocketStream is cancel`, reason);
socket.close();
},
});
let remoteConnection: {
readable: any;
write: (arg0: Uint8Array) => any;
close: () => void;
} | null = null;
const log = (info: string, event?: any) => {
console.log(`[${address}:${port}] ${info}`, event || '');
};
const readableWebSocketStream = makeReadableWebSocketStream(webSocket, log);
let vlessResponseHeader: Uint8Array | null = null;
await websocketStream.pipeTo(
new WritableStream({
async write(chunk, controller) {
const vlessBuffer = chunk;
if (remoteConnection) {
const number = await remoteConnection.write(
new Uint8Array(vlessBuffer)
);
return;
}
if (vlessBuffer.byteLength < 24) {
console.log('invalid data');
controller.error('invalid data');
return;
}
const version = new Uint8Array(vlessBuffer.slice(0, 1));
let isValidUser = false;
if (
uuid.stringify(new Uint8Array(vlessBuffer.slice(1, 17))) === userID
) {
isValidUser = true;
}
if (!isValidUser) {
console.log('in valid user');
controller.error('in valid user');
return;
}
const optLength = new Uint8Array(vlessBuffer.slice(17, 18))[0];
//skip opt for now
const command = new Uint8Array(
vlessBuffer.slice(18 + optLength, 18 + optLength + 1)
)[0];
// 0x01 TCP
// 0x02 UDP
// 0x03 MUX
if (command === 1) {
} else {
controller.error(
`command ${command} is not support, command 01-tcp,02-udp,03-mux`
);
return;
}
const portIndex = 18 + optLength + 1;
const portBuffer = vlessBuffer.slice(portIndex, portIndex + 2);
// port is big-Endian in raw data etc 80 == 0x005d
const portRemote = new DataView(portBuffer).getInt16(0);
port = portRemote;
let addressIndex = portIndex + 2;
const addressBuffer = new Uint8Array(
vlessBuffer.slice(addressIndex, addressIndex + 1)
);
// 1--> ipv4 addressLength =4
// 2--> domain name addressLength=addressBuffer[1]
// 3--> ipv6 addressLength =16
const addressType = addressBuffer[0];
let addressLength = 0;
let addressValueIndex = addressIndex + 1;
let addressValue = '';
switch (addressType) {
case 1:
addressLength = 4;
addressValue = new Uint8Array(
vlessBuffer.slice(
addressValueIndex,
addressValueIndex + addressLength
)
).join('.');
break;
case 2:
addressLength = new Uint8Array(
vlessBuffer.slice(addressValueIndex, addressValueIndex + 1)
)[0];
addressValueIndex += 1;
addressValue = new TextDecoder().decode(
vlessBuffer.slice(
addressValueIndex,
addressValueIndex + addressLength
)
// ws --> remote
readableWebSocketStream
.pipeTo(
new WritableStream({
async write(chunk, controller) {
const vlessBuffer = chunk;
if (remoteConnection) {
const number = await remoteConnection.write(
new Uint8Array(vlessBuffer)
);
break;
case 3:
addressLength = 16;
const addressChunkBy2: number[][] = lodash.chunk(
new Uint8Array(
return;
}
if (vlessBuffer.byteLength < 24) {
console.log('invalid data');
controller.error('invalid data');
return;
}
const version = new Uint8Array(vlessBuffer.slice(0, 1));
let isValidUser = false;
if (
uuid.stringify(new Uint8Array(vlessBuffer.slice(1, 17))) ===
userID
) {
isValidUser = true;
}
if (!isValidUser) {
console.log('in valid user');
controller.error('in valid user');
return;
}
const optLength = new Uint8Array(vlessBuffer.slice(17, 18))[0];
//skip opt for now
const command = new Uint8Array(
vlessBuffer.slice(18 + optLength, 18 + optLength + 1)
)[0];
// 0x01 TCP
// 0x02 UDP
// 0x03 MUX
if (command === 1) {
} else {
controller.error(
`command ${command} is not support, command 01-tcp,02-udp,03-mux`
);
return;
}
const portIndex = 18 + optLength + 1;
const portBuffer = vlessBuffer.slice(portIndex, portIndex + 2);
// port is big-Endian in raw data etc 80 == 0x005d
const portRemote = new DataView(portBuffer).getInt16(0);
port = portRemote;
let addressIndex = portIndex + 2;
const addressBuffer = new Uint8Array(
vlessBuffer.slice(addressIndex, addressIndex + 1)
);
// 1--> ipv4 addressLength =4
// 2--> domain name addressLength=addressBuffer[1]
// 3--> ipv6 addressLength =16
const addressType = addressBuffer[0];
let addressLength = 0;
let addressValueIndex = addressIndex + 1;
let addressValue = '';
switch (addressType) {
case 1:
addressLength = 4;
addressValue = new Uint8Array(
vlessBuffer.slice(
addressValueIndex,
addressValueIndex + addressLength
)
),
2,
null
).join('.');
break;
case 2:
addressLength = new Uint8Array(
vlessBuffer.slice(addressValueIndex, addressValueIndex + 1)
)[0];
addressValueIndex += 1;
addressValue = new TextDecoder().decode(
vlessBuffer.slice(
addressValueIndex,
addressValueIndex + addressLength
)
);
break;
case 3:
addressLength = 16;
const addressChunkBy2: number[][] = lodash.chunk(
new Uint8Array(
vlessBuffer.slice(
addressValueIndex,
addressValueIndex + addressLength
)
),
2,
null
);
// 2001:0db8:85a3:0000:0000:8a2e:0370:7334
addressValue = addressChunkBy2
.map((items) =>
items
.map((item) => item.toString(16).padStart(2, '0'))
.join('')
)
.join(':');
break;
default:
console.log(`[${address}:${port}] invild address`);
}
address = addressValue;
if (!addressValue) {
// console.log(`[${address}:${port}] addressValue is empty`);
controller.error(`[${address}:${port}] addressValue is empty`);
return;
}
// const addressType = requestAddr >> 4;
// const addressLength = requestAddr & 0x0f;
console.log(`[${addressValue}:${port}] connecting`);
remoteConnection = await rawTCPFactory(port, addressValue);
vlessResponseHeader = new Uint8Array([version[0], 0]);
const rawDataIndex = addressValueIndex + addressLength;
const rawClientData = vlessBuffer.slice(rawDataIndex);
await remoteConnection!.write(new Uint8Array(rawClientData));
remoteConnectionReadyResolve(remoteConnection);
},
})
)
.catch((error) => {
console.log(
`[${address}:${port}] readableWebSocketStream pipeto has exception`,
error.stack || error
);
closeWebSocket(webSocket);
// close remote conn
remoteConnection?.close();
});
await new Promise((resolve) => (remoteConnectionReadyResolve = resolve));
let remoteChunkCount = 0;
let totoal = 0;
// remote --> ws
await remoteConnection!.readable.pipeTo(
new WritableStream({
start() {
webSocket.send(vlessResponseHeader!);
},
async write(chunk: Uint8Array, controller) {
function send2WebSocket() {
if (webSocket.readyState !== webSocket.OPEN) {
controller.error(
`[${address}:${port}] abort when webSocket is close can't accept data from remoteConnection!.readable`
);
// 2001:0db8:85a3:0000:0000:8a2e:0370:7334
addressValue = addressChunkBy2
.map((items) =>
items
.map((item) => item.toString(16).padStart(2, '0'))
.join('')
)
.join(':');
break;
default:
console.log(`[${address}:${port}] invild address`);
return;
}
webSocket.send(chunk);
}
address = addressValue;
if (!addressValue) {
// console.log(`[${address}:${port}] addressValue is empty`);
controller.error(`[${address}:${port}] addressValue is empty`);
return;
remoteChunkCount++;
//#region
// console.log(
// `${(totoal +=
// chunk.length)}, count: ${remoteChunkCount.toString()}, ${
// chunk.length
// }`
// );
// https://github.com/zizifn/edgetunnel/issues/87, hack for this issue, maybe websocket sent too many small chunk,
// casue v2ray client can't process https://github.com/denoland/deno/issues/17332
// limit X number count / bandwith, due to deno can't read bufferedAmount in deno,
// this is deno bug and this will not need in nodejs version
//#endregion
if (remoteChunkCount < 20) {
send2WebSocket();
} else if (remoteChunkCount < 120) {
await delay(10); // 64kb * 100 = 6m/s
send2WebSocket();
} else if (remoteChunkCount < 500) {
await delay(20); // (64kb * 1000/20) = 3m/s
send2WebSocket();
} else {
await delay(50); // (64kb * 1000/50) /s
send2WebSocket();
}
// const addressType = requestAddr >> 4;
// const addressLength = requestAddr & 0x0f;
console.log(`[${addressValue}:${port}] connecting`);
remoteConnection = await rawTCPFactory(port, addressValue);
const rawDataIndex = addressValueIndex + addressLength;
const rawClientData = vlessBuffer.slice(rawDataIndex);
await remoteConnection!.write(new Uint8Array(rawClientData));
let chunkDatas = [new Uint8Array([version[0], 0])];
// let sizes = 0;
// get response from remoteConnection
remoteConnection!.readable
.pipeTo(
new WritableStream({
start() {
socket.send(new Blob(chunkDatas));
},
async write(chunk, controller) {
// ('' as any).toLowerCase1();
// sizes += chunk.length;
// console.log('response size--', chunk.length);
// console.log('totoal size--', sizes);
// https://github.com/zizifn/edgetunnel/issues/87, hack for this issue, maybe websocket sent too many small chunk,
// casue v2ray client can't process
await new Promise((res, rej) => {
setTimeout(res, 2);
});
socket.send(chunk);
},
close() {
console.error(
`[${address}:${port}] remoteConnection!.readable is close`
);
socket.close();
},
abort(reason) {
socket.close();
console.error(
`[${address}:${port}] remoteConnection!.readable abort`,
reason
);
},
})
)
.catch((error: any) => {
socket.close();
console.error(
`[${address}:${port}] remoteConnection.readable has error`,
error
);
});
},
close() {
console.log(`[${address}:${port}] websocketStream pipeto is close`);
console.log(
`[${address}:${port}] remoteConnection!.readable is close`
);
},
abort(reason) {
console.log(
`[${address}:${port}] websocketStream pipeto is abort `,
closeWebSocket(webSocket);
console.error(
`[${address}:${port}] remoteConnection!.readable abort`,
reason
);
remoteConnection?.close();
socket.close();
},
})
);
} catch (error: any) {
console.error(`[${address}:${port}] processSocket`, error);
socket.close();
console.error(
`[${address}:${port}] processWebSocket has esception `,
error.stack || error
);
closeWebSocket(webSocket);
}
return;
}
function makeReadableWebSocketStream(ws: WebSocket, log: Function) {
return new ReadableStream({
start(controller) {
ws.addEventListener('message', async (e) => {
const vlessBuffer: ArrayBuffer = e.data;
// console.log(`message is ${vlessBuffer.byteLength}`);
controller.enqueue(vlessBuffer);
});
ws.addEventListener('error', (e) => {
log('socket has error', e);
controller.error(e);
});
ws.addEventListener('close', () => {
try {
log('socket is close');
controller.close();
} catch (error) {
log(`websocketStream can't close`);
}
});
},
pull(controller) {},
cancel(reason) {
log(`websocketStream is cancel`, reason);
ws.close();
},
});
}
function closeWebSocket(socket: WebSocket) {
if (socket.readyState === socket.OPEN) {
socket.close();
}
}