From a94d64bed1deb470b79d51861ee078a07573a2b6 Mon Sep 17 00:00:00 2001 From: "yu.deng" Date: Thu, 30 Jun 2022 14:58:26 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=94=AF=E6=8C=81=20stream=20=E7=9A=84?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proxy/addon.go | 13 +++++++++++++ proxy/proxy.go | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/proxy/addon.go b/proxy/addon.go index d10e055..4ca013a 100644 --- a/proxy/addon.go +++ b/proxy/addon.go @@ -1,6 +1,7 @@ package proxy import ( + "io" "time" log "github.com/sirupsen/logrus" @@ -33,6 +34,12 @@ type Addon interface { // The full HTTP response has been read. Response(*Flow) + + // Stream request body modifier + StreamRequestModifier(io.Reader) io.Reader + + // Stream response body modifier + StreamResponseModifier(io.Reader) io.Reader } // BaseAddon do nothing @@ -49,6 +56,12 @@ func (addon *BaseAddon) Requestheaders(*Flow) {} func (addon *BaseAddon) Request(*Flow) {} func (addon *BaseAddon) Responseheaders(*Flow) {} func (addon *BaseAddon) Response(*Flow) {} +func (addon *BaseAddon) StreamRequestModifier(in io.Reader) io.Reader { + return in +} +func (addon *BaseAddon) StreamResponseModifier(in io.Reader) io.Reader { + return in +} // LogAddon log connection and flow type LogAddon struct { diff --git a/proxy/proxy.go b/proxy/proxy.go index ab0f641..88d87a4 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -189,6 +189,9 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } + for _, addon := range proxy.Addons { + reqBody = addon.StreamRequestModifier(reqBody) + } proxyReq, err := http.NewRequest(f.Request.Method, f.Request.URL.String(), reqBody) if err != nil { log.Error(err) @@ -247,6 +250,9 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } } + for _, addon := range proxy.Addons { + resBody = addon.StreamResponseModifier(resBody) + } reply(f.Response, resBody) } From 396be08c27a62901cc783f41d7b3b00d7839ff34 Mon Sep 17 00:00:00 2001 From: "yu.deng" Date: Thu, 30 Jun 2022 15:20:06 +0800 Subject: [PATCH 2/3] =?UTF-8?q?proxy.Response=20=E6=B7=BB=E5=8A=A0=20BodyR?= =?UTF-8?q?eader=EF=BC=8C=E6=94=AF=E6=8C=81=E6=B7=BB=E5=8A=A0=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E7=9A=84stream=20body?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proxy/flow.go | 2 ++ proxy/proxy.go | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/proxy/flow.go b/proxy/flow.go index 2bec3b6..2c09c87 100644 --- a/proxy/flow.go +++ b/proxy/flow.go @@ -3,6 +3,7 @@ package proxy import ( "encoding/json" "errors" + "io" "net/http" "net/url" @@ -96,6 +97,7 @@ type Response struct { StatusCode int `json:"statusCode"` Header http.Header `json:"header"` Body []byte `json:"-"` + BodyReader io.Reader decodedBody []byte decoded bool // decoded reports whether the response was sent compressed but was decoded to decodedBody. diff --git a/proxy/proxy.go b/proxy/proxy.go index 88d87a4..8776329 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -131,10 +131,19 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { if err != nil { logErr(log, err) } - } else if response.Body != nil && len(response.Body) > 0 { - _, err := res.Write(response.Body) - if err != nil { - logErr(log, err) + } + if response.Body != nil { + if response.BodyReader != nil { + _, err := io.Copy(res, response.BodyReader) + if err != nil { + logErr(log, err) + } + } + if len(response.Body) > 0 { + _, err := res.Write(response.Body) + if err != nil { + logErr(log, err) + } } } } From 9080c0e1160f08a5636e1d9bd3bb4ca6d0e05a18 Mon Sep 17 00:00:00 2001 From: "yu.deng" Date: Thu, 30 Jun 2022 15:54:55 +0800 Subject: [PATCH 3/3] need flow context in stream modifier provide extra info --- proxy/addon.go | 8 ++++---- proxy/proxy.go | 24 +++++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/proxy/addon.go b/proxy/addon.go index 4ca013a..7ed029d 100644 --- a/proxy/addon.go +++ b/proxy/addon.go @@ -36,10 +36,10 @@ type Addon interface { Response(*Flow) // Stream request body modifier - StreamRequestModifier(io.Reader) io.Reader + StreamRequestModifier(*Flow, io.Reader) io.Reader // Stream response body modifier - StreamResponseModifier(io.Reader) io.Reader + StreamResponseModifier(*Flow, io.Reader) io.Reader } // BaseAddon do nothing @@ -56,10 +56,10 @@ func (addon *BaseAddon) Requestheaders(*Flow) {} func (addon *BaseAddon) Request(*Flow) {} func (addon *BaseAddon) Responseheaders(*Flow) {} func (addon *BaseAddon) Response(*Flow) {} -func (addon *BaseAddon) StreamRequestModifier(in io.Reader) io.Reader { +func (addon *BaseAddon) StreamRequestModifier(f *Flow, in io.Reader) io.Reader { return in } -func (addon *BaseAddon) StreamResponseModifier(in io.Reader) io.Reader { +func (addon *BaseAddon) StreamResponseModifier(f *Flow, in io.Reader) io.Reader { return in } diff --git a/proxy/proxy.go b/proxy/proxy.go index 8776329..3a68425 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -132,18 +132,16 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { logErr(log, err) } } - if response.Body != nil { - if response.BodyReader != nil { - _, err := io.Copy(res, response.BodyReader) - if err != nil { - logErr(log, err) - } + if response.BodyReader != nil { + _, err := io.Copy(res, response.BodyReader) + if err != nil { + logErr(log, err) } - if len(response.Body) > 0 { - _, err := res.Write(response.Body) - if err != nil { - logErr(log, err) - } + } + if response.Body != nil && len(response.Body) > 0 { + _, err := res.Write(response.Body) + if err != nil { + logErr(log, err) } } } @@ -199,7 +197,7 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { } for _, addon := range proxy.Addons { - reqBody = addon.StreamRequestModifier(reqBody) + reqBody = addon.StreamRequestModifier(f, reqBody) } proxyReq, err := http.NewRequest(f.Request.Method, f.Request.URL.String(), reqBody) if err != nil { @@ -260,7 +258,7 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } for _, addon := range proxy.Addons { - resBody = addon.StreamResponseModifier(resBody) + resBody = addon.StreamResponseModifier(f, resBody) } reply(f.Response, resBody)