Python Sample Code

Regular upload

import concurrent.futures
import mimetypes
import os
import shutil
import time

import requests

# API endpoint URL used for every upload. Replace the <your_api_token> placeholder
# in the query string with your own API token before running the script.
url = "https://gateway.dolpin.io/api/v1/documents/upload-in-cluster-with-api?api_token=<your_api_token>"


# Folder whose files will be uploaded (relative to the current working directory)
folder_path = "./source/"

def get_file_content_type(file_path):
    """Return the content type to send for the file at *file_path*.

    The type is guessed from the file name with ``mimetypes.guess_type``.

    Args:
        file_path: Path (or bare name) of the file; only its extension matters.

    Returns:
        The guessed MIME type, ``"multipart/form-data"`` when the guessed type
        is 40 characters or longer, or ``"application/octet-stream"`` when the
        type cannot be guessed at all.
    """
    content_type, _ = mimetypes.guess_type(file_path)
    # guess_type returns None for unknown or missing extensions; calling len()
    # on that would raise TypeError, so fall back to the generic binary type.
    if content_type is None:
        return "application/octet-stream"
    if len(content_type) >= 40:
        return "multipart/form-data"
    return content_type

def upload_file(file_path):
    """Upload one file to the API endpoint in ``url`` as a multipart POST.

    The file is sent under the ``files`` form field, together with a ``name``
    field holding its base name. The outcome is printed; nothing is returned.

    Args:
        file_path: Path of the file to upload.

    Raises:
        Any exception raised by ``open`` or ``requests.post`` propagates to
        the caller; it is not caught here.
    """
    file_name = os.path.basename(file_path)
    # Create the form data to send in the request
    data = {'name': file_name}
    print(f'Uploading {file_name}...')
    content_type = get_file_content_type(file_path)
    # Open the file in a context manager so the handle is always closed once
    # the request has finished, even if the request raises.
    with open(file_path, 'rb') as file_handle:
        files = [('files', (file_name, file_handle, content_type))]
        # Send the POST request to upload the file
        response = requests.post(url=url, files=files, data=data)
    print(f'Response code: {response.status_code}')
    # Check if the response status code is 200 (OK)
    if response.status_code == 200:
        response_json = response.json()
        print('Successfully uploaded file: ', response_json)
    else:
        print(response.text)
        # Pause this worker for 10 seconds after a failed upload
        time.sleep(10)

# Upload every regular file in folder_path concurrently, using at most 10 worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    # Queue one upload task per regular file; any other directory entry is reported and skipped
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if not os.path.isfile(file_path):
            print("not a valid path")
            continue
        print(f'Submitting task for {file_name}...')
        futures.append(executor.submit(upload_file, file_path))

    # Collect the tasks as they complete so one failed upload is reported
    # without stopping the others
    for finished in concurrent.futures.as_completed(futures):
        try:
            # result() re-raises any exception raised inside the worker
            finished.result()
        except Exception as e:
            print(f'Exception while uploading file: {e}')
            

Upload with file movement

In this scenario you must have three folders: source, fail, and success. Files are uploaded from the source folder; if an upload succeeds, the file is moved to the success folder, and if it fails, the file is moved to the fail folder. This way you can keep track of each file's upload status.

import requests
import os
import shutil
import concurrent.futures
import time
import mimetypes

# API endpoint URL used for every upload. Replace the <your_api_token> placeholder
# in the query string with your own API token before running the script.
url = "https://gateway.dolpin.io/api/v1/documents/upload-in-cluster-with-api?api_token=<your_api_token>"


# Folder whose files will be uploaded (relative to the current working directory)
folder_path = "./source/"

def get_file_content_type(file_path):
    """Return the content type to send for the file at *file_path*.

    The type is guessed from the file name with ``mimetypes.guess_type``.

    Args:
        file_path: Path (or bare name) of the file; only its extension matters.

    Returns:
        The guessed MIME type, ``"multipart/form-data"`` when the guessed type
        is 40 characters or longer, or ``"application/octet-stream"`` when the
        type cannot be guessed at all.
    """
    content_type, _ = mimetypes.guess_type(file_path)
    # guess_type returns None for unknown or missing extensions; calling len()
    # on that would raise TypeError, so fall back to the generic binary type.
    if content_type is None:
        return "application/octet-stream"
    if len(content_type) >= 40:
        return "multipart/form-data"
    return content_type

def upload_file(file_path):
    """Upload one file to the API endpoint in ``url``, then file it by outcome.

    The file is sent under the ``files`` form field, together with a ``name``
    field holding its base name. On HTTP 200 the file is moved to
    ``./success/``; on any other status it is moved to ``./fail/``. The
    outcome is printed; nothing is returned.

    Args:
        file_path: Path of the file to upload.

    Raises:
        Any exception raised by ``open``, ``requests.post``, ``response.json``
        or ``shutil.move`` propagates to the caller; it is not caught here.
    """
    file_name = os.path.basename(file_path)
    # Create the form data to send in the request
    data = {'name': file_name}
    print(f'Uploading {file_name}...')
    content_type = get_file_content_type(file_path)
    # Close the file before it is moved: moving a file that is still open
    # leaks the handle and fails outright on Windows.
    with open(file_path, 'rb') as file_handle:
        files = [('files', (file_name, file_handle, content_type))]
        # Send the POST request to upload the file
        response = requests.post(url=url, files=files, data=data)
    print(f'Response code: {response.status_code}')
    # Check if the response status code is 200 (OK)
    if response.status_code == 200:
        response_json = response.json()
        print('Successfully uploaded file: ', response_json)
        # Record the successful upload by moving the file out of the source folder
        os.makedirs("./success/", exist_ok=True)
        shutil.move(file_path, "./success/")
    else:
        # Record the failed upload by moving the file to the fail folder
        os.makedirs("./fail/", exist_ok=True)
        shutil.move(file_path, "./fail/")
        print(response.text)
        # Pause this worker for 10 seconds after a failed upload
        time.sleep(10)

# Upload every regular file in folder_path concurrently, using at most 10 worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    # Queue one upload task per regular file; any other directory entry is reported and skipped
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if not os.path.isfile(file_path):
            print("not a valid path")
            continue
        print(f'Submitting task for {file_name}...')
        futures.append(executor.submit(upload_file, file_path))

    # Collect the tasks as they complete so one failed upload is reported
    # without stopping the others
    for finished in concurrent.futures.as_completed(futures):
        try:
            # result() re-raises any exception raised inside the worker
            finished.result()
        except Exception as e:
            print(f'Exception while uploading file: {e}')

Last updated