Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tars/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (a *application) initConfig() {
a.svrCfg.Node = sMap["node"]
a.svrCfg.App = sMap["app"]
a.svrCfg.Server = sMap["server"]
a.svrCfg.LocalIP = sMap["localip"]
a.svrCfg.LocalIP = c.GetStringWithDef("/tars/application/server<localip>", a.svrCfg.LocalIP)
a.svrCfg.NodeName = c.GetStringWithDef("/tars/application/server<node_name>", a.svrCfg.LocalIP)
a.svrCfg.Local = c.GetString("/tars/application/server<local>")
// svrCfg.Container = c.GetString("/tars/application<container>")

Expand Down Expand Up @@ -252,6 +253,7 @@ func (a *application) initConfig() {
a.cltCfg.ClientDialTimeout = tools.ParseTimeOut(c.GetIntWithDef("/tars/application/client<clientdialtimeout>", ClientDialTimeout))
a.cltCfg.ReqDefaultTimeout = c.GetInt32WithDef("/tars/application/client<reqdefaulttimeout>", ReqDefaultTimeout)
a.cltCfg.ObjQueueMax = c.GetInt32WithDef("/tars/application/client<objqueuemax>", ObjQueueMax)
a.cltCfg.context["node_name"] = a.svrCfg.NodeName
ca := c.GetString("/tars/application/client<ca>")
if ca != "" {
cert := c.GetString("/tars/application/client<cert>")
Expand Down
7 changes: 7 additions & 0 deletions tars/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type serverConfig struct {
LogNum uint64
LogLevel string
Version string
NodeName string
LocalIP string
Local string
BasePath string
Expand Down Expand Up @@ -91,6 +92,7 @@ type clientConfig struct {
ClientDialTimeout time.Duration
ReqDefaultTimeout int32
ObjQueueMax int32
context map[string]string
}

// GetServerConfig Get server config
Expand Down Expand Up @@ -180,6 +182,11 @@ func newClientConfig() *clientConfig {
ClientDialTimeout: tools.ParseTimeOut(ClientDialTimeout),
ReqDefaultTimeout: ReqDefaultTimeout,
ObjQueueMax: ObjQueueMax,
context: make(map[string]string),
}
return conf
}

func (c *clientConfig) Context() map[string]string {
return c.context
}
2 changes: 1 addition & 1 deletion tars/endpointmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func newEndpointManager(objName string, comm *Communicator, opts ...EndpointMana
query := new(queryf.QueryF)
TLOG.Debug("string to proxy locator ", obj)
e.comm.StringToProxy(obj, query)
e.registrar = tarsregistry.New(query)
e.registrar = tarsregistry.New(query, e.comm.Client)
} else {
e.registrar = e.comm.opt.registrar
}
Expand Down
4 changes: 2 additions & 2 deletions tars/nodef.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (n *NodeFHelper) KeepAlive(adapter string) {
}
n.si.Pid = int32(os.Getpid())
n.si.Adapter = adapter
_, err := n.sf.KeepAlive(&n.si)
_, err := n.sf.KeepAlive(&n.si, n.comm.Client.Context())
if err != nil {
TLOG.Error("keepalive fail:", adapter)
}
Expand All @@ -47,7 +47,7 @@ func (n *NodeFHelper) ReportVersion(version string) {
if n.sf == nil {
return
}
_, err := n.sf.ReportVersion(n.si.Application, n.si.ServerName, version)
_, err := n.sf.ReportVersion(n.si.Application, n.si.ServerName, version, n.comm.Client.Context())
if err != nil {
TLOG.Error("report Version fail:")
}
Expand Down
2 changes: 1 addition & 1 deletion tars/notifyf.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (n *NotifyHelper) ReportNotifyInfo(level int32, info string) {
n.tm.ELevel = notifyf.NOTIFYLEVEL(level)
n.tm.SMessage = info
TLOG.Debug(n.tm)
if err := n.tn.ReportNotifyInfo(&n.tm); err != nil {
if err := n.tn.ReportNotifyInfo(&n.tm, n.comm.Client.Context()); err != nil {
TLOG.Errorf("ReportNotifyInfo err: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tars/propertyf.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (p *PropertyReportHelper) ReportToServer() {
for k, v := range statMsg {
cnt++
if cnt >= 20 {
_, err := p.pf.ReportPropMsg(tmpStatMsg)
_, err := p.pf.ReportPropMsg(tmpStatMsg, p.comm.Client.Context())
if err != nil {
TLOG.Error("Send to property server Error", reflect.TypeOf(err), err)
}
Expand All @@ -434,7 +434,7 @@ func (p *PropertyReportHelper) ReportToServer() {
tmpStatMsg[k] = v
}
if len(tmpStatMsg) > 0 {
_, err := p.pf.ReportPropMsg(tmpStatMsg)
_, err := p.pf.ReportPropMsg(tmpStatMsg, p.comm.Client.Context())
if err != nil {
TLOG.Error("Send to property server Error", reflect.TypeOf(err), err)
}
Expand Down
4 changes: 2 additions & 2 deletions tars/rconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *RConf) GetConfigList() (fList []string, err error) {
Containername:string
*/
}
ret, err := c.tc.ListAllConfigByInfo(&info, &fList)
ret, err := c.tc.ListAllConfigByInfo(&info, &fList, c.comm.Client.Context())
if err != nil {
return fList, err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *RConf) getConfig(info configf.ConfigInfo) (config string, err error) {
set = v
}
info.Setdivision = set
ret, err := c.tc.LoadConfigByInfo(&info, &config)
ret, err := c.tc.LoadConfigByInfo(&info, &config, c.comm.Client.Context())
if err != nil {
return config, err
}
Expand Down
13 changes: 9 additions & 4 deletions tars/registry/tars/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"github.com/TarsCloud/TarsGo/tars/registry"
)

type Context interface {
Context() map[string]string
}

type tarsRegistry struct {
query *queryf.QueryF
ctx Context
}

func New(query *queryf.QueryF) registry.Registrar {
return &tarsRegistry{query: query}
func New(query *queryf.QueryF, ctx Context) registry.Registrar {
return &tarsRegistry{query: query, ctx: ctx}
}

func (t *tarsRegistry) Registry(_ context.Context, _ *registry.ServantInstance) error {
Expand All @@ -25,7 +30,7 @@ func (t *tarsRegistry) Deregister(_ context.Context, _ *registry.ServantInstance
}

func (t *tarsRegistry) QueryServant(ctx context.Context, id string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
ret, err := t.query.FindObjectByIdInSameGroupWithContext(ctx, id, &activeEp, &inactiveEp)
ret, err := t.query.FindObjectByIdInSameGroupWithContext(ctx, id, &activeEp, &inactiveEp, t.ctx.Context())
if err != nil {
return nil, nil, err
}
Expand All @@ -36,7 +41,7 @@ func (t *tarsRegistry) QueryServant(ctx context.Context, id string) (activeEp []
}

func (t *tarsRegistry) QueryServantBySet(ctx context.Context, id, set string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
ret, err := t.query.FindObjectByIdInSameSetWithContext(ctx, id, set, &activeEp, &inactiveEp)
ret, err := t.query.FindObjectByIdInSameSetWithContext(ctx, id, set, &activeEp, &inactiveEp, t.ctx.Context())
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion tars/remotelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type RemoteTimeWriter struct {
reportSuccessPtr *PropertyReport
reportFailPtr *PropertyReport
hasPrefix bool
comm *Communicator
}

// NewRemoteTimeWriter new and init RemoteTimeWriter
Expand All @@ -34,6 +35,7 @@ func NewRemoteTimeWriter() *RemoteTimeWriter {
log := GetServerConfig().Log
comm := GetCommunicator()
comm.StringToProxy(log, rw.logPtr)
rw.comm = comm
go rw.Sync2remote()
return rw
}
Expand Down Expand Up @@ -71,7 +73,7 @@ func (rw *RemoteTimeWriter) Sync2remote() {
}

func (rw *RemoteTimeWriter) sync2remote(s []string) error {
err := rw.logPtr.LoggerbyInfo(rw.logInfo, s)
err := rw.logPtr.LoggerbyInfo(rw.logInfo, s, rw.comm.Client.Context())
return err
}

Expand Down
4 changes: 2 additions & 2 deletions tars/statf.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *StatFHelper) getIntervCount(totalRspTime int32, intervalCount map[int32
func (s *StatFHelper) reportAndClear(mStat string, bFromClient bool) {
// report mStatInfo
if mStat == "mStatInfo" {
_, err := s.sf.ReportMicMsg(s.mStatInfo, bFromClient)
_, err := s.sf.ReportMicMsg(s.mStatInfo, bFromClient, s.comm.Client.Context())
if err != nil {
TLOG.Debug("mStatInfo report err:", err.Error())
}
Expand All @@ -124,7 +124,7 @@ func (s *StatFHelper) reportAndClear(mStat string, bFromClient bool) {
}
// report mStatInfoFromServer
if mStat == "mStatInfoFromServer" {
_, err := s.sf.ReportMicMsg(s.mStatInfoFromServer, bFromClient)
_, err := s.sf.ReportMicMsg(s.mStatInfoFromServer, bFromClient, s.comm.Client.Context())
if err != nil {
TLOG.Debug("mStatInfoFromServer report err:", err.Error())
}
Expand Down