Refactor: Integrate scripts into a single application (v1.2.0)

2025-12-29 16:45:40 -06:00
parent 671741772f
commit 5bd154fb4e
6 changed files with 213 additions and 251 deletions

README.md

@@ -1,4 +1,4 @@
# Territory Analysis Tool v1.2.0
## Overview
@@ -6,88 +6,80 @@ This tool provides a complete pipeline for processing and analyzing territory data.
The workflow is managed by a command-line script that gives the user fine-grained control over the execution process.
## Installation
This tool requires Python 3 and has a few dependencies.
1. **Install dependencies:**
Navigate to this directory in your terminal and run the following command to install the required Python libraries:
```bash
pip install -r requirements.txt
```
## File Structure
All necessary files are located in this directory.
### Core Scripts
- `run_all.py`: The main command-line script to run the workflow. **This is the recommended entry point.**
- `process_territories.py`: A module that combines address and boundary data.
- `analysis.py`: A module that performs general territory analysis and generates `map.html`.
- `category_analysis.py`: A module that performs category-specific analysis and generates `category_map.html`.
- `requirements.txt`: A list of Python dependencies.
### Input Data Files
- The tool is designed to work with any address and boundary CSV files.
- The example files `Okinawa Territory Jan 2026 - Addresses.csv` and `Okinawa Territory Jan 2026 - Boundaries.csv` are provided. Both can be exported from NW Scheduler via Export -> Territories.
## Usage
The entire workflow is managed through `run_all.py`. You can see all available commands by running:
```bash
python run_all.py --help
```
### Full Pipeline Run
To run the entire process from start to finish in memory, use the `full-run` command. This is the most common use case.
**Command:**
```bash
python run_all.py full-run --addresses <path_to_addresses.csv> --boundaries <path_to_boundaries.csv>
```
**Example:**
```bash
python run_all.py full-run --addresses "Okinawa Territory Jan 2026 - Addresses.csv" --boundaries "Okinawa Territory Jan 2026 - Boundaries.csv"
```
### Running Steps Individually
You can also run each step of the pipeline separately.
#### Step 1: Process Raw Files
To combine the address and boundary files and save the result to a single "Final" CSV, use the `process` command.
**Command:**
```bash
python run_all.py process --addresses <path_to_addresses.csv> --boundaries <path_to_boundaries.csv>
```
This will generate a new file named `Okinawa Territory <Mon Year> - Final.csv`.
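The date suffix comes from `datetime.now().strftime('%b %Y')` in `run_all.py`, so the name tracks the month of the run. A quick illustration:

```python
from datetime import datetime

# The same pattern run_all.py uses to name the output file.
date_str = datetime.now().strftime('%b %Y')   # e.g. 'Jan 2026'
print(f'Okinawa Territory {date_str} - Final.csv')
```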
#### Step 2: Analyze a Processed File
To run the analysis and generate maps from a "Final" CSV file, use the `analyze` command.
**Command:**
```bash
python run_all.py analyze --input <path_to_final_file.csv>
```
**Example:**
```bash
python run_all.py analyze --input "Okinawa Territory Dec 2025 - Final.csv"
```
## Workflow Details
1. **Data Processing:** `process_territories.py` reads the addresses CSV to count addresses per `TerritoryID` and merges those counts into the boundaries data. The `process` command saves the result as `Okinawa Territory <Mon Year> - Final.csv`.
2. **Data Analysis:** `analysis.py` and `category_analysis.py` take the consolidated data as input to generate reports and interactive maps.
## Output Files
- `Okinawa Territory <Mon Year> - Final.csv`: The consolidated data file.
- `analysis.md`: A markdown summary of the general territory analysis.
- `map.html`: An interactive map visualizing territories colored by address count.
- `category_map.html`: An interactive map visualizing territories colored by their category's total address count.
## Changelog
### v1.2.0 (Current)
- Refactored the tool from a collection of separate scripts into a single, integrated Python application.
- Replaced `subprocess` calls with direct function imports for improved performance and reliability.
- Integrated the `pandas` library for more efficient in-memory data processing.
- The `full-run` command now processes data in memory without writing an intermediate CSV file.
- Added a `requirements.txt` file for easier dependency management.
### v1.1.0
- Introduced a command-line interface with `argparse` to replace the interactive menu.
- Added `process`, `analyze`, and `full-run` commands.
- Allowed for dynamic input file paths via command-line arguments.
### v1.0.0
- Initial release with separate scripts for processing and analysis.
- Workflow managed by an interactive `run_all.py` script.
- Project structure consolidated into a single directory.
- Git repository initialized.
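As the v1.2.0 notes above mention, the refactor replaced `subprocess` calls with direct function imports, which also means the pipeline can be driven from Python without the CLI. A minimal sketch (the CSV paths are placeholders):

```python
from process_territories import process_data
from analysis import generate_analysis_artifacts
from category_analysis import generate_category_map

# Equivalent of `full-run`, entirely in memory.
df = process_data('Addresses.csv', 'Boundaries.csv')   # placeholder paths
generate_analysis_artifacts(df)                        # writes analysis.md and map.html
generate_category_map(df)                              # writes category_map.html
```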

analysis.py

@@ -1,44 +1,43 @@
import json
import pandas as pd

def generate_analysis_artifacts(df):
    """
    Takes a DataFrame, performs analysis, and generates analysis.md and map.html.
    """
    total_territories = df['TerritoryID'].nunique()
    total_addresses = df['Address Count'].sum()
    average_addresses = df['Address Count'].mean()

    # Territories per Category
    category_counts = df.groupby('CategoryCode')['TerritoryID'].nunique().reset_index(name='Count')

    # Generate Markdown Report
    report_content = f"""# Territory Analysis v1.2.0

## Summary
- Total Territories: {total_territories}
- Total Addresses: {total_addresses}
- Average Addresses per Territory: {average_addresses:.2f}

## Territories per Category
| CategoryCode | Count |
|--------------|-------|
"""
    for _, row in category_counts.iterrows():
        report_content += f"| {row['CategoryCode']} | {row['Count']} |\n"

    with open('analysis.md', 'w') as f:
        f.write(report_content)

    # Prepare data for embedding in the HTML's JavaScript:
    # convert the DataFrame to a list of dicts for JSON serialization.
    data_for_json = df.to_dict(orient='records')
    min_addresses = df['Address Count'].min()
    max_addresses = df['Address Count'].max()

    with open('map.html', 'w') as f:
        f.write(f'''
<!DOCTYPE html>
<html>
<head>
@@ -56,12 +55,15 @@ with open('map.html', 'w') as f:
            attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
        }}).addTo(map);
        var territories = {json.dumps(data_for_json, indent=4)};
        var minAddresses = {min_addresses};
        var maxAddresses = {max_addresses};
        function getColor(d) {{
            // Handle division by zero if all counts are the same
            var range = maxAddresses - minAddresses;
            if (range === 0) return 'hsl(120, 100%, 50%)';
            var ratio = (d - minAddresses) / range;
            var hue = (1 - ratio) * 120;
            return 'hsl(' + hue + ', 100%, 50%)';
        }}
@@ -69,15 +71,19 @@ with open('map.html', 'w') as f:
        for (var i = 0; i < territories.length; i++) {{
            var territory = territories[i];
            if (territory.Boundary) {{
                try {{
                    var boundary = JSON.parse('[' + territory.Boundary + ']');
                    var color = getColor(territory['Address Count']);
                    var polygon = L.polygon(boundary.map(p => [p[1], p[0]]), {{
                        fillColor: color,
                        color: "#000",
                        weight: 1,
                        fillOpacity: 0.7
                    }}).addTo(map);
                    polygon.bindPopup('<b>Territory ID:</b> ' + territory.TerritoryID + '<br><b>Territory Number:</b> ' + territory.CategoryCode + '-' + territory.Number + '<br><b>Address Count:</b> ' + territory['Address Count']);
                }} catch(e) {{
                    console.error("Could not parse boundary for territory: " + territory.TerritoryID, e);
                }}
            }}
        }}
    </script>
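For reference, the `getColor()` ramp above maps the smallest address count to green (hue 120) and the largest to red (hue 0), guarding the degenerate case where every territory has the same count. A Python transcription of the same arithmetic:

```python
def get_color(count, min_count, max_count):
    """Python transcription of the JavaScript getColor() above."""
    value_range = max_count - min_count
    if value_range == 0:                   # all counts equal: everything green
        return 'hsl(120, 100%, 50%)'
    ratio = (count - min_count) / value_range
    hue = (1 - ratio) * 120                # 120 = green ... 0 = red
    return f'hsl({hue:.0f}, 100%, 50%)'

print(get_color(7, 7, 30))    # hsl(120, 100%, 50%) -> green (sparsest territory)
print(get_color(30, 7, 30))   # hsl(0, 100%, 50%)   -> red (densest territory)
```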

category_analysis.py

@@ -1,41 +1,26 @@
import json
import pandas as pd

def generate_category_map(df):
    """
    Takes a DataFrame, performs category-based analysis, and generates category_map.html.
    """
    # Calculate the total address count for each CategoryCode
    category_address_counts = df.groupby('CategoryCode')['Address Count'].sum().to_dict()

    # Assign a distinct color to each category for the map
    unique_categories = sorted(list(df['CategoryCode'].unique()))
    colors = [
        '#e6194b', '#3cb44b', '#ffe119', '#4363d8', '#f58231', '#911eb4',
        '#46f0f0', '#f032e6', '#bcf60c', '#fabebe', '#008080', '#e6beff'
    ]
    category_colors = {category: colors[i % len(colors)] for i, category in enumerate(unique_categories)}

    # Prepare data for embedding in the HTML's JavaScript
    data_for_json = df.to_dict(orient='records')

    with open('category_map.html', 'w') as f:
        f.write(f'''
<!DOCTYPE html>
<html>
<head>
@@ -53,23 +38,27 @@ with open('category_map.html', 'w') as f:
            attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
        }}).addTo(map);
        var territories = {json.dumps(data_for_json, indent=4)};
        var categoryColors = {json.dumps(category_colors, indent=4)};
        var categoryAddressCounts = {json.dumps(category_address_counts, indent=4)};
        for (var i = 0; i < territories.length; i++) {{
            var territory = territories[i];
            if (territory.Boundary) {{
                try {{
                    var boundary = JSON.parse('[' + territory.Boundary + ']');
                    var color = categoryColors[territory.CategoryCode];
                    var polygon = L.polygon(boundary.map(p => [p[1], p[0]]), {{
                        fillColor: color,
                        color: "#000",
                        weight: 1,
                        fillOpacity: 0.7
                    }}).addTo(map);
                    var categoryAddressCount = categoryAddressCounts[territory.CategoryCode];
                    polygon.bindPopup('<b>Territory ID:</b> ' + territory.TerritoryID + '<br><b>Category:</b> ' + territory.CategoryCode + '<br><b>Category Address Count:</b> ' + categoryAddressCount);
                }} catch(e) {{
                    console.error("Could not parse boundary for territory: " + territory.TerritoryID, e);
                }}
            }}
        }}
    </script>
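One detail worth noting in `generate_category_map`: the palette has only 12 entries, so with more than 12 categories the colors repeat via the modulo index. A standalone illustration with a shortened, hypothetical palette:

```python
colors = ['#e6194b', '#3cb44b', '#ffe119']  # shortened palette for the demo
categories = ['A', 'B', 'C', 'D', 'E']      # hypothetical category codes

category_colors = {c: colors[i % len(colors)] for i, c in enumerate(categories)}
print(category_colors)
# {'A': '#e6194b', 'B': '#3cb44b', 'C': '#ffe119', 'D': '#e6194b', 'E': '#3cb44b'}
```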

process_territories.py

@@ -1,53 +1,26 @@
import pandas as pd

def process_data(addresses_file, boundaries_file):
    """
    Reads address and boundary CSVs, merges them, and returns a consolidated DataFrame.
    """
    try:
        # Read the addresses and count occurrences of each TerritoryID
        address_counts = pd.read_csv(addresses_file).groupby('TerritoryID').size().reset_index(name='Address Count')

        # Read the boundaries file
        boundaries_df = pd.read_csv(boundaries_file)

        # Merge the address counts with the boundaries data
        merged_df = pd.merge(boundaries_df, address_counts, on='TerritoryID', how='left')

        # Fill missing address counts with 0 and ensure the column is integer-typed
        merged_df['Address Count'] = merged_df['Address Count'].fillna(0).astype(int)

        return merged_df
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Error during data processing: {e}")
    except Exception as e:
        raise Exception(f"An unexpected error occurred during data processing: {e}")
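The heart of `process_data` is a count-then-left-merge: territories with no addresses survive the merge and end up with a count of 0. A miniature worked example (the column values are made up):

```python
import pandas as pd

addresses = pd.DataFrame({'TerritoryID': ['T1', 'T1', 'T2']})
boundaries = pd.DataFrame({'TerritoryID': ['T1', 'T2', 'T3'],
                           'CategoryCode': ['A', 'A', 'B']})

counts = addresses.groupby('TerritoryID').size().reset_index(name='Address Count')
merged = pd.merge(boundaries, counts, on='TerritoryID', how='left')
merged['Address Count'] = merged['Address Count'].fillna(0).astype(int)
print(merged)
#   TerritoryID CategoryCode  Address Count
# 0          T1            A              2
# 1          T2            A              1
# 2          T3            B              0
```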

requirements.txt (new file)

@@ -0,0 +1 @@
pandas

run_all.py

@@ -1,89 +1,94 @@
import os
import sys
import argparse
import pandas as pd
from datetime import datetime

# Import the refactored functions from the other modules
from process_territories import process_data
from analysis import generate_analysis_artifacts
from category_analysis import generate_category_map

def process_and_save(addresses_file, boundaries_file):
    """
    Runs the processing step and saves the result to a 'Final' CSV.
    Returns the path to the generated file.
    """
    print("\n--- Step 1: Processing territory files ---")
    try:
        # Process data in memory
        processed_df = process_data(addresses_file, boundaries_file)

        # Save the processed DataFrame to a CSV file
        date_str = datetime.now().strftime('%b %Y')
        output_filename = f'Okinawa Territory {date_str} - Final.csv'
        processed_df.to_csv(output_filename, index=False)

        print(f"✓ Success! Generated file: {output_filename}")
        return output_filename
    except (FileNotFoundError, Exception) as e:
        print(f"\nError during file processing: {e}", file=sys.stderr)
        sys.exit(1)

def analyze_from_file(processed_file_path):
    """
    Reads a processed file and runs the analysis functions on it.
    """
    print("\n--- Step 2: Running analysis from file ---")
    try:
        # Read the processed file into a DataFrame
        df = pd.read_csv(processed_file_path)

        # Run the analysis functions
        generate_analysis_artifacts(df)
        generate_category_map(df)

        print("\n✓ Analysis complete!")
        print("Generated files: analysis.md, map.html, category_map.html")
    except FileNotFoundError:
        print(f"\nError: Processed file not found at '{processed_file_path}'.", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"\nAn unexpected error occurred during analysis: {e}", file=sys.stderr)
        sys.exit(1)

def full_run_in_memory(addresses_file, boundaries_file):
    """
    Processes data and runs analysis entirely in memory.
    """
    print("\n--- Running full pipeline in memory ---")
    try:
        # Step 1: Process data
        print("Processing data...")
        processed_df = process_data(addresses_file, boundaries_file)
        print("✓ Data processing complete.")

        # Step 2: Run analysis
        print("\nRunning analysis...")
        generate_analysis_artifacts(processed_df)
        generate_category_map(processed_df)

        print("\n✓ Analysis complete!")
        print("Generated files: analysis.md, map.html, category_map.html")
    except (FileNotFoundError, Exception) as e:
        print(f"\nAn error occurred during the full run: {e}", file=sys.stderr)
        sys.exit(1)

def main():
    """Parses command-line arguments and orchestrates the workflow."""
    parser = argparse.ArgumentParser(
        description="Territory Analysis Tool v1.2.0",
        formatter_class=argparse.RawTextHelpFormatter
    )
    subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")

    # Sub-command for 'process'
    parser_process = subparsers.add_parser("process", help="Step 1: Process raw files and save the result to a CSV.")
    parser_process.add_argument("--addresses", required=True, help="Path to the addresses CSV file.")
    parser_process.add_argument("--boundaries", required=True, help="Path to the boundaries CSV file.")
@@ -92,26 +97,22 @@ def main():
    parser_analyze.add_argument("--input", required=True, help="Path to the processed 'Final' CSV file.")

    # Sub-command for 'full-run'
    parser_full_run = subparsers.add_parser("full-run", help="Run the full pipeline (process and analyze) in memory.")
    parser_full_run.add_argument("--addresses", required=True, help="Path to the addresses CSV file.")
    parser_full_run.add_argument("--boundaries", required=True, help="Path to the boundaries CSV file.")

    args = parser.parse_args()

    if args.command == "process":
        process_and_save(args.addresses, args.boundaries)
    elif args.command == "analyze":
        analyze_from_file(args.input)
    elif args.command == "full-run":
        full_run_in_memory(args.addresses, args.boundaries)

if __name__ == "__main__":
    # Ensure the script runs in its own directory context
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    main()
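Since `main()` is an ordinary function, the CLI can also be exercised without a shell, for example in a quick smoke test; a sketch, with placeholder CSV names:

```python
import sys
import run_all

# Simulate `python run_all.py full-run --addresses ... --boundaries ...`
sys.argv = ['run_all.py', 'full-run',
            '--addresses', 'Addresses.csv',
            '--boundaries', 'Boundaries.csv']
run_all.main()
```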