from array import array
from functools import wraps
from itertools import product
from struct import pack
from warnings import warn
from bravo.blocks import blocks, glowing_blocks
from bravo.beta.packets import make_packet
from bravo.geometry.section import Section
from bravo.utilities.bits import pack_nibbles
from bravo.utilities.coords import CHUNK_HEIGHT, XZ, iterchunk
from bravo.utilities.maths import clamp
[docs]class ChunkWarning(Warning):
"""
Somebody did something inappropriate to this chunk, but it probably isn't
lethal, so the chunk is issuing a warning instead of an exception.
"""
[docs]def check_bounds(f):
"""
Decorate a function or method to have its first positional argument be
treated as an (x, y, z) tuple which must fit inside chunk boundaries of
16, CHUNK_HEIGHT, and 16, respectively.
A warning will be raised if the bounds check fails.
"""
@wraps(f)
def deco(chunk, coords, *args, **kwargs):
x, y, z = coords
# Coordinates were out-of-bounds; warn and run away.
if not (0 <= x < 16 and 0 <= z < 16 and 0 <= y < CHUNK_HEIGHT):
warn("Coordinates %s are OOB in %s() of %s, ignoring call"
% (coords, f.func_name, chunk), ChunkWarning, stacklevel=2)
# A concession towards where this decorator will be used. The
# value is likely to be discarded either way, but if the value is
# used, we shouldn't horribly die because of None/0 mismatch.
return 0
return f(chunk, coords, *args, **kwargs)
return deco
[docs]def ci(x, y, z):
"""
Turn an (x, y, z) tuple into a chunk index.
This is really a macro and not a function, but Python doesn't know the
difference. Hopefully this is faster on PyPy than on CPython.
"""
return (x * 16 + z) * CHUNK_HEIGHT + y
[docs]def segment_array(a):
"""
Chop up a chunk-sized array into sixteen components.
The chops are done in order to produce the smaller chunks preferred by
modern clients.
"""
l = [array(a.typecode) for chaff in range(16)]
index = 0
for i in range(0, len(a), 16):
l[index].extend(a[i:i + 16])
index = (index + 1) % 16
return l
[docs]def make_glows():
"""
Set up glow tables.
These tables provide glow maps for illuminated points.
"""
glow = [None] * 16
for i in range(16):
dim = 2 * i + 1
glow[i] = array("b", [0] * (dim**3))
for x, y, z in product(xrange(dim), repeat=3):
distance = abs(x - i) + abs(y - i) + abs(z - i)
glow[i][(x * dim + y) * dim + z] = i + 1 - distance
glow[i] = array("B", [clamp(x, 0, 15) for x in glow[i]])
return glow
glow = make_glows()
[docs]def composite_glow(target, strength, x, y, z):
"""
Composite a light source onto a lightmap.
The exact operation is not quite unlike an add.
"""
ambient = glow[strength]
xbound, zbound, ybound = 16, CHUNK_HEIGHT, 16
sx = x - strength
sy = y - strength
sz = z - strength
ex = x + strength
ey = y + strength
ez = z + strength
si, sj, sk = 0, 0, 0
ei, ej, ek = strength * 2, strength * 2, strength * 2
if sx < 0:
sx, si = 0, -sx
if sy < 0:
sy, sj = 0, -sy
if sz < 0:
sz, sk = 0, -sz
if ex > xbound:
ex, ei = xbound, ei - ex + xbound
if ey > ybound:
ey, ej = ybound, ej - ey + ybound
if ez > zbound:
ez, ek = zbound, ek - ez + zbound
adim = 2 * strength + 1
# Composite! Apologies for the loops.
for (tx, ax) in zip(range(sx, ex), range(si, ei)):
for (tz, az) in zip(range(sz, ez), range(sk, ek)):
for (ty, ay) in zip(range(sy, ey), range(sj, ej)):
ambient_index = (ax * adim + az) * adim + ay
target[ci(tx, ty, tz)] += ambient[ambient_index]
[docs]def iter_neighbors(coords):
"""
Iterate over the chunk-local coordinates surrounding the given
coordinates.
All coordinates are chunk-local.
Coordinates which are not valid chunk-local coordinates will not be
generated.
"""
x, z, y = coords
for dx, dz, dy in (
(1, 0, 0),
(-1, 0, 0),
(0, 1, 0),
(0, -1, 0),
(0, 0, 1),
(0, 0, -1)):
nx = x + dx
nz = z + dz
ny = y + dy
if not (0 <= nx < 16 and
0 <= nz < 16 and
0 <= ny < CHUNK_HEIGHT):
continue
yield nx, nz, ny
[docs]def neighboring_light(glow, block):
"""
Calculate the amount of light that should be shone on a block.
``glow`` is the brighest neighboring light. ``block`` is the slot of the
block being illuminated.
The return value is always a valid light value.
"""
return clamp(glow - blocks[block].dim, 0, 15)
[docs]class Chunk(object):
"""
A chunk of blocks.
Chunks are large pieces of world geometry (block data). The blocks, light
maps, and associated metadata are stored in chunks. Chunks are
always measured 16xCHUNK_HEIGHTx16 and are aligned on 16x16 boundaries in
the xz-plane.
:cvar bool dirty: Whether this chunk needs to be flushed to disk.
:cvar bool populated: Whether this chunk has had its initial block data
filled out.
"""
all_damaged = False
populated = False
dirtied = None
"""
Optional hook to be called when this chunk becomes dirty.
"""
_dirty = True
"""
Internal flag describing whether the chunk is dirty. Don't touch directly;
use the ``dirty`` property instead.
"""
def __init__(self, x, z):
"""
:param int x: X coordinate in chunk coords
:param int z: Z coordinate in chunk coords
:ivar array.array heightmap: Tracks the tallest block in each xz-column.
:ivar bool all_damaged: Flag for forcing the entire chunk to be
damaged. This is for efficiency; past a certain point, it is not
efficient to batch block updates or track damage. Heavily damaged
chunks have their damage represented as a complete resend of the
entire chunk.
"""
self.x = int(x)
self.z = int(z)
self.heightmap = array("B", [0] * (16 * 16))
self.blocklight = array("B", [0] * (16 * 16 * CHUNK_HEIGHT))
self.sections = [Section() for i in range(16)]
self.entities = set()
self.tiles = {}
self.damaged = set()
def __repr__(self):
return "Chunk(%d, %d)" % (self.x, self.z)
__str__ = __repr__
@property
def dirty(self):
return self._dirty
@dirty.setter
def dirty(self, value):
if value and not self._dirty:
# Notify whoever cares.
if self.dirtied is not None:
self.dirtied(self)
self._dirty = value
[docs] def regenerate_heightmap(self):
"""
Regenerate the height map array.
The height map is merely the position of the tallest block in any
xz-column.
"""
for x in range(16):
for z in range(16):
column = x * 16 + z
for y in range(255, -1, -1):
if self.get_block((x, y, z)):
break
self.heightmap[column] = y
def regenerate_blocklight(self):
lightmap = array("L", [0] * (16 * 16 * CHUNK_HEIGHT))
for x, z, y in iterchunk():
block = self.get_block((x, y, z))
if block in glowing_blocks:
composite_glow(lightmap, glowing_blocks[block], x, y, z)
self.blocklight = array("B", [clamp(x, 0, 15) for x in lightmap])
[docs] def regenerate_skylight(self):
"""
Regenerate the ambient light map.
Each block's individual light comes from two sources. The ambient
light comes from the sky.
The height map must be valid for this method to produce valid results.
"""
# Create an array of skylights, and a mask of dimming blocks.
lights = [0xf] * (16 * 16)
mask = [0x0] * (16 * 16)
# For each y-level, we're going to update the mask, apply it to the
# lights, apply the lights to the section, and then blur the lights
# and move downwards. Since empty sections are full of air, and air
# doesn't ever dim, ignoring empty sections should be a correct way
# to speed things up. Another optimization is that the process ends
# early if the entire slice of lights is dark.
for section in reversed(self.sections):
if not section:
continue
for y in range(15, -1, -1):
# Early-out if there's no more light left.
if not any(lights):
break
# Update the mask.
for x, z in XZ:
offset = x * 16 + z
block = section.get_block((x, y, z))
mask[offset] = blocks[block].dim
# Apply the mask to the lights.
for i, dim in enumerate(mask):
# Keep it positive.
lights[i] = max(0, lights[i] - dim)
# Apply the lights to the section.
for x, z in XZ:
offset = x * 16 + z
section.set_skylight((x, y, z), lights[offset])
# XXX blur the lights
# And continue moving downward.
[docs] def regenerate(self):
"""
Regenerate all auxiliary tables.
"""
self.regenerate_heightmap()
self.regenerate_blocklight()
self.regenerate_skylight()
self.dirty = True
[docs] def damage(self, coords):
"""
Record damage on this chunk.
"""
if self.all_damaged:
return
x, y, z = coords
self.damaged.add(coords)
# The number 176 represents the threshold at which it is cheaper to
# resend the entire chunk instead of individual blocks.
if len(self.damaged) > 176:
self.all_damaged = True
self.damaged.clear()
[docs] def is_damaged(self):
"""
Determine whether any damage is pending on this chunk.
:rtype: bool
:returns: True if any damage is pending on this chunk, False if not.
"""
return self.all_damaged or bool(self.damaged)
[docs] def get_damage_packet(self):
"""
Make a packet representing the current damage on this chunk.
This method is not private, but some care should be taken with it,
since it wraps some fairly cryptic internal data structures.
If this chunk is currently undamaged, this method will return an empty
string, which should be safe to treat as a packet. Please check with
`is_damaged()` before doing this if you need to optimize this case.
To avoid extra overhead, this method should really be used in
conjunction with `Factory.broadcast_for_chunk()`.
Do not forget to clear this chunk's damage! Callers are responsible
for doing this.
>>> packet = chunk.get_damage_packet()
>>> factory.broadcast_for_chunk(packet, chunk.x, chunk.z)
>>> chunk.clear_damage()
:rtype: str
:returns: String representation of the packet.
"""
if self.all_damaged:
# Resend the entire chunk!
return self.save_to_packet()
elif not self.damaged:
# Send nothing at all; we don't even have a scratch on us.
return ""
elif len(self.damaged) == 1:
# Use a single block update packet. Find the first (only) set bit
# in the damaged array, and use it as an index.
coords = next(iter(self.damaged))
block = self.get_block(coords)
metadata = self.get_metadata(coords)
x, y, z = coords
return make_packet("block",
x=x + self.x * 16,
y=y,
z=z + self.z * 16,
type=block,
meta=metadata)
else:
# Use a batch update.
records = []
for coords in self.damaged:
block = self.get_block(coords)
metadata = self.get_metadata(coords)
x, y, z = coords
record = x << 28 | z << 24 | y << 16 | block << 4 | metadata
records.append(record)
data = "".join(pack(">I", record) for record in records)
return make_packet("batch", x=self.x, z=self.z,
count=len(records), data=data)
[docs] def clear_damage(self):
"""
Clear this chunk's damage.
"""
self.damaged.clear()
self.all_damaged = False
[docs] def save_to_packet(self):
"""
Generate a chunk packet.
"""
mask = 0
packed = []
ls = segment_array(self.blocklight)
for i, section in enumerate(self.sections):
if any(section.blocks):
mask |= 1 << i
packed.append(section.blocks.tostring())
for i, section in enumerate(self.sections):
if mask & 1 << i:
packed.append(pack_nibbles(section.metadata))
for i, l in enumerate(ls):
if mask & 1 << i:
packed.append(pack_nibbles(l))
for i, section in enumerate(self.sections):
if mask & 1 << i:
packed.append(pack_nibbles(section.skylight))
# Fake the biome data.
packed.append("\x00" * 256)
packet = make_packet("chunk", x=self.x, z=self.z, continuous=True,
primary=mask, add=0x0, data="".join(packed))
return packet
@check_bounds
[docs] def get_block(self, coords):
"""
Look up a block value.
:param tuple coords: coordinate triplet
:rtype: int
:returns: int representing block type
"""
x, y, z = coords
index, y = divmod(y, 16)
return self.sections[index].get_block((x, y, z))
@check_bounds
[docs] def set_block(self, coords, block):
"""
Update a block value.
:param tuple coords: coordinate triplet
:param int block: block type
"""
x, y, z = coords
index, section_y = divmod(y, 16)
column = x * 16 + z
if self.get_block(coords) != block:
self.sections[index].set_block((x, section_y, z), block)
if not self.populated:
return
# Regenerate heightmap at this coordinate.
if block:
self.heightmap[column] = max(self.heightmap[column], y)
else:
# If we replace the highest block with air, we need to go
# through all blocks below it to find the new top block.
height = self.heightmap[column]
if y == height:
for y in range(height, -1, -1):
if self.get_block((x, y, z)):
break
self.heightmap[column] = y
# Do the blocklight at this coordinate, if appropriate.
if block in glowing_blocks:
composite_glow(self.blocklight, glowing_blocks[block],
x, y, z)
bl = [clamp(light, 0, 15) for light in self.blocklight]
self.blocklight = array("B", bl)
# And the skylight.
glow = max(self.get_skylight((nx, ny, nz))
for nx, nz, ny in iter_neighbors((x, z, y)))
self.set_skylight((x, y, z), neighboring_light(glow, block))
self.dirty = True
self.damage(coords)
@check_bounds
@check_bounds
@check_bounds
[docs] def get_skylight(self, coords):
"""
Look up skylight value.
:param tuple coords: coordinate triplet
:rtype: int
"""
x, y, z = coords
index, y = divmod(y, 16)
return self.sections[index].get_skylight((x, y, z))
@check_bounds
[docs] def set_skylight(self, coords, value):
"""
Update skylight value.
:param tuple coords: coordinate triplet
:param int metadata:
"""
if self.get_metadata(coords) != value:
x, y, z = coords
index, y = divmod(y, 16)
self.sections[index].set_skylight((x, y, z), value)
@check_bounds
[docs] def destroy(self, coords):
"""
Destroy the block at the given coordinates.
This may or may not set the block to be full of air; it uses the
block's preferred replacement. For example, ice generally turns to
water when destroyed.
This is safe as a no-op; for example, destroying a block of air with
no metadata is not going to cause state changes.
:param tuple coords: coordinate triplet
"""
block = blocks[self.get_block(coords)]
self.set_block(coords, block.replace)
self.set_metadata(coords, 0)
[docs] def height_at(self, x, z):
"""
Get the height of an xz-column of blocks.
:param int x: X coordinate
:param int z: Z coordinate
:rtype: int
:returns: The height of the given column of blocks.
"""
return self.heightmap[x * 16 + z]
[docs] def sed(self, search, replace):
"""
Execute a search and replace on all blocks in this chunk.
Named after the ubiquitous Unix tool. Does a semantic
s/search/replace/g on this chunk's blocks.
:param int search: block to find
:param int replace: block to use as a replacement
"""
for section in self.sections:
for i, block in enumerate(section.blocks):
if block == search:
section.blocks[i] = replace
self.all_damaged = True
self.dirty = True