Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/addsvc/transport_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/tracing/opentracing"
httptransport "github.com/go-kit/kit/transport/http"
httpjsonrpctransport "github.com/go-kit/kit/transport/http/jsonrpc"
)

// MakeHTTPHandler returns a handler that makes a set of endpoints available
Expand All @@ -40,6 +41,25 @@ func MakeHTTPHandler(ctx context.Context, endpoints Endpoints, tracer stdopentra
EncodeHTTPGenericResponse,
append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
))

s := httpjsonrpctransport.NewServer(
ctx,
httpjsonrpctransport.EndpointCodecMap{
"sum": httpjsonrpctransport.EndpointCodec{
Endpoint: endpoints.SumEndpoint,
Decode: DecodeRPCHTTPConcatRequest,
Encode: EncodeRPCHTTPGenericResponse,
},
"concat": httpjsonrpctransport.EndpointCodec{
Endpoint: endpoints.SumEndpoint,
Decode: DecodeRPCHTTPConcatRequest,
Encode: EncodeRPCHTTPGenericResponse,
},
},
)

m.Handle("/rpc", s)

return m
}

Expand Down Expand Up @@ -130,3 +150,23 @@ func EncodeHTTPGenericRequest(_ context.Context, r *http.Request, request interf
func EncodeHTTPGenericResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}

// DecodeRPCHTTPSumRequest ...
func DecodeRPCHTTPSumRequest(_ context.Context, params json.RawMessage) (interface{}, error) {
var req sumRequest
err := json.Unmarshal(params, &req)
return req, err
}

// DecodeRPCHTTPConcatRequest ...
func DecodeRPCHTTPConcatRequest(_ context.Context, params json.RawMessage) (interface{}, error) {
var req concatRequest
err := json.Unmarshal(params, req)
return req, err
}

// EncodeRPCHTTPGenericResponse ...
func EncodeRPCHTTPGenericResponse(_ context.Context, params interface{}) (json.RawMessage, error) {
res, err := json.Marshal(params)
return res, err
}
31 changes: 31 additions & 0 deletions transport/http/jsonrpc/encode_decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package jsonrpc

import (
"encoding/json"

"golang.org/x/net/context"
)

// DecodeRequestFunc extracts a user-domain request object from an HTTP
// request object. It's designed to be used in HTTP servers, for server-side
// endpoints. One straightforward DecodeRequestFunc could be something that
// JSON decodes from the request body to the concrete response type.
type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error)

// EncodeRequestFunc encodes the passed request object into the HTTP request
// object. It's designed to be used in HTTP clients, for client-side
// endpoints. One straightforward EncodeRequestFunc could something that JSON
// encodes the object directly to the request body.
// type EncodeRequestFunc func(context.Context, *http.Request, interface{}) error

// EncodeResponseFunc encodes the passed response object to the HTTP response
// writer. It's designed to be used in HTTP servers, for server-side
// endpoints. One straightforward EncodeResponseFunc could be something that
// JSON encodes the object directly to the response body.
type EncodeResponseFunc func(context.Context, interface{}) (response json.RawMessage, err error)

// DecodeResponseFunc extracts a user-domain response object from an HTTP
// response object. It's designed to be used in HTTP clients, for client-side
// endpoints. One straightforward DecodeResponseFunc could be something that
// JSON decodes from the response body to the concrete response type.
// type DecodeResponseFunc func(context.Context, *http.Response) (response interface{}, err error)
72 changes: 72 additions & 0 deletions transport/http/jsonrpc/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package jsonrpc

// Error defines a JSON RPC error that can be returned
// in a Response from the spec
// http://www.jsonrpc.org/specification#error_object
type Error struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}

const (
// ParseError defines invalid JSON was received by the server.
// An error occurred on the server while parsing the JSON text.
ParseError int = -32700

// InvalidRequestError defines the JSON sent is not a valid Request object.
InvalidRequestError int = -32600

// MethodNotFoundError defines the method does not exist / is not available.
MethodNotFoundError int = -32601

// InvalidParamsError defines invalid method parameter(s).
InvalidParamsError int = -32602

// InternalError defines a server error
InternalError int = -32603
)

var errorMessage = map[int]string{
ParseError: "An error occurred on the server while parsing the JSON text.",
InvalidRequestError: "The JSON sent is not a valid Request object.",
MethodNotFoundError: "The method does not exist / is not available.",
InvalidParamsError: "Invalid method parameter(s).",
InternalError: "Internal JSON-RPC error.",
}

// ErrorMessage returns a message for the JSON RPC error code. It returns the empty
// string if the code is unknown.
func ErrorMessage(code int) string {
return errorMessage[code]
}

type parseError struct{}

func (e *parseError) ErrorCode() int {
return ParseError
}

type invalidRequestError struct{}

func (e *invalidRequestError) ErrorCode() int {
return InvalidRequestError
}

type methodNotFoundError struct{}

func (e *methodNotFoundError) ErrorCode() int {
return MethodNotFoundError
}

type invalidParamsError struct{}

func (e *invalidParamsError) ErrorCode() int {
return InvalidParamsError
}

type internalError struct{}

func (e *internalError) ErrorCode() int {
return InternalError
}
28 changes: 28 additions & 0 deletions transport/http/jsonrpc/request_response_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package jsonrpc

import "encoding/json"

// Request defines a JSON RPC request from the spec
// http://www.jsonrpc.org/specification#request_object
type Request struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of interface{}, the ID could be a custom type that implements MarshalJSON. Thoughts?

}

// Response defines a JSON RPC response from the spec
// http://www.jsonrpc.org/specification#response_object
type Response struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error Error `json:"error,omitemty"`
}

const (
// Version defines the version of the JSON RPC implementation
Version string = "2.0"

// ContentType defines the content type to be served.
ContentType string = "application/json; charset=utf-8"
)
202 changes: 202 additions & 0 deletions transport/http/jsonrpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package jsonrpc

import (
"context"
"encoding/json"
"io"
"net/http"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
httptransport "github.com/go-kit/kit/transport/http"
)

// Server wraps an endpoint and implements http.Handler.
type Server struct {
ctx context.Context
ecm EndpointCodecMap
before []httptransport.RequestFunc
after []httptransport.ServerResponseFunc
finalizer httptransport.ServerFinalizerFunc
logger log.Logger
}

// NewServer constructs a new server, which implements http.Server.
func NewServer(
ctx context.Context,
ecm EndpointCodecMap,
options ...ServerOption,
) *Server {
s := &Server{
ctx: ctx,
ecm: ecm,
logger: log.NewNopLogger(),
}
for _, option := range options {
option(s)
}
return s
}

// EndpointCodec defines and Endpoint and its associated codecs
type EndpointCodec struct {
Endpoint endpoint.Endpoint
Decode DecodeRequestFunc
Encode EncodeResponseFunc
}

// EndpointCodecMap maps the Request.Method to the proper EndpointCodec
type EndpointCodecMap map[string]EndpointCodec

// ServerOption sets an optional parameter for servers.
type ServerOption func(*Server)

// ServerBefore functions are executed on the HTTP request object before the
// request is decoded.
func ServerBefore(before ...httptransport.RequestFunc) ServerOption {
return func(s *Server) { s.before = before }
}

// ServerAfter functions are executed on the HTTP response writer after the
// endpoint is invoked, but before anything is written to the client.
func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption {
return func(s *Server) { s.after = after }
}

// ServerErrorLogger is used to log non-terminal errors. By default, no errors
// are logged. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom ServerErrorEncoder or ServerFinalizer, both of which have access to
// the context.
func ServerErrorLogger(logger log.Logger) ServerOption {
return func(s *Server) { s.logger = logger }
}

// ServerFinalizer is executed at the end of every HTTP request.
// By default, no finalizer is registered.
func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption {
return func(s *Server) { s.finalizer = f }
}

// ServeHTTP implements http.Handler.
func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must POST\n")
return
}
ctx := s.ctx

if s.finalizer != nil {
iw := &interceptingWriter{w, http.StatusOK}
defer func() { s.finalizer(ctx, iw.code, r) }()
w = iw
}

for _, f := range s.before {
ctx = f(ctx, r)
}

// Decode the body into an object
var req Request
err := json.NewDecoder(r.Body).Decode(&req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit troubling that there is no way to wrap the r.Body by another io.Reader here. For example someone might want to wrap it with a io.LimitReader to ensure that the server is not flooded with a stream of neverending bytes.

Perhaps a ServerOption can be added here.

if err != nil {
s.logger.Log("err", err)
rpcErrorEncoder(ctx, err, w)
return
}

// Get the endpoint and codecs from the map using the method
// defined in the JSON object
ecm := s.ecm[req.Method]

// TODO: Need to handle unregistered methods

// Decode the JSON "params"
reqParams, err := ecm.Decode(ctx, req.Params)
if err != nil {
s.logger.Log("err", err)
rpcErrorEncoder(ctx, err, w)
return
}

// Call the Endpoint with the params
response, err := ecm.Endpoint(ctx, reqParams)
if err != nil {
s.logger.Log("err", err)
rpcErrorEncoder(ctx, err, w)
return
}

for _, f := range s.after {
ctx = f(ctx, w)
}

res := Response{
Error: Error{},
}

// Encode the response from the Endpoint
resParams, err := ecm.Encode(ctx, response)
if err != nil {
s.logger.Log("err", err)
rpcErrorEncoder(ctx, err, w)
return
}

res.Result = resParams

json.NewEncoder(w).Encode(res)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the Decode, but here my request would be to be able to set options on the encoder: Example to pretty print the response, I'd want to SetIndent

}

// ErrorEncoder writes the error to the ResponseWriter, by default a
// content type of text/plain, a body of the plain text of the error, and a
// status code of 500. If the error implements Headerer, the provided headers
// will be applied to the response. If the error implements json.Marshaler, and
// the marshaling succeeds, a content type of application/json and the JSON
// encoded form of the error will be used. If the error implements StatusCoder,
// the provided StatusCode will be used instead of 500.
func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) {
body := []byte(err.Error())

w.Header().Set("Content-Type", ContentType)
if headerer, ok := err.(httptransport.Headerer); ok {
for k := range headerer.Headers() {
w.Header().Set(k, headerer.Headers().Get(k))
}
}

e := Error{
Code: InternalError,
Message: string(body),
}
if sc, ok := err.(ErrorCoder); ok {
e.Code = sc.ErrorCode()
}

w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(Response{
JSONRPC: Version,
Error: e,
})
}

// ErrorCoder is checked by DefaultErrorEncoder. If an error value implements
// ErrorCoder, the Error will be used when encoding the error. By default,
// InternalError (-32603) is used.
type ErrorCoder interface {
ErrorCode() int
}

type interceptingWriter struct {
http.ResponseWriter
code int
}

// WriteHeader may not be explicitly called, so care must be taken to
// initialize w.code to its default value of http.StatusOK.
func (w *interceptingWriter) WriteHeader(code int) {
w.code = code
w.ResponseWriter.WriteHeader(code)
}