Source code for bravo.world

from array import array
from functools import wraps
from itertools import imap, product
import random
import sys

from twisted.internet import reactor
from twisted.internet.defer import (inlineCallbacks, maybeDeferred,
                                    returnValue, succeed)
from twisted.internet.task import LoopingCall, coiterate
from twisted.python import log

from bravo.beta.structures import Level
from bravo.chunk import Chunk, CHUNK_HEIGHT
from bravo.entity import Player, Furnace
from bravo.errors import (ChunkNotLoaded, SerializerReadException,
                          SerializerWriteException)
from bravo.ibravo import ISerializer
from bravo.plugin import retrieve_named_plugins
from bravo.utilities.coords import split_coords
from bravo.utilities.temporal import PendingEvent
from bravo.mobmanager import MobManager


[docs]class ChunkCache(object): """ A cache which holds references to all chunks which should be held in memory. This cache remembers chunks that were recently used, that are in permanent residency, and so forth. Its exact caching algorithm is currently null. When chunks dirty themselves, they are expected to notify the cache, which will then schedule an eviction for the chunk. """ def __init__(self): self._perm = {} self._dirty = {} def pin(self, chunk): self._perm[chunk.x, chunk.z] = chunk def unpin(self, chunk): del self._perm[chunk.x, chunk.z] def put(self, chunk): # XXX expand caching strategy pass def get(self, coords): if coords in self._perm: return self._perm[coords] # Returns None if not found! return self._dirty.get(coords) def cleaned(self, chunk): del self._dirty[chunk.x, chunk.z] def dirtied(self, chunk): self._dirty[chunk.x, chunk.z] = chunk def iterperm(self): return self._perm.itervalues() def iterdirty(self): return self._dirty.itervalues()
[docs]class ImpossibleCoordinates(Exception): """ A coordinate could not ever be valid. """
[docs]def coords_to_chunk(f): """ Automatically look up the chunk for the coordinates, and convert world coordinates to chunk coordinates. """ @wraps(f) def decorated(self, coords, *args, **kwargs): x, y, z = coords # Fail early if Y is OOB. if not 0 <= y < CHUNK_HEIGHT: raise ImpossibleCoordinates("Y value %d is impossible" % y) bigx, smallx, bigz, smallz = split_coords(x, z) d = self.request_chunk(bigx, bigz) @d.addCallback def cb(chunk): return f(self, chunk, (smallx, y, smallz), *args, **kwargs) return d return decorated
[docs]def sync_coords_to_chunk(f): """ Either get a chunk for the coordinates, or raise an exception. """ @wraps(f) def decorated(self, coords, *args, **kwargs): x, y, z = coords # Fail early if Y is OOB. if not 0 <= y < CHUNK_HEIGHT: raise ImpossibleCoordinates("Y value %d is impossible" % y) bigx, smallx, bigz, smallz = split_coords(x, z) bigcoords = bigx, bigz chunk = self._cache.get(bigcoords) if chunk is None: raise ChunkNotLoaded("Chunk (%d, %d) isn't loaded" % bigcoords) return f(self, chunk, (smallx, y, smallz), *args, **kwargs) return decorated
[docs]class World(object): """ Object representing a world on disk. Worlds are composed of levels and chunks, each of which corresponds to exactly one file on disk. Worlds also contain saved player data. """ factory = None """ The factory managing this world. Worlds do not need to be owned by a factory, but will not callback to surrounding objects without an owner. """ _season = None """ The current `ISeason`. """ saving = True """ Whether objects belonging to this world may be written out to disk. """ async = False """ Whether this world is using multiprocessing methods to generate geometry. """ dimension = "earth" """ The world dimension. Valid values are earth, sky, and nether. """ level = Level(seed=0, spawn=(0, 0, 0), time=0) """ The initial level data. """ _cache = None """ The chunk cache. """ def __init__(self, config, name): """ :Parameters: name : str The configuration key to use to look up configuration data. """ self.config = config self.config_name = "world %s" % name self._pending_chunks = dict() @property def season(self): return self._season @season.setter def season(self, value): if self._season != value: self._season = value if self._cache is not None: # Issue 388: Apply the season to the permanent cache. # Use a list so that we don't end up with indefinite amounts # of work to do, and also so that we don't try to do work # while the permanent cache is changing size. coiterate(imap(value.transform, list(self._cache.iterperm())))
[docs] def connect(self): """ Connect to the world. """ world_url = self.config.get(self.config_name, "url") world_sf_name = self.config.get(self.config_name, "serializer") # Get the current serializer list, and attempt to connect our # serializer of choice to our resource. # This could fail. Each of these lines (well, only the first and # third) could raise a variety of exceptions. They should *all* be # fatal. serializers = retrieve_named_plugins(ISerializer, [world_sf_name]) self.serializer = serializers[0] self.serializer.connect(world_url) log.msg("World connected on %s, using serializer %s" % (world_url, self.serializer.name))
[docs] def start(self): """ Start managing a world. Connect to the world and turn on all of the timed actions which continuously manage the world. """ self.connect() # Create our cache. self._cache = ChunkCache() # Pick a random number for the seed. Use the configured value if one # is present. seed = random.randint(0, sys.maxint) seed = self.config.getintdefault(self.config_name, "seed", seed) self.level = self.level._replace(seed=seed) # Check if we should offload chunk requests to ampoule. if self.config.getbooleandefault("bravo", "ampoule", False): try: import ampoule if ampoule: self.async = True except ImportError: pass log.msg("World is %s" % ("read-write" if self.saving else "read-only")) log.msg("Using Ampoule: %s" % self.async) # First, try loading the level, to see if there's any data out there # which we can use. If not, don't worry about it. d = maybeDeferred(self.serializer.load_level) @d.addCallback def cb(level): self.level = level log.msg("Loaded level data!") @d.addErrback def sre(failure): failure.trap(SerializerReadException) log.msg("Had issues loading level data, continuing anyway...") # And now save our level. if self.saving: self.serializer.save_level(self.level) # Start up the permanent cache. # has_option() is not exactly desirable, but it's appropriate here # because we don't want to take any action if the key is unset. if self.config.has_option(self.config_name, "perm_cache"): cache_level = self.config.getint(self.config_name, "perm_cache") self.enable_cache(cache_level) self.chunk_management_loop = LoopingCall(self.flush_chunk) self.chunk_management_loop.start(1) # XXX Put this in init or here? self.mob_manager = MobManager() # XXX Put this in the managers constructor? self.mob_manager.world = self
@inlineCallbacks
[docs] def stop(self): """ Stop managing the world. This can be a time-consuming, blocking operation, while the world's data is serialized. Note to callers: If you want the world time to be accurate, don't forget to write it back before calling this method! :returns: A ``Deferred`` that fires after the world has stopped. """ self.chunk_management_loop.stop() # Flush all dirty chunks to disk. Don't bother cleaning them off. for chunk in self._cache.iterdirty(): yield self.save_chunk(chunk) # Destroy the cache. self._cache = None # Save the level data. yield maybeDeferred(self.serializer.save_level, self.level)
[docs] def enable_cache(self, size): """ Set the permanent cache size. Changing the size of the cache sets off a series of events which will empty or fill the cache to make it the proper size. For reference, 3 is a large-enough size to completely satisfy the Notchian client's login demands. 10 is enough to completely fill the Notchian client's chunk buffer. :param int size: The taxicab radius of the cache, in chunks :returns: A ``Deferred`` which will fire when the cache has been adjusted. """ log.msg("Setting cache size to %d, please hold..." % size) assign = self._cache.pin def worker(x, z): log.msg("Adding %d, %d to cache..." % (x, z)) return self.request_chunk(x, z).addCallback(assign) x = self.level.spawn[0] // 16 z = self.level.spawn[2] // 16 rx = xrange(x - size, x + size) rz = xrange(z - size, z + size) work = (worker(x, z) for x, z in product(rx, rz)) d = coiterate(work) @d.addCallback def notify(none): log.msg("Cache size is now %d!" % size) return d
[docs] def flush_chunk(self): """ Flush a dirty chunk. This method will always block when there are dirty chunks. """ for chunk in self._cache.iterdirty(): # Save a single chunk, and add a callback to remove it from the # cache when it's been cleaned. d = self.save_chunk(chunk) d.addCallback(self._cache.cleaned) break
[docs] def save_off(self): """ Disable saving to disk. This is useful for accessing the world on disk without Bravo interfering, for backing up the world. """ if not self.saving: return self.chunk_management_loop.stop() self.saving = False
[docs] def save_on(self): """ Enable saving to disk. """ if self.saving: return self.chunk_management_loop.start(1) self.saving = True
[docs] def postprocess_chunk(self, chunk): """ Do a series of final steps to bring a chunk into the world. This method might be called multiple times on a chunk, but it should not be harmful to do so. """ # Apply the current season to the chunk. if self.season: self.season.transform(chunk) # Since this chunk hasn't been given to any player yet, there's no # conceivable way that any meaningful damage has been accumulated; # anybody loading any part of this chunk will want the entire thing. # Thus, it should start out undamaged. chunk.clear_damage() # Skip some of the spendier scans if we have no factory; for example, # if we are generating chunks offline. if not self.factory: return chunk # XXX slightly icky, print statements are bad # Register the chunk's entities with our parent factory. for entity in chunk.entities: if hasattr(entity, 'loop'): print "Started mob!" self.mob_manager.start_mob(entity) else: print "I have no loop" self.factory.register_entity(entity) # XXX why is this for furnaces only? :T # Scan the chunk for burning furnaces for coords, tile in chunk.tiles.iteritems(): # If the furnace was saved while burning ... if type(tile) == Furnace and tile.burntime != 0: x, y, z = coords coords = chunk.x, x, chunk.z, z, y # ... start it's burning loop reactor.callLater(2, tile.changed, self.factory, coords) # Return the chunk, in case we are in a Deferred chain. return chunk
@inlineCallbacks
[docs] def request_chunk(self, x, z): """ Request a ``Chunk`` to be delivered later. :returns: ``Deferred`` that will be called with the ``Chunk`` """ # First, try the cache. cached = self._cache.get((x, z)) if cached is not None: returnValue(cached) # Is it pending? if (x, z) in self._pending_chunks: # Rig up another Deferred and wrap it up in a to-go box. retval = yield self._pending_chunks[x, z].deferred() returnValue(retval) # Create a new chunk object, since the cache turned up empty. try: chunk = yield maybeDeferred(self.serializer.load_chunk, x, z) except SerializerReadException: # Looks like the chunk wasn't already on disk. Guess we're gonna # need to keep going. chunk = Chunk(x, z) # Add in our magic dirtiness hook so that the cache can be aware of # chunks who have been...naughty. chunk.dirtied = self._cache.dirtied if chunk.dirty: # The chunk was already dirty!? Oh, naughty indeed! self._cache.dirtied(chunk) if chunk.populated: self._cache.put(chunk) self.postprocess_chunk(chunk) if self.factory: self.factory.scan_chunk(chunk) returnValue(chunk) if self.async: from ampoule import deferToAMPProcess from bravo.remote import MakeChunk generators = [plugin.name for plugin in self.pipeline] d = deferToAMPProcess(MakeChunk, x=x, z=z, seed=self.level.seed, generators=generators) # Get chunk data into our chunk object. def fill_chunk(kwargs): chunk.blocks = array("B") chunk.blocks.fromstring(kwargs["blocks"]) chunk.heightmap = array("B") chunk.heightmap.fromstring(kwargs["heightmap"]) chunk.metadata = array("B") chunk.metadata.fromstring(kwargs["metadata"]) chunk.skylight = array("B") chunk.skylight.fromstring(kwargs["skylight"]) chunk.blocklight = array("B") chunk.blocklight.fromstring(kwargs["blocklight"]) return chunk d.addCallback(fill_chunk) else: # Populate the chunk the slow way. :c for stage in self.pipeline: stage.populate(chunk, self.level.seed) chunk.regenerate() d = succeed(chunk) # Set up our event and generate our return-value Deferred. It has to # be done early becaues PendingEvents only fire exactly once and it # might fire immediately in certain cases. pe = PendingEvent() # This one is for our return value. retval = pe.deferred() # This one is for scanning the chunk for automatons. if self.factory: pe.deferred().addCallback(self.factory.scan_chunk) self._pending_chunks[x, z] = pe def pp(chunk): chunk.populated = True chunk.dirty = True self.postprocess_chunk(chunk) self._cache.dirtied(chunk) del self._pending_chunks[x, z] return chunk # Set up callbacks. d.addCallback(pp) d.chainDeferred(pe) # Because multiple people might be attached to this callback, we're # going to do something magical here. We will yield a forked version # of our Deferred. This means that we will wait right here, for a # long, long time, before actually returning with the chunk, *but*, # when we actually finish, we'll be ready to return the chunk # immediately. Our caller cannot possibly care because they only see a # Deferred either way. retval = yield retval returnValue(retval)
[docs] def save_chunk(self, chunk): """ Write a chunk to the serializer. Note that this method does nothing when the given chunk is not dirty or saving is off! :returns: A ``Deferred`` which will fire after the chunk has been saved with the chunk. """ if not chunk.dirty or not self.saving: return succeed(chunk) d = maybeDeferred(self.serializer.save_chunk, chunk) @d.addCallback def cb(none): chunk.dirty = False return chunk @d.addErrback def eb(failure): failure.trap(SerializerWriteException) log.msg("Couldn't write %r" % chunk) return d
[docs] def load_player(self, username): """ Retrieve player data. :returns: a ``Deferred`` that will be fired with a ``Player`` """ # Get the player, possibly. d = maybeDeferred(self.serializer.load_player, username) @d.addErrback def eb(failure): failure.trap(SerializerReadException) log.msg("Couldn't load player %r" % username) # Make a player. player = Player(username=username) player.location.x = self.level.spawn[0] player.location.y = self.level.spawn[1] player.location.stance = self.level.spawn[1] player.location.z = self.level.spawn[2] return player # This Deferred's good to go as-is. return d
def save_player(self, username, player): if self.saving: self.serializer.save_player(player) # World-level geometry access. # These methods let external API users refrain from going through the # standard motions of looking up and loading chunk information. @coords_to_chunk
[docs] def get_block(self, chunk, coords): """ Get a block from an unknown chunk. :returns: a ``Deferred`` with the requested value """ return chunk.get_block(coords)
@coords_to_chunk
[docs] def set_block(self, chunk, coords, value): """ Set a block in an unknown chunk. :returns: a ``Deferred`` that will fire on completion """ chunk.set_block(coords, value)
@coords_to_chunk
[docs] def get_metadata(self, chunk, coords): """ Get a block's metadata from an unknown chunk. :returns: a ``Deferred`` with the requested value """ return chunk.get_metadata(coords)
@coords_to_chunk
[docs] def set_metadata(self, chunk, coords, value): """ Set a block's metadata in an unknown chunk. :returns: a ``Deferred`` that will fire on completion """ chunk.set_metadata(coords, value)
@coords_to_chunk
[docs] def destroy(self, chunk, coords): """ Destroy a block in an unknown chunk. :returns: a ``Deferred`` that will fire on completion """ chunk.destroy(coords)
@coords_to_chunk
[docs] def mark_dirty(self, chunk, coords): """ Mark an unknown chunk dirty. :returns: a ``Deferred`` that will fire on completion """ chunk.dirty = True
@sync_coords_to_chunk
[docs] def sync_get_block(self, chunk, coords): """ Get a block from an unknown chunk. :returns: the requested block """ return chunk.get_block(coords)
@sync_coords_to_chunk
[docs] def sync_set_block(self, chunk, coords, value): """ Set a block in an unknown chunk. :returns: None """ chunk.set_block(coords, value)
@sync_coords_to_chunk
[docs] def sync_get_metadata(self, chunk, coords): """ Get a block's metadata from an unknown chunk. :returns: the requested metadata """ return chunk.get_metadata(coords)
@sync_coords_to_chunk
[docs] def sync_set_metadata(self, chunk, coords, value): """ Set a block's metadata in an unknown chunk. :returns: None """ chunk.set_metadata(coords, value)
@sync_coords_to_chunk
[docs] def sync_destroy(self, chunk, coords): """ Destroy a block in an unknown chunk. :returns: None """ chunk.destroy(coords)
@sync_coords_to_chunk
[docs] def sync_mark_dirty(self, chunk, coords): """ Mark an unknown chunk dirty. :returns: None """ chunk.dirty = True
@sync_coords_to_chunk
[docs] def sync_request_chunk(self, chunk, coords): """ Get an unknown chunk. :returns: the requested ``Chunk`` """ return chunk