Source code for benten_client.util

# coding:utf-8
"""
  Utilities

  Copyright (C) 2020 JASRI All Rights Reserved.
"""

from __future__ import print_function

import os
import sys
import codecs
import json
import yaml
import hashlib
import time
import datetime
import collections
import codecs
import xmltodict

# Module-level logger used by log() below.
# It gets its own StreamHandler (which writes to sys.stderr by default), with
# both the handler and the logger set to DEBUG so every level is emitted.
from logging import getLogger, StreamHandler, DEBUG
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
# Do not hand records to ancestor loggers; only the handler above emits them.
logger.propagate = False

from os.path import expanduser

try:
    from . import rest
except:
    import rest

from . import config


class Error(BaseException):
    """Error raised by the benten client utilities.

    The class derives from ``BaseException`` (not ``Exception``), so a plain
    ``except Exception`` handler does not catch it.

    :param message: error text, returned by :meth:`message`.
    :param domain: optional location string (see ``error_domain``),
        returned by :meth:`domain`.
    :param flag_print: when true, the error is written to the module log at
        ``config.ERROR`` level as soon as it is constructed.
    """

    def __init__(self, message, domain=None, flag_print=True):
        self.__domain = domain
        self.__message = message

        if not flag_print:
            return

        log(" ==> util.Error()", config.ERROR)
        if domain is not None:
            log(" domain = {}".format(domain), config.ERROR)
        log(" message = {}".format(message), config.ERROR)

    def domain(self):
        """Return the domain given at construction (may be ``None``)."""
        return self.__domain

    def message(self):
        """Return the message given at construction."""
        return self.__message
def workdir():
    """Return the benten working directory.

    The ``BENTEN_WORKDIR`` environment variable wins when it is set;
    otherwise ``<home>/.benten`` is returned.
    """
    fallback = "{}/.benten".format(expanduser("~"))
    return os.environ.get("BENTEN_WORKDIR", fallback)
def benten_agent_uri():
    """Return the ``benten_agent_uri`` value from the benten config file.

    Falls back to ``config.benten_agent_uri_default`` when the file cannot be
    loaded or does not contain the key.
    """
    fname = benten_config_filename()
    try:
        return load_yaml(fname)["benten_agent_uri"]
    except Exception:
        # Best effort: a missing/unreadable/malformed file or a missing key
        # selects the default.  ``Exception`` (rather than a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        return config.benten_agent_uri_default
def benten_uploader_uri():
    """Return the ``benten_uploader_uri`` value from the benten config file.

    Falls back to ``config.benten_uploader_uri_default`` when the file cannot
    be loaded or does not contain the key.
    """
    fname = benten_config_filename()
    try:
        return load_yaml(fname)["benten_uploader_uri"]
    except Exception:
        # Best effort: a missing/unreadable/malformed file or a missing key
        # selects the default.  ``Exception`` (rather than a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        return config.benten_uploader_uri_default
def benten_uploader_host():
    """Return the host part of the uploader URI, or ``None``.

    The host is the text between ``//`` and the first following ``:``.
    ``None`` is returned when the URI does not have that shape.
    """
    uri = benten_uploader_uri()
    try:
        authority = uri.split("//")[1]
        return authority.split(":")[0]
    except Exception:
        # URI without "//" or a non-string value: report "no host".
        # ``Exception`` (rather than a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return None
def benten_uploader_port():
    """Return the port of the uploader URI as an ``int``, or ``None``.

    The port is the text after the first ``:`` that follows ``//``.
    ``None`` is returned when there is no such part or it is not an integer.
    """
    uri = benten_uploader_uri()
    try:
        authority = uri.split("//")[1]
        return int(authority.split(":")[1])
    except Exception:
        # Missing "//", missing ":port", non-numeric port or a non-string
        # value: report "no port".  ``Exception`` (rather than a bare
        # except) lets KeyboardInterrupt and SystemExit propagate.
        return None
def benten_uploader_enable():
    """Return the ``benten_uploader_enable`` value from the benten config file.

    Falls back to ``config.benten_uploader_enable_default`` when the file
    cannot be loaded or does not contain the key.
    """
    fname = benten_config_filename()
    try:
        return load_yaml(fname)["benten_uploader_enable"]
    except Exception:
        # Best effort: a missing/unreadable/malformed file or a missing key
        # selects the default.  ``Exception`` (rather than a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        return config.benten_uploader_enable_default
def benten_uploader_secure_data():
    """Return the ``benten_uploader_secure_data`` value from the config file.

    Falls back to ``config.benten_uploader_secure_data_default`` when the
    file cannot be loaded or does not contain the key.
    """
    fname = benten_config_filename()
    try:
        return load_yaml(fname)["benten_uploader_secure_data"]
    except Exception:
        # Best effort: a missing/unreadable/malformed file or a missing key
        # selects the default.  ``Exception`` (rather than a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        return config.benten_uploader_secure_data_default
def log(message, level=config.DEBUG, flush=False):
    """Send *message* to the module logger at the given level.

    *level* is compared against ``config.DEBUG``, ``config.INFO``,
    ``config.WARNING``, ``config.ERROR`` and ``config.CRITICAL`` in that
    order; the first match selects the logger method.  A level matching none
    of them emits nothing.  With *flush* true, the first handler of the
    logger is flushed afterwards.
    """
    for level_name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
        # Look the constant up lazily so only the levels actually compared
        # are read from config.
        if level == getattr(config, level_name):
            getattr(logger, level_name.lower())(message)
            break

    if flush:
        logger.handlers[0].flush()
def makedirs(dirname):
    """Create *dirname* (and missing parents); an existing path is accepted.

    :raises Error: when the directory cannot be created.
    """
    try:
        # exist_ok is available from Python 3.2 on.
        os.makedirs(dirname, exist_ok=True)
        return
    except Exception:
        # Older interpreters reject exist_ok (TypeError); any other failure
        # is retried through the legacy path below, which reports it.
        # ``Exception`` (rather than a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        pass

    try:
        if not os.path.exists(dirname):
            os.makedirs(dirname)
    except Exception:
        raise Error("Not possible to create directory(={})".format(dirname))
def work_tmp_dir():
    """Return ``<workdir>/tmp``, creating the directory if necessary."""
    path = workdir() + "/tmp"
    makedirs(path)
    return path
def check_workdir():
    """Raise :class:`Error` when no working directory can be determined.

    Returns ``None`` otherwise.
    """
    default_dir = "{}/.benten".format(expanduser("~"))
    wdir = os.getenv("BENTEN_WORKDIR", default_dir)
    if wdir is not None:
        return

    # NOTE(review): because getenv() is given a default, wdir is never None
    # and this raise is not reached as written - confirm the intended check.
    raise Error("BENTEN_WORKDIR is not defined",
                domain=error_domain(__file__, sys._getframe()))
def error_domain(filename, frame, classobj=None):
    """Build a traceback-like location string for error reports.

    :param filename: file name to show.
    :param frame: frame object; its ``f_lineno`` and ``f_code.co_name``
        are used.
    :param classobj: optional value appended after the function name.
    :return: ``"File '<filename>', line <n>, in <function>[, <classobj>]"``.
    """
    location = "File '%s', line %d, in %s" % (
        filename, frame.f_lineno, frame.f_code.co_name)
    if classobj is None:
        return location
    return location + ", %s" % classobj
def auth_filename():
    """Return the path of the authentication file inside the work directory."""
    base_dir = workdir()
    return "%s/%s" % (base_dir, config.auth_info_file)
def benten_config_filename():
    """Return the path of the benten config file inside the work directory."""
    base_dir = workdir()
    return "%s/%s" % (base_dir, config.benten_config_file)
def local_storage_config_filename():
    """Return the path of the local-storage config file in the work directory."""
    base_dir = workdir()
    return "%s/%s" % (base_dir, config.local_storage_config_file)
def access_token():
    """Return ``access_token`` from the saved auth file (``None`` if absent)."""
    return load_json(auth_filename()).get("access_token")
def refresh_token():
    """Return ``refresh_token`` from the saved auth file (``None`` if absent)."""
    return load_json(auth_filename()).get("refresh_token")
def local_storage_config():
    """Load and return the content of the local-storage config (YAML) file."""
    return load_yaml(local_storage_config_filename())
def check_access_token(val):
    """Validate an access token by posting to the repository's userinfo endpoint.

    :param val: access token passed to ``rest.Repository.authorize``.
    :raises Error: with the server's message when the reply has an
        ``"error"`` entry.
    """
    repository = rest.Repository()
    repository.authorize(val, debug=False)
    reply = repository.auth.userinfo.post()

    if "error" not in reply:
        return

    raise Error(reply["error"]["message"],
                domain=error_domain(__file__, sys._getframe()))
def json_response(ret):
    """Decode the JSON body of a response object.

    :param ret: object with a ``text`` attribute holding the body.
    :return: the decoded JSON value, or ``{"text": ret.text}`` when the body
        is not valid JSON.
    """
    try:
        return json.loads(ret.text)
    except Exception:
        # Not JSON: hand the raw body back instead of failing.
        # ``Exception`` (rather than a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return {"text": ret.text}
def load_json(filename):
    """Read a UTF-8 JSON file, keeping object keys in file order.

    Every JSON object is returned as a ``collections.OrderedDict``.
    """
    with codecs.open(filename, "r", "utf-8") as stream:
        return json.load(stream, object_pairs_hook=collections.OrderedDict)
def load_yaml(filename):
    """Read a UTF-8 YAML file, returning mappings as ``OrderedDict``.

    :param filename: path of the YAML file.
    :return: the loaded document (``None`` for an empty file).
    """
    # Registered on PyYAML's default loaders so mappings keep file order.
    yaml.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
        lambda loader, node: collections.OrderedDict(
            loader.construct_pairs(node)))

    # yaml.load() without an explicit Loader warns on PyYAML 5.1+ and raises
    # TypeError on PyYAML 6; name the loader the bare call used to select.
    # NOTE(review): FullLoader/Loader are not the safe loader - only use
    # this on trusted files (yaml.SafeLoader would be the hardened choice).
    loader_class = getattr(yaml, "FullLoader", yaml.Loader)

    with codecs.open(filename, "r", "utf-8") as f:
        return yaml.load(f, Loader=loader_class)
def load_xml(filename):
    """Read a UTF-8 XML file and return the content of its ``root`` element.

    The document is parsed with ``xmltodict``; an empty ``OrderedDict`` is
    returned when the parsed result has no ``"root"`` key.
    """
    with codecs.open(filename, "r", "utf-8") as stream:
        parsed = xmltodict.parse(stream.read())
    return parsed.get("root", collections.OrderedDict())
def save_auth(auth_dict):
    """Write *auth_dict* as JSON to the authentication file."""
    target = auth_filename()
    log("==> save_auth")
    out_json(auth_dict, filename=target)
def out_json(out_dict, debug=True, filename=None):
    """Log and/or save *out_dict* as indented JSON.

    :param out_dict: value to serialise.
    :param debug: when true, log the file name (if any) and the JSON text.
    :param filename: when given, write the JSON text to this UTF-8 file.
    """
    dump_options = dict(indent=4, separators=(',', ':'), ensure_ascii=False)
    to_file = filename is not None

    if debug:
        if to_file:
            log("--> filename = {}".format(filename))
        log(json.dumps(out_dict, **dump_options))

    if to_file:
        with codecs.open(filename, "w", "utf-8") as stream:
            json.dump(out_dict, stream, **dump_options)
def str_json(out_dict):
    """Return *out_dict* as indented JSON text (non-ASCII kept as is)."""
    dump_options = dict(indent=4, separators=(',', ':'), ensure_ascii=False)
    return json.dumps(out_dict, **dump_options)
def out_yaml(out_dict, debug=True, filename=None):
    """Log *out_dict* (as JSON text) and/or save it as a YAML file.

    :param out_dict: value to serialise.
    :param debug: when true, log the file name (if any) and a JSON rendering
        of the value.
    :param filename: when given, dump the value as block-style YAML to this
        UTF-8 file.
    """
    # Make PyYAML write OrderedDict as a plain mapping, in insertion order.
    yaml.add_representer(
        collections.OrderedDict,
        lambda dumper, instance: dumper.represent_mapping(
            'tag:yaml.org,2002:map', instance.items()))

    to_file = filename is not None

    if debug:
        if to_file:
            log("--> filename = {}".format(filename))
        log(json.dumps(out_dict, indent=4, separators=(',', ':'),
                       ensure_ascii=False))

    if to_file:
        with codecs.open(filename, "w", "utf-8") as stream:
            yaml.dump(out_dict, stream, encoding="utf-8",
                      allow_unicode=True, default_flow_style=False)
def out_xml(out_dict, debug=True, filename=None):
    """Log and/or save *out_dict* as pretty-printed XML under a ``root`` tag.

    :param out_dict: value placed below the ``root`` element.
    :param debug: when true, log the file name (if any) and the XML text.
    :param filename: when given, write the XML text to this UTF-8 file.
    """
    xml_text = xmltodict.unparse({"root": out_dict}, pretty=True)
    to_file = filename is not None

    if debug:
        if to_file:
            log("--> filename = {}".format(filename))
        log(xml_text)

    if to_file:
        with codecs.open(filename, "w", "utf-8") as stream:
            stream.write(xml_text)
def encode_json(vdict):
    """Serialise *vdict* to JSON and return it as UTF-8 bytes."""
    text = json.dumps(vdict)
    return text.encode("utf-8")
def decode_json(val_str):
    """Decode *val_str* (bytes) to text and parse it as JSON."""
    text = val_str.decode()
    return json.loads(text)
def checksum(filename):
    """Return the MD5 hex digest of a file's content.

    The file is read in chunks of 2048 MD5 blocks, so large files are not
    held in memory at once.
    """
    digest = hashlib.md5()
    chunk_size = 2048 * digest.block_size
    with open(filename, 'rb') as stream:
        while True:
            chunk = stream.read(chunk_size)
            if chunk == b'':
                break
            digest.update(chunk)
    return digest.hexdigest()
def str_datetime_reformed(strtime):
    """Normalise a date/time string to ``"YYYY-MM-DD HH:MM:SS"``.

    Accepted inputs are ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH-MM-SS``,
    ``YYYY/MM/DD HH:MM:SS``, ``YYYY-MM-DD`` and ``YYYY/MM/DD``; date-only
    inputs get a time of ``00:00:00``.

    :return: the normalised string, or ``None`` when *strtime* matches none
        of the formats (or is not a string).
    """
    formats = (
        # The function's own output format; it was previously rejected, so
        # an already-normalised value came back as None.
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H-%M-%S",
        "%Y/%m/%d %H:%M:%S",
        "%Y-%m-%d",
        "%Y/%m/%d",
    )
    for fmt in formats:
        try:
            parsed = datetime.datetime.strptime(strtime, fmt)
        except (ValueError, TypeError):
            # ValueError: format mismatch; TypeError: non-string input.
            continue
        return parsed.strftime("%Y-%m-%d %H:%M:%S")
    return None
def strtime_unixtime(t=None, flag_msec=False):
    """Format a Unix timestamp as local time ``"YYYY-MM-DD HH:MM:SS"``.

    :param t: seconds since the epoch; the current time when ``None``.
    :param flag_msec: when true, append ``.mmm`` (milliseconds, truncated).
    """
    stamp = time.time() if t is None else t
    whole_seconds = int(stamp)
    text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(whole_seconds))
    if not flag_msec:
        return text
    return text + ".%03d" % int(1000. * (stamp - whole_seconds))
def strtime_datetime(t=None, flag_usec=False):
    """Format a ``datetime`` as ``"YYYY-MM-DD HH:MM:SS"``.

    :param t: ``datetime.datetime`` value; ``datetime.now()`` when ``None``.
    :param flag_usec: when true, append ``.uuuuuu`` (microseconds).
    """
    moment = datetime.datetime.now() if t is None else t
    text = moment.strftime("%Y-%m-%d %H:%M:%S")
    if not flag_usec:
        return text
    return text + ".%06d" % moment.microsecond
def strtime_file(filename):
    """Return the file's modification time formatted by ``strtime_unixtime``."""
    modified = os.path.getmtime(filename)
    return strtime_unixtime(modified)
def mktime_file(filename):
    """Return the file's modification time as seconds since the epoch."""
    return os.path.getmtime(filename)
def mktime(strval):
    """Convert a local date/time string to seconds since the epoch.

    Accepted inputs, tried in this order: ``YYYY-MM-DD HH:MM:SS``,
    ``YYYY/MM/DD HH:MM:SS``, ``YYYY-MM-DD`` and ``YYYY/MM/DD``.

    :raises ValueError: when *strval* matches none of the formats (the error
        of the last format tried is the one that propagates).
    """
    formats = (
        '%Y-%m-%d %H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%Y-%m-%d',
        '%Y/%m/%d',
    )
    for fmt in formats[:-1]:
        try:
            return time.mktime(time.strptime(strval, fmt))
        except Exception:
            # Try the next format.  ``Exception`` (rather than a bare
            # except) lets KeyboardInterrupt and SystemExit propagate.
            continue
    # Last candidate: deliberately unguarded so the caller sees the failure.
    return time.mktime(time.strptime(strval, formats[-1]))
def headers_authorization(val_access_token):
    """Return a header dict carrying the token as a ``Bearer`` credential."""
    return {"Authorization": "Bearer %s" % val_access_token}
def add_info_file_dict(file_dict, local_storage_path, facility, class_name, disk_name):
    """Fill in hash, size and time for every file listed in *file_dict*.

    Keys of *file_dict* are repository paths starting with
    ``/<facility>/<class_name>/<disk_name>``; the remainder of each key is
    looked up below *local_storage_path*.  Each value (a dict, or ``None``
    which is replaced by a new dict) receives ``"hash"`` and ``"size"``, and
    ``"time"`` unless it is already present.

    :return: *file_dict*, updated in place.
    """
    # Characters to drop from each key: the top directory plus one "/".
    prefix_length = len("/%s/%s/%s" % (facility, class_name, disk_name)) + 1

    for key in file_dict:
        local_name = local_storage_path + "/" + key[prefix_length:]
        if file_dict[key] is None:
            file_dict[key] = {}
        entry = file_dict[key]
        entry["hash"] = checksum(local_name)
        entry["size"] = os.path.getsize(local_name)
        if "time" not in entry:
            entry["time"] = strtime_file(local_name)

    return file_dict
def add_info_file_each_dict(file_each_dict, local_storage_path, facility, class_name, disk_name):
    """Fill in hash, size and time for a single file description.

    ``file_each_dict["name"]`` is a repository path starting with
    ``/<facility>/<class_name>/<disk_name>``; the remainder is looked up
    below *local_storage_path*.  ``"hash"`` and ``"size"`` are set, and
    ``"time"`` unless it is already present.

    :param file_each_dict: dict with at least a ``"name"`` entry.  ``None``
        is not usable (there is no name to resolve) and raises ``TypeError``;
        a dict without ``"name"`` raises ``KeyError``.
    :return: *file_each_dict*, updated in place.
    """
    # The former "replace None by {}" branch ran only after this lookup and
    # could never execute, so it has been dropped.
    filename = file_each_dict["name"]

    top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
    fname = local_storage_path + "/" + filename[len(top_dir) + 1:]

    file_each_dict["hash"] = checksum(fname)
    # Size is stored as a string so it can be used with a multipart encoder.
    file_each_dict["size"] = str(os.path.getsize(fname))
    if "time" not in file_each_dict:
        file_each_dict["time"] = strtime_file(fname)
    return file_each_dict
def repository_directory(config_value, current_directory=None, flag_all=False, flag_print=True):
    """Map a local directory to its repository directory.

    :param config_value: object with ``local_storage_path``, ``facility``,
        ``class_name`` and ``disk_name`` attributes.
    :param current_directory: local directory to map; the process's current
        directory when ``None``.
    :param flag_all: when true, return the top repository directory
        ``/<facility>/<class_name>/<disk_name>`` without looking at any
        local directory.
    :param flag_print: forwarded to :class:`Error` when raising.
    :return: the top repository directory followed by the part of the local
        directory below ``local_storage_path``.
    :raises Error: when the local directory does not start with
        ``local_storage_path``.
    """
    storage_root = config_value.local_storage_path
    top_directory = "/%s/%s/%s" % (
        config_value.facility, config_value.class_name, config_value.disk_name)

    if flag_all:
        return top_directory

    cwd = os.getcwd()
    directory = cwd if current_directory is None else current_directory

    # On Windows, compare using forward slashes.
    if os.name == "nt":
        directory = directory.replace("\\", "/")

    if not directory.startswith(storage_root):
        message = "invalid working directory=%s (the directory should to be located under %s)" % \
            (directory, storage_root)
        raise Error(message, domain=error_domain(
            __file__, sys._getframe()), flag_print=flag_print)

    # An empty remainder yields the top directory itself.
    return top_directory + directory[len(storage_root):]
def register_name_list(register_basename_list, repository_directory):
    """Qualify bare names with the repository directory.

    ``None`` entries are skipped.  An entry without a directory part gets
    ``<repository_directory>/`` prepended; any other entry is kept as is.

    :return: a new list (empty when *register_basename_list* is ``None``).
    """
    if register_basename_list is None:
        return []

    qualified = []
    for entry in register_basename_list:
        if entry is None:
            continue
        if os.path.dirname(entry) == "":
            entry = repository_directory + "/" + entry
        qualified.append(entry)
    return qualified
def register_name(register_basename, repository_directory):
    """Single-name variant of ``register_name_list``.

    :return: the qualified name, or ``None`` when *register_basename* is
        ``None`` or cannot be processed.
    """
    try:
        return register_name_list([register_basename], repository_directory)[0]
    except Exception:
        # An empty result (None input) or unusable arguments map to None.
        # ``Exception`` (rather than a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return None
def reference_path(path, repository_directory, flag_full_path=False):
    """Return *path* relative to *repository_directory*.

    :param flag_full_path: when true, *path* is returned unchanged.
    :return: *path* with the repository directory and the following
        separator removed (only the leading character is removed when the
        repository directory is ``"/"``); ``"/"`` when nothing remains.
    """
    if flag_full_path:
        return path

    if repository_directory == "/":
        skip = 1
    else:
        skip = len(repository_directory) + 1

    relative = path[skip:]
    return relative if relative != "" else "/"