import json

#from requests_toolbelt import MultipartEncoder
from requests_toolbelt.multipart.encoder import MultipartEncoderMonitor

import requests
import os
import codecs
import io
import uuid
import socket
import time

    from StringIO import StringIO # for Python 2
except ImportError:
    from io import StringIO # for Python 3

    from BytesIO import BytesIO # for Python 2
except ImportError:
    from io import BytesIO # for Python 3

import hashlib

import ftplib
from ftplib import FTP_TLS

FTP_TLS.encoding = "utf-8"

from . import config
from . import util

g_stored_bytes_read = 0

[docs]class Main(): def __init__(self, parent): self.__parent = parent self.files = Files(self) self.check = Check(self) self.reset = Reset(self) self.metadata = Metadata(self)
[docs] def endpoint(self, path): return self.__parent.endpoint(path)
[docs] def access_token(self): return self.__parent.access_token()
[docs] def local_storage_path(self): return self.__parent.local_storage_path()
[docs] def facility(self): return self.__parent.facility()
[docs] def class_name(self): return self.__parent.class_name()
[docs] def disk_name(self): return self.__parent.disk_name()
[docs]class Files(): # post files with streaming mode def __init__(self, parent): self.__parent = parent self.__endpoint = parent.endpoint(config.register_files_path)
[docs] def post(self, **v): """ REST API: ファイルアップロード * Endpoint : [agent-url]/benten/v1/register/files * Method : POST * Response : JSON * Authorization : 要 :param \**v: See below. :Keyword Arguments: * *register_name* (``string``) : 登録名 * *files_encode* (``str``) : 登録ファイル情報 * json形式でencode要 :return: A dict of mapping keys. :Keyword: * *file_list* (``list(dict)``) : 登録指令を受け付けたファイルリスト * *error* (``dict``) : エラー情報 (エラー発生時のみ付加) """ global g_stored_bytes_read def upload_monitor(monitor): global g_stored_bytes_read threshold_size = 1048576 * 100 # 100 Mbyte bytes_read = monitor.bytes_read diff = bytes_read - g_stored_bytes_read if diff > threshold_size: g_stored_bytes_read = bytes_read total = int(monitor.len/1048576) rate = int(100.*(float(bytes_read)/float(monitor.len))) util.log("[upload_monitor] total={} MB, {}% uploaded".format( total, rate), flush=True) vdata = {} for key in ["register_name", "files_encode"]: vdata[key] = v[key] local_storage_path = self.__parent.local_storage_path() facility = self.__parent.facility() class_name = self.__parent.class_name() disk_name = self.__parent.disk_name() repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name) files_input = util.decode_json(vdata["files_encode"]) files_input = util.add_info_file_dict( files_input, local_storage_path, facility, class_name, disk_name) vdata["files_encode"] = util.encode_json(files_input) # ... adjust file time with registered time in metadata # (filename should not be one of ["register_name", "file_encode"]) file_list = [] for f in files_input: fname_storage = "%s/%s" % (local_storage_path, f[len(repository_top_dir) + 1:]) str_utime = files_input[f]["time"] mtime = util.mktime(str_utime) atime = os.stat(fname_storage).st_mtime # os.utime(fname_storage, times=(atime, mtime)) # <- not work with python2.7 os.utime(fname_storage, (atime, mtime)) fdata =, "rb") vdata[f] = (f, fdata, "application/octet-stream") file_list.append(f) # m = MultipartEncoder(fields=vdata) g_stored_bytes_read = 0 threshold_size = 1048576 * 100 # 100 Mbyte m = MultipartEncoderMonitor.from_fields(fields=vdata, callback=upload_monitor) headers = util.headers_authorization(self.__parent.access_token()) headers["Content-Type"] = m.content_type headers["Content-Length"] = str(m.len) total_size = m.len if m.len > config.max_register_file_length: util.log("**WARN** uploaded file size({}) excced the limit({})".format(m.len, config.max_register_file_length)) if m.len > threshold_size: util.log("[upload_monitor] for file_list={}".format( file_list), flush=True) ret =, data=m, verify=False, headers=headers) # fdata.close() return util.json_response(ret)
[docs] def ftp(self, **v): """ ftp: ファイルアップロード """ t0_begin = time.time() reply = {} register_name = v.get("register_name") files_encode = v.get("files_encode") # ... local_storage_path = self.__parent.local_storage_path() facility = self.__parent.facility() class_name = self.__parent.class_name() disk_name = self.__parent.disk_name() repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name) meta_files = util.decode_json(files_encode) meta_files = util.add_info_file_dict( meta_files, local_storage_path, facility, class_name, disk_name) # ... adjust file time with registered time in metadata # (filename should not be one of ["register_name", "file_encode"]) for f in meta_files: filename_storage = "{}/{}".format(local_storage_path, f[len(repository_top_dir) + 1:]) str_utime = meta_files[f]["time"] mtime = util.mktime(str_utime) atime = os.stat(filename_storage).st_mtime # os.utime(filename_storage, times=(atime, mtime)) # <- not work with python2.7 os.utime(filename_storage, (atime, mtime)) t0_end = time.time() t1_begin = time.time() # ... upload via ftp # repo = rest.Repository() repo.authorize(self.__parent.access_token()) ret_dict = user = ret_dict.get("username") passwd = self.__parent.access_token() host = util.benten_uploader_host() port = util.benten_uploader_port() hash_register_name = hashlib.md5(register_name.encode("utf-8")).hexdigest() ftp = FTP_TLS() ftp.connect(host,port) ftp.login(user,passwd) if util.benten_uploader_secure_data(): ftp.prot_p() # option for data security # optimize socket parameters for upload task ftp.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, config.ftp_socket_SO_KEEPALIVE) ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, config.ftp_socket_TCP_KEEPINTVL) ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, config.ftp_socket_TCP_KEEPIDLE) try: ftp.mkd(hash_register_name) except ftplib.error_perm as e: pass meta_dict = {} try: meta_filename = "{}/metadata.json".format(hash_register_name) with StringIO() as f: ftp.retrlines("RETR {}".format(meta_filename), f.write) meta_dict = json.loads(f.getvalue()) except: pass file_list = [] filename_list = [] # ... set uuid_name for files in meta_dict for f in meta_files: filename_list.append(f) v = meta_files[f] uuid_name = None if f in meta_dict: if "uuid_name" in meta_dict[f]: uuid_name = meta_dict[f]["uuid_name"] if uuid_name is None: uuid_name = str(uuid.uuid4()) v["uuid_name"] = uuid_name meta_dict[f] = v util.log("[upload_monitor(ftps)] for file_list={}".format(filename_list)) for f in meta_files: v = meta_files[f] total_size = v["size"] unit = "B" if total_size >= 1024: total_size /= 1024. unit = "KB" if total_size >= 1024: total_size /= 1024. unit = "MB" if total_size >= 1024: total_size /= 1024. unit = "GB" util.log("[upload_monitor(ftps)] {} ({} {})".format(f, total_size, unit)) filename_storage = "{}/{}".format(local_storage_path, f[len(repository_top_dir) + 1:]) uuid_name = v["uuid_name"] with, "rb") as fd: ftp.storbinary("STOR {}/{}".format(hash_register_name, uuid_name), fd) v["name"] = f file_list.append(v) with BytesIO() as fd: fd.write(util.str_json(meta_dict).encode("utf-8")) ftp.storbinary("STOR {}".format(meta_filename), fd) ftp.close() t1_end = time.time() t2_begin = time.time() file_error_list = [] count = 0 size = 0 count_total = 0 count_threshold = 100 count_max = len(meta_files) size_threshold = 1024*1024*1024 # 1GB each_dict = {} for f in meta_files: meta_value = meta_files[f] each_dict[f] = meta_value count += 1 count_total += 1 size += meta_value.get("size",0) if (size < size_threshold) and \ (count < count_threshold) and \ (count_total < count_max): continue v = {} v["register_name"] = register_name v["files_encode"] = util.encode_json(each_dict) util.log("[verification check] {}".format(list(each_dict.keys()))) ret =**v) if "error" in ret: message = ret["error"]["message"] util.log("--> Error: {}".format(message)) file_error_list_each = ret.get("file_error_list",[]) file_error_list.extend(file_error_list_each) else: util.log("--> OK") # ... reset values each_dict = {} count = 0 size = 0 if len(file_error_list)>0: reply["error"] = {} reply["error"]["message"] = "error in file: {}".format(file_error_list) reply["file_error_list"] = file_error_list t2_end = time.time() util.log("[upload_monitor(elapsed time)] all = {:.2f}s".format(t2_end - t0_begin)) util.log("[upload_monitor(elapsed time)] init = {:.2f}s".format(t0_end - t0_begin)) util.log("[upload_monitor(elapsed time)] ftps = {:.2f}s".format(t1_end - t1_begin)) util.log("[upload_monitor(elapsed time)] verification = {:.2f}s".format(t2_end - t2_begin)) reply["file_list"] = file_list return reply
[docs]class Files20190215(): # post files as it is (not possible to upload file more than 2GB) def __init__(self, parent): self.__parent = parent self.__endpoint = parent.endpoint(config.register_files_path)
[docs] def post(self, **v): """ REST API : ファイルアップロード * Endpoint : [agent-url]/benten/v1/register/files * Method : POST * Response : JSON * Authorization : 要 :param \**v: See below. :Keyword Arguments: * *register_name* (``string``) : 登録名 * *files_encode* (``str``) : 登録ファイル情報 * json形式でencode要 :return: A dict of mapping keys. :Keyword: * *file_list* (``list(dict)``) : 登録指令を受け付けたファイルリスト * *error* (``dict``) : エラー情報 (エラー発生時のみ付加) """ vdata = {} for key in ["register_name", "files_encode"]: vdata[key] = v[key] local_storage_path = self.__parent.local_storage_path() facility = self.__parent.facility() class_name = self.__parent.class_name() disk_name = self.__parent.disk_name() repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name) files_input = util.decode_json(vdata["files_encode"]) files_input = util.add_info_file_dict( files_input, local_storage_path, facility, class_name, disk_name) vdata["files_encode"] = util.encode_json(files_input) files = {} # ... adjust file time with registered time in metadata for f in files_input: fname_storage = "%s/%s" % (local_storage_path, f[len(repository_top_dir) + 1:]) str_utime = files_input[f]["time"] mtime = util.mktime(str_utime) atime = os.stat(fname_storage).st_mtime # os.utime(fname_storage, times=(atime, mtime)) # <- not work with python2.7 os.utime(fname_storage, (atime, mtime)) fdata =, "rb") files[f] = (f, fdata, "application/octet-stream") ret =, vdata, files=files, verify=False, headers=util.headers_authorization(self.__parent.access_token())) # fdata.close() return util.json_response(ret)
[docs]class Check(): # check hash for uploaded data def __init__(self, parent): self.__parent = parent self.__endpoint = parent.endpoint(config.register_check_path)
[docs] def post(self, **v): """ REST API: ファイル Check * Endpoint : [agent-url]/benten/v1/register/check * Method : POST * Response : JSON * Authorization : 要 :param \**v: See below. :Keyword Arguments: * *register_name* (``string``) : 登録名 * *files_encode* (``str``) : 登録ファイル情報 * json形式でencode要 :return: A dict of mapping keys. :Keyword: * *file_list* (``list(dict)``) : 登録指令を受け付けたファイルリスト * *error* (``dict``) : エラー情報 (エラー発生時のみ付加) """ vdata = {} for key in ["register_name", "files_encode"]: vdata[key] = v[key] ret =, vdata, verify=False, headers=util.headers_authorization(self.__parent.access_token())) return util.json_response(ret)
[docs]class Reset(): def __init__(self, parent): self.__parent = parent self.__endpoint = parent.endpoint(config.register_reset_path)
[docs] def post(self, **v): """ REST API: アップロードされたファイルのリセット * Endpoint : [agent-url]/benten/v1/register/reset * Method : POST * Response : JSON * Authorization : 要 :param \**v: See below. :Keyword Arguments: * *register_name* (``str``) : 登録名 :return: A dict of mapping keys. :Keyword: * *result* (``str``) : 実行結果の状況 * 成功した場合のみ terminatedを付加 * *error* (``dict``) : エラー情報 (エラー発生時のみ付加) """ vdata = {} for key in ["register_name"]: vdata[key] = v[key] ret =, vdata, verify=False, headers=util.headers_authorization(self.__parent.access_token())) return util.json_response(ret)
[docs]class Metadata(): def __init__(self, parent): self.__parent = parent self.__endpoint = parent.endpoint(config.register_metadata_path)
[docs] def post(self, **v): """ REST API : * Endpoint: [agent-url]/benten/v1/register/metadata * Method: POST * Response: JSON * Authorization: 要 :param \**v: See below. :Keyword Arguments: * *mode* (``str`` [Option]) : 登録モードの指定 * replace,update,createのいずれか * デフォルト値はreplace * *metadata_encode* (``str``) : メタデータ * json形式でencode要 :return: A dict of mapping keys. :Keyword: * *result_register* (``str``) : 実行結果の状況 * 成功した場合のみ updated or createdを付加 * *result_file* (``str``) : ファイルアップロード結果の状況 * *mode* (``str``): 登録モード * *register_name* (``str``): 登録名 * *error* (``dict``) : エラー情報 (エラー発生時のみ付加) """ vdata = {} for key in ["metadata_encode"]: vdata[key] = v[key] for key in ["mode"]: if key in v: vdata[key] = v[key] ret =, vdata, verify=False, headers=util.headers_authorization(self.__parent.access_token())) return util.json_response(ret)