Initial provider for nerdctl/Finch

Adds an implementation of a provider based on nerdctl. Several TODOs
remain in the code, but the core functionality of creating/deleting
clusters works, and a simple deployed application runs properly.

Signed-off-by: Phil Estes <estesp@gmail.com>
Author:  Phil Estes
Date:    2023-11-17 14:05:09 -05:00
Commit:  86c7c2e414 (parent: 7c2f6c1dcd)
11 changed files with 1376 additions and 0 deletions
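
For context, a minimal sketch of driving the new provider through kind's public cluster API. The cluster name "demo" is illustrative; ProviderWithNerdctl is the option added at the end of this diff.

package main

import "sigs.k8s.io/kind/pkg/cluster"

func main() {
	// select the nerdctl provider explicitly; pass "finch" to run through
	// Finch's CLI, or "" to default to the nerdctl binary
	provider := cluster.NewProvider(cluster.ProviderWithNerdctl(""))
	// create a cluster, then tear it down ("" = default kubeconfig handling)
	if err := provider.Create("demo"); err != nil {
		panic(err)
	}
	if err := provider.Delete("demo", ""); err != nil {
		panic(err)
	}
}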


@@ -0,0 +1,2 @@
labels:
- area/provider/nerdctl


@@ -0,0 +1,24 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
// clusterLabelKey is applied to each "node" container for identification
const clusterLabelKey = "io.x-k8s.kind.cluster"
// nodeRoleLabelKey is applied to each "node" container for categorization
// of nodes by role
const nodeRoleLabelKey = "io.x-k8s.kind.role"


@@ -0,0 +1,91 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"fmt"
"strings"
"time"
"sigs.k8s.io/kind/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/log"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/common"
"sigs.k8s.io/kind/pkg/internal/apis/config"
"sigs.k8s.io/kind/pkg/internal/cli"
)
// ensureNodeImages ensures that the node images used by the create
// configuration are present
func ensureNodeImages(logger log.Logger, status *cli.Status, cfg *config.Cluster, binaryName string) error {
// pull each required image
for _, image := range common.RequiredNodeImages(cfg).List() {
// print a user-friendly message
friendlyImageName, image := sanitizeImage(image)
status.Start(fmt.Sprintf("Ensuring node image (%s) 🖼", friendlyImageName))
if _, err := pullIfNotPresent(logger, image, 4, binaryName); err != nil {
status.End(false)
return err
}
}
return nil
}
// pullIfNotPresent will pull an image if it is not present locally
// retrying up to retries times
// it returns true if it attempted to pull, and any errors from pulling
func pullIfNotPresent(logger log.Logger, image string, retries int, binaryName string) (pulled bool, err error) {
// TODO(bentheelder): switch most (all) of the logging here to debug level
// once we have configurable log levels
// if this did not return an error, then the image exists locally
cmd := exec.Command(binaryName, "inspect", "--type=image", image)
if err := cmd.Run(); err == nil {
logger.V(1).Infof("Image: %s present locally", image)
return false, nil
}
// otherwise try to pull it
return true, pull(logger, image, retries, binaryName)
}
// pull pulls an image, retrying up to retries times
func pull(logger log.Logger, image string, retries int, binaryName string) error {
logger.V(1).Infof("Pulling image: %s ...", image)
err := exec.Command(binaryName, "pull", image).Run()
// retry pulling up to retries times if necessary
if err != nil {
for i := 0; i < retries; i++ {
time.Sleep(time.Second * time.Duration(i+1))
logger.V(1).Infof("Trying again to pull image: %q ... %v", image, err)
err = exec.Command(binaryName, "pull", image).Run()
if err == nil {
break
}
}
}
return errors.Wrapf(err, "failed to pull image %q", image)
}
// sanitizeImage is a helper that returns a human-readable image name and
// the pullable image name for the provided image
func sanitizeImage(image string) (string, string) {
if strings.Contains(image, "@sha256:") {
return strings.Split(image, "@sha256:")[0], image
}
return image, image
}
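
To make the split concrete, a small sketch of sanitizeImage's two return values; the image name and digest below are hypothetical.

package nerdctl

import "testing"

func Test_sanitizeImage(t *testing.T) {
	// digest-pinned reference: the friendly name drops the digest,
	// the pullable name keeps it
	friendly, pullable := sanitizeImage("kindest/node:v1.29.0@sha256:abcd")
	if friendly != "kindest/node:v1.29.0" || pullable != "kindest/node:v1.29.0@sha256:abcd" {
		t.Errorf("unexpected result: %q, %q", friendly, pullable)
	}
	// plain tag: both values are the image reference itself
	friendly, pullable = sanitizeImage("kindest/node:v1.29.0")
	if friendly != pullable {
		t.Errorf("expected identical values, got %q, %q", friendly, pullable)
	}
}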


@@ -0,0 +1,187 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"crypto/sha1"
"encoding/binary"
"fmt"
"net"
"strconv"
"strings"
"sigs.k8s.io/kind/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
)
// This may be overridden by KIND_EXPERIMENTAL_DOCKER_NETWORK env,
// experimentally...
//
// By default currently picking a single network is equivalent to the previous
// behavior *except* that we moved from the default bridge to a user defined
// network because the default bridge is actually special versus any other
// docker network and lacks the embedded DNS
//
// For now this also makes it easier for apps to join the same network, and
// leaves users with complex networking desires to create and manage their own
// networks.
const fixedNetworkName = "kind"
// ensureNetwork checks if the named network exists and creates it if not
func ensureNetwork(name, binaryName string) error {
// check if the network already exists
exists, err := checkIfNetworkExists(name, binaryName)
if err != nil {
return err
}
// network already exists, we're good
// TODO: the network might already exist and not have ipv6 ... :|
// discussion: https://github.com/kubernetes-sigs/kind/pull/1508#discussion_r414594198
if exists {
return nil
}
subnet := generateULASubnetFromName(name, 0)
mtu := getDefaultNetworkMTU(binaryName)
err = createNetwork(name, subnet, mtu, binaryName)
if err == nil {
// Success!
return nil
}
// On the first try check if ipv6 fails entirely on this machine
// https://github.com/kubernetes-sigs/kind/issues/1544
// Otherwise if it's not a pool overlap error, fail
// If it is, make more attempts below
if isIPv6UnavailableError(err) {
// only one attempt, IPAM is automatic in ipv4 only
return createNetwork(name, "", mtu, binaryName)
}
if isPoolOverlapError(err) {
// pool overlap suggests perhaps another process created the network
// check again whether the network now exists
exists, err := checkIfNetworkExists(name, binaryName)
if err != nil {
return err
}
if exists {
return nil
}
// otherwise we'll start trying with different subnets
} else {
// unknown error ...
return err
}
// keep trying for ipv6 subnets
const maxAttempts = 5
for attempt := int32(1); attempt < maxAttempts; attempt++ {
subnet := generateULASubnetFromName(name, attempt)
err = createNetwork(name, subnet, mtu, binaryName)
if err == nil {
// success!
return nil
}
if isPoolOverlapError(err) {
// pool overlap suggests perhaps another process created the network
// check again whether the network now exists
exists, err := checkIfNetworkExists(name, binaryName)
if err != nil {
return err
}
if exists {
return nil
}
// otherwise we'll try again
continue
}
// unknown error ...
return err
}
return errors.New("exhausted attempts trying to find a non-overlapping subnet")
}
func createNetwork(name, ipv6Subnet string, mtu int, binaryName string) error {
args := []string{"network", "create", "-d=bridge"}
// TODO: Not supported in nerdctl yet
// "-o", "com.docker.network.bridge.enable_ip_masquerade=true",
if mtu > 0 {
args = append(args, "-o", fmt.Sprintf("com.docker.network.driver.mtu=%d", mtu))
}
if ipv6Subnet != "" {
args = append(args, "--ipv6", "--subnet", ipv6Subnet)
}
args = append(args, name)
return exec.Command(binaryName, args...).Run()
}
// getDefaultNetworkMTU obtains the MTU from the default bridge network
func getDefaultNetworkMTU(binaryName string) int {
cmd := exec.Command(binaryName, "network", "inspect", "bridge",
"-f", `{{ index .Options "com.docker.network.driver.mtu" }}`)
lines, err := exec.OutputLines(cmd)
if err != nil || len(lines) != 1 {
return 0
}
mtu, err := strconv.Atoi(lines[0])
if err != nil {
return 0
}
return mtu
}
func checkIfNetworkExists(name, binaryName string) (bool, error) {
out, err := exec.Output(exec.Command(
binaryName, "network", "inspect",
name, "--format={{.Name}}",
))
if err != nil {
return false, nil
}
return strings.HasPrefix(string(out), name), nil
}
func isIPv6UnavailableError(err error) bool {
rerr := exec.RunErrorForError(err)
return rerr != nil && strings.HasPrefix(string(rerr.Output), "Error response from daemon: Cannot read IPv6 setup for bridge")
}
func isPoolOverlapError(err error) bool {
rerr := exec.RunErrorForError(err)
return rerr != nil && (strings.HasPrefix(string(rerr.Output), "Error response from daemon: Pool overlaps with other one on this address space") || strings.Contains(string(rerr.Output), "networks have overlapping"))
}
// generateULASubnetFromName generates an IPv6 subnet based on the
// name and Nth probing attempt
func generateULASubnetFromName(name string, attempt int32) string {
ip := make([]byte, 16)
ip[0] = 0xfc
ip[1] = 0x00
h := sha1.New()
_, _ = h.Write([]byte(name))
_ = binary.Write(h, binary.LittleEndian, attempt)
bs := h.Sum(nil)
for i := 2; i < 8; i++ {
ip[i] = bs[i]
}
subnet := &net.IPNet{
IP: net.IP(ip),
Mask: net.CIDRMask(64, 128),
}
return subnet.String()
}


@@ -0,0 +1,68 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"fmt"
"testing"
)
func Test_generateULASubnetFromName(t *testing.T) {
t.Parallel()
cases := []struct {
name string
attempt int32
subnet string
}{
{
name: "kind",
subnet: "fc00:f853:ccd:e793::/64",
},
{
name: "foo",
attempt: 1,
subnet: "fc00:8edf:7f02:ec8f::/64",
},
{
name: "foo",
attempt: 2,
subnet: "fc00:9968:306b:2c65::/64",
},
{
name: "kind2",
subnet: "fc00:444c:147a:44ab::/64",
},
{
name: "kin",
subnet: "fc00:fcd9:c2be:8e23::/64",
},
{
name: "mysupernetwork",
subnet: "fc00:7ae1:1e0d:b4d4::/64",
},
}
for _, tc := range cases {
tc := tc // capture variable
t.Run(fmt.Sprintf("%s,%d", tc.name, tc.attempt), func(t *testing.T) {
t.Parallel()
subnet := generateULASubnetFromName(tc.name, tc.attempt)
if subnet != tc.subnet {
t.Errorf("Wrong subnet from %v: expected %v, received %v", tc.name, tc.subnet, subnet)
}
})
}
}


@@ -0,0 +1,175 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"context"
"fmt"
"io"
"strings"
"sigs.k8s.io/kind/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
)
// nodes.Node implementation for the nerdctl provider
type node struct {
name string
binaryName string
}
func (n *node) String() string {
return n.name
}
func (n *node) Role() (string, error) {
cmd := exec.Command(n.binaryName, "inspect",
"--format", fmt.Sprintf(`{{ index .Config.Labels "%s"}}`, nodeRoleLabelKey),
n.name,
)
lines, err := exec.OutputLines(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to get role for node")
}
if len(lines) != 1 {
return "", errors.Errorf("failed to get role for node: output lines %d != 1", len(lines))
}
return lines[0], nil
}
func (n *node) IP() (ipv4 string, ipv6 string, err error) {
// retrieve the IP address of the node using docker inspect
cmd := exec.Command(n.binaryName, "inspect",
"-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}},{{.GlobalIPv6Address}}{{end}}",
n.name, // ... against the "node" container
)
lines, err := exec.OutputLines(cmd)
if err != nil {
return "", "", errors.Wrap(err, "failed to get container details")
}
if len(lines) != 1 {
return "", "", errors.Errorf("file should only be one line, got %d lines", len(lines))
}
ips := strings.Split(lines[0], ",")
if len(ips) != 2 {
return "", "", errors.Errorf("container addresses should have 2 values, got %d values", len(ips))
}
return ips[0], ips[1], nil
}
func (n *node) Command(command string, args ...string) exec.Cmd {
return &nodeCmd{
binaryName: n.binaryName,
nameOrID: n.name,
command: command,
args: args,
}
}
func (n *node) CommandContext(ctx context.Context, command string, args ...string) exec.Cmd {
return &nodeCmd{
binaryName: n.binaryName,
nameOrID: n.name,
command: command,
args: args,
ctx: ctx,
}
}
// nodeCmd implements exec.Cmd for nerdctl nodes
type nodeCmd struct {
binaryName string
nameOrID string // the container name or ID
command string
args []string
env []string
stdin io.Reader
stdout io.Writer
stderr io.Writer
ctx context.Context
}
func (c *nodeCmd) Run() error {
args := []string{
"exec",
// run with privileges so we can remount etc..
// this might not make sense in the most general sense, but it is
// important to many kind commands
"--privileged",
}
if c.stdin != nil {
args = append(args,
"-i", // interactive so we can supply input
)
}
// set env
for _, env := range c.env {
args = append(args, "-e", env)
}
// specify the container and command; after this, everything will be
// args to the command in the container rather than to nerdctl
args = append(
args,
c.nameOrID, // ... against the container
c.command, // with the command specified
)
args = append(
args,
// finally, with the caller args
c.args...,
)
var cmd exec.Cmd
if c.ctx != nil {
cmd = exec.CommandContext(c.ctx, c.binaryName, args...)
} else {
cmd = exec.Command(c.binaryName, args...)
}
if c.stdin != nil {
cmd.SetStdin(c.stdin)
}
if c.stderr != nil {
cmd.SetStderr(c.stderr)
}
if c.stdout != nil {
cmd.SetStdout(c.stdout)
}
return cmd.Run()
}
func (c *nodeCmd) SetEnv(env ...string) exec.Cmd {
c.env = env
return c
}
func (c *nodeCmd) SetStdin(r io.Reader) exec.Cmd {
c.stdin = r
return c
}
func (c *nodeCmd) SetStdout(w io.Writer) exec.Cmd {
c.stdout = w
return c
}
func (c *nodeCmd) SetStderr(w io.Writer) exec.Cmd {
c.stderr = w
return c
}
func (n *node) SerialLogs(w io.Writer) error {
return exec.Command(n.binaryName, "logs", n.name).SetStdout(w).SetStderr(w).Run()
}
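
A short sketch of how callers typically drive nodeCmd: chain the setters, then Run. The helper name and the command run inside the node are illustrative.

package nerdctl

import (
	"bytes"
	"fmt"
)

// nodeOSRelease is a hypothetical helper showing typical nodeCmd usage:
// run a command inside the node container and capture its stdout.
func nodeOSRelease(n *node) (string, error) {
	var buf bytes.Buffer
	// translates to: nerdctl exec --privileged <node> cat /etc/os-release
	cmd := n.Command("cat", "/etc/os-release")
	if err := cmd.SetStdout(&buf).Run(); err != nil {
		return "", fmt.Errorf("failed to read os-release: %w", err)
	}
	return buf.String(), nil
}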


@@ -0,0 +1,372 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"encoding/csv"
"encoding/json"
"fmt"
"net"
"path/filepath"
"strings"
"sigs.k8s.io/kind/pkg/cluster/nodes"
"sigs.k8s.io/kind/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/log"
internallogs "sigs.k8s.io/kind/pkg/cluster/internal/logs"
"sigs.k8s.io/kind/pkg/cluster/internal/providers"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/common"
"sigs.k8s.io/kind/pkg/cluster/nodeutils"
"sigs.k8s.io/kind/pkg/internal/apis/config"
"sigs.k8s.io/kind/pkg/internal/cli"
"sigs.k8s.io/kind/pkg/internal/sets"
)
// NewProvider returns a new provider based on executing `nerdctl ...`
func NewProvider(logger log.Logger, binaryName string) providers.Provider {
// if unset, default to nerdctl
if binaryName == "" {
binaryName = "nerdctl"
}
return &provider{
logger: logger,
binaryName: binaryName,
}
}
// Provider implements provider.Provider
// see NewProvider
type provider struct {
logger log.Logger
binaryName string
info *providers.ProviderInfo
}
// String implements fmt.Stringer
// NOTE: the value of this should not currently be relied upon for anything!
// This is only used for setting the Node's providerID
func (p *provider) String() string {
return "nerdctl"
}
func (p *provider) Binary() string {
return p.binaryName
}
// Provision is part of the providers.Provider interface
func (p *provider) Provision(status *cli.Status, cfg *config.Cluster) (err error) {
// TODO: validate cfg
// ensure node images are pulled before actually provisioning
if err := ensureNodeImages(p.logger, status, cfg, p.Binary()); err != nil {
return err
}
// ensure the pre-requisite network exists
if err := ensureNetwork(fixedNetworkName, p.Binary()); err != nil {
return errors.Wrap(err, "failed to ensure nerdctl network")
}
// actually provision the cluster
icons := strings.Repeat("📦 ", len(cfg.Nodes))
status.Start(fmt.Sprintf("Preparing nodes %s", icons))
defer func() { status.End(err == nil) }()
// plan creating the containers
createContainerFuncs, err := planCreation(cfg, fixedNetworkName, p.Binary())
if err != nil {
return err
}
// actually create nodes
return errors.UntilErrorConcurrent(createContainerFuncs)
}
// ListClusters is part of the providers.Provider interface
func (p *provider) ListClusters() ([]string, error) {
cmd := exec.Command(p.Binary(),
"ps",
"-a", // show stopped nodes
// filter for nodes with the cluster label
"--filter", "label="+clusterLabelKey,
// format to include the cluster name
"--format", fmt.Sprintf(`{{index .Labels "%s"}}`, clusterLabelKey),
)
lines, err := exec.OutputLines(cmd)
if err != nil {
return nil, errors.Wrap(err, "failed to list clusters")
}
return sets.NewString(lines...).List(), nil
}
// ListNodes is part of the providers.Provider interface
func (p *provider) ListNodes(cluster string) ([]nodes.Node, error) {
cmd := exec.Command(p.Binary(),
"ps",
"-a", // show stopped nodes
// filter for nodes with the cluster label
"--filter", fmt.Sprintf("label=%s=%s", clusterLabelKey, cluster),
// format to include the cluster name
"--format", `{{.Names}}`,
)
lines, err := exec.OutputLines(cmd)
if err != nil {
return nil, errors.Wrap(err, "failed to list nodes")
}
length := len(lines)
// convert names to node handles
ret := make([]nodes.Node, 0, length)
for _, name := range lines {
if name != "" {
ret = append(ret, p.node(name))
}
}
return ret, nil
}
// DeleteNodes is part of the providers.Provider interface
func (p *provider) DeleteNodes(n []nodes.Node) error {
if len(n) == 0 {
return nil
}
argsNoRestart := make([]string, 0, len(n)+2)
argsNoRestart = append(argsNoRestart,
"update",
"--restart=no",
)
argsStop := make([]string, 0, len(n)+1)
argsStop = append(argsStop, "stop")
argsWait := make([]string, 0, len(n)+1)
argsWait = append(argsWait, "wait")
argsRm := make([]string, 0, len(n)+3) // allocate once
argsRm = append(argsRm,
"rm",
"-f",
"-v", // delete volumes
)
for _, node := range n {
argsRm = append(argsRm, node.String())
argsStop = append(argsStop, node.String())
argsWait = append(argsWait, node.String())
argsNoRestart = append(argsNoRestart, node.String())
}
if err := exec.Command(p.Binary(), argsNoRestart...).Run(); err != nil {
return errors.Wrap(err, "failed to update restart policy to 'no'")
}
if err := exec.Command(p.Binary(), argsStop...).Run(); err != nil {
return errors.Wrap(err, "failed to stop nodes")
}
if err := exec.Command(p.Binary(), argsWait...).Run(); err != nil {
return errors.Wrap(err, "failed to wait for node exit")
}
if err := exec.Command(p.Binary(), argsRm...).Run(); err != nil {
return errors.Wrap(err, "failed to delete nodes")
}
return nil
}
// GetAPIServerEndpoint is part of the providers.Provider interface
func (p *provider) GetAPIServerEndpoint(cluster string) (string, error) {
// locate the node that hosts this
allNodes, err := p.ListNodes(cluster)
if err != nil {
return "", errors.Wrap(err, "failed to list nodes")
}
n, err := nodeutils.APIServerEndpointNode(allNodes)
if err != nil {
return "", errors.Wrap(err, "failed to get api server endpoint")
}
// if the 'desktop.docker.io/ports/<PORT>/tcp' label is present,
// defer to its value for the api server endpoint
//
// For example:
// "Labels": {
// "desktop.docker.io/ports/6443/tcp": "10.0.1.7:6443",
// }
cmd := exec.Command(
p.Binary(), "inspect",
"--format", fmt.Sprintf(
"{{ index .Config.Labels \"desktop.docker.io/ports/%d/tcp\" }}", common.APIServerInternalPort,
),
n.String(),
)
lines, err := exec.OutputLines(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to get api server port")
}
if len(lines) == 1 && lines[0] != "" {
return lines[0], nil
}
// else, retrieve the specific port mapping via NetworkSettings.Ports
cmd = exec.Command(
p.Binary(), "inspect",
"--format", fmt.Sprintf(
"{{ with (index (index .NetworkSettings.Ports \"%d/tcp\") 0) }}{{ printf \"%%s\t%%s\" .HostIp .HostPort }}{{ end }}", common.APIServerInternalPort,
),
n.String(),
)
lines, err = exec.OutputLines(cmd)
if err != nil {
return "", errors.Wrap(err, "failed to get api server port")
}
if len(lines) != 1 {
return "", errors.Errorf("network details should only be one line, got %d lines", len(lines))
}
parts := strings.Split(lines[0], "\t")
if len(parts) != 2 {
return "", errors.Errorf("network details should only be two parts, got %d", len(parts))
}
// join host and port
return net.JoinHostPort(parts[0], parts[1]), nil
}
// GetAPIServerInternalEndpoint is part of the providers.Provider interface
func (p *provider) GetAPIServerInternalEndpoint(cluster string) (string, error) {
// locate the node that hosts this
allNodes, err := p.ListNodes(cluster)
if err != nil {
return "", errors.Wrap(err, "failed to list nodes")
}
n, err := nodeutils.APIServerEndpointNode(allNodes)
if err != nil {
return "", errors.Wrap(err, "failed to get api server endpoint")
}
// NOTE: we're using the nodes' hostnames, which are their names
return net.JoinHostPort(n.String(), fmt.Sprintf("%d", common.APIServerInternalPort)), nil
}
// node returns a new node handle for this provider
func (p *provider) node(name string) nodes.Node {
return &node{
binaryName: p.binaryName,
name: name,
}
}
// CollectLogs will populate dir with cluster logs and other debug files
func (p *provider) CollectLogs(dir string, nodes []nodes.Node) error {
execToPathFn := func(cmd exec.Cmd, path string) func() error {
return func() error {
f, err := common.FileOnHost(path)
if err != nil {
return err
}
defer f.Close()
return cmd.SetStdout(f).SetStderr(f).Run()
}
}
// construct a slice of methods to collect logs
fns := []func() error{
// record info about the host nerdctl
execToPathFn(
exec.Command(p.Binary(), "info"),
filepath.Join(dir, "docker-info.txt"),
),
}
// collect /var/log for each node and plan collecting more logs
var errs []error
for _, n := range nodes {
node := n // https://golang.org/doc/faq#closures_and_goroutines
name := node.String()
path := filepath.Join(dir, name)
if err := internallogs.DumpDir(p.logger, node, "/var/log", path); err != nil {
errs = append(errs, err)
}
fns = append(fns,
func() error { return common.CollectLogs(node, path) },
execToPathFn(exec.Command(p.Binary(), "inspect", name), filepath.Join(path, "inspect.json")),
func() error {
f, err := common.FileOnHost(filepath.Join(path, "serial.log"))
if err != nil {
return err
}
defer f.Close()
return node.SerialLogs(f)
},
)
}
// run and collect up all errors
errs = append(errs, errors.AggregateConcurrent(fns))
return errors.NewAggregate(errs)
}
// Info returns the provider info.
// The info is cached on the first time of the execution.
func (p *provider) Info() (*providers.ProviderInfo, error) {
var err error
if p.info == nil {
p.info, err = info(p.Binary())
}
return p.info, err
}
// dockerInfo corresponds to `nerdctl info --format '{{json .}}'`, which is docker-compatible
type dockerInfo struct {
CgroupDriver string `json:"CgroupDriver"` // "systemd", "cgroupfs", "none"
CgroupVersion string `json:"CgroupVersion"` // e.g. "2"
MemoryLimit bool `json:"MemoryLimit"`
PidsLimit bool `json:"PidsLimit"`
CPUShares bool `json:"CPUShares"`
SecurityOptions []string `json:"SecurityOptions"`
}
func info(binaryName string) (*providers.ProviderInfo, error) {
cmd := exec.Command(binaryName, "info", "--format", "{{json .}}")
out, err := exec.Output(cmd)
if err != nil {
return nil, errors.Wrap(err, "failed to get nerdctl info")
}
var dInfo dockerInfo
if err := json.Unmarshal(out, &dInfo); err != nil {
return nil, err
}
info := providers.ProviderInfo{
Cgroup2: dInfo.CgroupVersion == "2",
}
// When CgroupDriver == "none", the MemoryLimit/PidsLimit/CPUShares
// values are meaningless and need to be considered false.
// https://github.com/moby/moby/issues/42151
if dInfo.CgroupDriver != "none" {
info.SupportsMemoryLimit = dInfo.MemoryLimit
info.SupportsPidsLimit = dInfo.PidsLimit
info.SupportsCPUShares = dInfo.CPUShares
}
for _, o := range dInfo.SecurityOptions {
// o is like "name=seccomp,profile=default", or "name=rootless",
csvReader := csv.NewReader(strings.NewReader(o))
sliceSlice, err := csvReader.ReadAll()
if err != nil {
return nil, err
}
for _, f := range sliceSlice {
for _, ff := range f {
if ff == "name=rootless" {
info.Rootless = true
}
}
}
}
return &info, nil
}
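
A sketch of how a typical payload flows through this parsing; the sample JSON below is illustrative of a rootless, cgroup v2 host, not captured output.

package nerdctl

import (
	"encoding/json"
	"testing"
)

func Test_dockerInfoParsing(t *testing.T) {
	// illustrative `nerdctl info --format '{{json .}}'` payload
	sample := `{"CgroupDriver":"systemd","CgroupVersion":"2","MemoryLimit":true,"PidsLimit":true,"CPUShares":true,"SecurityOptions":["name=seccomp,profile=default","name=rootless"]}`
	var d dockerInfo
	if err := json.Unmarshal([]byte(sample), &d); err != nil {
		t.Fatal(err)
	}
	// info() would report Cgroup2=true, the three resource limits as
	// supported, and Rootless=true for this payload
	if d.CgroupVersion != "2" || !d.MemoryLimit {
		t.Errorf("unexpected parse: %+v", d)
	}
}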


@@ -0,0 +1,391 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"context"
"fmt"
"net"
"path/filepath"
"strings"
"time"
"sigs.k8s.io/kind/pkg/cluster/constants"
"sigs.k8s.io/kind/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/fs"
"sigs.k8s.io/kind/pkg/cluster/internal/loadbalancer"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/common"
"sigs.k8s.io/kind/pkg/internal/apis/config"
)
// planCreation creates a slice of funcs that will create the containers
func planCreation(cfg *config.Cluster, networkName, binaryName string) (createContainerFuncs []func() error, err error) {
// we need to know all the names for NO_PROXY
// compute the names first before any actual node details
nodeNamer := common.MakeNodeNamer(cfg.Name)
names := make([]string, len(cfg.Nodes))
for i, node := range cfg.Nodes {
name := nodeNamer(string(node.Role)) // name the node
names[i] = name
}
haveLoadbalancer := config.ClusterHasImplicitLoadBalancer(cfg)
if haveLoadbalancer {
names = append(names, nodeNamer(constants.ExternalLoadBalancerNodeRoleValue))
}
// these apply to all container creation
genericArgs, err := commonArgs(cfg.Name, cfg, networkName, names, binaryName)
if err != nil {
return nil, err
}
// only the external LB should reflect the port if we have multiple control planes
apiServerPort := cfg.Networking.APIServerPort
apiServerAddress := cfg.Networking.APIServerAddress
if haveLoadbalancer {
// TODO: picking ports locally is less than ideal with remote docker
// but this is supposed to be an implementation detail and NOT picking
// them breaks host reboot ...
// For now remote docker + multi control plane is not supported
apiServerPort = 0 // replaced with random ports
apiServerAddress = "127.0.0.1" // only the LB needs to be non-local
// only for IPv6 only clusters
if cfg.Networking.IPFamily == config.IPv6Family {
apiServerAddress = "::1" // only the LB needs to be non-local
}
// plan loadbalancer node
name := names[len(names)-1]
createContainerFuncs = append(createContainerFuncs, func() error {
args, err := runArgsForLoadBalancer(cfg, name, genericArgs)
if err != nil {
return err
}
return createContainer(name, args, binaryName)
})
}
// plan normal nodes
for i, node := range cfg.Nodes {
node := node.DeepCopy() // copy so we can modify
name := names[i]
// fixup relative paths, docker can only handle absolute paths
for m := range node.ExtraMounts {
hostPath := node.ExtraMounts[m].HostPath
if !fs.IsAbs(hostPath) {
absHostPath, err := filepath.Abs(hostPath)
if err != nil {
return nil, errors.Wrapf(err, "unable to resolve absolute path for hostPath: %q", hostPath)
}
node.ExtraMounts[m].HostPath = absHostPath
}
}
// plan actual creation based on role
switch node.Role {
case config.ControlPlaneRole:
createContainerFuncs = append(createContainerFuncs, func() error {
node.ExtraPortMappings = append(node.ExtraPortMappings,
config.PortMapping{
ListenAddress: apiServerAddress,
HostPort: apiServerPort,
ContainerPort: common.APIServerInternalPort,
},
)
args, err := runArgsForNode(node, cfg.Networking.IPFamily, name, genericArgs)
if err != nil {
return err
}
return createContainerWithWaitUntilSystemdReachesMultiUserSystem(name, args, binaryName)
})
case config.WorkerRole:
createContainerFuncs = append(createContainerFuncs, func() error {
args, err := runArgsForNode(node, cfg.Networking.IPFamily, name, genericArgs)
if err != nil {
return err
}
return createContainerWithWaitUntilSystemdReachesMultiUserSystem(name, args, binaryName)
})
default:
return nil, errors.Errorf("unknown node role: %q", node.Role)
}
}
return createContainerFuncs, nil
}
// commonArgs computes static arguments that apply to all containers
func commonArgs(cluster string, cfg *config.Cluster, networkName string, nodeNames []string, binaryName string) ([]string, error) {
// standard arguments all nodes containers need, computed once
args := []string{
"--detach", // run the container detached
"--tty", // allocate a tty for entrypoint logs
// label the node with the cluster ID
"--label", fmt.Sprintf("%s=%s", clusterLabelKey, cluster),
// use a user-defined network so we get embedded DNS
"--net", networkName,
// containerd supports the following restart modes:
// - no
// - on-failure[:max-retries]
// - unless-stopped
// - always
//
// What we desire is:
// - restart on host / container runtime reboot
// - don't restart for any other reason
//
"--restart=on-failure:1",
// this can be enabled by default in docker daemon.json, so we explicitly
// disable it; we want our entrypoint to be PID1, not docker-init / tini
"--init=false",
}
// enable IPv6 if necessary
if config.ClusterHasIPv6(cfg) {
args = append(args, "--sysctl=net.ipv6.conf.all.disable_ipv6=0", "--sysctl=net.ipv6.conf.all.forwarding=1")
}
// pass proxy environment variables
proxyEnv, err := getProxyEnv(cfg, networkName, nodeNames, binaryName)
if err != nil {
return nil, errors.Wrap(err, "proxy setup error")
}
for key, val := range proxyEnv {
args = append(args, "-e", fmt.Sprintf("%s=%s", key, val))
}
// enable /dev/fuse explicitly for fuse-overlayfs
// (Rootless Docker does not automatically mount /dev/fuse with --privileged)
if mountFuse(binaryName) {
args = append(args, "--device", "/dev/fuse")
}
if cfg.Networking.DNSSearch != nil {
args = append(args, "-e", "KIND_DNS_SEARCH="+strings.Join(*cfg.Networking.DNSSearch, " "))
}
return args, nil
}
func runArgsForNode(node *config.Node, clusterIPFamily config.ClusterIPFamily, name string, args []string) ([]string, error) {
args = append([]string{
"--hostname", name, // make hostname match container name
// label the node with the role ID
"--label", fmt.Sprintf("%s=%s", nodeRoleLabelKey, node.Role),
// running containers in a container requires privileged
// NOTE: we could try to replicate this with --cap-add, and use less
// privileges, but this flag also changes some mounts that are necessary
// including some ones docker would otherwise do by default.
// for now this is what we want. in the future we may revisit this.
"--privileged",
"--security-opt", "seccomp=unconfined", // also ignore seccomp
"--security-opt", "apparmor=unconfined", // also ignore apparmor
// runtime temporary storage
"--tmpfs", "/tmp", // various things depend on working /tmp
"--tmpfs", "/run", // systemd wants a writable /run
// runtime persistent storage
// this ensures that E.G. pods, logs etc. are not on the container
// filesystem, which is not only better for performance, but allows
// running kind in kind for "party tricks"
// (please don't depend on doing this though!)
"--volume", "/var",
// some k8s things want to read /lib/modules
"--volume", "/lib/modules:/lib/modules:ro",
// propagate KIND_EXPERIMENTAL_CONTAINERD_SNAPSHOTTER to the entrypoint script
"-e", "KIND_EXPERIMENTAL_CONTAINERD_SNAPSHOTTER",
},
args...,
)
// convert mounts and port mappings to container run args
args = append(args, generateMountBindings(node.ExtraMounts...)...)
mappingArgs, err := generatePortMappings(clusterIPFamily, node.ExtraPortMappings...)
if err != nil {
return nil, err
}
args = append(args, mappingArgs...)
switch node.Role {
case config.ControlPlaneRole:
args = append(args, "-e", "KUBECONFIG=/etc/kubernetes/admin.conf")
}
// finally, specify the image to run
return append(args, node.Image), nil
}
func runArgsForLoadBalancer(cfg *config.Cluster, name string, args []string) ([]string, error) {
args = append([]string{
"--hostname", name, // make hostname match container name
// label the node with the role ID
"--label", fmt.Sprintf("%s=%s", nodeRoleLabelKey, constants.ExternalLoadBalancerNodeRoleValue),
},
args...,
)
// load balancer port mapping
mappingArgs, err := generatePortMappings(cfg.Networking.IPFamily,
config.PortMapping{
ListenAddress: cfg.Networking.APIServerAddress,
HostPort: cfg.Networking.APIServerPort,
ContainerPort: common.APIServerInternalPort,
},
)
if err != nil {
return nil, err
}
args = append(args, mappingArgs...)
// finally, specify the image to run
return append(args, loadbalancer.Image), nil
}
func getProxyEnv(cfg *config.Cluster, networkName string, nodeNames []string, binaryName string) (map[string]string, error) {
envs := common.GetProxyEnvs(cfg)
// Specifically add the docker network subnets to NO_PROXY if we are using a proxy
if len(envs) > 0 {
subnets, err := getSubnets(networkName, binaryName)
if err != nil {
return nil, err
}
noProxyList := append(subnets, envs[common.NOProxy])
noProxyList = append(noProxyList, nodeNames...)
// Add pod and service dns names to no_proxy to allow in cluster
// Note: this is best effort based on the default CoreDNS spec
// https://github.com/kubernetes/dns/blob/master/docs/specification.md
// Any user created pod/service hostnames, namespaces, custom DNS services
// are expected to be no-proxied by the user explicitly.
noProxyList = append(noProxyList, ".svc", ".svc.cluster", ".svc.cluster.local")
noProxyJoined := strings.Join(noProxyList, ",")
envs[common.NOProxy] = noProxyJoined
envs[strings.ToLower(common.NOProxy)] = noProxyJoined
}
return envs, nil
}
func getSubnets(networkName, binaryName string) ([]string, error) {
format := `{{range (index (index . "IPAM") "Config")}}{{index . "Subnet"}} {{end}}`
cmd := exec.Command(binaryName, "network", "inspect", "-f", format, networkName)
lines, err := exec.OutputLines(cmd)
if err != nil {
return nil, errors.Wrap(err, "failed to get subnets")
}
if len(lines) == 0 {
return nil, errors.New("no subnets found for network")
}
return strings.Split(strings.TrimSpace(lines[0]), " "), nil
}
// generateMountBindings converts the mount list to a list of args for docker
// '<HostPath>:<ContainerPath>[:options]', where 'options'
// is a comma-separated list of the following strings:
// 'ro', if the path is read only
// 'Z', if the volume requires SELinux relabeling
func generateMountBindings(mounts ...config.Mount) []string {
args := make([]string, 0, len(mounts))
for _, m := range mounts {
bind := fmt.Sprintf("%s:%s", m.HostPath, m.ContainerPath)
var attrs []string
if m.Readonly {
attrs = append(attrs, "ro")
}
// Only request relabeling if the pod provides an SELinux context. If the pod
// does not provide an SELinux context relabeling will label the volume with
// the container's randomly allocated MCS label. This would restrict access
// to the volume to the container which mounts it first.
if m.SelinuxRelabel {
attrs = append(attrs, "Z")
}
switch m.Propagation {
case config.MountPropagationNone:
// noop, private is default
case config.MountPropagationBidirectional:
attrs = append(attrs, "rshared")
case config.MountPropagationHostToContainer:
attrs = append(attrs, "rslave")
default: // Falls back to "private"
}
if len(attrs) > 0 {
bind = fmt.Sprintf("%s:%s", bind, strings.Join(attrs, ","))
}
args = append(args, fmt.Sprintf("--volume=%s", bind))
}
return args
}
// generatePortMappings converts the portMappings list to a list of args for docker
func generatePortMappings(clusterIPFamily config.ClusterIPFamily, portMappings ...config.PortMapping) ([]string, error) {
args := make([]string, 0, len(portMappings))
for _, pm := range portMappings {
// do provider internal defaulting
// in a future API revision we will handle this at the API level and remove this
if pm.ListenAddress == "" {
switch clusterIPFamily {
case config.IPv4Family, config.DualStackFamily:
pm.ListenAddress = "0.0.0.0" // this is the docker default anyhow
case config.IPv6Family:
pm.ListenAddress = "::"
default:
return nil, errors.Errorf("unknown cluster IP family: %v", clusterIPFamily)
}
}
if string(pm.Protocol) == "" {
pm.Protocol = config.PortMappingProtocolTCP // TCP is the default
}
// validate that the provider can handle this binding
switch pm.Protocol {
case config.PortMappingProtocolTCP:
case config.PortMappingProtocolUDP:
case config.PortMappingProtocolSCTP:
default:
return nil, errors.Errorf("unknown port mapping protocol: %v", pm.Protocol)
}
// get a random port if necessary (port = 0)
hostPort, releaseHostPortFn, err := common.PortOrGetFreePort(pm.HostPort, pm.ListenAddress)
if err != nil {
return nil, errors.Wrap(err, "failed to get random host port for port mapping")
}
if releaseHostPortFn != nil {
defer releaseHostPortFn()
}
// generate the actual mapping arg
protocol := string(pm.Protocol)
hostPortBinding := net.JoinHostPort(pm.ListenAddress, fmt.Sprintf("%d", hostPort))
args = append(args, fmt.Sprintf("--publish=%s:%d/%s", hostPortBinding, pm.ContainerPort, protocol))
}
return args, nil
}
func createContainer(name string, args []string, binaryName string) error {
if err := exec.Command(binaryName, append([]string{"run", "--name", name}, args...)...).Run(); err != nil {
return err
}
return nil
}
func createContainerWithWaitUntilSystemdReachesMultiUserSystem(name string, args []string, binaryName string) error {
if err := exec.Command(binaryName, append([]string{"run", "--name", name}, args...)...).Run(); err != nil {
return err
}
logCtx, logCancel := context.WithTimeout(context.Background(), 30*time.Second)
logCmd := exec.CommandContext(logCtx, binaryName, "logs", "-f", name)
defer logCancel()
return common.WaitUntilLogRegexpMatches(logCtx, logCmd, common.NodeReachedCgroupsReadyRegexp())
}
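
To make the argument generation concrete, a small sketch of the flags these helpers emit; the paths and ports below are illustrative.

package nerdctl

import (
	"testing"

	"sigs.k8s.io/kind/pkg/internal/apis/config"
)

func Test_argGenerationSketch(t *testing.T) {
	// a read-only bind mount becomes a single --volume flag
	mounts := generateMountBindings(config.Mount{
		HostPath:      "/tmp/audit",
		ContainerPath: "/etc/audit",
		Readonly:      true,
	})
	if len(mounts) != 1 || mounts[0] != "--volume=/tmp/audit:/etc/audit:ro" {
		t.Errorf("unexpected mount args: %v", mounts)
	}
	// an IPv4 TCP mapping defaults its listen address to 0.0.0.0
	ports, err := generatePortMappings(config.IPv4Family, config.PortMapping{
		HostPort:      8080,
		ContainerPort: 80,
	})
	if err != nil {
		t.Fatal(err)
	}
	if len(ports) != 1 || ports[0] != "--publish=0.0.0.0:8080:80/tcp" {
		t.Errorf("unexpected port args: %v", ports)
	}
}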


@@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nerdctl
import (
"strings"
"sigs.k8s.io/kind/pkg/exec"
)
// IsAvailable checks if nerdctl (or finch) is available on the system
func IsAvailable() bool {
cmd := exec.Command("nerdctl", "-v")
lines, err := exec.OutputLines(cmd)
if err != nil || len(lines) != 1 {
// check finch
cmd = exec.Command("finch", "-v")
lines, err = exec.OutputLines(cmd)
if err != nil || len(lines) != 1 {
return false
}
return strings.HasPrefix(lines[0], "finch version")
}
return strings.HasPrefix(lines[0], "nerdctl version")
}
// rootless: use fuse-overlayfs by default
// https://github.com/kubernetes-sigs/kind/issues/2275
func mountFuse(binaryName string) bool {
i, err := info(binaryName)
if err != nil {
return false
}
if i != nil && i.Rootless {
return true
}
return false
}


@@ -34,6 +34,7 @@ import (
"sigs.k8s.io/kind/pkg/cluster/internal/kubeconfig"
internalproviders "sigs.k8s.io/kind/pkg/cluster/internal/providers"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/docker"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/nerdctl"
"sigs.k8s.io/kind/pkg/cluster/internal/providers/podman"
)
@@ -115,6 +116,9 @@ func DetectNodeProvider() (ProviderOption, error) {
if docker.IsAvailable() {
return ProviderWithDocker(), nil
}
if nerdctl.IsAvailable() {
return ProviderWithNerdctl(""), nil
}
if podman.IsAvailable() {
return ProviderWithPodman(), nil
}
@@ -167,6 +171,13 @@ func ProviderWithPodman() ProviderOption {
})
}
// ProviderWithNerdctl configures the provider to use the nerdctl runtime
func ProviderWithNerdctl(binaryName string) ProviderOption {
return providerRuntimeOption(func(p *Provider) {
p.provider = nerdctl.NewProvider(p.logger, binaryName)
})
}
// Create provisions and starts a kubernetes-in-docker cluster
func (p *Provider) Create(name string, options ...CreateOption) error {
// apply options