Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Вступление
Данная статья является третьей в цикле (1,2), посвященном изучению исходного кода Docker и прямым продолжением предыдущей статьи, в которой мы начали разбирать код первого публичного релиза Docker v0.1.0. В этой части будет рассмотрена реализация практически всех команд, а в конце, мы создадим образ и запустим докер контейнер на его основе. Для удобства я постарался разбить список команд на условные группы: работа с образами, работа с контейнерами, сетевой стек и т.д.
А теперь, как говорится, “without further ado”, приступим к изучению кода из файла commands.go начиная с команд для управления образами (images).
Управление образами
Import
Команда import позволяет импортировать образ файловой системы из tar архива, подаваемого на stdin, или же загрузить его по url:
CmdImport
func (srv *Server) CmdImport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "import", "[OPTIONS] URL|- [REPOSITORY [TAG]]", "Create a new filesystem image from the contents of a tarball")
var archive io.Reader
var resp *http.Response
if err := cmd.Parse(args); err != nil {
return nil
}
src := cmd.Arg(0)
if src == "" {
return errors.New("Not enough arguments")
} else if src == "-" {
archive = stdin
} else {
u, err := url.Parse(src)
if err != nil {
return err
}
if u.Scheme == "" {
u.Scheme = "http"
u.Host = src
u.Path = ""
}
fmt.Fprintf(stdout, "Downloading from %s\n", u.String())
// Download with curl (pretty progress bar)
// If curl is not available, fallback to http.Get()
resp, err = Download(u.String(), stdout)
if err != nil {
return err
}
archive = ProgressReader(resp.Body, int(resp.ContentLength), stdout)
}
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src)
if err != nil {
return err
}
// Optionally register the image at REPO/TAG
if repository := cmd.Arg(1); repository != "" {
tag := cmd.Arg(2) // Repository will handle an empty tag properly
if err := srv.runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
return err
}
}
fmt.Fprintln(stdout, img.Id)
return nil
}
После стандартного разбора аргументов функция определяет место откуда нужно импортировать образ: "-" означает stdin, в другом случае аргумент рассматривается, как url. Хелпер методы Download и ProgressReader для загрузки архива по http находятся в файле utils.go:
utils.go
// Request a given URL and return an io.Reader
func Download(url string, stderr io.Writer) (*http.Response, error) {
var resp *http.Response
var err error = nil
if resp, err = http.Get(url); err != nil {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, errors.New("Got HTTP status code >= 400: " + resp.Status)
}
return resp, nil
}
type progressReader struct {
reader io.ReadCloser // Stream to read from
output io.Writer // Where to send progress bar to
read_total int // Expected stream length (bytes)
read_progress int // How much has been read so far (bytes)
last_update int // How many bytes read at least update
}
func (r *progressReader) Read(p []byte) (n int, err error) {
read, err := io.ReadCloser(r.reader).Read(p)
r.read_progress += read
// Only update progress for every 1% read
update_every := int(0.01 * float64(r.read_total))
if r.read_progress-r.last_update > update_every || r.read_progress == r.read_total {
fmt.Fprintf(r.output, "%d/%d (%.0f%%)\r",
r.read_progress,
r.read_total,
float64(r.read_progress)/float64(r.read_total)*100)
r.last_update = r.read_progress
}
// Send newline when complete
if err == io.EOF {
fmt.Fprintf(r.output, "\n")
}
return read, err
}
func (r *progressReader) Close() error {
return io.ReadCloser(r.reader).Close()
}
func ProgressReader(r io.ReadCloser, size int, output io.Writer) *progressReader {
return &progressReader{r, output, size, 0, 0}
}
Далее управление переходит в функцию graph.Create из файла graph.go:
graph.Create
func (graph *Graph) Create(layerData Archive, container *Container, comment string) (*Image, error) {
img := &Image{
Id: GenerateId(),
Comment: comment,
Created: time.Now(),
}
if container != nil {
img.Parent = container.Image
img.Container = container.Id
img.ContainerConfig = *container.Config
}
if err := graph.Register(layerData, img); err != nil {
return nil, err
}
return img, nil
}
Здесь генерируется уникальный идентификатор образа и инициализируется структура Image, которая далее вместе с данными архива передается в метод graph.Register. Если дополнительно передан и контейнер, то ссылка на его образ будет сохранена в поле img.Parent - это используется в команде Commit, создающей новый образ из текущего контейнера. Структура Image и функции для генерации Id на основе SHA256 находятся в файле image.go:
image.go
type Image struct {
Id string `json:"id"`
Parent string `json:"parent,omitempty"`
Comment string `json:"comment,omitempty"`
Created time.Time `json:"created"`
Container string `json:"container,omitempty"`
ContainerConfig Config `json:"container_config,omitempty"`
graph *Graph
}
func GenerateId() string {
// FIXME: don't seed every time
rand.Seed(time.Now().UTC().UnixNano())
randomBytes := bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int())))
id, _ := ComputeId(randomBytes) // can't fail
return id
}
// ComputeId reads from `content` until EOF, then returns a SHA of what it read, as a string.
func ComputeId(content io.Reader) (string, error) {
h := sha256.New()
if _, err := io.Copy(h, content); err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum(nil)[:8]), nil
}
Далее взглянем на метод graph.Register:
graph.Register
func (graph *Graph) Register(layerData Archive, img *Image) error {
if err := ValidateId(img.Id); err != nil {
return err
}
// (This is a convenience to save time. Race conditions are taken care of by os.Rename)
if graph.Exists(img.Id) {
return fmt.Errorf("Image %s already exists", img.Id)
}
tmp, err := graph.Mktemp(img.Id)
defer os.RemoveAll(tmp)
if err != nil {
return fmt.Errorf("Mktemp failed: %s", err)
}
if err := StoreImage(img, layerData, tmp); err != nil {
return err
}
// Commit
if err := os.Rename(tmp, graph.imageRoot(img.Id)); err != nil {
return err
}
img.graph = graph
return nil
}
После валидации id на наличие запрещенного символа ":" (так как он является разделителем для тега), создается временная папка для модификаций, а затем вызывается функция StoreImage, в которой происходит создание образа. По завершению временная папка переименовывается в img.Id:
StoreImage
func StoreImage(img *Image, layerData Archive, root string) error {
// Check that root doesn't already exist
if _, err := os.Stat(root); err == nil {
return fmt.Errorf("Image %s already exists", img.Id)
} else if !os.IsNotExist(err) {
return err
}
// Store the layer
layer := layerPath(root)
if err := os.MkdirAll(layer, 0700); err != nil {
return err
}
if err := Untar(layerData, layer); err != nil {
return err
}
// Store the json ball
jsonData, err := json.Marshal(img)
if err != nil {
return err
}
if err := ioutil.WriteFile(jsonPath(root), jsonData, 0600); err != nil {
return err
}
return nil
}
func layerPath(root string) string {
return path.Join(root, "layer")
}
func jsonPath(root string) string {
return path.Join(root, "json")
}
В StoreImage создается директория layer, в которую помещается распакованный при помощи функции Untar архив файловой системы, после чего структура Image экспортируется в json и сохраняется в соседний файл, как метаданные для образа. Функции Tar и Untar для работы с архивами находятся в файле archive.go и представляют собой лишь удобные обертки над утилитой bsdtar:
archive.go
type Archive io.Reader
type Compression uint32
const (
Uncompressed Compression = iota
Bzip2
Gzip
)
func (compression *Compression) Flag() string {
switch *compression {
case Bzip2:
return "j"
case Gzip:
return "z"
}
return ""
}
func Tar(path string, compression Compression) (io.Reader, error) {
cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-c"+compression.Flag(), ".")
return CmdStream(cmd)
}
func Untar(archive io.Reader, path string) error {
cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-x")
cmd.Stdin = archive
output, err := cmd.CombinedOutput()
if err != nil {
return errors.New(err.Error() + ": " + string(output))
}
return nil
}
func CmdStream(cmd *exec.Cmd) (io.Reader, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
pipeR, pipeW := io.Pipe()
go func() {
_, err := io.Copy(pipeW, stdout)
if err != nil {
pipeW.CloseWithError(err)
}
errText, e := ioutil.ReadAll(stderr)
if e != nil {
errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
}
if err := cmd.Wait(); err != nil {
// FIXME: can this block if stderr outputs more than the size of StderrPipe()'s buffer?
pipeW.CloseWithError(errors.New(err.Error() + ": " + string(errText)))
} else {
pipeW.Close()
}
}()
if err := cmd.Start(); err != nil {
return nil, err
}
return pipeR, nil
}
Отметим, что функция import также может сохранять tag образа, принимая его опциональным параметром, но этот функционал мы рассмотрим позже, когда до него дойдет очередь.
Export
Команда export возвращает экспортированный архив файловой системы контейнера:
CmdExport
func (srv *Server) CmdExport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout,
"export", "CONTAINER",
"Export the contents of a filesystem as a tar archive")
if err := cmd.Parse(args); err != nil {
return nil
}
name := cmd.Arg(0)
if container := srv.runtime.Get(name); container != nil {
data, err := container.Export()
if err != nil {
return err
}
// Stream the entire contents of the container (basically a volatile snapshot)
if _, err := io.Copy(stdout, data); err != nil {
return err
}
return nil
}
return errors.New("No such container: " + name)
}
func (container *Container) Export() (Archive, error) {
if err := container.EnsureMounted(); err != nil {
return nil, err
}
return Tar(container.RootfsPath(), Uncompressed)
}
Функция по переданному имени получает контейнер и вызывает у него метод container.Export, который в свою очередь просто возвращает созданный архив, смонтированной директории Rootfs. Код функции Tar был приведен выше в файле archive.go.
Rmi
Удаляет переданный список образов, вызывая метод graph.Delete:
graph.Delete
func (srv *Server) CmdRmi(stdin io.ReadCloser, stdout io.Writer, args ...string) (err error) {
cmd := rcli.Subcmd(stdout, "rmimage", "[OPTIONS] IMAGE", "Remove an image")
if cmd.Parse(args) != nil || cmd.NArg() < 1 {
cmd.Usage()
return nil
}
for _, name := range cmd.Args() {
if err := srv.runtime.graph.Delete(name); err != nil {
return err
}
}
return nil
}
func (graph *Graph) Delete(id string) error {
garbage, err := graph.Garbage()
if err != nil {
return err
}
return os.Rename(graph.imageRoot(id), garbage.imageRoot(id))
}
func (graph *Graph) Garbage() (*Graph, error) {
return NewGraph(path.Join(graph.Root, ":garbage:"))
}
В реальности, graph.Delete перемещает их в папку :garbage:, для возможности последующего восстановления, но данная функция здесь не используется.
Images
Возвращает таблицу со списком имеющихся образов:
CmdImages
func (srv *Server) CmdImages(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "images", "[OPTIONS] [NAME]", "List images")
//limit := cmd.Int("l", 0, "Only show the N most recent versions of each image")
quiet := cmd.Bool("q", false, "only show numeric IDs")
fl_a := cmd.Bool("a", false, "show all images")
if err := cmd.Parse(args); err != nil {
return nil
}
if cmd.NArg() > 1 {
cmd.Usage()
return nil
}
var nameFilter string
if cmd.NArg() == 1 {
nameFilter = cmd.Arg(0)
}
w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
if !*quiet {
fmt.Fprintf(w, "REPOSITORY\tTAG\tID\tCREATED\tPARENT\n")
}
var allImages map[string]*Image
var err error
if *fl_a {
allImages, err = srv.runtime.graph.Map()
} else {
allImages, err = srv.runtime.graph.Heads()
}
if err != nil {
return err
}
for name, repository := range srv.runtime.repositories.Repositories {
if nameFilter != "" && name != nameFilter {
continue
}
for tag, id := range repository {
image, err := srv.runtime.graph.Get(id)
if err != nil {
log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
continue
}
delete(allImages, id)
if !*quiet {
for idx, field := range []string{
/* REPOSITORY */ name,
/* TAG */ tag,
/* ID */ id,
/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
} {
if idx == 0 {
w.Write([]byte(field))
} else {
w.Write([]byte("\t" + field))
}
}
w.Write([]byte{'\n'})
} else {
stdout.Write([]byte(image.Id + "\n"))
}
}
}
// Display images which aren't part of a
if nameFilter == "" {
for id, image := range allImages {
if !*quiet {
for idx, field := range []string{
/* REPOSITORY */ "",
/* TAG */ "",
/* ID */ id,
/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
} {
if idx == 0 {
w.Write([]byte(field))
} else {
w.Write([]byte("\t" + field))
}
}
w.Write([]byte{'\n'})
} else {
stdout.Write([]byte(image.Id + "\n"))
}
}
}
if !*quiet {
w.Flush()
}
return nil
}
В функции происходит простая итерация по полученным образам, фильтрация на основе имени репозитория и вывод полей в консольную таблицу. Map, Head и вспомогательные к ним функции, формирующие хеш таблицы allImages, находятся в файле graph.go:
graph.go
func (graph *Graph) Map() (map[string]*Image, error) {
// FIXME: this should replace All()
all, err := graph.All()
if err != nil {
return nil, err
}
images := make(map[string]*Image, len(all))
for _, image := range all {
images[image.Id] = image
}
return images, nil
}
func (graph *Graph) All() ([]*Image, error) {
var images []*Image
err := graph.WalkAll(func(image *Image) {
images = append(images, image)
})
return images, err
}
func (graph *Graph) WalkAll(handler func(*Image)) error {
files, err := ioutil.ReadDir(graph.Root)
if err != nil {
return err
}
for _, st := range files {
if img, err := graph.Get(st.Name()); err != nil {
// Skip image
continue
} else if handler != nil {
handler(img)
}
}
return nil
}
func (graph *Graph) ByParent() (map[string][]*Image, error) {
byParent := make(map[string][]*Image)
err := graph.WalkAll(func(image *Image) {
image, err := graph.Get(image.Parent)
if err != nil {
return
}
if children, exists := byParent[image.Parent]; exists {
byParent[image.Parent] = []*Image{image}
} else {
byParent[image.Parent] = append(children, image)
}
})
return byParent, err
}
func (graph *Graph) Heads() (map[string]*Image, error) {
heads := make(map[string]*Image)
byParent, err := graph.ByParent()
if err != nil {
return nil, err
}
err = graph.WalkAll(func(image *Image) {
// If it's not in the byParent lookup table, then
// it's not a parent -> so it's a head!
if _, exists := byParent[image.Id]; !exists {
heads[image.Id] = image
}
})
return heads, err
}
History
Отображает историю образа:
CmdHistory
func (srv *Server) CmdHistory(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "history", "[OPTIONS] IMAGE", "Show the history of an image")
if cmd.Parse(args) != nil || cmd.NArg() != 1 {
cmd.Usage()
return nil
}
image, err := srv.runtime.repositories.LookupImage(cmd.Arg(0))
if err != nil {
return err
}
w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
defer w.Flush()
fmt.Fprintf(w, "ID\tCREATED\tCREATED BY\n")
return image.WalkHistory(func(img *Image) error {
fmt.Fprintf(w, "%s\t%s\t%s\n",
srv.runtime.repositories.ImageName(img.Id),
HumanDuration(time.Now().Sub(img.Created))+" ago",
strings.Join(img.ContainerConfig.Cmd, " "),
)
return nil
})
}
func (img *Image) WalkHistory(handler func(*Image) error) (err error) {
currentImg := img
for currentImg != nil {
if handler != nil {
if err := handler(currentImg); err != nil {
return err
}
}
currentImg, err = currentImg.GetParent()
if err != nil {
return fmt.Errorf("Error while getting parent image: %v", err)
}
}
return nil
}
func (img *Image) GetParent() (*Image, error) {
if img.Parent == "" {
return nil, nil
}
if img.graph == nil {
return nil, fmt.Errorf("Can't lookup parent of unregistered image")
}
return img.graph.Get(img.Parent)
}
После получения структуры образа по переданному имени вызывается метод image.WalkHistory, который по цепочке обходит родительские образы, используя сохраненные ссылки Image.Parent, и выводит информацию в виде таблицы.
Commit
Создает новый образ на основе измененных данных файловой системы контейнера:
CmdCommit
func (srv *Server) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout,
"commit", "[OPTIONS] CONTAINER [REPOSITORY [TAG]]",
"Create a new image from a container's changes")
if err := cmd.Parse(args); err != nil {
return nil
}
containerName, repository, tag := cmd.Arg(0), cmd.Arg(1), cmd.Arg(2)
if containerName == "" {
cmd.Usage()
return nil
}
img, err := srv.runtime.Commit(containerName, repository, tag)
if err != nil {
return err
}
fmt.Fprintln(stdout, img.Id)
return nil
}
// Commit creates a new filesystem image from the current state of a container.
// The image can optionally be tagged into a repository
func (runtime *Runtime) Commit(id, repository, tag string) (*Image, error) {
container := runtime.Get(id)
if container == nil {
return nil, fmt.Errorf("No such container: %s", id)
}
// FIXME: freeze the container before copying it to avoid data corruption?
// FIXME: this shouldn't be in commands.
rwTar, err := container.ExportRw()
if err != nil {
return nil, err
}
// Create a new image from the container's base layers + a new layer from container changes
img, err := runtime.graph.Create(rwTar, container, "")
if err != nil {
return nil, err
}
// Register the image if needed
if repository != "" {
if err := runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
return img, err
}
}
return img, nil
}
Метод Commit получает структуру container по переданному имени, вызывает метод container.ExportRw, который возвращает архив с директорией rw, после чего передает его в метод graph.Create, который мы уже разбирали выше. Если передано имя репозитория и tag, то дополнительно будет создан tag образа. Этот функционал будет разобран ниже в команде Tag.
Tag
Функция создает tag образа в локальном репозитории. Используется при импортировании образа и коммите контейнера:
CmdTag
func (srv *Server) CmdTag(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "tag", "[OPTIONS] IMAGE REPOSITORY [TAG]", "Tag an image into a repository")
force := cmd.Bool("f", false, "Force")
if err := cmd.Parse(args); err != nil {
return nil
}
if cmd.NArg() < 2 {
cmd.Usage()
return nil
}
return srv.runtime.repositories.Set(cmd.Arg(1), cmd.Arg(2), cmd.Arg(0), *force)
}
После обработки параметров вызывается метод repositories.Set из файла tags.go. Ниже я приведу содержание файла tags.go, который отвечает за весь этот функционал. Общий принцип работы довольно простой - структура TagStore имеет хеш таблицу(map) для соответствия тегов и образов. При добавлении нового тега он проходит валидацию на запрещенные символы и записывается в хеш таблицу. Далее структура TagStore экспортируется в json и сохраняется в файле на диске. При запуске докер структура загружается из этого файла и в дальнейшем на ее основе осуществляется поиск и фильтрация образов по именам и тегам:
tags.go
const DEFAULT_TAG = "latest"
type TagStore struct {
path string
graph *Graph
Repositories map[string]Repository
}
type Repository map[string]string
func NewTagStore(path string, graph *Graph) (*TagStore, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
store := &TagStore{
path: abspath,
graph: graph,
Repositories: make(map[string]Repository),
}
// Load the json file if it exists, otherwise create it.
if err := store.Reload(); os.IsNotExist(err) {
if err := store.Save(); err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
return store, nil
}
func (store *TagStore) Save() error {
// Store the json ball
jsonData, err := json.Marshal(store)
if err != nil {
return err
}
if err := ioutil.WriteFile(store.path, jsonData, 0600); err != nil {
return err
}
return nil
}
func (store *TagStore) Reload() error {
jsonData, err := ioutil.ReadFile(store.path)
if err != nil {
return err
}
if err := json.Unmarshal(jsonData, store); err != nil {
return err
}
return nil
}
func (store *TagStore) LookupImage(name string) (*Image, error) {
img, err := store.graph.Get(name)
if err != nil {
// FIXME: standardize on returning nil when the image doesn't exist, and err for everything else
// (so we can pass all errors here)
repoAndTag := strings.SplitN(name, ":", 2)
if len(repoAndTag) == 1 {
repoAndTag = append(repoAndTag, DEFAULT_TAG)
}
if i, err := store.GetImage(repoAndTag[0], repoAndTag[1]); err != nil {
return nil, err
} else if i == nil {
return nil, fmt.Errorf("No such image: %s", name)
} else {
img = i
}
}
return img, nil
}
// Return a reverse-lookup table of all the names which refer to each image
// Eg. {"43b5f19b10584": {"base:latest", "base:v1"}}
func (store *TagStore) ById() map[string][]string {
byId := make(map[string][]string)
for repoName, repository := range store.Repositories {
for tag, id := range repository {
name := repoName + ":" + tag
if _, exists := byId[id]; !exists {
byId[id] = []string{name}
} else {
byId[id] = append(byId[id], name)
}
}
}
return byId
}
func (store *TagStore) ImageName(id string) string {
if names, exists := store.ById()[id]; exists && len(names) > 0 {
return names[0]
}
return id
}
func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
img, err := store.LookupImage(imageName)
if err != nil {
return err
}
if tag == "" {
tag = DEFAULT_TAG
}
if err := validateRepoName(repoName); err != nil {
return err
}
if err := validateTagName(tag); err != nil {
return err
}
if err := store.Reload(); err != nil {
return err
}
var repo Repository
if r, exists := store.Repositories[repoName]; exists {
repo = r
} else {
repo = make(map[string]string)
if old, exists := store.Repositories[repoName]; exists && !force {
return fmt.Errorf("Tag %s:%s is already set to %s", repoName, tag, old)
}
store.Repositories[repoName] = repo
}
repo[tag] = img.Id
return store.Save()
}
func (store *TagStore) Get(repoName string) (Repository, error) {
if err := store.Reload(); err != nil {
return nil, err
}
if r, exists := store.Repositories[repoName]; exists {
return r, nil
}
return nil, nil
}
func (store *TagStore) GetImage(repoName, tag string) (*Image, error) {
repo, err := store.Get(repoName)
if err != nil {
return nil, err
} else if repo == nil {
return nil, nil
}
if revision, exists := repo[tag]; exists {
return store.graph.Get(revision)
}
return nil, nil
}
// Validate the name of a repository
func validateRepoName(name string) error {
if name == "" {
return fmt.Errorf("Repository name can't be empty")
}
if strings.Contains(name, ":") {
return fmt.Errorf("Illegal repository name: %s", name)
}
return nil
}
// Validate the name of a tag
func validateTagName(name string) error {
if name == "" {
return fmt.Errorf("Tag name can't be empty")
}
if strings.Contains(name, "/") || strings.Contains(name, ":") {
return fmt.Errorf("Illegal tag name: %s", name)
}
return nil
}
Управление контейнерами
Run
Создает и запускает контейнер на основе заданного образа:
CmdRun
func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
config, err := ParseRun(args)
if err != nil {
return err
}
if config.Image == "" {
return fmt.Errorf("Image not specified")
}
if len(config.Cmd) == 0 {
return fmt.Errorf("Command not specified")
}
// Create new container
container, err := srv.runtime.Create(config)
if err != nil {
return errors.New("Error creating container: " + err.Error())
}
if config.OpenStdin {
cmd_stdin, err := container.StdinPipe()
if err != nil {
return err
}
if !config.Detach {
Go(func() error {
_, err := io.Copy(cmd_stdin, stdin)
cmd_stdin.Close()
return err
})
}
}
// Run the container
if !config.Detach {
cmd_stderr, err := container.StderrPipe()
if err != nil {
return err
}
cmd_stdout, err := container.StdoutPipe()
if err != nil {
return err
}
if err := container.Start(); err != nil {
return err
}
sending_stdout := Go(func() error {
_, err := io.Copy(stdout, cmd_stdout)
return err
})
sending_stderr := Go(func() error {
_, err := io.Copy(stdout, cmd_stderr)
return err
})
err_sending_stdout := <-sending_stdout
err_sending_stderr := <-sending_stderr
if err_sending_stdout != nil {
return err_sending_stdout
}
if err_sending_stderr != nil {
return err_sending_stderr
}
container.Wait()
} else {
if err := container.Start(); err != nil {
return err
}
fmt.Fprintln(stdout, container.Id)
}
return nil
}
В начале функция ParseRun производит разбор параметров и инициализацию структуры Config:
ParseRun
func ParseRun(args []string) (*Config, error) {
cmd := flag.NewFlagSet("", flag.ContinueOnError)
cmd.SetOutput(ioutil.Discard)
fl_user := cmd.String("u", "", "Username or UID")
fl_detach := cmd.Bool("d", false, "Detached mode: leave the container running in the background")
fl_stdin := cmd.Bool("i", false, "Keep stdin open even if not attached")
fl_tty := cmd.Bool("t", false, "Allocate a pseudo-tty")
fl_memory := cmd.Int64("m", 0, "Memory limit (in bytes)")
var fl_ports ports
cmd.Var(&fl_ports, "p", "Map a network port to the container")
var fl_env ListOpts
cmd.Var(&fl_env, "e", "Set environment variables")
if err := cmd.Parse(args); err != nil {
return nil, err
}
config := &Config{
Ports: fl_ports,
User: *fl_user,
Tty: *fl_tty,
OpenStdin: *fl_stdin,
Memory: *fl_memory,
Detach: *fl_detach,
Env: fl_env,
Cmd: cmd.Args()[1:],
Image: cmd.Arg(0),
}
return config, nil
}
На основании Config функция runtime.Create создает и возвращает новый контейнер, после чего при помощи пайпов и каналов перенаправляет потоки stdin, stdout, stderr, а затем запускает созданный контейнер методом container.Start.
runtime.Create
func (runtime *Runtime) Create(config *Config) (*Container, error) {
// Lookup image
img, err := runtime.repositories.LookupImage(config.Image)
if err != nil {
return nil, err
}
container := &Container{
// FIXME: we should generate the ID here instead of receiving it as an argument
Id: GenerateId(),
Created: time.Now(),
Path: config.Cmd[0],
Args: config.Cmd[1:], //FIXME: de-duplicate from config
Config: config,
Image: img.Id, // Always use the resolved image id
NetworkSettings: &NetworkSettings{},
// FIXME: do we need to store this in the container?
SysInitPath: sysInitPath,
}
container.root = runtime.containerRoot(container.Id)
// Step 1: create the container directory.
// This doubles as a barrier to avoid race conditions.
if err := os.Mkdir(container.root, 0700); err != nil {
return nil, err
}
// Step 2: save the container json
if err := container.ToDisk(); err != nil {
return nil, err
}
// Step 3: register the container
if err := runtime.Register(container); err != nil {
return nil, err
}
return container, nil
}
Здесь происходит инициализация структуры Container, создание рабочей директории контейнера и экспорт структуры в json формате. Завершает это вызов метода runtime.Register, код которого мы разбирали в прошлой статье. Функция GenerateId была рассмотрена ранее в разделе по команде import. Теперь перейдем к методу container.Start:
container.Start
func (container *Container) Start() error {
if err := container.EnsureMounted(); err != nil {
return err
}
if err := container.allocateNetwork(); err != nil {
return err
}
if err := container.generateLXCConfig(); err != nil {
return err
}
params := []string{
"-n", container.Id,
"-f", container.lxcConfigPath(),
"--",
"/sbin/init",
}
// Networking
params = append(params, "-g", container.network.Gateway.String())
// User
if container.Config.User != "" {
params = append(params, "-u", container.Config.User)
}
// Program
params = append(params, "--", container.Path)
params = append(params, container.Args...)
container.cmd = exec.Command("/usr/bin/lxc-start", params...)
// Setup environment
container.cmd.Env = append(
[]string{
"HOME=/",
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
},
container.Config.Env...,
)
var err error
if container.Config.Tty {
err = container.startPty()
} else {
err = container.start()
}
if err != nil {
return err
}
// FIXME: save state on disk *first*, then converge
// this way disk state is used as a journal, eg. we can restore after crash etc.
container.State.setRunning(container.cmd.Process.Pid)
container.ToDisk()
go container.monitor()
return nil
}
Основная логика его работы практически не изменилась со времен первого коммита, который мы разбирали в первой статье. Изменения коснулись части отвечающей за монтирование файловой системы, а также был добавлен код для сетевого стека и новый метод запуска процесса в контейнере. Разберем все по порядку, начиная с монтирования файловой системы:
EnsureMounted
func (container *Container) EnsureMounted() error {
if mounted, err := container.Mounted(); err != nil {
return err
} else if mounted {
return nil
}
return container.Mount()
}
func (container *Container) Mounted() (bool, error) {
return Mounted(container.RootfsPath())
}
func Mounted(mountpoint string) (bool, error) {
mntpoint, err := os.Stat(mountpoint)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
parent, err := os.Stat(filepath.Join(mountpoint, ".."))
if err != nil {
return false, err
}
mntpointSt := mntpoint.Sys().(*syscall.Stat_t)
parentSt := parent.Sys().(*syscall.Stat_t)
return mntpointSt.Dev != parentSt.Dev, nil
}
Метод container.EnsureMounted проверяет была ли смонтирована файловая система, в противном случае - выполняет монтирование вызовом container.Mount():
container.Mount
func (container *Container) Mount() error {
image, err := container.GetImage()
if err != nil {
return err
}
return image.Mount(container.RootfsPath(), container.rwPath())
}
func (image *Image) Mount(root, rw string) error {
if mounted, err := Mounted(root); err != nil {
return err
} else if mounted {
return fmt.Errorf("%s is already mounted", root)
}
layers, err := image.layers()
if err != nil {
return err
}
// Create the target directories if they don't exist
if err := os.Mkdir(root, 0755); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(rw, 0755); err != nil && !os.IsExist(err) {
return err
}
// FIXME: @creack shouldn't we do this after going over changes?
if err := MountAUFS(layers, rw, root); err != nil {
return err
}
// FIXME: Create tests for deletion
// FIXME: move this part to change.go
// Retrieve the changeset from the parent and apply it to the container
// - Retrieve the changes
changes, err := Changes(layers, layers[0])
if err != nil {
return err
}
// Iterate on changes
for _, c := range changes {
// If there is a delete
if c.Kind == ChangeDelete {
// Make sure the directory exists
file_path, file_name := path.Dir(c.Path), path.Base(c.Path)
if err := os.MkdirAll(path.Join(rw, file_path), 0755); err != nil {
return err
}
// And create the whiteout (we just need to create empty file, discard the return)
if _, err := os.Create(path.Join(path.Join(rw, file_path),
".wh."+path.Base(file_name))); err != nil {
return err
}
}
}
return nil
}
Подготовка параметров для монтирования производится функцией MountAUFS, логика которой аналогична первой версии, только теперь вместо утилиты mount монтирование производится системным вызовом:
MountAUFS
func MountAUFS(ro []string, rw string, target string) error {
// FIXME: Now mount the layers
rwBranch := fmt.Sprintf("%v=rw", rw)
roBranches := ""
for _, layer := range ro {
roBranches += fmt.Sprintf("%v=ro:", layer)
}
branches := fmt.Sprintf("br:%v:%v", rwBranch, roBranches)
return mount("none", target, "aufs", 0, branches)
}
func mount(source string, target string, fstype string, flags uintptr, data string) (err error) {
return syscall.Mount(source, target, fstype, flags, data)
}
После монтирования функция получает изменения файловой системы с помощью команды Changes и производит физическое удаление файлов в папке rw, если они были удалены в верхнем слое. Алгоритм работы этой функции будет рассмотрен в разделе команды Diff, вычисляющей изменения в файловой системе. Теперь перейдем к инициализации сети вызовом container.allocateNetwork:
container.allocateNetwork
func (container *Container) allocateNetwork() error {
iface, err := container.runtime.networkManager.Allocate()
if err != nil {
return err
}
container.NetworkSettings.PortMapping = make(map[string]string)
for _, port := range container.Config.Ports {
if extPort, err := iface.AllocatePort(port); err != nil {
iface.Release()
return err
} else {
container.NetworkSettings.PortMapping[strconv.Itoa(port)] = strconv.Itoa(extPort)
}
}
container.network = iface
container.NetworkSettings.IpAddress = iface.IPNet.IP.String()
container.NetworkSettings.IpPrefixLen, _ = iface.IPNet.Mask.Size()
container.NetworkSettings.Gateway = iface.Gateway.String()
return nil
}
В нем происходит настройка сетевого интерфейса, присвоение ip адреса, маски и шлюза, а также проброс портов. Я подробно разберу весь этот функционал ниже, в отдельной части по работе с сетевым стеком. Далее идет вызов метода generateLXCConfig. Он был подробно разобран в первой статье и остался практически без изменений. Стоит лишь отметить, что теперь в lxc_template.go добавлены настройки сети, монтирование /etc/resolv.conf для работы dns и, главное, монтирование исполняемого файла docker в точку /sbin/init, так как теперь выполнение процесса будет начинаться с него. Я уже обращал на это внимание в части 2.1. Ниже приведены изменения в lxc_template:
lxc_template.go
# network configuration
lxc.network.type = veth
lxc.network.flags = up
lxc.network.link = lxcbr0
lxc.network.name = eth0
lxc.network.mtu = 1500
lxc.network.ipv4 = {{.NetworkSettings.IpAddress}}/{{.NetworkSettings.IpPrefixLen}}
# Inject docker-init
lxc.mount.entry = {{.SysInitPath}} {{$ROOTFS}}/sbin/init none bind,ro 0 0
# In order to get a working DNS environment, mount bind (ro) the host's /etc/resolv.conf into the container
lxc.mount.entry = /etc/resolv.conf {{$ROOTFS}}/etc/resolv.conf none bind,ro 0 0
Теперь осталось разобрать файл sysinit.go, с которого стартует созданный lxc контейнер:
sysinit.go
// Setup networking
func setupNetworking(gw string) {
if gw == "" {
return
}
cmd := exec.Command("/sbin/route", "add", "default", "gw", gw)
if err := cmd.Run(); err != nil {
log.Fatalf("Unable to set up networking: %v", err)
}
}
// Takes care of dropping privileges to the desired user
func changeUser(u string) {
if u == "" {
return
}
userent, err := user.LookupId(u)
if err != nil {
userent, err = user.Lookup(u)
}
if err != nil {
log.Fatalf("Unable to find user %v: %v", u, err)
}
uid, err := strconv.Atoi(userent.Uid)
if err != nil {
log.Fatalf("Invalid uid: %v", userent.Uid)
}
gid, err := strconv.Atoi(userent.Gid)
if err != nil {
log.Fatalf("Invalid gid: %v", userent.Gid)
}
if err := syscall.Setgid(gid); err != nil {
log.Fatalf("setgid failed: %v", err)
}
if err := syscall.Setuid(uid); err != nil {
log.Fatalf("setuid failed: %v", err)
}
}
func executeProgram(name string, args []string) {
path, err := exec.LookPath(name)
if err != nil {
log.Printf("Unable to locate %v", name)
os.Exit(127)
}
if err := syscall.Exec(path, args, os.Environ()); err != nil {
panic(err)
}
}
// Sys Init code
// This code is run INSIDE the container and is responsible for setting
// up the environment before running the actual process
func SysInit() {
if len(os.Args) <= 1 {
fmt.Println("You should not invoke docker-init manually")
os.Exit(1)
}
var u = flag.String("u", "", "username or uid")
var gw = flag.String("g", "", "gateway address")
flag.Parse()
setupNetworking(*gw)
changeUser(*u)
executeProgram(flag.Arg(0), flag.Args())
}
Как видим, в SysInit происходит настройка окружения перед запуском процесса. Добавление default gateway в таблицу маршрутизации, настройка пользователя и группы, под которыми будет выполняться процесс, и собственно запуск процесса стандартным методом Exec.
После запуска процесса в методе Start идет перенаправление стандартных потоков и запуск горутины container.monitor, работу которой мы разбирали в первой статье. Можно лишь добавить, что теперь в ней происходит освобождение назначенного ip адреса, проброшенных портов и размонтирование файловой системы.
container.monitor
func (container *Container) monitor() {
// Wait for the program to exit
container.cmd.Wait()
exitCode := container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
// Cleanup
if err := container.releaseNetwork(); err != nil {
log.Printf("%v: Failed to release network: %v", container.Id, err)
}
container.stdout.Close()
container.stderr.Close()
if err := container.Unmount(); err != nil {
log.Printf("%v: Failed to umount filesystem: %v", container.Id, err)
}
// Re-create a brand new stdin pipe once the container exited
if container.Config.OpenStdin {
container.stdin, container.stdinPipe = io.Pipe()
}
// Report status back
container.State.setStopped(exitCode)
container.ToDisk()
}
func (container *Container) releaseNetwork() error {
err := container.network.Release()
container.network = nil
container.NetworkSettings = &NetworkSettings{}
return err
}
func (container *Container) Unmount() error {
return Unmount(container.RootfsPath())
}
func Unmount(target string) error {
if err := syscall.Unmount(target, 0); err != nil {
return err
}
// Even though we just unmounted the filesystem, AUFS will prevent deleting the mntpoint
// for some time. We'll just keep retrying until it succeeds.
for retries := 0; retries < 1000; retries++ {
err := os.Remove(target)
if err == nil {
// rm mntpoint succeeded
return nil
}
if os.IsNotExist(err) {
// mntpoint doesn't exist anymore. Success.
return nil
}
// fmt.Printf("(%v) Remove %v returned: %v\n", retries, target, err)
time.Sleep(10 * time.Millisecond)
}
return fmt.Errorf("Umount: Failed to umount %v", target)
}
Неожиданное завершение
Ввиду того, что редактор хабра начал сильно тормозить, а местами и полностью зависать, (видимо сказывается большое количество снипетов кода), я вынужден завершить эту часть, но продолжу в следующей, прямо с этого места.