* chaneg deno code

* enhance logic

* chaneg to remove eol

* add cient code

* remove end call

* enhance logger

* add doc
This commit is contained in:
zizifn
2022-12-07 01:23:20 +08:00
committed by GitHub
parent 56f931dc06
commit d34bdb1907
5 changed files with 92 additions and 42 deletions

View File

@@ -1,5 +1,6 @@
{
"port": "",
"address": "",
"uuid": ""
"uuid": "",
"logLevel": "DEBUG"
}

View File

@@ -1,11 +1,12 @@
import { Command } from 'commander';
import { writeFileSync, existsSync, readFileSync } from 'fs';
import { exit } from 'node:process';
import { env } from 'process';
let config: {
port: string;
address: string;
uuid: string;
config: string;
logLevel: string;
} = null;
const program = new Command();
program.option('-v').action((options) => {
@@ -54,5 +55,6 @@ program
}
});
program.parse();
env.NODE_ENV = env.NODE_ENV || 'production';
export { config };

View File

@@ -1,6 +1,7 @@
import { IncomingMessage } from 'http';
import * as os from 'os';
import * as url from 'node:url';
import { config } from './cmd';
async function* concatStreams(readables: any[]) {
for (const readable of readables) {
@@ -57,9 +58,37 @@ function rawHTTPPackageWithDelay(req: IncomingMessage) {
return concatStreams([[rawHttpHeader], req, deplay(500)]);
}
function errorHandler() {
process
.on('unhandledRejection', (reason, p) => {
console.error(reason, 'Unhandled Rejection at Promise', p);
})
.on('uncaughtException', (err) => {
console.error(err, 'Uncaught Exception thrown');
// should exit node.js, but anyway
// process.exit(1);
});
}
function loghelper(...args) {
let logs = args;
if (config.logLevel?.toUpperCase() !== 'DEBUG') {
logs = args.map((item) => {
if (item instanceof Error) {
return `${item.message}`;
} else {
return item;
}
});
}
console.log('', ...logs);
}
export {
concatStreams,
rawHTTPPackage,
rawHTTPHeader,
rawHTTPPackageWithDelay,
errorHandler,
loghelper,
};

View File

@@ -3,21 +3,33 @@ import { pipeline, Readable } from 'node:stream';
import { config } from './lib/cmd';
import * as url from 'node:url';
import * as undici from 'undici';
import { concatStreams, rawHTTPPackage } from './lib/helper';
import {
concatStreams,
rawHTTPPackage,
errorHandler,
loghelper,
} from './lib/helper';
const isLocal = process.env.LOCAL === 'true';
errorHandler();
const isLocal = process.env.DEBUG === 'true';
const httpProxyServer = createServer(async (req, resp) => {
const reqUrl = url.parse(req.url);
const clientSocketLoggerInfo = `[proxy to ${req.url}](http)`;
try {
console.log(`${clientSocketLoggerInfo} Client use HTTP/${req.httpVersion}`);
// for await (const chunk of req.socket) {
// console.log(chunk.toString());
// }
loghelper(`${clientSocketLoggerInfo} Client use HTTP/${req.httpVersion}`);
// make call to edge http server
// 1. forward all package remote, socket over http body
const fromClientReq = Readable.from(rawHTTPPackage(req)).on(
'error',
(error) => {
loghelper(
`${clientSocketLoggerInfo} client socket to remote http body has error`,
error
);
req.destroy();
}
);
const { body, headers, statusCode, trailers } = await undici.request(
config.address,
{
@@ -28,30 +40,32 @@ const httpProxyServer = createServer(async (req, resp) => {
'x-http': 'true',
},
method: 'POST',
body: Readable.from(rawHTTPPackage(req)),
body: fromClientReq,
}
);
console.log(
loghelper(
`${clientSocketLoggerInfo} remote server return ${statusCode} Connected To Proxy`
);
// 2. forward remote reponse body to clientSocket
for await (const chunk of body) {
if (isLocal) {
console.log(chunk.toString());
loghelper(chunk.toString());
}
req.socket.write(chunk);
// resp.write(chunk);
}
// resp.end();
body.on('error', (err) => {
console.log(
loghelper(
`${clientSocketLoggerInfo} remote server response body has error`,
err
);
req.socket.destroy();
req.destroy();
});
} catch (error) {
req.destroy();
req.socket?.end();
req.socket?.destroy();
console.log(`${clientSocketLoggerInfo} has error `, error);
loghelper(`${clientSocketLoggerInfo} has error `, error);
}
});
@@ -59,8 +73,9 @@ const httpProxyServer = createServer(async (req, resp) => {
httpProxyServer.on('connect', async (req, clientSocket, head) => {
const reqUrl = url.parse('https://' + req.url);
const clientSocketLoggerInfo = `[proxy to ${req.url}]`;
let fromClientSocket = null;
try {
console.log(
loghelper(
`${clientSocketLoggerInfo} Client use HTTP/${
req.httpVersion
} Connected To Proxy, head on connect is ${head.toString() || 'empty'}`
@@ -70,18 +85,10 @@ httpProxyServer.on('connect', async (req, clientSocket, head) => {
`HTTP/${req.httpVersion} 200 Connection Established\r\n\r\n`
);
// console.log(config);
// loghelper(config);
// make call to edge http server
// 1. forward all package remote, socket over http body
const fromClientSocket = Readable.from(
concatStreams([head, clientSocket])
).on('error', (error) => {
console.log(
`${clientSocketLoggerInfo} client socket to remote http body has error`,
error
);
clientSocket.destroy();
});
fromClientSocket = Readable.from(concatStreams([head, clientSocket]));
const { body, headers, statusCode, trailers } = await undici.request(
config.address,
{
@@ -95,46 +102,57 @@ httpProxyServer.on('connect', async (req, clientSocket, head) => {
body: fromClientSocket,
}
);
console.log(`${clientSocketLoggerInfo} remote server return ${statusCode}`);
fromClientSocket.on('error', (error) => {
loghelper(
`${clientSocketLoggerInfo} client socket to remote http body has error`,
error
);
//
fromClientSocket.push(null);
clientSocket.destroy();
});
loghelper(`${clientSocketLoggerInfo} remote server return ${statusCode}`);
// 2. forward remote reponse body to clientSocket
for await (const chunk of body) {
clientSocket.write(chunk);
}
body.on('error', (err) => {
console.log(
loghelper(
`${clientSocketLoggerInfo} remote response body has error`,
err
);
fromClientSocket.push(null);
clientSocket.destroy();
});
clientSocket.on('error', (e) => {
body?.destroy();
clientSocket.destroy();
console.log(`${clientSocketLoggerInfo} clientSocket has error: ` + e);
fromClientSocket.push(null);
clientSocket.end();
loghelper(`${clientSocketLoggerInfo} clientSocket has error: ` + e);
});
clientSocket.on('end', () => {
console.log(`${clientSocketLoggerInfo} has done and end.`);
loghelper(`${clientSocketLoggerInfo} has done and end.`);
});
} catch (error) {
clientSocket.destroy();
console.log(`${clientSocketLoggerInfo} has error `, error);
fromClientSocket?.push(null);
clientSocket.end();
loghelper(`${clientSocketLoggerInfo} has error `, error);
}
});
httpProxyServer.on('error', (err) => {
console.log('SERVER ERROR');
console.log(err);
throw err;
loghelper('SERVER ERROR', err);
});
httpProxyServer.on('clientError', (err, clientSocket) => {
console.log('client error: ' + err);
loghelper('client error: ' + err);
clientSocket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
});
httpProxyServer.on('close', () => {
console.log('Server close');
loghelper('Server close');
});
httpProxyServer.listen(Number(config.port), () => {
console.log('Server runnig at http://localhost:' + config.port);
loghelper('Server runnig at http://localhost:' + config.port);
});