# Source code for benten_client.local_storage_util

# coding:utf-8
"""
   local_storage utilities

   Copyright (C) 2020 JASRI All Rights Reserved.
"""

import sys
import os
import re
import copy
import collections
import zipfile
import codecs

from . import util
from . import config
from . import rest

class ConfigFile():
    """YAML-backed settings file for the local storage configuration.

    The file holds the keys ``local_storage_path``, ``facility``,
    ``class_name`` and ``disk_name``.
    """

    def __init__(self, filename, flag_replace=False):
        """Bind to *filename*, writing an empty template when required.

        :param filename: path of the YAML configuration file.
        :param flag_replace: when true, overwrite the file with an empty
            template even if it already exists.
        """
        self.__filename = filename

        # A template is written when explicitly requested or when the file
        # does not exist yet.
        if flag_replace or not os.path.isfile(self.__filename):
            util.log("== ConfigFile::init() ==")
            self.__write_template()

    def __write_template(self):
        """Write a configuration file in which every known key is None."""
        template = collections.OrderedDict()
        for name in ("local_storage_path", "facility",
                     "class_name", "disk_name"):
            template[name] = None
        util.out_yaml(template, filename=self.__filename)

    def list(self):
        """Load the configuration file and print its content as JSON."""
        util.log("== ConfigFile::list() ==")
        util.out_json(util.load_yaml(self.__filename))

    def set(self, key, value, debug=True):
        """Store ``value`` under ``key`` and rewrite the configuration file.

        :param key: name of the entry to set; must not be None.
        :param value: value to store; must not be None.
        :param debug: when true, log the assignment; also forwarded to
            ``util.out_yaml``.
        :raises util.Error: if ``key`` or ``value`` is None.
        """
        if key is None:
            raise util.Error(
                "key=%s is not set" % key,
                domain=util.error_domain(
                    __file__, sys._getframe(), self.__class__))

        if value is None:
            raise util.Error(
                "value=%s not set" % value,
                domain=util.error_domain(
                    __file__, sys._getframe(), self.__class__))

        if debug:
            util.log("== ConfigFile::set() ==")
            util.log("--> {} = {}".format(key, value))

        settings = util.load_yaml(self.__filename)
        settings[key] = value
        util.out_yaml(settings, filename=self.__filename, debug=debug)

    def reset(self):
        """Overwrite the configuration file with an empty template."""
        util.log("== ConfigFile::reset() ==")
        self.__write_template()

    def load(self):
        """Return the configuration as a ``Config`` namedtuple.

        The tuple has one field per key found in the file.
        """
        settings = util.load_yaml(self.__filename)
        config_type = collections.namedtuple("Config", " ".join(settings))
        return config_type(**settings)
class ProposalNumberFile():
    """YAML file holding a single ``proposal_number`` entry."""

    def __init__(self, filename, flag_replace=False, debug=True):
        """Bind to *filename*, creating an empty file when required.

        :param filename: path of the YAML file.
        :param flag_replace: when true, overwrite the file even if it exists.
        :param debug: when true, ``set_proposal_number`` logs its activity.
        """
        self.__filename = filename
        self.__debug = debug

        # (Re)initialise when asked to, or when there is no file yet.
        if flag_replace or not os.path.isfile(self.__filename):
            util.log("== ProposalNumberFile::init() (file={}) ==".format(self.__filename))
            initial = collections.OrderedDict()
            initial["proposal_number"] = None
            util.out_yaml(initial, filename=self.__filename, debug=False)

    def list(self):
        """Load the file and print its content as JSON."""
        util.log("== ProposalNumberFile::list() ==")
        util.out_json(util.load_yaml(self.__filename))

    def set_proposal_number(self, value):
        """Store ``value`` as the proposal number and rewrite the file."""
        if self.__debug:
            util.log("== ProposalNumberFile::set_proposal_number() ==")
            util.log("--> {}".format(value))

        content = util.load_yaml(self.__filename)
        content["proposal_number"] = value
        util.out_yaml(content, filename=self.__filename, debug=self.__debug)

    def clear(self):
        """Reset the stored proposal number to None."""
        util.log("== ProposalNumberFile::clear() ==")
        content = util.load_yaml(self.__filename)
        content["proposal_number"] = None
        util.out_yaml(content, filename=self.__filename)

    def proposal_number(self):
        """Return the stored proposal number, or None when the key is absent."""
        content = util.load_yaml(self.__filename)
        if "proposal_number" not in content:
            return None
        return content["proposal_number"]
class RepositoryInfoFile():
    """JSON snapshot of the file listing returned by the repository."""

    def __init__(self, filename, debug=True):
        """Remember the snapshot path and whether ``output`` should log."""
        self.__filename = filename
        self.__debug = debug

    def output(self, access_token, repository_directory, register_basename=None):
        """Query the repository and write the listing to the snapshot file.

        :param access_token: token passed to ``rest.Repository.authorize``.
        :param repository_directory: directory to list; ``"/"`` means no
            directory restriction is sent.
        :param register_basename: when given, the query is restricted to the
            register name derived from it via ``util.register_name`` instead
            of to ``repository_directory``.
        """
        # ... get file_dict under base_directory
        repository = rest.Repository()
        repository.authorize(access_token, debug=False)

        query = {}
        if register_basename is not None:
            query["register_name_list"] = [
                util.register_name(register_basename, repository_directory)]
        elif repository_directory != "/":
            query["directory_list"] = [repository_directory]
        for flag_name in ("flag_recursive", "flag_register_name", "flag_own"):
            query[flag_name] = 1

        response = repository.data.files.post(**query)

        # Index the files by name; the time stamp is converted in place.
        file_dict = collections.OrderedDict()
        for entry in response.get("file_list", []):
            entry["time"] = int(util.mktime(entry["time"]))
            file_dict[entry["name"]] = entry

        # ... get register_name_dict under base_directory
        register_name_dict = collections.OrderedDict(
            (name, collections.OrderedDict())
            for name in response.get("register_name_list", []))

        # NOTE: a former check that rejected a repository_directory located
        # under a register_name directory is disabled.

        # ... output results
        if self.__debug:
            util.log("==> output_repository_info_file: {}".format(self.__filename))

        summary = collections.OrderedDict()
        summary["time"] = util.strtime_datetime()
        if repository_directory is not None:
            summary["repository_directory"] = repository_directory
        if register_basename is not None:
            summary["register_basename"] = register_basename
        summary["file_dict"] = file_dict
        summary["register_name_dict"] = register_name_dict

        util.out_json(summary, debug=False, filename=self.__filename)

    def load(self):
        """Return the content of the snapshot file as loaded by ``util.load_json``."""
        return util.load_json(self.__filename)
class LocalStorageInfoFile():
    """JSON index comparing the local storage tree with the repository.

    ``output`` walks the local storage directory, compares every file with
    the repository listing and writes per-directory / per-register-name
    status information to the index file.
    """

    def __init__(self, filename, debug=True):
        """Remember the index path and whether ``output`` should log."""
        self.__filename = filename
        self.__debug = debug

    def output(self, config_value, repository_directory, repository_info_dict, register_basename=None):
        """Build the local storage index and write it as JSON.

        File status values assigned here are ``noMetadata``, ``normal``,
        ``updated``, ``ready`` and ``deleted``; directories additionally
        use ``revised``.

        :param config_value: object exposing ``local_storage_path``,
            ``facility``, ``class_name`` and ``disk_name``.
        :param repository_directory: repository-side directory to index.
        :param repository_info_dict: mapping with the keys ``file_dict`` and
            ``register_name_dict``; ``register_name_dict`` is updated in
            place with per-file status entries.
        :param register_basename: when given, only local files whose
            repository path matches the derived register name are indexed.
        """
        local_storage_path = config_value.local_storage_path
        repository_top_directory = "/%s/%s/%s" % (
            config_value.facility, config_value.class_name, config_value.disk_name)

        register_name_input = None
        if register_basename is not None:
            register_name_input = util.register_name(register_basename, repository_directory)

        # The filter pattern does not depend on the file, so build it once.
        # A raw string keeps the pattern text identical while avoiding
        # invalid-escape warnings.
        register_name_pattern = None
        if register_name_input is not None:
            register_name_pattern = re.compile(
                r"^%s((|\.[^/]+)|\/.+)$" % re.escape(register_name_input))

        # ... extract dict from repository_entry_file
        repository_info_file_dict = repository_info_dict["file_dict"]
        register_name_dict = repository_info_dict["register_name_dict"]

        local_storage_keys = []

        # ... define top_directory
        top_directory = local_storage_path
        if repository_directory != repository_top_directory:
            top_directory += repository_directory[len(repository_top_directory):]

        directory_dict = collections.defaultdict(collections.OrderedDict)

        def mark_revised_upward(start_directory):
            """Flag start_directory and all its ancestors (incl. "/") as revised."""
            current = start_directory
            while True:
                directory_dict[current]["status"] = "revised"
                current = os.path.dirname(current)
                if len(current) <= 1:
                    directory_dict["/"]["status"] = "revised"
                    break

        def set_normal_unless_changed(directory):
            """Flag directory as normal unless it is already updated/revised."""
            entry = directory_dict[directory]
            if "status" not in entry or \
                    entry["status"] not in ["updated", "revised"]:
                entry["status"] = "normal"

        # ... check local storage files under top_directory
        #     compare with repository_info_file_dict and assign status
        #     (status=normal, updated, ready, noMetadata, deleted)
        for root, dirs, files in os.walk(top_directory):

            root = root.replace("\\", "/")  # replaced for windows directory
            relative_root = root[len(local_storage_path):]
            file_local_storage_dir = repository_top_directory + relative_root

            directory_dict[file_local_storage_dir]["status"] = "noMetadata"

            flag_repository = False
            for f in files:
                file_path = "%s/%s" % (root, f)
                file_local_storage_path = repository_top_directory + \
                    "%s/%s" % (relative_root, f)

                if register_name_pattern is not None and \
                        not register_name_pattern.match(file_local_storage_path):
                    continue

                if "file_dict" not in directory_dict[file_local_storage_dir]:
                    directory_dict[file_local_storage_dir]["file_dict"] = \
                        collections.OrderedDict()

                v = collections.OrderedDict()
                v["status"] = "noMetadata"
                v["time_storage"] = int(util.mktime_file(file_path))
                v["size_storage"] = os.path.getsize(file_path)
                # A content hash is not computed here: too slow for big data.

                register_name = None
                if file_local_storage_path in repository_info_file_dict:
                    flag_repository = True

                    v_rep = repository_info_file_dict[file_local_storage_path]
                    v.update(v_rep)
                    register_name = v_rep["register_name"]

                    # Size or time stamp differing from the repository
                    # record means the local copy was modified.
                    if (v["size_storage"] != v["size"]) or \
                            (v["time_storage"] != v["time"]):
                        v["status"] = "updated"
                        w = collections.OrderedDict()
                        w["status"] = "updated"
                        register_name_dict[register_name][file_local_storage_path] = w
                        mark_revised_upward(file_local_storage_dir)
                    else:
                        v["status"] = "normal"
                        if directory_dict[file_local_storage_dir]["status"] \
                                not in ["updated", "revised"]:
                            directory_dict[file_local_storage_dir]["status"] = "normal"
                else:
                    # File unknown to the repository: look for a register
                    # name matching the file stem or one of its ancestors.
                    basename = os.path.basename(file_local_storage_path)
                    dirname = os.path.dirname(file_local_storage_path)
                    if dirname == "/":
                        dirname = ""

                    file_test = "%s/%s" % (dirname, basename.split(".")[0])
                    while True:
                        if file_test in register_name_dict:
                            w = collections.OrderedDict()
                            w["status"] = "ready"
                            register_name_dict[file_test][file_local_storage_path] = w
                            register_name = file_test

                            v["status"] = "ready"
                            mark_revised_upward(file_local_storage_dir)

                        file_test = os.path.dirname(file_test)
                        if len(file_test) <= 1:
                            break

                if register_name is not None:
                    v["register_name"] = register_name

                directory_dict[file_local_storage_dir]["file_dict"][file_local_storage_path] = v
                local_storage_keys.append(file_local_storage_path)

            if flag_repository:
                # At least one file of this directory is registered: the
                # directory and its ancestors are at least "normal".
                dir_test = file_local_storage_dir
                while True:
                    set_normal_unless_changed(dir_test)
                    dir_test = os.path.dirname(dir_test)
                    if len(dir_test) <= 1:
                        set_normal_unless_changed("/")
                        break

            if directory_dict[file_local_storage_dir]["status"] == "noMetadata":
                flag_register_name = False

                own_candidate = file_local_storage_dir.split(".")[0]
                if own_candidate in register_name_dict:
                    flag_register_name = True
                    directory_dict[file_local_storage_dir]["register_name"] = own_candidate

                # NOTE(review): the nesting of the flag test below was
                # reconstructed from a whitespace-collapsed listing; confirm
                # it against the packaged module.
                dir_test = file_local_storage_dir
                while True:
                    dir_test = os.path.dirname(dir_test)
                    if len(dir_test) <= 1:
                        break
                    if dir_test in register_name_dict:
                        flag_register_name = True
                    if flag_register_name:
                        directory_dict[dir_test]["status"] = "revised"
                        if "register_name" not in directory_dict[file_local_storage_dir]:
                            directory_dict[file_local_storage_dir]["register_name"] = dir_test

        # ... check deleted files
        directory_deleted_set = set()

        repository_set = set(repository_info_file_dict.keys())
        local_storage_set = set(local_storage_keys)
        # Sorted so that the written index is deterministic.
        for f in sorted(repository_set - local_storage_set):
            v_rep = repository_info_file_dict[f]
            register_name = v_rep["register_name"]

            w = collections.OrderedDict()
            w["status"] = "deleted"
            register_name_dict[register_name][f] = w

            v = collections.OrderedDict()
            v["status"] = "deleted"
            v.update(v_rep)

            file_local_storage_dir = os.path.dirname(f)
            if "file_dict" not in directory_dict[file_local_storage_dir]:
                directory_dict[file_local_storage_dir]["file_dict"] = \
                    collections.OrderedDict()
            directory_dict[file_local_storage_dir]["file_dict"][f] = v

            dir_test = file_local_storage_dir
            while True:
                # NOTE(review): dir_test is a repository path, while the
                # first walk maps local paths by stripping the repository
                # top directory; confirm this concatenation is intended.
                dir_check = "%s%s" % (local_storage_path, dir_test)
                if os.path.isdir(dir_check):
                    directory_dict[dir_test]["status"] = "revised"
                else:
                    directory_dict[dir_test]["status"] = "deleted"
                    directory_deleted_set.add(dir_test)
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    directory_dict["/"]["status"] = "revised"
                    break

        # ... check deleted directories
        for d in sorted(directory_deleted_set):
            d_parent = os.path.dirname(d)
            if d_parent == "":
                d_parent = "/"
            if "deleted_directory_list" not in directory_dict[d_parent]:
                directory_dict[d_parent]["deleted_directory_list"] = []
            directory_dict[d_parent]["deleted_directory_list"].append(d)

        # ... check register_name for deleted directories
        # directory_tmp_dict ends up holding the directories that are not
        # related to any register name.
        directory_tmp_dict = copy.copy(directory_dict)
        register_name_tmp_dict = copy.copy(register_name_dict)
        for register_name in register_name_dict:
            if register_name in directory_tmp_dict:
                directory_dict[register_name]["register_name"] = register_name
                del directory_tmp_dict[register_name]

            # upper search
            dir_test = register_name
            while True:
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    if "/" in directory_tmp_dict:
                        del directory_tmp_dict["/"]
                    break
                if dir_test in directory_tmp_dict:
                    del directory_tmp_dict[dir_test]

            # lower search
            # NOTE(review): same path-mapping question as for dir_check.
            top_directory = local_storage_path + register_name
            if os.path.isdir(top_directory) is False:
                continue

            del register_name_tmp_dict[register_name]

            for root, dirs, files in os.walk(top_directory):
                root_local_storage = root[len(local_storage_path):]
                if root_local_storage == "":
                    root_local_storage = "/"
                for d in dirs:
                    dir_local_storage = "%s/%s" % (root_local_storage, d)
                    directory_dict[dir_local_storage]["register_name"] = register_name
                    # The key may already be gone (removed by the upper
                    # search of another register name) or may never have
                    # been indexed, so a plain ``del`` could raise KeyError.
                    directory_tmp_dict.pop(dir_local_storage, None)

        # Register names without a local directory: tag every remaining
        # directory located at or below the register name path.
        for reg in register_name_tmp_dict:
            reg_pattern = re.compile("^%s(|/.+)$" % re.escape(reg))
            # Iterate over a snapshot because matching keys are removed.
            for directory in list(directory_tmp_dict):
                if not reg_pattern.match(directory):
                    continue
                if "register_name" not in directory_dict[directory]:
                    # Use the register name that matched, not the loop
                    # variable left over from the previous loop.
                    directory_dict[directory]["register_name"] = reg
                del directory_tmp_dict[directory]

        # ... upper search for directory with noMetadata
        for d in directory_tmp_dict:
            if directory_tmp_dict[d]["status"] != "noMetadata":
                continue
            # Start from the directory itself; ``get`` avoids creating
            # empty entries in the defaultdict for unknown ancestors.
            dir_test = d
            while True:
                entry = directory_dict.get(dir_test)
                if entry is not None and entry.get("status") == "normal":
                    entry["status"] = "revised"
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    break

        # ... output the result
        if self.__debug:
            util.log("==> output_local_storage_info_file: {}".format(self.__filename))

        vout_dict = collections.OrderedDict()
        vout_dict["time"] = util.strtime_datetime()
        if repository_directory is not None:
            vout_dict["repository_directory"] = repository_directory
        if register_basename is not None:
            vout_dict["register_basename"] = register_basename
        vout_dict["directory_dict"] = collections.OrderedDict(directory_dict)
        vout_dict["register_name_dict"] = collections.OrderedDict(
            register_name_dict)

        util.out_json(vout_dict, debug=False, filename=self.__filename)

    def load(self):
        """Return the content of the index file as loaded by ``util.load_json``."""
        return util.load_json(self.__filename)
class Manage():
    """Query and restore operations based on a local storage index file."""

    def __init__(self, local_storage_info_file=None, config_value=None,
                 repository_directory=None, access_token=None, debug=True):
        """Load the index file and check it covers ``repository_directory``.

        :param local_storage_info_file: path of the JSON index file, read
            with ``util.load_json``.
        :param config_value: object exposing ``local_storage_path``,
            ``facility``, ``class_name`` and ``disk_name``.
        :param repository_directory: repository directory to operate on.
        :param access_token: token used by ``restore`` to authorize.
        :param debug: stored on the instance.
        :raises util.Error: if the index records a ``repository_directory``
            that does not occur inside the requested one.
        """
        self.__local_storage_info_file = local_storage_info_file
        self.__local_storage_path = config_value.local_storage_path
        self.__facility = config_value.facility
        self.__class_name = config_value.class_name
        self.__disk_name = config_value.disk_name
        self.__repository_directory = repository_directory
        self.__access_token = access_token
        self.__debug = debug

        self.__local_storage_info_dict = util.load_json(
            self.__local_storage_info_file)
        self.__directory_dict = self.__local_storage_info_dict["directory_dict"]
        self.__register_name_dict = self.__local_storage_info_dict["register_name_dict"]

        self.__repository_top_directory = "/%s/%s/%s" % (
            self.__facility, self.__class_name, self.__disk_name)

        repository_directory_storage = self.__local_storage_info_dict.get(
            "repository_directory", None)

        if repository_directory_storage is not None and \
                repository_directory.find(repository_directory_storage) < 0:
            message = "please execute benten.py init_index under repository_directory=%s)" % \
                (repository_directory_storage)
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

    def infoDict(self, file_list=None, register_basename_list=None,
                 mode="file", status_val=None, type_val=None,
                 register_name=None, flag_full_path=False, flag_recursive=False):
        """Return status information for local files and directories.

        :param file_list: when non-empty, only entries whose reported name
            is in this list are returned (in discovery order); otherwise
            all entries are returned sorted by name.
        :param register_basename_list: base names converted with
            ``util.register_name_list``; a non-empty result restricts the
            output to these register names and forces recursion.
        :param mode: ``"file"`` / ``"all"`` add ``file_list`` to the result;
            ``"register_name"`` / ``"register_name_only"`` / ``"all"`` add
            ``register_name_list``.
        :param status_val: status filter; ``"revised"`` keeps every entry
            whose status is not ``"normal"``, None disables the filter.
        :param type_val: ``"file"`` or ``"directory"`` to restrict the
            entry type; None keeps both.
        :param register_name: accepted for backward compatibility; the
            value is not used.
        :param flag_full_path: forwarded to ``util.reference_path``.
        :param flag_recursive: descend into sub-directories.
        :returns: an ``OrderedDict`` describing the query and its result.
        """
        local_storage_directory = self.__local_storage_path + \
            self.__repository_directory[len(self.__repository_top_directory):]

        register_name_select_list = util.register_name_list(
            register_basename_list, self.__repository_directory)
        if len(register_name_select_list) > 0:
            flag_recursive = True

        # Collect what is currently on disk, as repository-style paths.
        directoryList = []
        fileList = []
        for root, dirs, files in os.walk(local_storage_directory):
            root_tmp = self.__repository_directory + \
                root[len(local_storage_directory):]
            root_tmp = root_tmp.replace("\\", "/")
            if root_tmp == "/":
                root_tmp = ""
            for d in dirs:
                directoryList.append(root_tmp + "/" + d)
            for f in files:
                fileList.append(root_tmp + "/" + f)
            if flag_recursive is False:
                break  # only the top level is wanted

        # Index entries of the directories in scope.
        directory_dict = collections.OrderedDict()
        if self.__repository_directory in self.__directory_dict:
            directory_dict[self.__repository_directory] = \
                self.__directory_dict[self.__repository_directory]
        if flag_recursive:
            for d in directoryList:
                if d in self.__directory_dict:
                    directory_dict[d] = self.__directory_dict[d]

        # Directories recorded as deleted, followed level by level.
        directoryDeletedList = []
        dList = directory_dict.keys()
        while True:
            vListAdd = []
            for d in dList:
                if "deleted_directory_list" in self.__directory_dict[d]:
                    dList2 = self.__directory_dict[d]["deleted_directory_list"]
                    directoryDeletedList.extend(dList2)
                    vListAdd.extend(dList2)
            dList = vListAdd
            if len(vListAdd) == 0:
                break
            if flag_recursive is False:
                break

        # File records of every directory in scope.
        file_dict_ref = {}
        for d in directory_dict:
            if "file_dict" in directory_dict[d]:
                file_dict_ref.update(directory_dict[d]["file_dict"])
        if flag_recursive:
            for d in directoryDeletedList:
                if "file_dict" in self.__directory_dict[d]:
                    file_dict_ref.update(self.__directory_dict[d]["file_dict"])

        fileDeletedList = [
            f for f in file_dict_ref
            if file_dict_ref[f]["status"] == "deleted"]

        def is_selected(status, reg_name):
            """Apply the status filter and the register-name filter."""
            if (status_val not in [None, "revised"] and status != status_val) or \
                    (status_val == "revised" and status == "normal"):
                return False
            if len(register_name_select_list) != 0 and \
                    reg_name not in register_name_select_list:
                return False
            return True

        def make_entry(path, status, type_name, reg_name):
            """Build one output record for a file or directory."""
            v = collections.OrderedDict()
            v["name"] = util.reference_path(
                path, self.__repository_directory, flag_full_path)
            v["status"] = status
            v["type"] = type_name
            v["register_name"] = reg_name
            return v

        vList = []

        if type_val != "file":
            for d in directoryList:
                sub_directory_dict = self.__directory_dict.get(d, {})
                status = sub_directory_dict.get("status", "undefined")
                reg_name = sub_directory_dict.get("register_name", None)
                if is_selected(status, reg_name):
                    vList.append(make_entry(d, status, "directory", reg_name))

            for d in directoryDeletedList:
                sub_directory_dict = self.__directory_dict.get(d, {})
                reg_name = sub_directory_dict.get("register_name", None)
                if is_selected("deleted", reg_name):
                    vList.append(make_entry(d, "deleted", "directory", reg_name))

        if type_val != "directory":
            for f in fileList:
                status = "undefined"
                reg_name = None
                if f in file_dict_ref:
                    fdict = file_dict_ref[f]
                    status = fdict["status"]
                    reg_name = fdict.get("register_name", None)
                if is_selected(status, reg_name):
                    vList.append(make_entry(f, status, "file", reg_name))

            for f in fileDeletedList:
                reg_name = file_dict_ref[f].get("register_name", None)
                if is_selected("deleted", reg_name):
                    vList.append(make_entry(f, "deleted", "file", reg_name))

        # ... construct dict for output
        vDict = collections.OrderedDict()
        vDict["repository_directory"] = self.__repository_directory
        if register_name_select_list is not None:
            vDict["register_name_select_list"] = register_name_select_list
        vDict["mode"] = mode
        if status_val is not None:
            vDict["status"] = status_val
        if type_val is not None:
            vDict["type"] = type_val
        vDict["full_path"] = flag_full_path
        vDict["recursive"] = flag_recursive

        file_out_list = []
        if (file_list is None) or (len(file_list) == 0):
            file_out_list = sorted(vList, key=lambda x: x["name"])
        else:
            count_all = len(file_list)
            count = 0
            for v in vList:
                if v["name"] in file_list:
                    file_out_list.append(v)
                    count += 1
                if count == count_all:
                    break  # as many entries as requested names collected

        if mode in ["file", "all"]:
            vDict["file_list"] = file_out_list

        if mode in ["register_name", "register_name_only", "all"]:
            register_name_dict = {}
            for f in file_out_list:
                reg_name = f["register_name"]
                if reg_name is None:
                    continue

                if reg_name in register_name_dict:
                    status = register_name_dict[reg_name]["status"]
                else:
                    status = "normal"
                    register_name_dict[reg_name] = collections.OrderedDict()
                    register_name_dict[reg_name]["name"] = reg_name
                    register_name_dict[reg_name]["status"] = status
                    if mode != "register_name_only":
                        register_name_dict[reg_name]["file_list"] = []

                # One non-normal member makes the register name "revised".
                if f["status"] != "normal":
                    status = "revised"
                register_name_dict[reg_name]["status"] = status
                if mode != "register_name_only":
                    register_name_dict[reg_name]["file_list"].append(f)

            vDict["register_name_list"] = [
                register_name_dict[key] for key in sorted(register_name_dict)]

        return vDict

    def restore(self, register_basename, disable_hash=False):
        """Download the files of one register name into the local storage.

        :param register_basename: base name converted to register names with
            ``util.register_name_list``; the first resulting name
            determines the output directory.
        :param disable_hash: forwarded to the download request.
        :returns: dict with ``register_name`` and ``file_list_restored``.
        :raises util.Error: if a derived register name is not present in the
            index, or if no register name could be derived.
        """
        register_name_select_list = util.register_name_list(
            [register_basename], self.__repository_directory)

        register_name_undefined_list = [
            reg for reg in register_name_select_list
            if reg not in self.__register_name_dict]

        if len(register_name_undefined_list) > 0:
            message = "undefined register_name_list for %s" % (
                str(register_name_undefined_list))
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

        if len(register_name_select_list) == 0:
            message = "no input for register_name_list"
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

        repo = rest.Repository()
        repo.authorize(self.__access_token, debug=False)

        v = {}
        v["register_name_list"] = register_name_select_list
        v["flag_own"] = 1
        ret_dict = repo.data.files.post(**v)

        register_name = register_name_select_list[0]
        register_name_dir = os.path.dirname(register_name)
        out_dir = "%s%s" % (
            self.__local_storage_path,
            register_name_dir[len(self.__repository_top_directory):])

        file_list = ret_dict.get("file_list", [])
        file_list_restored = []
        for f in file_list:
            v = {}
            v["file"] = f["name"]
            v["flag_subdirectory"] = 1
            ret_dict = repo.download.file.post(
                v, out_directory=out_dir, flag_path=False,
                flag_subdirectory=True, disable_hash=disable_hash)
            if ret_dict is not None:
                util.log("==> response")
                # Was ``util_client.out_json``: that name is not defined in
                # this module and raised NameError.
                util.out_json(ret_dict)
            # NOTE(review): every requested file is recorded, whether or
            # not a response was returned; confirm this is intended.
            file_list_restored.append(f)

        print("==> restore completed for register_name={}".format(register_name))

        ret = {}
        ret["register_name"] = register_name
        ret["file_list_restored"] = file_list_restored

        return ret