diff --git a/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpBase.h b/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpBase.h index cb36fc71a0..9ff816cfd1 100644 --- a/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpBase.h +++ b/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpBase.h @@ -58,6 +58,22 @@ class RdcRocpBase { */ rdc_status_t rocp_lookup(rdc_gpu_field_t gpu_field, rdc_field_value_data* value, rdc_field_type_t* type); + + /** + * @brief Bulk lookup of multiple ROCProfiler counters for a single GPU + * + * @param[in] fields Vector of fields to lookup (all for the same GPU) + * @param[out] values Vector to be populated with returned values + * @param[out] types Vector to be populated with returned types + * @param[out] statuses Vector to be populated with status for each field + * + * @retval ::RDC_ST_OK The function has been executed successfully. + */ + rdc_status_t rocp_lookup_bulk(const std::vector& fields, + std::vector& values, + std::vector& types, + std::vector& statuses); + const char* get_field_id_from_name(rdc_field_t); const std::vector get_field_ids(); @@ -98,6 +114,25 @@ class RdcRocpBase { RDC_FI_PROF_EVAL_FLOPS_32_PERCENT, RDC_FI_PROF_EVAL_FLOPS_64_PERCENT, }; + /** + * @brief Apply field-specific transformations to raw profiler values + * + * @param[in] field Field ID to transform + * @param[in] agent_index Index of the agent/GPU + * @param[in] raw_value Raw value from profiler + * @param[in] elapsed_time_ms Elapsed time in milliseconds (for eval fields) + * @param[in] sampled_values Map of all sampled values (for fields needing multiple metrics) + * @param[out] output Transformed output value + * @param[out] type Output type + * + * @retval ::RDC_ST_OK Transformation successful + */ + rdc_status_t apply_field_transformation(rdc_field_t field, uint32_t agent_index, + double raw_value, double elapsed_time_ms, + const std::map& sampled_values, + rdc_field_value_data* output, + rdc_field_type_t* type); + /** * @brief Convert from profiler status into RDC status */ diff --git a/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpCounterSampler.h b/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpCounterSampler.h index 8d87a7c637..c7c579eca9 100644 --- a/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpCounterSampler.h +++ b/projects/rdc/include/rdc_modules/rdc_rocp/RdcRocpCounterSampler.h @@ -55,6 +55,21 @@ class CounterSampler { rocprofiler_agent_id_t get_agent() const { return agent_; } + // Profile set for greedy packing + struct ProfileSet { + struct Profile { + rocprofiler_counter_config_id_t config; + std::vector counter_names; + size_t expected_size; + }; + std::vector profiles; + }; + + // Sample multiple counters using greedy packing to minimize profiles + void sample_counters_with_packing(const std::vector& counters, + std::map& out_values, + uint64_t duration); + // Get the supported counters for an agent static std::unordered_map get_supported_counters( rocprofiler_agent_id_t agent); @@ -71,6 +86,7 @@ class CounterSampler { std::map, rocprofiler_counter_config_id_t> cached_counter_; std::map counter_sizes_; + std::map, ProfileSet> cached_profile_sets_; // Internal function used to set the profile for the agent when start_context is called void set_profile(rocprofiler_context_id_t ctx, rocprofiler_device_counting_agent_cb_t cb) const; @@ -82,6 +98,9 @@ class CounterSampler { std::vector get_counter_dimensions( rocprofiler_counter_id_t counter); + // Create profiles using greedy packing algorithm + ProfileSet create_profiles_for_counters(const std::vector& counters); + static std::vector> samplers_; }; diff --git a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpBase.cc b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpBase.cc index 63b1d010d6..e86905cfc5 100644 --- a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpBase.cc +++ b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpBase.cc @@ -340,19 +340,171 @@ rdc_status_t RdcRocpBase::rocp_lookup(rdc_gpu_field_t gpu_field, rdc_field_value init_rocp_if_not(); - const bool is_eval_field = (eval_fields.find(field) != eval_fields.end()); - const auto start_time = std::chrono::high_resolution_clock::now(); // direct read from rocprofiler const double read_dbl = run_profiler(agent_index, field); const auto stop_time = std::chrono::high_resolution_clock::now(); const double elapsed = std::chrono::duration(stop_time - start_time).count(); - // divide by elapsed time if needed - double divided_dbl = NAN; + // For OCC_ELAPSED, we need to read the occupancy metric as well + std::map sampled_values; + if (field == RDC_FI_PROF_OCC_ELAPSED) { + const double occupancy_val = run_profiler(agent_index, RDC_FI_PROF_OCC_PER_ACTIVE_CU); + auto occ_field_it = field_to_metric.find(RDC_FI_PROF_OCC_PER_ACTIVE_CU); + if (occ_field_it != field_to_metric.end()) { + sampled_values[occ_field_it->second] = occupancy_val; + } + } + + // Apply field transformations using the helper function + return apply_field_transformation(field, agent_index, read_dbl, elapsed, sampled_values, data, + type); +} + +rdc_status_t RdcRocpBase::rocp_lookup_bulk(const std::vector& fields, + std::vector& values, + std::vector& types, + std::vector& statuses) { + if (fields.empty()) { + return RDC_ST_OK; + } + + init_rocp_if_not(); + + // Resize output vectors + values.resize(fields.size()); + types.resize(fields.size()); + statuses.resize(fields.size()); + + // All fields should be for the same GPU + uint32_t agent_index = entity_to_prof_map[fields[0].gpu_index]; + + // Collect all unique metric names needed for sampling + std::vector metrics_to_sample; + std::map field_to_metric_index; // Maps field to position in metrics_to_sample + + for (size_t i = 0; i < fields.size(); i++) { + const auto& field = fields[i].field_id; + types[i] = DOUBLE; // Default type + statuses[i] = RDC_ST_OK; + + // Handle special case: RDC_FI_PROF_KFD_ID doesn't need sampling + if (field == RDC_FI_PROF_KFD_ID) { + types[i] = INTEGER; + values[i].l_int = agents[agent_index].gpu_id; + continue; + } + + // Get metric name for this field + auto field_it = field_to_metric.find(field); + if (field_it == field_to_metric.end()) { + RDC_LOG(RDC_ERROR, "Error: Field " << field << " not found in field_to_metric map."); + statuses[i] = RDC_ST_BAD_PARAMETER; + continue; + } + + const std::string metric_name = field_it->second; + + // Check if we've already added this metric + if (field_to_metric_index.find(field) == field_to_metric_index.end()) { + field_to_metric_index[field] = metrics_to_sample.size(); + metrics_to_sample.push_back(metric_name); + } + + // Special case: RDC_FI_PROF_OCC_ELAPSED needs two metrics + if (field == RDC_FI_PROF_OCC_ELAPSED) { + if (field_to_metric_index.find(RDC_FI_PROF_OCC_PER_ACTIVE_CU) == field_to_metric_index.end()) { + auto occ_field_it = field_to_metric.find(RDC_FI_PROF_OCC_PER_ACTIVE_CU); + if (occ_field_it != field_to_metric.end()) { + field_to_metric_index[RDC_FI_PROF_OCC_PER_ACTIVE_CU] = metrics_to_sample.size(); + metrics_to_sample.push_back(occ_field_it->second); + } + } + } + } + + // Sample all counters at once using greedy packing + std::map sampled_values; + auto counter_sampler = CounterSampler::get_samplers()[agent_index]; + if (!counter_sampler) { + RDC_LOG(RDC_ERROR, "Error: Counter sampler not found for GPU index " << agent_index); + for (size_t i = 0; i < fields.size(); i++) { + statuses[i] = RDC_ST_BAD_PARAMETER; + } + return RDC_ST_BAD_PARAMETER; + } + + const auto start_time = std::chrono::high_resolution_clock::now(); + if (!metrics_to_sample.empty()) { + try { + counter_sampler->sample_counters_with_packing(metrics_to_sample, sampled_values, + collection_duration_us_k); + } catch (const std::exception& e) { + RDC_LOG(RDC_ERROR, "Error while sampling counter values: " << e.what()); + for (size_t i = 0; i < fields.size(); i++) { + if (fields[i].field_id != RDC_FI_PROF_KFD_ID) { + statuses[i] = RDC_ST_BAD_PARAMETER; + } + } + return RDC_ST_BAD_PARAMETER; + } + } + const auto stop_time = std::chrono::high_resolution_clock::now(); + const double elapsed = std::chrono::duration(stop_time - start_time).count(); + + // Process results for each field + for (size_t i = 0; i < fields.size(); i++) { + const auto& field = fields[i].field_id; + + // Skip fields that already have values set (like RDC_FI_PROF_KFD_ID) + if (field == RDC_FI_PROF_KFD_ID) { + continue; + } + + // Skip fields that had errors earlier + if (statuses[i] != RDC_ST_OK) { + continue; + } + + // Get the sampled value for this field + auto field_it = field_to_metric.find(field); + if (field_it == field_to_metric.end()) { + continue; + } + + const std::string& metric_name = field_it->second; + auto sampled_it = sampled_values.find(metric_name); + if (sampled_it == sampled_values.end()) { + RDC_LOG(RDC_ERROR, "Error: Metric " << metric_name << " not found in sampled values."); + statuses[i] = RDC_ST_BAD_PARAMETER; + continue; + } + + double read_dbl = sampled_it->second; + + // Apply field transformation using the helper function + statuses[i] = apply_field_transformation(field, agent_index, read_dbl, elapsed, + sampled_values, &values[i], &types[i]); + } + + return RDC_ST_OK; +} + +rdc_status_t RdcRocpBase::apply_field_transformation( + rdc_field_t field, uint32_t agent_index, double raw_value, double elapsed_time_ms, + const std::map& sampled_values, rdc_field_value_data* output, + rdc_field_type_t* type) { + + // Default type is DOUBLE + *type = DOUBLE; + + const bool is_eval_field = (eval_fields.find(field) != eval_fields.end()); + + // Calculate divided value for eval fields + double divided_dbl = NAN; if (is_eval_field) { - if (elapsed != 0.0) { - divided_dbl = read_dbl / (elapsed / 1000.0); + if (elapsed_time_ms != 0.0) { + divided_dbl = raw_value / (elapsed_time_ms / 1000.0); } else { RDC_LOG(RDC_ERROR, "Error: Elapsed time is zero. Cannot divide by zero."); return RDC_ST_BAD_PARAMETER; @@ -361,61 +513,63 @@ rdc_status_t RdcRocpBase::rocp_lookup(rdc_gpu_field_t gpu_field, rdc_field_value switch (field) { case RDC_FI_PROF_GPU_UTIL_PERCENT: - // RDC_FI_PROF_GPU_UTIL_PERCENT is mapped to GPU_UTIL - // GPU_UTIL metric is available on more GPUs than ENGINE_ACTIVE. - // ENGINE_ACTIVE = GPU_UTIL/100, so do the math ourselves - data->dbl = read_dbl / 100.0; + output->dbl = raw_value / 100.0F; break; + case RDC_FI_PROF_OCC_ELAPSED: { - // RDC_FI_PROF_OCC_ELAPSED is mapped to GRBM_GUI_ACTIVE, the read happens earlier in this - // function - const double active_cycles_val = read_dbl; + const double active_cycles_val = raw_value; if (active_cycles_val != 0.0) { - // read second value from profiler - const double occupancy_val = run_profiler(agent_index, RDC_FI_PROF_OCC_PER_ACTIVE_CU); - data->dbl = occupancy_val / active_cycles_val; + // Look for the occupancy metric in sampled values + auto occ_field_it = field_to_metric.find(RDC_FI_PROF_OCC_PER_ACTIVE_CU); + if (occ_field_it != field_to_metric.end()) { + auto occ_sampled_it = sampled_values.find(occ_field_it->second); + if (occ_sampled_it != sampled_values.end()) { + const double occupancy_val = occ_sampled_it->second; + output->dbl = occupancy_val / active_cycles_val; + } else { + return RDC_ST_BAD_PARAMETER; + } + } else { + return RDC_ST_BAD_PARAMETER; + } } else { return RDC_ST_BAD_PARAMETER; } } break; + case RDC_FI_PROF_EVAL_FLOPS_16_PERCENT: { if (!is_eval_field) { RDC_LOG(RDC_ERROR, "Field expected to be in the eval_fields list but it isn't!"); return RDC_ST_BAD_PARAMETER; } - // 1024, 2048, and 256 are taken from "INTRODUCING AMD CDNA 3 ARCHITECTURE" white paper const std::string target_version = agents[agent_index].name; - // TODO: Design a lookup table for other GPUs const bool isMI200 = (target_version.find("gfx90a") != std::string::npos); - // FLOPS/clock/CU if (isMI200) { - data->dbl = divided_dbl / (1024.0 / static_cast(agents[agent_index].simd_per_cu)); + output->dbl = divided_dbl / (1024.0F / static_cast(agents[agent_index].simd_per_cu)); } else { // Assume mi300 - data->dbl = divided_dbl / (2048.0 / static_cast(agents[agent_index].simd_per_cu)); + output->dbl = divided_dbl / (2048.0F / static_cast(agents[agent_index].simd_per_cu)); } } break; + case RDC_FI_PROF_EVAL_FLOPS_32_PERCENT: case RDC_FI_PROF_EVAL_FLOPS_64_PERCENT: if (!is_eval_field) { RDC_LOG(RDC_ERROR, "Field expected to be in the eval_fields list but it isn't!"); return RDC_ST_BAD_PARAMETER; } - // FLOPS/clock/CU - data->dbl = divided_dbl / (256.0 / static_cast(agents[agent_index].simd_per_cu)); + output->dbl = divided_dbl / (256.0F / static_cast(agents[agent_index].simd_per_cu)); break; - case RDC_FI_PROF_KFD_ID: { - // do not care what it is mapped to. read value from agents + + case RDC_FI_PROF_KFD_ID: *type = INTEGER; - data->l_int = agents[agent_index].gpu_id; + output->l_int = agents[agent_index].gpu_id; break; - } + default: - // only support default fallback for doubles - assert(*type == DOUBLE); if (is_eval_field) { - data->dbl = divided_dbl; + output->dbl = divided_dbl; } else { - data->dbl = read_dbl; + output->dbl = raw_value; } break; } diff --git a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpCounterSampler.cc b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpCounterSampler.cc index 9597dbb87f..bc5c43d3ae 100644 --- a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpCounterSampler.cc +++ b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcRocpCounterSampler.cc @@ -314,5 +314,157 @@ extern "C" rocprofiler_tool_configure_result_t* rocprofiler_configure(uint32_t v return &cfg; } +CounterSampler::ProfileSet CounterSampler::create_profiles_for_counters( + const std::vector& counters) { + ProfileSet profile_set; + auto roc_counters = get_supported_counters(agent_); + + // Build ordered list of counters + std::vector remaining_counters = counters; + + RDC_LOG(RDC_DEBUG, "Creating profiles for " << counters.size() << " counters on agent " + << agent_.handle); + + // Greedy packing: try to fit as many counters as possible into each profile + while (!remaining_counters.empty()) { + std::vector current_profile_counters; + std::vector failed_counters; + rocprofiler_counter_config_id_t last_valid_config = {}; + size_t last_valid_size = 0; + + // Try to add each remaining counter to the current profile + for (const auto& counter_name : remaining_counters) { + auto it = roc_counters.find(counter_name); + if (it == roc_counters.end()) { + RDC_LOG(RDC_DEBUG, "Counter " << counter_name << " not supported on agent " + << agent_.handle); + continue; + } + + current_profile_counters.push_back(counter_name); + + // Build the counter ID list + std::vector gpu_counters; + size_t expected_size = 0; + for (const auto& name : current_profile_counters) { + auto it2 = roc_counters.find(name); + if (it2 != roc_counters.end()) { + gpu_counters.push_back(it2->second); + expected_size += get_counter_size(it2->second); + } + } + + // Try to create config + rocprofiler_counter_config_id_t config = {}; + auto status = rocprofiler_create_counter_config(agent_, gpu_counters.data(), + gpu_counters.size(), &config); + + if (status == ROCPROFILER_STATUS_ERROR_EXCEEDS_HW_LIMIT) { + // Counter doesn't fit, try next one + current_profile_counters.pop_back(); + failed_counters.push_back(counter_name); + } else if (status == ROCPROFILER_STATUS_SUCCESS) { + // Success, save this config + last_valid_config = config; + last_valid_size = expected_size; + } else { + // Unexpected error + RDC_LOG(RDC_DEBUG, "Error creating counter config: " << status); + current_profile_counters.pop_back(); + failed_counters.push_back(counter_name); + } + } + + // Save the profile if valid + if (!current_profile_counters.empty() && last_valid_config.handle != 0) { + profile_set.profiles.push_back({last_valid_config, current_profile_counters, last_valid_size}); + + RDC_LOG(RDC_DEBUG, " Profile " << profile_set.profiles.size() + << ": " << current_profile_counters.size() << " counters"); + } + + // Continue with failed counters + remaining_counters = failed_counters; + + // Safety check to prevent infinite loop + if (current_profile_counters.empty() && !remaining_counters.empty()) { + RDC_LOG(RDC_ERROR, "Failed to create profile for remaining counters on agent " + << agent_.handle); + break; + } + } + + if (counters.size() == 0) { + RDC_LOG(RDC_DEBUG, "Created " << profile_set.profiles.size() + << " profiles from 0 counters (compression: N/A)"); + } else { + RDC_LOG(RDC_DEBUG, "Created " << profile_set.profiles.size() + << " profiles from " << counters.size() << " counters (compression: " + << (100.0 * profile_set.profiles.size() / counters.size()) << "%)"); + } + + return profile_set; +} + +void CounterSampler::sample_counters_with_packing(const std::vector& counters, + std::map& out_values, + uint64_t duration) { + // Sort counters for cache key + std::vector sorted_counters = counters; + std::sort(sorted_counters.begin(), sorted_counters.end()); + + // Check if we have a cached profile set + auto cached = cached_profile_sets_.find(sorted_counters); + if (cached == cached_profile_sets_.end()) { + // Create new profile set with greedy packing + RDC_LOG(RDC_DEBUG, "Creating new profile set for " << sorted_counters.size() + << " counters on agent " << agent_.handle); + ProfileSet profile_set = create_profiles_for_counters(sorted_counters); + cached = cached_profile_sets_.emplace(sorted_counters, std::move(profile_set)).first; + } + + // Clear output + out_values.clear(); + + // Statistics tracking (thread-safe) + static std::atomic total_sample_calls{0}; + static std::atomic total_profiles_sampled{0}; + + // Sample from all profiles in the set + for (const auto& profile : cached->second.profiles) { + std::vector records; + records.resize(profile.expected_size); + + counter_ = profile.config; + rocprofiler_start_context(ctx_); + size_t out_size = records.size(); + + // Wait for sampling window + usleep(duration); + + rocprofiler_sample_device_counting_service(ctx_, {}, ROCPROFILER_COUNTER_FLAG_NONE, + records.data(), &out_size); + total_sample_calls++; + rocprofiler_stop_context(ctx_); + records.resize(out_size); + + // Decode records and aggregate values + for (const auto& record : records) { + const std::string& name = decode_record_name(record); + out_values[name] += record.counter_value; + } + } + + total_profiles_sampled += cached->second.profiles.size(); + + // Log statistics periodically (every 100 sample calls) + if (total_sample_calls % 100 == 0) { + RDC_LOG(RDC_DEBUG, "Greedy packed sampling statistics: " + << total_sample_calls << " total sample calls, " + << total_profiles_sampled << " total profiles sampled, " + << "avg " << (double)total_profiles_sampled / total_sample_calls << " profiles/sample"); + } +} + } // namespace rdc } // namespace amd diff --git a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcTelemetryLib.cc b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcTelemetryLib.cc index cfd7904c58..fbf5fbfbd5 100644 --- a/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcTelemetryLib.cc +++ b/projects/rdc/rdc_libs/rdc_modules/rdc_rocp/RdcTelemetryLib.cc @@ -184,60 +184,88 @@ rdc_status_t rdc_telemetry_fields_value_get(rdc_gpu_field_t* fields, const uint3 return RDC_ST_FAIL_LOAD_MODULE; } - // Bulk fetch fields - std::vector bulk_results; - struct timeval tv{}; gettimeofday(&tv, nullptr); const uint64_t curTime = static_cast(tv.tv_sec) * 1000 + tv.tv_usec / 1000; - // Fetch it one by one for left fields + // Group fields by GPU index for bulk sampling + std::map> gpu_to_field_indices; + for (uint32_t i = 0; i < fields_count; i++) { + gpu_to_field_indices[fields[i].gpu_index].push_back(i); + } + + // Process each GPU group const int BULK_FIELDS_MAX = 16; rdc_gpu_field_value_t values[BULK_FIELDS_MAX]; uint32_t bulk_count = 0; - rdc_status_t status = RDC_ST_UNKNOWN_ERROR; - rdc_field_value_data data; - rdc_field_type_t type = DOUBLE; + rdc_status_t status = RDC_ST_OK; - for (uint32_t i = 0; i < fields_count; i++) { - if (bulk_count >= BULK_FIELDS_MAX) { - status = callback(values, bulk_count, user_data); - // When the callback returns errors, stop processing and return. - if (status != RDC_ST_OK) { - return status; + for (const auto& [gpu_index, field_indices] : gpu_to_field_indices) { + // Collect fields for this GPU + std::vector gpu_fields; + gpu_fields.reserve(field_indices.size()); + for (uint32_t idx : field_indices) { + gpu_fields.push_back(fields[idx]); + } + + // Bulk lookup for this GPU + // Note: rocp_lookup_bulk only handles rocprofiler-sdk metrics. + // Non-rocprofiler fields (e.g., RDC_FI_PROF_KFD_ID) are handled within + // the bulk lookup via special case logic in apply_field_transformation(). + // Fields without rocprofiler metric mappings will return RDC_ST_BAD_PARAMETER. + std::vector bulk_data; + std::vector bulk_types; + std::vector bulk_statuses; + + status = rocp_p->rocp_lookup_bulk(gpu_fields, bulk_data, bulk_types, bulk_statuses); + if (status != RDC_ST_OK) { + RDC_LOG(RDC_ERROR, "Error in bulk lookup for GPU " << gpu_index); + // Continue with next GPU even if this one failed + continue; + } + + // Distribute results to callback buffer + for (size_t j = 0; j < gpu_fields.size(); j++) { + if (bulk_count >= BULK_FIELDS_MAX) { + status = callback(values, bulk_count, user_data); + if (status != RDC_ST_OK) { + return status; + } + bulk_count = 0; } - bulk_count = 0; - } - status = rocp_p->rocp_lookup(fields[i], &data, &type); - // get value - values[bulk_count].gpu_index = fields[i].gpu_index; - values[bulk_count].field_value.status = status; - values[bulk_count].field_value.ts = curTime; - values[bulk_count].field_value.type = type; - values[bulk_count].field_value.field_id = fields[i].field_id; - switch (type) { - case DOUBLE: - values[bulk_count].field_value.value.dbl = data.dbl; - break; - case INTEGER: - values[bulk_count].field_value.value.l_int = data.l_int; - break; - case STRING: - case BLOB: - strncpy_with_null(values[bulk_count].field_value.value.str, data.str, RDC_MAX_STR_LENGTH); - break; - default: - break; + const uint32_t original_idx = field_indices[j]; + values[bulk_count].gpu_index = fields[original_idx].gpu_index; + values[bulk_count].field_value.status = bulk_statuses[j]; + values[bulk_count].field_value.ts = curTime; + values[bulk_count].field_value.type = bulk_types[j]; + values[bulk_count].field_value.field_id = fields[original_idx].field_id; + + switch (bulk_types[j]) { + case DOUBLE: + values[bulk_count].field_value.value.dbl = bulk_data[j].dbl; + break; + case INTEGER: + values[bulk_count].field_value.value.l_int = bulk_data[j].l_int; + break; + case STRING: + case BLOB: + strncpy_with_null(values[bulk_count].field_value.value.str, bulk_data[j].str, + RDC_MAX_STR_LENGTH); + break; + default: + break; + } + bulk_count++; } - bulk_count++; } + + // Flush remaining values if (bulk_count != 0) { status = callback(values, bulk_count, user_data); if (status != RDC_ST_OK) { return status; } - bulk_count = 0; } return status;