update udp close

This commit is contained in:
zizifn
2023-03-07 02:12:43 +08:00
committed by zizifn
parent 0cbadf872a
commit 1d45c93e2c
4 changed files with 142 additions and 51 deletions

View File

@@ -107,7 +107,7 @@ vlessWServer.on('connection', async function connection(ws, request) {
const writer = udpClientStream.writable.getWriter(); const writer = udpClientStream.writable.getWriter();
// nodejs buffer to ArrayBuffer issue // nodejs buffer to ArrayBuffer issue
// https://nodejs.org/dist/latest-v18.x/docs/api/buffer.html#bufbuffer // https://nodejs.org/dist/latest-v18.x/docs/api/buffer.html#bufbuffer
writer.write( await writer.write(
chunk.buffer.slice( chunk.buffer.slice(
chunk.byteOffset, chunk.byteOffset,
chunk.byteOffset + chunk.length chunk.byteOffset + chunk.length
@@ -149,7 +149,7 @@ vlessWServer.on('connection', async function connection(ws, request) {
if (isUDP) { if (isUDP) {
udpClientStream = makeUDPSocketStream(portRemote, address); udpClientStream = makeUDPSocketStream(portRemote, address);
const writer = udpClientStream.writable.getWriter(); const writer = udpClientStream.writable.getWriter();
writer.write(rawClientData); writer.write(rawClientData).catch(error=>console.log)
writer.releaseLock(); writer.releaseLock();
remoteConnectionReadyResolve(udpClientStream); remoteConnectionReadyResolve(udpClientStream);
} else { } else {
@@ -301,11 +301,12 @@ function makeUDPSocketStream(portRemote, address) {
); );
}); });
udpClient.on('error', (error) => { udpClient.on('error', (error) => {
console.log('udpClient error event', error);
controller.error(error); controller.error(error);
}); });
}, },
transform(chunk: ArrayBuffer, controller) { async transform(chunk: ArrayBuffer, controller) {
//seems v2ray will use same web socket for dns query.. //seems v2ray will use same web socket for dns query..
//And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length //And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length
for (let index = 0; index < chunk.byteLength; ) { for (let index = 0; index < chunk.byteLength; ) {
@@ -315,13 +316,16 @@ function makeUDPSocketStream(portRemote, address) {
chunk.slice(index + 2, index + 2 + udpPakcetLength) chunk.slice(index + 2, index + 2 + udpPakcetLength)
); );
index = index + 2 + udpPakcetLength; index = index + 2 + udpPakcetLength;
await new Promise((resolve, reject)=>{
udpClient.send(udpData, portRemote, address, (err) => { udpClient.send(udpData, portRemote, address, (err) => {
if (err) { if (err) {
console.log(err); console.log('udps send error', err);
controller.error('Failed to send UDP packet !!'); controller.error(`Failed to send UDP packet !! ${err}`);
safeCloseUDP(udpClient); safeCloseUDP(udpClient);
} }
resolve(true)
}); });
})
index = index; index = index;
} }

View File

@@ -6128,7 +6128,7 @@ vlessWServer.on('connection', function connection(ws, request) {
const writer = udpClientStream.writable.getWriter(); const writer = udpClientStream.writable.getWriter();
// nodejs buffer to ArrayBuffer issue // nodejs buffer to ArrayBuffer issue
// https://nodejs.org/dist/latest-v18.x/docs/api/buffer.html#bufbuffer // https://nodejs.org/dist/latest-v18.x/docs/api/buffer.html#bufbuffer
writer.write(chunk.buffer.slice(chunk.byteOffset, chunk.byteOffset + chunk.length)); yield writer.write(chunk.buffer.slice(chunk.byteOffset, chunk.byteOffset + chunk.length));
writer.releaseLock(); writer.releaseLock();
return; return;
} }
@@ -6152,7 +6152,7 @@ vlessWServer.on('connection', function connection(ws, request) {
if (isUDP) { if (isUDP) {
udpClientStream = makeUDPSocketStream(portRemote, address); udpClientStream = makeUDPSocketStream(portRemote, address);
const writer = udpClientStream.writable.getWriter(); const writer = udpClientStream.writable.getWriter();
writer.write(rawClientData); writer.write(rawClientData).catch(error => console.log);
writer.releaseLock(); writer.releaseLock();
remoteConnectionReadyResolve(udpClientStream); remoteConnectionReadyResolve(udpClientStream);
} }
@@ -6282,10 +6282,12 @@ function makeUDPSocketStream(portRemote, address) {
controller.enqueue(Buffer.concat([new Uint8Array([0, info.size]), message])); controller.enqueue(Buffer.concat([new Uint8Array([0, info.size]), message]));
}); });
udpClient.on('error', (error) => { udpClient.on('error', (error) => {
console.log('udpClient error event', error);
controller.error(error); controller.error(error);
}); });
}, },
transform(chunk, controller) { transform(chunk, controller) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
//seems v2ray will use same web socket for dns query.. //seems v2ray will use same web socket for dns query..
//And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length //And v2ray will combine A record and AAAA record into one ws message and use 2 btye for dns query length
for (let index = 0; index < chunk.byteLength;) { for (let index = 0; index < chunk.byteLength;) {
@@ -6293,18 +6295,22 @@ function makeUDPSocketStream(portRemote, address) {
const udpPakcetLength = new DataView(lengthBuffer).getInt16(0); const udpPakcetLength = new DataView(lengthBuffer).getInt16(0);
const udpData = new Uint8Array(chunk.slice(index + 2, index + 2 + udpPakcetLength)); const udpData = new Uint8Array(chunk.slice(index + 2, index + 2 + udpPakcetLength));
index = index + 2 + udpPakcetLength; index = index + 2 + udpPakcetLength;
yield new Promise((resolve, reject) => {
udpClient.send(udpData, portRemote, address, (err) => { udpClient.send(udpData, portRemote, address, (err) => {
if (err) { if (err) {
console.log(err); console.log('udps send error', err);
controller.error('Failed to send UDP packet !!'); controller.error(`Failed to send UDP packet !! ${err}`);
safeCloseUDP(udpClient); safeCloseUDP(udpClient);
} }
resolve(true);
});
}); });
index = index; index = index;
} }
// console.log('dns chunk', chunk); // console.log('dns chunk', chunk);
// console.log(portRemote, address); // console.log(portRemote, address);
// port is big-Endian in raw data etc 80 == 0x005d // port is big-Endian in raw data etc 80 == 0x005d
});
}, },
flush(controller) { flush(controller) {
safeCloseUDP(udpClient); safeCloseUDP(udpClient);

File diff suppressed because one or more lines are too long

109
test.mjs
View File

@@ -9,10 +9,14 @@ try {
let i = 0; let i = 0;
const readableStream = new ReadableStream({ const readableStream = new ReadableStream({
start(control) { start(control) {
// throw 'pipeTo error';
control.enqueue(undefined); control.enqueue(undefined);
control.enqueue(1); control.enqueue(1);
setTimeout(()=>{
control.close()
}, 500)
// control.close(); // control.close();
control.error('eroro000-----readableStream--------'); // control.error('eroro000-----readableStream--------');
// setTimeout(() => { // setTimeout(() => {
// console.log('-----------------100'); // console.log('-----------------100');
// control.error('eroro000-----readableStream--------'); // control.error('eroro000-----readableStream--------');
@@ -36,11 +40,88 @@ try {
}, },
}); });
setTimeout(() => { // setTimeout(() => {
console.log('cancel'); // console.log('cancel');
}, 2000); // }, 2000);
await readableStream.pipeTo( // readableStream.pipeThrough(new TransformStream({
// start(controller){
// // setTimeout(()=>{
// // controller.error('xxxxxx')
// // console.log('--transform--start-');
// // },3000)
// },
// transform(chunk, controller){
// // throw 'err'
// Promise.reject('xxxx')
// // setTimeout(()=>{
// // console.log('--transform---');
// // // throw '333'
// // }, 2000)
// // controller.enqueue(chunk)
// }
// }))
// await read1.pipeTo(
// new WritableStream({
// async write(chunk, controller) {
// console.log(chunk);
// // throw 'pipeTo error';
// // controller.error('pipeTo has error');
// // await delay(1);
// // controller.error('error');
// // if (chunk === 7) {
// // throw 'error';
// // }
// },
// close() {
// console.log('close------WritableStream');
// },
// abort(reason) {
// console.log('abort--------', reason);
// },
// })
// );
// console.log('end--------');
// for await (const iterator of readableStream) {
// console.log(iterator);
// }
} catch (error) {
console.log('---end---', error);
}
try{
console.log('----------');
const transform = new TransformStream({
start(controller){
// setInterval(()=>{
// controller.enqueue('1234')
// }, 1000)
},
async transform(chunk, controller){
console.log('----------', chunk);
// controller.error('xxxxxxxxxxxxxxxxxxxxx')
// throw 'xxxxxx'
// console.log('----------', chunk);
// Promise.reject('xxxx')
setTimeout(()=>{
controller.error('xxxxxxxxxxxxxxxxxxxxx')
// throw '333'
}, 2000)
controller.enqueue(chunk)
return '-======='
}
})
transform.readable.pipeTo(
new WritableStream({ new WritableStream({
async write(chunk, controller) { async write(chunk, controller) {
console.log(chunk); console.log(chunk);
@@ -58,14 +139,14 @@ try {
abort(reason) { abort(reason) {
console.log('abort--------', reason); console.log('abort--------', reason);
}, },
})).catch(error=>{
console.log(error);
}) })
); const getWriter = transform.writable.getWriter()
await getWriter.write('abc').catch(error=>console.log(error))
getWriter.releaseLock()
console.log('xxxxxx');
// console.log('end--------'); }catch(errpr){
console.log(errpr);
// for await (const iterator of readableStream) { }
// console.log(iterator);
// }
} catch (error) {
console.log('---end---', error);
}