"""
Provides utilities dealing with files.
"""
import os
import sys
import logging
import datetime
import numpy as np
from time import ctime, localtime, strftime
try:
import hashlib
md5_constructor = hashlib.md5
except ImportError:
import md5
md5_constructor = md5.new
__version__ = '$Id: files.py 685 2012-03-29 04:22:32Z carthur $'
logger = logging.getLogger()
if not getattr(__builtins__, "WindowsError", None):
[docs] class WindowsError(OSError):
pass
[docs]def fl_module_path(level=1):
"""
Get the path of the module <level> levels above this function
:param int level: level in the stack of the module calling this function
(default = 1, function calling ``fl_module_path``)
:returns: path, basename and extension of the file containing the module
:Example: path, base, ext = fl_module_path( ), Calling fl_module_path()
from "/foo/bar/baz.py" produces the result "/foo/bar", "baz",
".py"
"""
filename = os.path.realpath(sys._getframe(level).f_code.co_filename)
path, fname = os.path.split(filename)
base, ext = os.path.splitext(fname)
path = path.replace(os.path.sep, '/')
return path, base, ext
[docs]def fl_module_name(level=1):
"""
Get the name of the module <level> levels above this function
:param int level: Level in the stack of the module calling this function
(default = 1, function calling ``fl_module_name``)
:returns: Module name.
:rtype: str
:Example: mymodule = fl_module_name( ) Calling fl_module_name() from
"/foo/bar/baz.py" returns "baz"
"""
package = sys._getframe(level).f_code.co_name
return package
[docs]def fl_program_version(level=None):
"""
Return the __version__ string from the top-level program, where defined.
If it is not defined, return an empty string.
:param int level: level in the stack of the main script
(default = maximum level in the stack)
:returns: version string (defined as the ``__version__`` global variable)
"""
if not level:
import inspect
level = len(inspect.stack()) - 1
f = sys._getframe(level)
if '__version__' in f.f_globals:
return f.f_globals['__version__']
else:
return ''
[docs]def fl_load_file(filename, comments='%', delimiter=',', skiprows=0):
"""
Load a delimited text file -- uses :func:`numpy.genfromtxt`
:param filename: File, filename, or generator to read
:type filename: file or str
:param comments: (default '%') indicator
:type comments: str, optional
:param delimiter: The string used to separate values.
:type delimiter: str, int or sequence, optional
"""
return np.genfromtxt(filename, comments=comments, delimiter=delimiter,
skip_header=skiprows)
[docs]def fl_save_file(filename, data, header='', delimiter=',', fmt='%.18e'):
"""
Save data to a file.
Does some basic checks to ensure the path exists before attempting
to write the file. Uses :class:`numpy.savetxt` to save the data.
:param str filename: Path to the destination file.
:param data: Array data to be written to file.
:param str header: Column headers (optional).
:param str delimiter: Field delimiter (default ',').
:param str fmt: Format statement for writing the data.
"""
directory, fname = os.path.split(filename)
if not os.path.isdir(directory):
os.makedirs(directory)
try:
np.savetxt(filename, data, header=header, delimiter=delimiter,
fmt=fmt, comments='%')
except TypeError:
np.savetxt(filename, data, delimiter=delimiter, fmt=fmt, comments='%')
[docs]def fl_get_stat(filename, chunk_whole=2 ** 16):
"""
Get basic statistics of filename - namely directory, name (excluding
base path), md5sum and the last modified date. Useful for checking
if a file has previously been processed.
:param str filename: Filename to check.
:param int chunk_whole: (optional) chunk size (for md5sum calculation).
:returns: path, name, md5sum, modification date for the file.
:raises TypeError: if the input file is not a string.
:raises IOError: if the file is not a valid file, or if the file
cannot be opened.
:Example: dir, name, md5sum, moddate = fl_get_stat(filename)
"""
try:
fh = open(filename)
fh.close()
except:
logger.exception("Cannot open %s" % (filename))
raise IOError("Cannot open %s" % (filename))
try:
directory, fname = os.path.split(filename)
except:
logger.exception('Input file is not a string')
raise TypeError('Input file is not a string')
try:
si = os.stat(filename)
except IOError:
logger.exception('Input file is not a valid file: %s' % (filename))
raise IOError('Input file is not a valid file: %s' % (filename))
moddate = ctime(si.st_mtime)
m = md5_constructor()
f = open(filename, 'rb')
while True:
chunk = f.read(chunk_whole)
if not chunk:
break
m.update(chunk)
md5sum = m.hexdigest()
return directory, fname, md5sum, moddate
[docs]def fl_config_file(extension='.ini', prefix='', level=None):
"""
Build a configuration filename (default extension .ini) based on the
name and path of the function/module calling this function. Can also
be useful for setting log file names automatically.
If prefix is passed, this is preprended to the filename.
:param str extension: file extension to use (default '.ini'). The
period ('.') must be included.
:param str prefix: Optional prefix to the filename (default '').
:param level: Optional level in the stack of the main script
(default = maximum level in the stack).
:returns: Full path of calling function/module, with the source file's
extension replaced with extension, and optionally prefix
inserted after the last path separator.
:Example: configFile = fl_config_file('.ini') Calling fl_config_file from
/foo/bar/baz.py should return /foo/bar/baz.ini
"""
if not level:
import inspect
level = len(inspect.stack())
path, base, ext = fl_module_path(level)
config_file = os.path.join(path, prefix + base + extension)
config_file = config_file.replace(os.path.sep, '/')
return config_file
[docs]def fl_start_log(log_file, log_level, verbose=False, datestamp=False,
newlog=True):
"""
Start logging to log_file all messages of log_level and higher.
Setting ``verbose=True`` will report all messages to STDOUT as well.
:param str log_file: Full path to log file.
:param str log_level: String specifiying one of the standard Python logging
levels ('NOTSET','DEBUG','INFO','WARNING','ERROR',
'CRITICAL')
:param boolean verbose: ``True`` will echo all logging calls to STDOUT
:param boolean datestamp: ``True`` will include a timestamp of the creation
time in the filename.
:param boolean newlog: ``True`` will create a new log file each time this
function is called. ``False`` will append to the
existing file.
:returns: :class:`logging.logger` object.
:Example: fl_start_log('/home/user/log/app.log', 'INFO', verbose=True)
"""
if datestamp:
b, e = os.path.splitext(log_file)
curdate = datetime.datetime.now()
curdatestr = curdate.strftime('%Y%m%d%H%M')
# The lstrip on the extension is required as splitext leaves it on.
log_file = "%s.%s.%s" % (b, curdatestr, e.lstrip('.'))
log_dir = os.path.dirname(os.path.realpath(log_file))
if not os.path.isdir(log_dir):
try:
os.makedirs(log_dir)
except OSError:
# Unable to create the directory, so stick it in the
# current working directory:
path, fname = os.path.split(log_file)
log_file = os.path.join(os.getcwd(), fname)
if newlog:
mode = 'w'
else:
mode = 'a'
logging.basicConfig(level=getattr(logging, log_level),
format='%(asctime)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename=log_file,
filemode=mode)
logger = logging.getLogger()
if len(logger.handlers) < 2:
# Assume that the second handler is a StreamHandler for verbose
# logging. This ensures we do not create multiple StreamHandler
# instances that will *each* print to STDOUT
if verbose and sys.stdout.isatty():
# If set to true, all logging calls will also be printed to the
# console (i.e. STDOUT)
console = logging.StreamHandler()
console.setLevel(getattr(logging, log_level))
formatter = logging.Formatter('%(asctime)s: %(message)s',
'%H:%M:%S', )
console.setFormatter(formatter)
logger.addHandler(console)
logger.info('Started log file %s (detail level %s)' %
(log_file, log_level))
logger.info('Running %s (pid %d)' % (sys.argv[0], os.getpid()))
logger.info('Version %s' % (fl_program_version()))
return logger
[docs]def fl_log_fatal_error(tblines):
"""
Log the error messages normally reported in a traceback so that
all error messages can be caught, then exit. The input 'tblines'
is created by calling ``traceback.format_exc().splitlines()``.
:param list tblines: List of lines from the traceback.
"""
for line in tblines:
logger.critical(line.lstrip())
sys.exit()
[docs]def fl_mod_date(filename, dateformat='%Y-%m-%d %H:%M:%S'):
"""
Return the last modified date of the input file
:param str filename: file name (full path).
:param str dateformat: Format string for the date (default
'%Y-%m-%d %H:%M:%S')
:returns: File modification date/time as a string
:rtype: str
:Example: modDate = fl_mod_date( 'C:/foo/bar.csv' ,
dateformat='%Y-%m-%dT%H:%M:%S' )
"""
try:
si = os.stat(filename)
except IOError:
logger.exception('Input file is not a valid file: %s' % (filename))
raise IOError('Input file is not a valid file: %s' % (filename))
moddate = localtime(si.st_mtime)
return strftime(dateformat, moddate)
[docs]def fl_size(filename):
"""
Return the size of the input file in bytes
:param str filename: Full path to the file.
:returns: File size in bytes.
:rtype: int
:Example: file_size = fl_size( 'C:/foo/bar.csv' )
"""
try:
si = os.stat(filename)
except WindowsError:
logger.exception('Input file is not a valid file: %s' % (filename))
raise IOError('Input file is not a valid file: %s' % (filename))
except IOError:
logger.exception('Input file is not a valid file: %s' % (filename))
raise IOError('Input file is not a valid file: %s' % (filename))
else:
size = si.st_size
return size