Classification: [Public]
⚠️ You agree that you have read and understood the content in Notion - Disclaimer & Terms of Use before usage.
Copyright NestAI. For more information, please contact [email protected]
Abstract
Order processing is a time-consuming challenge in the manufacturing industry. Traditionally, sales personnel fill out standard order forms manually, scanning the various order documents they receive line by line and then entering them into ERP systems to complete order recognition and electronic order creation. In recent years, with the development of vision-language (VL) models and LLMs, we have built end-to-end automation with human-in-the-loop (HITL) review on top of these models. Our work includes:
We built an LLM-driven workflow that:
- Frees up 80% of the time spent on manual order recognition and completion
- Reduces the minimum order recognition time from 10 minutes to just 20 seconds
- Achieves an accuracy rate of 90% with transparent processing logic: business administrators can control the arrangement of prompts (implemented with Dify)
We built a Dify-centric solution that:
- Brings enterprise-level authentication to workflow access with plug-and-play setup (supports any OAuth/OIDC vendor; we tested Auth0 and Authing)
- Implements better file upload, processing, and download
- Calls the Dify API concurrently for a 4x speedup
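The 4x speedup comes from issuing blocking workflow calls in parallel with a small thread pool while still preserving page order. A minimal, self-contained sketch of the pattern, where `call_dify` is a hypothetical stub standing in for the real HTTP request:

```python
import concurrent.futures
import time

def call_dify(page_index: int) -> str:
    """Stub standing in for a blocking Dify workflow call (~0.1 s here)."""
    time.sleep(0.1)
    return f"text of page {page_index}"

def recognize_pages(n_pages: int, max_workers: int = 4) -> list:
    """Run up to `max_workers` calls at once; results keep page order."""
    results = [None] * n_pages
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Map each future back to its page index so results land in order
        futures = {ex.submit(call_dify, i): i for i in range(n_pages)}
        for fut in concurrent.futures.as_completed(futures):
            results[futures[fut]] = fut.result()
    return results

start = time.time()
texts = recognize_pages(8)
# 8 calls finish in ~2 batches (~0.2 s) instead of ~0.8 s sequentially
print(texts[0], f"{time.time() - start:.1f}s")
```

Because the work is network-bound rather than CPU-bound, threads are sufficient; the GIL is released while waiting on I/O.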
Our workflow

Gradio + Dify + Authing/Auth0 = Enterprise-Ready AI Workflow Solution

Detailed Data Processing Approaches

The Challenges We Solved
8k LLM Output Limitation
Python Sandbox Code - Dynamic Split Prompt to Avoid 8k LLM Output Limitation
```python
def main(arg1: str) -> dict:
    def merge_pages(split_list):
        max_length = 7000
        merged = []
        current_chunk = []
        current_length = 0
        for elem in split_list:
            elem_len = len(elem)
            if current_chunk:
                # 7 = len('--分页符--'), the page-break separator
                new_length = current_length + 7 + elem_len
            else:
                new_length = elem_len
            if new_length <= max_length:
                current_chunk.append(elem)
                current_length = new_length
            else:
                if current_chunk:
                    merged.append('--分页符--'.join(current_chunk))
                    current_chunk = [elem]
                    current_length = elem_len
                else:
                    merged.append(elem)
                    current_chunk = []
                    current_length = 0
        if current_chunk:
            merged.append('--分页符--'.join(current_chunk))
        return merged

    if '--分页符--' in arg1:
        splited_result = arg1.split('--分页符--')
    else:
        splited_result = [arg1]
    merged_result = merge_pages(splited_result)
    return {"splited_result": merged_result}
```
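The chunking idea in isolation: greedily pack page texts into chunks that stay under the model's output budget, joining them with the workflow's `--分页符--` ("page break") marker. A toy sketch with a small `max_length` for demonstration (names here are illustrative, not from the production sandbox):

```python
SEP = "--分页符--"  # "page break" marker inserted between pages

def pack_pages(pages, max_length):
    """Greedily join page texts with SEP, never exceeding max_length per chunk."""
    chunks, current, length = [], [], 0
    for page in pages:
        # Joining costs len(SEP) extra characters unless the chunk is empty
        extra = len(page) + (len(SEP) if current else 0)
        if length + extra <= max_length:
            current.append(page)
            length += extra
        else:
            if current:
                chunks.append(SEP.join(current))
            current, length = [page], len(page)
    if current:
        chunks.append(SEP.join(current))
    return chunks

print(pack_pages(["aaaa", "bbbb", "cc"], max_length=15))
# → ['aaaa--分页符--bbbb', 'cc']
```

A page longer than `max_length` still becomes its own chunk, mirroring the sandbox code's behavior of never splitting within a page.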
Python Sandbox Code - Merge CSV String
```python
def main(csv_list) -> dict:
    import csv
    from io import StringIO

    first_io = StringIO(csv_list[0])
    reader = csv.reader(first_io)
    rows = list(reader)
    if not rows:
        # Return a dict consistently (the sandbox expects dict outputs)
        return {"result": ""}
    header = rows[0]
    data = rows[1:]
    for csv_str in csv_list[1:]:
        current_io = StringIO(csv_str)
        current_reader = csv.reader(current_io)
        current_rows = list(current_reader)
        if not current_rows:
            continue
        # Skip each subsequent chunk's header row; keep only data rows
        data.extend(current_rows[1:])
    output = StringIO()
    writer = csv.writer(output, lineterminator='\n')
    writer.writerow(header)
    writer.writerows(data)
    return {"result": output.getvalue()}
```
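Concatenating per-chunk CSVs works because every chunk shares the same header row: keep the first header and append only the data rows from the rest. A self-contained sketch of the same merge (function name is illustrative):

```python
import csv
from io import StringIO

def merge_csv_strings(csv_strings):
    """Merge CSV strings sharing a header: first header kept, data rows appended."""
    header, data = None, []
    for s in csv_strings:
        rows = list(csv.reader(StringIO(s)))
        if not rows:
            continue  # tolerate empty chunks
        if header is None:
            header = rows[0]
        data.extend(rows[1:])
    out = StringIO()
    writer = csv.writer(out, lineterminator="\n")
    if header is not None:
        writer.writerow(header)
    writer.writerows(data)
    return out.getvalue()

print(merge_csv_strings(["item,qty\na,1\n", "item,qty\nb,2\n"]))
# → item,qty
#   a,1
#   b,2
```

Round-tripping through `csv.reader`/`csv.writer` (instead of naive string concatenation) keeps quoting and embedded commas intact.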
Concurrent API Call to Speed up
```python
import concurrent.futures
import os

import gradio as gr
import requests

# pdf_to_images and dify_file_upload are project helpers defined elsewhere


def dify_pdf_path_to_text_list(filepath, request: gr.Request):
    """
    Extract each page of the uploaded PDF as an image and send a separate
    Dify workflow call to recognize the order text on it. Pages are processed
    concurrently with at most 4 worker threads.

    Args:
        filepath: PDF file path
        request: Gradio request object

    Returns:
        Extracted text for each page of the order (list), in the original page order
    """
    user_id = request.session['user']['username']
    upload_dir = os.path.join(os.getcwd(), "uploaded_files")
    os.makedirs(upload_dir, exist_ok=True)
    image_paths = pdf_to_images(filepath, upload_dir)
    if not image_paths:
        return "PDF conversion failed to generate images"

    results = [None] * len(image_paths)
    errors = []
    max_workers = 4

    def process_image(index, image_path):
        upload_response = dify_file_upload(request, file_path=image_path)
        file_id = upload_response['id']
        api_key = os.getenv('DIFY_READ_ORDER_IMG_KEY')
        url = os.getenv('DIFY_URL') + '/workflows/run'
        headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }
        data = {
            "inputs": {
                "Pic": [{
                    "transfer_method": "local_file",
                    "upload_file_id": file_id,
                    "type": "image"
                }]
            },
            "response_mode": "blocking",
            "user": user_id
        }
        try:
            response = requests.post(url, headers=headers, json=data)
            if response.status_code == 200:
                print(f'[API] Page {index+1} Dify API Success')
                return index, response.json()['data']['outputs']['text'], None
            else:
                error_msg = f'[API] Dify API Failed: {response.status_code}, {response.text}'
                print(error_msg)
                return index, None, error_msg
        except Exception as e:
            error_msg = f'[API] Error: {str(e)}'
            print(error_msg)
            return index, None, error_msg

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(process_image, idx, img_path): idx
            for idx, img_path in enumerate(image_paths)
        }
        for future in concurrent.futures.as_completed(future_to_index):
            try:
                idx, result, error = future.result()
                if error:
                    errors.append(f"Page {idx+1} Failed: {error}")
                else:
                    results[idx] = result
            except Exception as exc:
                idx = future_to_index[future]
                print(f'Page {idx+1} got error: {exc}')
                errors.append(f"Page {idx+1} got error: {str(exc)}")

    if errors:
        print(f"Error Count: {len(errors)}")
        for err in errors:
            print(err)

    filtered_results = [r for r in results if r is not None]
    if not filtered_results and errors:
        return errors[0]
    return filtered_results
```