Title here
Summary here
Shim(垫片)是 containerd 与容器进程之间的中间层,它是实现容器独立性的关键组件。
containerd 1.2+ 使用的现代 Shim 协议:
协议: TTRPC over Unix Socket
二进制命名: containerd-shim-<runtime>-v2
示例: containerd-shim-runc-v2// pkg/shim/shim.go
type BootstrapParams struct {
// Version 协议版本
Version int `json:"version"`
// Address Shim 监听地址
Address string `json:"address"`
// Protocol 协议类型 (ttrpc, grpc)
Protocol string `json:"protocol"`
}// cmd/containerd-shim-runc-v2/main.go
func main() {
// 解析命令
if os.Args[1] == "start" {
// 启动模式:daemonize 并返回 bootstrap 信息
startShim()
return
}
// 运行模式:作为 TTRPC 服务器运行
runShim()
}
func startShim() {
// 1. 创建 socket 路径
address := filepath.Join(bundlePath, "shim.sock")
// 2. Fork 新进程
cmd := exec.Command(os.Args[0], "run")
cmd.SysProcAttr = &syscall.SysProcAttr{
Setsid: true, // 创建新会话
}
cmd.Start()
// 3. 输出 bootstrap 信息
params := BootstrapParams{
Version: 2,
Address: address,
Protocol: "ttrpc",
}
json.NewEncoder(os.Stdout).Encode(params)
}// api/runtime/task/v2/shim.proto
service Task {
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Pids(PidsRequest) returns (PidsResponse);
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc Connect(ConnectRequest) returns (ConnectResponse);
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}// cmd/containerd-shim-runc-v2/service.go
type service struct {
mu sync.Mutex
container runc.Container
context context.Context
processes map[string]process.Process
events chan interface{}
platform stdio.Platform
}
func (s *service) Create(ctx context.Context, req *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
// 1. 创建 Container 实例
container, err := runc.NewContainer(ctx, s.platform, &runc.CreateOpts{
Bundle: req.Bundle,
Rootfs: req.Rootfs,
IO: req.IO,
Terminal: req.Terminal,
Stdin: req.Stdin,
Stdout: req.Stdout,
Stderr: req.Stderr,
})
if err != nil {
return nil, err
}
s.container = container
// 2. 返回容器 PID
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
}
func (s *service) Start(ctx context.Context, req *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
// 获取要启动的进程
p := s.processes[req.ExecID]
if p == nil {
p = s.container.Init()
}
// 启动进程
if err := p.Start(ctx); err != nil {
return nil, err
}
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
}
func (s *service) Wait(ctx context.Context, req *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
// 获取进程
p := s.processes[req.ExecID]
if p == nil {
p = s.container.Init()
}
// 等待进程退出
exitStatus, exitedAt, err := p.Wait()
if err != nil {
return nil, err
}
return &taskAPI.WaitResponse{
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}, nil
}Shim 作为容器进程的父进程,负责回收子进程:
// cmd/containerd-shim-runc-v2/process/init.go
func (p *Init) Wait() {
// 使用 reaper 监听子进程退出
ch := make(chan runc.Exit, 1)
go func() {
status, _ := p.runtime.Wait(p.pid)
ch <- runc.Exit{
Pid: p.pid,
Status: status,
}
}()
select {
case exit := <-ch:
p.mu.Lock()
p.exitStatus = exit.Status
p.exitedAt = time.Now()
p.mu.Unlock()
// 通知等待者
close(p.waitCh)
case <-p.context.Done():
return
}
}// 设置为 subreaper,接收孤儿进程
func setSubreaper() error {
return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0)
}// pkg/shim/binary/binary.go
type binaryIO struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}
// 通过外部程序处理 I/O
func NewBinaryIO(ctx context.Context, id, binary string) (stdio.IO, error) {
cmd := exec.CommandContext(ctx, binary, id)
stdin, _ := cmd.StdinPipe()
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
cmd.Start()
return &binaryIO{
cmd: cmd,
stdin: stdin,
stdout: stdout,
stderr: stderr,
}, nil
}// core/runtime/v2/shim_manager.go
func (m *ShimManager) loadExistingShims(ctx context.Context) error {
// 扫描 state 目录
entries, _ := os.ReadDir(m.state)
for _, entry := range entries {
if !entry.IsDir() {
continue
}
// 读取 bootstrap 信息
bundlePath := filepath.Join(m.state, entry.Name())
address := filepath.Join(bundlePath, "shim.sock")
// 检查 socket 是否存在
if _, err := os.Stat(address); err != nil {
continue
}
// 重新连接
conn, err := m.connect(ctx, address)
if err != nil {
continue
}
// 恢复 Shim 实例
shim := newShim(conn, &BootstrapParams{Address: address})
m.shims.Store(entry.Name(), shim)
}
return nil
}// pkg/shim/interface.go
type Shim interface {
// Cleanup 清理资源
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
// StartShim 启动 Shim
StartShim(ctx context.Context, opts StartOpts) (string, error)
}package main
import (
"github.com/containerd/containerd/v2/pkg/shim"
shimapi "github.com/containerd/containerd/v2/api/runtime/task/v2"
)
type myShim struct {
// 自定义字段
}
func (s *myShim) Create(ctx context.Context, req *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
// 创建容器
return &shimapi.CreateTaskResponse{
Pid: 1234,
}, nil
}
func (s *myShim) Start(ctx context.Context, req *shimapi.StartRequest) (*shimapi.StartResponse, error) {
// 启动容器
return &shimapi.StartResponse{
Pid: 1234,
}, nil
}
// ... 实现其他方法 ...
func main() {
shim.Run("io.containerd.my-runtime.v2", func(ctx context.Context, id string, config shim.Config) (shim.Shim, error) {
return &myShim{}, nil
})
}Shim 机制的关键点:
下一节我们将学习 容器生命周期管理。