From c07adef09c1d813bc9b889a7eeb637d40ad4b7b8 Mon Sep 17 00:00:00 2001 From: liqiang <974923609@qq.com> Date: Tue, 23 Mar 2021 17:40:49 +0800 Subject: [PATCH] chore: message protocol backend --- addon/web/conn.go | 62 +++++------- addon/web/message.go | 220 ++++++++++++++++++++++++++++++++++++------- addon/web/web.go | 18 ++-- flow/flow.go | 1 - 4 files changed, 220 insertions(+), 81 deletions(-) diff --git a/addon/web/conn.go b/addon/web/conn.go index 922ef03..07fbaf4 100644 --- a/addon/web/conn.go +++ b/addon/web/conn.go @@ -1,14 +1,18 @@ package web import ( - "encoding/json" - "strings" "sync" "github.com/gorilla/websocket" "github.com/lqqyt2423/go-mitmproxy/flow" ) +type breakPointRule struct { + Method string `json:"method"` + URL string `json:"url"` + Action int `json:"action"` // 1 - change request 2 - change response 3 - both +} + type concurrentConn struct { conn *websocket.Conn mu sync.Mutex @@ -16,7 +20,7 @@ type concurrentConn struct { waitChans map[string]chan interface{} waitChansMu sync.Mutex - interceptUri string + breakPointRules []*breakPointRule } func newConn(c *websocket.Conn) *concurrentConn { @@ -26,7 +30,7 @@ func newConn(c *websocket.Conn) *concurrentConn { } } -func (c *concurrentConn) writeMessage(msg *message, f *flow.Flow) { +func (c *concurrentConn) writeMessage(msg *messageFlow, f *flow.Flow) { if c.isIntercpt(f, msg) { msg.waitIntercept = 1 } @@ -63,27 +67,15 @@ func (c *concurrentConn) readloop() { continue } - if msg.mType == messageTypeChangeInterceptUri { - interceptUri := "" - if len(msg.content) > 0 { - interceptUri = string(msg.content) - } - c.interceptUri = interceptUri - continue - } - - if msg.mType == messageTypeChangeRequest { - req := new(flow.Request) - err := json.Unmarshal(msg.content, req) - if err != nil { - log.Error(err) - continue - } - - ch := c.initWaitChan(msg.id.String()) - go func(req *flow.Request, ch chan<- interface{}) { - ch <- req - }(req, ch) + if msgEdit, ok := msg.(*messageEdit); ok { + ch := c.initWaitChan(msgEdit.id.String()) + go func(m *messageEdit, ch chan<- interface{}) { + ch <- m + }(msgEdit, ch) + } else if msgMeta, ok := msg.(*messageMeta); ok { + c.breakPointRules = msgMeta.breakPointRules + } else { + log.Warn("invalid message, skip") } } } @@ -101,28 +93,24 @@ func (c *concurrentConn) initWaitChan(key string) chan interface{} { } // 是否拦截 -func (c *concurrentConn) isIntercpt(f *flow.Flow, after *message) bool { +func (c *concurrentConn) isIntercpt(f *flow.Flow, after *messageFlow) bool { if after.mType != messageTypeRequest { return false } - if c.interceptUri == "" { - return false - } - if strings.Contains(f.Request.URL.String(), c.interceptUri) { - return true - } return false } // 拦截 -func (c *concurrentConn) waitIntercept(f *flow.Flow, after *message) { +func (c *concurrentConn) waitIntercept(f *flow.Flow, after *messageFlow) { log.Infof("waiting Intercept: %s\n", f.Request.URL) ch := c.initWaitChan(f.Id.String()) - req := (<-ch).(*flow.Request) + msg := (<-ch).(*messageEdit) log.Infof("waited Intercept: %s\n", f.Request.URL) - f.Request.Method = req.Method - f.Request.URL = req.URL - f.Request.Header = req.Header + // f.Request.Method = req.Method + // f.Request.URL = req.URL + // f.Request.Header = req.Header + + log.Infof("waitIntercept: %v", msg) } diff --git a/addon/web/message.go b/addon/web/message.go index e70f142..72681ac 100644 --- a/addon/web/message.go +++ b/addon/web/message.go @@ -2,12 +2,28 @@ package web import ( "bytes" + "encoding/binary" "encoding/json" + "errors" "github.com/lqqyt2423/go-mitmproxy/flow" uuid "github.com/satori/go.uuid" ) +// message: + +// type: 1/2/3/4 +// messageFlow +// version 1 byte + type 1 byte + id 36 byte + waitIntercept 1 byte + content left bytes + +// type: 11/12 +// messageEdit +// version 1 byte + type 1 byte + id 36 byte + header len 4 byte + header content bytes + body len 4 byte + [body content bytes] + +// type: 21 +// messageMeta +// version 1 byte + type 1 byte + content left bytes + const messageVersion = 1 type messageType byte @@ -18,9 +34,10 @@ const ( messageTypeResponse messageType = 3 messageTypeResponseBody messageType = 4 - messageTypeChangeRequest messageType = 11 + messageTypeChangeRequest messageType = 11 + messageTypeChangeResponse messageType = 12 - messageTypeChangeInterceptUri messageType = 21 + messageTypeChangeBreakPointRules messageType = 21 ) var allMessageTypes = []messageType{ @@ -29,7 +46,8 @@ var allMessageTypes = []messageType{ messageTypeResponse, messageTypeResponseBody, messageTypeChangeRequest, - messageTypeChangeInterceptUri, + messageTypeChangeResponse, + messageTypeChangeBreakPointRules, } func validMessageType(t byte) bool { @@ -41,73 +59,207 @@ func validMessageType(t byte) bool { return false } -type message struct { +type message interface { + bytes() []byte +} + +type messageFlow struct { mType messageType id uuid.UUID waitIntercept byte content []byte } -func newMessage(mType messageType, id uuid.UUID, content []byte) *message { - return &message{ +func newMessageFlow(mType messageType, f *flow.Flow) *messageFlow { + var content []byte + var err error = nil + + if mType == messageTypeRequest { + content, err = json.Marshal(f.Request) + } else if mType == messageTypeRequestBody { + content = f.Request.Body + } else if mType == messageTypeResponse { + content, err = json.Marshal(f.Response) + } else if mType == messageTypeResponseBody { + content, err = f.Response.DecodedBody() + } else { + panic(errors.New("invalid message type")) + } + + if err != nil { + panic(err) + } + + return &messageFlow{ mType: mType, - id: id, + id: f.Id, content: content, } } -func parseMessage(data []byte) *message { - if len(data) < 39 { +func (m *messageFlow) bytes() []byte { + buf := bytes.NewBuffer(make([]byte, 0)) + buf.WriteByte(byte(messageVersion)) + buf.WriteByte(byte(m.mType)) + buf.WriteString(m.id.String()) // len: 36 + buf.WriteByte(m.waitIntercept) + buf.Write(m.content) + return buf.Bytes() +} + +type messageEdit struct { + mType messageType + id uuid.UUID + request *flow.Request + response *flow.Response +} + +func parseMessageEdit(data []byte) *messageEdit { + // 2 + 36 + 4 + 4 + if len(data) < 46 { return nil } - if data[0] != messageVersion { + + mType := (messageType)(data[1]) + + id, err := uuid.FromString(string(data[2:38])) + if err != nil { return nil } - if !validMessageType(data[1]) { + + hl := (int)(binary.BigEndian.Uint32(data[38:42])) + if 42+hl+4 > len(data) { return nil } + headerContent := data[42 : 42+hl] - id, err := uuid.FromString(string(data[3:39])) - if err != nil { + bl := (int)(binary.BigEndian.Uint32(data[42+hl : 42+hl+4])) + if 42+hl+4+bl != len(data) { + return nil + } + bodyContent := data[42+hl+4:] + + msg := &messageEdit{ + mType: mType, + id: id, + } + + if mType == messageTypeChangeRequest { + req := new(flow.Request) + err := json.Unmarshal(headerContent, req) + if err != nil { + return nil + } + req.Body = bodyContent + msg.request = req + } else if mType == messageTypeChangeResponse { + res := new(flow.Response) + err := json.Unmarshal(headerContent, res) + if err != nil { + return nil + } + res.Body = bodyContent + msg.response = res + } else { return nil } - msg := newMessage(messageType(data[1]), id, data[39:]) - msg.waitIntercept = data[2] return msg } -func newMessageRequest(f *flow.Flow) *message { - content, err := json.Marshal(f.Request) - if err != nil { - panic(err) +func (m *messageEdit) bytes() []byte { + buf := bytes.NewBuffer(make([]byte, 0)) + buf.WriteByte(byte(messageVersion)) + buf.WriteByte(byte(m.mType)) + buf.WriteString(m.id.String()) // len: 36 + + if m.mType == messageTypeChangeRequest { + headerContent, err := json.Marshal(m.request) + if err != nil { + panic(err) + } + hl := make([]byte, 4) + binary.BigEndian.PutUint32(hl, (uint32)(len(headerContent))) + buf.Write(hl) + + bodyContent := m.request.Body + bl := make([]byte, 4) + binary.BigEndian.PutUint32(bl, (uint32)(len(bodyContent))) + buf.Write(bl) + buf.Write(bodyContent) + } else if m.mType == messageTypeChangeResponse { + headerContent, err := json.Marshal(m.response) + if err != nil { + panic(err) + } + hl := make([]byte, 4) + binary.BigEndian.PutUint32(hl, (uint32)(len(headerContent))) + buf.Write(hl) + + bodyContent := m.response.Body + bl := make([]byte, 4) + binary.BigEndian.PutUint32(bl, (uint32)(len(bodyContent))) + buf.Write(bl) + buf.Write(bodyContent) } - return newMessage(messageTypeRequest, f.Id, content) + + return buf.Bytes() } -func newMessageRequestBody(f *flow.Flow) *message { - return newMessage(messageTypeRequestBody, f.Id, f.Request.Body) +type messageMeta struct { + mType messageType + breakPointRules []*breakPointRule } -func newMessageResponse(f *flow.Flow) *message { - content, err := json.Marshal(f.Response) +func parseMessageMeta(data []byte) *messageMeta { + content := data[2:] + rules := make([]*breakPointRule, 0) + err := json.Unmarshal(content, &rules) if err != nil { - panic(err) + return nil } - return newMessage(messageTypeResponse, f.Id, content) -} -func newMessageResponseBody(f *flow.Flow) *message { - body, _ := f.Response.DecodedBody() - return newMessage(messageTypeResponseBody, f.Id, body) + return &messageMeta{ + mType: messageType(data[1]), + breakPointRules: rules, + } } -func (m *message) bytes() []byte { +func (m *messageMeta) bytes() []byte { buf := bytes.NewBuffer(make([]byte, 0)) buf.WriteByte(byte(messageVersion)) buf.WriteByte(byte(m.mType)) - buf.WriteByte(m.waitIntercept) - buf.WriteString(m.id.String()) // len: 36 - buf.Write(m.content) + + content, err := json.Marshal(m.breakPointRules) + if err != nil { + panic(err) + } + buf.Write(content) + return buf.Bytes() } + +func parseMessage(data []byte) message { + if len(data) < 2 { + return nil + } + + if data[0] != messageVersion { + return nil + } + + if !validMessageType(data[1]) { + return nil + } + + mType := (messageType)(data[1]) + + if mType == messageTypeChangeRequest || mType == messageTypeChangeResponse { + return parseMessageEdit(data) + } else if mType == messageTypeChangeBreakPointRules { + return parseMessageMeta(data) + } else { + log.Warnf("invalid message type %v", mType) + return nil + } +} diff --git a/addon/web/web.go b/addon/web/web.go index bf17e27..1c73236 100644 --- a/addon/web/web.go +++ b/addon/web/web.go @@ -96,7 +96,7 @@ func (web *WebAddon) removeConn(conn *concurrentConn) { web.conns = append(web.conns[:index], web.conns[index+1:]...) } -func (web *WebAddon) sendFlow(f *flow.Flow, msgFn func() *message) bool { +func (web *WebAddon) sendFlow(f *flow.Flow, msgFn func() *messageFlow) bool { web.connsMu.RLock() conns := web.conns web.connsMu.RUnlock() @@ -114,25 +114,25 @@ func (web *WebAddon) sendFlow(f *flow.Flow, msgFn func() *message) bool { } func (web *WebAddon) Requestheaders(f *flow.Flow) { - web.sendFlow(f, func() *message { - return newMessageRequest(f) + web.sendFlow(f, func() *messageFlow { + return newMessageFlow(messageTypeRequest, f) }) } func (web *WebAddon) Request(f *flow.Flow) { - web.sendFlow(f, func() *message { - return newMessageRequestBody(f) + web.sendFlow(f, func() *messageFlow { + return newMessageFlow(messageTypeRequestBody, f) }) } func (web *WebAddon) Responseheaders(f *flow.Flow) { - web.sendFlow(f, func() *message { - return newMessageResponse(f) + web.sendFlow(f, func() *messageFlow { + return newMessageFlow(messageTypeResponse, f) }) } func (web *WebAddon) Response(f *flow.Flow) { - web.sendFlow(f, func() *message { - return newMessageResponseBody(f) + web.sendFlow(f, func() *messageFlow { + return newMessageFlow(messageTypeResponseBody, f) }) } diff --git a/flow/flow.go b/flow/flow.go index 7903f20..f79679d 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -28,7 +28,6 @@ func (req *Request) MarshalJSON() ([]byte, error) { r["url"] = req.URL.String() r["proto"] = req.Proto r["header"] = req.Header - r["body"] = req.Body return json.Marshal(r) }