HEX
Server: Apache
System: Linux info 3.0 #1337 SMP Tue Jan 01 00:00:00 CEST 2000 all GNU/Linux
User: u90323915 (5560665)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //kunden/proc/self/root/lib/python3/dist-packages/breezy/plugins/rewrite/rebase.py
# Copyright (C) 2006-2007 by Jelmer Vernooij
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""Rebase."""

from __future__ import absolute_import

import os

from ... import (
    config as _mod_config,
    osutils,
    )
from ...errors import (
    BzrError,
    NoSuchFile,
    UnknownFormatError,
    NoCommonAncestor,
    UnrelatedBranches,
    )
from ...bzr.generate_ids import gen_revision_id
from ...graph import FrozenHeadsCache
from ...merge import Merger
from ...revision import NULL_REVISION
from ...trace import mutter
from ...tsort import topo_sort
from ...tree import TreeChange
from ... import ui

from .maptree import (
    MapTree,
    map_file_ids,
    )

# Name of the file (on the working tree's transport) that holds the
# serialized rebase plan; see RebaseState1.
REBASE_PLAN_FILENAME = 'rebase-plan'
# Name of the file that records the revision id currently being rebased.
REBASE_CURRENT_REVID_FILENAME = 'rebase-current'
# Format version written into / expected in the plan file's header line.
REBASE_PLAN_VERSION = 1
# Revision property key under which a rewritten revision records the id of
# the revision it was rebased from.
REVPROP_REBASE_OF = 'rebase-of'

class RebaseState(object):
    """Abstract interface for persisting the state of a rebase.

    Every method raises ``NotImplementedError``; concrete subclasses are
    expected to override all of them.
    """

    def has_plan(self):
        """Tell whether a rebase plan is currently stored.

        :return: True if a plan is present, False otherwise.
        """
        raise NotImplementedError(self.has_plan)

    def read_plan(self):
        """Load the stored rebase plan.

        :return: Tuple of (last revision info, replace map).
        """
        raise NotImplementedError(self.read_plan)

    def write_plan(self, replace_map):
        """Store a rebase plan.

        :param replace_map: Replace map
            (old revid -> (new revid, new parents)).
        """
        raise NotImplementedError(self.write_plan)

    def remove_plan(self):
        """Discard the stored rebase plan."""
        raise NotImplementedError(self.remove_plan)

    def write_active_revid(self, revid):
        """Record which revision is currently being rebased.

        :param revid: Revision id to record.
        """
        raise NotImplementedError(self.write_active_revid)

    def read_active_revid(self):
        """Return the recorded id of the revision currently being rebased.

        :return: Revision id of the revision that is being rebased.
        """
        raise NotImplementedError(self.read_active_revid)


class RebaseState1(RebaseState):
    """Rebase state stored as files on a working tree's transport.

    The plan lives in ``REBASE_PLAN_FILENAME`` and the revision currently
    being rebased in ``REBASE_CURRENT_REVID_FILENAME``. An empty plan file
    is treated the same as a missing one.

    :ivar wt: Working tree whose state is tracked.
    :ivar transport: ``wt._transport``, used for all file access.
    """

    def __init__(self, wt):
        self.wt = wt
        self.transport = wt._transport

    def has_plan(self):
        """See `RebaseState`."""
        try:
            plan_bytes = self.transport.get_bytes(REBASE_PLAN_FILENAME)
        except NoSuchFile:
            return False
        return plan_bytes != b''

    def read_plan(self):
        """See `RebaseState`.

        :raises NoSuchFile: if the plan file is missing or empty.
        """
        plan_bytes = self.transport.get_bytes(REBASE_PLAN_FILENAME)
        if plan_bytes != b'':
            return unmarshall_rebase_plan(plan_bytes)
        # remove_plan() leaves an empty file behind, so empty means "no plan".
        raise NoSuchFile(REBASE_PLAN_FILENAME)

    def write_plan(self, replace_map):
        """See `RebaseState`."""
        # Flag the tree first, then serialize and store the plan.
        self.wt.update_feature_flags({b"rebase-v1": b"write-required"})
        last_rev_info = self.wt.branch.last_revision_info()
        serialized = marshall_rebase_plan(last_rev_info, replace_map)
        assert isinstance(serialized, bytes)
        self.transport.put_bytes(REBASE_PLAN_FILENAME, serialized)

    def remove_plan(self):
        """See `RebaseState`."""
        self.wt.update_feature_flags({b"rebase-v1": None})
        # The file is emptied rather than deleted.
        self.transport.put_bytes(REBASE_PLAN_FILENAME, b'')

    def write_active_revid(self, revid):
        """See `RebaseState`.

        ``None`` is stored as ``NULL_REVISION``.
        """
        stored = NULL_REVISION if revid is None else revid
        assert isinstance(stored, bytes)
        self.transport.put_bytes(REBASE_CURRENT_REVID_FILENAME, stored)

    def read_active_revid(self):
        """See `RebaseState`.

        :return: The stored revision id, or None when the file is missing
            or holds ``NULL_REVISION``.
        """
        try:
            raw = self.transport.get_bytes(REBASE_CURRENT_REVID_FILENAME)
        except NoSuchFile:
            return None
        active = raw.rstrip(b"\n")
        return None if active == NULL_REVISION else active


def marshall_rebase_plan(last_rev_info, replace_map):
    """Marshall a rebase plan.

    The output is a header line carrying ``REBASE_PLAN_VERSION``, a line
    with the last revision info, and then one line per replace map entry
    of the form ``<old revid> <new revid> [<new parent> ...]``.

    :param last_rev_info: Last revision info tuple (revno, revid).
    :param replace_map: Replace map (old revid -> (new revid, new parents))
    :return: The serialized plan, as bytes.
    """
    # Collect the pieces and join once instead of growing a bytes object
    # with += for every entry.
    chunks = [
        b"# Bazaar rebase plan %d\n" % REBASE_PLAN_VERSION,
        b"%d %s\n" % last_rev_info,
    ]
    for oldrev in replace_map:
        (newrev, newparents) = replace_map[oldrev]
        chunks.append(b"%s %s" % (oldrev, newrev))
        chunks.extend([b" %s" % p for p in newparents])
        chunks.append(b"\n")
    return b"".join(chunks)


def unmarshall_rebase_plan(text):
    """Parse a serialized rebase plan.

    :param text: Plan contents (bytes) to parse.
    :return: Tuple of (last revision info, replace map), where the replace
        map maps old revid -> (new revid, tuple of new parents).
    :raises UnknownFormatError: if the first line is not the expected
        header for ``REBASE_PLAN_VERSION``.
    """
    rows = text.split(b'\n')
    header = rows[0]
    expected_header = b"# Bazaar rebase plan %d" % REBASE_PLAN_VERSION
    if header != expected_header:
        raise UnknownFormatError(header)

    # Second line: "<revno> <revid>"; split once on the first space.
    info_fields = rows[1].split(b" ", 1)
    last_revision_info = (int(info_fields[0]), info_fields[1])

    replace_map = {}
    for row in rows[2:]:
        if not row:
            # Blank lines (such as the one after the final newline) carry
            # no entry.
            continue
        fields = row.split(b" ")
        replace_map[fields[0]] = (fields[1], tuple(fields[2:]))
    return (last_revision_info, replace_map)


def regenerate_default_revid(repository, revid):
    """Produce a fresh revision id for the rebased copy of a revision.

    The new id is generated from the committer and timestamp of the
    existing revision. ``NULL_REVISION`` is returned unchanged.

    :param repository: Repository in which the revision is present.
    :param revid: Revision id of the revision that is being rebased.
    :return: new revision id.
    """
    if revid != NULL_REVISION:
        original = repository.get_revision(revid)
        return gen_revision_id(original.committer, original.timestamp)
    return NULL_REVISION


def generate_simple_plan(todo_set, start_revid, stop_revid, onto_revid, graph,
                         generate_revid, skip_full_merged=False):
    """Create a simple rebase plan that replays history based
    on one revision being replayed on top of another.

    :param todo_set: A set of revisions to rebase. Only the revisions
        topologically between stop_revid and start_revid (inclusive) are
        rebased; other revisions are ignored (and references to them are
        preserved).
    :param start_revid: Id of revision at which to start replaying, or
        None to start at the first revision in topological order.
    :param stop_revid: Id of revision until which to stop replaying, or
        None to stop at the last revision in topological order.
    :param onto_revid: Id of revision on top of which to replay
    :param graph: Graph object
    :param generate_revid: Function for generating new revision ids; called
        as ``generate_revid(oldrevid, new_parents)``.
    :param skip_full_merged: Skip revisions that merge already merged
                             revisions.

    :return: replace map (old revid -> (new revid, tuple of new parents))
    :raises UnrelatedBranches: if start_revid is None and the only common
        ancestor of stop_revid and onto_revid is NULL_REVISION.
    """
    assert start_revid is None or start_revid in todo_set, \
        "invalid start revid(%r), todo_set(%r)" % (start_revid, todo_set)
    assert stop_revid is None or stop_revid in todo_set, "invalid stop_revid"
    replace_map = {}
    parent_map = graph.get_parent_map(todo_set)
    # NOTE(review): presumably topo_sort yields parents before children;
    # the defaults and the slice below rely on that ordering.
    order = topo_sort(parent_map)
    if stop_revid is None:
        stop_revid = order[-1]
    if start_revid is None:
        # We need a common base.
        lca = graph.find_lca(stop_revid, onto_revid)
        if lca == set([NULL_REVISION]):
            raise UnrelatedBranches()
        start_revid = order[0]
    # Inclusive range of the sorted revisions from start_revid to stop_revid.
    todo = order[order.index(start_revid):order.index(stop_revid) + 1]
    heads_cache = FrozenHeadsCache(graph)
    # XXX: The output replacemap'd parents should get looked up in some manner
    # by the heads cache? RBC 20080719
    for oldrevid in todo:
        oldparents = parent_map[oldrevid]
        assert isinstance(oldparents, tuple), "not tuple: %r" % oldparents
        parents = []
        # Left parent:
        if heads_cache.heads((oldparents[0], onto_revid)) == set((onto_revid,)):
            # onto_revid is the sole head of the pair, so it takes the place
            # of the old left parent.
            parents.append(onto_revid)
        elif oldparents[0] in replace_map:
            # The old left parent is itself being rewritten: use its new id.
            parents.append(replace_map[oldparents[0]][0])
        else:
            # Otherwise onto_revid becomes the left parent and the original
            # left parent is kept as an additional parent.
            parents.append(onto_revid)
            parents.append(oldparents[0])
        # Other parents:
        if len(oldparents) > 1:
            # Only right-hand parents that are heads among themselves are
            # considered; the others are dropped.
            additional_parents = heads_cache.heads(oldparents[1:])
            for oldparent in oldparents[1:]:
                if oldparent in additional_parents:
                    if heads_cache.heads((oldparent, onto_revid)) == set((onto_revid,)):
                        # onto_revid is the sole head of the pair: nothing
                        # to add for this parent.
                        pass
                    elif oldparent in replace_map:
                        newparent = replace_map[oldparent][0]
                        # A rewritten parent replaces onto_revid in the left
                        # slot if that is what is there; otherwise it is
                        # appended.
                        if parents[0] == onto_revid:
                            parents[0] = newparent
                        else:
                            parents.append(newparent)
                    else:
                        # Parent is not being rewritten: keep the reference.
                        parents.append(oldparent)
            # This check sits inside the merge branch, so only revisions
            # that originally had more than one parent can be skipped.
            if len(parents) == 1 and skip_full_merged:
                continue
        parents = tuple(parents)
        newrevid = generate_revid(oldrevid, parents)
        assert newrevid != oldrevid, "old and newrevid equal (%r)" % newrevid
        assert isinstance(parents, tuple), "parents not tuple: %r" % parents
        replace_map[oldrevid] = (newrevid, parents)
    return replace_map


def generate_transpose_plan(ancestry, renames, graph, generate_revid):
    """Create a rebase plan that replaces a bunch of revisions
    in a revision graph.

    Starting from the renamed revisions, every child found in ``ancestry``
    gets its parent list rewritten to point at the replacement and, unless
    ``generate_revid`` returns its existing id, is itself scheduled so that
    its own children are rewritten in turn.

    :param ancestry: Ancestry to consider: an iterable of
        (revid, parents) pairs, where parents is None for a ghost.
    :param renames: Renames of revision (old revid -> new revid)
    :param graph: Graph object, used to look up the parents of rename
        targets that are not part of ``ancestry``.
    :param generate_revid: Function for creating new revision ids; called
        as ``generate_revid(revid, new_parents)``.
    :return: replace map (old revid -> (new revid, new parents)) for the
        affected descendants; the renamed revisions themselves are not
        included.
    """
    replace_map = {}
    children = {}
    parent_map = {}
    for r, ps in ancestry:
        # Every revision seen gets a children entry, ghosts included.
        children.setdefault(r, [])
        if ps is None:  # Ghost
            continue
        parent_map[r] = ps
        for p in ps:
            children.setdefault(p, []).append(r)

    # Materialize the keys: on Python 3 filter() yields a one-shot
    # iterator, which would appear empty to a get_parent_map()
    # implementation that walks its argument more than once.
    missing = [v for v in renames.values() if v not in parent_map]
    parent_map.update(graph.get_parent_map(missing))

    # todo contains a list of revisions that need to
    # be rewritten
    todo = []
    for r, v in renames.items():
        replace_map[r] = (v, parent_map[v])
        todo.append(r)

    total = len(todo)
    processed = set()
    i = 0
    pb = ui.ui_factory.nested_progress_bar()
    try:
        while todo:
            r = todo.pop()
            processed.add(r)
            i += 1
            pb.update('determining dependencies', i, total)
            # Add entry for them in replace_map
            for c in children[r]:
                if c in renames:
                    continue
                # Start from the already-rewritten parents if this child
                # was reached earlier through another parent.
                if c in replace_map:
                    parents = replace_map[c][1]
                else:
                    parents = parent_map[c]
                assert isinstance(parents, tuple), \
                    "Expected tuple of parents, got: %r" % parents
                # replace r in parents with replace_map[r][0]
                if replace_map[r][0] not in parents:
                    parents = list(parents)
                    parents[parents.index(r)] = replace_map[r][0]
                parents = tuple(parents)
                replace_map[c] = (generate_revid(c, parents), parents)
                if replace_map[c][0] == c:
                    # The id did not change, so nothing needs rewriting.
                    del replace_map[c]
                elif c not in processed:
                    todo.append(c)
    finally:
        pb.finished()

    # Remove items from the map that already exist
    for revid in renames:
        replace_map.pop(revid, None)

    return replace_map


def rebase_todo(repository, replace_map):
    """Yield the revisions from a replace map that still need rebasing.

    An entry counts as done when the first element of its value (the new
    revision id) is already present in the repository.

    :param repository: Repository that contains the revisions
    :param replace_map: Replace map (old revid -> (new revid, new parents))
    :return: Generator of old revision ids that are still to be rebased.
    """
    for oldrevid, plan_entry in replace_map.items():
        assert isinstance(plan_entry, tuple), "replace map parents not tuple"
        already_done = repository.has_revision(plan_entry[0])
        if already_done:
            continue
        yield oldrevid


def rebase(repository, replace_map, revision_rewriter):
    """Rewrite the revisions listed in a replace map.

    Revisions are visited in the order given by the repository graph's
    ``iter_topo_order``. Entries whose new revision id already exists in
    the repository are skipped.

    :param repository: Repository that contains the revisions
    :param replace_map: Dictionary with revisions to (optionally) rewrite
        (old revid -> (new revid, new parents))
    :param revision_rewriter: Callable invoked as
        ``revision_rewriter(oldrevid, newrevid, newparents)`` for each
        revision that still has to be created.
    """
    # Figure out the dependencies
    graph = repository.get_graph()
    todo = list(graph.iter_topo_order(replace_map.keys()))
    pb = ui.ui_factory.nested_progress_bar()
    try:
        total = len(todo)
        for position, oldrevid in enumerate(todo):
            pb.update('rebase revisions', position, total)
            newrevid, newparents = replace_map[oldrevid]
            assert isinstance(newparents, tuple), "Expected tuple for %r" % newparents
            if not repository.has_revision(newrevid):
                revision_rewriter(oldrevid, newrevid, newparents)
            # Otherwise it was already converted; nothing more to do.
    finally:
        pb.finished()


def wrap_iter_changes(old_iter_changes, map_tree):
    """Re-yield tree changes with their file ids translated by a map tree.

    The file id and both parent ids of each change are passed through
    ``map_tree.new_id``; parent ids that are None are left as None. All
    other fields are copied unchanged.

    :param old_iter_changes: Iterable of changes to translate.
    :param map_tree: Object providing ``new_id(file_id)``.
    :return: Generator of ``TreeChange`` objects.
    """
    def _map_parent(parent_id):
        if parent_id is None:
            return parent_id
        return map_tree.new_id(parent_id)

    for change in old_iter_changes:
        mapped_parents = (
            _map_parent(change.parent_id[0]),
            _map_parent(change.parent_id[1]),
        )
        yield TreeChange(
            map_tree.new_id(change.file_id), change.path,
            change.changed_content, change.versioned,
            mapped_parents, change.name, change.kind,
            change.executable)


class CommitBuilderRevisionRewriter(object):
    """Revision rewriter that uses a commit builder.

    :ivar repository: Repository in which the revision is present.
    :ivar map_ids: Whether file ids are mapped between the old and new
        parent trees before committing.
    """

    def __init__(self, repository, map_ids=True):
        self.repository = repository
        self.map_ids = map_ids

    def _get_present_revisions(self, revids):
        """Return, as a tuple, the given revids that exist in the repository."""
        return tuple(
            revid for revid in revids
            if self.repository.has_revision(revid))

    def __call__(self, oldrevid, newrevid, new_parents):
        """Replay a commit by committing the same snapshot with different
        parents.

        :param oldrevid: Revision id of the revision to copy.
        :param newrevid: Revision id of the revision to create.
        :param new_parents: Revision ids of the new parent revisions (tuple).
        :return: The result of the commit builder's ``commit``.
        """
        assert isinstance(new_parents, tuple), "CommitBuilderRevisionRewriter: Expected tuple for %r" % new_parents
        repo = self.repository
        mutter('creating copy %r of %r with new parents %r',
               newrevid, oldrevid, new_parents)
        oldrev = repo.get_revision(oldrevid)

        # Carry over the old revision properties and record the origin.
        revprops = dict(oldrev.properties)
        revprops[REVPROP_REBASE_OF] = oldrevid.decode('utf-8')

        # Ghost parents are left out when working out the file id mapping.
        present_oldparents = self._get_present_revisions(oldrev.parent_ids)
        present_newparents = self._get_present_revisions(new_parents)
        oldtree = repo.revision_tree(oldrevid)
        if not self.map_ids:
            mappedtree = oldtree
        else:
            fileid_map = map_file_ids(
                repo, present_oldparents, present_newparents)
            mappedtree = MapTree(oldtree, fileid_map)

        # Fall back to the null revision when there is no parent at all.
        old_base = present_oldparents[0] if present_oldparents else NULL_REVISION
        new_base = new_parents[0] if new_parents else NULL_REVISION

        old_base_tree = repo.revision_tree(old_base)
        iter_changes = wrap_iter_changes(
            oldtree.iter_changes(old_base_tree), mappedtree)
        builder = repo.get_commit_builder(
            branch=None, parents=new_parents, committer=oldrev.committer,
            timestamp=oldrev.timestamp, timezone=oldrev.timezone,
            revprops=revprops, revision_id=newrevid,
            config_stack=_mod_config.GlobalStack())
        try:
            # The iterator is only drained; its items are not used.
            for (_relpath, _fs_hash) in builder.record_iter_changes(
                    mappedtree, new_base, iter_changes):
                pass
            builder.finish_inventory()
            return builder.commit(oldrev.message)
        except:
            # Abort the builder on any failure, then re-raise.
            builder.abort()
            raise


class WorkingTreeRevisionRewriter(object):
    """Revision rewriter that replays revisions by merging in a working tree.

    :ivar wt: Working tree in which the replays are done.
    :ivar graph: Graph of the working tree's branch repository.
    :ivar state: Object used to record the revision being replayed.
    :ivar merge_type: Merge type to use, or None for ``Merge3Merger``.
    """

    def __init__(self, wt, state, merge_type=None):
        """
        :param wt: Working tree in which to do the replays.
        :param state: Rebase state; ``write_active_revid`` is called on it.
        :param merge_type: Optional merge type; None selects Merge3Merger.
        """
        self.wt = wt
        self.graph = self.wt.branch.repository.get_graph()
        self.state = state
        self.merge_type = merge_type

    def __call__(self, oldrevid, newrevid, newparents):
        """Replay a commit in a working tree, with a different base.

        :param oldrevid: Old revision id
        :param newrevid: New revision id
        :param newparents: New parent revision ids
        """
        repository = self.wt.branch.repository
        merge_type = self.merge_type
        if merge_type is None:
            from ...merge import Merge3Merger as merge_type
        oldrev = repository.get_revision(oldrevid)
        # Make sure there are no conflicts or pending merges/changes
        # in the working tree
        complete_revert(self.wt, [newparents[0]])
        assert not self.wt.changes_from(self.wt.basis_tree()).has_changed(), "Changes in rev"

        # The tree is requested as the original did; the result is unused.
        _oldtree = repository.revision_tree(oldrevid)
        self.state.write_active_revid(oldrevid)

        merger = Merger(self.wt.branch, this_tree=self.wt)
        merger.set_other_revision(oldrevid, self.wt.branch)
        base_revid = self.determine_base(
            oldrevid, oldrev.parent_ids, newrevid, newparents)
        mutter('replaying %r as %r with base %r and new parents %r' %
               (oldrevid, newrevid, base_revid, newparents))
        merger.set_base_revision(base_revid, self.wt.branch)
        merger.merge_type = merge_type
        merger.do_merge()

        # Every new parent after the first is recorded as a pending merge.
        for extra_parent in newparents[1:]:
            self.wt.add_pending_merge(extra_parent)
        self.commit_rebase(oldrev, newrevid)
        self.state.write_active_revid(None)

    def determine_base(self, oldrevid, oldparents, newrevid, newparents):
        """Determine the base for replaying a revision using merge.

        :param oldrevid: Revid of old revision.
        :param oldparents: List of old parents revids.
        :param newrevid: Revid of new revision.
        :param newparents: List of new parents revids.
        :return: Revision id to use as the merge base.
        """
        old_count = len(oldparents)

        # If this was the first commit, no base is needed
        if old_count == 0:
            return NULL_REVISION

        # A "simple" revision with just one parent uses that parent.
        if old_count == 1:
            return oldparents[0]

        # In case the rhs parent(s) of the origin revision has already been
        # merged in the new branch, use diff between rhs parent and diff from
        # original revision
        if len(newparents) == 1:
            # FIXME: Find oldparents entry that matches newparents[0]
            # and return it
            return oldparents[1]

        # Otherwise use the unique LCA of the old left parent and the new
        # second parent, falling back to the old left parent.
        try:
            return self.graph.find_unique_lca(oldparents[0], newparents[1])
        except NoCommonAncestor:
            return oldparents[0]

    def commit_rebase(self, oldrev, newrevid):
        """Commit a rebase.

        The message, timestamp, timezone and properties are taken from the
        old revision; the committer comes from the branch configuration.

        :param oldrev: The old revision whose rebased copy is committed.
        :param newrevid: New revision id.
        """
        assert oldrev.revision_id != newrevid, "Invalid revid %r" % newrevid
        revprops = dict(oldrev.properties)
        revprops[REVPROP_REBASE_OF] = oldrev.revision_id.decode('utf-8')
        committer = self.wt.branch.get_config().username()
        authors = oldrev.get_apparent_authors()
        if oldrev.committer != committer:
            # Someone else is rebasing: keep the original committer listed
            # among the authors.
            if oldrev.committer not in authors:
                authors.append(oldrev.committer)
        elif [oldrev.committer] == authors:
            # No need to explicitly record the authors if the original
            # committer is rebasing.
            authors = None
        # Authors are passed explicitly below, so drop the copied properties.
        for stale_key in ('author', 'authors'):
            revprops.pop(stale_key, None)
        self.wt.commit(
            message=oldrev.message, timestamp=oldrev.timestamp,
            timezone=oldrev.timezone, revprops=revprops, rev_id=newrevid,
            committer=committer, authors=authors)


def complete_revert(wt, newparents):
    """Revert a working tree to the given parents, leaving no extra files.

    The branch history is regenerated up to the first new parent, files
    that the working tree has in addition to that revision are removed from
    disk, and the tree is reverted without backups. The full parent list is
    set only after the tree has been checked to be clean.

    :param wt: Working tree to use for rebase
    :param newparents: New parents of the working tree
    """
    target_tree = wt.branch.repository.revision_tree(newparents[0])
    delta = wt.changes_from(target_tree)
    wt.branch.generate_revision_history(newparents[0])
    wt.set_parent_ids(
        [revid for revid in newparents[:1] if revid != NULL_REVISION])

    # Delete whatever the working tree added on top of the target tree.
    for added in delta.added:
        abs_path = wt.abspath(added.path[1])
        if not osutils.lexists(abs_path):
            continue
        if osutils.isdir(abs_path):
            osutils.rmtree(abs_path)
        else:
            os.unlink(abs_path)

    wt.revert(None, old_tree=target_tree, backups=False)
    assert not wt.changes_from(wt.basis_tree()).has_changed(), "Rev changed"
    wt.set_parent_ids(
        [revid for revid in newparents if revid != NULL_REVISION])


class ReplaySnapshotError(BzrError):
    """Error signalling that replaying a snapshot failed.

    :ivar msg: Description of the failure, interpolated into ``_fmt``.
    """

    _fmt = """Replaying the snapshot failed: %(msg)s."""

    def __init__(self, msg):
        super(ReplaySnapshotError, self).__init__()
        self.msg = msg