Module: Gitlab::GithubImport::ParallelScheduling

Constant Summary collapse

ALREADY_IMPORTED_CACHE_KEY =

The base cache key to use for tracking already imported objects.

'github-importer/already-imported/%{project}/%{collection}'
JOB_WAITER_CACHE_KEY =

The base cache key to use for storing job waiter key

'github-importer/job-waiter/%{project}/%{collection}'
JOB_WAITER_REMAINING_CACHE_KEY =

The base cache key to use for storing job waiter remaining jobs

'github-importer/job-waiter-remaining/%{project}/%{collection}'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from JobDelayCalculator

#parallel_import_batch

Instance Attribute Details

#already_imported_cache_keyObject (readonly)

Returns the value of attribute already_imported_cache_key.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# Redis cache key of the set tracking objects already imported for this
# project/collection; built from ALREADY_IMPORTED_CACHE_KEY in #initialize.
def already_imported_cache_key
  @already_imported_cache_key
end

#clientObject (readonly)

Returns the value of attribute client.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# The GitHub API client passed to #initialize, used for paging through
# the collection to import.
def client
  @client
end

#enqueued_job_counterObject

Returns the value of attribute enqueued_job_counter.



11
12
13
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 11

# Number of Sidekiq jobs enqueued so far by #spread_parallel_import;
# reset to 0 at the start of each spread.
def enqueued_job_counter
  @enqueued_job_counter
end

#job_started_atObject

Returns the value of attribute job_started_at.



11
12
13
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 11

# Timestamp recorded when #spread_parallel_import starts enqueueing jobs.
def job_started_at
  @job_started_at
end

#job_waiter_cache_keyObject (readonly)

Returns the value of attribute job_waiter_cache_key.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# Redis cache key storing the job waiter key for this project/collection;
# built from JOB_WAITER_CACHE_KEY in #initialize.
def job_waiter_cache_key
  @job_waiter_cache_key
end

#job_waiter_remaining_cache_keyObject (readonly)

Returns the value of attribute job_waiter_remaining_cache_key.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# Redis cache key storing the count of remaining jobs for the job waiter;
# built from JOB_WAITER_REMAINING_CACHE_KEY in #initialize and incremented
# in #spread_parallel_import.
def job_waiter_remaining_cache_key
  @job_waiter_remaining_cache_key
end

#page_counterObject (readonly)

Returns the value of attribute page_counter.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# Gitlab::Import::PageCounter tracking the last processed API page, so
# re-run importers resume where they left off (see #each_object_to_import).
def page_counter
  @page_counter
end

#projectObject (readonly)

Returns the value of attribute project.



8
9
10
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 8

# The Project being imported into, as passed to #initialize.
def project
  @project
end

Instance Method Details

#abort_on_failureObject



205
206
207
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 205

# Whether a StandardError raised during #execute should fail the entire
# import (passed as fail_import: to ImportFailureService). Defaults to
# false; including classes may override.
def abort_on_failure
  false
end

#already_imported?(object) ⇒ Boolean

Returns true if the given object has already been imported, false otherwise.

object - The object to check.

Returns:

  • (Boolean)


158
159
160
161
162
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 158

# Returns true if the given object has already been imported, false
# otherwise.
#
# object - The object to check.
def already_imported?(object)
  Gitlab::Cache::Import::Caching
    .set_includes?(already_imported_cache_key, id_for_already_imported_cache(object))
end

#collection_methodObject

The name of the method to call to retrieve the data to import.

Raises:

  • (NotImplementedError)


201
202
203
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 201

# The name of the method to call to retrieve the data to import.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def collection_method
  raise NotImplementedError
end

#collection_optionsObject

Any options to be passed to the method used for retrieving the data to import.



211
212
213
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 211

# Any options to be passed to the method used for retrieving the data to
# import. Defaults to an empty Hash; including classes may override.
def collection_options
  {}
end

#each_object_to_importObject

The method that will be called for traversing through all the objects to import, yielding them to the supplied block.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 114

# The method that will be called for traversing through all the objects
# to import, yielding each one (as a Hash) to the supplied block.
def each_object_to_import
  repo = project.import_source

  # We inject the page number here to make sure that all importers always
  # start where they left off. Simply starting over wouldn't work for
  # repositories with a lot of data (e.g. tens of thousands of comments).
  options = collection_options.merge(page: page_counter.current)

  client.each_page(collection_method, repo, options) do |page|
    # Technically it's possible that the same work is performed multiple
    # times, as Sidekiq doesn't guarantee there will ever only be one
    # instance of a job. In such a scenario it's possible for one job to
    # have a lower page number (e.g. 5) compared to another (e.g. 10). In
    # this case we skip over all the objects until we have caught up,
    # reducing the number of duplicate jobs scheduled by the provided
    # block.
    next unless page_counter.set(page.number)

    page.objects.each do |object|
      object = object.to_h

      # Skip objects already handled by a previous (or concurrent) run.
      next if already_imported?(object)

      if increment_object_counter?(object)
        Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :fetched)
      end

      yield object

      # We mark the object as imported immediately so we don't end up
      # scheduling it multiple times.
      mark_as_imported(object)
    end
  end
end

#executeObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 42

# Runs the import, choosing the parallel or sequential strategy, and
# returns that strategy's result (a JobWaiter when parallel).
def execute
  info(project.id, message: "starting importer")

  result = parallel? ? parallel_import : sequential_import

  # Once all work is completed the "already exists" cache is no longer
  # needed, so we lower its TTL to reduce pressure on Redis.
  #
  # We don't remove it immediately: other instances of this job may still
  # be running, and keeping the cache around briefly prevents them from
  # scheduling duplicates. Since all work has already been completed those
  # jobs will just cycle through any remaining pages without scheduling
  # anything.
  Gitlab::Cache::Import::Caching.expire(already_imported_cache_key,
    Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT)
  info(project.id, message: "importer finished")

  result
rescue StandardError => e
  # Record the failure (optionally aborting the whole import) before
  # re-raising so Sidekiq's retry machinery still sees the error.
  Gitlab::Import::ImportFailureService.track(
    project_id: project.id,
    error_source: self.class.name,
    exception: e,
    fail_import: abort_on_failure,
    metrics: true
  )

  raise(e)
end

#id_for_already_imported_cache(object) ⇒ Object

Returns the ID to use for the cache used for checking if an object has already been imported or not.

object - The object we may want to import.

Raises:

  • (NotImplementedError)


179
180
181
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 179

# Returns the ID to use for the cache used for checking if an object has
# already been imported or not.
#
# object - The object we may want to import.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def id_for_already_imported_cache(object)
  raise NotImplementedError
end

#importer_classObject

The class to use for importing objects when importing them sequentially.

Raises:

  • (NotImplementedError)


190
191
192
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 190

# The class to use for importing objects when importing them sequentially.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def importer_class
  raise NotImplementedError
end

#increment_object_counter?(_object) ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 150

# Whether the fetched-object counter should be incremented for the given
# object (see #each_object_to_import). Defaults to true; including
# classes may override.
def increment_object_counter?(_object)
  true
end

#initialize(project, client, parallel: true) ⇒ Object

project - An instance of `Project`. client - An instance of `Gitlab::GithubImport::Client`. parallel - When set to true the objects will be imported in parallel.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 26

# project  - An instance of `Project`.
# client   - An instance of `Gitlab::GithubImport::Client`.
# parallel - When set to true the objects will be imported in parallel.
def initialize(project, client, parallel: true)
  @project = project
  @client = client
  @parallel = parallel
  @page_counter = Gitlab::Import::PageCounter.new(project, collection_method)

  # Every cache key is scoped to the project ID and the collection so
  # concurrent imports of different projects/collections never collide.
  key_options = { project: project.id, collection: collection_method }

  @already_imported_cache_key = format(ALREADY_IMPORTED_CACHE_KEY, **key_options)
  @job_waiter_cache_key = format(JOB_WAITER_CACHE_KEY, **key_options)
  @job_waiter_remaining_cache_key = format(JOB_WAITER_REMAINING_CACHE_KEY, **key_options)
end

#mark_as_imported(object) ⇒ Object

Marks the given object as “already imported”.



165
166
167
168
169
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 165

# Marks the given object as "already imported" so later runs (and
# concurrent jobs) skip it.
def mark_as_imported(object)
  Gitlab::Cache::Import::Caching
    .set_add(already_imported_cache_key, id_for_already_imported_cache(object))
end

#object_typeObject

Raises:

  • (NotImplementedError)


171
172
173
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 171

# The type symbol passed to ObjectCounter when counting fetched objects.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def object_type
  raise NotImplementedError
end

#parallel?Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 38

# Whether objects are imported in parallel (the `parallel:` flag passed
# to #initialize).
def parallel?
  @parallel
end

#parallel_importObject

Imports all objects in parallel by scheduling a Sidekiq job for every individual object.



89
90
91
92
93
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 89

# Imports all objects in parallel by scheduling a Sidekiq job for every
# individual object. Requires batch settings to be configured.
def parallel_import
  if parallel_import_batch.blank?
    raise 'Batch settings must be defined for parallel import'
  end

  spread_parallel_import
end

#representation_classObject

The class used for converting API responses to Hashes when performing the import.

Raises:

  • (NotImplementedError)


185
186
187
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 185

# The class used for converting API responses to Hashes when performing
# the import.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def representation_class
  raise NotImplementedError
end

#sequential_importObject

Imports all the objects in sequence in the current thread.



79
80
81
82
83
84
85
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 79

# Imports all the objects in sequence in the current thread, running each
# importer inline instead of scheduling Sidekiq jobs.
def sequential_import
  each_object_to_import do |raw_object|
    representation = object_representation(raw_object)

    importer_class.new(representation, project, client).execute
  end
end

#sidekiq_worker_classObject

The Sidekiq worker class used for scheduling the importing of objects in parallel.

Raises:

  • (NotImplementedError)


196
197
198
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 196

# The Sidekiq worker class used for scheduling the importing of objects
# in parallel.
#
# Must be implemented by including classes; raises NotImplementedError
# here.
def sidekiq_worker_class
  raise NotImplementedError
end

#spread_parallel_importObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 95

# Schedules one Sidekiq job per object yielded by #each_object_to_import
# and returns the job waiter callers can use to wait for completion.
def spread_parallel_import
  self.job_started_at = Time.current
  self.enqueued_job_counter = 0

  each_object_to_import do |object|
    repr = object_representation(object)

    # NOTE(review): job_delay (from JobDelayCalculator) presumably derives
    # the delay from job_started_at/enqueued_job_counter, which is why the
    # counter is incremented only after scheduling — confirm against the
    # JobDelayCalculator implementation.
    sidekiq_worker_class.perform_in(job_delay, project.id, repr.to_hash.deep_stringify_keys, job_waiter.key.to_s)

    self.enqueued_job_counter += 1

    # Track the remaining-jobs count in Redis so it survives across
    # multiple instances of this job.
    job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key)
  end

  job_waiter
end