Initial commit: Set up Territory Analysis Tool

2025-12-29 16:36:10 -06:00
commit 671741772f
8 changed files with 10424 additions and 0 deletions

.gitignore

@@ -0,0 +1,22 @@
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
*.env
# Editor-specific files
.vscode/
.idea/
*.swp
*~
.DS_Store
# Generated files
*.html
*.md
* - Final.csv
!README.md

Two file diffs suppressed (one because it is too large, one because one or more lines are too long).

README.md

@@ -0,0 +1,93 @@
# Territory Analysis Tool
## Overview
This tool provides a complete pipeline for processing and analyzing territory data. It takes raw address and boundary data, combines them, and then generates analytical reports in markdown and interactive HTML map formats.
The workflow is managed by a command-line script that gives fine-grained control over each step.
## File Structure
All necessary files are located in this directory.
### Core Scripts
- `run_all.py`: The main command-line script to run the workflow. **This is the recommended entry point.**
- `process_territories.py`: (Step 1) Combines address and boundary data.
- `analysis.py`: (Step 2) Performs general territory analysis and generates `map.html`.
- `category_analysis.py`: (Step 2) Performs category-specific analysis and generates `category_map.html`.
### Input Data Files
- The tool works with any address and boundary CSV exports that include the columns the scripts expect (`TerritoryID` in the addresses file; `TerritoryID`, `CategoryCode`, `Number`, `Area`, and `Boundary` in the boundaries file).
- The example files `Okinawa Territory Jan 2026 - Addresses.csv` and `Okinawa Territory Jan 2026 - Boundaries.csv` are provided.
- Both files can be exported from NW Scheduler: go to Export -> Territories and download them from there.
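To sanity-check an export before running the pipeline, a minimal sketch like the following can verify the headers (column names taken from `process_territories.py`; this helper is not part of the tool itself):

```python
import csv

def check_headers(path, required):
    # Compare the CSV header row against the columns the pipeline reads.
    with open(path, 'r', encoding='utf-8-sig') as f:
        header = set(next(csv.reader(f)))
    missing = required - header
    print(f"{path}: " + ('OK' if not missing else f"missing {sorted(missing)}"))

check_headers('Okinawa Territory Jan 2026 - Addresses.csv', {'TerritoryID'})
check_headers('Okinawa Territory Jan 2026 - Boundaries.csv',
              {'TerritoryID', 'CategoryCode', 'Number', 'Area', 'Boundary'})
```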
## Usage
The entire workflow is managed through `run_all.py` using a command-line interface. You can see all available commands by running:
```bash
python run_all.py --help
```
### Full Pipeline Run
To run the entire process from start to finish (process raw files and then analyze them), use the `full-run` command. This is the most common use case.
**Command:**
```bash
python run_all.py full-run --addresses <path_to_addresses.csv> --boundaries <path_to_boundaries.csv>
```
**Example:**
```bash
python run_all.py full-run --addresses "Okinawa Territory Jan 2026 - Addresses.csv" --boundaries "Okinawa Territory Jan 2026 - Boundaries.csv"
```
### Running Steps Individually
You can also run each step of the pipeline separately.
#### Step 1: Process Raw Files
To combine the address and boundary files into a single "Final" CSV, use the `process` command.
**Command:**
```bash
python run_all.py process --addresses <path_to_addresses.csv> --boundaries <path_to_boundaries.csv>
```
This will generate a new file named `Okinawa Territory <Mon Year> - Final.csv`.
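The `<Mon Year>` part of the name comes from the current date; the naming logic in `process_territories.py` is equivalent to:

```python
from datetime import datetime

# E.g. 'Jan 2026' for a run in January 2026.
date_str = datetime.now().strftime('%b %Y')
print(f'Okinawa Territory {date_str} - Final.csv')
```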
#### Step 2: Analyze a Processed File
To run the analysis and generate maps from a "Final" CSV file, use the `analyze` command.
**Command:**
```bash
python run_all.py analyze --input <path_to_final_file.csv>
```
**Example:**
```bash
python run_all.py analyze --input "Okinawa Territory Dec 2025 - Final.csv"
```
## Workflow Details
1. **Data Processing:** The `process_territories.py` script reads the addresses CSV to count addresses per `TerritoryID`, joins those counts onto the boundary rows, and writes the result to a new file named `Okinawa Territory <Mon Year> - Final.csv` (a sample row is sketched after this list).
2. **Data Analysis:** The `analysis.py` and `category_analysis.py` scripts take the `Final.csv` file as input to generate reports and interactive maps.
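For reference, a single processed row, as `csv.DictReader` hands it to the analysis scripts, looks roughly like this (all values here are hypothetical; note that `Boundary` holds comma-separated `[lon, lat]` pairs, which the map scripts wrap in brackets and `JSON.parse`):

```python
row = {
    'TerritoryID': '12345',        # hypothetical ID
    'CategoryCode': 'A',
    'Number': '7',
    'Area': 'Naha',                # hypothetical area name
    'Boundary': '[127.68,26.21],[127.69,26.21],[127.69,26.22]',
    'Address Count': '42',         # read back as a string
}
```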
## Output Files
- `Okinawa Territory <Mon Year> - Final.csv`: The consolidated data file.
- `analysis.md`: A markdown summary of the general territory analysis.
- `map.html`: An interactive map visualizing territories colored by address count.
- `category_map.html`: An interactive map visualizing territories colored by their category's total address count.

analysis.py

@@ -0,0 +1,86 @@
import csv
import json
import argparse

parser = argparse.ArgumentParser(description='Analyze territory data.')
parser.add_argument('filename', help='The CSV file to analyze.')
args = parser.parse_args()

# Load the processed "Final" CSV into a list of dicts.
data = []
with open(args.filename, 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# Summary statistics over the per-territory address counts.
total_territories = len(data)
address_counts = [int(row['Address Count']) for row in data if row['Address Count']]
total_addresses = sum(address_counts)
average_addresses = total_addresses / total_territories if total_territories > 0 else 0
min_addresses = min(address_counts) if address_counts else 0
max_addresses = max(address_counts) if address_counts else 0

# Number of territories in each category.
category_counts = {}
for row in data:
    category = row['CategoryCode']
    category_counts[category] = category_counts.get(category, 0) + 1

with open('analysis.md', 'w') as f:
    f.write('# Territory Analysis\n')
    f.write(f'Total Territories: {total_territories}\n')
    f.write(f'Total Addresses: {total_addresses}\n')
    f.write(f'Average Addresses per Territory: {average_addresses:.2f}\n')
    f.write('## Territories by Category\n')
    for category, count in category_counts.items():
        f.write(f'- {category}: {count}\n')

with open('map.html', 'w') as f:
    f.write(f'''
<!DOCTYPE html>
<html>
<head>
    <title>Territory Map</title>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.7.1/dist/leaflet.css" />
    <script src="https://unpkg.com/leaflet@1.7.1/dist/leaflet.js"></script>
</head>
<body>
    <div id="map" style="width: 100vw; height: 100vh;"></div>
    <script>
        var map = L.map('map').setView([26.3, 127.8], 10);
        L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
            attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
        }}).addTo(map);

        var territories = {json.dumps(data, indent=4)};
        var minAddresses = {min_addresses};
        var maxAddresses = {max_addresses};

        // Linear ramp from green (hue 120) at the minimum count to red (hue 0) at the maximum.
        function getColor(d) {{
            var range = maxAddresses - minAddresses;
            var ratio = range > 0 ? (d - minAddresses) / range : 0;  // avoid divide-by-zero when all counts are equal
            var hue = (1 - ratio) * 120;
            return 'hsl(' + hue + ', 100%, 50%)';
        }}

        for (var i = 0; i < territories.length; i++) {{
            var territory = territories[i];
            if (territory.Boundary) {{
                // Boundary is stored as comma-separated [lon, lat] pairs; wrap in brackets to parse,
                // then swap to [lat, lon] for Leaflet.
                var boundary = JSON.parse('[' + territory.Boundary + ']');
                var color = getColor(territory['Address Count']);
                var polygon = L.polygon(boundary.map(p => [p[1], p[0]]), {{
                    fillColor: color,
                    color: "#000",
                    weight: 1,
                    fillOpacity: 0.7
                }}).addTo(map);
                polygon.bindPopup('<b>Territory ID:</b> ' + territory.TerritoryID +
                    '<br><b>Territory Number:</b> ' + territory.CategoryCode + '-' + territory.Number +
                    '<br><b>Address Count:</b> ' + territory['Address Count']);
            }}
        }}
    </script>
</body>
</html>
''')
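The `getColor` ramp above maps the observed count range linearly onto a hue from green (120) down to red (0). A standalone Python sketch of the same formula, for illustration only:

```python
def get_color(count, lo, hi):
    # Mirrors getColor in the generated map.html: green at lo, red at hi.
    ratio = (count - lo) / (hi - lo) if hi > lo else 0
    return f'hsl({(1 - ratio) * 120:g}, 100%, 50%)'

print(get_color(10, 10, 90))  # hsl(120, 100%, 50%) -> green
print(get_color(50, 10, 90))  # hsl(60, 100%, 50%)  -> yellow
print(get_color(90, 10, 90))  # hsl(0, 100%, 50%)   -> red
```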

category_analysis.py

@@ -0,0 +1,78 @@
import csv
import json
import argparse

parser = argparse.ArgumentParser(description='Analyze territory data by category.')
parser.add_argument('filename', help='The CSV file to analyze.')
args = parser.parse_args()

# Load the processed "Final" CSV into a list of dicts.
data = []
with open(args.filename, 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# Total address count per category.
category_address_counts = {}
for row in data:
    category = row['CategoryCode']
    if 'Address Count' in row and row['Address Count']:
        address_count = int(row['Address Count'])
        category_address_counts[category] = category_address_counts.get(category, 0) + address_count

# --- Category colors ---
unique_categories = sorted(category_address_counts.keys())
# A list of 12 distinct colors; cycles if there are more categories than colors.
colors = [
    '#e6194b', '#3cb44b', '#ffe119', '#4363d8', '#f58231', '#911eb4',
    '#46f0f0', '#f032e6', '#bcf60c', '#fabebe', '#008080', '#e6beff'
]
category_colors = {}
for i, category in enumerate(unique_categories):
    category_colors[category] = colors[i % len(colors)]

with open('category_map.html', 'w') as f:
    f.write(f'''
<!DOCTYPE html>
<html>
<head>
    <title>Category Map</title>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.7.1/dist/leaflet.css" />
    <script src="https://unpkg.com/leaflet@1.7.1/dist/leaflet.js"></script>
</head>
<body>
    <div id="map" style="width: 100vw; height: 100vh;"></div>
    <script>
        var map = L.map('map').setView([26.3, 127.8], 10);
        L.tileLayer('https://{{s}}.tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
            attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
        }}).addTo(map);

        var territories = {json.dumps(data, indent=4)};
        var categoryColors = {json.dumps(category_colors, indent=4)};
        var categoryAddressCounts = {json.dumps(category_address_counts, indent=4)};

        for (var i = 0; i < territories.length; i++) {{
            var territory = territories[i];
            if (territory.Boundary) {{
                // Boundary is stored as comma-separated [lon, lat] pairs; swap to [lat, lon] for Leaflet.
                var boundary = JSON.parse('[' + territory.Boundary + ']');
                var color = categoryColors[territory.CategoryCode];
                var polygon = L.polygon(boundary.map(p => [p[1], p[0]]), {{
                    fillColor: color,
                    color: "#000",
                    weight: 1,
                    fillOpacity: 0.7
                }}).addTo(map);
                var categoryAddressCount = categoryAddressCounts[territory.CategoryCode];
                polygon.bindPopup('<b>Territory ID:</b> ' + territory.TerritoryID +
                    '<br><b>Category:</b> ' + territory.CategoryCode +
                    '<br><b>Category Address Count:</b> ' + categoryAddressCount);
            }}
        }}
    </script>
</body>
</html>
''')
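One consequence of the `colors[i % len(colors)]` assignment above: with more categories than palette entries, the palette wraps and colors repeat. A quick illustration with a shortened palette:

```python
colors = ['#e6194b', '#3cb44b', '#ffe119']  # 3-color palette for illustration
categories = ['A', 'B', 'C', 'D']
assignment = {c: colors[i % len(colors)] for i, c in enumerate(categories)}
print(assignment)  # 'D' wraps around and reuses '#e6194b'
```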

process_territories.py

@@ -0,0 +1,53 @@
import csv
import argparse
from datetime import datetime


def process_territories(addresses_file, boundaries_file, final_file):
    # Read the addresses and count occurrences of each TerritoryID
    address_counts = {}
    with open(addresses_file, 'r', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        for row in reader:
            territory_id = row['TerritoryID']
            if territory_id:
                address_counts[territory_id] = address_counts.get(territory_id, 0) + 1

    # Read the boundaries file and write to the final file
    with open(boundaries_file, 'r', encoding='utf-8-sig') as f_in, \
            open(final_file, 'w', newline='', encoding='utf-8') as f_out:
        reader = csv.DictReader(f_in)
        # Define the headers for the output file
        fieldnames = ['TerritoryID', 'CategoryCode', 'Number', 'Area', 'Boundary', 'Address Count']
        writer = csv.DictWriter(f_out, fieldnames=fieldnames)
        writer.writeheader()
        for row in reader:
            territory_id = row['TerritoryID']
            # Get the address count for the current territory
            address_count = address_counts.get(territory_id, 0)
            # Write the new row to the final file
            writer.writerow({
                'TerritoryID': territory_id,
                'CategoryCode': row.get('CategoryCode', ''),
                'Number': row.get('Number', ''),
                'Area': row.get('Area', ''),
                'Boundary': row.get('Boundary', ''),
                'Address Count': address_count
            })


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Process territory data.')
    parser.add_argument('addresses_file', help='The path to the addresses CSV file.')
    parser.add_argument('boundaries_file', help='The path to the boundaries CSV file.')
    args = parser.parse_args()

    # Generate the output file name
    date_str = datetime.now().strftime('%b %Y')
    output_file = f'Okinawa Territory {date_str} - Final.csv'

    process_territories(args.addresses_file, args.boundaries_file, output_file)
    print(f"Processing complete. Output written to {output_file}")

run_all.py

@@ -0,0 +1,117 @@
import subprocess
import os
import sys
import argparse


def run_script(command):
    """Runs a script, capturing its output and reporting any errors."""
    print(f"Executing: {' '.join(command)}", flush=True)
    try:
        process = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True
        )
        print("✓ Success!")
        if process.stdout:
            print(process.stdout)
        return process
    except FileNotFoundError as e:
        print(f"\nError: Command not found. Ensure Python is in your PATH. Details: {e}", file=sys.stderr)
        sys.exit(1)
    except subprocess.CalledProcessError as e:
        print(f"\nError running command: {' '.join(command)}", file=sys.stderr)
        print("\n--- STDERR ---", file=sys.stderr)
        print(e.stderr, file=sys.stderr)
        sys.exit(1)


def process_files(addresses_file, boundaries_file):
    """
    Runs the processing script to generate the 'Final' CSV.
    Returns the path to the generated file or exits on error.
    """
    print("\n--- Step 1: Processing territory files ---")
    # Check if input files exist
    if not os.path.exists(addresses_file):
        print(f"Error: Address file not found at '{addresses_file}'", file=sys.stderr)
        sys.exit(1)
    if not os.path.exists(boundaries_file):
        print(f"Error: Boundaries file not found at '{boundaries_file}'", file=sys.stderr)
        sys.exit(1)

    # Use the current interpreter so the sub-scripts run in the same environment.
    process_command = [sys.executable, "process_territories.py", addresses_file, boundaries_file]
    run_script(process_command)

    # Find the most recently modified "Final.csv"
    try:
        final_files = [f for f in os.listdir('.') if "Final.csv" in f and os.path.isfile(f)]
        if not final_files:
            print("Error: No 'Final.csv' file found after processing.", file=sys.stderr)
            sys.exit(1)
        latest_file = max(final_files, key=os.path.getmtime)
        print(f"Generated file: {latest_file}")
        return latest_file
    except Exception as e:
        print(f"Error locating processed file: {e}", file=sys.stderr)
        sys.exit(1)


def run_analysis(processed_file_path):
    """
    Runs the analysis scripts on the processed file.
    """
    if not processed_file_path or not os.path.exists(processed_file_path):
        print(f"\nError: Processed file not found at '{processed_file_path}'. Please run the 'process' step first.", file=sys.stderr)
        sys.exit(1)

    print("\n--- Step 2: Running analysis scripts ---")
    analysis_command = [sys.executable, "analysis.py", processed_file_path]
    run_script(analysis_command)

    category_analysis_command = [sys.executable, "category_analysis.py", processed_file_path]
    run_script(category_analysis_command)

    print("\nAnalysis complete!")
    print("Generated files: analysis.md, map.html, category_map.html")


def main():
    """Parses command-line arguments and orchestrates the workflow."""
    parser = argparse.ArgumentParser(description="Territory Analysis Tool")
    subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")

    # Sub-command for 'process'
    parser_process = subparsers.add_parser("process", help="Step 1: Process raw address and boundary files into a final CSV.")
    parser_process.add_argument("--addresses", required=True, help="Path to the addresses CSV file.")
    parser_process.add_argument("--boundaries", required=True, help="Path to the boundaries CSV file.")

    # Sub-command for 'analyze'
    parser_analyze = subparsers.add_parser("analyze", help="Step 2: Run analysis on a processed 'Final' CSV file.")
    parser_analyze.add_argument("--input", required=True, help="Path to the processed 'Final' CSV file.")

    # Sub-command for 'full-run'
    parser_full_run = subparsers.add_parser("full-run", help="Run the full pipeline: process and then analyze.")
    parser_full_run.add_argument("--addresses", required=True, help="Path to the addresses CSV file.")
    parser_full_run.add_argument("--boundaries", required=True, help="Path to the boundaries CSV file.")

    args = parser.parse_args()

    if args.command == "process":
        process_files(args.addresses, args.boundaries)
    elif args.command == "analyze":
        run_analysis(args.input)
    elif args.command == "full-run":
        # Run step 1
        processed_file = process_files(args.addresses, args.boundaries)
        # Run step 2
        run_analysis(processed_file)


if __name__ == "__main__":
    # Change working directory to the script's directory
    # This makes file paths relative to the script's location
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    main()