# coding:utf-8
"""
Utilities
Copyright (C) 2020 JASRI All Rights Reserved.
"""
from __future__ import print_function
import os
import sys
import codecs
import json
import yaml
import hashlib
import time
import datetime
import collections
import codecs
import xmltodict
from logging import getLogger, StreamHandler, DEBUG
# Module-level logger: DEBUG level with a dedicated StreamHandler and
# propagation disabled, so each message is emitted exactly once (to stderr)
# regardless of the root logger's configuration.
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
logger.propagate = False
from os.path import expanduser
try:
from . import rest
except:
import rest
from . import config
class Error(Exception):
    """Application-level error carrying a message and an optional domain.

    Derives from ``Exception`` (not ``BaseException``, which is reserved
    for interpreter-exit signals such as SystemExit) and forwards the
    message to the base initializer so ``str(e)`` / ``e.args`` are useful.

    :param message: human-readable error description
    :param domain:  optional "File ..., line ..., in ..." location string
                    (see error_domain())
    :param flag_print: when True, log the error at config.ERROR level
    """

    def __init__(self, message, domain=None, flag_print=True):
        super(Error, self).__init__(message)
        self.__domain = domain
        self.__message = message
        if flag_print:
            log(" ==> util.Error()", config.ERROR)
            if domain is not None:
                log(" domain = {}".format(self.__domain), config.ERROR)
            log(" message = {}".format(self.__message), config.ERROR)

    def domain(self):
        """Return the error domain string, or None if not supplied."""
        return self.__domain

    def message(self):
        """Return the error message."""
        return self.__message
def workdir():
    """Return the Benten work directory.

    Honors the BENTEN_WORKDIR environment variable, defaulting to
    ``~/.benten`` when it is unset.
    """
    default = "{}/.benten".format(expanduser("~"))
    return os.getenv("BENTEN_WORKDIR", default)
def benten_agent_uri():
    """Return the Benten agent URI from the user config file.

    Falls back to ``config.benten_agent_uri_default`` when the config file
    is missing, unreadable, or lacks the key.
    """
    fname = benten_config_filename()
    try:
        uri = load_yaml(fname)["benten_agent_uri"]
    # was a bare except; keep the best-effort fallback but stop swallowing
    # SystemExit / KeyboardInterrupt
    except Exception:
        uri = config.benten_agent_uri_default
    return uri
def benten_uploader_uri():
    """Return the Benten uploader URI from the user config file.

    Falls back to ``config.benten_uploader_uri_default`` when the config
    file is missing, unreadable, or lacks the key.
    """
    fname = benten_config_filename()
    try:
        uri = load_yaml(fname)["benten_uploader_uri"]
    # narrowed from a bare except: do not swallow SystemExit/KeyboardInterrupt
    except Exception:
        uri = config.benten_uploader_uri_default
    return uri
def benten_uploader_host():
    """Extract the host part of the uploader URI (``scheme://host:port``).

    Returns None when the URI cannot be parsed.
    """
    uri = benten_uploader_uri()
    try:
        netloc = uri.split("//")[1]
        host = netloc.split(":")[0]
    # narrowed from a bare except to the actual parse failures:
    # uri is None (AttributeError) or has no "//" part (IndexError)
    except (AttributeError, IndexError):
        host = None
    return host
def benten_uploader_port():
    """Extract the port number of the uploader URI (``scheme://host:port``).

    Returns None when the URI has no parseable numeric port.
    """
    uri = benten_uploader_uri()
    try:
        netloc = uri.split("//")[1]
        port = int(netloc.split(":")[1])
    # narrowed from a bare except: uri is None (AttributeError), missing
    # "//" or ":" part (IndexError), or non-numeric port (ValueError)
    except (AttributeError, IndexError, ValueError):
        port = None
    return port
def benten_uploader_enable():
    """Return the uploader-enable flag from the user config file.

    Falls back to ``config.benten_uploader_enable_default`` when the config
    file is missing, unreadable, or lacks the key.
    """
    fname = benten_config_filename()
    try:
        # renamed the local from the misleading "uri" -- this is a flag
        enable = load_yaml(fname)["benten_uploader_enable"]
    # narrowed from a bare except: do not swallow SystemExit/KeyboardInterrupt
    except Exception:
        enable = config.benten_uploader_enable_default
    return enable
def benten_uploader_secure_data():
    """Return the uploader secure-data setting from the user config file.

    Falls back to ``config.benten_uploader_secure_data_default`` when the
    config file is missing, unreadable, or lacks the key.
    """
    fname = benten_config_filename()
    try:
        # renamed the local from the misleading "uri" -- this is a setting
        secure = load_yaml(fname)["benten_uploader_secure_data"]
    # narrowed from a bare except: do not swallow SystemExit/KeyboardInterrupt
    except Exception:
        secure = config.benten_uploader_secure_data_default
    return secure
def log(message, level=config.DEBUG, flush=False):
    """Route *message* to the module logger at the given config level.

    Unknown levels are silently ignored, as in the original elif chain.
    When *flush* is True the first attached handler is flushed.
    """
    # dispatch table instead of an if/elif ladder; assumes the config
    # level constants are distinct (standard logging-style levels)
    emitters = {
        config.DEBUG: logger.debug,
        config.INFO: logger.info,
        config.WARNING: logger.warning,
        config.ERROR: logger.error,
        config.CRITICAL: logger.critical,
    }
    emit = emitters.get(level)
    if emit is not None:
        emit(message)
    if flush:
        logger.handlers[0].flush()
def makedirs(dirname):
    """Create *dirname* (and parents), tolerating an already-existing directory.

    :raises Error: when the directory cannot be created
    """
    try:
        os.makedirs(dirname, exist_ok=True)  # available since Python 3.3
    except TypeError:
        # Python 2 fallback: os.makedirs has no exist_ok keyword there
        try:
            if not os.path.exists(dirname):
                os.makedirs(dirname)
        except OSError:
            raise Error("Not possible to create directory(={})".format(dirname))
    except OSError:
        # real creation failure (permissions, path exists as a file, ...)
        raise Error("Not possible to create directory(={})".format(dirname))
def work_tmp_dir():
    """Return the work tmp directory, creating it if necessary."""
    tmp_dir = "{}/tmp".format(workdir())
    makedirs(tmp_dir)
    return tmp_dir
def check_workdir():
    """Validate that a Benten work directory is configured.

    NOTE(review): os.getenv() is called WITH a default here, so ``wdir``
    can never be None and the error branch below is unreachable.
    Presumably the intent was ``os.getenv("BENTEN_WORKDIR")`` without a
    default -- confirm before changing, since raising here would alter
    behavior for callers relying on the ~/.benten fallback.
    """
    wdir = os.getenv("BENTEN_WORKDIR", "{}/.benten".format(expanduser("~")))
    if wdir is None:
        message = "BENTEN_WORKDIR is not defined"
        raise Error(message, domain=error_domain(__file__, sys._getframe()))
def error_domain(filename, frame, classobj=None):
    """Build a human-readable error-location string.

    :param filename: source file name to report
    :param frame: a frame object (e.g. from sys._getframe())
    :param classobj: optional class/object appended to the location
    :returns: "File '<filename>', line <n>, in <function>[, <classobj>]"
    """
    location = "File '{}', line {}, in {}".format(
        filename, frame.f_lineno, frame.f_code.co_name)
    if classobj is not None:
        location += ", {}".format(classobj)
    return location
def auth_filename():
    """Return the full path of the stored authentication info file."""
    return "{}/{}".format(workdir(), config.auth_info_file)
def benten_config_filename():
    """Return the full path of the Benten configuration file."""
    return "{}/{}".format(workdir(), config.benten_config_file)
def local_storage_config_filename():
    """Return the full path of the local-storage configuration file."""
    return "{}/{}".format(workdir(), config.local_storage_config_file)
def access_token():
    """Return the saved access token, or None when absent."""
    return load_json(auth_filename()).get("access_token")
def refresh_token():
    """Return the saved refresh token, or None when absent."""
    return load_json(auth_filename()).get("refresh_token")
def local_storage_config():
    """Load and return the local-storage configuration mapping."""
    return load_yaml(local_storage_config_filename())
def check_access_token(val):
    """Verify *val* against the userinfo endpoint.

    :raises Error: when the endpoint reports an error for this token
    """
    repo = rest.Repository()
    repo.authorize(val, debug=False)
    response = repo.auth.userinfo.post()
    if "error" in response:
        raise Error(response["error"]["message"],
                    domain=error_domain(__file__, sys._getframe()))
def json_response(ret):
    """Decode an HTTP response body as JSON.

    :param ret: a response object with a ``text`` attribute
    :returns: the parsed JSON, or ``{"text": <raw body>}`` when the body
              is not valid JSON
    """
    try:
        return json.loads(ret.text)
    # narrowed from a bare except: json.loads raises ValueError
    # (json.JSONDecodeError) on malformed input
    except ValueError:
        return {"text": ret.text}
def load_json(filename):
    """Load a UTF-8 JSON file, preserving key order via OrderedDict."""
    with codecs.open(filename, "r", "utf-8") as stream:
        return json.load(stream, object_pairs_hook=collections.OrderedDict)
def load_yaml(filename):
    """Load a UTF-8 YAML file, preserving key order via OrderedDict.

    Uses yaml.SafeLoader explicitly: the original called ``yaml.load(f)``
    with no Loader, which is deprecated (PyYAML >= 5.1 warns) and unsafe
    on untrusted input, and registered its OrderedDict constructor on a
    different loader than the one actually used.
    """
    yaml.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
        lambda loader, node: collections.OrderedDict(loader.construct_pairs(node)),
        Loader=yaml.SafeLoader)
    with codecs.open(filename, "r", "utf-8") as f:
        return yaml.load(f, Loader=yaml.SafeLoader)
def load_xml(filename):
    """Load a UTF-8 XML file and return the contents of its <root> element.

    Returns an empty OrderedDict when no <root> element is present.
    """
    with codecs.open(filename, "r", "utf-8") as stream:
        parsed = xmltodict.parse(stream.read())
    return parsed.get("root", collections.OrderedDict())
def save_auth(auth_dict):
    """Persist the authentication info dict to the auth file as JSON."""
    log("==> save_auth")
    out_json(auth_dict, filename=auth_filename())
def out_json(out_dict, debug=True, filename=None):
    """Serialize *out_dict* as pretty JSON.

    :param debug: when True, also log the (filename and) JSON text
    :param filename: when given, write the JSON there as UTF-8
    """
    dump_args = dict(indent=4, separators=(',', ':'), ensure_ascii=False)
    if debug:
        if filename is not None:
            log("--> filename = {}".format(filename))
        log(json.dumps(out_dict, **dump_args))
    if filename is None:
        return
    with codecs.open(filename, "w", "utf-8") as stream:
        json.dump(out_dict, stream, **dump_args)
def str_json(out_dict):
    """Return *out_dict* serialized as pretty, non-ASCII-escaped JSON."""
    return json.dumps(out_dict,
                      indent=4,
                      separators=(',', ':'),
                      ensure_ascii=False)
def out_yaml(out_dict, debug=True, filename=None):
    """Serialize *out_dict* as YAML.

    :param debug: when True, also log the (filename and) content as JSON
    :param filename: when given, write the YAML there as UTF-8
    """
    def represent_odict(dumper, instance):
        # emit OrderedDict as a plain YAML mapping, keeping key order
        return dumper.represent_mapping('tag:yaml.org,2002:map', instance.items())
    yaml.add_representer(collections.OrderedDict, represent_odict)
    if debug:
        if filename is not None:
            log("--> filename = {}".format(filename))
        log(json.dumps(out_dict, indent=4,
                       separators=(',', ':'), ensure_ascii=False))
    if filename is None:
        return
    with codecs.open(filename, "w", "utf-8") as stream:
        yaml.dump(out_dict, stream, encoding="utf-8",
                  allow_unicode=True, default_flow_style=False)
def out_xml(out_dict, debug=True, filename=None):
    """Serialize *out_dict* as XML under a single <root> element.

    :param debug: when True, also log the (filename and) XML text
    :param filename: when given, write the XML there as UTF-8
    """
    xml_text = xmltodict.unparse({"root": out_dict}, pretty=True)
    if debug:
        if filename is not None:
            log("--> filename = {}".format(filename))
        log(xml_text)
    if filename is None:
        return
    with codecs.open(filename, "w", "utf-8") as stream:
        stream.write(xml_text)
def encode_json(vdict):
    """Serialize *vdict* to a UTF-8 encoded JSON byte string."""
    text = json.dumps(vdict)
    return text.encode("utf-8")
def decode_json(val_str):
    """Parse a JSON byte string (default-decoded) into Python objects."""
    text = val_str.decode()
    return json.loads(text)
def checksum(filename):
    """Return the hex MD5 digest of *filename*, read in chunks.

    Note: MD5 is used here as a content fingerprint, not for security.
    """
    digest = hashlib.md5()
    chunk_size = 2048 * digest.block_size
    with open(filename, 'rb') as f:
        # renamed the accumulator from "hash", which shadowed the builtin
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def strtime_unixtime(t=None, flag_msec=False):
    """Format a unix timestamp as 'YYYY-mm-dd HH:MM:SS[.mmm]' in local time.

    :param t: unix timestamp; defaults to the current time
    :param flag_msec: append a truncated .mmm milliseconds suffix
    """
    stamp = time.time() if t is None else t
    whole = int(stamp)
    text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(whole))
    if flag_msec:
        text += ".%03d" % int(1000. * (stamp - whole))
    return text
def strtime_datetime(t=None, flag_usec=False):
    """Format a datetime as 'YYYY-mm-dd HH:MM:SS[.uuuuuu]'.

    :param t: a datetime object; defaults to datetime.datetime.now()
    :param flag_usec: append a .uuuuuu microseconds suffix
    """
    when = datetime.datetime.now() if t is None else t
    text = when.strftime("%Y-%m-%d %H:%M:%S")
    if flag_usec:
        text += ".%06d" % when.microsecond
    return text
def strtime_file(filename):
    """Return the file's modification time as a formatted local-time string."""
    return strtime_unixtime(os.path.getmtime(filename))
def mktime_file(filename):
    """Return the file's modification time as a unix timestamp."""
    return os.path.getmtime(filename)
def mktime(strval):
    """Parse a date/time string into a unix timestamp.

    Formats are tried in order:
    'Y-m-d H:M:S', 'Y/m/d H:M:S', 'Y-m-d', 'Y/m/d'.

    :raises ValueError: when no format matches (TypeError for non-strings),
                        as the original nested fallbacks did
    """
    formats = ('%Y-%m-%d %H:%M:%S',
               '%Y/%m/%d %H:%M:%S',
               '%Y-%m-%d',
               '%Y/%m/%d')
    # flat loop replaces the original three-deep try/except pyramid
    for fmt in formats[:-1]:
        try:
            return time.mktime(time.strptime(strval, fmt))
        except (ValueError, TypeError):
            continue
    # last format: let its exception propagate to the caller
    return time.mktime(time.strptime(strval, formats[-1]))
def add_info_file_dict(file_dict, local_storage_path, facility, class_name, disk_name):
    """Fill hash/size (and missing time) info for every entry of *file_dict*.

    Keys of *file_dict* are repository paths under
    /<facility>/<class_name>/<disk_name>; each is mapped to the matching
    file under *local_storage_path* to compute the metadata.
    The dict is updated in place and also returned.
    """
    top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
    prefix_len = len(top_dir) + 1
    for name in file_dict:
        local_name = local_storage_path + "/" + name[prefix_len:]
        entry = file_dict[name]
        if entry is None:
            entry = {}
            file_dict[name] = entry
        entry["hash"] = checksum(local_name)
        entry["size"] = os.path.getsize(local_name)
        entry.setdefault("time", strtime_file(local_name))
    return file_dict
def add_info_file_each_dict(file_each_dict, local_storage_path, facility, class_name, disk_name):
    """Fill hash/size (and missing time) info for a single file entry.

    *file_each_dict* must contain a "name" key holding a repository path
    under /<facility>/<class_name>/<disk_name>; the entry is updated in
    place and also returned.

    Note: the original re-checked ``file_each_dict is None`` AFTER already
    indexing it on the previous line; that branch was unreachable (a None
    argument raises TypeError first) and has been removed.
    """
    filename = file_each_dict["name"]
    top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
    fname = local_storage_path + "/" + filename[len(top_dir) + 1:]
    file_each_dict["hash"] = checksum(fname)
    # need to convert into string to use with multipartencoder
    file_each_dict["size"] = str(os.path.getsize(fname))
    if "time" not in file_each_dict:
        file_each_dict["time"] = strtime_file(fname)
    return file_each_dict
def repository_directory(config_value,
                         current_directory=None,
                         flag_all=False,
                         flag_print=True):
    """Map a local working directory onto its repository path.

    :param config_value: object with local_storage_path, facility,
                         class_name and disk_name attributes
    :param current_directory: directory to map; defaults to os.getcwd()
    :param flag_all: when True, return the repository top directory
    :param flag_print: forwarded to Error when the directory is invalid
    :raises Error: when the directory is not under local_storage_path
    """
    local_storage_path = config_value.local_storage_path
    top_directory = "/%s/%s/%s" % (config_value.facility,
                                   config_value.class_name,
                                   config_value.disk_name)
    if flag_all:
        return top_directory
    directory = os.getcwd() if current_directory is None else current_directory
    if os.name == "nt":
        # normalize Windows separators so the prefix comparison works
        directory = directory.replace("\\", "/")
    # idiomatic startswith replaces the original find(...) != 0 check
    if not directory.startswith(local_storage_path):
        message = "invalid working directory=%s (the directory should to be located under %s)" % \
            (directory, local_storage_path)
        raise Error(message, domain=error_domain(
            __file__, sys._getframe()), flag_print=flag_print)
    directory = directory[len(local_storage_path):]
    if directory == "":
        return top_directory
    return top_directory + directory
def register_name_list(register_basename_list, repository_directory):
    """Resolve registration names against *repository_directory*.

    Bare basenames (no directory part) are prefixed with
    *repository_directory*; names that already contain a directory are
    kept as-is; None entries are dropped. A None list yields [].
    """
    if register_basename_list is None:
        return []
    resolved = []
    for entry in register_basename_list:
        if entry is None:
            continue
        if os.path.dirname(entry) == "":
            resolved.append(repository_directory + "/" + entry)
        else:
            resolved.append(entry)
    return resolved
def register_name(register_basename, repository_directory):
    """Resolve a single registration name; return None when it cannot be.

    A None *register_basename* makes register_name_list() return an empty
    list, so indexing [0] raises IndexError -- that is the only failure
    the original bare except was meant to absorb; other errors now
    propagate instead of being silently turned into None.
    """
    try:
        return register_name_list([register_basename], repository_directory)[0]
    except IndexError:
        return None
def reference_path(path, repository_directory,
                   flag_full_path=False):
    """Return *path* relative to *repository_directory*.

    When *flag_full_path* is True the path is returned unchanged; a path
    equal to the repository directory itself maps to "/".
    """
    if flag_full_path:
        return path
    head = repository_directory if repository_directory != "/" else ""
    relative = path[len(head) + 1:]
    return relative if relative != "" else "/"