# -*- coding: utf-8 -*-
"""Different tools to work with datasets in HDF5 file format.
Created on Thu Apr 2 21:12:46 2015
@author: akusok
"""
import numpy as np
import csv
from tables import open_file, Atom, Filters
import os
import fasteners # inter-process file lock
def _check_hdf5_matrix(fname, rows, cols):
    """Checks that the matrix stored in an existing HDF5 file is (rows, cols).

    The matrix is taken to be the last node yielded by ``walk_nodes()``,
    whatever its name is.

    Args:
        fname (string): name of an existing hdf5 file
        rows (int): expected number of rows
        cols (int): expected number of columns

    Raises:
        AssertionError: no matrix-like node was found, or its shape differs.
    """
    h5 = open_file(fname, 'r')
    try:
        node = None
        for node in h5.walk_nodes():
            pass  # keep the last node, whatever its name is
        # a node without a `shape` attribute cannot hold a matrix
        shape = getattr(node, "shape", None)
        if shape is None:
            raise AssertionError("Matrix in %s does not exist" % fname)
        found = tuple(int(s) for s in shape)
        if found != (rows, cols):
            raise AssertionError(
                "Matrix in %s has a wrong shape: (%d, %d) expected, %s found"
                % (fname, rows, cols, found))
    finally:
        h5.close()  # the file is only inspected, never kept open


def _prepare_fHH(fHH, fHT, nnet, precision):
    """Prepares files for fHH, fHT if they are needed.

    Nothing happens unless both file names are given. Otherwise the data
    accumulated in `nnet` is reset, and each file is either validated (if it
    already exists) or created: fHH as an (L, L) identity matrix scaled by
    `nnet.norm`, fHT as an unfilled (L, outputs) array.

    Args:
        fHH (string): hdf5 filename to store HH, None for ignore disk storage
        fHT (string): hdf5 filename to store HT, None for ignore disk storage
        nnet (nnets Object): neural network implementation from HPELM
        precision (np.float32/64): precision

    Raises:
        AssertionError: an existing file holds no matrix, or a matrix of a
            wrong shape.
    """
    if fHH is None or fHT is None:
        return  # disk storage is not requested

    # reset accumulated data in ELM
    nnet.reset()
    L = nnet.L
    outputs = nnet.outputs
    norm = nnet.norm

    # process fHH
    if os.path.isfile(fHH):
        _check_hdf5_matrix(fHH, L, L)
    else:
        make_hdf5(np.eye(L, L, dtype=precision)*norm, fHH, precision)

    # process fHT
    if os.path.isfile(fHT):
        _check_hdf5_matrix(fHT, L, outputs)
    else:
        make_hdf5((L, outputs), fHT, precision)
def _add_to_hdf5_matrix(fname, delta):
    """Adds `delta` in place to the matrix stored in HDF5 file `fname`.

    The update runs under an inter-process lock on ``fname + ".lock"``.
    The matrix is the last node yielded by ``walk_nodes()``.
    """
    with fasteners.InterProcessLock(fname + ".lock"):
        h5 = open_file(fname, "a")
        try:
            for node in h5.walk_nodes():
                pass  # find a node with whatever name
            node[:] += delta
            h5.flush()
        finally:
            # close even if the update fails, so the handle is not leaked
            h5.close()


def _write_fHH(fHH, fHT, HH, HT):
    """Writes HH,HT data into fHH,fHT files, multi-process safe with lock file.

    HH is added to the matrix stored in fHH, then HT is added to the matrix
    stored in fHT. Lock file has the same name as fHH,fHT, but with '.lock'
    extension appended; each file is updated under its own lock.
    """
    _add_to_hdf5_matrix(fHH, HH)
    _add_to_hdf5_matrix(fHT, HT)
def normalize_hdf5(h5file, mean=None, std=None, batch=None):
    """Calculates or applies normalization to data in HDF5 file.

    The data matrix is the last node yielded by ``walk_nodes()``. Two modes:

    * `mean` or `std` is missing: if the node has no stored statistics they
      are computed batch by batch, saved as node attributes ``mean`` and
      ``std`` and returned; otherwise the stored values are returned. The
      data itself is not rewritten in this mode.
    * both `mean` and `std` are given: they are saved as node attributes and
      every row is replaced by ``(row - mean) / std``.

    NOTE(review): the first mode marks the file with statistics without
    normalizing it, although the printed message on a later call says the
    data "was already normalized" - confirm this is intended.

    :param h5file: - name (and path) of the HDF5 file, opened in append mode
    :param mean: - known vector of mean values
    :param std: - known vector of standard deviations; zeros are replaced by
                  ones (in place if an ndarray is passed)
    :param batch: - number of rows to read at once, default is a native batch size
    :returns: - tuple (mean, std)
    :raises AssertionError: - length of `mean` or `std` differs from the
                              number of data columns
    """
    h5 = open_file(h5file, "a")
    try:
        for node in h5.walk_nodes():
            pass  # find a node with whatever name
        dt = node.dtype
        N, d = node.shape  # HDF5 files are transposed, for Matlab compatibility
        if batch is None:
            batch = node.chunkshape[0]
        nb = N // batch  # floor division on both Python 2 and 3
        if N > nb*batch:
            nb += 1  # add last incomplete step

        if mean is None or std is None:
            if node.attrs.mean is not None:  # statistics are already stored
                print("data was already normalized, returning 'mean', 'std' parameters")
                print("if you want to run normalization anyway, call the function with 'mean' and 'std' params")
                return node.attrs.mean, node.attrs.std

            print("calculating mean and standard deviation of data")
            E_x = np.zeros((d,), dtype=np.float64)
            E_x2 = np.zeros((d,), dtype=np.float64)
            for b in range(nb):
                start = b*batch
                step = min(batch, N-start)
                X1 = node[start: start+step, :].astype(np.float64)
                # weight each batch by its share of rows, so the sums become
                # the global E[x] and E[x^2]
                E_x += np.mean(X1, 0) * (1.0*step/N)
                E_x2 += np.mean(X1**2, 0) * (1.0*step/N)
            mean = E_x
            # rounding can push the variance slightly below zero, clip it
            # to avoid NaN standard deviations
            std = np.maximum(E_x2 - E_x**2, 0)**0.5
            node.attrs.mean = mean
            node.attrs.std = std
            return mean, std

        if len(mean) != d:
            raise AssertionError("Incorrect lenght of a vector of means: %d expected, %d found" % (d, len(mean)))
        if len(std) != d:
            raise AssertionError("Incorrect lenght of a vector of standard deviations: %d expected, %d found" % (d, len(std)))
        # lists/tuples become arrays; with a plain list `std[std == 0] = 1`
        # would silently overwrite std[0] instead of masking zeros
        mean = np.asarray(mean)
        std = np.asarray(std)
        node.attrs.mean = mean
        node.attrs.std = std

        std[std == 0] = 1  # prevent division by zero for std=0
        print("applying normalization")
        for b in range(nb):
            start = b*batch
            step = min(batch, N-start)
            X = node[start: start+step].astype(np.float64)
            node[start: start+step] = ((X - mean) / std).astype(dt)
        return mean, std
    finally:
        h5.close()  # closing file on every exit path
#def oversample(data, targets, classes):
# pass
def make_hdf5(data, h5file, dtype=np.float64, delimiter=" ", skiprows=0, comp_level=0):
    """Makes an HDF5 file from whatever given data.

    The data is written to a node named "data" under the file root, and the
    node attributes ``mean`` and ``std`` are set to None. Supported inputs:

    * Numpy ndarray (1-D arrays are stored as a single column);
    * name of a ``.npy`` file, loaded with ``np.load``;
    * name of a ``.gz``/``.bz2`` text file, loaded with ``np.loadtxt``;
    * name of a ``.txt``/``.csv`` file, copied row by row without loading
      the whole file into memory;
    * a 2-element shape tuple, which creates an array without filling it.

    :param data: - input data in Numpy.ndarray or filename, or a shape tuple
    :param h5file: - name (and path) of the output HDF5 file
    :param dtype: - data type of the stored array and of parsed text values
    :param delimiter: - data delimiter for text, csv files
    :param skiprows: - number of leading lines to skip in text files
    :param comp_level: - compression level of the HDF5 file
    :raises AssertionError: - invalid compression level, unsupported input,
                              text file without data rows, or data that is
                              not 2-dimensional
    """
    if not comp_level < 10:
        raise AssertionError("Compression level must be 0-9 (0 for no compression)")

    string_types = (str, type(u""))  # str/unicode on Python 2, str on Python 3
    # universal newlines: explicit flag on Python 2, default on Python 3
    # (Python 3.11 no longer accepts the 'U' flag)
    text_mode = "rU" if str is bytes else "r"
    fill = ""

    # open data file
    if isinstance(data, np.ndarray):
        X = data
    elif isinstance(data, string_types) and data.endswith('npy'):
        X = np.load(data)
    elif isinstance(data, string_types) and data.endswith(('.gz', 'bz2')):
        X = np.loadtxt(data, dtype=dtype, delimiter=delimiter, skiprows=skiprows)
    elif isinstance(data, string_types) and data.endswith(('txt', 'csv')):
        # iterative out-of-memory loader for huge .csv/.txt files
        fill = "iter"
        # check data dimensionality from the first data row
        X = None
        with open(data, text_mode) as f:
            for _ in range(skiprows):
                f.readline()
            for line in csv.reader(f, delimiter=delimiter):
                X = np.fromiter(line, dtype=dtype)
                break
        if X is None:
            raise AssertionError("Input text file has no data rows: %s" % data)
    elif isinstance(data, tuple) and len(data) == 2:
        X = np.empty((1, 1))  # placeholder, only `data` (the shape) is used
        fill = "empty"
    else:
        raise AssertionError("Input data must be Numpy ndarray, .npy file, or .txt/.csv text file (compressed .gz/.bz2)")

    # process data
    if len(X.shape) == 1:
        X = X[:, np.newaxis]
    if len(X.shape) != 2:
        raise AssertionError("Data in Numpy ndarray must have 2 dimensions")

    # create hdf5 file
    if comp_level > 0:
        flt = Filters(complevel=comp_level, shuffle=True)
    else:
        flt = Filters(complevel=0)
    h5 = open_file(h5file, "w")
    try:
        a = Atom.from_dtype(np.dtype(dtype), dflt=0)

        # write data to hdf5 file
        if fill == "iter":  # iteratively fill the data
            # the first row became a column vector above, so X.shape[0]
            # is the number of values per text row
            h5data = h5.create_earray(h5.root, "data", a, (0, X.shape[0]), filters=flt)
            with open(data, text_mode) as f:
                for _ in range(skiprows):
                    f.readline()
                for line in csv.reader(f, delimiter=delimiter):
                    row = np.fromiter(line, dtype=dtype)
                    h5data.append(row[np.newaxis, :])
        elif fill == "empty":  # no fill at all
            h5data = h5.create_carray(h5.root, "data", a, data, filters=flt)
        else:  # write whole data matrix
            h5data = h5.create_carray(h5.root, "data", a, X.shape, filters=flt)
            h5data[:] = X

        # mark the data as not normalized
        h5data.attrs.mean = None
        h5data.attrs.std = None
        h5.flush()
    finally:
        h5.close()  # close the file even if writing fails
def _ireader(fX, q_in, q_out):
    """Asynchronous reader for an HDF5 file.

    Serves slices of the last node yielded by ``walk_nodes()`` on demand,
    until a terminating request arrives.

    fX - name of the HDF5 file, opened read-only
    q_in - a queue of (start, stop) tuples of read indexes; if start >= stop then reader terminates
    q_out - a queue for chunks read from disk

    Raises AssertionError if fX is not a string.
    """
    # (str, unicode) on Python 2, (str, str) on Python 3
    if not isinstance(fX, (str, type(u""))):
        raise AssertionError("Asyncronous I/O only supported with HDF5 data files")
    hX = open_file(fX, "r")
    try:
        for X in hX.walk_nodes():
            pass  # find a node with whatever name
        while True:  # returning data chunks on demand
            start, stop = q_in.get()
            if start >= stop:
                break
            q_out.put(X[start:stop])
    finally:
        hX.close()  # do not leak the handle if a request fails
def _iwriter(fX, q_in):
    """Asynchronous writer for an HDF5 file.

    Writes incoming batches into the last node yielded by ``walk_nodes()``
    and flushes the node after every batch.

    fX - name of the HDF5 file, opened in append mode
    q_in - a queue of (Xbatch, start, stop) tuples: data and the row indexes
           to write it to; if None is received then writer terminates

    Raises AssertionError if fX is not a string.
    """
    # (str, unicode) on Python 2, (str, str) on Python 3
    if not isinstance(fX, (str, type(u""))):
        raise AssertionError("Asyncronous I/O only supported with HDF5 data files")
    hX = open_file(fX, "a")
    try:
        for X in hX.walk_nodes():
            pass  # find a node with whatever name
        while True:  # writing data chunks as they arrive
            data = q_in.get()
            if data is None:
                break
            Xb, start, stop = data
            X[start:stop] = Xb
            X.flush()
    finally:
        hX.close()  # do not leak the handle if a write fails
if __name__ == "__main__":
    # Manual run on a local file. The input can be produced beforehand with
    #   make_hdf5("textfile.txt", "text.h5")
    # or, compressed,
    #   make_hdf5("textfile.txt", "textz.h5", comp_level=3)
    target = "text.h5"
    normalize_hdf5(target)
    print("Done!")