import numpy as np
from .data_structures import GadgetBinaryHeader, GadgetDataset
from .definitions import gadget_field_specs, gadget_ptype_specs
from .io import IOHandlerGadgetBinary
vector_fields = dict(IOHandlerGadgetBinary._vector_fields)
block_ids = {
"Coordinates": "POS",
"Velocities": "VEL",
"ParticleIDs": "ID",
"Mass": "MASS",
"InternalEnergy": "U",
"Density": "RHO",
"SmoothingLength": "HSML",
}
[docs]
def write_record(fp, data, endian):
dtype = endian + "i4"
size = np.array(data.nbytes, dtype=dtype)
fp.write(size.tobytes())
fp.write(data.tobytes())
fp.write(size.tobytes())
[docs]
def write_block(fp, data, endian, fmt, block_id):
assert fmt in [1, 2]
block_id = "%-4s" % block_id
if fmt == 2:
block_id_dtype = np.dtype([("id", "S", 4), ("offset", endian + "i4")])
block_id_data = np.zeros(1, dtype=block_id_dtype)
block_id_data["id"] = block_id
block_id_data["offset"] = data.nbytes + 8
write_record(fp, block_id_data, endian)
write_record(fp, data, endian)
[docs]
def fake_gadget_binary(
filename="fake_gadget_binary",
npart=(100, 100, 100, 0, 100, 0),
header_spec="default",
field_spec="default",
ptype_spec="default",
endian="",
fmt=2,
):
"""Generate a fake Gadget binary snapshot."""
header = GadgetBinaryHeader(filename, header_spec)
field_spec = GadgetDataset._setup_binary_spec(field_spec, gadget_field_specs)
ptype_spec = GadgetDataset._setup_binary_spec(ptype_spec, gadget_ptype_specs)
with open(filename, "wb") as fp:
# Generate and write header blocks
for i_header, header_spec in enumerate(header.spec):
specs = []
for name, dim, dtype in header_spec:
# workaround a FutureWarning in numpy where np.dtype(name, type, 1)
# will change meaning in a future version so
name_dtype = [name, endian + dtype, dim]
if dim == 1:
name_dtype.pop()
specs.append(tuple(name_dtype))
header_dtype = np.dtype(specs)
header = np.zeros(1, dtype=header_dtype)
if i_header == 0:
header["Npart"] = npart
header["Nall"] = npart
header["NumFiles"] = 1
header["BoxSize"] = 1
header["HubbleParam"] = 1
write_block(fp, header, endian, fmt, "HEAD")
npart = dict(zip(ptype_spec, npart, strict=True))
for fs in field_spec:
# Parse field name and particle type
if isinstance(fs, str):
field = fs
ptype = ptype_spec
else:
field, ptype = fs
if isinstance(ptype, str):
ptype = (ptype,)
# Determine field dimension
if field in vector_fields:
dim = vector_fields[field]
else:
dim = 1
# Determine dtype (in numpy convention)
if field == "ParticleIDs":
dtype = "u4"
else:
dtype = "f4"
dtype = endian + dtype
# Generate and write field block
data = []
rng = np.random.default_rng()
for pt in ptype:
data += [rng.random((npart[pt], dim))]
data = np.concatenate(data).astype(dtype)
if field in block_ids:
block_id = block_ids[field]
else:
block_id = ""
write_block(fp, data, endian, fmt, block_id)
return filename