From a260bbe394f91fa05b163315390ed133de5c5494 Mon Sep 17 00:00:00 2001
From: holyfei
Date: Wed, 19 Aug 2020 17:43:24 +0800
Subject: [PATCH] clock: synchronizes clock info to agent

reason: virtual machine's clock may be incorrect, proxy synchronizes
clock info to help virtual machine adjust clock time

Signed-off-by: yangfeiyu
---
 proxy.go                                           |   7 ++
 proxy_test.go                                      |  39 ++++
 sync_clock_client.go                               | 122 ++++++++++++
 sync_clock_client_test.go                          | 173 ++++++++++++++++++
 .../kata-containers/agent/pkg/clock/clock_util.go  |  48 +++++
 5 files changed, 389 insertions(+)
 create mode 100644 sync_clock_client.go
 create mode 100644 sync_clock_client_test.go
 create mode 100644 vendor/github.com/kata-containers/agent/pkg/clock/clock_util.go

diff --git a/proxy.go b/proxy.go
index ab062a5..9dfcb3c 100644
--- a/proxy.go
+++ b/proxy.go
@@ -105,6 +105,13 @@ func serve(servConn io.ReadWriteCloser, proto, addr string, results chan error)
 	// Start the heartbeat in a separate go routine
 	go heartBeat(session)
 
+	// start the sync clock in a separate go routine
+	syncClockStream, err := session.Open()
+	if err != nil {
+		return nil, nil, err
+	}
+	go SyncClock(syncClockStream)
+
 	// serving connection
 	l, err := net.Listen(proto, addr)
 	if err != nil {
diff --git a/proxy_test.go b/proxy_test.go
index 923b138..94fa523 100644
--- a/proxy_test.go
+++ b/proxy_test.go
@@ -10,6 +10,7 @@ package main
 import (
 	"bytes"
 	"crypto/md5"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -23,6 +24,7 @@ import (
 	"testing"
 
 	"github.com/hashicorp/yamux"
+	"github.com/kata-containers/agent/pkg/clock"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -121,6 +123,13 @@ func server(listener net.Listener, closeCh chan bool) error {
 		session.Close()
 	}()
 
+	// accept the sync clock stream first
+	syncClockStream, err := session.Accept()
+	if err != nil {
+		return err
+	}
+	go serverSyncClock(syncClockStream)
+
 	for {
 		stream, err := session.Accept()
 		if err != nil {
@@ -133,6 +142,36 @@ func server(listener net.Listener, closeCh chan bool) error {
 	}
 }
 
+// serverSyncClock emulates the agent side of the clock sync protocol:
+// it stamps every request with the current time and sends it back.
+func serverSyncClock(stream net.Conn) {
+	for {
+		buf, byteNum, err := readConnData(stream)
+		if err == yamux.ErrTimeout {
+			// the proxy only syncs once per interval, keep waiting
+			continue
+		}
+		if err != nil {
+			// stream is closed or broken, retrying would only spin
+			return
+		}
+		var clockInfo clock.TimeValue
+		if err := json.Unmarshal(buf[:byteNum], &clockInfo); err != nil {
+			continue
+		}
+		nowTime := clock.GetCurrentTimeNs()
+		clockInfo.ClientArriveTime = nowTime
+		clockInfo.ServerSendTime = nowTime
+		b, err := json.Marshal(clockInfo)
+		if err != nil {
+			continue
+		}
+		if err := clock.WriteConnData(stream, b); err != nil {
+			return
+		}
+	}
+}
+
 func TestUnixAddrParsing(T *testing.T) {
 	buf := "unix://foo/bar"
 	addr, err := unixAddr(buf)
diff --git a/sync_clock_client.go b/sync_clock_client.go
new file mode 100644
index 0000000..9bf3e91
--- /dev/null
+++ b/sync_clock_client.go
@@ -0,0 +1,122 @@
+// Copyright (c) Huawei Technologies Co., Ltd. 2018. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+// Description: sync clock client related function
+// Author: xueshaojia x00464843
+// Create: 2018-11-10
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/kata-containers/agent/pkg/clock"
+)
+
+const (
+	allowTimeDiff     = 10 * 1000 * 1000 // allow 10ms's difference
+	syncClockInterval = 60 * time.Second // sync clock with agent every 60 seconds
+	rpcTimeout        = 10 * time.Second // timeout for proxy's reading data
+)
+
+// readConnData performs a single read of at most
+// clock.MaxSyncClockByteNum bytes from stream and returns the buffer
+// together with the number of valid bytes in it
+func readConnData(stream net.Conn) (buf []byte, byteNum int, err error) {
+	// set read deadline to avoid case as following:
+	// proxy and agent are both reading and then syncClock will never work
+	if err = stream.SetReadDeadline(time.Now().Add(rpcTimeout)); err != nil {
+		return nil, 0, err
+	}
+	buf = make([]byte, clock.MaxSyncClockByteNum)
+	byteNum, err = stream.Read(buf)
+	return buf, byteNum, err
+}
+
+// getGuestClock syncs guest clock info
+// sends ClientSendTime
+// waits for ClientArriveTime and ServerSendTime
+// the reply must carry the ClientSendTime that was sent, otherwise it
+// answers an earlier request whose reply arrived after the read timeout
+func getGuestClock(stream net.Conn, clockInfo *clock.TimeValue) error {
+	clockInfo.Delta = 0
+	sendTime := clockInfo.ClientSendTime
+	b, err := json.Marshal(clockInfo)
+	if err != nil {
+		return err
+	}
+	if err = clock.WriteConnData(stream, b); err != nil {
+		return err
+	}
+
+	buf, byteNum, err := readConnData(stream)
+	if err != nil {
+		return err
+	}
+
+	if err = json.Unmarshal(buf[:byteNum], clockInfo); err != nil {
+		return fmt.Errorf("sync clock, parse guest clocktime error:%v", err)
+	}
+	if clockInfo.ClientSendTime != sendTime {
+		return fmt.Errorf("sync clock, stale guest clocktime reply, raw message:%v", *clockInfo)
+	}
+	return nil
+}
+
+// adjustGuestClock tells server to adjust local clock with Delta
+func adjustGuestClock(stream net.Conn, clockInfo *clock.TimeValue) error {
+	b, err := json.Marshal(clockInfo)
+	if err != nil {
+		return err
+	}
+	logger().Debugf("sync clock, send:%s", string(b))
+	return clock.WriteConnData(stream, b)
+}
+
+// syncClock performs all the steps
+// 1 get client send time[host]
+// 2 request for client arrive time and server send time[guest os]
+// 3 get server arrive time[host]
+// 4 calculate clock diff
+// 5 request to adjust guest clock
+func syncClock(stream net.Conn) error {
+	var clockInfo clock.TimeValue
+	if clockInfo.ClientSendTime = clock.GetCurrentTimeNs(); clockInfo.ClientSendTime <= 0 {
+		return fmt.Errorf("sync clock, get client sendtime error")
+	}
+	err := getGuestClock(stream, &clockInfo)
+	if err != nil {
+		return fmt.Errorf("sync clock, get guest clocktime error:%v", err)
+	}
+	if clockInfo.ServerArriveTime = clock.GetCurrentTimeNs(); clockInfo.ServerArriveTime <= 0 {
+		return fmt.Errorf("sync clock, get client recvtime error")
+	}
+	if clockInfo.ClientSendTime <= 0 || clockInfo.ClientArriveTime <= 0 || clockInfo.ServerSendTime <= 0 {
+		return fmt.Errorf("sync clock, some fields of NTP message error, raw message:%v", clockInfo)
+	}
+
+	// host minus guest time, averaged over request and reply so that a
+	// symmetric transmission delay cancels out
+	delta := ((clockInfo.ClientSendTime - clockInfo.ClientArriveTime) + (clockInfo.ServerArriveTime - clockInfo.ServerSendTime)) / 2
+	if delta < -allowTimeDiff || delta > allowTimeDiff {
+		clockInfo.Delta = delta
+		if err := adjustGuestClock(stream, &clockInfo); err != nil {
+			return fmt.Errorf("sync clock, failed to adjust guest clock : %v", err)
+		}
+	}
+	return nil
+}
+
+// SyncClock keeps the guest clock in sync with the host: it runs
+// syncClock on stream once every syncClockInterval, failures are logged
+// and retried at the next interval, it never returns
+func SyncClock(stream net.Conn) {
+	for {
+		if err := syncClock(stream); err != nil {
+			logger().WithError(err).Error("sync clock failed")
+		}
+		time.Sleep(syncClockInterval)
+	}
+}
diff --git a/sync_clock_client_test.go b/sync_clock_client_test.go
new file mode 100644
index 0000000..b0b1c85
--- /dev/null
+++ b/sync_clock_client_test.go
@@ -0,0 +1,173 @@
+// Copyright (c) Huawei Technologies Co., Ltd. 2018. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+// Description: sync clock client related test
+// Author: xueshaojia x00464843
+// Create: 2018-11-10
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"net"
+	"os"
+	"testing"
+
+	"github.com/hashicorp/yamux"
+	"github.com/kata-containers/agent/pkg/clock"
+)
+
+// TestReadWriteData sends one clock request over the client stream and
+// checks that the test server stamps it and sends it back
+func TestReadWriteData(t *testing.T) {
+	var clockInfo clock.TimeValue
+	clockInfo.ClientSendTime = clock.GetCurrentTimeNs()
+	b, err := json.Marshal(clockInfo)
+	if err != nil {
+		t.Fatalf("Marshal clock info fail, err:%v", err)
+	}
+
+	err = clock.WriteConnData(clientStream, b)
+	t.Logf("client send: %s", string(b))
+	if err != nil {
+		t.Fatalf("send clock info fail, err:%v", err)
+	}
+	buf, byteNum, err := readConnData(clientStream)
+	if err != nil {
+		t.Fatalf("recv clock info fail, err:%v", err)
+	}
+
+	var reply clock.TimeValue
+	if err = json.Unmarshal(buf[:byteNum], &reply); err != nil {
+		t.Fatalf("parse clock info fail, err:%v", err)
+	}
+	if reply.ClientSendTime != clockInfo.ClientSendTime {
+		t.Fatalf("client send time is not echoed, reply:%+v", reply)
+	}
+	if reply.ClientArriveTime <= 0 || reply.ServerSendTime <= 0 {
+		t.Fatalf("server timestamps are not set, reply:%+v", reply)
+	}
+}
+
+// SetUpServer listens on sock and reports on readyChan whether the
+// listener is ready (1) or could not be created (0). It then serves the
+// first stream of the first connection as a fake agent: every request
+// with a zero Delta is stamped with the current time and sent back
+func SetUpServer(sock string, readyChan chan int) error {
+	var err error
+	listener, err = net.Listen("unix", sock)
+	if err != nil {
+		// wake up the waiter, otherwise it would block forever
+		readyChan <- 0
+		return err
+	}
+	readyChan <- 1
+	conn, err := listener.Accept()
+	if err != nil {
+		return err
+	}
+	session, err := yamux.Server(conn, nil)
+	if err != nil {
+		return err
+	}
+	serverSession = session
+	stream, err := session.Accept()
+	if err != nil {
+		return err
+	}
+	for {
+		var clockInfo clock.TimeValue
+		var byteNum int
+		var err error
+		buf := make([]byte, clock.MaxSyncClockByteNum)
+		if byteNum, err = stream.Read(buf); err != nil {
+			break
+		}
+
+		if err = json.Unmarshal(buf[:byteNum], &clockInfo); err != nil {
+			break
+		}
+		if clockInfo.Delta == 0 {
+			nowTime := clock.GetCurrentTimeNs()
+			clockInfo.ClientArriveTime = nowTime
+			clockInfo.ServerSendTime = nowTime
+			b, err := json.Marshal(&clockInfo)
+			if err != nil {
+				break
+			}
+			if _, err = stream.Write(b); err != nil {
+				break
+			}
+		}
+	}
+	return nil
+}
+
+// SetUpClient dials sock and opens the stream used by the tests
+func SetUpClient(sock string) error {
+	conn, err := net.Dial("unix", sock)
+	if err != nil {
+		return err
+	}
+	session, err := yamux.Client(conn, nil)
+	if err != nil {
+		conn.Close()
+		return err
+	}
+	clientSession = session
+	stream, err := session.Open()
+	if err != nil {
+		clientSession.Close()
+		return err
+	}
+	clientStream = stream
+	return nil
+}
+
+// TearDown closes whatever SetUpServer and SetUpClient managed to create
+func TearDown() {
+	if listener != nil {
+		listener.Close()
+	}
+	if serverSession != nil {
+		serverSession.Close()
+	}
+	if clientSession != nil {
+		clientSession.Close()
+	}
+}
+
+// GenSocket returns a randomly named unix socket path under /tmp
+func GenSocket() string {
+	randSeed := clock.GetCurrentTimeNs()
+	rand.Seed(randSeed)
+	return fmt.Sprintf("/tmp/%d.sock", rand.Uint32())
+}
+
+var listener net.Listener
+var clientStream net.Conn
+var serverSession *yamux.Session
+var clientSession *yamux.Session
+
+// TestMain brings up the fake agent and the client stream shared by the
+// tests of this package, runs the tests and reports their result
+func TestMain(m *testing.M) {
+	waitConn := make(chan int)
+	testSock := GenSocket()
+	go SetUpServer(testSock, waitConn)
+	if ready := <-waitConn; ready != 1 {
+		fmt.Fprintf(os.Stderr, "set up server on %s fail\n", testSock)
+		os.Exit(1)
+	}
+	if err := SetUpClient(testSock); err != nil {
+		fmt.Fprintf(os.Stderr, "set up client fail, err:%v\n", err)
+		TearDown()
+		os.Exit(1)
+	}
+	// os.Exit does not run deferred calls, so tear down explicitly
+	// before handing the test result to the caller
+	code := m.Run()
+	TearDown()
+	os.Exit(code)
+}
diff --git a/vendor/github.com/kata-containers/agent/pkg/clock/clock_util.go b/vendor/github.com/kata-containers/agent/pkg/clock/clock_util.go
new file mode 100644
index 0000000..03244fd
--- /dev/null
+++ b/vendor/github.com/kata-containers/agent/pkg/clock/clock_util.go
@@ -0,0 +1,48 @@
+// Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+// Description: common functions
+// Author: jiangpeifei
+// Create: 2019-05-28
+
+package clock
+
+import (
+	"net"
+	"syscall"
+)
+
+// TimeValue is the message exchanged between the sync clock client and
+// server, the time fields are timestamps in nanoseconds
+type TimeValue struct {
+	ClientSendTime   int64 `json:"client_send_time"`
+	ClientArriveTime int64 `json:"client_arrive_time"`
+	ServerSendTime   int64 `json:"server_send_time"`
+	ServerArriveTime int64 `json:"server_arrive_time"`
+	Delta            int64 `json:"delta"`
+}
+
+const MaxSyncClockByteNum = 400 //sync clock byte num, max=400
+
+// GetCurrentTimeNs returns the time elapsed since the Unix epoch in Ns,
+// or -1 if the time can not be read
+func GetCurrentTimeNs() int64 {
+	var tv syscall.Timeval
+	if err := syscall.Gettimeofday(&tv); err != nil {
+		return -1
+	}
+	// Sec and Usec are 32 bits wide on some platforms, widen them first
+	return int64(tv.Sec)*1000000000 + int64(tv.Usec)*1000
+}
+
+// ReadConnData reads data from stream
+func ReadConnData(stream net.Conn) (buf []byte, byteNum int, err error) {
+	buf = make([]byte, MaxSyncClockByteNum)
+	byteNum, err = stream.Read(buf)
+	return buf, byteNum, err
+}
+
+// WriteConnData writes data to stream
+func WriteConnData(stream net.Conn, buf []byte) error {
+	_, err := stream.Write(buf)
+	return err
+}
-- 
2.14.3 (Apple Git-98)