Source code for benten_client.local_storage

# coding:utf-8
"""
 local_storage

 Copyright (C) 2020 JASRI All Rights Reserved.
"""

import os
import sys
import re
import shutil
import copy
import fnmatch

import collections

from os.path import expanduser

from . import util
from . import config
from . import local_storage_util
from . import rest

class LocalStorage():
    """
    Entry point for the local-storage operations of the benten client.

    An instance keeps the access token, the current directory, the paths of
    the state files located under ``util.workdir()``, the loaded configuration
    value and the repository directory derived from them.
    """

    def __init__(self, access_token=None, current_directory=None, debug=True):
        """
        Prepare the state-file paths and resolve the repository directory.

        :param access_token: access token stored for later REST requests
        :param current_directory: directory the instance operates on; when
            ``None`` (default) ``os.getcwd()`` is evaluated at construction time
        :param debug: debug flag handed to the helper objects
        """
        util.check_workdir()

        # A default of ``os.getcwd()`` in the signature is evaluated only once,
        # when the module is imported; resolve it per call instead.
        if current_directory is None:
            current_directory = os.getcwd()

        self.__access_token = access_token
        self.__current_directory = current_directory

        self.__local_storage_config_file = \
            "%s/%s" % (util.workdir(), config.local_storage_config_file)
        self.__proposal_number_file = \
            "%s/%s" % (util.workdir(), config.proposal_number_file)
        self.__repository_info_file = \
            "%s/%s" % (util.workdir(), config.repository_info_file)
        self.__local_storage_info_file = \
            "%s/%s" % (util.workdir(), config.local_storage_info_file)

        self.__config_value = self.config_value()

        # Best effort: the repository directory may not be resolvable yet
        # (e.g. before the configuration has been written by ``init``).
        try:
            self.__repository_directory = \
                util.repository_directory(self.__config_value,
                                          current_directory=self.__current_directory,
                                          flag_print=False)
        except Exception:
            self.__repository_directory = None

        self.__debug = debug

    # ... methods
[docs] def init(self): """ Init for configuration settings :return: """ util.log("### LocalStorage::init() ###") # check BENTEN_WORKDIR benten_workdir = os.getenv( "BENTEN_WORKDIR", "{}/.benten".format(expanduser("~"))) util.log("BENTEN_WORKDIR = {}".format(benten_workdir)) self.authorize(util.access_token()) parent_basename = os.path.basename(os.getcwd()) # check userinfo repo = rest.Repository() repo.authorize(util.access_token()) ret_dict = repo.auth.userinfo.post() if "error" in ret_dict: message = "Unauthorized" raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) profile = ret_dict["profile"] vdict = {} vdict["facility"] = ret_dict["facility"] vdict["class_name"] = ret_dict["class_name"] vdict["disk_name"] = parent_basename # check configFile vdict["local_storage_path"] = os.getcwd() if os.name == "nt": vdict["local_storage_path"] = os.getcwd().replace("\\", "/") util.log("### Initialize configurations ###") try: vinput = raw_input( "local_storage_path[default=%s]: " % vdict["local_storage_path"]) except: vinput = input( "local_storage_path[default=%s]: " % vdict["local_storage_path"]) if vinput != "": vdict["local_storage_path"] = vinput try: vinput = raw_input("disk_name[default=%s]: " % vdict["disk_name"]) except: vinput = input("disk_name[default=%s]: " % vdict["disk_name"]) if vinput != "": vdict["disk_name"] = vinput util.log("") util.log("### configurations parameters ###") util.log("local_storage_path = {}".format(vdict["local_storage_path"])) util.log("facility = {}".format(vdict["facility"])) util.log("class_name = {}".format(vdict["class_name"])) util.log("disk_name = {}".format(vdict["disk_name"])) util.log("") try: vinput = raw_input( ">> please type 'y' or 'yes' to set parameters :") except: vinput = input(">> please type 'y' or 'yes' to set parameters :") if vinput not in ["y", "yes"]: util.log("==> configuration parameters were NOT updated") return debug_tmp = self.__debug self.__debug = False for key 
in ["local_storage_path", "facility", "class_name", "disk_name"]: v = {} v["key"] = key v["value"] = vdict[key] self.set_config(**v) util.log("==> configuration parameters were updated") self.__debug = debug_tmp
# ... methods
[docs] def init_index(self, **v): """ Init for local storage settings :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 * *debug* (``bool`` [Option]) : Debugオプション  (デフォルトは False) * *flag_all* (``bool`` [Option]): 全初期化するかどうかのフラグ (デフォルトはFalse) :return: """ debug = self.__debug if "debug" in v: debug = v["debug"] register_basename = None if "register_basename" in v: register_basename = v["register_basename"] util.check_access_token(self.__access_token) flag_all = v.get("flag_all", False) repository_directory = util.repository_directory(self.__config_value, self.__current_directory, flag_all) repositoryInfoFile = local_storage_util.RepositoryInfoFile(self.__repository_info_file, debug=debug) repositoryInfoFile.output(self.__access_token, repository_directory, register_basename) localStorageInfoFile = local_storage_util.LocalStorageInfoFile(self.__local_storage_info_file, debug=debug) localStorageInfoFile.output(self.__config_value, repository_directory, repositoryInfoFile.load(), register_basename)
[docs] def create_meta_system(self, **v): """ Create metadata for system :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) flag_replace = v.get("flag_replace", False) current_directory = self.__current_directory fname_base = "{}/{}.{}".format(current_directory, register_basename, "json") fname_system = "{}/{}.system.{}".format(current_directory, register_basename, "json") vdict_system = {} if os.path.exists(fname_system): vdict_system = util.load_json(fname_system) # ... create template if there are no hbase json file if os.path.exists(fname_base) is False: proposal_number = self.proposal_number() if proposal_number is not None: message = "**Warning** No metadata file for {}\n".format(fname_base) message += " template file is created with proposal_number={}".format(proposal_number) util.log(message) vdict = collections.OrderedDict() vdict[config.metadata_key_proposal_number] = proposal_number util.out_json(vdict, filename=fname_base) vdict, count_contact_name = self.metadata_from_file(register_basename, flag_dataset=False, flag_count_contact_name=True) # ... check if proposal_number defined proposal_number = vdict.get(config.metadata_key_proposal_number) if proposal_number is None: proposal_number = vdict.get(config.metadata_key_proposal_number_alternative) if proposal_number: vdict[config.metadata_key_proposal_number] = proposal_number if proposal_number is None: message = "undefined proposal_number" raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) # ... get metadata for auto vdict_input = collections.OrderedDict() if os.path.exists(fname_system): try: vdict_input = util.load_json(fname_system) except: message = "illegal data format for %s" % fname_system raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) # ... 
get metadata for others (remove metadata for auto) for k in vdict_input: if k in vdict: del vdict[k] # ... get metadata for output vdict_out = self.metadata_system( vdict_input, register_basename, proposal_number=proposal_number, data_input = vdict, count_contact_name=count_contact_name) flag_updated = False if vdict_out != vdict_system: flag_updated = True util.log("==> output system metadata into {}".format(fname_system)) util.out_json(vdict_out, debug=False, filename=fname_system) v_ret = collections.OrderedDict() v_ret["execute_create_meta_system"] = flag_updated if flag_updated: v_ret["metadata"] = vdict_out return v_ret
[docs] def list(self, **v): """ List for local storage settings :param \**v: See below. :Keyword Arguments: * *directory* (``str`` [Option]) : 相対ディレクトリ * デフォルトでは current_directory をベースにリスト検索を行う。 * *file_list* (``list(str)`` [Option]) : リスト表示を行うファイルリスト * *register_basename_list* (``list(str)`` [Option]) : リスト表示で指定する登録ベース名リスト * *mode* (``str`` [Option]) : リスト表示モード * file, register_name, register_name_only, all から選択する。 * デフォルトでは all が選択される。 * *status* (``str`` [Option]) : リスト表示において指定するステータス * normal, updated, deleted, noMetadata, ready, revised から選択する。 * *type* (``str`` [Option]) : リスト表示において指定するファイルのタイプ * file, directory から選択する。 * *flag_full_path* (``bool`` [Option]): 結果でfull path表示を行うかどうかのフラグ * デフォルトでは False * *flag_recursive* (``bool`` [Option]): recursive検索するかどううかのフラグ * デフォルトではFalse :return: A dict of mapping keys. """ directory = v.get("directory", None) file_list = v.get("file_list", []) register_basename_list = [] if "register_basename_list" in v: register_basename_list = v["register_basename_list"] mode = v.get("mode", None) status_val = v.get("status", None) type_val = v.get("type", None) flag_full_path = v.get("flag_full_path", False) flag_recursive = v.get("flag_recursive", False) repository_directory = self.__repository_directory if directory is not None: repository_directory += "/" + directory manage = self.Manage(repository_directory=repository_directory) vDict = manage.infoDict(file_list=file_list, register_basename_list=register_basename_list, mode=mode, status_val=status_val, type_val=type_val, flag_full_path=flag_full_path, flag_recursive=flag_recursive) return vDict
[docs] def upload(self, **v): """ Upload files :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 * *flag_not_upload* (``bool`` [Option]) : 指定ファイルをアップロードするかどうかのフラグ * デフォルト値は False * *flag_each* (``bool`` [Option]) : 指定ファイルをeach アップロードするかどうかのフラグ * デフォルト値は False (複数ファイルがあった場合にはallアップロードを行う。) :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) flag_not_upload = v.get("flag_not_upload", False) flag_each = v.get("flag_each", False) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) register_name = self.register_name(register_basename) # check metadata in file and db file_file_dict = self.metadata_file_from_file(register_basename) file_db_dict = self.metadata_file_from_db(register_basename) file_metadata_updated_list = [] for f in file_file_dict: if f not in file_db_dict: continue file_db_dict_ref = copy.copy(file_db_dict[f]) if "time" in file_db_dict_ref: if "time" not in file_file_dict[f]: del file_db_dict_ref["time"] if file_file_dict[f] != file_db_dict_ref: file_metadata_updated_list.append(f) manage = self.Manage() vDict = manage.infoDict(register_basename_list=[register_basename]) info_file_list = vDict["file_list"] file_list = [] if len(info_file_list) > 0: for v in info_file_list: fname = v["name"] fname_repository = "%s/%s" % ( self.__repository_directory, fname) if (v["type"] == "file" and v["status"] in ["ready", "updated"]) or \ (fname_repository in file_metadata_updated_list): file_list.append(fname_repository) else: # this is necessary if index is not ready just after db creation file_list = list(file_file_dict.keys()) file_list.sort() key_list = ["format", "caption", "time"] file_dict = collections.OrderedDict() for f in file_list: file_dict[f] = collections.OrderedDict() if f in file_file_dict: for key in file_file_dict[f]: if key in key_list: file_dict[f][key] = file_file_dict[f][key] ret_dict = collections.OrderedDict() error_list = [] file_list = [] if 
flag_not_upload in [False, 0]: repo.set_local_storage_path(self.__config_value.local_storage_path) repo.set_facility(self.__config_value.facility) repo.set_class_name(self.__config_value.class_name) repo.set_disk_name(self.__config_value.disk_name) if flag_each is False: v = {} v["register_name"] = register_name v["files_encode"] = util.encode_json(file_dict) ret_dict = repo.register.files.post(**v) if "file_list" in ret_dict: file_list.extend(ret_dict["file_list"]) if "error" in ret_dict: error_list.extend(ret_dict["error"]["errors"]) else: for f in file_dict: file_each_dict = {} file_each_dict[f] = file_dict[f] v = {} v["register_name"] = register_name v["files_encode"] = util.encode_json(file_each_dict) ret_dict = repo.register.files.post(**v) if "file_list" in ret_dict: file_list.extend(ret_dict["file_list"]) if "error" in ret_dict: error_list.extend(ret_dict["error"]["errors"]) ret_dict["flag_not_upload"] = flag_not_upload ret_dict["flag_each"] = flag_each ret_dict["file_list"] = file_list if len(error_list) > 0: if "error" in ret_dict: ret_dict["error"]["errors"].extend(error_list) else: ret_dict["error"] = {} ret_dict["error"]["message"] = error_list[-1]["message"] ret_dict["error"]["errors"] = error_list else: file_list = [] for f in file_dict: v = {} v["name"] = f v.update(file_dict[f]) file_list.append(v) ret_dict = {} ret_dict["flag_not_upload"] = flag_not_upload ret_dict["flag_each"] = flag_each ret_dict["file_list"] = file_list return ret_dict
[docs] def upload_with_ftp(self, **v): """ Upload files :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 * *flag_not_upload* (``bool`` [Option]) : 指定ファイルをアップロードするかどうかのフラグ * デフォルト値は False * *flag_each* (``bool`` [Option]) : 指定ファイルをeach アップロードするかどうかのフラグ * デフォルト値は False (複数ファイルがあった場合にはallアップロードを行う。) :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) flag_not_upload = v.get("flag_not_upload", False) flag_each = v.get("flag_each", False) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) register_name = self.register_name(register_basename) # check metadata in file and db file_file_dict = self.metadata_file_from_file(register_basename) file_db_dict = self.metadata_file_from_db(register_basename) file_metadata_updated_list = [] for f in file_file_dict: if f not in file_db_dict: continue file_db_dict_ref = copy.copy(file_db_dict[f]) if "time" in file_db_dict_ref: if "time" not in file_file_dict[f]: del file_db_dict_ref["time"] if file_file_dict[f] != file_db_dict_ref: file_metadata_updated_list.append(f) manage = self.Manage() vDict = manage.infoDict(register_basename_list=[register_basename]) info_file_list = vDict["file_list"] file_list = [] if len(info_file_list) > 0: for v in info_file_list: fname = v["name"] fname_repository = "%s/%s" % ( self.__repository_directory, fname) if (v["type"] == "file" and v["status"] in ["ready", "updated"]) or \ (fname_repository in file_metadata_updated_list): file_list.append(fname_repository) else: # this is necessary if index is not ready just after db creation file_list = list(file_file_dict.keys()) file_list.sort() key_list = ["format", "caption", "time"] file_dict = collections.OrderedDict() for f in file_list: file_dict[f] = collections.OrderedDict() if f in file_file_dict: for key in file_file_dict[f]: if key in key_list: file_dict[f][key] = file_file_dict[f][key] ret_dict = collections.OrderedDict() error_list = [] file_list = [] 
if flag_not_upload in [False, 0]: repo.set_local_storage_path(self.__config_value.local_storage_path) repo.set_facility(self.__config_value.facility) repo.set_class_name(self.__config_value.class_name) repo.set_disk_name(self.__config_value.disk_name) if flag_each is False: v = {} v["register_name"] = register_name v["files_encode"] = util.encode_json(file_dict) ret_dict = repo.register.files.ftp(**v) if "file_list" in ret_dict: file_list.extend(ret_dict["file_list"]) if "error" in ret_dict: error_list.extend(ret_dict["error"]["errors"]) else: for f in file_dict: file_each_dict = {} file_each_dict[f] = file_dict[f] v = {} v["register_name"] = register_name v["files_encode"] = util.encode_json(file_each_dict) ret_dict = repo.register.files.post(**v) if "file_list" in ret_dict: file_list.extend(ret_dict["file_list"]) if "error" in ret_dict: error_list.extend(ret_dict["error"]["errors"]) ret_dict["flag_not_upload"] = flag_not_upload ret_dict["flag_each"] = flag_each ret_dict["file_list"] = file_list if len(error_list) > 0: if "error" in ret_dict: ret_dict["error"]["errors"].extend(error_list) else: ret_dict["error"] = {} ret_dict["error"]["message"] = error_list[-1]["message"] ret_dict["error"]["errors"] = error_list else: file_list = [] for f in file_dict: v = {} v["name"] = f v.update(file_dict[f]) file_list.append(v) ret_dict = {} ret_dict["flag_not_upload"] = flag_not_upload ret_dict["flag_each"] = flag_each ret_dict["file_list"] = file_list return ret_dict
[docs] def reset_upload(self, **v): """ reset uploaded data for registration :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) # ... get register_name from register_basename register_name = self.register_name(register_basename) v = {} v["register_name"] = register_name ret_dict = repo.register.reset.post(**v) ret_dict["register_name"] = register_name return ret_dict
[docs] def register(self, **v): """ Register metadata :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) # ... get register_name from register_basename register_name = self.register_name(register_basename) file_list = [] manage = self.Manage() flag_new = True # ... check register_name is already registered or not vDict = manage.infoDict() info_file_list = vDict["file_list"] for f in info_file_list: if f["register_name"] == register_name: flag_new = False break # ... get metadata from file (file list is excluded) metadata_dict = self.metadata_from_file(register_basename) if config.metadata_key_directory_list in metadata_dict: del metadata_dict[config.metadata_key_directory_list] v = {} v["mode"] = "replace" if flag_new is False: v["mode"] = "update" v["metadata_encode"] = util.encode_json(metadata_dict) ret_dict = repo.register.metadata.post(**v) ret_dict["mode"] = v["mode"] ret_dict["register_name"] = register_name return ret_dict
[docs] def restore(self, **v): """ Restore data :param \**v: See below. * *register_basename* (``str``) : 登録ベース名 * *disable_hash* (``int`` [Option]) : disable hash check for downloaded file :return: A dict of mapping keys. """ vdict = collections.OrderedDict() register_basename = v.get("register_basename", None) disable_hash = v.get("disable_hash", False) manage = self.Manage() v = {} v["register_basename"] = register_basename vdict = manage.restore(register_basename, disable_hash=disable_hash) return vdict
[docs] def metadata(self, **v): """ Get metadata :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 * *mode* (``str`` [Option]) : モード * db、またはfile から選択する。 * デフォルト値は db :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) mode = v.get("mode", "db") pattern = v.get("pattern", "all") register_name = self.register_name(register_basename) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) v = {} v["register_name"] = register_name v["pattern"] = pattern ret_dict = collections.OrderedDict() ret_dict["mode"] = mode ret_dict["metadata"] = {} if mode == "db": v_dict = repo.data.metadata.post(**v) if ("metadata_list" in v_dict) and len(v_dict["metadata_list"]) > 0: ret_dict["metadata"] = v_dict["metadata_list"][0] if "error" in v_dict: ret_dict["error"] = v_dict["error"] elif mode == "file": metadata_dict, count_contact_name = self.metadata_from_file(register_basename, flag_count_contact_name=True) ret_dict["metadata"] = metadata_dict ret_dict["create_meta_system"] = True if config.metadata_key_version_data_register not in metadata_dict: ret_dict["create_meta_system"] = False elif metadata_dict[config.metadata_key_version_data_register] != \ config.parameter[config.metadata_key_version_data_register]: ret_dict["create_meta_system"] = False elif config.metadata_key_register_name in metadata_dict and \ metadata_dict[config.metadata_key_register_name] != register_name: ret_dict["create_meta_system"] = False elif count_contact_name: ret_dict["create_meta_system"] = False return ret_dict
[docs] def remove(self, **v): """ Remove data :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) manage = self.Manage() vDict = manage.infoDict(register_basename_list=[register_basename]) info_file_list = vDict["file_list"] file_list = [] local_storage_directory_list = [] for v in info_file_list: if v["type"] == "file" and v["status"] == "deleted": file_list.append("%s/%s" % (self.__repository_directory, v["name"])) elif v["type"] == "directory" and v["status"] == "noMetadata": local_storage_directory_list.append( "%s/%s" % (self.__repository_directory, v["name"])) ret_dict = collections.OrderedDict() if True: repo = rest.Repository() repo.authorize(self.__access_token, debug=False) v = {} v["file_list"] = file_list ret_dict_tmp = repo.delete.post(**v) local_storage_directory_removed_list = [] for d in local_storage_directory_list: d_storage = d[len(self.__repository_directory) + 1:] try: local_storage_directory_removed_list.append(d) os.rmdir(d_storage) except: pass ret_dict["directory_local_storage_removed_list"] = local_storage_directory_removed_list for key in ret_dict_tmp: if key != "register_name_list": ret_dict[key] = ret_dict_tmp[key] return ret_dict
[docs] def delete(self, **v): """ Delete Data :param \**v: See below. :Keyword Arguments: * *register_basename* (``str``) : 登録ベース名 * *flag_delete_local_storage_files* (``bool`` [Option]) : local storage ファイル deleteのフラグ * デフォルト値は False :return: A dict of mapping keys. """ register_basename = v.get("register_basename", None) flag_delete_local_storage_files = v.get( "flag_delete_local_storage_files", False) register_name = self.register_name(register_basename) manage = self.Manage() vDict = manage.infoDict(register_basename_list=[register_basename]) file_list = [] info_file_list = vDict["file_list"] for f in info_file_list: if f["type"] != "file": continue v = {} v["name"] = f["name"] file_list.append(v) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) v = {} if register_name is not None: v["register_name_list"] = [register_name] ret_dict = collections.OrderedDict() if True: if flag_delete_local_storage_files: for f in info_file_list: if f["type"] != "file": continue os.remove(f["name"]) for f in info_file_list: if f["type"] != "directory": continue try: shutil.rmtree(f["name"]) except: pass ret_dict = repo.delete.post(**v) ret_dict["flag_delete_local_storage_files"] = flag_delete_local_storage_files ret_dict["register_name"] = register_name if "register_name_list" in ret_dict: del ret_dict["register_name_list"] return ret_dict
# ... utilities
[docs] def Manage(self, local_storage_info_file=None, config_value=None, repository_directory=None, access_token=None): if local_storage_info_file is None: local_storage_info_file = self.__local_storage_info_file if config_value is None: config_value = self.__config_value if repository_directory is None: repository_directory = self.__repository_directory if access_token is None: access_token = self.__access_token return local_storage_util.Manage(local_storage_info_file=local_storage_info_file, config_value=config_value, repository_directory=repository_directory, access_token=access_token, debug=self.__debug)
[docs] def debug(self): return self.__debug
[docs] def access_token(self): return self.__access_token
[docs] def config_value(self): configFile = local_storage_util.ConfigFile( self.__local_storage_config_file) config_val = configFile.load() return config_val
[docs] def register_name(self, register_basename): return util.register_name(register_basename, self.__repository_directory)
[docs] def authorize(self, v, debug=None): if debug is None: debug = True else: debug = self.__debug self.set_access_token(v) if debug: util.log("[LocalStorage::authorize] access_token = {}".format(v))
[docs] def list_config(self): """ List ConfigFile for local storage settings :return: """ configFile = local_storage_util.ConfigFile( self.__local_storage_config_file) configFile.list()
[docs] def set_config(self, **v): """ Set ConfigFile for local storage settings :param \**v: See below. :Keyword Arguments: * *key* (``str``) : キー名 * *value* (``str``): 値 :return: """ key = v.get("key", None) value = v.get("value", None) configFile = local_storage_util.ConfigFile( self.__local_storage_config_file) configFile.set(key, value, debug=self.__debug)
[docs] def reset_config(self): """ Reset ConfigFile for local storage settings :return: """ configFile = local_storage_util.ConfigFile( self.__local_storage_config_file) configFile.reset()
[docs] def proposal_number(self): proposalNumberFile = local_storage_util.ProposalNumberFile( self.__proposal_number_file) return proposalNumberFile.proposal_number()
[docs] def list_proposal_number(self): """ List proposal_number :return: """ proposalNumberFile = local_storage_util.ProposalNumberFile( self.__proposal_number_file) proposalNumberFile.list()
[docs] def set_proposal_number(self, **v): """ Set proposal_number :param \**v: See below. :Keyword Arguments: * *proposal_number* (``str``) : 課題情報番号 :return: """ value = v.get("proposal_number", None) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) ret_dict = repo.proposal.post(**v) vlist = [] if "list" in ret_dict: vlist = ret_dict["list"] if value not in vlist: message = "proposal_number(=%s) is not valid" % value raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) proposalNumberFile = local_storage_util.ProposalNumberFile( self.__proposal_number_file, debug=self.__debug) proposalNumberFile.set_proposal_number(value)
[docs] def clear_proposal_number(self): """ Clear proposal_number :return: """ proposalNumberFile = local_storage_util.ProposalNumberFile( self.__proposal_number_file) proposalNumberFile.clear()
[docs] def set_access_token(self, v): self.__access_token = v
[docs] def set_current_directory(self, v): self.__current_directory = v self.__repository_directory = util.repository_directory(self.__config_value, current_directory=self.__current_directory)
[docs] def set_debug(self, v): self.__debug = v
[docs] def metadata_from_file(self, register_basename, flag_dataset=True, flag_count_contact_name=False): manage = self.Manage() metadata_basename = "%s.json" % register_basename flag_metadata_basename = False file_list = [] vDict = manage.infoDict() register_basename_escape = re.escape(register_basename) info_file_list = vDict["file_list"] for f in info_file_list: if f["type"] != "file": continue if f["status"] == "deleted": continue fname = f["name"] str_match = "^%s\.(|[^/]+\.)%s$" % (register_basename_escape, "json") m = re.match(str_match, fname) if m: if fname == metadata_basename: flag_metadata_basename = True else: file_list.append(fname) file_list.sort() if flag_metadata_basename: file_list.insert(0, metadata_basename) metadata_dict = {} count_contact_name = 0 fname_system = "{}/{}.system.json".format(self.__current_directory, register_basename) for f in file_list: fname = "{}/{}".format(self.__current_directory, f) v = util.load_json(fname) if (fname != fname_system) and (config.metadata_key_contact_name in v): count_contact_name += 1 metadata_dict.update(v) register_name = "%s/%s" % (self.__repository_directory, register_basename) facility = self.__config_value.facility class_name = self.__config_value.class_name disk_name = self.__config_value.disk_name if config.metadata_key_register_name not in metadata_dict: metadata_dict[config.metadata_key_register_name] = register_name if config.metadata_key_facility not in metadata_dict: metadata_dict[config.metadata_key_facility] = facility metadata_dict[config.metadata_key_class_name] = class_name metadata_dict[config.metadata_key_disk_name] = disk_name # ... add measurement_directory_list if flag_dataset: vdict = self.metadata_files(register_basename) metadata_dict.update(vdict) if flag_count_contact_name is False: return metadata_dict else: return metadata_dict, count_contact_name
[docs] def metadata_file_from_file(self, register_basename): file_file_dict = {} metadata_dict = self.metadata_files(register_basename) key_list = ["format", "caption", "time"] if config.metadata_key_directory_list in metadata_dict: dlist = metadata_dict[config.metadata_key_directory_list] for d in dlist: flist = [] if "file_list" in d: flist = d["file_list"] for f in flist: if "name" not in f: continue fname = f["name"] file_file_dict[fname] = {} for key in key_list: if key in f: file_file_dict[fname][key] = f[key] return file_file_dict
[docs] def metadata_file_from_db(self, register_basename): file_db_dict = {} key_list = ["format", "caption", "time"] register_name = self.register_name(register_basename) repo = rest.Repository() repo.authorize(self.__access_token, debug=False) v = {} v["register_name"] = register_name v_dict = repo.data.metadata.post(**v) try: vlist_ref = v_dict["metadata_list"][0][config.metadata_key_directory_list] except: vlist_ref = [] for v in vlist_ref: if "file_list" not in v: continue flist = v["file_list"] for f in flist: fname = f["name"] file_db_dict[fname] = {} for key in key_list: if key in f: file_db_dict[fname][key] = f[key] return file_db_dict
[docs] def metadata_system(self, vdict, register_basename, proposal_number=None, data_input = {}, count_contact_name=0): vdict_input = copy.copy(vdict) if proposal_number is None: proposal_number = self.proposal_number() parameter = config.parameter register_name = "%s/%s" % (self.__repository_directory, register_basename) vdict[config.metadata_key_register_name] = register_name vdict[config.metadata_key_facility] = self.__config_value.facility vdict[config.metadata_key_class_name] = self.__config_value.class_name vdict[config.metadata_key_disk_name] = self.__config_value.disk_name if (config.metadata_key_proposal_number not in vdict) and \ (config.metadata_key_proposal_number not in data_input): vdict[config.metadata_key_proposal_number] = proposal_number repo = rest.Repository() repo.authorize(self.__access_token, debug=False) v_in = {} v_in["proposal_number"] = proposal_number ret_dict = repo.proposal.metadata.post(**v_in) metadata = ret_dict.get("metadata", {}) found = ret_dict.get("found") if found is False: message = "no metadata" if proposal_number is not None: message += " for proposal_number=%s" % proposal_number raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) for key in config.proposal_info_key_list: if key in metadata: vdict[key] = metadata[key] # ... not output contact_name if external data exists if count_contact_name == 0: for key in metadata: if key.startswith(config.metadata_key_contact_name_base): vdict[key] = metadata[key] else: for key in vdict: if key.startswith(config.metadata_key_contact_name_base): del vdict[key] # ... set system parameter (force registered) for key in parameter: vdict[key] = parameter[key] # ... remove metadata key for subject_base key_list = list(vdict.keys()) for key in key_list: if key.startswith(config.metadata_key_subject_base): del vdict[key] return vdict
[docs] def metadata_files(self, register_basename): register_basename_escape = re.escape(register_basename) current_directory = self.__current_directory if os.name == "nt": current_directory = current_directory.replace("\\", "/") # ... extract file_caption_list , file_format_list fname_base = "%s/%s.%s" % (current_directory, register_basename, "json") vdict = collections.OrderedDict() if os.path.exists(fname_base): try: vdict = util.load_json(fname_base) except: message = "illegal data format for %s" % fname_base raise util.Error(message, domain=util.error_domain( __file__, sys._getframe(), self.__class__)) file_caption_list = vdict.get(config.metadata_key_file_caption_list, []) file_format_list = vdict.get(config.metadata_key_file_format_list, []) # ... register_name = self.register_name(register_basename) parent_directory = os.path.dirname(register_name) flist = [] files = os.listdir(current_directory) # .. list files beloing to register_basename for f in files: fname = current_directory + "/" + f if os.path.isfile(fname): # str_match = "^%s\.(|[^/]+\.)[^/]+$" % (register_basename) str_match = "^%s\.(|[^/]+\.)[^/]+$" % (register_basename_escape) m = re.match(str_match, f) if m: fname = re.sub("^" + current_directory, self.__repository_directory, fname) flist.append(fname) search_directory = current_directory + "/" + register_basename for root, dirs, files in os.walk(search_directory): for f in files: fname = root + "/" + f fname = re.sub("^" + current_directory, self.__repository_directory, fname) flist.append(fname) flist = list(set(flist)) flist.sort() caption_dict = {} format_dict = {} if True: for f in flist: for feach in file_caption_list: fmatch = parent_directory + "/" + feach[0] fcaption = feach[1] if fnmatch.fnmatch(f, fmatch): caption_dict[f] = fcaption else: benten_client.log("*ERR* invalid format in dataset_file_caption_list") try: for f in flist: for ftest in file_format_list: fmatch = parent_directory + "/" + ftest[0] fformat = ftest[1] if 
fnmatch.fnmatch(f, fmatch): format_dict[f] = fformat except: benten_client.log("*ERR* invalid format in dataset_file_format_list") d_dict = collections.OrderedDict() for f in flist: parent_directory = os.path.dirname(f) if parent_directory not in d_dict: d_dict[parent_directory] = [] d_dict[parent_directory].append(f) o_dict = {} o_dict[config.metadata_key_directory_list] = [] for d in d_dict: v = collections.OrderedDict() v["name"] = d v["file_list"] = [] for f in d_dict[d]: w = collections.OrderedDict() w["name"] = f if f in caption_dict: w["caption"] = caption_dict[f] if f in format_dict: w["format"] = format_dict[f] v["file_list"].append(w) o_dict[config.metadata_key_directory_list].append(v) return o_dict