mirror of
https://github.com/lush2020/edgetunnel.git
synced 2026-03-24 17:18:25 +08:00
@@ -1 +1,6 @@
|
||||
export * from './lib/vless-js';
|
||||
export {
|
||||
delay,
|
||||
makeReadableWebSocketStream,
|
||||
closeWebSocket,
|
||||
processVlessHeader,
|
||||
} from './lib/vless-js';
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { stringify } from 'uuid';
|
||||
export function vlessJs(): string {
|
||||
return 'vless-js';
|
||||
}
|
||||
@@ -7,180 +8,18 @@ export function delay(ms: number) {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
export async function processWebSocket({
|
||||
userID,
|
||||
webSocket,
|
||||
rawTCPFactory,
|
||||
libs: { uuid, lodash },
|
||||
}: {
|
||||
userID: string;
|
||||
webSocket: WebSocket;
|
||||
rawTCPFactory: (port: number, hostname: string) => Promise<any>;
|
||||
libs: { uuid: any; lodash: any };
|
||||
}) {
|
||||
let address = '';
|
||||
let portWithRandomLog = '';
|
||||
let remoteConnection: {
|
||||
readable: any;
|
||||
writable: any;
|
||||
write: (arg0: Uint8Array) => any;
|
||||
close: () => void;
|
||||
} | null = null;
|
||||
let remoteConnectionReadyResolve: Function;
|
||||
try {
|
||||
const log = (info: string, event?: any) => {
|
||||
console.log(`[${address}:${portWithRandomLog}] ${info}`, event || '');
|
||||
};
|
||||
const readableWebSocketStream = makeReadableWebSocketStream(webSocket, log);
|
||||
let vlessResponseHeader: Uint8Array | null = null;
|
||||
|
||||
// ws --> remote
|
||||
readableWebSocketStream
|
||||
.pipeTo(
|
||||
new WritableStream({
|
||||
async write(chunk, controller) {
|
||||
const vlessBuffer = chunk;
|
||||
if (remoteConnection) {
|
||||
const number = await remoteConnection.write(
|
||||
new Uint8Array(vlessBuffer)
|
||||
);
|
||||
return;
|
||||
}
|
||||
const {
|
||||
hasError,
|
||||
message,
|
||||
portRemote,
|
||||
addressRemote,
|
||||
rawDataIndex,
|
||||
vlessVersion,
|
||||
isUDP,
|
||||
} = processVlessHeader(vlessBuffer, userID, uuid, lodash);
|
||||
address = addressRemote || '';
|
||||
portWithRandomLog = `${portRemote}--${Math.random()}`;
|
||||
if (isUDP) {
|
||||
controller.error(
|
||||
`[${address}:${portWithRandomLog}] command udp is not support `
|
||||
);
|
||||
}
|
||||
if (hasError) {
|
||||
controller.error(`[${address}:${portWithRandomLog}] ${message} `);
|
||||
}
|
||||
// const addressType = requestAddr >> 4;
|
||||
// const addressLength = requestAddr & 0x0f;
|
||||
console.log(`[${address}:${portWithRandomLog}] connecting`);
|
||||
remoteConnection = await rawTCPFactory(portRemote!, address!);
|
||||
vlessResponseHeader = new Uint8Array([vlessVersion![0], 0]);
|
||||
const rawClientData = vlessBuffer.slice(rawDataIndex!);
|
||||
await remoteConnection!.write(new Uint8Array(rawClientData));
|
||||
remoteConnectionReadyResolve(remoteConnection);
|
||||
},
|
||||
close() {
|
||||
console.log(
|
||||
`[${address}:${portWithRandomLog}] readableWebSocketStream is close`
|
||||
);
|
||||
},
|
||||
abort(reason) {
|
||||
console.log(
|
||||
`[${address}:${portWithRandomLog}] readableWebSocketStream is abort`,
|
||||
JSON.stringify(reason)
|
||||
);
|
||||
},
|
||||
})
|
||||
)
|
||||
.catch((error) => {
|
||||
console.error(
|
||||
`[${address}:${portWithRandomLog}] readableWebSocketStream pipeto has exception`,
|
||||
error.stack || error
|
||||
);
|
||||
// error is cancel readable stream anyway, no need close websocket in here
|
||||
// closeWebSocket(webSocket);
|
||||
// close remote conn
|
||||
// remoteConnection?.close();
|
||||
});
|
||||
await new Promise((resolve) => (remoteConnectionReadyResolve = resolve));
|
||||
let remoteChunkCount = 0;
|
||||
let totoal = 0;
|
||||
// remote --> ws
|
||||
await remoteConnection!.readable.pipeTo(
|
||||
new WritableStream({
|
||||
start() {
|
||||
if (webSocket.readyState === webSocket.OPEN) {
|
||||
webSocket.send(vlessResponseHeader!);
|
||||
}
|
||||
},
|
||||
async write(chunk: Uint8Array, controller) {
|
||||
function send2WebSocket() {
|
||||
if (webSocket.readyState !== webSocket.OPEN) {
|
||||
controller.error(
|
||||
`can't accept data from remoteConnection!.readable when client webSocket is close early`
|
||||
);
|
||||
return;
|
||||
}
|
||||
webSocket.send(chunk);
|
||||
}
|
||||
|
||||
remoteChunkCount++;
|
||||
//#region
|
||||
// console.log(
|
||||
// `${(totoal +=
|
||||
// chunk.length)}, count: ${remoteChunkCount.toString()}, ${
|
||||
// chunk.length
|
||||
// }`
|
||||
// );
|
||||
// https://github.com/zizifn/edgetunnel/issues/87, hack for this issue, maybe websocket sent too many small chunk,
|
||||
// casue v2ray client can't process https://github.com/denoland/deno/issues/17332
|
||||
// limit X number count / bandwith, due to deno can't read bufferedAmount in deno,
|
||||
// this is deno bug and this will not need in nodejs version
|
||||
//#endregion
|
||||
if (remoteChunkCount < 20) {
|
||||
send2WebSocket();
|
||||
} else if (remoteChunkCount < 120) {
|
||||
await delay(10); // 64kb * 100 = 6m/s
|
||||
send2WebSocket();
|
||||
} else if (remoteChunkCount < 500) {
|
||||
await delay(20); // (64kb * 1000/20) = 3m/s
|
||||
send2WebSocket();
|
||||
} else {
|
||||
await delay(50); // (64kb * 1000/50) /s
|
||||
send2WebSocket();
|
||||
}
|
||||
},
|
||||
close() {
|
||||
console.log(
|
||||
`[${address}:${portWithRandomLog}] remoteConnection!.readable is close`
|
||||
);
|
||||
},
|
||||
abort(reason) {
|
||||
closeWebSocket(webSocket);
|
||||
console.error(
|
||||
`[${address}:${portWithRandomLog}] remoteConnection!.readable abort`,
|
||||
reason
|
||||
);
|
||||
},
|
||||
})
|
||||
);
|
||||
} catch (error: any) {
|
||||
console.error(
|
||||
`[${address}:${portWithRandomLog}] processWebSocket has exception `,
|
||||
error.stack || error
|
||||
);
|
||||
closeWebSocket(webSocket);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
export function makeReadableWebSocketStream(
|
||||
ws: WebSocket | any,
|
||||
earlyDataHeader: string,
|
||||
log: Function
|
||||
) {
|
||||
let readableStreamCancel = false;
|
||||
return new ReadableStream<ArrayBuffer>({
|
||||
start(controller) {
|
||||
ws.addEventListener('message', async (e: { data: ArrayBuffer }) => {
|
||||
// console.log('MESSAGE');
|
||||
const vlessBuffer: ArrayBuffer = e.data;
|
||||
// console.log('MESSAGE', vlessBuffer);
|
||||
|
||||
// console.log(`message is ${vlessBuffer.byteLength}`);
|
||||
// this is not backpressure, but backpressure is depends on underying websocket can pasue
|
||||
// https://streams.spec.whatwg.org/#example-rs-push-backpressure
|
||||
@@ -194,7 +33,7 @@ export function makeReadableWebSocketStream(
|
||||
ws.addEventListener('close', () => {
|
||||
try {
|
||||
log('webSocket is close');
|
||||
// is stream is cancel, skill controller.close
|
||||
// is stream is cancel, skip controller.close
|
||||
if (readableStreamCancel) {
|
||||
return;
|
||||
}
|
||||
@@ -203,9 +42,23 @@ export function makeReadableWebSocketStream(
|
||||
log(`websocketStream can't close DUE to `, error);
|
||||
}
|
||||
});
|
||||
// header ws 0rtt
|
||||
const { earlyData, error } = base64ToArrayBuffer(earlyDataHeader);
|
||||
if (error) {
|
||||
log(`earlyDataHeader has invaild base64`);
|
||||
closeWebSocket(ws);
|
||||
return;
|
||||
}
|
||||
if (earlyData) {
|
||||
controller.enqueue(earlyData);
|
||||
}
|
||||
},
|
||||
pull(controller) {
|
||||
// if ws can stop read if stream is full, we can implement backpressure
|
||||
// https://streams.spec.whatwg.org/#example-rs-push-backpressure
|
||||
},
|
||||
pull(controller) {},
|
||||
cancel(reason) {
|
||||
// TODO: log can be remove, if writestream has error, write stream will has log
|
||||
log(`websocketStream is cancel DUE to `, reason);
|
||||
if (readableStreamCancel) {
|
||||
return;
|
||||
@@ -216,6 +69,21 @@ export function makeReadableWebSocketStream(
|
||||
});
|
||||
}
|
||||
|
||||
function base64ToArrayBuffer(base64Str: string) {
|
||||
if (!base64Str) {
|
||||
return { error: null };
|
||||
}
|
||||
try {
|
||||
// go use modified Base64 for URL rfc4648 which js atob not support
|
||||
base64Str = base64Str.replace(/-/g, '+').replace(/_/g, '/');
|
||||
const decode = atob(base64Str);
|
||||
const arryBuffer = Uint8Array.from(decode, (c) => c.charCodeAt(0));
|
||||
return { earlyData: arryBuffer.buffer, error: null };
|
||||
} catch (error) {
|
||||
return { error };
|
||||
}
|
||||
}
|
||||
|
||||
export function closeWebSocket(socket: WebSocket | any) {
|
||||
if (socket.readyState === socket.OPEN) {
|
||||
socket.close();
|
||||
@@ -223,16 +91,16 @@ export function closeWebSocket(socket: WebSocket | any) {
|
||||
}
|
||||
|
||||
//https://github.com/v2ray/v2ray-core/issues/2636
|
||||
// 1 字节 16 字节 1 字节 M 字节 1 字节 2 字节 1 字节 S 字节 X 字节
|
||||
// 协议版本 等价 UUID 附加信息长度 M 附加信息 ProtoBuf 指令 端口 地址类型 地址 请求数据
|
||||
|
||||
// 1 字节 16 字节 1 字节 M 字节 1 字节 2 字节 1 字节 S 字节 X 字节
|
||||
// 协议版本 等价 UUID 附加信息长度 M (附加信息 ProtoBuf) 指令(udp/tcp) 端口 地址类型 地址 请求数据
|
||||
// 00 00 01 01bb(443) 02(ip/host)
|
||||
// 1 字节 1 字节 N 字节 Y 字节
|
||||
// 协议版本,与请求的一致 附加信息长度 N 附加信息 ProtoBuf 响应数据
|
||||
export function processVlessHeader(
|
||||
vlessBuffer: ArrayBuffer,
|
||||
userID: string,
|
||||
uuidLib: any,
|
||||
lodash: any
|
||||
userID: string
|
||||
// uuidLib: any,
|
||||
// lodash: any
|
||||
) {
|
||||
if (vlessBuffer.byteLength < 24) {
|
||||
// console.log('invalid data');
|
||||
@@ -245,7 +113,7 @@ export function processVlessHeader(
|
||||
const version = new Uint8Array(vlessBuffer.slice(0, 1));
|
||||
let isValidUser = false;
|
||||
let isUDP = false;
|
||||
if (uuidLib.stringify(new Uint8Array(vlessBuffer.slice(1, 17))) === userID) {
|
||||
if (stringify(new Uint8Array(vlessBuffer.slice(1, 17))) === userID) {
|
||||
isValidUser = true;
|
||||
}
|
||||
if (!isValidUser) {
|
||||
@@ -311,26 +179,20 @@ export function processVlessHeader(
|
||||
break;
|
||||
case 3:
|
||||
addressLength = 16;
|
||||
const addressChunkBy2: number[][] = lodash.chunk(
|
||||
new Uint8Array(
|
||||
vlessBuffer.slice(
|
||||
addressValueIndex,
|
||||
addressValueIndex + addressLength
|
||||
)
|
||||
),
|
||||
2,
|
||||
null
|
||||
const dataView = new DataView(
|
||||
vlessBuffer.slice(addressValueIndex, addressValueIndex + addressLength)
|
||||
);
|
||||
// 2001:0db8:85a3:0000:0000:8a2e:0370:7334
|
||||
addressValue = addressChunkBy2
|
||||
.map((items) =>
|
||||
items.map((item) => item.toString(16).padStart(2, '0')).join('')
|
||||
)
|
||||
.join(':');
|
||||
if (addressValue) {
|
||||
addressValue = `[${addressValue}]`;
|
||||
const ipv6 = [];
|
||||
for (let i = 0; i < 8; i++) {
|
||||
ipv6.push(dataView.getUint16(i * 2).toString(16));
|
||||
}
|
||||
|
||||
addressValue = ipv6.join(':');
|
||||
// console.log('---------', addressValue)
|
||||
// seems no need add [] for ipv6
|
||||
// if (addressValue) {
|
||||
// addressValue = `[${addressValue}]`;
|
||||
// }
|
||||
break;
|
||||
default:
|
||||
console.log(`invild addressType is ${addressType}`);
|
||||
|
||||
Reference in New Issue
Block a user