Skip to content

Commit f908208

Browse files
authored
Merge pull request #1383 from HusterWan/0.5.x
cherry-pick: cherry-pick #1359 from master to 0.5.x
2 parents feba9bc + f0e8c55 commit f908208

File tree

30 files changed

+31794
-95
lines changed

30 files changed

+31794
-95
lines changed

cri/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ type Config struct {
1010
NetworkPluginConfDir string
1111
// SandboxImage is the image used by sandbox container.
1212
SandboxImage string
13+
// CriVersion is the cri version
14+
CriVersion string
1315
}

cri/criservice.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package cri
2+
3+
import (
4+
"fmt"
5+
6+
criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
7+
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
8+
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
9+
servicev1alpha2 "github.com/alibaba/pouch/cri/v1alpha2/service"
10+
"github.com/alibaba/pouch/daemon/config"
11+
"github.com/alibaba/pouch/daemon/mgr"
12+
13+
"github.com/sirupsen/logrus"
14+
)
15+
16+
// RunCriService start cri service if pouchd is specified with --enable-cri.
17+
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
18+
var err error
19+
20+
defer func() {
21+
stopCh <- err
22+
close(stopCh)
23+
}()
24+
if !daemonconfig.IsCriEnabled {
25+
return
26+
}
27+
switch daemonconfig.CriConfig.CriVersion {
28+
case "v1alpha1":
29+
err = runv1alpha1(daemonconfig, containerMgr, imageMgr)
30+
case "v1alpha2":
31+
err = runv1alpha2(daemonconfig, containerMgr, imageMgr)
32+
default:
33+
err = fmt.Errorf("invalid CRI version,failed to start CRI service")
34+
}
35+
return
36+
}
37+
38+
// Start CRI service with CRI version: v1alpha1
39+
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
40+
logrus.Infof("Start CRI service with CRI version: v1alpha1")
41+
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
42+
if err != nil {
43+
return fmt.Errorf("failed to get CriManager with error: %v", err)
44+
}
45+
46+
service, err := servicev1alpha1.NewService(daemonconfig, criMgr)
47+
if err != nil {
48+
return fmt.Errorf("failed to start CRI service with error: %v", err)
49+
}
50+
51+
// TODO: Stop the whole CRI service if any of the critical service exits
52+
grpcServerCloseCh := make(chan struct{})
53+
go func() {
54+
if err := service.Serve(); err != nil {
55+
logrus.Errorf("failed to start grpc server: %v", err)
56+
}
57+
close(grpcServerCloseCh)
58+
}()
59+
60+
streamServerCloseCh := make(chan struct{})
61+
go func() {
62+
if err := criMgr.StreamServerStart(); err != nil {
63+
logrus.Errorf("failed to start stream server: %v", err)
64+
}
65+
close(streamServerCloseCh)
66+
}()
67+
68+
// TODO: refactor it with select
69+
<-streamServerCloseCh
70+
logrus.Infof("CRI Stream server stopped")
71+
<-grpcServerCloseCh
72+
logrus.Infof("CRI GRPC server stopped")
73+
74+
logrus.Infof("CRI service stopped")
75+
return nil
76+
}
77+
78+
// Start CRI service with CRI version: v1alpha2
79+
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
80+
logrus.Infof("Start CRI service with CRI version: v1alpha2")
81+
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr)
82+
if err != nil {
83+
return fmt.Errorf("failed to get CriManager with error: %v", err)
84+
}
85+
86+
service, err := servicev1alpha2.NewService(daemonconfig, criMgr)
87+
if err != nil {
88+
return fmt.Errorf("failed to start CRI service with error: %v", err)
89+
}
90+
// TODO: Stop the whole CRI service if any of the critical service exits
91+
grpcServerCloseCh := make(chan struct{})
92+
go func() {
93+
if err := service.Serve(); err != nil {
94+
logrus.Errorf("failed to start grpc server: %v", err)
95+
}
96+
close(grpcServerCloseCh)
97+
}()
98+
99+
streamServerCloseCh := make(chan struct{})
100+
go func() {
101+
if err := criMgr.StreamServerStart(); err != nil {
102+
logrus.Errorf("failed to start stream server: %v", err)
103+
}
104+
close(streamServerCloseCh)
105+
}()
106+
// TODO: refactor it with select
107+
<-streamServerCloseCh
108+
logrus.Infof("CRI Stream server stopped")
109+
<-grpcServerCloseCh
110+
logrus.Infof("CRI GRPC server stopped")
111+
112+
logrus.Infof("CRI service stopped")
113+
return nil
114+
}

cri/stream/request_cache.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ var (
1919
TokenLen = 8
2020
)
2121

22-
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
22+
// RequestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
2323
// random token for their retrieval. The requestCache is used for building streaming URLs without
2424
// the need to encode every request parameter in the URL.
25-
type requestCache struct {
25+
type RequestCache struct {
2626
// tokens maps the generate token to the request for fast retrieval.
2727
tokens map[string]*list.Element
2828
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
@@ -31,24 +31,25 @@ type requestCache struct {
3131
lock sync.Mutex
3232
}
3333

34-
// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest.
35-
type request interface{}
34+
// Request representing an *ExecRequest, *AttachRequest, or *PortForwardRequest Type.
35+
type Request interface{}
3636

3737
type cacheEntry struct {
3838
token string
39-
req request
39+
req Request
4040
expireTime time.Time
4141
}
4242

43-
func newRequestCache() *requestCache {
44-
return &requestCache{
43+
// NewRequestCache return a RequestCache
44+
func NewRequestCache() *RequestCache {
45+
return &RequestCache{
4546
ll: list.New(),
4647
tokens: make(map[string]*list.Element),
4748
}
4849
}
4950

5051
// Insert the given request into the cache and returns the token used for fetching it out.
51-
func (c *requestCache) Insert(req request) (token string, err error) {
52+
func (c *RequestCache) Insert(req Request) (token string, err error) {
5253
c.lock.Lock()
5354
defer c.lock.Unlock()
5455

@@ -69,7 +70,7 @@ func (c *requestCache) Insert(req request) (token string, err error) {
6970
}
7071

7172
// Consume the token (remove it from the cache) and return the cached request, if found.
72-
func (c *requestCache) Consume(token string) (req request, found bool) {
73+
func (c *RequestCache) Consume(token string) (req Request, found bool) {
7374
c.lock.Lock()
7475
defer c.lock.Unlock()
7576
ele, ok := c.tokens[token]
@@ -88,7 +89,7 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
8889
}
8990

9091
// uniqueToken generates a random URL-safe token and ensures uniqueness.
91-
func (c *requestCache) uniqueToken() (string, error) {
92+
func (c *RequestCache) uniqueToken() (string, error) {
9293
const maxTries = 10
9394
// Number of bytes to be TokenLen when base64 encoded.
9495
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
@@ -108,7 +109,7 @@ func (c *requestCache) uniqueToken() (string, error) {
108109
}
109110

110111
// Must be write-locked prior to calling.
111-
func (c *requestCache) gc() {
112+
func (c *RequestCache) gc() {
112113
now := time.Now()
113114
for c.ll.Len() > 0 {
114115
oldest := c.ll.Back()

cri/src/cri.go renamed to cri/v1alpha1/cri.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"bytes"
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
apitypes "github.com/alibaba/pouch/apis/types"
15-
"github.com/alibaba/pouch/cri/stream"
1615
"github.com/alibaba/pouch/daemon/config"
1716
"github.com/alibaba/pouch/daemon/mgr"
1817
"github.com/alibaba/pouch/pkg/errtypes"
@@ -87,7 +86,7 @@ type CriManager struct {
8786
CniMgr CniMgr
8887

8988
// StreamServer is the stream server of CRI serves container streaming request.
90-
StreamServer stream.Server
89+
StreamServer Server
9190

9291
// SandboxBaseDir is the directory used to store sandbox files like /etc/hosts, /etc/resolv.conf, etc.
9392
SandboxBaseDir string

cri/src/cri_network.go renamed to cri/v1alpha1/cri_network.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"fmt"
@@ -78,9 +78,6 @@ func (c *CniManager) Name() string {
7878
// are launched.
7979
func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
8080
_, err := c.plugin.SetUpPod(*podNetwork)
81-
if err != nil {
82-
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
83-
}
8481

8582
defer func() {
8683
if err != nil {
@@ -92,6 +89,10 @@ func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
9289
}
9390
}()
9491

92+
if err != nil {
93+
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
94+
}
95+
9596
return nil
9697
}
9798

cri/src/cri_stream.go renamed to cri/v1alpha1/cri_stream.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"bytes"
@@ -11,25 +11,24 @@ import (
1111
"time"
1212

1313
apitypes "github.com/alibaba/pouch/apis/types"
14-
"github.com/alibaba/pouch/cri/stream"
1514
"github.com/alibaba/pouch/cri/stream/remotecommand"
1615
"github.com/alibaba/pouch/daemon/mgr"
1716

1817
"github.com/sirupsen/logrus"
1918
)
2019

21-
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (stream.Server, error) {
22-
config := stream.DefaultConfig
20+
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
21+
config := DefaultConfig
2322
config.Address = net.JoinHostPort(address, port)
2423
runtime := newStreamRuntime(ctrMgr)
25-
return stream.NewServer(config, runtime)
24+
return NewServer(config, runtime)
2625
}
2726

2827
type streamRuntime struct {
2928
containerMgr mgr.ContainerMgr
3029
}
3130

32-
func newStreamRuntime(ctrMgr mgr.ContainerMgr) stream.Runtime {
31+
func newStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
3332
return &streamRuntime{containerMgr: ctrMgr}
3433
}
3534

@@ -61,6 +60,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot
6160
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
6261
}
6362

63+
// TODO Find a better way instead of the dead loop
6464
var ei *apitypes.ContainerExecInspect
6565
for {
6666
ei, err = s.containerMgr.InspectExec(ctx, execid)

cri/src/cri_types.go renamed to cri/v1alpha1/cri_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"

cri/src/cri_utils.go renamed to cri/v1alpha1/cri_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"bytes"

cri/src/cri_utils_test.go renamed to cri/v1alpha1/cri_utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"fmt"

cri/src/cri_wrapper.go renamed to cri/v1alpha1/cri_wrapper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src
1+
package v1alpha1
22

33
import (
44
"github.com/sirupsen/logrus"

0 commit comments

Comments
 (0)