From c07364b358e7bba5fc4240b3c0bf4cc8c12d44bd Mon Sep 17 00:00:00 2001 From: lqqyt2423 <974923609@qq.com> Date: Tue, 15 Dec 2020 18:11:51 +0800 Subject: [PATCH] addon logic --- README.md | 2 +- flow/flow.go | 91 ++++++++++++++++++++++++++++++ proxy/helper.go | 26 +++++++++ proxy/proxy.go | 145 +++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 248 insertions(+), 16 deletions(-) create mode 100644 flow/flow.go create mode 100644 proxy/helper.go diff --git a/README.md b/README.md index cb6c4f0..afdb25f 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,6 @@ - [x] 经内存转发 https 流量 - [x] 忽略某些错误例如:broken pipe, reset by peer, timeout - [x] websocket +- [x] 插件机制 - [ ] support get method with body - [ ] http2 -- [ ] 插件机制 diff --git a/flow/flow.go b/flow/flow.go new file mode 100644 index 0000000..e5a4fd6 --- /dev/null +++ b/flow/flow.go @@ -0,0 +1,91 @@ +package flow + +import ( + "net/http" + "net/url" + "time" + + _log "github.com/sirupsen/logrus" +) + +var log = _log.WithField("at", "flow") + +type Request struct { + Method string + URL *url.URL + Proto string + Header http.Header + Body []byte +} + +type Response struct { + StatusCode int + Header http.Header + Body []byte +} + +type Flow struct { + *Request + *Response + + // https://docs.mitmproxy.org/stable/overview-features/#streaming + // 如果为 true,则不缓冲 Request.Body 和 Response.Body,且不进入之后的 Addon.Request 和 Addon.Response + Stream bool + done chan struct{} +} + +func NewFlow() *Flow { + return &Flow{done: make(chan struct{})} +} + +func (f *Flow) Done() <-chan struct{} { + return f.done +} + +func (f *Flow) Finish() { + close(f.done) +} + +type Addon interface { + // HTTP request headers were successfully read. At this point, the body is empty. + Requestheaders(*Flow) + + // The full HTTP request has been read. + Request(*Flow) + + // HTTP response headers were successfully read. At this point, the body is empty. + Responseheaders(*Flow) + + // The full HTTP response has been read. + Response(*Flow) +} + +// BaseAddon do nothing +type BaseAddon struct{} + +func (addon *BaseAddon) Requestheaders(*Flow) {} +func (addon *BaseAddon) Request(*Flow) {} +func (addon *BaseAddon) Responseheaders(*Flow) {} +func (addon *BaseAddon) Response(*Flow) {} + +// LogAddon log http record +type LogAddon struct { + BaseAddon +} + +func (addon *LogAddon) Requestheaders(flo *Flow) { + log := log.WithField("in", "LogAddon") + start := time.Now() + go func() { + <-flo.Done() + var StatusCode int + if flo.Response != nil { + StatusCode = flo.Response.StatusCode + } + var contentLen int + if flo.Response != nil && flo.Response.Body != nil { + contentLen = len(flo.Response.Body) + } + log.Infof("%v %v %v %v - %v ms\n", flo.Request.Method, flo.Request.URL.String(), StatusCode, contentLen, time.Since(start).Milliseconds()) + }() +} diff --git a/proxy/helper.go b/proxy/helper.go new file mode 100644 index 0000000..33808cf --- /dev/null +++ b/proxy/helper.go @@ -0,0 +1,26 @@ +package proxy + +import ( + "bytes" + "io" +) + +// 尝试将 Reader 读取至 buffer 中 +func ReaderToBuffer(r io.Reader, limit int64) ([]byte, io.Reader, error) { + buf := bytes.NewBuffer(make([]byte, 0)) + lr := io.LimitReader(r, limit) + + _, err := io.Copy(buf, lr) + if err != nil { + return nil, nil, err + } + + // 达到上限 + if int64(buf.Len()) == limit { + // 返回新的 Reader + return nil, io.MultiReader(bytes.NewBuffer(buf.Bytes()), r), nil + } + + // 返回 buffer + return buf.Bytes(), nil, nil +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 0592ecc..2ee1e07 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,6 +1,7 @@ package proxy import ( + "bytes" "crypto/tls" "io" "net" @@ -10,6 +11,7 @@ import ( "sync" "time" + "github.com/lqqyt2423/go-mitmproxy/flow" _log "github.com/sirupsen/logrus" ) @@ -75,9 +77,15 @@ type Options struct { } type Proxy struct { - Server *http.Server - Client *http.Client - Mitm Mitm + Server *http.Server + Client *http.Client + Mitm Mitm + StreamLargeBodies int64 + Addons []flow.Addon +} + +func (proxy *Proxy) AddAddon(addon flow.Addon) { + proxy.Addons = append(proxy.Addons, addon) } func (proxy *Proxy) Start() error { @@ -121,16 +129,92 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { return } - start := time.Now() + endRes := func(response *flow.Response, body io.Reader) { + if response.Header != nil { + for key, value := range response.Header { + for _, v := range value { + res.Header().Add(key, v) + } + } + } + res.WriteHeader(response.StatusCode) + + if body != nil { + _, err := io.Copy(res, body) + if err != nil && !ignoreErr(log, err) { + log.Error(err) + } + } else if response.Body != nil && len(response.Body) > 0 { + _, err := res.Write(response.Body) + if err != nil && !ignoreErr(log, err) { + log.Error(err) + } + } + } + + // when addons panic + defer func() { + if err := recover(); err != nil { + log.Warnf("Recovered: %v\n", err) + } + }() + + flo := flow.NewFlow() + flo.Request = &flow.Request{ + Method: req.Method, + URL: req.URL, + Proto: req.Proto, + Header: req.Header, + } + defer flo.Finish() + + // trigger addon event Requestheaders + for _, addon := range proxy.Addons { + addon.Requestheaders(flo) + if flo.Response != nil { + endRes(flo.Response, nil) + return + } + } + + // 读 request body + var reqBody io.Reader = req.Body + if !flo.Stream { + reqBuf, r, err := ReaderToBuffer(req.Body, proxy.StreamLargeBodies) + reqBody = r + if err != nil { + log.Error(err) + res.WriteHeader(502) + return + } + if reqBuf == nil { + log.Warnf("request body size >= %v\n", proxy.StreamLargeBodies) + flo.Stream = true + } else { + flo.Request.Body = reqBuf + } + + // trigger addon event Request + if !flo.Stream { + for _, addon := range proxy.Addons { + addon.Request(flo) + if flo.Response != nil { + endRes(flo.Response, nil) + return + } + } + reqBody = bytes.NewReader(flo.Request.Body) + } + } - proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body) + proxyReq, err := http.NewRequest(flo.Request.Method, flo.Request.URL.String(), reqBody) if err != nil { log.Error(err) res.WriteHeader(502) return } - for key, value := range req.Header { + for key, value := range flo.Request.Header { for _, v := range value { proxyReq.Header.Add(key, v) } @@ -145,19 +229,46 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { } defer proxyRes.Body.Close() - for key, value := range proxyRes.Header { - for _, v := range value { - res.Header().Add(key, v) + flo.Response = &flow.Response{ + StatusCode: proxyRes.StatusCode, + Header: proxyRes.Header, + } + + // trigger addon event Responseheaders + for _, addon := range proxy.Addons { + addon.Responseheaders(flo) + if flo.Response.Body != nil { + endRes(flo.Response, nil) + return } } - res.WriteHeader(proxyRes.StatusCode) - _, err = io.Copy(res, proxyRes.Body) - if err != nil && !ignoreErr(log, err) { - log.Error(err) - return + + // 读 response body + var resBody io.Reader = proxyRes.Body + if !flo.Stream { + resBuf, r, err := ReaderToBuffer(proxyRes.Body, proxy.StreamLargeBodies) + resBody = r + if err != nil { + log.Error(err) + res.WriteHeader(502) + return + } + if resBuf == nil { + log.Warnf("response body size >= %v\n", proxy.StreamLargeBodies) + flo.Stream = true + } else { + flo.Response.Body = resBuf + } + + // trigger addon event Response + if !flo.Stream { + for _, addon := range proxy.Addons { + addon.Response(flo) + } + } } - log.Infof("status code: %v cost %v ms\n", proxyRes.StatusCode, time.Since(start).Milliseconds()) + endRes(flo.Response, resBody) } func (proxy *Proxy) handleConnect(res http.ResponseWriter, req *http.Request) { @@ -233,6 +344,10 @@ func NewProxy(opts *Options) (*Proxy, error) { proxy.Mitm = mitm + proxy.StreamLargeBodies = 1024 * 1024 * 5 // 5mb + proxy.Addons = make([]flow.Addon, 0) + proxy.AddAddon(&flow.LogAddon{}) + return proxy, nil }