enhance logic

This commit is contained in:
zizifn
2022-12-06 15:22:54 +08:00
committed by zizifn
parent 96c877f52f
commit 6f16d2fc3c
7 changed files with 300 additions and 70 deletions

View File

@@ -1,11 +1,15 @@
import { serve } from 'https://deno.land/std@0.167.0/http/server.ts'; import { serve } from 'https://deno.land/std@0.167.0/http/server.ts';
import { buildRawHttp500, isVaildateReq } from './helper.ts';
const userID = Deno.env.get('UUID'); const userID = Deno.env.get('UUID');
const handler = async (request: Request): Promise<Response> => { const handler = async (request: Request): Promise<Response> => {
// console.log('--------start--------');
try {
const headers = request.headers; const headers = request.headers;
const serverAddress = headers.get('x-host') || ''; const serverAddress = headers.get('x-host') || '';
const remotePort = headers.get('x-port') || 443; const remotePort = headers.get('x-port') || 443;
const uuid = headers.get('x-uuid'); const isHttp = headers.get('x-http') === 'true';
const uuid = request.headers.get('x-uuid');
if (!serverAddress || !remotePort || !userID) { if (!serverAddress || !remotePort || !userID) {
return new Response( return new Response(
@@ -19,16 +23,28 @@ ${userID ? 'has UUID env' : 'no UUID env'}
} }
); );
} }
console.log(
`want to proxy to server address ${serverAddress}, and port ${remotePort}`
);
if (uuid !== userID) { if (uuid !== userID) {
return new Response('Do not send right UUID!', { return new Response(buildRawHttp500('Do not send right UUID!'), {
status: 403, status: 403,
headers: {}, headers: {},
}); });
} }
if (!isVaildateReq(request)) {
return new Response(
buildRawHttp500(
'request is not vaild due to lcoalip or request body is null'
),
{
status: 500,
headers: {},
}
);
}
console.log(
`want to proxy to server address ${serverAddress}, and port ${remotePort}`
);
const connection = await Deno.connect({ const connection = await Deno.connect({
port: Number(remotePort), port: Number(remotePort),
hostname: serverAddress, hostname: serverAddress,
@@ -39,17 +55,29 @@ ${userID ? 'has UUID env' : 'no UUID env'}
// and casue deno can't get TCP pcakge. // and casue deno can't get TCP pcakge.
// 2. current soluction for this, let proxy client wait for few ms and then end readablestream // 2. current soluction for this, let proxy client wait for few ms and then end readablestream
// 3. this is only inpact HTTP proxy not https // 3. this is only inpact HTTP proxy not https
(async () => { let readablestreamRsp = connection.readable;
if (isHttp) {
// if is http, we need wait for request read, or we can warpper into async function
for await (let chunk of request.body || []) { for await (let chunk of request.body || []) {
// console.log(new TextDecoder().decode(chunk)); // console.log(new TextDecoder().decode(chunk));
connection.write(chunk); connection.write(chunk);
} }
})(); readablestreamRsp = connection.readable;
} else {
readablestreamRsp = request.body!.pipeThrough(connection);
}
return new Response(connection.readable, { return new Response(readablestreamRsp, {
status: 200, status: 200,
headers: {}, headers: {},
}); });
} catch (error) {
console.log(error);
return new Response(buildRawHttp500('has error'), {
status: 500,
headers: {},
});
}
}; };
serve(handler, { port: 8080, hostname: '0.0.0.0' }); serve(handler, { port: 8080, hostname: '0.0.0.0' });

View File

@@ -0,0 +1,48 @@
import { EOL } from 'https://deno.land/std@0.110.0/node/os.ts';
function isPrivateIP(ip: string) {
if (ip === 'localhost') {
return true;
}
const parts = ip.split('.');
return (
parts[0] === '10' ||
(parts[0] === '172' &&
parseInt(parts[1], 10) >= 16 &&
parseInt(parts[1], 10) <= 31) ||
(parts[0] === '192' && parts[1] === '168')
);
}
function buildRawHttp500(message: string) {
const body = new TextEncoder().encode(`${message}`);
return new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode(`HTTP/1.1 500 Internal Server Error${EOL}`)
);
controller.enqueue(
new TextEncoder().encode(`content-length: ${body.length}${EOL}`)
);
controller.enqueue(
new TextEncoder().encode(
`content-type: text/plain;charset=UTF-8${EOL}${EOL}`
)
);
controller.enqueue(body);
controller.close();
},
cancel() {},
});
}
function isVaildateReq(request: Request) {
const serverAddress = request.headers.get('x-host') || '';
let isVaild = true;
if (isPrivateIP(serverAddress) || !request.body) {
console.log('lcoal ip or request.body is null');
isVaild = false;
}
return isVaild;
}
export { isPrivateIP, buildRawHttp500, isVaildateReq };

View File

@@ -0,0 +1,14 @@
// (async () => {
// try {
// for await (let chunk of request.body || []) {
// // console.log(new TextDecoder().decode(chunk));
// connection.write(chunk);
// }
// } catch (error) {
// console.log(error);
// return new Response('has error', {
// status: 500,
// headers: {},
// });
// }
// })();

View File

@@ -21,12 +21,12 @@ const handler = async (request: Request): Promise<Response> => {
}); });
// for await (const chunk of body2) { // for await (const chunk of body2) {
// console.log('11'); // console.log('11', new TextDecoder().decode(chunk));
// } // }
const proxyResp = body2?.pipeThrough(connection); const proxyResp = body2?.pipeThrough(connection);
for await (const chunk of proxyResp) { for await (const chunk of proxyResp) {
console.log('11'); console.log('11', new TextDecoder().decode(chunk));
} }
return new Response('111', { return new Response('111', {
status: 200, status: 200,

View File

@@ -3,18 +3,19 @@ import { pipeline, Readable } from 'node:stream';
import { config } from './lib/cmd'; import { config } from './lib/cmd';
import * as url from 'node:url'; import * as url from 'node:url';
import * as undici from 'undici'; import * as undici from 'undici';
import { import { concatStreams, rawHTTPPackage } from './lib/helper';
concatStreams,
rawHTTPHeader,
rawHTTPPackage,
rawHTTPPackageWithDelay,
} from './lib/helper';
const isLocal = process.env.env === 'LOCAL';
const httpProxyServer = createServer(async (req, resp) => { const httpProxyServer = createServer(async (req, resp) => {
const reqUrl = url.parse(req.url); const reqUrl = url.parse(req.url);
const clientSocketLoggerInfo = `[proxy to ${req.url}(http)]`; const clientSocketLoggerInfo = `[proxy to ${req.url}](http)`;
try { try {
console.log(`${clientSocketLoggerInfo} Client use HTTP/${req.httpVersion}`); console.log(`${clientSocketLoggerInfo} Client use HTTP/${req.httpVersion}`);
// for await (const chunk of req.socket) {
// console.log(chunk.toString());
// }
// make call to edge http server // make call to edge http server
// 1. forward all package remote, socket over http body // 1. forward all package remote, socket over http body
const { body, headers, statusCode, trailers } = await undici.request( const { body, headers, statusCode, trailers } = await undici.request(
@@ -27,8 +28,7 @@ const httpProxyServer = createServer(async (req, resp) => {
'x-http': 'true', 'x-http': 'true',
}, },
method: 'POST', method: 'POST',
// append few ms for body body: Readable.from(rawHTTPPackage(req)),
body: Readable.from(rawHTTPPackageWithDelay(req)),
} }
); );
console.log( console.log(
@@ -36,6 +36,9 @@ const httpProxyServer = createServer(async (req, resp) => {
); );
// 2. forward remote reponse body to clientSocket // 2. forward remote reponse body to clientSocket
for await (const chunk of body) { for await (const chunk of body) {
if (isLocal) {
console.log(chunk.toString());
}
req.socket.write(chunk); req.socket.write(chunk);
} }
body.on('error', (err) => { body.on('error', (err) => {
@@ -44,16 +47,6 @@ const httpProxyServer = createServer(async (req, resp) => {
err err
); );
}); });
// issue with pipeline
// https://stackoverflow.com/questions/55959479/error-err-stream-premature-close-premature-close-in-node-pipeline-stream
pipeline(body, req.socket, (error) => {
console.log(
`${clientSocketLoggerInfo} remote server to clientSocket has error: ` +
error
);
req.socket.end();
req.socket.destroy();
});
} catch (error) { } catch (error) {
req.socket.end(); req.socket.end();
req.socket.destroy(); req.socket.destroy();
@@ -76,6 +69,7 @@ httpProxyServer.on('connect', async (req, clientSocket, head) => {
`HTTP/${req.httpVersion} 200 Connection Established\r\n\r\n` `HTTP/${req.httpVersion} 200 Connection Established\r\n\r\n`
); );
// console.log(config);
// make call to edge http server // make call to edge http server
// 1. forward all package remote, socket over http body // 1. forward all package remote, socket over http body
const { body, headers, statusCode, trailers } = await undici.request( const { body, headers, statusCode, trailers } = await undici.request(
@@ -100,14 +94,6 @@ httpProxyServer.on('connect', async (req, clientSocket, head) => {
body.on('error', (err) => { body.on('error', (err) => {
console.log(`${clientSocketLoggerInfo} body error`, err); console.log(`${clientSocketLoggerInfo} body error`, err);
}); });
// pipeline(body, clientSocket, (error) => {
// console.log(
// `${clientSocketLoggerInfo} remote server to clientSocket has error: `,
// error
// );
// body?.destroy();
// clientSocket.destroy();
// });
clientSocket.on('error', (e) => { clientSocket.on('error', (e) => {
body?.destroy(); body?.destroy();
clientSocket.destroy(); clientSocket.destroy();
@@ -133,7 +119,7 @@ httpProxyServer.on('clientError', (err, clientSocket) => {
}); });
httpProxyServer.on('close', () => { httpProxyServer.on('close', () => {
console.log('Client Disconnected'); console.log('Server close');
}); });
httpProxyServer.listen(Number(config.port), () => { httpProxyServer.listen(Number(config.port), () => {

View File

@@ -0,0 +1,152 @@
import { createServer } from 'node:http';
import { pipeline, Readable } from 'node:stream';
import { config } from '../lib/cmd';
import * as url from 'node:url';
import * as undici from 'undici';
import {
concatStreams,
rawHTTPHeader,
rawHTTPPackage,
rawHTTPPackageWithDelay,
} from '../lib/helper';
const isLocal = process.env.env === 'LOCAL';
const httpProxyServer = createServer(async (req, resp) => {
const reqUrl = url.parse(req.url);
const clientSocketLoggerInfo = `[proxy to ${req.url}](http)`;
try {
console.log(`${clientSocketLoggerInfo} Client use HTTP/${req.httpVersion}`);
// for await (const chunk of req.socket) {
// console.log(chunk.toString());
// }
// make call to edge http server
// 1. forward all package remote, socket over http body
const { body, headers, statusCode, trailers } = await undici.request(
config.address,
{
headers: {
'x-host': reqUrl.hostname,
'x-port': reqUrl.port || '80',
'x-uuid': config.uuid,
'x-http': 'true',
},
method: 'POST',
// append few ms for body
// body: Readable.from(rawHTTPPackageWithDelay(req)),
body: Readable.from(rawHTTPPackage(req)),
}
);
console.log(
`${clientSocketLoggerInfo} remote server return ${statusCode} Connected To Proxy`
);
// 2. forward remote reponse body to clientSocket
for await (const chunk of body) {
if (isLocal) {
console.log(chunk.toString());
}
req.socket.write(chunk);
}
body.on('error', (err) => {
console.log(
`${clientSocketLoggerInfo} remote server response body has error`,
err
);
});
// issue with pipeline
// https://stackoverflow.com/questions/55959479/error-err-stream-premature-close-premature-close-in-node-pipeline-stream
// pipeline(body, req.socket, (error) => {
// console.log(
// `${clientSocketLoggerInfo} remote server to clientSocket has error: ` +
// error
// );
// req.socket.end();
// req.socket.destroy();
// });
} catch (error) {
req.socket.end();
req.socket.destroy();
console.log(`${clientSocketLoggerInfo} has error `, error);
}
});
// handle https website
httpProxyServer.on('connect', async (req, clientSocket, head) => {
const reqUrl = url.parse('https://' + req.url);
const clientSocketLoggerInfo = `[proxy to ${req.url}]`;
try {
console.log(
`${clientSocketLoggerInfo} Client use HTTP/${
req.httpVersion
} Connected To Proxy, head on connect is ${head.toString() || 'empty'}`
);
// We need only the data once, the starting packet, per http proxy spec
clientSocket.write(
`HTTP/${req.httpVersion} 200 Connection Established\r\n\r\n`
);
// console.log(config);
// make call to edge http server
// 1. forward all package remote, socket over http body
const { body, headers, statusCode, trailers } = await undici.request(
config.address,
{
headers: {
'x-host': reqUrl.hostname,
'x-port': reqUrl.port,
'x-uuid': config.uuid,
// "Content-Type": "text/plain",
},
method: 'POST',
body: Readable.from(concatStreams([head, clientSocket])),
}
);
console.log(`${clientSocketLoggerInfo} remote server return ${statusCode}`);
// 2. forward remote reponse body to clientSocket
// 2. forward remote reponse body to clientSocket
for await (const chunk of body) {
clientSocket.write(chunk);
}
body.on('error', (err) => {
console.log(`${clientSocketLoggerInfo} body error`, err);
});
// pipeline(body, clientSocket, (error) => {
// console.log(
// `${clientSocketLoggerInfo} remote server to clientSocket has error: `,
// error
// );
// body?.destroy();
// clientSocket.destroy();
// });
clientSocket.on('error', (e) => {
body?.destroy();
clientSocket.destroy();
console.log(`${clientSocketLoggerInfo} clientSocket has error: ` + e);
});
clientSocket.on('end', () => {
console.log(`${clientSocketLoggerInfo} has done and end.`);
});
} catch (error) {
clientSocket.destroy();
console.log(`${clientSocketLoggerInfo} has error `, error);
}
});
httpProxyServer.on('error', (err) => {
console.log('SERVER ERROR');
console.log(err);
throw err;
});
httpProxyServer.on('clientError', (err, clientSocket) => {
console.log('client error: ' + err);
clientSocket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
});
httpProxyServer.on('close', () => {
console.log('Server close');
});
httpProxyServer.listen(Number(config.port), () => {
console.log('Server runnig at http://localhost:' + config.port);
});

View File

@@ -54,7 +54,9 @@ const httpServer = createServer(async (req, resp) => {
() => { () => {
console.log('connected'); console.log('connected');
resp.writeHead(200); resp.writeHead(200);
process.nextTick(() => {
rawHttp.pipe(socket).pipe(resp); rawHttp.pipe(socket).pipe(resp);
});
} }
); );