refactor image loading to be more efficient and handle tag rewriting

This commit is contained in:
Benjamin Elder
2019-07-15 20:48:17 -07:00
parent 9205bac4a8
commit c192f9c698
9 changed files with 385 additions and 110 deletions

View File

@@ -21,12 +21,8 @@ import (
"os"
"path"
"path/filepath"
"strings"
"sigs.k8s.io/kind/pkg/container/docker"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/version"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/util"
)
@@ -35,7 +31,8 @@ import (
type BazelBuildBits struct {
kubeRoot string
// computed at build time
paths map[string]string
paths map[string]string
imagePaths []string
}
var _ Bits = &BazelBuildBits{}
@@ -87,83 +84,29 @@ func (b *BazelBuildBits) Build() error {
b.paths = b.findPaths(bazelGoosGoarch)
// capture version info
rawVersion, err := buildVersionFile(b.kubeRoot)
_, err = buildVersionFile(b.kubeRoot)
if err != nil {
return err
}
// additional special handling for old kubernetes versions + bazel
// before Kubernetes v1.12.0 kubeadm requires arch specific images, instead
// later releases use manifest list images
// we must re-tag them here
ver, err := version.ParseGeneric(rawVersion)
if err != nil {
return err
}
// only < 1.12.0 has this problem
if !ver.LessThan(version.MustParseSemantic("v1.12.0")) {
return nil
}
// fix all tar files
for path := range b.paths {
if !strings.HasSuffix(path, ".tar") {
continue
}
if err := fixOldImageTags(path, arch); err != nil {
return err
}
}
return nil
}
// fixes the missing -$arch suffix on old kubernetes image archives
func fixOldImageTags(path, arch string) error {
// open input at path and create a fixed file at path+.fixed
in, err := os.Open(path)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(path + ".fixed")
if err != nil {
return err
}
defer out.Close()
// create a tarball with corrected tags
archSuffix := "-" + arch
repositoryFixer := func(repository string) string {
if !strings.HasSuffix(repository, archSuffix) {
fmt.Println("fixed: " + repository + " -> " + repository + archSuffix)
repository = repository + archSuffix
}
return repository
}
if err := docker.EditArchiveRepositories(in, out, repositoryFixer); err != nil {
return err
}
// replace the original file with the fixed file
in.Close()
out.Sync()
out.Close()
return os.Rename(out.Name(), in.Name())
}
func (b *BazelBuildBits) findPaths(bazelGoosGoarch string) map[string]string {
// https://docs.bazel.build/versions/master/output_directories.html
binDir := filepath.Join(b.kubeRoot, "bazel-bin")
buildDir := filepath.Join(binDir, "build")
// docker images
b.imagePaths = []string{
filepath.Join(buildDir, "kube-apiserver.tar"),
filepath.Join(buildDir, "kube-controller-manager.tar"),
filepath.Join(buildDir, "kube-scheduler.tar"),
filepath.Join(buildDir, "kube-proxy.tar"),
}
// all well-known paths that have not changed
paths := map[string]string{
// docker images
filepath.Join(buildDir, "kube-apiserver.tar"): "images/kube-apiserver.tar",
filepath.Join(buildDir, "kube-controller-manager.tar"): "images/kube-controller-manager.tar",
filepath.Join(buildDir, "kube-scheduler.tar"): "images/kube-scheduler.tar",
filepath.Join(buildDir, "kube-proxy.tar"): "images/kube-proxy.tar",
// version file
filepath.Join(b.kubeRoot, "_output", "git_version"): "version",
}
@@ -216,6 +159,11 @@ func (b *BazelBuildBits) Paths() map[string]string {
return b.paths
}
// ImagePaths implements Bits.ImagePaths
func (b *BazelBuildBits) ImagePaths() []string {
return b.imagePaths
}
// Install implements Bits.Install
func (b *BazelBuildBits) Install(install InstallContext) error {
kindBinDir := path.Join(install.BasePath(), "bin")

View File

@@ -33,9 +33,10 @@ type Bits interface {
// Paths returns a map of path on host machine to desired path in the image
// These paths will be populated in the image relative to some base path,
// obtainable by NodeInstall.BasePath()
// Note: if Images are populated to images/, the cluster provisioning
// will load these prior to calling kubeadm
Paths() map[string]string
// ImagePaths returns a list of paths to image archives to be loaded into
// the Node
ImagePaths() []string
// Install should install the built sources on the node, assuming paths
// have been populated
// TODO(bentheelder): eliminate install, make install file-copies only,

View File

@@ -198,24 +198,29 @@ func (b *DockerBuildBits) Paths() map[string]string {
binDir := filepath.Join(b.kubeRoot,
"_output", "dockerized", "bin", "linux", util.GetArch(),
)
imageDir := filepath.Join(b.kubeRoot,
"_output", "release-images", util.GetArch(),
)
return map[string]string{
// binaries (hyperkube)
filepath.Join(binDir, "kubeadm"): "bin/kubeadm",
filepath.Join(binDir, "kubelet"): "bin/kubelet",
filepath.Join(binDir, "kubectl"): "bin/kubectl",
// docker images
filepath.Join(imageDir, "kube-apiserver.tar"): "images/kube-apiserver.tar",
filepath.Join(imageDir, "kube-controller-manager.tar"): "images/kube-controller-manager.tar",
filepath.Join(imageDir, "kube-scheduler.tar"): "images/kube-scheduler.tar",
filepath.Join(imageDir, "kube-proxy.tar"): "images/kube-proxy.tar",
// version file
filepath.Join(b.kubeRoot, "_output", "git_version"): "version",
}
}
// ImagePaths implements Bits.ImagePaths
func (b *DockerBuildBits) ImagePaths() []string {
imageDir := filepath.Join(b.kubeRoot,
"_output", "release-images", util.GetArch(),
)
return []string{
filepath.Join(imageDir, "kube-apiserver.tar"),
filepath.Join(imageDir, "kube-controller-manager.tar"),
filepath.Join(imageDir, "kube-scheduler.tar"),
filepath.Join(imageDir, "kube-proxy.tar"),
}
}
// Install implements Bits.Install
func (b *DockerBuildBits) Install(install InstallContext) error {
kindBinDir := path.Join(install.BasePath(), "bin")

59
pkg/build/node/import.go Normal file
View File

@@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"sigs.k8s.io/kind/pkg/exec"
)
type imageImporter interface {
Prepare() error
LoadCommand() exec.Cmd
End() error
}
type containerdImporter struct {
containerCmder exec.Cmder
}
func newContainerdImporter(containerCmder exec.Cmder) imageImporter {
return &containerdImporter{
containerCmder: containerCmder,
}
}
var _ imageImporter = &containerdImporter{}
func (c *containerdImporter) Prepare() error {
if err := c.containerCmder.Command(
"bash", "-c", "nohup containerd > /dev/null 2>&1 &",
).Run(); err != nil {
return err
}
// TODO(bentheelder): some healthcheck?
return nil
}
func (c *containerdImporter) LoadCommand() exec.Cmd {
return c.containerCmder.Command(
"ctr", "--namespace=k8s.io", "images", "import", "--no-unpack", "-",
)
}
func (c *containerdImporter) End() error {
return c.containerCmder.Command("pkill", "containerd").Run()
}

View File

@@ -19,9 +19,9 @@ package node
import (
"fmt"
"io"
"os"
"path"
"regexp"
"strings"
"time"
@@ -178,21 +178,15 @@ func (c *BuildContext) populateBits(buildDir string) error {
return nil
}
// matches image tarballs kind will sideload
var imageRegex = regexp.MustCompile(`images/[^/]+\.tar`)
// returns a set of image tags that will be sideloaded
func (c *BuildContext) getBuiltImages() (sets.String, error) {
bitPaths := c.bits.Paths()
images := sets.NewString()
for src, dest := range bitPaths {
if imageRegex.MatchString(dest) {
tags, err := docker.GetArchiveTags(src)
if err != nil {
return nil, err
}
images.Insert(tags...)
for _, path := range c.bits.ImagePaths() {
tags, err := docker.GetArchiveTags(path)
if err != nil {
return nil, err
}
images.Insert(tags...)
}
return images, nil
}
@@ -401,14 +395,24 @@ func (c *BuildContext) prePullImages(dir, containerID string) error {
if err != nil {
return err
}
if ver.LessThan(version.MustParseSemantic("v1.12.0")) {
archSuffix := fmt.Sprintf("-%s:", c.arch)
for _, image := range builtImages.List() {
if !strings.Contains(image, archSuffix) {
builtImages.Insert(strings.Replace(image, ":", archSuffix, 1))
}
// get image tag fixing function for this version
fixRepository := repositoryCorrectorForVersion(ver)
// correct set of built tags using the same logic we will use to rewrite
// the tags as we load the archives
fixedImages := sets.NewString()
for _, image := range builtImages.List() {
registry, tag, err := docker.SplitImage(image)
if err != nil {
return err
}
registry = fixRepository(registry)
fixedImages.Insert(registry + ":" + tag)
}
builtImages = fixedImages
println("built images")
println(strings.Join(builtImages.List(), ", "))
// write the default CNI manifest
// NOTE: the paths inside the container should use the path package
@@ -496,19 +500,85 @@ func (c *BuildContext) prePullImages(dir, containerID string) error {
return err
}
// preload images into containerd
// TODO(bentheelder): we can skip the move and chown steps if we go directly to this
if err = inheritOutputAndRun(cmder.Command(
"bash", "-c",
`containerd & find /kind/images -name *.tar -print0 | xargs -0 -n 1 -P $(nproc) ctr --namespace=k8s.io images import --no-unpack && kill %1 && rm -rf /kind/images/*`,
)); err != nil {
log.Errorf("Image build Failed! Failed to load images into containerd %v", err)
// setup image importer
importer := newContainerdImporter(cmder)
if err := importer.Prepare(); err != nil {
log.Errorf("Image build Failed! Failed to prepare containerd to load images %v", err)
return err
}
// TODO: return this error?
defer func() {
if err := importer.End(); err != nil {
log.Errorf("Image build Failed! Failed to tear down containerd after loading images %v", err)
}
}()
// create a plan of image loading
loadFns := []func() error{}
for _, image := range c.bits.ImagePaths() {
image := image // capture loop var
loadFns = append(loadFns, func() error {
f, err := os.Open(image)
if err != nil {
return err
}
defer f.Close()
//return importer.LoadCommand().SetStdout(os.Stdout).SetStderr(os.Stderr).SetStdin(f).Run()
// we will rewrite / correct the tags as we load the image
if err := exec.RunWithStdinWriter(importer.LoadCommand().SetStdout(os.Stdout).SetStderr(os.Stdout), func(w io.Writer) error {
return docker.EditArchiveRepositories(f, w, fixRepository)
// _, err := io.Copy(w, f)
// return err
}); err != nil {
return err
}
return nil
})
}
// run all image loading concurrently until one fails or all succeed
if err := concurrent.UntilError(loadFns); err != nil {
log.Errorf("Image build Failed! Failed to load images %v", err)
return err
}
return nil
}
func repositoryCorrectorForVersion(kubeVersion *version.Version) func(string) string {
// TODO(bentheelder): we assume the host arch, but cross compiling should
// be possible now
arch := util.GetArch()
archSuffix := "-" + arch
// For kubernetes v1.15+ (actually 1.16 alpha versions) we may need to
// drop the arch suffix from images to get the expected image
// for < v1.12 we need to do the opposite.
// We can accomplish this by just handling < 1.12 & >= 1.12 as we won't
// touch images that match the expectation in either case ...
if kubeVersion.LessThan(version.MustParseSemantic("v1.12.0")) {
return func(repository string) string {
if !strings.HasSuffix(repository, archSuffix) {
fixed := repository + archSuffix
fmt.Println("fixed: " + repository + " -> " + fixed)
repository = fixed
}
return repository
}
}
return func(repository string) string {
if strings.HasSuffix(repository, archSuffix) {
fixed := strings.TrimSuffix(repository, archSuffix)
fmt.Println("fixed: " + repository + " -> " + fixed)
repository = fixed
}
return repository
}
}
func (c *BuildContext) createBuildContainer(buildDir string) (id string, err error) {
// attempt to explicitly pull the image if it doesn't exist locally
// we don't care if this errors, we'll still try to run which also pulls

View File

@@ -20,7 +20,6 @@ import (
"io"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/log"
)
// containerCmder implements exec.Cmder for docker containers
@@ -67,12 +66,6 @@ func (c *containerCmd) Run() error {
"-i", // interactive so we can supply input
)
}
// if the command is hooked up to the processes's output we want a tty
if log.IsTerminal(c.stderr) || log.IsTerminal(c.stdout) {
args = append(args,
"-t",
)
}
// set env
for _, env := range c.env {
args = append(args, "-e", env)

View File

@@ -17,10 +17,52 @@ limitations under the License.
package docker
import (
"fmt"
"strings"
"github.com/pkg/errors"
"sigs.k8s.io/kind/pkg/exec"
)
// SplitImage splits an image into (registry,tag) following these cases:
//
// alpine -> (alpine, latest)
//
// alpine:latest -> (alpine, latest)
//
// alpine@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913 -> (alpine, latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913)
//
// alpine:latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913 -> (alpine, latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913)
//
// NOTE: for our purposes we consider the sha to be part of the tag, and we
// resolve the implicit :latest
func SplitImage(image string) (registry, tag string, err error) {
// we are looking for ':' and '@'
firstColon := strings.IndexByte(image, 58)
firstAt := strings.IndexByte(image, 64)
// there should be a registry before the tag, and @/: should not be the last
// character, these cases are assumed not to exist by the rest of the code
if firstColon == 0 || firstAt == 0 || firstColon+1 == len(image) || firstAt+1 == len(image) {
return "", "", fmt.Errorf("unexpected image: %q", image)
}
// NOTE: The order of these cases matters
// case: alpine
if firstColon == -1 && firstAt == -1 {
return image, "latest", nil
}
// case: alpine@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913
if firstAt != -1 && firstAt < firstColon {
return image[:firstAt], "latest" + image[firstAt:], nil
}
// case: alpine:latest
// case: alpine:latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913
return image[:firstColon], image[firstColon+1:], nil
}
// ImageInspect return low-level information on containers images
func ImageInspect(containerNameOrID, format string) ([]string, error) {
cmd := exec.Command("docker", "image", "inspect",

View File

@@ -0,0 +1,130 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import "testing"
func TestSplitImage(t *testing.T) {
/*
alpine -> (alpine, latest)
alpine:latest -> (alpine, latest)
alpine@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913 -> (alpine, latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913)
alpine:latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913 -> (alpine, latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913)
*/
cases := []struct {
Image string
ExpectedRegistry string
ExpectedTag string
ExpectError bool
}{
{
Image: "alpine",
ExpectedRegistry: "alpine",
ExpectedTag: "latest",
ExpectError: false,
},
{
Image: "alpine:latest",
ExpectedRegistry: "alpine",
ExpectedTag: "latest",
ExpectError: false,
},
{
Image: "alpine@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectedRegistry: "alpine",
ExpectedTag: "latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectError: false,
},
{
Image: "alpine:latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectedRegistry: "alpine",
ExpectedTag: "latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectError: false,
},
{
Image: "k8s.gcr.io/coredns:1.1.3",
ExpectedRegistry: "k8s.gcr.io/coredns",
ExpectedTag: "1.1.3",
ExpectError: false,
},
{
Image: "k8s.gcr.io/coredns:1.1.3@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectedRegistry: "k8s.gcr.io/coredns",
ExpectedTag: "1.1.3@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectError: false,
},
{
Image: "k8s.gcr.io/coredns:latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectedRegistry: "k8s.gcr.io/coredns",
ExpectedTag: "latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectError: false,
},
{
Image: "k8s.gcr.io/coredns@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectedRegistry: "k8s.gcr.io/coredns",
ExpectedTag: "latest@sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913",
ExpectError: false,
},
{
Image: ":",
ExpectedRegistry: "",
ExpectedTag: "",
ExpectError: true,
},
{
Image: "@",
ExpectedRegistry: "",
ExpectedTag: "",
ExpectError: true,
},
{
Image: "a@",
ExpectedRegistry: "",
ExpectedTag: "",
ExpectError: true,
},
{
Image: "a:",
ExpectedRegistry: "",
ExpectedTag: "",
ExpectError: true,
},
}
for _, tc := range cases {
tc := tc // capture tc
t.Run(tc.Image, func(t *testing.T) {
t.Parallel()
registry, tag, err := SplitImage(tc.Image)
if err != nil && !tc.ExpectError {
t.Fatalf("Unexpected error: %q", err)
} else if err == nil && tc.ExpectError {
t.Fatalf("Expected error but got nil")
}
if registry != tc.ExpectedRegistry {
t.Fatalf("ExpectedRegistry %q != %q", tc.ExpectedRegistry, registry)
}
if tag != tc.ExpectedTag {
t.Fatalf("ExpectedTag %q != %q", tc.ExpectedTag, tag)
}
})
}
}

View File

@@ -120,3 +120,30 @@ func RunWithStdoutReader(cmd Cmd, readerFunc func(io.Reader) error) error {
}
return nil
}
// RunWithStdinWriter runs cmd with writerFunc piped to stdin
func RunWithStdinWriter(cmd Cmd, writerFunc func(io.Writer) error) error {
pr, pw, err := os.Pipe()
if err != nil {
return err
}
defer pw.Close()
defer pr.Close()
cmd.SetStdin(pr)
errChan := make(chan error, 1)
go func() {
errChan <- writerFunc(pw)
pw.Close()
}()
err = cmd.Run()
if err != nil {
return err
}
err2 := <-errChan
if err2 != nil {
return err2
}
return nil
}