
Commit f0278d2

feat: implement CSV user import with real-time progress
- Background job for processing CSV imports asynchronously
- Real-time progress tracking via Action Cable channels
- Dashboard channel for live metrics updates
- Import progress channel for upload status
- Queue and cable configuration for background processing
1 parent 55b287c commit f0278d2

9 files changed: +267 -0 lines changed

app/channels/application_cable/channel.rb
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
module ApplicationCable
  class Channel < ActionCable::Channel::Base
  end
end

app/channels/application_cable/connection.rb

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
    end

    private

    def find_verified_user
      # Try to find the user from the Warden session (e.g. Devise)
      if verified_user = env["warden"]&.user
        verified_user
      else
        reject_unauthorized_connection
      end
    end
  end
end

app/channels/dashboard_channel.rb

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
class DashboardChannel < ApplicationCable::Channel
  def subscribed
    # Only allow admin users to subscribe to dashboard updates
    if current_user&.admin?
      stream_from "dashboard_updates"
    else
      reject
    end
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
    stop_all_streams
  end
end
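
Note that nothing in this commit broadcasts to "dashboard_updates" yet. As a rough sketch of what a live metrics push to this channel might look like (the payload keys below are assumptions, not part of the diff):

# Hypothetical broadcast to the admin dashboard; the metric names are illustrative.
ActionCable.server.broadcast(
  "dashboard_updates",
  {
    type: "metrics_update",
    users_count: User.count,
    imports_in_progress: Import.where(status: "processing").count
  }
)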

app/channels/import_progress_channel.rb

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
class ImportProgressChannel < ApplicationCable::Channel
  def subscribed
    return reject unless current_user&.admin?
    return reject unless params[:import_id].present?

    import = Import.find_by(id: params[:import_id])
    return reject unless import

    stream_from "import_#{import.id}"
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
    stop_all_streams
  end
end
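
The subscription guard above can be exercised with Rails' built-in channel test helpers. A sketch, assuming users/imports fixtures that are not part of this commit:

# Hypothetical test sketch (not in this commit): checks the admin guard and stream name.
require "test_helper"

class ImportProgressChannelTest < ActionCable::Channel::TestCase
  test "admin subscribes to the import's stream" do
    import = imports(:pending)                   # assumed fixture
    stub_connection current_user: users(:admin)  # assumed fixture

    subscribe import_id: import.id

    assert subscription.confirmed?
    assert_has_stream "import_#{import.id}"
  end

  test "non-admin is rejected" do
    stub_connection current_user: users(:member) # assumed fixture
    subscribe import_id: 1
    assert subscription.rejected?
  end
end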

app/jobs/application_job.rb

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
class ApplicationJob < ActiveJob::Base
  # Automatically retry jobs that encountered a deadlock
  # retry_on ActiveRecord::Deadlocked

  # Most jobs are safe to ignore if the underlying records are no longer available
  # discard_on ActiveJob::DeserializationError
end

app/jobs/user_import_job.rb

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
class UserImportJob < ApplicationJob
  queue_as :default

  def perform(import)
    import.update!(status: "processing", processed_rows: 0, successful_rows: 0, failed_rows: 0)

    begin
      process_import(import)
      import.update!(status: "completed")
    rescue StandardError => e
      import.update!(status: "failed")
      import.add_error("Import failed: #{e.message}")
      raise e
    end
  end

  private

  def process_import(import)
    file_path = download_file(import)

    # Use Roo to parse the spreadsheet
    spreadsheet = open_spreadsheet(file_path, import.file_name)
    headers = spreadsheet.row(1)

    validate_headers(headers, import)

    total_rows = spreadsheet.last_row - 1 # Exclude header row
    import.update!(total_rows: total_rows)

    (2..spreadsheet.last_row).each_with_index do |row_num, index|
      row = spreadsheet.row(row_num)
      process_row(row, headers, import)

      # Update progress every 10 rows or on the last row
      if (index + 1) % 10 == 0 || (index + 1) == total_rows
        import.update_progress!
        # Broadcast progress via Action Cable
        broadcast_progress(import)
      end
    end

    # Clean up temporary file
    File.delete(file_path) if File.exist?(file_path)
  end

  def download_file(import)
    temp_file = Tempfile.new([ import.file_name, File.extname(import.file_name) ])
    temp_file.binmode
    temp_file.write(import.file.download)
    temp_file.close
    temp_file.path
  end

  def open_spreadsheet(file_path, filename)
    case File.extname(filename).downcase
    when ".csv"
      Roo::CSV.new(file_path)
    when ".xls"
      Roo::Excel.new(file_path)
    when ".xlsx"
      Roo::Excelx.new(file_path)
    else
      raise "Unknown file type: #{filename}"
    end
  end

  def validate_headers(headers, import)
    required_headers = [ "full_name", "email" ]
    optional_headers = [ "role", "avatar_url" ]

    missing_headers = required_headers - headers.map(&:to_s).map(&:downcase)

    if missing_headers.any?
      raise "Missing required headers: #{missing_headers.join(', ')}"
    end
  end

  def process_row(row, headers, import)
    begin
      user_data = build_user_data(row, headers)

      user = User.find_by(email: user_data[:email])

      if user
        # Update existing user
        user.update!(user_data.except(:email))
        import.increment!(:successful_rows)
      else
        # Create new user
        user = User.create!(user_data.merge(password: generate_password))
        import.increment!(:successful_rows)
      end

      import.increment!(:processed_rows)

    rescue StandardError => e
      import.increment!(:failed_rows)
      import.increment!(:processed_rows)
      import.add_error("Row #{import.processed_rows}: #{e.message}")
    end
  end

  def build_user_data(row, headers)
    data = {}

    headers.each_with_index do |header, index|
      value = row[index]
      next if value.blank?

      case header.to_s.downcase
      when "full_name"
        data[:full_name] = value.to_s.strip
      when "email"
        data[:email] = value.to_s.strip.downcase
      when "role"
        role = value.to_s.strip.downcase
        data[:role] = %w[admin user].include?(role) ? role : "user"
      when "avatar_url"
        data[:avatar_url] = value.to_s.strip if valid_url?(value.to_s.strip)
      end
    end

    data
  end

  def generate_password
    SecureRandom.alphanumeric(12)
  end

  def valid_url?(url)
    uri = URI.parse(url)
    %w[http https].include?(uri.scheme)
  rescue URI::InvalidURIError
    false
  end

  def broadcast_progress(import)
    ActionCable.server.broadcast(
      "import_#{import.id}",
      {
        type: "progress_update",
        import: {
          id: import.id,
          progress: import.progress,
          processed_rows: import.processed_rows,
          total_rows: import.total_rows,
          successful_rows: import.successful_rows,
          failed_rows: import.failed_rows,
          status: import.status
        }
      }
    )
  end
end
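
The job calls several Import model methods that are not part of this diff (update_progress!, add_error, progress, plus the file attachment). A rough sketch of what they might look like, purely as an assumption about the surrounding app:

# Hypothetical Import model sketch (not in this commit); column names are assumptions.
class Import < ApplicationRecord
  has_one_attached :file  # assumed Active Storage attachment, since the job calls import.file.download

  # Percentage of rows handled so far, guarded against division by zero.
  def progress
    return 0 if total_rows.to_i.zero?
    ((processed_rows.to_f / total_rows) * 100).round
  end

  # Persist any derived progress state; a simple touch would do for broadcasting.
  def update_progress!
    touch
  end

  # Append a message to a serialized array column of error messages (assumed).
  def add_error(message)
    self.error_messages = Array(error_messages) << message
    save!
  end
end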

config/cable.yml

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
# Async adapter only works within the same process, so for manually triggering cable updates from a console,
# and seeing results in the browser, you must do so from the web console (running inside the dev process),
# not a terminal started via bin/rails console! Add "console" to any action or any ERB template view
# to make the web console appear.
development:
  adapter: async

test:
  adapter: test

production:
  adapter: solid_cable
  connects_to:
    database:
      writing: cable
  polling_interval: 0.1.seconds
  message_retention: 1.day
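
Per the comment above, with the async adapter a manual broadcast only reaches the browser if it is issued from inside the dev server process (the web console). A quick sketch of such a broadcast, reusing the stream and payload shape from UserImportJob; the import id is made up:

# Run from the web console in development (the async adapter is in-process only).
ActionCable.server.broadcast(
  "import_1",  # assumes an Import with id 1 has a subscribed admin
  { type: "progress_update", import: { id: 1, progress: 50, status: "processing" } }
)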

config/queue.yml

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
default: &default
  dispatchers:
    - polling_interval: 1
      batch_size: 500
  workers:
    - queues: "*"
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1

development:
  <<: *default

test:
  <<: *default

production:
  <<: *default
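
With Solid Queue workers configured above, the import is presumably kicked off with a standard perform_later call. A hedged sketch of the controller side, which is not part of this commit:

# Hypothetical controller action (not in this diff) tying the pieces together:
# create the Import, attach the upload, and hand it to the background job.
class ImportsController < ApplicationController
  def create
    import = Import.create!(
      file_name: params[:file].original_filename,
      status: "pending"                    # assumed initial status
    )
    import.file.attach(params[:file])      # assumes Active Storage

    UserImportJob.perform_later(import)    # picked up by Solid Queue workers

    redirect_to import_path(import)        # hypothetical route
  end
end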

config/recurring.yml

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
# examples:
# periodic_cleanup:
#   class: CleanSoftDeletedRecordsJob
#   queue: background
#   args: [ 1000, { batch_size: 500 } ]
#   schedule: every hour
# periodic_cleanup_with_command:
#   command: "SoftDeletedRecord.due.delete_all"
#   priority: 2
#   schedule: at 5am every day

production:
  clear_solid_queue_finished_jobs:
    command: "SolidQueue::Job.clear_finished_in_batches(sleep_between_batches: 0.3)"
    schedule: every hour at minute 12
