public
Authored by avatar Mathias Stelzer

minecraft manage

minecraft tmux server manager

manage.py 10.65 KiB
#! /usr/bin/env python3
import argparse
import json
import os
import requests
import shutil
import subprocess
import sys
import time
from pathlib import Path
from timeit import default_timer as timer


XMS, XMX = 1, 4

VERSION_MANIFEST_URL = 'https://launchermeta.mojang.com/mc/game/version_manifest.json'
USER_PROFILE_URL_TEMPLATE = 'https://api.mojang.com/users/profiles/minecraft/{username}'


def download(url, filename=None):
    if filename is None:
        filename = url.split('/')[-1]
    with requests.get(url, stream=True) as r:
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
    return filename


def get_json(url):
    return requests.get(url).json()


def try_delete(path):
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


class UserListFile:

    def __init__(self, filename):
        self.filename = filename
        self.data = []

    def _load(self):
        try:
            with open(self.filename) as f:
                self.data = json.load(f)
        except FileNotFoundError:
            self.data = []

    def _save(self):
        with open(self.filename, 'w') as f:
            json.dump(self.data, f, indent=4)

    def _index(self, username):
        uuid = get_uuid(username)
        for index, entry in whitelist:
            if entry['id'] == uuid:
                return index

    def add(self, username):
        self._load()
        url = USER_PROFILE_URL_TEMPLATE.format(username=username)
        user = get_json(url)
        new_entry = {
            "uuid": user["id"],
            "name": user["name"],
        }
        self.data.append(new_entry)
        self._save()

    def remove(self, username):
        self._load()
        index = self._index(username)
        if index is None:
            return False
        self.data.remove(index)
        self._save()
        return True


class InvalidVersion(Exception):
    pass


def get_version_info(version='release'):
    print('downloading version manifest ...')
    m = get_json(VERSION_MANIFEST_URL)

    # map version id and url
    urls = {v['id']: v['url'] for v in m['versions']}

    if version in ('release', 'snapshot'):
        # get latest version from manifest
        vtype = version
        version = m['latest'][vtype]
        print(f'latest {vtype}:', version)

    try:
        url = urls.get(version, None)
    except KeyError:
        raise InvalidVersion(version) from None

    print(f'downloading {version} info ...')
    return get_json(url)


class Session:

    def __init__(self, name):
        self.name = name

    def attach(self):
        process = subprocess.run(['tmux', 'attach', '-t', self.name])
        return process.returncode == 0

    def start(self, cmd, cwd=None):
        if os.getenv('TMUX'):
            # TODO: raise NestedTmuxError('cannot start tmux within tmux')
            print('ERROR: cannot start tmux within tmux')
            return False
        if self.exists():
            # TODO: raise TmuxSessionExists(f'tmux session already exists: {self.name}')
            print('ERROR: session already exists:', self.name)
            return False
        process = subprocess.run(['tmux', 'new-session', '-d', '-s', self.name], cwd=cwd)
        if process.returncode != 0:
            return False
        return self.send(cmd + '; exit')

    def wait(self, timeout=60):
        while timeout:
            if not self.exists():
                return True
            timeout -= 1
        return False

    def kill(self):
        process = subprocess.run(['tmux', 'kill-session', '-t', self.name], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        return process.returncode == 0

    def exists(self):
        process = subprocess.run(['tmux', 'has-session', '-t', self.name], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        return process.returncode == 0

    def send(self, text):
        print(f'{self.name}: {text}')
        process = subprocess.run(['tmux', 'send-keys', '-t', self.name, text, 'C-m'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        time.sleep(0.2)
        return process.returncode == 0


class Server:

    def __init__(self, root):
        self.root = Path(root)
        self.session = Session(self.root.name)
        self.whitelist = UserListFile(self.root / 'whitelist.json')
        self.ops = UserListFile(self.root / 'ops.json')
        self.banned_players = UserListFile(self.root / 'banned-players.json')
        self.run_file = self.root / 'run.sh'
        self.server_jar = self.root / 'server.jar'
        self.client_jar = self.root / 'client.jar'
        self.log = self.root / 'log.txt'

    def start(self):
        if self.run_file.exists():
            cmd = './run.sh'
        else:
            if not self.server_jar.exists():
                # TODO: raise NotInstalled()
                print('ERROR: server.jar does not exist')
                return False
            cmd = f'java -Xms{XMS}G -Xmx{XMX}G -jar server.jar nogui'
        cmd += f' | tee log.txt'
        if not self.session.start(cmd, cwd=self.root):
            print('ERROR: Failed to start server')
            return False

        time.sleep(10)
        if not self.session.exists():
            print('ERROR: Server exited! Output:')
            self.tail()
            return False
        return True

    def stop(self, wait=False, timeout=120):
        if not self.session.send('stop'):
            return False
        if wait:
            if not self.session.wait(timeout):
                print('WARNING: Session did not stop gracefully! Killing it now...')
                return not self.session.kill()
        return True

    def restart(self):
        if self.stop(wait=True):
            if self.start():
                return True
            print('ERROR: Failed to start server.')
            return False
        print('ERROR: Failed to stop server.')
        return False

    def tail(self, follow=False):
        cmd = ['tail']
        if follow:
            cmd.append('-f')
        cmd.append(self.log)
        process = subprocess.run(cmd)
        return process.returncode

    def update(self, version='release'):
        os.makedirs(self.root, exist_ok=True)

        info = get_version_info(version)
        version = info['id']

        try_delete(self.server_jar)
        try_delete(self.client_jar)

        print(f'downloading {version} server.jar ...')
        download(info['downloads']['server']['url'], self.server_jar)

        print(f'downloading {version} client.jar ...')
        download(info['downloads']['client']['url'], self.client_jar)

    def backup(self, world='world', destination=None):
        src = self.root / world
        if destination is None:
            dst = self.root / f'{world}.backup'
        else:
            dst = Path(destination)

        print('Creating backup in:', dst)
        start = timer()
        self.session.send('say Backup started')
        self.session.send('save-off')
        self.session.send('save-all')

        time.sleep(20)
        subprocess.run(['sync'])
        time.sleep(1)

        try:
            shutil.rmtree(dst)
        except FileNotFoundError:
            pass

        shutil.copytree(src, dst)  # actual backup

        self.session.send('save-on')
        duration = timer() - start
        self.session.send(f'say Backup finished in {duration:.2f}s')


def add_userlist_arguments(parser):
    parser.add_argument(
        'nickname',
        help='nickname to add/remove',
    )
    parser.add_argument(
        '-r',
        '--remove',
        action='store_true',
        help='remove nickname',
    )


def get_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'root',
        help='server root directory',
    )

    subparsers = parser.add_subparsers(dest='command')

    start = subparsers.add_parser('start', help='Start the server')

    stop = subparsers.add_parser('stop', help='Stop the server')
    stop.add_argument('-w', '--wait', action='store_true', help='Wait until the server is stopped')

    restart = subparsers.add_parser('restart', help='Restart the server')
    attach = subparsers.add_parser('attach', help='Attach to the server console')

    tail = subparsers.add_parser('tail', help='Tail server output')
    tail.add_argument('-f', '--follow', action='store_true', help='follow output')

    update = subparsers.add_parser('update', help='Update server (vanilla only)')
    update.add_argument(
        'version',
        nargs='?',
        default='release',
        help='The exact version to update to or one of '
             '("release", "snapshot") to update to the latest. '
             '(Default: release)'
    )

    backup = subparsers.add_parser('backup', help='Create a backup of the world')
    backup.add_argument('world', nargs='?', default='world', help='World to backup (Default: world)')
    backup.add_argument('-d', '--destination', help='Destination directory (will be created) (Default: <world>.backup)')

    whitelist = subparsers.add_parser('whitelist', help='Add/remove a nickname to/from the whitelist')
    add_userlist_arguments(whitelist)

    op = subparsers.add_parser('op', help='Add/remove a nickname to/from the ops')
    add_userlist_arguments(op)

    ban = subparsers.add_parser('ban', help='Add/Remove a nickname to/from the banned-players')
    add_userlist_arguments(ban)

    return parser


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = get_parser()
    args = parser.parse_args(argv)

    server = Server(args.root)

    if args.command == 'start':
        server.start()

    elif args.command == 'stop':
        server.stop(args.wait)

    elif args.command == 'restart':
        server.restart()

    elif args.command == 'attach':
        server.session.attach()

    elif args.command == 'tail':
        server.tail(args.follow)

    elif args.command == 'update':
        server.update(args.version)  # vanilla only

    elif args.command == 'backup':
        server.backup(args.world, args.destination)

    elif args.command == 'whitelist':
        if args.remove:
            server.whitelist.remove(args.nickname)
        else:
            server.whitelist.add(args.nickname)

    elif args.command == 'ops':
        if args.remove:
            server.ops.remove(args.nickname)
        else:
            server.ops.add(args.nickname)

    elif args.command == 'ban':
        if args.remove:
            server.banned.remove(args.nickname)
        else:
            server.banned.add(args.nickname)

    else:
        if args.command is None:
            print('No command given')
        else:
            print('Unknown command:', args.command)
        parser.print_usage()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment