Module: Gitlab::GithubImport::ParallelScheduling
- Includes:
- JobDelayCalculator
- Included in:
- Importer::Attachments::BaseImporter, Importer::CollaboratorsImporter, Importer::DiffNotesImporter, Importer::IssuesImporter, Importer::LfsObjectsImporter, Importer::NotesImporter, Importer::ProtectedBranchesImporter, Importer::PullRequestsImporter, Importer::SingleEndpointDiffNotesImporter, Importer::SingleEndpointIssueEventsImporter
- Defined in:
- lib/gitlab/github_import/parallel_scheduling.rb
Constant Summary collapse
- ALREADY_IMPORTED_CACHE_KEY =
The base cache key to use for tracking already imported objects.
'github-importer/already-imported/%{project}/%{collection}'
- JOB_WAITER_CACHE_KEY =
The base cache key to use for storing job waiter key
'github-importer/job-waiter/%{project}/%{collection}'
- JOB_WAITER_REMAINING_CACHE_KEY =
The base cache key to use for storing job waiter remaining jobs
'github-importer/job-waiter-remaining/%{project}/%{collection}'
Instance Attribute Summary collapse
-
#already_imported_cache_key ⇒ Object
readonly
Returns the value of attribute already_imported_cache_key.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#enqueued_job_counter ⇒ Object
Returns the value of attribute enqueued_job_counter.
-
#job_started_at ⇒ Object
Returns the value of attribute job_started_at.
-
#job_waiter_cache_key ⇒ Object
readonly
Returns the value of attribute job_waiter_cache_key.
-
#job_waiter_remaining_cache_key ⇒ Object
readonly
Returns the value of attribute job_waiter_remaining_cache_key.
-
#page_keyset ⇒ Object
readonly
Returns the value of attribute page_keyset.
-
#project ⇒ Object
readonly
Returns the value of attribute project.
Instance Method Summary collapse
- #abort_on_failure ⇒ Object
-
#already_imported?(object) ⇒ Boolean
Returns true if the given object has already been imported, false otherwise.
-
#collection_method ⇒ Object
The name of the method to call to retrieve the data to import.
-
#collection_options ⇒ Object
Any options to be passed to the method used for retrieving the data to import.
-
#each_object_to_import ⇒ Object
The method that will be called for traversing through all the objects to import, yielding them to the supplied block.
- #execute ⇒ Object
-
#id_for_already_imported_cache(object) ⇒ Object
Returns the ID to use for the cache used for checking if an object has already been imported or not.
-
#importer_class ⇒ Object
The class to use for importing objects when importing them sequentially.
- #increment_object_counter?(_object) ⇒ Boolean
-
#initialize(project, client, parallel: true) ⇒ Object
project - An instance of `Project`.
-
#mark_as_imported(object) ⇒ Object
Marks the given object as “already imported”.
- #object_type ⇒ Object
- #parallel? ⇒ Boolean
-
#parallel_import ⇒ Object
Imports all objects in parallel by scheduling a Sidekiq job for every individual object.
-
#representation_class ⇒ Object
The class used for converting API responses to Hashes when performing the import.
-
#sequential_import ⇒ Object
Imports all the objects in sequence in the current thread.
-
#sidekiq_worker_class ⇒ Object
The Sidekiq worker class used for scheduling the importing of objects in parallel.
- #spread_parallel_import ⇒ Object
Methods included from JobDelayCalculator
Instance Attribute Details
#already_imported_cache_key ⇒ Object (readonly)
Returns the value of attribute already_imported_cache_key.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def already_imported_cache_key @already_imported_cache_key end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def client @client end |
#enqueued_job_counter ⇒ Object
Returns the value of attribute enqueued_job_counter.
11 12 13 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 11 def enqueued_job_counter @enqueued_job_counter end |
#job_started_at ⇒ Object
Returns the value of attribute job_started_at.
11 12 13 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 11 def job_started_at @job_started_at end |
#job_waiter_cache_key ⇒ Object (readonly)
Returns the value of attribute job_waiter_cache_key.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def job_waiter_cache_key @job_waiter_cache_key end |
#job_waiter_remaining_cache_key ⇒ Object (readonly)
Returns the value of attribute job_waiter_remaining_cache_key.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def job_waiter_remaining_cache_key @job_waiter_remaining_cache_key end |
#page_keyset ⇒ Object (readonly)
Returns the value of attribute page_keyset.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def page_keyset @page_keyset end |
#project ⇒ Object (readonly)
Returns the value of attribute project.
8 9 10 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8 def project @project end |
Instance Method Details
#abort_on_failure ⇒ Object
196 197 198 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 196 def abort_on_failure false end |
#already_imported?(object) ⇒ Boolean
Returns true if the given object has already been imported, false otherwise.
object - The object to check.
149 150 151 152 153 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 149 def already_imported?(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_includes?(already_imported_cache_key, id) end |
#collection_method ⇒ Object
The name of the method to call to retrieve the data to import.
192 193 194 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 192 def collection_method raise NotImplementedError end |
#collection_options ⇒ Object
Any options to be passed to the method used for retrieving the data to import.
202 203 204 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 202 def collection_options {} end |
#each_object_to_import ⇒ Object
The method that will be called for traversing through all the objects to import, yielding them to the supplied block.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 114 def each_object_to_import repo = project.import_source # URL to resume the pagination from in case the job is interrupted. resume_url = page_keyset.current client.each_page(collection_method, resume_url, repo, collection_options) do |page| page.objects.each do |object| object = object.to_h next if already_imported?(object) if increment_object_counter?(object) Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :fetched) end yield object # We mark the object as imported immediately so we don't end up # scheduling it multiple times. mark_as_imported(object) end page_keyset.set(page.url) end end |
#execute ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 42 def execute info(project.id, message: "starting importer") retval = if parallel? parallel_import else sequential_import end # Once we have completed all work we can remove our "already exists" # cache so we don't put too much pressure on Redis. # # We don't immediately remove it since it's technically possible for # other instances of this job to still run, instead we set the # expiration time to a lower value. This prevents the other jobs from # still scheduling duplicates while. Since all work has already been # completed those jobs will just cycle through any remaining pages while # not scheduling anything. Gitlab::Cache::Import::Caching.expire(already_imported_cache_key, Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT) info(project.id, message: "importer finished") retval rescue StandardError => e Gitlab::Import::ImportFailureService.track( project_id: project.id, error_source: self.class.name, exception: e, fail_import: abort_on_failure, metrics: true ) raise(e) end |
#id_for_already_imported_cache(object) ⇒ Object
Returns the ID to use for the cache used for checking if an object has already been imported or not.
object - The object we may want to import.
170 171 172 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 170 def id_for_already_imported_cache(object) raise NotImplementedError end |
#importer_class ⇒ Object
The class to use for importing objects when importing them sequentially.
181 182 183 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 181 def importer_class raise NotImplementedError end |
#increment_object_counter?(_object) ⇒ Boolean
141 142 143 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 141 def increment_object_counter?(_object) true end |
#initialize(project, client, parallel: true) ⇒ Object
project - An instance of `Project`. client - An instance of `Gitlab::GithubImport::Client`. parallel - When set to true the objects will be imported in parallel.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 26 def initialize(project, client, parallel: true) @project = project @client = client @parallel = parallel @page_keyset = Gitlab::Import::PageKeyset.new(project, collection_method, ::Import::SOURCE_GITHUB) @already_imported_cache_key = format(ALREADY_IMPORTED_CACHE_KEY, project: project.id, collection: collection_method) @job_waiter_cache_key = format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method) @job_waiter_remaining_cache_key = format(JOB_WAITER_REMAINING_CACHE_KEY, project: project.id, collection: collection_method) end |
#mark_as_imported(object) ⇒ Object
Marks the given object as “already imported”.
156 157 158 159 160 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 156 def mark_as_imported(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_add(already_imported_cache_key, id) end |
#object_type ⇒ Object
162 163 164 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 162 def object_type raise NotImplementedError end |
#parallel? ⇒ Boolean
38 39 40 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 38 def parallel? @parallel end |
#parallel_import ⇒ Object
Imports all objects in parallel by scheduling a Sidekiq job for every individual object.
89 90 91 92 93 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 89 def parallel_import raise 'Batch settings must be defined for parallel import' if parallel_import_batch.blank? spread_parallel_import end |
#representation_class ⇒ Object
The class used for converting API responses to Hashes when performing the import.
176 177 178 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 176 def representation_class raise NotImplementedError end |
#sequential_import ⇒ Object
Imports all the objects in sequence in the current thread.
79 80 81 82 83 84 85 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 79 def sequential_import each_object_to_import do |object| repr = object_representation(object) importer_class.new(repr, project, client).execute end end |
#sidekiq_worker_class ⇒ Object
The Sidekiq worker class used for scheduling the importing of objects in parallel.
187 188 189 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 187 def sidekiq_worker_class raise NotImplementedError end |
#spread_parallel_import ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 95 def spread_parallel_import self.job_started_at = Time.current self.enqueued_job_counter = 0 each_object_to_import do |object| repr = object_representation(object) sidekiq_worker_class.perform_in(job_delay, project.id, repr.to_hash.deep_stringify_keys, job_waiter.key.to_s) self.enqueued_job_counter += 1 job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key) end job_waiter end |